Skip to content

perf: Automatically condense internal expression representation #516

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 29, 2024
14 changes: 9 additions & 5 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def project_to_id(self, expression: ex.Expression, output_id: str):
child=self.node,
assignments=tuple(exprs),
)
)
).merge_projections()

def assign(self, source_id: str, destination_id: str) -> ArrayValue:
if destination_id in self.column_ids: # Mutate case
Expand All @@ -208,7 +208,7 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue:
child=self.node,
assignments=tuple(exprs),
)
)
).merge_projections()

def assign_constant(
self,
Expand Down Expand Up @@ -242,7 +242,7 @@ def assign_constant(
child=self.node,
assignments=tuple(exprs),
)
)
).merge_projections()

def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
selections = ((ex.free_var(col_id), col_id) for col_id in column_ids)
Expand All @@ -251,7 +251,7 @@ def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
child=self.node,
assignments=tuple(selections),
)
)
).merge_projections()

def drop_columns(self, columns: Iterable[str]) -> ArrayValue:
new_projection = (
Expand All @@ -264,7 +264,7 @@ def drop_columns(self, columns: Iterable[str]) -> ArrayValue:
child=self.node,
assignments=tuple(new_projection),
)
)
).merge_projections()

def aggregate(
self,
Expand Down Expand Up @@ -466,3 +466,7 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue:
The row numbers of result is non-deterministic, avoid to use.
"""
return ArrayValue(nodes.RandomSampleNode(self.node, fraction))

def merge_projections(self) -> ArrayValue:
    """Return a new ArrayValue with this value's node passed through projection squashing.

    The current node is handed to
    ``bigframes.core.rewrite.maybe_squash_projection`` and whatever node that
    returns is wrapped in a fresh ``ArrayValue``; ``self`` is not modified.
    """
    return ArrayValue(bigframes.core.rewrite.maybe_squash_projection(self.node))
26 changes: 18 additions & 8 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -1050,27 +1050,37 @@ def _hide_column(self, column_id) -> OrderedIR:
def _bake_ordering(self) -> OrderedIR:
"""Bakes ordering expression into the selection, maybe creating hidden columns."""
ordering_expressions = self._ordering.all_ordering_columns
new_exprs = []
new_baked_cols = []
new_exprs: list[OrderingExpression] = []
new_baked_cols: list[ibis_types.Value] = []
for expr in ordering_expressions:
if isinstance(expr.scalar_expression, ex.OpExpression):
baked_column = self._compile_expression(expr.scalar_expression).name(
bigframes.core.guid.generate_guid()
)
new_baked_cols.append(baked_column)
new_expr = OrderingExpression(
ex.free_var(baked_column.name), expr.direction, expr.na_last
ex.free_var(baked_column.get_name()), expr.direction, expr.na_last
)
new_exprs.append(new_expr)
else:
elif isinstance(expr.scalar_expression, ex.UnboundVariableExpression):
order_col = expr.scalar_expression.id
new_exprs.append(expr)

ordering = self._ordering.with_ordering_columns(new_exprs)
if order_col not in self.column_ids:
new_baked_cols.append(
self._ibis_bindings[expr.scalar_expression.id]
)

new_ordering = ExpressionOrdering(
tuple(new_exprs),
self._ordering.integer_encoding,
self._ordering.string_encoding,
self._ordering.total_ordering_columns,
)
return OrderedIR(
self._table,
columns=self.columns,
hidden_ordering_columns=[*self._hidden_ordering_columns, *new_baked_cols],
Copy link
Contributor

@chelsea-lin chelsea-lin Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the intention to omit the existing _hidden_ordering_columns here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we are rewriting the hidden columns here, so we are recreating the set of hidden columns and discarding the old set.

ordering=ordering,
hidden_ordering_columns=tuple(new_baked_cols),
ordering=new_ordering,
predicates=self._predicates,
)

Expand Down
48 changes: 38 additions & 10 deletions bigframes/core/rewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,21 @@ class SquashedSelect:
columns: Tuple[Tuple[scalar_exprs.Expression, str], ...]
predicate: Optional[scalar_exprs.Expression]
ordering: Tuple[order.OrderingExpression, ...]
reverse_root: bool = False

@classmethod
def from_node(cls, node: nodes.BigFrameNode) -> SquashedSelect:
def from_node(
cls, node: nodes.BigFrameNode, projections_only: bool = False
) -> SquashedSelect:
if isinstance(node, nodes.ProjectionNode):
return cls.from_node(node.child).project(node.assignments)
elif isinstance(node, nodes.FilterNode):
return cls.from_node(node.child, projections_only=projections_only).project(
node.assignments
)
elif not projections_only and isinstance(node, nodes.FilterNode):
return cls.from_node(node.child).filter(node.predicate)
elif isinstance(node, nodes.ReversedNode):
elif not projections_only and isinstance(node, nodes.ReversedNode):
return cls.from_node(node.child).reverse()
elif isinstance(node, nodes.OrderByNode):
elif not projections_only and isinstance(node, nodes.OrderByNode):
return cls.from_node(node.child).order_with(node.by)
else:
selection = tuple(
Expand All @@ -63,7 +68,9 @@ def project(
new_columns = tuple(
(expr.bind_all_variables(self.column_lookup), id) for expr, id in projection
)
return SquashedSelect(self.root, new_columns, self.predicate, self.ordering)
return SquashedSelect(
self.root, new_columns, self.predicate, self.ordering, self.reverse_root
)

def filter(self, predicate: scalar_exprs.Expression) -> SquashedSelect:
if self.predicate is None:
Expand All @@ -72,18 +79,24 @@ def filter(self, predicate: scalar_exprs.Expression) -> SquashedSelect:
new_predicate = ops.and_op.as_expr(
self.predicate, predicate.bind_all_variables(self.column_lookup)
)
return SquashedSelect(self.root, self.columns, new_predicate, self.ordering)
return SquashedSelect(
self.root, self.columns, new_predicate, self.ordering, self.reverse_root
)

def reverse(self) -> SquashedSelect:
new_ordering = tuple(expr.with_reverse() for expr in self.ordering)
return SquashedSelect(self.root, self.columns, self.predicate, new_ordering)
return SquashedSelect(
self.root, self.columns, self.predicate, new_ordering, not self.reverse_root
)

def order_with(self, by: Tuple[order.OrderingExpression, ...]):
adjusted_orderings = [
order_part.bind_variables(self.column_lookup) for order_part in by
]
new_ordering = (*adjusted_orderings, *self.ordering)
return SquashedSelect(self.root, self.columns, self.predicate, new_ordering)
return SquashedSelect(
self.root, self.columns, self.predicate, new_ordering, self.reverse_root
)

def maybe_join(
self, right: SquashedSelect, join_def: join_defs.JoinDefinition
Expand Down Expand Up @@ -126,8 +139,10 @@ def maybe_join(
new_columns = remap_names(join_def, lselection, rselection)

# Reconstruct ordering
reverse_root = self.reverse_root
if join_type == "right":
new_ordering = right.ordering
reverse_root = right.reverse_root
elif join_type == "outer":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to redefine reverse_root for the outer join case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We treat an outer join as having the left ordering take precedence, so it inherits reverse_root from self.

if lmask is not None:
prefix = order.OrderingExpression(lmask, order.OrderingDirection.DESC)
Expand Down Expand Up @@ -158,18 +173,31 @@ def maybe_join(
new_ordering = self.ordering
else:
raise ValueError(f"Unexpected join type {join_type}")
return SquashedSelect(self.root, new_columns, new_predicate, new_ordering)
return SquashedSelect(
self.root, new_columns, new_predicate, new_ordering, reverse_root
)

def expand(self) -> nodes.BigFrameNode:
    """Rebuild a node tree from this squashed select.

    Starting from ``self.root``, wrapper nodes are stacked in this order:

    1. ``ReversedNode`` when ``self.reverse_root`` is set,
    2. ``FilterNode`` when ``self.predicate`` is truthy,
    3. ``OrderByNode`` when ``self.ordering`` is non-empty,
    4. ``ProjectionNode`` over ``self.columns`` (always).

    Returns:
        The outermost ``ProjectionNode``.
    """
    tree = self.root
    if self.reverse_root:
        tree = nodes.ReversedNode(child=tree)
    # The predicate goes in ahead of ordering and projection: it is the safest
    # choice because it may filter out inputs that the other expressions
    # cannot handle.
    if self.predicate:
        tree = nodes.FilterNode(child=tree, predicate=self.predicate)
    if self.ordering:
        tree = nodes.OrderByNode(child=tree, by=self.ordering)
    return nodes.ProjectionNode(child=tree, assignments=self.columns)


def maybe_squash_projection(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
    """Squash a projection that sits directly on top of another projection.

    Args:
        node: The node to inspect.

    Returns:
        ``node`` itself unless both ``node`` and ``node.child`` are
        ``ProjectionNode`` instances; in that case, the node produced by
        building a ``SquashedSelect`` from ``node`` (with
        ``projections_only=True``) and expanding it again.
    """
    is_stacked_projection = isinstance(node, nodes.ProjectionNode) and isinstance(
        node.child, nodes.ProjectionNode
    )
    if not is_stacked_projection:
        return node
    # Conservative approach: only consecutive projections are squashed, even
    # though filters and reorderings could be squashed as well.
    squashed = SquashedSelect.from_node(node, projections_only=True)
    return squashed.expand()


def maybe_rewrite_join(join_node: nodes.JoinNode) -> nodes.BigFrameNode:
left_side = SquashedSelect.from_node(join_node.left_child)
right_side = SquashedSelect.from_node(join_node.right_child)
Expand Down