From e239b61f95187650128ef3d7b66e1f76ac5675f9 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 17 Apr 2026 18:14:53 +0000 Subject: [PATCH 1/4] refactor: Unify gbq execution as semi-executor --- .../core/compile/sqlglot/sqlglot_ir.py | 4 +- .../bigframes/session/bq_caching_executor.py | 419 ++++++------------ .../bigframes/session/direct_gbq_execution.py | 187 ++++++-- .../bigframes/session/execution_spec.py | 35 +- .../bigframes/session/local_scan_executor.py | 10 +- .../bigframes/session/polars_executor.py | 12 +- .../bigframes/session/read_api_execution.py | 13 +- .../bigframes/session/semi_executor.py | 5 +- .../tests/system/small/test_session.py | 4 +- 9 files changed, 331 insertions(+), 358 deletions(-) diff --git a/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py b/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py index 27b79f266bc1..1e0b561e8c5b 100644 --- a/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py +++ b/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py @@ -177,7 +177,7 @@ def from_table( project_id: str, dataset_id: str, table_id: str, - uid_gen: guid.SequentialUIDGenerator, + uid_gen: guid.SequentialUIDGenerator | None = None, columns: typing.Sequence[str] = (), sql_predicate: typing.Optional[str] = None, system_time: typing.Optional[datetime.datetime] = None, @@ -202,6 +202,8 @@ def from_table( if system_time else None ) + if uid_gen is None: + uid_gen = guid.SequentialUIDGenerator() table_alias = next(uid_gen.get_uid_stream("bft_")) table_expr = sge.Table( this=sql.identifier(table_id), diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 8d3782c5ef6b..b9ca1a2a4299 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -20,8 +20,6 @@ from typing import Literal, Mapping, Optional, Sequence, Tuple import google.api_core.exceptions -import google.cloud.bigquery.job as bq_job -import google.cloud.bigquery.table as bq_table import google.cloud.bigquery_storage_v1 from google.cloud import bigquery @@ -31,6 +29,7 @@ import bigframes.core.events import bigframes.core.guid import bigframes.core.identifiers +import bigframes.core.ordering import bigframes.core.nodes as nodes import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties @@ -41,8 +40,7 @@ import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temporary_storage -from bigframes import exceptions as bfe -from bigframes.core import bq_data, compile, local_data, rewrite +from bigframes.core import compile, guid, local_data, rewrite from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir from bigframes.session import ( @@ -51,7 +49,9 @@ local_scan_executor, read_api_execution, semi_executor, + direct_gbq_execution, ) +import dataclasses # Max complexity that should be executed as a single query QUERY_COMPLEXITY_LIMIT = 1e7 @@ -108,6 +108,9 @@ def __init__( polars_executor.PolarsExecutor(), ) self._upload_lock = threading.Lock() + self._gbq_executor = direct_gbq_execution.DirectGbqExecutor( + bqclient, compile.compiler, metrics=self.metrics, publisher=self._publisher + ) def to_sql( self, @@ -129,60 +132,100 @@ def to_sql( ) return compiled.sql + def execute( self, array_value: bigframes.core.ArrayValue, execution_spec: ex_spec.ExecutionSpec, ) -> executor.ExecuteResult: self._publisher.publish(bigframes.core.events.ExecutionStarted()) + maybe_result = self._try_execute_semi_executors(array_value, execution_spec) + if maybe_result is not None: + return maybe_result + result = self._execute_bigquery(array_value, execution_spec) + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=result, + ) + ) + return result - # TODO: Support export jobs in combination with semi executors - if execution_spec.destination_spec is None: - plan = self.prepare_plan(array_value.node, target="simplify") - for exec in self._semi_executors: - maybe_result = exec.execute( - plan, ordered=execution_spec.ordered, peek=execution_spec.peek - ) - if maybe_result: - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=maybe_result, - ) + def _try_execute_semi_executors( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> Optional[executor.ExecuteResult]: + plan = self.prepare_plan(array_value.node, target="simplify") + for exec in self._semi_executors: + maybe_result = exec.execute(plan, execution_spec) + if maybe_result: + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=maybe_result, ) - return maybe_result + ) + return maybe_result + return None - if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec): - if execution_spec.peek or execution_spec.ordered: - raise NotImplementedError( - "Ordering and peeking not supported for gbq export" + + def _execute_bigquery( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> executor.ExecuteResult: + dest_spec = execution_spec.destination_spec + # Recursive handlers for different cases, maybe extract to explicit interface. + if isinstance(dest_spec, ex_spec.GcsOutputSpec): + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.TempTableSpec(cluster_cols=dest_spec.cluster_cols, lifetime="ephemeral") + ) + results = self._execute_bigquery(array_value, execution_spec) + self._export_result_gcs(results, dest_spec) + return results + if isinstance(dest_spec, ex_spec.TableOutputSpec): + # Special DML path - maybe this should be configurable, dml vs query destination has tradeoffs + existing_table = self._maybe_find_existing_table(dest_spec) + if execution_spec.ordered: + raise ValueError("Ordering not supported with table outputs") + if (existing_table is not None) and _is_schema_match( + existing_table.schema, array_value.schema + ): + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.TempTableSpec(cluster_cols=execution_spec.destination_spec.cluster_cols, lifetime="ephemeral") ) - # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml - result = self._export_gbq(array_value, execution_spec.destination_spec) - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=result, + results = self._execute_bigquery(array_value, execution_spec) + self._export_gbq_with_dml(results, dest_spec) + return results + if isinstance(dest_spec, ex_spec.TempTableSpec): + # "ephemeral" temp tables created in the course of exeuction, don't need to be allocated + # materialized ordering only really makes sense for internal temp tables used by caching + cluster_cols = dest_spec.cluster_cols + plan = array_value.node + if dest_spec.ordering == "offsets_col": + order_col_id = guid.generate_guid() + plan = nodes.PromoteOffsetsNode(plan, order_col_id) + cluster_cols = [order_col_id] + elif dest_spec.ordering == "order_key": + plan = nodes.defer_order(plan, output_hidden_row_keys=True) + if dest_spec.lifetime == "session": + destination_table = self.storage_manager.create_temp_table( + plan.schema, cluster_cols ) + arr_value = bigframes.core.ArrayValue(plan) + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.TableOutputSpec(table=destination_table, cluster_cols=dest_spec.cluster_cols, if_exists="replace") + ) + return self._execute_bigquery(arr_value, execution_spec) + + + # At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table + # Also, ordering mode will either be none or row-sorted + gbq_plan = self.prepare_plan(array_value.node, target="bq_execution") + result = self._gbq_executor.execute(gbq_plan, execution_spec) + if result is None: + raise ValueError( + f"Couldn't execute plan {array_value.node} with {execution_spec}" ) - return result - - result = self._execute_plan_gbq( - array_value.node, - ordered=execution_spec.ordered, - peek=execution_spec.peek, - cache_spec=execution_spec.destination_spec - if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec) - else None, - must_create_table=not execution_spec.promise_under_10gb, - ) - # post steps: export - if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): - self._export_result_gcs(result, execution_spec.destination_spec) - - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=result, - ) - ) return result def _export_result_gcs( @@ -231,75 +274,33 @@ def _maybe_find_existing_table( except google.api_core.exceptions.NotFound: return None - def _export_gbq( - self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec + def _export_gbq_with_dml( + self, result: executor.ExecuteResult, spec: ex_spec.TableOutputSpec ) -> executor.ExecuteResult: """ - Export the ArrayValue to an existing BigQuery table. + Export the ArrayValue to an existing BigQuery table, using DML. """ - plan = self.prepare_plan(array_value.node, target="bq_execution") - - # validate destination table - existing_table = self._maybe_find_existing_table(spec) - - compiled = compile.compiler().compile_sql( - compile.CompileRequest(plan, sort_rows=False) + # b/409086472: Uses DML for table appends and replacements to avoid + # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: + # https://cloud.google.com/bigquery/quotas#standard_tables + ir = sqlglot_ir.SQLGlotIR.from_table( + result.query_job.destination.project, + result.query_job.destination.dataset_id, + result.query_job.destination.table_id, ) - sql = compiled.sql + sql = "" + if spec.if_exists == "append": + sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) + else: # for "replace" + assert spec.if_exists == "replace" + sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) - if (existing_table is not None) and _is_schema_match( - existing_table.schema, array_value.schema - ): - # b/409086472: Uses DML for table appends and replacements to avoid - # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: - # https://cloud.google.com/bigquery/quotas#standard_tables - job_config = bigquery.QueryJobConfig() - - ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) - if spec.if_exists == "append": - sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) - else: # for "replace" - assert spec.if_exists == "replace" - sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) - else: - dispositions = { - "fail": bigquery.WriteDisposition.WRITE_EMPTY, - "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, - "append": bigquery.WriteDisposition.WRITE_APPEND, - } - job_config = bigquery.QueryJobConfig( - write_disposition=dispositions[spec.if_exists], - destination=spec.table, - clustering_fields=spec.cluster_cols if spec.cluster_cols else None, - ) - - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - iterator, job = self._run_execute_query( - sql=sql, - job_config=job_config, - session=array_value.session, - ) - - has_special_dtype_col = any( - t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE) - for t in array_value.schema.dtypes - ) - - if spec.if_exists != "append" and has_special_dtype_col: - # Only update schema if this is not modifying an existing table, and the - # new table contains special columns (like timedelta or obj_ref). - table = self.bqclient.get_table(spec.table) - table.schema = array_value.schema.to_bigquery() - self.bqclient.update_table(table, ["schema"]) - - return executor.EmptyExecuteResult( - bf_schema=array_value.schema, - execution_metadata=executor.ExecutionMetadata.from_iterator_and_job( - iterator, job - ), + bq_io.start_query_with_client( + self.bqclient, + sql, + job_config=bigquery.QueryJobConfig(), + metrics=self.metrics, + publisher=self._publisher, ) def dry_run( @@ -343,62 +344,6 @@ def cached( ) # Helpers - def _run_execute_query( - self, - sql: str, - job_config: Optional[bq_job.QueryJobConfig] = None, - query_with_job: bool = True, - session=None, - ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: - """ - Starts BigQuery query job and waits for results. - """ - job_config = bq_job.QueryJobConfig() if job_config is None else job_config - if bigframes.options.compute.maximum_bytes_billed is not None: - job_config.maximum_bytes_billed = ( - bigframes.options.compute.maximum_bytes_billed - ) - - if self._labels: - job_config.labels.update(self._labels) - - try: - # Trick the type checker into thinking we got a literal. - if query_with_job: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=True, - publisher=self._publisher, - session=session, - ) - else: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=False, - publisher=self._publisher, - session=session, - ) - - except google.api_core.exceptions.BadRequest as e: - # Unfortunately, this error type does not have a separate error code or exception type - if "Resources exceeded during query execution" in e.message: - new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." - raise bfe.QueryComplexityError(new_message) from e - else: - raise - def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): """ Can the block be evaluated very cheaply? @@ -441,23 +386,33 @@ def _cache_with_cluster_cols( self, array_value: bigframes.core.ArrayValue, cluster_cols: Sequence[str] ): """Executes the query and uses the resulting table to rewrite future executions.""" + cluster_cols = [ + col + for col in cluster_cols + if bigframes.dtypes.is_clusterable(array_value.schema.get_type(col)) + ] + cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols)) + destination_spec=ex_spec.TempTableSpec(cluster_cols=tuple(cluster_cols), lifetime="session", ordering="order_key") ) - self.execute( + result_bq_data = self.execute( array_value, execution_spec=execution_spec, ) + assert isinstance(result_bq_data, bigframes.core.BigqueryDataSource) + self.cache.cache_results_table(array_value.node, result_bq_data) def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.CacheSpec(cluster_cols=tuple()) + destination_spec=ex_spec.TempTableSpec(cluster_cols=(), lifetime="session", ordering="offsets_col") ) - self.execute( + result_bq_data = self.execute( array_value, execution_spec=execution_spec, ) + assert isinstance(result_bq_data, bigframes.core.BigqueryDataSource) + self.cache.cache_results_table(array_value.node, result_bq_data) def _cache_with_session_awareness( self, @@ -500,8 +455,9 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool: max_complexity=QUERY_COMPLEXITY_LIMIT, cache=self.cache, # Heuristic: subtree_compleixty * (copies of subtree)^2 - heuristic=lambda complexity, count: math.log(complexity) - + 2 * math.log(count), + heuristic=lambda complexity, count: ( + math.log(complexity) + 2 * math.log(count) + ), ) if selection is None: # No good subtrees to cache, just return original tree @@ -568,126 +524,11 @@ def map_local_scans(node: nodes.BigFrameNode): return original_root.bottom_up(map_local_scans) def _execute_plan_gbq( - self, - plan: nodes.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, - cache_spec: Optional[ex_spec.CacheSpec] = None, - must_create_table: bool = True, + self, plan: nodes.BigFrameNode, execution_spec: ex_spec.ExecutionSpec ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - - og_plan = plan - og_schema = plan.schema - plan = self.prepare_plan(plan, target="bq_execution") - create_table = must_create_table - cluster_cols: Sequence[str] = [] - if cache_spec is not None: - if peek is not None: - raise ValueError("peek is not compatible with caching.") - - create_table = True - if not cache_spec.cluster_cols: - offsets_id = bigframes.core.identifiers.ColumnId( - bigframes.core.guid.generate_guid() - ) - plan = nodes.PromoteOffsetsNode(plan, offsets_id) - cluster_cols = [offsets_id.sql] - else: - cluster_cols = [ - col - for col in cache_spec.cluster_cols - if bigframes.dtypes.is_clusterable(plan.schema.get_type(col)) - ] - cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] - - compiled = compile.compiler().compile_sql( - compile.CompileRequest( - plan, - sort_rows=ordered, - peek_count=peek, - materialize_all_order_keys=(cache_spec is not None), - ) - ) - # might have more columns than og schema, for hidden ordering columns - compiled_schema = compiled.sql_schema - - destination_table: Optional[bigquery.TableReference] = None - - job_config = bigquery.QueryJobConfig() - if create_table: - destination_table = self.storage_manager.create_temp_table( - compiled_schema, cluster_cols - ) - job_config.destination = destination_table - - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - iterator, query_job = self._run_execute_query( - sql=compiled.sql, - job_config=job_config, - query_with_job=(destination_table is not None), - session=plan.session, - ) - - # we could actually cache even when caching is not explicitly requested, but being conservative for now - result_bq_data = None - if query_job and query_job.destination: - # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them - result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema)) - dst = query_job.destination - result_bq_data = bq_data.BigqueryDataSource( - table=bq_data.GbqNativeTable.from_ref_and_schema( - dst, - tuple(compiled_schema), - cluster_cols=tuple(cluster_cols), - location=iterator.location or self.storage_manager.location, - table_type="TABLE", - ), - schema=result_bf_schema, - ordering=compiled.row_order, - n_rows=iterator.total_rows, - ) - - if cache_spec is not None: - assert result_bq_data is not None - assert compiled.row_order is not None - self.cache.cache_results_table(og_plan, result_bq_data) - - execution_metadata = executor.ExecutionMetadata.from_iterator_and_job( - iterator, query_job - ) - result_mostly_cached = ( - hasattr(iterator, "_is_almost_completely_cached") - and iterator._is_almost_completely_cached() - ) - if result_bq_data is not None and not result_mostly_cached: - return executor.BQTableExecuteResult( - data=result_bq_data, - project_id=self.bqclient.project, - storage_client=self.bqstoragereadclient, - execution_metadata=execution_metadata, - selected_fields=tuple((col, col) for col in og_schema.names), - ) - else: - return executor.LocalExecuteResult( - data=iterator.to_arrow().select(og_schema.names), - bf_schema=plan.schema, - execution_metadata=execution_metadata, - ) - - -def _result_schema( - logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField] -) -> schemata.ArraySchema: - inferred_schema = bigframes.dtypes.bf_type_from_type_kind(sql_schema) - inferred_schema.update(logical_schema._mapping) - return schemata.ArraySchema( - tuple(schemata.SchemaItem(col, dtype) for col, dtype in inferred_schema.items()) - ) + return self._gbq_executor.execute(plan, execution_spec) def _is_schema_match( diff --git a/packages/bigframes/bigframes/session/direct_gbq_execution.py b/packages/bigframes/bigframes/session/direct_gbq_execution.py index 95cf4b204764..33e805855ba4 100644 --- a/packages/bigframes/bigframes/session/direct_gbq_execution.py +++ b/packages/bigframes/bigframes/session/direct_gbq_execution.py @@ -13,7 +13,7 @@ # limitations under the License. from __future__ import annotations -from typing import Literal, Optional, Tuple +from typing import Callable, Literal, Optional, Tuple import google.cloud.bigquery.job as bq_job import google.cloud.bigquery.table as bq_table @@ -22,77 +22,184 @@ import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler import bigframes.core.events +import bigframes.session.metrics import bigframes.session._io.bigquery as bq_io -from bigframes.core import compile, nodes -from bigframes.session import executor, semi_executor +from bigframes.core import bq_data, compile, nodes +from bigframes.session import executor, semi_executor, execution_spec +from bigframes import exceptions as bfe +import bigframes.core.schema as schemata + +import google.api_core.exceptions + + +_WRITE_DISPOSITIONS = { + "fail": bigquery.WriteDisposition.WRITE_EMPTY, + "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, + "append": bigquery.WriteDisposition.WRITE_APPEND, +} -# used only in testing right now, BigQueryCachingExecutor is the fully featured engine -# simplified, doesnt not do large >10 gb result queries, error handling, respect global config -# or record metrics. Also avoids caching, and most pre-compile rewrites, to better serve as a -# reference for validating more complex executors. class DirectGbqExecutor(semi_executor.SemiExecutor): def __init__( self, bqclient: bigquery.Client, - compiler: Literal["ibis", "sqlglot"] = "ibis", + compiler: Literal["ibis", "sqlglot"] + | Callable[[compile.CompileRequest], executor.CompileResult] = "ibis", *, - publisher: bigframes.core.events.Publisher, + metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + publisher: Optional[bigframes.core.events.Publisher] = None, ): self.bqclient = bqclient - self._compile_fn = ( - ibis_compiler.compile_sql - if compiler == "ibis" - else sqlglot_compiler.compile_sql - ) + if isinstance(compiler, str): + self._compile_fn = ( + ibis_compiler.compile_sql + if compiler == "ibis" + else sqlglot_compiler.compile_sql + ) + else: + self._compile_fn = compiler self._publisher = publisher + self._metrics = metrics def execute( self, plan: nodes.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + spec: execution_spec.ExecutionSpec, ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - compiled = self._compile_fn( - compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek) - ) + og_schema = plan.schema + compile_request = compile.CompileRequest( + plan, + sort_rows=spec.ordered, + peek_count=spec.peek, + ) + + compiled = self._compile_fn(compile_request) + # might have more columns than og schema, for hidden ordering columns + compiled_schema = compiled.sql_schema + + job_config = bigquery.QueryJobConfig() + if isinstance(spec.destination_spec, execution_spec.TableOutputSpec): + job_config.destination = spec.destination_spec.table + job_config.write_disposition = _WRITE_DISPOSITIONS[spec.destination_spec.if_exists] + job_config.clustering_fields = spec.destination_spec.cluster_cols + elif isinstance(spec.destination_spec, execution_spec.TempTableSpec) and spec.destination_spec.lifetime == "ephemeral": + pass + elif spec.destination_spec is not None: + raise ValueError(f"Direct GBQ Executor does not support destination: {spec.destination_spec}") + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs + can_skip_job = spec.destination_spec is None and spec.promise_under_10gb iterator, query_job = self._run_execute_query( sql=compiled.sql, + job_config=job_config, + query_with_job=(not can_skip_job), session=plan.session, ) - # just immediately downlaod everything for simplicity - return executor.LocalExecuteResult( - data=iterator.to_arrow(), - bf_schema=plan.schema, - execution_metadata=executor.ExecutionMetadata.from_iterator_and_job( - iterator, query_job - ), + cluster_cols = spec.destination_spec.cluster_cols if spec.desination_spec else () + result_bq_data = None + if query_job and query_job.destination: + # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them + result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema)) + dst = query_job.destination + result_bq_data = bq_data.BigqueryDataSource( + table=bq_data.GbqNativeTable.from_ref_and_schema( + dst, + tuple(compiled_schema), + cluster_cols=cluster_cols, + location=iterator.location or self.bqclient.location, + table_type="TABLE", + ), + schema=result_bf_schema, + ordering=compiled.row_order, + n_rows=iterator.total_rows, + ) + + execution_metadata = executor.ExecutionMetadata.from_iterator_and_job( + iterator, query_job + ) + result_mostly_cached = ( + hasattr(iterator, "_is_almost_completely_cached") + and iterator._is_almost_completely_cached() ) + if result_bq_data is not None and not result_mostly_cached: + return executor.BQTableExecuteResult( + data=result_bq_data, + project_id=self.bqclient.project, + storage_client=self.bqstoragereadclient, + execution_metadata=execution_metadata, + selected_fields=tuple((col, col) for col in og_schema.names), + ) + else: + return executor.LocalExecuteResult( + data=iterator.to_arrow().select(og_schema.names), + bf_schema=plan.schema, + execution_metadata=execution_metadata, + ) def _run_execute_query( self, sql: str, job_config: Optional[bq_job.QueryJobConfig] = None, + query_with_job: bool = True, session=None, ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts BigQuery query job and waits for results. """ - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config or bq_job.QueryJobConfig(), - project=None, - location=None, - timeout=None, - metrics=None, - query_with_job=False, - publisher=self._publisher, - session=session, - ) + job_config = bq_job.QueryJobConfig() if job_config is None else job_config + if bigframes.options.compute.maximum_bytes_billed is not None: + job_config.maximum_bytes_billed = ( + bigframes.options.compute.maximum_bytes_billed + ) + + if self._labels: + job_config.labels.update(self._labels) + + try: + # Trick the type checker into thinking we got a literal. + if query_with_job: + return bq_io.start_query_with_client( + self.bqclient, + sql, + job_config=job_config, + metrics=self._metrics, + project=None, + location=None, + timeout=None, + query_with_job=True, + publisher=self._publisher, + session=session, + ) + else: + return bq_io.start_query_with_client( + self.bqclient, + sql, + job_config=job_config, + metrics=self._metrics, + project=None, + location=None, + timeout=None, + query_with_job=False, + publisher=self._publisher, + session=session, + ) + + except google.api_core.exceptions.BadRequest as e: + # Unfortunately, this error type does not have a separate error code or exception type + if "Resources exceeded during query execution" in e.message: + new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." + raise bfe.QueryComplexityError(new_message) from e + else: + raise + +def _result_schema( + logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField] +) -> schemata.ArraySchema: + inferred_schema = bigframes.dtypes.bf_type_from_type_kind(sql_schema) + inferred_schema.update(logical_schema._mapping) + return schemata.ArraySchema( + tuple(schemata.SchemaItem(col, dtype) for col, dtype in inferred_schema.items()) + ) diff --git a/packages/bigframes/bigframes/session/execution_spec.py b/packages/bigframes/bigframes/session/execution_spec.py index c9431dbd1168..b07ff715fc1b 100644 --- a/packages/bigframes/bigframes/session/execution_spec.py +++ b/packages/bigframes/bigframes/session/execution_spec.py @@ -22,24 +22,43 @@ @dataclasses.dataclass(frozen=True) class ExecutionSpec: - destination_spec: Union[TableOutputSpec, GcsOutputSpec, CacheSpec, None] = None + # destination for the result of the operation. Executor may also incidentally create other temporary tables for its own purposes. + destination_spec: Union[TableOutputSpec, GcsOutputSpec, TempTableSpec, None] = None + # If set, the result will be truncated to the given number of rows. Which N rows is + # implementation dependent and not stable. peek: Optional[int] = None - ordered: bool = ( - False # ordered and promise_under_10gb must both be together for bq execution - ) + # Controls whether output iterator is ordered. Cannot be true if destination is not + # guaranteed to be ordered. + ordered: bool = False # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur promise_under_10gb: bool = False -# This one is temporary, in future, caching will not be done through immediate execution, but will label nodes -# that will be cached only when a super-tree is executed +# Used internally by execution @dataclasses.dataclass(frozen=True) -class CacheSpec: - cluster_cols: tuple[str, ...] +class TempTableSpec: + """ + Specifies that the result of an operation should be a session temp table. + The table will be automatically deleted after the session ends. + """ + cluster_cols: tuple[str, ...] # if empty, will cluster using order key if ordering_key is set + lifetime: Literal["session", "ephemeral"] = "session" + # Controls ordering and whether extra columns are materialized to preserve ordering + # Any extra columns will be appended to the end of the schema. + # None: ordering may be discarded entirely (ordering metadata will still be provided if ordering is derivable from materialized columns) + # order_rows: the result iterator itself will be ordered. For gbq execution, result cannot exceed 10GB. + # order_key: the result set ordered by a key, may materialize extra columns. + # offsets_col: order the result set by an offsets column, materializes one extra column. + ordering: Literal["order_rows", "offsets_col", "order_key"] | None = None @dataclasses.dataclass(frozen=True) class TableOutputSpec: + """ + Specifies that the result of an operation should be exported to a specific named table. + + The executor is not responsible for managing lifecycle of the table. + """ table: bigquery.TableReference cluster_cols: tuple[str, ...] if_exists: Literal["fail", "replace", "append"] = "fail" diff --git a/packages/bigframes/bigframes/session/local_scan_executor.py b/packages/bigframes/bigframes/session/local_scan_executor.py index fee0f557ea76..2843bf598ccb 100644 --- a/packages/bigframes/bigframes/session/local_scan_executor.py +++ b/packages/bigframes/bigframes/session/local_scan_executor.py @@ -16,7 +16,7 @@ from typing import Optional from bigframes.core import bigframe_node, rewrite -from bigframes.session import executor, semi_executor +from bigframes.session import executor, semi_executor, execution_spec class LocalScanExecutor(semi_executor.SemiExecutor): @@ -27,15 +27,17 @@ class LocalScanExecutor(semi_executor.SemiExecutor): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: + if execution_spec.destination_spec is not None: + return None + reduced_result = rewrite.try_reduce_to_local_scan(plan) if not reduced_result: return None node, limit = reduced_result - + peek = execution_spec.peek if limit is not None: if peek is None or limit < peek: peek = limit diff --git a/packages/bigframes/bigframes/session/polars_executor.py b/packages/bigframes/bigframes/session/polars_executor.py index 06c7fcb925c4..3728f488ff7e 100644 --- a/packages/bigframes/bigframes/session/polars_executor.py +++ b/packages/bigframes/bigframes/session/polars_executor.py @@ -34,7 +34,7 @@ numeric_ops, string_ops, ) -from bigframes.session import executor, semi_executor +from bigframes.session import executor, semi_executor, execution_spec if TYPE_CHECKING: import polars as pl @@ -140,20 +140,20 @@ def __init__(self): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: if not self._can_execute(plan): return None - # Note: Ignoring ordered flag, as just executing totally ordered is fine. + if execution_spec.destination_spec is not None: + return None try: lazy_frame: pl.LazyFrame = self._compiler.compile( array_value.ArrayValue(plan).node ) except Exception: return None - if peek is not None: - lazy_frame = lazy_frame.limit(peek) + if execution_spec.peek is not None: + lazy_frame = lazy_frame.limit(execution_spec.peek) pa_table = lazy_frame.collect().to_arrow() return executor.LocalExecuteResult( data=pa_table, diff --git a/packages/bigframes/bigframes/session/read_api_execution.py b/packages/bigframes/bigframes/session/read_api_execution.py index 9f2d196ce8eb..76bf29dc29a1 100644 --- a/packages/bigframes/bigframes/session/read_api_execution.py +++ b/packages/bigframes/bigframes/session/read_api_execution.py @@ -18,7 +18,7 @@ from google.cloud import bigquery_storage_v1 from bigframes.core import bigframe_node, bq_data, nodes, rewrite -from bigframes.session import executor, semi_executor +from bigframes.session import executor, semi_executor, execution_spec class ReadApiSemiExecutor(semi_executor.SemiExecutor): @@ -37,14 +37,16 @@ def __init__( def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: - adapt_result = self._try_adapt_plan(plan, ordered) + if execution_spec.destination_spec is not None: + return None + + adapt_result = self._try_adapt_plan(plan, execution_spec.ordered) if not adapt_result: return None node, limit = adapt_result - if node.explicitly_ordered and ordered: + if node.explicitly_ordered and execution_spec.ordered: return None if not isinstance(node.source.table, bq_data.GbqNativeTable): @@ -53,6 +55,7 @@ def execute( if not node.source.table.is_physically_stored: return None + peek = execution_spec.peek if limit is not None: if peek is None or limit < peek: peek = limit diff --git a/packages/bigframes/bigframes/session/semi_executor.py b/packages/bigframes/bigframes/session/semi_executor.py index c41d7c96d3e9..6f110834fe91 100644 --- a/packages/bigframes/bigframes/session/semi_executor.py +++ b/packages/bigframes/bigframes/session/semi_executor.py @@ -15,7 +15,7 @@ from typing import Optional from bigframes.core import bigframe_node -from bigframes.session import executor +from bigframes.session import executor, execution_spec # Unstable interface, in development @@ -27,7 +27,6 @@ class SemiExecutor(abc.ABC): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: raise NotImplementedError("execute not implemented for this executor") diff --git a/packages/bigframes/tests/system/small/test_session.py b/packages/bigframes/tests/system/small/test_session.py index e4101d4e941b..8955a878e014 100644 --- a/packages/bigframes/tests/system/small/test_session.py +++ b/packages/bigframes/tests/system/small/test_session.py @@ -118,7 +118,7 @@ def test_read_gbq_tokyo( exec_result = session_tokyo._executor.execute( df._block.expr, bigframes.session.execution_spec.ExecutionSpec( - bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False + bigframes.session.execution_spec.TempTableSpec(()), promise_under_10gb=False ), ) assert exec_result.query_job is not None @@ -951,7 +951,7 @@ def test_read_pandas_tokyo( result = session_tokyo._executor.execute( df._block.expr, bigframes.session.execution_spec.ExecutionSpec( - bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False + bigframes.session.execution_spec.TempTableSpec(()), promise_under_10gb=False ), ) assert result.query_job is not None From 2907468ce414230ec1decd9730cfd92f9ac951aa Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 23 Apr 2026 00:04:12 +0000 Subject: [PATCH 2/4] fix bugs --- .../bigframes/session/bq_caching_executor.py | 40 ++++++--- .../bigframes/session/direct_gbq_execution.py | 89 +++++++++---------- .../bigframes/session/execution_spec.py | 8 +- 3 files changed, 77 insertions(+), 60 deletions(-) diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index b9ca1a2a4299..89714c5a1200 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -87,10 +87,8 @@ def __init__( self.cache: execution_cache.ExecutionCache = execution_cache.ExecutionCache() self.metrics = metrics self.loader = loader - self.bqstoragereadclient = bqstoragereadclient self._enable_polars_execution = enable_polars_execution self._publisher = publisher - self._labels = labels # TODO(tswast): Send events from semi-executors, too. self._semi_executors: Sequence[semi_executor.SemiExecutor] = ( @@ -109,7 +107,12 @@ def __init__( ) self._upload_lock = threading.Lock() self._gbq_executor = direct_gbq_execution.DirectGbqExecutor( - bqclient, compile.compiler, metrics=self.metrics, publisher=self._publisher + bqclient, + compiler=compile.compiler, + bqstoragereadclient=bqstoragereadclient, + metrics=self.metrics, + publisher=self._publisher, + labels=labels, ) def to_sql( @@ -132,7 +135,6 @@ def to_sql( ) return compiled.sql - def execute( self, array_value: bigframes.core.ArrayValue, @@ -167,7 +169,6 @@ def _try_execute_semi_executors( return maybe_result return None - def _execute_bigquery( self, array_value: bigframes.core.ArrayValue, @@ -177,7 +178,10 @@ def _execute_bigquery( # Recursive handlers for different cases, maybe extract to explicit interface. if isinstance(dest_spec, ex_spec.GcsOutputSpec): execution_spec = dataclasses.replace( - execution_spec, destination_spec=ex_spec.TempTableSpec(cluster_cols=dest_spec.cluster_cols, lifetime="ephemeral") + execution_spec, + destination_spec=ex_spec.TempTableSpec( + cluster_cols=dest_spec.cluster_cols, lifetime="ephemeral" + ), ) results = self._execute_bigquery(array_value, execution_spec) self._export_result_gcs(results, dest_spec) @@ -191,7 +195,11 @@ def _execute_bigquery( existing_table.schema, array_value.schema ): execution_spec = dataclasses.replace( - execution_spec, destination_spec=ex_spec.TempTableSpec(cluster_cols=execution_spec.destination_spec.cluster_cols, lifetime="ephemeral") + execution_spec, + destination_spec=ex_spec.TempTableSpec( + cluster_cols=execution_spec.destination_spec.cluster_cols, + lifetime="ephemeral", + ), ) results = self._execute_bigquery(array_value, execution_spec) self._export_gbq_with_dml(results, dest_spec) @@ -213,10 +221,14 @@ def _execute_bigquery( ) arr_value = bigframes.core.ArrayValue(plan) execution_spec = dataclasses.replace( - execution_spec, destination_spec=ex_spec.TableOutputSpec(table=destination_table, cluster_cols=dest_spec.cluster_cols, if_exists="replace") + execution_spec, + destination_spec=ex_spec.TableOutputSpec( + table=destination_table, + cluster_cols=dest_spec.cluster_cols, + if_exists="replace", + ), ) return self._execute_bigquery(arr_value, execution_spec) - # At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table # Also, ordering mode will either be none or row-sorted @@ -393,7 +405,11 @@ def _cache_with_cluster_cols( ] cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.TempTableSpec(cluster_cols=tuple(cluster_cols), lifetime="session", ordering="order_key") + destination_spec=ex_spec.TempTableSpec( + cluster_cols=tuple(cluster_cols), + lifetime="session", + ordering="order_key", + ) ) result_bq_data = self.execute( array_value, @@ -405,7 +421,9 @@ def _cache_with_cluster_cols( def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.TempTableSpec(cluster_cols=(), lifetime="session", ordering="offsets_col") + destination_spec=ex_spec.TempTableSpec( + cluster_cols=(), lifetime="session", ordering="offsets_col" + ) ) result_bq_data = self.execute( array_value, diff --git a/packages/bigframes/bigframes/session/direct_gbq_execution.py b/packages/bigframes/bigframes/session/direct_gbq_execution.py index 33e805855ba4..df03d15ef292 100644 --- a/packages/bigframes/bigframes/session/direct_gbq_execution.py +++ b/packages/bigframes/bigframes/session/direct_gbq_execution.py @@ -26,8 +26,10 @@ import bigframes.session._io.bigquery as bq_io from bigframes.core import bq_data, compile, nodes from bigframes.session import executor, semi_executor, execution_spec +from bigframes.core.compile.configs import CompileRequest, CompileResult from bigframes import exceptions as bfe import bigframes.core.schema as schemata +import google.cloud.bigquery_storage_v1 import google.api_core.exceptions @@ -43,11 +45,13 @@ class DirectGbqExecutor(semi_executor.SemiExecutor): def __init__( self, bqclient: bigquery.Client, - compiler: Literal["ibis", "sqlglot"] - | Callable[[compile.CompileRequest], executor.CompileResult] = "ibis", + bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient, *, + compiler: Literal["ibis", "sqlglot"] + | Callable[[CompileRequest], CompileResult] = "ibis", metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, publisher: Optional[bigframes.core.events.Publisher] = None, + labels: Mapping[str, str] = {}, ): self.bqclient = bqclient if isinstance(compiler, str): @@ -58,8 +62,10 @@ def __init__( ) else: self._compile_fn = compiler + self._bqstoragereadclient = bqstoragereadclient self._publisher = publisher self._metrics = metrics + self._labels = labels def execute( self, @@ -69,36 +75,43 @@ def execute( """Just execute whatever plan as is, without further caching or decomposition.""" og_schema = plan.schema - compile_request = compile.CompileRequest( - plan, - sort_rows=spec.ordered, - peek_count=spec.peek, - ) + compile_request = CompileRequest( + plan, + sort_rows=spec.ordered, + peek_count=spec.peek, + ) compiled = self._compile_fn(compile_request) # might have more columns than og schema, for hidden ordering columns compiled_schema = compiled.sql_schema job_config = bigquery.QueryJobConfig() - if isinstance(spec.destination_spec, execution_spec.TableOutputSpec): - job_config.destination = spec.destination_spec.table - job_config.write_disposition = _WRITE_DISPOSITIONS[spec.destination_spec.if_exists] - job_config.clustering_fields = spec.destination_spec.cluster_cols - elif isinstance(spec.destination_spec, execution_spec.TempTableSpec) and spec.destination_spec.lifetime == "ephemeral": - pass - elif spec.destination_spec is not None: - raise ValueError(f"Direct GBQ Executor does not support destination: {spec.destination_spec}") + dest_spec = spec.destination_spec + cluster_cols = () + if isinstance(dest_spec, execution_spec.TableOutputSpec): + job_config.destination = dest_spec.table + job_config.write_disposition = _WRITE_DISPOSITIONS[dest_spec.if_exists] + cluster_cols = dest_spec.cluster_cols + job_config.clustering_fields = dest_spec.cluster_cols + elif ( + isinstance(dest_spec, execution_spec.TempTableSpec) + and dest_spec.lifetime == "ephemeral" + ): + cluster_cols = dest_spec.cluster_cols + job_config.clustering_fields = dest_spec.cluster_cols + elif dest_spec is not None: + raise ValueError( + f"Direct GBQ Executor does not support destination: {dest_spec}" + ) job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - can_skip_job = spec.destination_spec is None and spec.promise_under_10gb + can_skip_job = dest_spec is None and spec.promise_under_10gb iterator, query_job = self._run_execute_query( sql=compiled.sql, job_config=job_config, query_with_job=(not can_skip_job), session=plan.session, ) - - cluster_cols = spec.destination_spec.cluster_cols if spec.desination_spec else () result_bq_data = None if query_job and query_job.destination: # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them @@ -128,7 +141,7 @@ def execute( return executor.BQTableExecuteResult( data=result_bq_data, project_id=self.bqclient.project, - storage_client=self.bqstoragereadclient, + storage_client=self._bqstoragereadclient, execution_metadata=execution_metadata, selected_fields=tuple((col, col) for col in og_schema.names), ) @@ -159,34 +172,15 @@ def _run_execute_query( job_config.labels.update(self._labels) try: - # Trick the type checker into thinking we got a literal. - if query_with_job: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self._metrics, - project=None, - location=None, - timeout=None, - query_with_job=True, - publisher=self._publisher, - session=session, - ) - else: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self._metrics, - project=None, - location=None, - timeout=None, - query_with_job=False, - publisher=self._publisher, - session=session, - ) - + return bq_io.start_query_with_client( + self.bqclient, + sql, + job_config=job_config, + metrics=self._metrics, + query_with_job=query_with_job, + publisher=self._publisher, + session=session, + ) except google.api_core.exceptions.BadRequest as e: # Unfortunately, this error type does not have a separate error code or exception type if "Resources exceeded during query execution" in e.message: @@ -195,6 +189,7 @@ def _run_execute_query( else: raise + def _result_schema( logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField] ) -> schemata.ArraySchema: diff --git a/packages/bigframes/bigframes/session/execution_spec.py b/packages/bigframes/bigframes/session/execution_spec.py index b07ff715fc1b..a69dae557d27 100644 --- a/packages/bigframes/bigframes/session/execution_spec.py +++ b/packages/bigframes/bigframes/session/execution_spec.py @@ -28,7 +28,7 @@ class ExecutionSpec: # implementation dependent and not stable. peek: Optional[int] = None # Controls whether output iterator is ordered. Cannot be true if destination is not - # guaranteed to be ordered. + # guaranteed to be ordered. ordered: bool = False # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur promise_under_10gb: bool = False @@ -41,7 +41,10 @@ class TempTableSpec: Specifies that the result of an operation should be a session temp table. The table will be automatically deleted after the session ends. """ - cluster_cols: tuple[str, ...] # if empty, will cluster using order key if ordering_key is set + + cluster_cols: tuple[ + str, ... + ] # if empty, will cluster using order key if ordering_key is set lifetime: Literal["session", "ephemeral"] = "session" # Controls ordering and whether extra columns are materialized to preserve ordering # Any extra columns will be appended to the end of the schema. @@ -59,6 +62,7 @@ class TableOutputSpec: The executor is not responsible for managing lifecycle of the table. """ + table: bigquery.TableReference cluster_cols: tuple[str, ...] if_exists: Literal["fail", "replace", "append"] = "fail" From b63b00a7dc37bef7ba36eda637b583a3fd26fc95 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 23 Apr 2026 02:02:13 +0000 Subject: [PATCH 3/4] more fixes --- .../bigframes/session/bq_caching_executor.py | 2 +- .../bigframes/bigframes/testing/engine_utils.py | 11 ++++++++--- packages/bigframes/tests/system/conftest.py | 8 ++++++++ .../tests/system/small/engines/conftest.py | 16 +++++++++++++--- .../unit/session/test_local_scan_executor.py | 10 +++++++--- 5 files changed, 37 insertions(+), 10 deletions(-) diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 89714c5a1200..cf24e2faf932 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -108,7 +108,7 @@ def __init__( self._upload_lock = threading.Lock() self._gbq_executor = direct_gbq_execution.DirectGbqExecutor( bqclient, - compiler=compile.compiler, + compiler=compile.compiler().compile_sql, bqstoragereadclient=bqstoragereadclient, metrics=self.metrics, publisher=self._publisher, diff --git a/packages/bigframes/bigframes/testing/engine_utils.py b/packages/bigframes/bigframes/testing/engine_utils.py index edb68c3a9b0a..456d6b463840 100644 --- a/packages/bigframes/bigframes/testing/engine_utils.py +++ b/packages/bigframes/bigframes/testing/engine_utils.py @@ -15,7 +15,12 @@ import pandas.testing from bigframes.core import nodes -from bigframes.session import semi_executor +from bigframes.session import semi_executor, execution_spec + + +SPEC = execution_spec.ExecutionSpec( + ordered=True, +) def assert_equivalence_execution( @@ -23,8 +28,8 @@ def assert_equivalence_execution( engine1: semi_executor.SemiExecutor, engine2: semi_executor.SemiExecutor, ): - e1_result = engine1.execute(node, ordered=True) - e2_result = engine2.execute(node, ordered=True) + e1_result = engine1.execute(node, SPEC) + e2_result = engine2.execute(node, SPEC) assert e1_result is not None assert e2_result is not None # Convert to pandas, as pandas has better comparison utils than arrow diff --git a/packages/bigframes/tests/system/conftest.py b/packages/bigframes/tests/system/conftest.py index 361d9387bc77..f7565d32d29f 100644 --- a/packages/bigframes/tests/system/conftest.py +++ b/packages/bigframes/tests/system/conftest.py @@ -27,6 +27,7 @@ import google.cloud.bigquery_connection_v1 as bigquery_connection_v1 import google.cloud.exceptions import google.cloud.functions_v2 as functions_v2 +import google.cloud.bigquery_storage_v1 import google.cloud.resourcemanager_v3 as resourcemanager_v3 import google.cloud.storage as storage # type: ignore import numpy as np @@ -103,6 +104,13 @@ def bigquery_client(session: bigframes.Session) -> bigquery.Client: return session.bqclient +@pytest.fixture(scope="session") +def bigquery_storage_read_client( + session: bigframes.Session, +) -> google.cloud.bigquery_storage_v1.BigQueryReadClient: + return session.bqstoragereadclient + + @pytest.fixture(scope="session") def bigquery_client_tokyo(session_tokyo: bigframes.Session) -> bigquery.Client: return session_tokyo.bqclient diff --git a/packages/bigframes/tests/system/small/engines/conftest.py b/packages/bigframes/tests/system/small/engines/conftest.py index cea505dd28a6..b195e48fc6dd 100644 --- a/packages/bigframes/tests/system/small/engines/conftest.py +++ b/packages/bigframes/tests/system/small/engines/conftest.py @@ -19,6 +19,7 @@ from google.cloud import bigquery import bigframes +import google.cloud.bigquery_storage_v1 from bigframes.core import ArrayValue, events, local_data from bigframes.session import ( direct_gbq_execution, @@ -45,7 +46,11 @@ def fake_session() -> Generator[bigframes.Session, None, None]: @pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"]) -def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor: +def engine( + request, + bigquery_client: bigquery.Client, + bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient, +) -> semi_executor.SemiExecutor: if request.param == "pyarrow": return local_scan_executor.LocalScanExecutor() if request.param == "polars": @@ -53,11 +58,16 @@ def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecu publisher = events.Publisher() if request.param == "bq": return direct_gbq_execution.DirectGbqExecutor( - bigquery_client, publisher=publisher + bigquery_client, + bqstoragereadclient=bigquery_storage_read_client, + publisher=publisher, ) if request.param == "bq-sqlglot": return direct_gbq_execution.DirectGbqExecutor( - bigquery_client, compiler="sqlglot", publisher=publisher + bigquery_client, + bqstoragereadclient=bigquery_storage_read_client, + compiler="sqlglot", + publisher=publisher, ) raise ValueError(f"Unrecognized param: {request.param}") diff --git a/packages/bigframes/tests/unit/session/test_local_scan_executor.py b/packages/bigframes/tests/unit/session/test_local_scan_executor.py index fc59253153b6..a76cdcd49a67 100644 --- a/packages/bigframes/tests/unit/session/test_local_scan_executor.py +++ b/packages/bigframes/tests/unit/session/test_local_scan_executor.py @@ -17,9 +17,13 @@ import pytest from bigframes.core import identifiers, local_data, nodes -from bigframes.session import local_scan_executor +from bigframes.session import local_scan_executor, execution_spec from bigframes.testing import mocks +SPEC = execution_spec.ExecutionSpec( + ordered=True, +) + @pytest.fixture def object_under_test(): @@ -72,7 +76,7 @@ def test_local_scan_executor_with_slice(start, stop, expected_rows, object_under stop=stop, ) - result = object_under_test.execute(plan, ordered=True) + result = object_under_test.execute(plan, SPEC) result_table = pyarrow.Table.from_batches(result.batches().arrow_batches) assert result_table.num_rows == expected_rows @@ -98,4 +102,4 @@ def test_local_scan_executor_with_slice_unsupported_inputs( stop=stop, step=step, ) - assert object_under_test.execute(plan, ordered=True) is None + assert object_under_test.execute(plan, SPEC) is None From 1f5ec8fe56685aa2e722bbfb9d1436416a4079ad Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 23 Apr 2026 22:51:34 +0000 Subject: [PATCH 4/4] fix issues --- .../bigframes/core/rewrite/__init__.py | 3 +- .../bigframes/bigframes/core/rewrite/order.py | 6 ++ .../bigframes/session/bq_caching_executor.py | 82 +++++++++---------- .../bigframes/session/direct_gbq_execution.py | 27 +++--- .../bigframes/session/execution_spec.py | 24 +++++- .../tests/system/small/engines/conftest.py | 34 ++++---- .../system/small/engines/test_windowing.py | 13 +-- .../tests/system/small/test_session.py | 4 +- 8 files changed, 108 insertions(+), 85 deletions(-) diff --git a/packages/bigframes/bigframes/core/rewrite/__init__.py b/packages/bigframes/bigframes/core/rewrite/__init__.py index ab5559ab6554..ae4b142b1a46 100644 --- a/packages/bigframes/bigframes/core/rewrite/__init__.py +++ b/packages/bigframes/bigframes/core/rewrite/__init__.py @@ -19,7 +19,7 @@ from bigframes.core.rewrite.implicit_align import try_row_join from bigframes.core.rewrite.legacy_align import legacy_join_as_projection from bigframes.core.rewrite.nullity import simplify_join -from bigframes.core.rewrite.order import bake_order, defer_order +from bigframes.core.rewrite.order import bake_order, defer_order, pull_out_order from bigframes.core.rewrite.pruning import column_pruning from bigframes.core.rewrite.scan_reduction import ( try_reduce_to_local_scan, @@ -50,6 +50,7 @@ "rewrite_range_rolling", "try_reduce_to_table_scan", "bake_order", + "pull_out_order", "try_reduce_to_local_scan", "fold_row_counts", "pull_out_window_order", diff --git a/packages/bigframes/bigframes/core/rewrite/order.py b/packages/bigframes/bigframes/core/rewrite/order.py index 7beb510ac6b8..e28a9efee4ce 100644 --- a/packages/bigframes/bigframes/core/rewrite/order.py +++ b/packages/bigframes/bigframes/core/rewrite/order.py @@ -47,6 +47,12 @@ def bake_order( return node +def pull_out_order( + node: bigframes.core.nodes.BigFrameNode, +) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]: + return _pull_up_order(node, order_root=False) + + # Makes ordering explicit in window definitions def _pull_up_order( root: bigframes.core.nodes.BigFrameNode, diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index cf24e2faf932..ee7ce7951129 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -40,7 +40,7 @@ import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temporary_storage -from bigframes.core import compile, guid, local_data, rewrite +from bigframes.core import bq_data, compile, guid, local_data, rewrite from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir from bigframes.session import ( @@ -176,59 +176,61 @@ def _execute_bigquery( ) -> executor.ExecuteResult: dest_spec = execution_spec.destination_spec # Recursive handlers for different cases, maybe extract to explicit interface. + if ( + isinstance(dest_spec, ex_spec.EphemeralTableSpec) + and not execution_spec.promise_under_10gb + ): + # Results over 10GB need to explicitly allocate a table. + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.SessionTableSpec() + ) + return self._execute_bigquery(array_value, execution_spec) if isinstance(dest_spec, ex_spec.GcsOutputSpec): execution_spec = dataclasses.replace( - execution_spec, - destination_spec=ex_spec.TempTableSpec( - cluster_cols=dest_spec.cluster_cols, lifetime="ephemeral" - ), + execution_spec, destination_spec=ex_spec.EphemeralTableSpec() ) results = self._execute_bigquery(array_value, execution_spec) self._export_result_gcs(results, dest_spec) return results - if isinstance(dest_spec, ex_spec.TableOutputSpec): + if isinstance(dest_spec, ex_spec.TableOutputSpec) and dest_spec.permit_dml: # Special DML path - maybe this should be configurable, dml vs query destination has tradeoffs existing_table = self._maybe_find_existing_table(dest_spec) - if execution_spec.ordered: - raise ValueError("Ordering not supported with table outputs") if (existing_table is not None) and _is_schema_match( existing_table.schema, array_value.schema ): execution_spec = dataclasses.replace( - execution_spec, - destination_spec=ex_spec.TempTableSpec( - cluster_cols=execution_spec.destination_spec.cluster_cols, - lifetime="ephemeral", - ), + execution_spec, destination_spec=ex_spec.EphemeralTableSpec() ) results = self._execute_bigquery(array_value, execution_spec) self._export_gbq_with_dml(results, dest_spec) return results - if isinstance(dest_spec, ex_spec.TempTableSpec): + if isinstance(dest_spec, ex_spec.SessionTableSpec): # "ephemeral" temp tables created in the course of exeuction, don't need to be allocated # materialized ordering only really makes sense for internal temp tables used by caching cluster_cols = dest_spec.cluster_cols + # Rewrite plan to materialize ordering as extra columns plan = array_value.node if dest_spec.ordering == "offsets_col": order_col_id = guid.generate_guid() plan = nodes.PromoteOffsetsNode(plan, order_col_id) cluster_cols = [order_col_id] elif dest_spec.ordering == "order_key": - plan = nodes.defer_order(plan, output_hidden_row_keys=True) - if dest_spec.lifetime == "session": - destination_table = self.storage_manager.create_temp_table( - plan.schema, cluster_cols - ) - arr_value = bigframes.core.ArrayValue(plan) - execution_spec = dataclasses.replace( - execution_spec, - destination_spec=ex_spec.TableOutputSpec( - table=destination_table, - cluster_cols=dest_spec.cluster_cols, - if_exists="replace", - ), - ) - return self._execute_bigquery(arr_value, execution_spec) + plan, _ = rewrite.pull_out_order(plan) + destination_table = self.storage_manager.create_temp_table( + plan.schema.to_bigquery(), cluster_cols + ) + arr_value = bigframes.core.ArrayValue(plan) + execution_spec = dataclasses.replace( + execution_spec, + destination_spec=ex_spec.TableOutputSpec( + table=destination_table, + cluster_cols=dest_spec.cluster_cols, + if_exists="replace", + # Avoid loops, also dml is mostly used to avoid quotas on user-owned tables + permit_dml=False, + ), + ) + return self._execute_bigquery(arr_value, execution_spec) # At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table # Also, ordering mode will either be none or row-sorted @@ -405,32 +407,28 @@ def _cache_with_cluster_cols( ] cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.TempTableSpec( - cluster_cols=tuple(cluster_cols), - lifetime="session", - ordering="order_key", - ) + destination_spec=ex_spec.SessionTableSpec(cluster_cols=tuple(cluster_cols)) ) - result_bq_data = self.execute( + result = self.execute( array_value, execution_spec=execution_spec, ) - assert isinstance(result_bq_data, bigframes.core.BigqueryDataSource) - self.cache.cache_results_table(array_value.node, result_bq_data) + assert isinstance(result, executor.BQTableExecuteResult) + self.cache.cache_results_table(array_value.node, result._data) def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.TempTableSpec( - cluster_cols=(), lifetime="session", ordering="offsets_col" + destination_spec=ex_spec.SessionTableSpec( + cluster_cols=(), ordering="offsets_col" ) ) - result_bq_data = self.execute( + result = self.execute( array_value, execution_spec=execution_spec, ) - assert isinstance(result_bq_data, bigframes.core.BigqueryDataSource) - self.cache.cache_results_table(array_value.node, result_bq_data) + assert isinstance(result, executor.BQTableExecuteResult) + self.cache.cache_results_table(array_value.node, result._data) def _cache_with_session_awareness( self, diff --git a/packages/bigframes/bigframes/session/direct_gbq_execution.py b/packages/bigframes/bigframes/session/direct_gbq_execution.py index df03d15ef292..e50e120b91e3 100644 --- a/packages/bigframes/bigframes/session/direct_gbq_execution.py +++ b/packages/bigframes/bigframes/session/direct_gbq_execution.py @@ -87,25 +87,25 @@ def execute( job_config = bigquery.QueryJobConfig() dest_spec = spec.destination_spec - cluster_cols = () + cluster_cols = None + can_skip_job = True if isinstance(dest_spec, execution_spec.TableOutputSpec): + if spec.ordered: + raise ValueError("Ordering not supported with destination table") job_config.destination = dest_spec.table job_config.write_disposition = _WRITE_DISPOSITIONS[dest_spec.if_exists] - cluster_cols = dest_spec.cluster_cols - job_config.clustering_fields = dest_spec.cluster_cols - elif ( - isinstance(dest_spec, execution_spec.TempTableSpec) - and dest_spec.lifetime == "ephemeral" - ): - cluster_cols = dest_spec.cluster_cols - job_config.clustering_fields = dest_spec.cluster_cols + cluster_cols = dest_spec.cluster_cols if dest_spec.cluster_cols else None + job_config.clustering_fields = cluster_cols + can_skip_job = False + elif isinstance(dest_spec, execution_spec.EphemeralTableSpec): + # Need destination table, but jobless execution might not create a destination table + can_skip_job = False elif dest_spec is not None: raise ValueError( f"Direct GBQ Executor does not support destination: {dest_spec}" ) job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - can_skip_job = dest_spec is None and spec.promise_under_10gb iterator, query_job = self._run_execute_query( sql=compiled.sql, job_config=job_config, @@ -121,7 +121,7 @@ def execute( table=bq_data.GbqNativeTable.from_ref_and_schema( dst, tuple(compiled_schema), - cluster_cols=cluster_cols, + cluster_cols=cluster_cols or (), location=iterator.location or self.bqclient.location, table_type="TABLE", ), @@ -137,7 +137,10 @@ def execute( hasattr(iterator, "_is_almost_completely_cached") and iterator._is_almost_completely_cached() ) - if result_bq_data is not None and not result_mostly_cached: + + if (isinstance(dest_spec, execution_spec.EphemeralTableSpec)) or ( + result_bq_data is not None and not result_mostly_cached + ): return executor.BQTableExecuteResult( data=result_bq_data, project_id=self.bqclient.project, diff --git a/packages/bigframes/bigframes/session/execution_spec.py b/packages/bigframes/bigframes/session/execution_spec.py index a69dae557d27..69d8d0c184fc 100644 --- a/packages/bigframes/bigframes/session/execution_spec.py +++ b/packages/bigframes/bigframes/session/execution_spec.py @@ -23,7 +23,9 @@ @dataclasses.dataclass(frozen=True) class ExecutionSpec: # destination for the result of the operation. Executor may also incidentally create other temporary tables for its own purposes. - destination_spec: Union[TableOutputSpec, GcsOutputSpec, TempTableSpec, None] = None + destination_spec: Union[ + TableOutputSpec, GcsOutputSpec, EphemeralTableSpec, SessionTableSpec, None + ] = None # If set, the result will be truncated to the given number of rows. Which N rows is # implementation dependent and not stable. peek: Optional[int] = None @@ -36,7 +38,20 @@ class ExecutionSpec: # Used internally by execution @dataclasses.dataclass(frozen=True) -class TempTableSpec: +class EphemeralTableSpec: + """ + Specifies that the result of an operation should be a temporary table of some sort. + + No guarantees on lifetime, may be a session temp table, or a bq-created temp table with <24hr life. + + Used internally when results need temporary staging, because they are large (>10GB), or needed in subsequent operations. + """ + + pass + + +@dataclasses.dataclass(frozen=True) +class SessionTableSpec: """ Specifies that the result of an operation should be a session temp table. The table will be automatically deleted after the session ends. @@ -44,8 +59,7 @@ class TempTableSpec: cluster_cols: tuple[ str, ... - ] # if empty, will cluster using order key if ordering_key is set - lifetime: Literal["session", "ephemeral"] = "session" + ] = () # if empty, will cluster using order key if ordering_key is set # Controls ordering and whether extra columns are materialized to preserve ordering # Any extra columns will be appended to the end of the schema. # None: ordering may be discarded entirely (ordering metadata will still be provided if ordering is derivable from materialized columns) @@ -66,6 +80,8 @@ class TableOutputSpec: table: bigquery.TableReference cluster_cols: tuple[str, ...] if_exists: Literal["fail", "replace", "append"] = "fail" + # Allow DML to be used to populate table + permit_dml: bool = True @dataclasses.dataclass(frozen=True) diff --git a/packages/bigframes/tests/system/small/engines/conftest.py b/packages/bigframes/tests/system/small/engines/conftest.py index b195e48fc6dd..10c238fbb4fa 100644 --- a/packages/bigframes/tests/system/small/engines/conftest.py +++ b/packages/bigframes/tests/system/small/engines/conftest.py @@ -44,31 +44,37 @@ def fake_session() -> Generator[bigframes.Session, None, None]: with bigframes.core.global_session._GlobalSessionContext(session): yield session +@pytest.fixture(scope="session") +def sqlglot_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient): + return direct_gbq_execution.DirectGbqExecutor( + bigquery_client, + bqstoragereadclient=bigquery_storage_read_client, + compiler="sqlglot", + publisher=events.Publisher(), + ) + +@pytest.fixture(scope="session") +def bq_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient): + return direct_gbq_execution.DirectGbqExecutor( + bigquery_client, + bqstoragereadclient=bigquery_storage_read_client, + publisher=events.Publisher(), + ) @pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"]) def engine( request, - bigquery_client: bigquery.Client, - bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient, + sqlglot_engine, + bq_engine, ) -> semi_executor.SemiExecutor: if request.param == "pyarrow": return local_scan_executor.LocalScanExecutor() if request.param == "polars": return polars_executor.PolarsExecutor() - publisher = events.Publisher() if request.param == "bq": - return direct_gbq_execution.DirectGbqExecutor( - bigquery_client, - bqstoragereadclient=bigquery_storage_read_client, - publisher=publisher, - ) + return bq_engine if request.param == "bq-sqlglot": - return direct_gbq_execution.DirectGbqExecutor( - bigquery_client, - bqstoragereadclient=bigquery_storage_read_client, - compiler="sqlglot", - publisher=publisher, - ) + return sqlglot_engine raise ValueError(f"Unrecognized param: {request.param}") diff --git a/packages/bigframes/tests/system/small/engines/test_windowing.py b/packages/bigframes/tests/system/small/engines/test_windowing.py index c748947d997b..fcc1dad928f8 100644 --- a/packages/bigframes/tests/system/small/engines/test_windowing.py +++ b/packages/bigframes/tests/system/small/engines/test_windowing.py @@ -46,8 +46,9 @@ def test_engines_with_offsets( @pytest.mark.parametrize("agg_op", [agg_ops.sum_op, agg_ops.count_op]) def test_engines_with_rows_window( scalars_array_value: array_value.ArrayValue, - bigquery_client: bigquery.Client, agg_op, + bq_engine, + sqlglot_engine, ): window = window_spec.WindowSpec( bounds=window_spec.RowsWindowBounds.from_window_size(3, "left"), @@ -62,12 +63,4 @@ def test_engines_with_rows_window( ), window_spec=window, ) - - publisher = events.Publisher() - bq_executor = direct_gbq_execution.DirectGbqExecutor( - bigquery_client, publisher=publisher - ) - bq_sqlgot_executor = direct_gbq_execution.DirectGbqExecutor( - bigquery_client, compiler="sqlglot", publisher=publisher - ) - assert_equivalence_execution(window_node, bq_executor, bq_sqlgot_executor) + assert_equivalence_execution(window_node, bq_engine, sqlglot_engine) diff --git a/packages/bigframes/tests/system/small/test_session.py b/packages/bigframes/tests/system/small/test_session.py index 8955a878e014..7f620daf887e 100644 --- a/packages/bigframes/tests/system/small/test_session.py +++ b/packages/bigframes/tests/system/small/test_session.py @@ -118,7 +118,7 @@ def test_read_gbq_tokyo( exec_result = session_tokyo._executor.execute( df._block.expr, bigframes.session.execution_spec.ExecutionSpec( - bigframes.session.execution_spec.TempTableSpec(()), promise_under_10gb=False + bigframes.session.execution_spec.EphemeralTableSpec(), promise_under_10gb=False ), ) assert exec_result.query_job is not None @@ -951,7 +951,7 @@ def test_read_pandas_tokyo( result = session_tokyo._executor.execute( df._block.expr, bigframes.session.execution_spec.ExecutionSpec( - bigframes.session.execution_spec.TempTableSpec(()), promise_under_10gb=False + bigframes.session.execution_spec.EphemeralTableSpec(), promise_under_10gb=False ), ) assert result.query_job is not None