feat: cdf reader for delta tables #2048
Conversation
    Self::get_add_action_type(),
)?;

// Create the parquet scans for each associated type of file. I am not sure when we would use removes yet, but
Aren't remove actions then read and appended to the result with `_change_type: delete`?
let results = scan.scan().await?;
let data: Vec<RecordBatch> = collect_sendable_stream(results).await?;
print_batches(&data)?;
Can you add an assert on the output of `data`?
I'll flesh the tests out to be more complete once the initial impl is further along.
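For reference, a minimal sketch of the kind of assertion being asked for, continuing from the `data` batches in the snippet above; the rendered values checked here are made up for illustration.

```rust
// Hypothetical assertion sketch: render the batches to text and check for the CDF
// columns/values we expect. Real tests would pin down the full expected output.
use arrow::util::pretty::pretty_format_batches;

let rendered = pretty_format_batches(&data).unwrap().to_string();
assert!(rendered.contains("_change_type"));
assert!(rendered.contains("insert"));
```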
// Create the parquet scans for each associated type of file. I am not sure when we would use removes yet, but
// they would be here if / when they are necessary
let cdc_scan = ParquetFormat::new()
Have you considered `ctx.read_parquet` for both and then just `df_cdc.union(df_add)?.collect()`?
You lose some control when you use that facility, and I'm not even sure this would work, since it looks like it expects a directory rather than explicit file paths. It might work, I haven't explored it extensively to be honest, but at a glance it doesn't look like it would provide the control necessary.
It does accept a list of file paths. Comparing the two, these params look like the same thing (I think):
`ParquetReadOptions.schema` == `FileScanConfig.file_schema`
`table_paths` == `FileScanConfig.file_groups`
`ParquetReadOptions.table_partition_cols` == `FileScanConfig.table_partition_cols`
Projection and limit I guess you apply after `read_parquet` on the DataFrame (I assume it's lazy). The only gap is the statistics part, but you don't seem to use it. It could be interesting to check whether there is any performance difference between the two, since https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.execute_stream will create the physical plan for you.
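For illustration, a rough sketch of the suggested alternative (not the PR's implementation), assuming the CDC and add files are available as plain path lists and that the default `ParquetReadOptions` stand in for the schema/partition-column settings discussed above.

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

// Hypothetical helper: read both file groups with read_parquet and union them.
async fn union_cdf_scan(
    ctx: &SessionContext,
    cdc_paths: Vec<String>,
    add_paths: Vec<String>,
) -> Result<Vec<RecordBatch>> {
    // read_parquet accepts a list of file paths, not just a table directory.
    let df_cdc = ctx
        .read_parquet(cdc_paths, ParquetReadOptions::default())
        .await?;
    let df_add = ctx
        .read_parquet(add_paths, ParquetReadOptions::default())
        .await?;

    // DataFrames are lazy; union then collect builds and runs the physical plan.
    df_cdc.union(df_add)?.collect().await
}
```

Projection, limit, and partition columns would be layered on through `ParquetReadOptions` and DataFrame methods, which is the part of the comparison above that is hardest to judge without trying it.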
    partition_values: Vec<String>,
}

impl DeltaCdfScan {
Did not have time for a proper review, so just a quick comment: we should implement `ExecutionPlan` for this scan so it can be used in DataFusion as an operation.
# Conflicts:
#	crates/core/src/operations/load_cdf.rs
#	crates/core/src/table/state.rs
# Description
Some of my first workings on David's proposal in #2006. This is also meant to push #2048 and CDF in general forward by making the logical operations of Delta tables more composable than they are today.

# Related Issue(s)
#2006 #2048

I think, and @Blajda correct me here, we can build upon this and eventually move towards a `DeltaPlanner`-esque enum for operations and their associated logical plan building.

# Still to do
- [ ] Implement a different path for partition columns that doesn't require scanning the file
- [ ] Plumbing into `DeltaScan` so delta scan can make use of this logical node
- [ ] General polish and cleanup; there are lots of unnecessary fields, and the way things are built needs tidying
- [ ] More tests; there is currently a large integration-style end-to-end test, but this can / should be broken down
python/src/lib.rs (outdated)
    .collect();

let final_batch = concat_batches(&batches[0].schema(), &batches).unwrap();
Ok(PyArrowType(final_batch))
I think we can also convert it back into a reader.
I'm sorry, what do you mean?
`impl IntoPyArrow for ArrowArrayStreamReader`
Converts an `ArrowArrayStreamReader` into a `pyarrow.RecordBatchReader`.
https://docs.rs/arrow/latest/arrow/ffi_stream/struct.ArrowArrayStreamReader.html
I mean the above. You can take an `ArrowArrayStreamReader` and move it into Python as a `pyarrow.RecordBatchReader`.
Create a `RecordBatchReader` first from the batches, then the stream:
`let stream = FFI_ArrowArrayStream::new(reader)`
`Ok(PyArrowType(stream))`
with `PyResult<PyArrowType>` as the return type.
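A sketch of that suggestion, assuming the batches and their schema are already collected; the function name is made up, but `RecordBatchIterator`, `FFI_ArrowArrayStream`, and `ArrowArrayStreamReader` are the arrow-rs types referenced above.

```rust
use arrow::datatypes::SchemaRef;
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow::pyarrow::PyArrowType;
use arrow::record_batch::{RecordBatch, RecordBatchIterator};
use pyo3::prelude::*;

// Hypothetical helper: turn collected batches into something Python sees as a
// pyarrow.RecordBatchReader instead of one concatenated RecordBatch.
fn batches_to_py_reader(
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
) -> PyResult<PyArrowType<ArrowArrayStreamReader>> {
    // Wrap the in-memory batches in a RecordBatchReader...
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    // ...move it behind the Arrow C stream interface...
    let stream = FFI_ArrowArrayStream::new(Box::new(reader));
    // ...and hand it back to Python via the IntoPyArrow impl mentioned above.
    let reader = ArrowArrayStreamReader::try_new(stream)
        .map_err(|err| pyo3::exceptions::PyValueError::new_err(err.to_string()))?;
    Ok(PyArrowType(reader))
}
```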
I sent you some comments on Slack about the Python signature.
Regarding `starting_version`: it always seems to be required, but in Spark SQL it can also be just a timestamp. I think it would make sense to make the starting version optional as well, and just raise if neither a starting version nor a starting timestamp is provided.
Well, the Rust side defaults to 0 for the starting version if you don't provide one, so in theory you can give nothing and just get the full CDF feed for the table since the beginning. I don't know why I required Python to provide a starting version here, but you are right, we can mark it as optional.
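A hypothetical pyo3 sketch of the signature being discussed: both bounds optional, with the reviewer's stricter behaviour (raise when neither is given). The type and method names are stand-ins, not the actual binding.

```rust
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

#[pyclass]
struct CdfTable; // stand-in for the real table binding

#[pymethods]
impl CdfTable {
    /// Both bounds optional; error only when neither is supplied.
    #[pyo3(signature = (starting_version = None, starting_timestamp = None))]
    fn load_cdf(
        &self,
        starting_version: Option<i64>,
        starting_timestamp: Option<String>,
    ) -> PyResult<()> {
        if starting_version.is_none() && starting_timestamp.is_none() {
            return Err(PyValueError::new_err(
                "provide either starting_version or starting_timestamp",
            ));
        }
        // ... hand off to the Rust-side CDF builder (which defaults the version to 0)
        Ok(())
    }
}
```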
couple more comments
impl FileAction for Add {
    fn partition_values(&self) -> HashMap<String, Option<String>> {
        self.partition_values.clone()
Seems like you only use this to get a value out of the map, so you could probably return `&HashMap<String, Option<String>>` and not have to clone (or just provide a `get_partition_value` which does the probe internally, which makes it easier to not have to clone for remove).
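An illustrative sketch of that suggestion, with made-up stand-in types: probing the map inside the trait means call sites never clone it, and it also sidesteps the fact that remove-style actions may not carry partition values at all.

```rust
use std::collections::HashMap;

// Hypothetical trait: expose a probe instead of handing out an owned map.
pub trait FileAction {
    fn get_partition_value(&self, key: &str) -> Option<&Option<String>>;
}

struct AddLike {
    partition_values: HashMap<String, Option<String>>,
}

impl FileAction for AddLike {
    fn get_partition_value(&self, key: &str) -> Option<&Option<String>> {
        // Just a borrow; no clone of the whole map.
        self.partition_values.get(key)
    }
}

struct RemoveLike {
    // Remove-style actions may not have partition values, which is why returning a
    // borrowed map is awkward there and an internal probe is easier.
    partition_values: Option<HashMap<String, Option<String>>>,
}

impl FileAction for RemoveLike {
    fn get_partition_value(&self, key: &str) -> Option<&Option<String>> {
        self.partition_values.as_ref().and_then(|m| m.get(key))
    }
}
```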
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
    vec![self.plan.clone()]
This is probably my own ignorance regarding all these APIs, but all the other things (like `output_ordering` etc.) seem to delegate to `self.plan`, yet this one says that `plan` is the child. In particular, if this returns `plan`, I would have expected `with_new_children` to replace `plan`, but it instead makes the new children children of `plan`.
Anyway, just something to think of :)
You are right here, since this is a scan, it should be an empty vec
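A fragment of what that could look like inside the `impl ExecutionPlan for DeltaCdfScan` block, using the signatures already shown in this diff; the error handling and the assumption that DataFusion's usual `Result`/`DataFusionError` types are in play are illustrative.

```rust
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
    // A scan reads files directly; it has no child plans.
    vec![]
}

fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
    if children.is_empty() {
        Ok(self)
    } else {
        Err(datafusion::error::DataFusionError::Internal(
            "DeltaCdfScan is a leaf node and cannot take children".to_string(),
        ))
    }
}
```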
    .unwrap_or(ScalarValue::Null)
}

pub fn create_spec_partition_values<F: FileAction>(
Should have some docs for a `pub` func.
These are `pub` in their module but are actually only exported as `pub(crate)` outside the module. There actually is a deny lint set for publicly reachable things that don't have docs.
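To illustrate the visibility point (assuming the lint being referred to is `missing_docs`; the module and function names here are made up): the lint only fires for items reachable from outside the crate, so a `pub fn` inside a `pub(crate)` module passes without a doc comment.

```rust
// lib.rs of a hypothetical crate
#![deny(missing_docs)]
//! Crate-level docs are still required.

pub(crate) mod cdf_helpers {
    // Not publicly reachable, so `missing_docs` does not require a doc comment here,
    // though adding one is still good practice.
    pub fn map_action_to_scalar() {}
}
```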
use crate::delta_datafusion::{get_null_of_arrow_type, to_correct_scalar_value};
use crate::DeltaResult;

pub fn map_action_to_scalar<F: FileAction>(
docs, as it's pub
nice, lgtm!
fn with_new_children(
    self: Arc<Self>,
    _children: Vec<Arc<dyn ExecutionPlan>>,
nit: you do use this, so why `_children` and not `children`?
A side effect of me having this stubbed to start as `todo!()` and then not fixing it properly.
Description
This PR is the initial work for Change Data Feed (CDF) readers for Delta tables. It looks a lot larger than it really is because a physical test table is checked in with it; that table will be removed once the loop is closed on CDF reading/writing.
Related Issue(s)
Documentation
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-data-files
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file