feat(substrait): modular substrait producer #13931

    fn consume_plan(&mut self, plan: &LogicalPlan) -> Result<Box<Rel>> {
        to_substrait_rel(self, plan)
    }

Even though this is the SubstraitProducer, I used `consume` as the verb for the API, as it consumes DataFusion and produces Substrait.
I thought about using `produce_plan`, `produce_projection`, etc., but found that pattern a little weird reading-wise.
For example, does `produce_between` create a Substrait Between expression (which does not exist), or does it convert a Between expression into a Substrait equivalent? Because DataFusion relations and expressions don't map 1-1 with Substrait, I found it easier to think of this as consuming DataFusion. Just my 2 cents.
Yeah, I agree that "produce" doesn't make sense here, as it's more logical to think of the functions in terms of processing DF concepts rather than in producing Substrait things. However, the "consume" in producer can be a bit confusing w.r.t "consumer" - would it make sense to use some alternative, like "from" (which is already used for the functions) or "handle", "process", or something?

            self.extensions
        }

        fn consume_extension(&mut self, plan: &Extension) -> Result<Box<Rel>> {

The following was copied from the existing code for handling LogicalPlan::Extension nodes found later on.

        state: &dyn SubstraitPlanningState,
    ) -> Result<Box<Plan>> {
        let mut extensions = Extensions::default();
    pub fn to_substrait_plan(plan: &LogicalPlan, state: &SessionState) -> Result<Box<Plan>> {

The public API stays mostly the same, taking a `&SessionState` instead of a `&dyn SubstraitPlanningState`, which most users shouldn't notice.

            maintain_singular_struct: false,
        });
    pub fn from_table_scan(
        _producer: &mut impl SubstraitProducer,

This currently isn't used. However, in the future we're likely going to want to use this producer when converting the DataFusion schema into Substrait, especially after the logical type work lands and we can potentially add user-defined logical types.
> likely

Indicates to me we shouldn't add it there yet, since there's a risk it won't be used :) And I think it'll be fine to add it later - it'll be an API break, but only for those customizing the usage, and at least it'll be a clear break.

        } else {
            Operator::Eq
        };
        let join_on = to_substrait_join_expr(producer, &join.on, eq_op, &in_join_schema)?;

The code here has changed slightly. `to_substrait_join_expr` now takes the output schema of the join, which makes it easier to process the join condition. More details below.

        };
        Ok(Box::new(Rel {
            rel_type: Some(rel_type),
        }))

Moved up into the DefaultSubstraitProducer

            extensions,
        )?;
        let l = producer.consume_expr(left, join_schema)?;
        let r = producer.consume_expr(right, join_schema)?;

We no longer need to track the column offset explicitly.
The column offset code was added as part of #6135 to handle queries like

    SELECT d1.b, d2.c
    FROM data d1
    JOIN data d2 ON d1.b = d2.e

which caused issues because the left and right inputs both had the same name. This could potentially cause column name references in DataFusion to be converted incorrectly into Substrait column indices in some cases. Additionally, there were issues with duplicate schema errors.
However, the introduction and usage of

datafusion/datafusion/substrait/src/logical_plan/consumer.rs, lines 1772 to 1777 in a08dc0a:

    /// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
    /// conflict with the columns from the other.
    /// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For
    /// Substrait the names don't matter since it only refers to columns by indices, however DataFusion
    /// requires columns to be uniquely identifiable, in some places (see e.g. DFSchema::check_names).
    fn requalify_sides_if_needed(

This was the only place where the column offset was used. Removing it here allowed me to remove the `col_ref_offset` argument from a number of functions, which IMO simplifies the API substantially.
For further verification, a test has been added for it in `roundtrip_logical_plan.rs`.
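To make the argument concrete, here is a minimal sketch of resolving join-condition columns against the join's combined schema. The `Schema` type and its methods are hypothetical stand-ins invented for illustration, not DataFusion's `DFSchema`, and the column list for `data` is assumed. The point is only that, once both sides carry distinct qualifiers, a lookup in the combined schema already yields the right index, so no separate offset has to be threaded through.

```rust
/// Toy schema (hypothetical): an ordered list of (table qualifier, column name) pairs.
struct Schema(Vec<(&'static str, &'static str)>);

impl Schema {
    /// Concatenate left and right fields, the way a join's output schema is laid out.
    fn join(left: &Schema, right: &Schema) -> Schema {
        let mut fields = left.0.clone();
        fields.extend(right.0.iter().copied());
        Schema(fields)
    }

    /// Position of a qualified column; this plays the role of the field reference index.
    fn index_of(&self, qualifier: &str, name: &str) -> Option<usize> {
        self.0
            .iter()
            .position(|(q, n)| *q == qualifier && *n == name)
    }
}

fn main() {
    // The same table scanned twice under two aliases (assumed columns b, c, e).
    let d1 = Schema(vec![("d1", "b"), ("d1", "c"), ("d1", "e")]);
    let d2 = Schema(vec![("d2", "b"), ("d2", "c"), ("d2", "e")]);
    let joined = Schema::join(&d1, &d2);

    // ON d1.b = d2.e, both sides resolved against the same combined schema.
    let left = joined.index_of("d1", "b");
    let right = joined.index_of("d2", "e");
    println!("{left:?} = {right:?}");
}
```

If the two sides shared a qualifier, the lookup for the right-hand column would hit the left-hand copy first, which is the kind of mis-resolution the offset was guarding against.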
I'm not sure I follow how the requalify_sides_if_needed (added by me in #11049, just for reference) affects the need for this handling, given it's on the consumer side and this is on the producer. https://github.com/apache/datafusion/pull/6135/files#r1215611954 seems to indicate the re-added test doesn't catch the issue. Does this change affect the produced substrait plan?

        Expr::Negative(arg) => ("negate", arg),
        expr => not_impl_err!("Unsupported expression: {expr:?}")?,
    };
    to_substrait_unary_scalar_fn(producer, fn_name, arg, schema)

Consolidated the handling of unary expressions like `Not`, `IsNull`, `IsNotNull`, etc. into a single function for improved readability.
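The shape of that consolidation can be sketched roughly as follows. The `Expr` enum here is a toy stand-in, and apart from `"negate"` (which appears in the diff) the function names are assumptions made for the example; the real code builds a Substrait scalar function rather than a string.

```rust
/// Toy expression type (hypothetical), far smaller than DataFusion's `Expr`.
#[derive(Debug)]
enum Expr {
    Column(String),
    Not(Box<Expr>),
    IsNull(Box<Expr>),
    IsNotNull(Box<Expr>),
    Negative(Box<Expr>),
}

/// One function handles every unary variant: pick the function name and the
/// single argument in a match, then share the rest of the conversion.
fn describe_unary(expr: &Expr) -> Result<String, String> {
    let (fn_name, arg): (&str, &Expr) = match expr {
        Expr::Not(arg) => ("not", arg.as_ref()),
        Expr::IsNull(arg) => ("is_null", arg.as_ref()),
        Expr::IsNotNull(arg) => ("is_not_null", arg.as_ref()),
        Expr::Negative(arg) => ("negate", arg.as_ref()),
        other => return Err(format!("Unsupported expression: {other:?}")),
    };
    // Stand-in for building the scalar function call from name and argument.
    Ok(format!("{fn_name}({arg:?})"))
}

fn main() {
    let col = || Box::new(Expr::Column("a".to_string()));
    println!("{:?}", describe_unary(&Expr::Negative(col())));
    println!("{:?}", describe_unary(&Expr::IsNull(col())));
    println!("{:?}", describe_unary(&Expr::Column("a".to_string())));
}
```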
BREAKING CHANGE: SubstraitPlanningState is no longer available
@Blizzara, @ccciudatu I would appreciate if y'all could take a look when you have an opportunity.
Looks great, thanks @vbarua! I left some comments or thoughts, but nothing major.

        LogicalPlan::Values(plan) => producer.consume_values(plan),
        LogicalPlan::Distinct(plan) => producer.consume_distinct(plan),
        LogicalPlan::Extension(plan) => producer.consume_extension(plan),
        _ => not_impl_err!("Unsupported plan type: {plan:?}")?,

Are there a lot of options behind this `_`? Might be nice to explicitly list them out, to make it clear what isn't supported yet.
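As a rough illustration of the suggestion, the sketch below replaces the catch-all arm with an explicit list. The `Plan` enum and its variants are hypothetical and do not reflect which `LogicalPlan` variants the producer actually leaves unsupported.

```rust
/// Toy plan type (hypothetical variants, chosen only for the example).
#[derive(Debug)]
enum Plan {
    Projection,
    Filter,
    Explain,
    Analyze,
}

fn convert(plan: &Plan) -> Result<&'static str, String> {
    match plan {
        Plan::Projection => Ok("project"),
        Plan::Filter => Ok("filter"),
        // Unsupported variants are named instead of hidden behind `_`, so the
        // list is visible to readers and a newly added variant fails to compile
        // here until someone decides how to handle it.
        Plan::Explain | Plan::Analyze => Err(format!("Unsupported plan type: {plan:?}")),
    }
}

fn main() {
    println!("{:?}", convert(&Plan::Projection));
    println!("{:?}", convert(&Plan::Filter));
    println!("{:?}", convert(&Plan::Explain));
    println!("{:?}", convert(&Plan::Analyze));
}
```

The trade-off is a longer match that has to be touched whenever the enum grows, which may or may not be desirable for an enum owned by another crate.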

    fn consume_scalar_function(
        &mut self,
        scalar_fn: &expr::ScalarFunction,

I guess this is to de-conflict with Substrait's ScalarFunction, which is imported? 👍

    if e.produce_one_row {
        return not_impl_err!("Producing a row from empty relation is unsupported");
    }
    #[allow(deprecated)]

I think previously it was allowed at an even higher level, so this is fine, but ooc, what's deprecated in all these?

    let substrait_expr = producer.consume_expr(expr.as_ref(), schema)?;
    let substrait_low = producer.consume_expr(low.as_ref(), schema)?;
    let substrait_high = producer.consume_expr(high.as_ref(), schema)?;

unrelated to this PR and probs better not to change now to keep diff small(er), but I think there's no reason to duplicate these below, they could just happen above the if

    let mut arguments: Vec<FunctionArgument> = vec![];
    for arg in args {
        arguments.push(FunctionArgument {
            arg_type: Some(ArgType::Value(to_substrait_rex(producer, arg, schema)?)),

Should this be `producer.consume_expr(..)` like on lines 1607/1612 below? There are a couple of other places as well where we call both in the same function. Is there a reason to do that, or should everything go through the `producer.consume_` calls?
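One way the two call styles could differ is sketched below, under the assumption that a custom producer overrides `consume_expr` itself. Everything here (the `Producer` trait, the `Tracing` type, the string-based expressions) is a toy stand-in, not the PR's API: calling the free function directly skips the override, while going through the producer honours it.

```rust
/// Toy producer trait (hypothetical); expressions are plain strings here.
trait Producer: Sized {
    fn consume_expr(&mut self, expr: &str) -> String {
        to_substrait_rex(self, expr)
    }
}

/// Default conversion, reusable from any producer.
fn to_substrait_rex(_producer: &mut impl Producer, expr: &str) -> String {
    format!("default({expr})")
}

/// A custom producer that records every expression it is asked to convert.
struct Tracing {
    seen: Vec<String>,
}

impl Producer for Tracing {
    fn consume_expr(&mut self, expr: &str) -> String {
        self.seen.push(expr.to_string());
        to_substrait_rex(self, expr)
    }
}

/// Converts arguments by calling the free function directly.
fn args_via_free_fn(p: &mut impl Producer, args: &[&str]) -> Vec<String> {
    let mut out = Vec::new();
    for a in args {
        out.push(to_substrait_rex(p, a));
    }
    out
}

/// Converts arguments through the producer, so overrides are honoured.
fn args_via_producer(p: &mut impl Producer, args: &[&str]) -> Vec<String> {
    let mut out = Vec::new();
    for a in args {
        out.push(p.consume_expr(a));
    }
    out
}

fn main() {
    let mut tracing = Tracing { seen: Vec::new() };
    args_via_free_fn(&mut tracing, &["x", "y"]);
    println!("after free fn: {} recorded", tracing.seen.len());
    args_via_producer(&mut tracing, &["x", "y"]);
    println!("after producer: {} recorded", tracing.seen.len());
}
```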

    @@ -1727,29 +2001,26 @@ fn make_substrait_window_function(
        }
    }

    #[allow(deprecated)]
    #[allow(clippy::too_many_arguments)]

can this be removed now too, or do we still have too many args?

    fn consume_extension(&mut self, plan: &Extension) -> Result<Box<Rel>> {
        let extension_bytes = self
            .state

I think this is the only use for SessionState in the DefaultSubstraitProducer, so presumably it wouldn't need the full state to operate with... But given users have the option of making their own producer if they care, maybe that's fine and better to just have the state here for future needs?

    @@ -571,6 +571,21 @@ async fn roundtrip_self_implicit_cross_join() -> Result<()> {
        roundtrip("SELECT left.a left_a, left.b, right.a right_a, right.c FROM data AS left, data AS right").await
    }

    #[tokio::test]
    async fn self_join_introduces_aliases() -> Result<()> {

This is adding back this test, right? I seem to have argued back then that it is unnecessary given the roundtrip_self_join test.
Which issue does this PR close?
Closes #13901
Rationale for this change
This is the producer equivalent to the consumer changes in #13803
Improves the reusability of the Substrait Producer for users that wish to customize how DataFusion relations and expressions are converted into Substrait.
This is especially useful for controlling how UserDefinedLogicalNodes are converted into Substrait.
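A minimal sketch of the pattern, assuming heavily simplified stand-ins for the real types: `LogicalPlan`, `Rel`, the string payloads, and the producer structs below are all hypothetical, and only the names `SubstraitProducer`, `consume_plan`, `consume_extension`, `to_substrait_rel`, and `from_table_scan` are taken from this PR. Default trait methods delegate to free functions, and a custom producer overrides just the method it cares about.

```rust
type Result<T> = std::result::Result<T, String>;

/// Toy plan type; `Extension` stands in for a UserDefinedLogicalNode.
enum LogicalPlan {
    TableScan(String),
    Extension(String),
}

/// Toy stand-in for a Substrait relation.
#[derive(Debug, PartialEq)]
struct Rel(String);

/// Dispatches each plan variant to the matching producer method.
fn to_substrait_rel(producer: &mut impl SubstraitProducer, plan: &LogicalPlan) -> Result<Rel> {
    match plan {
        LogicalPlan::TableScan(name) => producer.consume_table_scan(name),
        LogicalPlan::Extension(node) => producer.consume_extension(node),
    }
}

/// Default conversion for table scans, reusable by custom producers.
fn from_table_scan(_producer: &mut impl SubstraitProducer, name: &str) -> Result<Rel> {
    Ok(Rel(format!("read:{name}")))
}

trait SubstraitProducer: Sized {
    fn consume_plan(&mut self, plan: &LogicalPlan) -> Result<Rel> {
        to_substrait_rel(self, plan)
    }
    fn consume_table_scan(&mut self, name: &str) -> Result<Rel> {
        from_table_scan(self, name)
    }
    fn consume_extension(&mut self, node: &str) -> Result<Rel> {
        Err(format!("Unsupported extension node: {node}"))
    }
}

/// Uses every default.
struct PlainProducer;
impl SubstraitProducer for PlainProducer {}

/// Overrides only the handling of extension nodes.
struct CustomProducer;
impl SubstraitProducer for CustomProducer {
    fn consume_extension(&mut self, node: &str) -> Result<Rel> {
        Ok(Rel(format!("extension:{node}")))
    }
}

fn main() {
    let scan = LogicalPlan::TableScan("data".to_string());
    let ext = LogicalPlan::Extension("my_node".to_string());
    let mut plain = PlainProducer;
    let mut custom = CustomProducer;
    println!("{:?}", plain.consume_plan(&scan));
    println!("{:?}", plain.consume_plan(&ext));
    println!("{:?}", custom.consume_plan(&scan));
    println!("{:?}", custom.consume_plan(&ext));
}
```

Because dispatch goes back through the producer, the override also applies to extension nodes nested anywhere inside a larger plan, not just at the root.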
What changes are included in this PR?
Refactoring
- `from_*` functions (i.e. `from_projection`, `from_filter`, `from_between`) to aid re-use.
- `&mut impl SubstraitProducer`
- `SubstraitPlanningState` has been fully removed, as it is no longer used anywhere.
Code Changes
The conversion of joins has been simplified to no longer require a column offset when converting the join condition. This allowed for the removal of the `col_ref_offset` argument from all methods that used it, simplifying the API.
Are these changes tested?
These changes refactor existing code and leverage their tests.
A test was added to verify the behaviour when converting joins, as there is a code change there.
Are there any user-facing changes?
- `to_substrait_plan` function now consumes a `&SessionState` directly, instead of a `&dyn SubstraitPlanningState`, which most users should not notice.
- `to_substrait_rel` and `to_substrait_rex` have had their API change, but this should not affect most users.