chore: check schema for small parquet file too. (#17175)
youngsofun authored Jan 6, 2025
1 parent a070678 commit 99cb6e1
Showing 7 changed files with 35 additions and 5 deletions.
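
In short: Databend's parquet stage reading has two paths — a row-group reader, whose metadata pass already validated each file's schema (hence the "too" in the title), and a whole-file reader (`ParquetRSFullReader`) used when a parquet file is small enough to be pulled into memory in one read, which previously decoded whatever it was given. This commit threads a new `schema_desc_from` provenance string through `ParquetRSReaderBuilder` into `ParquetRSFullReader`, which now runs `check_parquet_schema` against the file's footer before decoding. Short hedged sketches after the relevant file sections below illustrate the moving parts.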
@@ -116,6 +116,7 @@ impl RowGroupReaderForCopy {
             parquet_table_schema,
             schema_descr,
             Some(arrow_schema),
+            None,
         )
         .with_push_downs(Some(&pushdowns));
         reader_builder.build_output()?;
2 changes: 1 addition & 1 deletion src/query/storages/parquet/src/parquet_rs/meta.rs
@@ -101,7 +101,7 @@ pub async fn read_metas_in_parallel(
     Ok(metas)
 }
 
-fn check_parquet_schema(
+pub(crate) fn check_parquet_schema(
     expect: &SchemaDescriptor,
     actual: &SchemaDescriptor,
     path: &str,
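
Only the visibility of `check_parquet_schema` changes here, so the whole-file reader module can call it; its body is not shown in this diff. As orientation, a minimal sketch of the kind of comparison such a check performs, built on the `parquet` crate's public API — the body, error type, and message wording are assumptions, not the repository's code (only the "diff schema" phrase is attested, by the tests at the end of this commit):

    use parquet::schema::types::SchemaDescriptor;

    // Hypothetical stand-in for `check_parquet_schema`: reject a file whose
    // schema differs from the expected one, naming both the offending file
    // and where the expected schema came from.
    fn check_schemas_match(
        expect: &SchemaDescriptor,
        actual: &SchemaDescriptor,
        path: &str,
        expect_schema_from: &str,
    ) -> Result<(), String> {
        // `root_schema()` returns the whole schema tree; `Type` implements
        // `PartialEq`, so one comparison covers names, types, and nesting.
        if expect.root_schema() != actual.root_schema() {
            return Err(format!(
                "diff schema: file '{path}' does not match the schema from {expect_schema_from}"
            ));
        }
        Ok(())
    }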
@@ -51,6 +51,7 @@ pub struct ParquetRSReaderBuilder<'a> {
     op: Operator,
     table_schema: TableSchemaRef,
     schema_desc: SchemaDescPtr,
+    schema_desc_from: Option<String>,
     arrow_schema: Option<arrow_schema::Schema>,
 
     push_downs: Option<&'a PushDownInfo>,
@@ -85,6 +86,7 @@ impl<'a> ParquetRSReaderBuilder<'a> {
             table_schema,
             schema_desc,
             Some(arrow_schema),
+            None,
         ))
     }
 
@@ -94,13 +96,15 @@ impl<'a> ParquetRSReaderBuilder<'a> {
         table_schema: TableSchemaRef,
         schema_desc: SchemaDescPtr,
         arrow_schema: Option<arrow_schema::Schema>,
+        schema_desc_from: Option<String>,
     ) -> ParquetRSReaderBuilder<'a> {
         ParquetRSReaderBuilder {
             ctx,
             op,
             table_schema,
             schema_desc,
             arrow_schema,
+            schema_desc_from,
             push_downs: None,
             options: Default::default(),
             pruner: None,
@@ -221,6 +225,10 @@ impl<'a> ParquetRSReaderBuilder<'a> {
         let (_, _, output_schema, _) = self.built_output.as_ref().unwrap();
         Ok(ParquetRSFullReader {
             op: self.op.clone(),
+            expect_file_schema: self
+                .schema_desc_from
+                .as_ref()
+                .map(|p| (self.schema_desc.clone(), p.clone())),
             output_schema: output_schema.clone(),
             predicate,
             projection,
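
Note the design choice in the last hunk above: the reader receives an expected schema only when a provenance string was supplied, so callers passing `None` — such as `RowGroupReaderForCopy` in the first hunk of this commit, whose path presumably relies on the existing check in meta.rs — keep the old unchecked behavior. A self-contained toy illustration of the pairing, with `String` stand-ins for the real schema types:

    fn main() {
        // Stand-ins, not repo types: a schema handle plus where it came from.
        let schema_desc = String::from("expected parquet schema");
        let schema_desc_from: Option<String> = Some(String::from("first file in the stage"));

        // Same shape as the builder change: the (schema, provenance) pair --
        // and therefore the check in the reader -- exists only if a
        // provenance string was given.
        let expect_file_schema: Option<(String, String)> = schema_desc_from
            .as_ref()
            .map(|from| (schema_desc.clone(), from.clone()));

        assert!(expect_file_schema.is_some());
    }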
@@ -41,7 +41,9 @@ use parquet::arrow::async_reader::ParquetRecordBatchStream;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
+use parquet::schema::types::SchemaDescPtr;
 
+use crate::parquet_rs::meta::check_parquet_schema;
 use crate::parquet_rs::parquet_reader::predicate::ParquetPredicate;
 use crate::parquet_rs::parquet_reader::utils::transform_record_batch;
 use crate::parquet_rs::parquet_reader::utils::transform_record_batch_by_field_paths;
@@ -51,6 +53,7 @@ use crate::ParquetRSPruner;
 /// The reader to read a whole parquet file.
 pub struct ParquetRSFullReader {
     pub(super) op: Operator,
+    pub(super) expect_file_schema: Option<(SchemaDescPtr, String)>,
     pub(super) output_schema: TableSchemaRef,
     pub(super) predicate: Option<Arc<ParquetPredicate>>,
 
@@ -168,7 +171,7 @@ impl ParquetRSFullReader {
     }
 
     /// Read a [`DataBlock`] from bytes.
-    pub fn read_blocks_from_binary(&self, raw: Vec<u8>) -> Result<Vec<DataBlock>> {
+    pub fn read_blocks_from_binary(&self, raw: Vec<u8>, path: &str) -> Result<Vec<DataBlock>> {
         let bytes = Bytes::from(raw);
         let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
             bytes,
@@ -179,6 +182,14 @@
 
         // Prune row groups.
         let file_meta = builder.metadata().clone();
+        if let Some((expect_schema, expect_schema_from)) = &self.expect_file_schema {
+            check_parquet_schema(
+                expect_schema,
+                file_meta.file_metadata().schema_descr(),
+                path,
+                expect_schema_from,
+            )?;
+        }
 
         let mut full_match = false;
         if let Some(pruner) = &self.pruner {
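
To see the new control flow in isolation: a runnable sketch of refusing to decode an in-memory parquet file whose schema differs from the expected one, mirroring where the patched `read_blocks_from_binary` places its check. It uses the `parquet` crate directly; the function name, error handling, and return value are assumptions for illustration:

    use bytes::Bytes;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::schema::types::SchemaDescPtr;

    // Hypothetical standalone version of the patched flow: validate the
    // footer schema first, decode rows only if it matches.
    fn decode_checked(
        raw: Vec<u8>,
        expect: Option<&(SchemaDescPtr, String)>,
        path: &str,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(raw))?;

        // The check runs on footer metadata, before any row data is decoded.
        if let Some((expect_schema, from)) = expect {
            let actual = builder.metadata().file_metadata().schema_descr();
            if expect_schema.root_schema() != actual.root_schema() {
                return Err(format!(
                    "diff schema: '{path}' does not match the schema from {from}"
                )
                .into());
            }
        }

        // Schema matched (or no expectation was armed): decode and count rows.
        let mut rows = 0;
        for batch in builder.build()? {
            rows += batch?.num_rows();
        }
        Ok(rows)
    }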
@@ -72,6 +72,7 @@ impl ParquetRSTable {
             table_schema.clone(),
             self.schema_descr.clone(),
             Some(self.arrow_schema.clone()),
+            Some(self.schema_from.clone()),
         )
         .with_options(self.read_options)
         .with_push_downs(plan.push_downs.as_ref())
6 changes: 3 additions & 3 deletions src/query/storages/parquet/src/parquet_rs/source.rs
@@ -166,7 +166,7 @@ impl Processor for ParquetSource {
                     .full_file_reader
                     .as_ref()
                     .unwrap()
-                    .read_blocks_from_binary(buffer)?;
+                    .read_blocks_from_binary(buffer, &path)?;
                 let num_rows = bs.iter().map(|b| b.num_rows()).sum();
                 self.copy_status.add_chunk(path.as_str(), FileStatus {
                     num_rows_loaded: num_rows,
@@ -175,12 +175,12 @@
                 blocks.extend(bs);
             }
         } else {
-            for (_, buffer) in buffers {
+            for (path, buffer) in buffers {
                 blocks.extend(
                     self.full_file_reader
                         .as_ref()
                         .unwrap()
-                        .read_blocks_from_binary(buffer)?,
+                        .read_blocks_from_binary(buffer, &path)?,
                 );
            }
        }
@@ -1,2 +1,11 @@
 query error diff schema
 select $1 from @data/parquet/ (files=>('tuple.parquet', 'complex.parquet'))
+
+statement ok
+create or replace table t1 (id int, t TUPLE(A INT32, B STRING));
+
+query error diff schema
+copy into t1 from @data/parquet/ files=('tuple.parquet', 'complex.parquet')
+
+query error diff schema
+copy /*+ set_var(parquet_fast_read_bytes=0) */ into t1 from @data/parquet/ files=('tuple.parquet', 'complex.parquet')
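
Read together, the new cases appear to exercise both paths: with default settings these small files take the whole-file reader that this commit fixes, while the `set_var(parquet_fast_read_bytes=0)` hint presumably pushes every file onto the row-group path, so both readers must report the same `diff schema` error.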
