Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow nested is_in() in when()/then() for full-streaming #20052

Merged
merged 4 commits into from
Nov 29, 2024

Conversation

nameexhaustion
Copy link
Collaborator

@nameexhaustion nameexhaustion commented Nov 28, 2024

Fixes #15767

This PR also does some refactoring to consolidate and improve the way we identify expressions as elementwise.

Replacing streamable with is_elementwise

A lot of places where we currently use the streamable terminology actually needed to use an even stricter elementwise requirement (e.g. filter / slice pushdown. streamable expressions include all elementwise expressions and more. For example, filter() / explode() expressions are streamable, but they are not elementwise.

Where this matters is if the result of a streamable expression needs to be projected next to other result columns, which is almost always the case. The existing in-memory engine and evaluators cannot do this properly as it projects the result columns independently within every chunk, which could lead to height mismatches. The only place that properly supports this is the new-streaming engine, where the ZipNode performs the projection properly.

@github-actions github-actions bot added fix Bug fix python Related to Python Polars rust Related to Rust Polars labels Nov 28, 2024
@nameexhaustion nameexhaustion changed the title fix: Allow nested is_in() in when()/then() for streaming sink fix: Allow nested is_in() in when()/then() for full-streaming Nov 28, 2024
pub(crate) fn groups_sensitive(&self) -> bool {

/// Checks whether this expression is elementwise. This only checks the top level expression.
pub(crate) fn is_elementwise_top_level(&self) -> bool {
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_elementwise_top_level() replaces groups_sensitive(). It is also changed to consider Explode and Filter as non-elementwise

}
/// Checks if the top-level expression node is elementwise. If this is the case, then `stack` will
/// be extended further with any nested expression nodes.
pub fn is_elementwise(stack: &mut Vec<Node>, ae: &AExpr, expr_arena: &Arena<AExpr>) -> bool {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Base is_elementwise() function - recursion is done through a nodes stack which allows the caller to inspect the exprs.

}
if matches!(expr_arena.get(rhs), AExpr::Literal { .. }) {
stack.push_node(input.first().unwrap().node());
return;
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for the when(A.is_in(B)) issue - we don't traverse into RHS literals of is_in() during recursive checking. We had this already in predicate pushdown but it has been moved here.

..
} => {
assert!(options.is_elementwise());
opts.strict
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rule was moved from new-streaming below

if cfg!(debug_assertions) {
for v in acc_predicates.values() {
let ae = expr_arena.get(v.node());
assert!(permits_filter_pushdown(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by - predicates that hit here should already satisfy these requirements - otherwise they should have been caught at the Filter node

options.collect_groups,
ApplyOptions::ElementWise | ApplyOptions::ApplyList
),
Context::Aggregation => matches!(options.collect_groups, ApplyOptions::ElementWise),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was never called with Context::Aggregation

.contains(FunctionFlags::CHANGES_LENGTH | FunctionFlags::RETURNS_SCALAR)
matches!(
self.collect_groups,
ApplyOptions::ElementWise | ApplyOptions::ApplyList
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand this, but we apparently consider ApplyList as elementwise according to the test suite

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should see when we hit this. I believe only in a few python udf cases. I think we would never hit the streaming engine with this.

Copy link
Collaborator Author

@nameexhaustion nameexhaustion Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is hit by map_elements() / map_batches(.., agg_list=True). This also hits the streaming engines because FunctionOptions::is_elementwise() gets used by all of the engines during physical plan creation / IR lowering (indirectly through fn is_elementwise(_rec)).

I don't think ApplyList is strictly elementwise, but both the in-memory and existing streaming engine currently expect it to be identified as such, so I've made it the default here. For the new-streaming engine I added an override below.

@nameexhaustion nameexhaustion force-pushed the is-elementwise-streamable branch from 8adc9dd to 55ec6b6 Compare November 28, 2024 16:22
Copy link

codecov bot commented Nov 28, 2024

Codecov Report

Attention: Patch coverage is 84.18605% with 34 lines in your changes missing coverage. Please review.

Project coverage is 79.52%. Comparing base (44ddbc2) to head (5c45bf1).
Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
...ates/polars-stream/src/physical_plan/lower_expr.rs 0.00% 23 Missing ⚠️
crates/polars-plan/src/plans/aexpr/utils.rs 91.22% 5 Missing ⚠️
...rs-plan/src/plans/optimizer/slice_pushdown_expr.rs 0.00% 3 Missing ⚠️
crates/polars-plan/src/plans/lit.rs 75.00% 1 Missing ⚠️
...es/polars-plan/src/plans/optimizer/cse/cse_expr.rs 50.00% 1 Missing ⚠️
crates/polars-stream/src/physical_plan/lower_ir.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #20052      +/-   ##
==========================================
- Coverage   79.52%   79.52%   -0.01%     
==========================================
  Files        1563     1563              
  Lines      217194   217173      -21     
  Branches     2464     2464              
==========================================
- Hits       172729   172701      -28     
- Misses      43905    43912       +7     
  Partials      560      560              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@nameexhaustion nameexhaustion force-pushed the is-elementwise-streamable branch from a57466f to 66c30d4 Compare November 29, 2024 05:59
@nameexhaustion nameexhaustion marked this pull request as ready for review November 29, 2024 06:09
@nameexhaustion nameexhaustion marked this pull request as draft November 29, 2024 06:10
@nameexhaustion nameexhaustion marked this pull request as ready for review November 29, 2024 06:17
Alias(_, _) | BinaryExpr { .. } | Column(_) | Ternary { .. } | Cast { .. } => true,

Agg { .. }
| Explode(_)
Copy link
Member

@ritchie46 ritchie46 Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filter, gather, sort and explode don't seem elementwise. They are group sensitive but not elementwise. Maybe this is an unlucky name?

crates/polars-plan/src/plans/aexpr/utils.rs Outdated Show resolved Hide resolved
crates/polars-plan/src/plans/aexpr/utils.rs Outdated Show resolved Hide resolved
crates/polars-plan/src/plans/aexpr/utils.rs Outdated Show resolved Hide resolved
.contains(FunctionFlags::CHANGES_LENGTH | FunctionFlags::RETURNS_SCALAR)
matches!(
self.collect_groups,
ApplyOptions::ElementWise | ApplyOptions::ApplyList
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should see when we hit this. I believe only in a few python udf cases. I think we would never hit the streaming engine with this.

@nameexhaustion nameexhaustion marked this pull request as draft November 29, 2024 09:43
@nameexhaustion nameexhaustion force-pushed the is-elementwise-streamable branch from 66c30d4 to 5c45bf1 Compare November 29, 2024 09:55
@nameexhaustion nameexhaustion marked this pull request as ready for review November 29, 2024 10:19
@ritchie46 ritchie46 merged commit ca8c1ef into pola-rs:main Nov 29, 2024
25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
fix Bug fix python Related to Python Polars rust Related to Rust Polars
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Cannot sink_parquet when using .is_in() inside of pl.when()/then() in polars > 0.20.19
2 participants