diff --git a/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py b/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py index 27b79f266bc1..1e0b561e8c5b 100644 --- a/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py +++ b/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py @@ -177,7 +177,7 @@ def from_table( project_id: str, dataset_id: str, table_id: str, - uid_gen: guid.SequentialUIDGenerator, + uid_gen: guid.SequentialUIDGenerator | None = None, columns: typing.Sequence[str] = (), sql_predicate: typing.Optional[str] = None, system_time: typing.Optional[datetime.datetime] = None, @@ -202,6 +202,8 @@ def from_table( if system_time else None ) + if uid_gen is None: + uid_gen = guid.SequentialUIDGenerator() table_alias = next(uid_gen.get_uid_stream("bft_")) table_expr = sge.Table( this=sql.identifier(table_id), diff --git a/packages/bigframes/bigframes/core/rewrite/__init__.py b/packages/bigframes/bigframes/core/rewrite/__init__.py index ab5559ab6554..ae4b142b1a46 100644 --- a/packages/bigframes/bigframes/core/rewrite/__init__.py +++ b/packages/bigframes/bigframes/core/rewrite/__init__.py @@ -19,7 +19,7 @@ from bigframes.core.rewrite.implicit_align import try_row_join from bigframes.core.rewrite.legacy_align import legacy_join_as_projection from bigframes.core.rewrite.nullity import simplify_join -from bigframes.core.rewrite.order import bake_order, defer_order +from bigframes.core.rewrite.order import bake_order, defer_order, pull_out_order from bigframes.core.rewrite.pruning import column_pruning from bigframes.core.rewrite.scan_reduction import ( try_reduce_to_local_scan, @@ -50,6 +50,7 @@ "rewrite_range_rolling", "try_reduce_to_table_scan", "bake_order", + "pull_out_order", "try_reduce_to_local_scan", "fold_row_counts", "pull_out_window_order", diff --git a/packages/bigframes/bigframes/core/rewrite/order.py b/packages/bigframes/bigframes/core/rewrite/order.py index 7beb510ac6b8..cf059ee3e146 100644 --- a/packages/bigframes/bigframes/core/rewrite/order.py +++ b/packages/bigframes/bigframes/core/rewrite/order.py @@ -47,6 +47,14 @@ def bake_order( return node +def pull_out_order( + node: bigframes.core.nodes.BigFrameNode, +) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]: + import bigframes.core.rewrite.slices + node = node.bottom_up(bigframes.core.rewrite.slices.rewrite_slice) + return _pull_up_order(node, order_root=True) + + # Makes ordering explicit in window definitions def _pull_up_order( root: bigframes.core.nodes.BigFrameNode, @@ -267,7 +275,7 @@ def pull_up_order_inner( offsets_id ) return new_explode, child_order.join(inner_order) - raise ValueError(f"Unexpected node: {node}") + raise ValueError(f"Unexpected node type {type(node).__name__}") def pull_order_concat( node: bigframes.core.nodes.ConcatNode, diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 8d3782c5ef6b..ca67e19f3528 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -20,8 +20,6 @@ from typing import Literal, Mapping, Optional, Sequence, Tuple import google.api_core.exceptions -import google.cloud.bigquery.job as bq_job -import google.cloud.bigquery.table as bq_table import google.cloud.bigquery_storage_v1 from google.cloud import bigquery @@ -30,7 +28,7 @@ import bigframes.core import bigframes.core.events import bigframes.core.guid -import 
bigframes.core.identifiers +import bigframes.core.ordering import bigframes.core.nodes as nodes import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties @@ -41,8 +39,7 @@ import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temporary_storage -from bigframes import exceptions as bfe -from bigframes.core import bq_data, compile, local_data, rewrite +from bigframes.core import bq_data, compile, guid, identifiers, local_data, rewrite from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir from bigframes.session import ( @@ -51,7 +48,9 @@ local_scan_executor, read_api_execution, semi_executor, + direct_gbq_execution, ) +import dataclasses # Max complexity that should be executed as a single query QUERY_COMPLEXITY_LIMIT = 1e7 @@ -87,10 +86,8 @@ def __init__( self.cache: execution_cache.ExecutionCache = execution_cache.ExecutionCache() self.metrics = metrics self.loader = loader - self.bqstoragereadclient = bqstoragereadclient self._enable_polars_execution = enable_polars_execution self._publisher = publisher - self._labels = labels # TODO(tswast): Send events from semi-executors, too. self._semi_executors: Sequence[semi_executor.SemiExecutor] = ( @@ -108,6 +105,14 @@ def __init__( polars_executor.PolarsExecutor(), ) self._upload_lock = threading.Lock() + self._gbq_executor = direct_gbq_execution.DirectGbqExecutor( + bqclient, + compiler=compile.compiler().compile_sql, + bqstoragereadclient=bqstoragereadclient, + metrics=self.metrics, + publisher=self._publisher, + labels=labels, + ) def to_sql( self, @@ -135,54 +140,105 @@ def execute( execution_spec: ex_spec.ExecutionSpec, ) -> executor.ExecuteResult: self._publisher.publish(bigframes.core.events.ExecutionStarted()) + maybe_result = self._try_execute_semi_executors(array_value, execution_spec) + if maybe_result is not None: + return maybe_result + result = self._execute_bigquery(array_value, execution_spec) + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=result, + ) + ) + return result - # TODO: Support export jobs in combination with semi executors - if execution_spec.destination_spec is None: - plan = self.prepare_plan(array_value.node, target="simplify") - for exec in self._semi_executors: - maybe_result = exec.execute( - plan, ordered=execution_spec.ordered, peek=execution_spec.peek - ) - if maybe_result: - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=maybe_result, - ) + def _try_execute_semi_executors( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> Optional[executor.ExecuteResult]: + plan = self.prepare_plan(array_value.node, target="simplify") + for exec in self._semi_executors: + maybe_result = exec.execute(plan, execution_spec) + if maybe_result: + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=maybe_result, ) - return maybe_result - - if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec): - if execution_spec.peek or execution_spec.ordered: - raise NotImplementedError( - "Ordering and peeking not supported for gbq export" ) - # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml - result = self._export_gbq(array_value, execution_spec.destination_spec) - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=result, + return maybe_result + return None + + def _execute_bigquery( 
+ self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> executor.ExecuteResult: + dest_spec = execution_spec.destination_spec + # Recursive handlers for different cases, maybe extract to explicit interface. + if isinstance(dest_spec, ex_spec.GcsOutputSpec): + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.EphemeralTableSpec() + ) + results = self._execute_bigquery(array_value, execution_spec) + self._export_result_gcs(results, dest_spec) + return results + elif isinstance(dest_spec, ex_spec.TableOutputSpec) and dest_spec.permit_dml: + # Special DML path - maybe this should be configurable, dml vs query destination has tradeoffs + existing_table = self._maybe_find_existing_table(dest_spec) + if (existing_table is not None) and _is_schema_match( + existing_table.schema, array_value.schema + ): + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.EphemeralTableSpec() ) + results = self._execute_bigquery(array_value, execution_spec) + self._export_gbq_with_dml(results, dest_spec) + return results + elif isinstance(dest_spec, ex_spec.SessionTableSpec): + # "ephemeral" temp tables created in the course of execution don't need to be allocated + # materialized ordering only really makes sense for internal temp tables used by caching + cluster_cols = dest_spec.cluster_cols + # Rewrite plan to materialize ordering as extra columns + plan = array_value.node + if dest_spec.ordering == "offsets_col": + order_col_id = guid.generate_guid() + plan = nodes.PromoteOffsetsNode(plan, identifiers.ColumnId(order_col_id)) + cluster_cols = [order_col_id] + elif dest_spec.ordering == "order_key": + plan, ordering = rewrite.pull_out_order(plan) + destination_table = self.storage_manager.create_temp_table( + plan.schema.to_bigquery(), cluster_cols + ) + arr_value = bigframes.core.ArrayValue(plan) + execution_spec = dataclasses.replace( + execution_spec, + destination_spec=ex_spec.TableOutputSpec( + table=destination_table, + cluster_cols=dest_spec.cluster_cols, + if_exists="replace", + # Avoid loops, also dml is mostly used to avoid quotas on user-owned tables + permit_dml=False, + ), ) + result = self._execute_bigquery(arr_value, execution_spec) + result._data = dataclasses.replace(result._data, ordering=ordering) return result - - result = self._execute_plan_gbq( - array_value.node, - ordered=execution_spec.ordered, - peek=execution_spec.peek, - cache_spec=execution_spec.destination_spec - if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec) - else None, - must_create_table=not execution_spec.promise_under_10gb, - ) - # post steps: export - if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): - self._export_result_gcs(result, execution_spec.destination_spec) - - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=result, + # Force table creation if result might be large (and user explicitly allowed large results) + elif isinstance(dest_spec, ex_spec.EphemeralTableSpec) or dest_spec is None: + if not execution_spec.promise_under_10gb: + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.SessionTableSpec() + ) + return self._execute_bigquery(array_value, execution_spec) + + # At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table + # Also, ordering mode will either be none or row-sorted + gbq_plan = self.prepare_plan(array_value.node, target="bq_execution") + result = 
self._gbq_executor.execute(gbq_plan, execution_spec) + if result is None: + raise ValueError( + f"Couldn't execute plan {array_value.node} with {execution_spec}" ) - ) return result def _export_result_gcs( @@ -231,75 +287,34 @@ def _maybe_find_existing_table( except google.api_core.exceptions.NotFound: return None - def _export_gbq( - self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec + def _export_gbq_with_dml( - self, result: executor.ExecuteResult, spec: ex_spec.TableOutputSpec ) -> executor.ExecuteResult: """ - Export the ArrayValue to an existing BigQuery table. + Export the query result to an existing BigQuery table, using DML. """ - plan = self.prepare_plan(array_value.node, target="bq_execution") - - # validate destination table - existing_table = self._maybe_find_existing_table(spec) - - compiled = compile.compiler().compile_sql( - compile.CompileRequest(plan, sort_rows=False) - ) - sql = compiled.sql - - if (existing_table is not None) and _is_schema_match( - existing_table.schema, array_value.schema - ): - # b/409086472: Uses DML for table appends and replacements to avoid - # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: - # https://cloud.google.com/bigquery/quotas#standard_tables - job_config = bigquery.QueryJobConfig() - - ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) - if spec.if_exists == "append": - sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) - else: # for "replace" - assert spec.if_exists == "replace" - sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) - else: - dispositions = { - "fail": bigquery.WriteDisposition.WRITE_EMPTY, - "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, - "append": bigquery.WriteDisposition.WRITE_APPEND, - } - job_config = bigquery.QueryJobConfig( - write_disposition=dispositions[spec.if_exists], - destination=spec.table, - clustering_fields=spec.cluster_cols if spec.cluster_cols else None, - ) - - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - iterator, job = self._run_execute_query( - sql=sql, - job_config=job_config, - session=array_value.session, - ) - - has_special_dtype_col = any( - t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE) - for t in array_value.schema.dtypes + # b/409086472: Uses DML for table appends and replacements to avoid + # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: + # https://cloud.google.com/bigquery/quotas#standard_tables + ir = sqlglot_ir.SQLGlotIR.from_table( + result.query_job.destination.project, + result.query_job.destination.dataset_id, + result.query_job.destination.table_id, ) + sql = "" + if spec.if_exists == "append": + sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) + else: # for "replace" + assert spec.if_exists == "replace" + sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) - if spec.if_exists != "append" and has_special_dtype_col: - # Only update schema if this is not modifying an existing table, and the - # new table contains special columns (like timedelta or obj_ref). 
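
[Editor's note] The new _export_gbq_with_dml path above first materializes the result to an ephemeral table, then copies it into the destination with a single statement. A minimal sketch of the two statement shapes, assuming illustrative table names; the real code builds these through the sqlglot_ir / sg_sql helpers rather than f-strings:

    # Hedged sketch of the DML/DDL shapes behind _export_gbq_with_dml.
    # b/409086472: DML avoids per-table load-job RATE_LIMIT_EXCEEDED quotas.
    def export_statement(tmp_table: str, dest_table: str, if_exists: str) -> str:
        """Copy an ephemeral query result into an existing destination table."""
        select_all = f"SELECT * FROM `{tmp_table}`"
        if if_exists == "append":
            # INSERT is DML, so it is not subject to load/copy-job table quotas.
            return f"INSERT INTO `{dest_table}` {select_all}"
        if if_exists == "replace":
            return f"CREATE OR REPLACE TABLE `{dest_table}` AS {select_all}"
        raise ValueError(f"unsupported if_exists: {if_exists!r}")

    # Table names below are hypothetical, for illustration only.
    print(export_statement("proj._anon.bqdf_tmp", "proj.ds.target", "append"))
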
- table = self.bqclient.get_table(spec.table) - table.schema = array_value.schema.to_bigquery() - self.bqclient.update_table(table, ["schema"]) - - return executor.EmptyExecuteResult( - bf_schema=array_value.schema, - execution_metadata=executor.ExecutionMetadata.from_iterator_and_job( - iterator, job - ), + bq_io.start_query_with_client( + self.bqclient, + sql, + job_config=bigquery.QueryJobConfig(), + metrics=self.metrics, + publisher=self._publisher, + query_with_job=True, ) def dry_run( @@ -343,62 +358,6 @@ def cached( ) # Helpers - def _run_execute_query( - self, - sql: str, - job_config: Optional[bq_job.QueryJobConfig] = None, - query_with_job: bool = True, - session=None, - ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: - """ - Starts BigQuery query job and waits for results. - """ - job_config = bq_job.QueryJobConfig() if job_config is None else job_config - if bigframes.options.compute.maximum_bytes_billed is not None: - job_config.maximum_bytes_billed = ( - bigframes.options.compute.maximum_bytes_billed - ) - - if self._labels: - job_config.labels.update(self._labels) - - try: - # Trick the type checker into thinking we got a literal. - if query_with_job: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=True, - publisher=self._publisher, - session=session, - ) - else: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=False, - publisher=self._publisher, - session=session, - ) - - except google.api_core.exceptions.BadRequest as e: - # Unfortunately, this error type does not have a separate error code or exception type - if "Resources exceeded during query execution" in e.message: - new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." - raise bfe.QueryComplexityError(new_message) from e - else: - raise - def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): """ Can the block be evaluated very cheaply? 
@@ -441,23 +400,37 @@ def _cache_with_cluster_cols( self, array_value: bigframes.core.ArrayValue, cluster_cols: Sequence[str] ): """Executes the query and uses the resulting table to rewrite future executions.""" + cluster_cols = [ + col + for col in cluster_cols + if bigframes.dtypes.is_clusterable(array_value.schema.get_type(col)) + ] + cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols)) + destination_spec=ex_spec.SessionTableSpec(cluster_cols=tuple(cluster_cols), ordering="order_key") ) - self.execute( + result = self.execute( array_value, execution_spec=execution_spec, ) + assert isinstance(result, executor.BQTableExecuteResult) + assert result._data.ordering is not None + self.cache.cache_results_table(array_value.node, result._data) def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.CacheSpec(cluster_cols=tuple()) + destination_spec=ex_spec.SessionTableSpec( + cluster_cols=(), ordering="offsets_col" + ) ) - self.execute( + result = self.execute( array_value, execution_spec=execution_spec, ) + assert isinstance(result, executor.BQTableExecuteResult) + assert result._data.ordering is not None + self.cache.cache_results_table(array_value.node, result._data) def _cache_with_session_awareness( self, @@ -500,8 +473,9 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool: max_complexity=QUERY_COMPLEXITY_LIMIT, cache=self.cache, # Heuristic: subtree_complexity * (copies of subtree)^2 - heuristic=lambda complexity, count: math.log(complexity) - + 2 * math.log(count), + heuristic=lambda complexity, count: ( + math.log(complexity) + 2 * math.log(count) + ), ) if selection is None: # No good subtrees to cache, just return original tree @@ -568,126 +542,11 @@ def map_local_scans(node: nodes.BigFrameNode): return original_root.bottom_up(map_local_scans) def _execute_plan_gbq( - self, - plan: nodes.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, - cache_spec: Optional[ex_spec.CacheSpec] = None, - must_create_table: bool = True, + self, plan: nodes.BigFrameNode, execution_spec: ex_spec.ExecutionSpec ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. 
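
[Editor's note] The caching heuristic above scores a candidate subtree as log(complexity) + 2 * log(count), i.e. log(complexity * count**2), so repeated subtrees are favored quadratically over merely expensive ones. A small sketch with made-up numbers:

    import math

    # score = log(complexity * count**2): repetition counts quadratically.
    def cache_score(complexity: float, count: int) -> float:
        return math.log(complexity) + 2 * math.log(count)

    # A subtree of complexity 1e4 referenced 4 times outranks a unique 1e5 one.
    assert cache_score(1e4, 4) > cache_score(1e5, 1)
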
- - og_plan = plan - og_schema = plan.schema - plan = self.prepare_plan(plan, target="bq_execution") - create_table = must_create_table - cluster_cols: Sequence[str] = [] - if cache_spec is not None: - if peek is not None: - raise ValueError("peek is not compatible with caching.") - - create_table = True - if not cache_spec.cluster_cols: - offsets_id = bigframes.core.identifiers.ColumnId( - bigframes.core.guid.generate_guid() - ) - plan = nodes.PromoteOffsetsNode(plan, offsets_id) - cluster_cols = [offsets_id.sql] - else: - cluster_cols = [ - col - for col in cache_spec.cluster_cols - if bigframes.dtypes.is_clusterable(plan.schema.get_type(col)) - ] - cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] - - compiled = compile.compiler().compile_sql( - compile.CompileRequest( - plan, - sort_rows=ordered, - peek_count=peek, - materialize_all_order_keys=(cache_spec is not None), - ) - ) - # might have more columns than og schema, for hidden ordering columns - compiled_schema = compiled.sql_schema - - destination_table: Optional[bigquery.TableReference] = None - - job_config = bigquery.QueryJobConfig() - if create_table: - destination_table = self.storage_manager.create_temp_table( - compiled_schema, cluster_cols - ) - job_config.destination = destination_table - - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - iterator, query_job = self._run_execute_query( - sql=compiled.sql, - job_config=job_config, - query_with_job=(destination_table is not None), - session=plan.session, - ) - - # we could actually cache even when caching is not explicitly requested, but being conservative for now - result_bq_data = None - if query_job and query_job.destination: - # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them - result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema)) - dst = query_job.destination - result_bq_data = bq_data.BigqueryDataSource( - table=bq_data.GbqNativeTable.from_ref_and_schema( - dst, - tuple(compiled_schema), - cluster_cols=tuple(cluster_cols), - location=iterator.location or self.storage_manager.location, - table_type="TABLE", - ), - schema=result_bf_schema, - ordering=compiled.row_order, - n_rows=iterator.total_rows, - ) - - if cache_spec is not None: - assert result_bq_data is not None - assert compiled.row_order is not None - self.cache.cache_results_table(og_plan, result_bq_data) - - execution_metadata = executor.ExecutionMetadata.from_iterator_and_job( - iterator, query_job - ) - result_mostly_cached = ( - hasattr(iterator, "_is_almost_completely_cached") - and iterator._is_almost_completely_cached() - ) - if result_bq_data is not None and not result_mostly_cached: - return executor.BQTableExecuteResult( - data=result_bq_data, - project_id=self.bqclient.project, - storage_client=self.bqstoragereadclient, - execution_metadata=execution_metadata, - selected_fields=tuple((col, col) for col in og_schema.names), - ) - else: - return executor.LocalExecuteResult( - data=iterator.to_arrow().select(og_schema.names), - bf_schema=plan.schema, - execution_metadata=execution_metadata, - ) - - -def _result_schema( - logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField] -) -> schemata.ArraySchema: - inferred_schema = bigframes.dtypes.bf_type_from_type_kind(sql_schema) - inferred_schema.update(logical_schema._mapping) - return schemata.ArraySchema( - tuple(schemata.SchemaItem(col, dtype) for col, dtype in inferred_schema.items()) - ) + 
return self._gbq_executor.execute(plan, execution_spec) def _is_schema_match( diff --git a/packages/bigframes/bigframes/session/direct_gbq_execution.py b/packages/bigframes/bigframes/session/direct_gbq_execution.py index 95cf4b204764..4281dd70fc23 100644 --- a/packages/bigframes/bigframes/session/direct_gbq_execution.py +++ b/packages/bigframes/bigframes/session/direct_gbq_execution.py @@ -13,7 +13,7 @@ # limitations under the License. from __future__ import annotations -from typing import Literal, Optional, Tuple +from typing import Callable, Literal, Mapping, Optional, Tuple import google.cloud.bigquery.job as bq_job import google.cloud.bigquery.table as bq_table @@ -22,77 +22,180 @@ import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler import bigframes.core.events +import bigframes.session.metrics import bigframes.session._io.bigquery as bq_io -from bigframes.core import compile, nodes +from bigframes.core import bq_data, compile, nodes -from bigframes.session import executor, semi_executor +from bigframes.session import executor, semi_executor, execution_spec +from bigframes.core.compile.configs import CompileRequest, CompileResult +from bigframes import exceptions as bfe +import bigframes.core.schema as schemata +import google.cloud.bigquery_storage_v1 + +import google.api_core.exceptions + + +_WRITE_DISPOSITIONS = { "fail": bigquery.WriteDisposition.WRITE_EMPTY, "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, "append": bigquery.WriteDisposition.WRITE_APPEND, } -# used only in testing right now, BigQueryCachingExecutor is the fully featured engine -# simplified, doesnt not do large >10 gb result queries, error handling, respect global config -# or record metrics. Also avoids caching, and most pre-compile rewrites, to better serve as a -# reference for validating more complex executors. class DirectGbqExecutor(semi_executor.SemiExecutor): def __init__( self, bqclient: bigquery.Client, - compiler: Literal["ibis", "sqlglot"] = "ibis", + bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient, *, - publisher: bigframes.core.events.Publisher, + compiler: Literal["ibis", "sqlglot"] | Callable[[CompileRequest], CompileResult] = "ibis", + metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + publisher: Optional[bigframes.core.events.Publisher] = None, + labels: Mapping[str, str] = {}, ): self.bqclient = bqclient - self._compile_fn = ( - ibis_compiler.compile_sql - if compiler == "ibis" - else sqlglot_compiler.compile_sql - ) + if isinstance(compiler, str): + self._compile_fn = ( + ibis_compiler.compile_sql + if compiler == "ibis" + else sqlglot_compiler.compile_sql + ) + else: + self._compile_fn = compiler + self._bqstoragereadclient = bqstoragereadclient self._publisher = publisher + self._metrics = metrics + self._labels = labels def execute( self, plan: nodes.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + spec: execution_spec.ExecutionSpec, ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. 
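
[Editor's note] For reference, a hedged sketch of wiring up the promoted DirectGbqExecutor by hand, mirroring the _gbq_executor construction in BigQueryCachingExecutor.__init__ above. It assumes already-authenticated client objects; the label value is illustrative:

    from google.cloud import bigquery
    import google.cloud.bigquery_storage_v1 as bq_storage

    import bigframes.core.events as events
    from bigframes.core import compile
    from bigframes.session import direct_gbq_execution

    executor = direct_gbq_execution.DirectGbqExecutor(
        bigquery.Client(),
        bqstoragereadclient=bq_storage.BigQueryReadClient(),
        # Accepts "ibis", "sqlglot", or any CompileRequest -> CompileResult callable.
        compiler=compile.compiler().compile_sql,
        publisher=events.Publisher(),
        labels={"bigframes-api": "example"},  # hypothetical label
    )
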
- compiled = self._compile_fn( - compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek) + og_schema = plan.schema + compile_request = CompileRequest( + plan, + sort_rows=spec.ordered, + peek_count=spec.peek, ) + compiled = self._compile_fn(compile_request) + # might have more columns than og schema, for hidden ordering columns + compiled_schema = compiled.sql_schema + + job_config = bigquery.QueryJobConfig() + dest_spec = spec.destination_spec + cluster_cols = None + can_skip_job = True + if isinstance(dest_spec, execution_spec.TableOutputSpec): + job_config.destination = dest_spec.table + job_config.write_disposition = _WRITE_DISPOSITIONS[dest_spec.if_exists] + cluster_cols = dest_spec.cluster_cols if dest_spec.cluster_cols else None + job_config.clustering_fields = cluster_cols + can_skip_job = False + elif isinstance(dest_spec, execution_spec.EphemeralTableSpec): + # Need destination table, but jobless execution might not create a destination table + can_skip_job = False + elif dest_spec is not None: + raise ValueError( + f"Direct GBQ Executor does not support destination: {dest_spec}" + ) + + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs iterator, query_job = self._run_execute_query( sql=compiled.sql, + job_config=job_config, + query_with_job=(not can_skip_job), session=plan.session, ) + result_bq_data = None + if query_job and query_job.destination: + # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them + result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema)) + dst = query_job.destination + result_bq_data = bq_data.BigqueryDataSource( + table=bq_data.GbqNativeTable.from_ref_and_schema( + dst, + tuple(compiled_schema), + cluster_cols=cluster_cols or (), + location=iterator.location or self.bqclient.location, + table_type="TABLE", + ), + schema=result_bf_schema, + ordering=compiled.row_order, + n_rows=iterator.total_rows, + ) - # just immediately downlaod everything for simplicity - return executor.LocalExecuteResult( - data=iterator.to_arrow(), - bf_schema=plan.schema, - execution_metadata=executor.ExecutionMetadata.from_iterator_and_job( - iterator, query_job - ), + execution_metadata = executor.ExecutionMetadata.from_iterator_and_job( + iterator, query_job + ) + result_mostly_cached = ( + hasattr(iterator, "_is_almost_completely_cached") + and iterator._is_almost_completely_cached() ) + if (isinstance(dest_spec, execution_spec.EphemeralTableSpec)) or ( + result_bq_data is not None and not result_mostly_cached + ): + return executor.BQTableExecuteResult( + data=result_bq_data, + project_id=self.bqclient.project, + storage_client=self._bqstoragereadclient, + execution_metadata=execution_metadata, + selected_fields=tuple((col, col) for col in og_schema.names), + ) + else: + return executor.LocalExecuteResult( + data=iterator.to_arrow().select(og_schema.names), + bf_schema=plan.schema, + execution_metadata=execution_metadata, + ) + def _run_execute_query( self, sql: str, job_config: Optional[bq_job.QueryJobConfig] = None, + query_with_job: bool = True, session=None, ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts BigQuery query job and waits for results. 
""" - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config or bq_job.QueryJobConfig(), - project=None, - location=None, - timeout=None, - metrics=None, - query_with_job=False, - publisher=self._publisher, - session=session, - ) + job_config = bq_job.QueryJobConfig() if job_config is None else job_config + if bigframes.options.compute.maximum_bytes_billed is not None: + job_config.maximum_bytes_billed = ( + bigframes.options.compute.maximum_bytes_billed + ) + + if self._labels: + job_config.labels.update(self._labels) + + try: + return bq_io.start_query_with_client( + self.bqclient, + sql, + job_config=job_config, + metrics=self._metrics, + query_with_job=query_with_job, + publisher=self._publisher, + session=session, + ) + except google.api_core.exceptions.BadRequest as e: + # Unfortunately, this error type does not have a separate error code or exception type + if "Resources exceeded during query execution" in e.message: + new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." + raise bfe.QueryComplexityError(new_message) from e + else: + raise + + +def _result_schema( + logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField] +) -> schemata.ArraySchema: + inferred_schema = bigframes.dtypes.bf_type_from_type_kind(sql_schema) + inferred_schema.update(logical_schema._mapping) + return schemata.ArraySchema( + tuple(schemata.SchemaItem(col, dtype) for col, dtype in inferred_schema.items()) + ) diff --git a/packages/bigframes/bigframes/session/execution_spec.py b/packages/bigframes/bigframes/session/execution_spec.py index c9431dbd1168..69d8d0c184fc 100644 --- a/packages/bigframes/bigframes/session/execution_spec.py +++ b/packages/bigframes/bigframes/session/execution_spec.py @@ -22,27 +22,66 @@ @dataclasses.dataclass(frozen=True) class ExecutionSpec: - destination_spec: Union[TableOutputSpec, GcsOutputSpec, CacheSpec, None] = None + # destination for the result of the operation. Executor may also incidentally create other temporary tables for its own purposes. + destination_spec: Union[ + TableOutputSpec, GcsOutputSpec, EphemeralTableSpec, SessionTableSpec, None + ] = None + # If set, the result will be truncated to the given number of rows. Which N rows is + # implementation dependent and not stable. peek: Optional[int] = None - ordered: bool = ( - False # ordered and promise_under_10gb must both be together for bq execution - ) + # Controls whether output iterator is ordered. Cannot be true if destination is not + # guaranteed to be ordered. + ordered: bool = False # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur promise_under_10gb: bool = False -# This one is temporary, in future, caching will not be done through immediate execution, but will label nodes -# that will be cached only when a super-tree is executed +# Used internally by execution @dataclasses.dataclass(frozen=True) -class CacheSpec: - cluster_cols: tuple[str, ...] +class EphemeralTableSpec: + """ + Specifies that the result of an operation should be a temporary table of some sort. + + No guarantees on lifetime, may be a session temp table, or a bq-created temp table with <24hr life. + + Used internally when results need temporary staging, because they are large (>10GB), or needed in subsequent operations. 
+ """ + + pass + + +@dataclasses.dataclass(frozen=True) +class SessionTableSpec: + """ + Specifies that the result of an operation should be a session temp table. + The table will be automatically deleted after the session ends. + """ + + cluster_cols: tuple[ + str, ... + ] = () # if empty, will cluster using order key if ordering_key is set + # Controls ordering and whether extra columns are materialized to preserve ordering + # Any extra columns will be appended to the end of the schema. + # None: ordering may be discarded entirely (ordering metadata will still be provided if ordering is derivable from materialized columns) + # order_rows: the result iterator itself will be ordered. For gbq execution, result cannot exceed 10GB. + # order_key: the result set ordered by a key, may materialize extra columns. + # offsets_col: order the result set by an offsets column, materializes one extra column. + ordering: Literal["order_rows", "offsets_col", "order_key"] | None = None @dataclasses.dataclass(frozen=True) class TableOutputSpec: + """ + Specifies that the result of an operation should be exported to a specific named table. + + The executor is not responsible for managing lifecycle of the table. + """ + table: bigquery.TableReference cluster_cols: tuple[str, ...] if_exists: Literal["fail", "replace", "append"] = "fail" + # Allow DML to be used to populate table + permit_dml: bool = True @dataclasses.dataclass(frozen=True) diff --git a/packages/bigframes/bigframes/session/local_scan_executor.py b/packages/bigframes/bigframes/session/local_scan_executor.py index fee0f557ea76..2843bf598ccb 100644 --- a/packages/bigframes/bigframes/session/local_scan_executor.py +++ b/packages/bigframes/bigframes/session/local_scan_executor.py @@ -16,7 +16,7 @@ from typing import Optional from bigframes.core import bigframe_node, rewrite -from bigframes.session import executor, semi_executor +from bigframes.session import executor, semi_executor, execution_spec class LocalScanExecutor(semi_executor.SemiExecutor): @@ -27,15 +27,17 @@ class LocalScanExecutor(semi_executor.SemiExecutor): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: + if execution_spec.destination_spec is not None: + return None + reduced_result = rewrite.try_reduce_to_local_scan(plan) if not reduced_result: return None node, limit = reduced_result - + peek = execution_spec.peek if limit is not None: if peek is None or limit < peek: peek = limit diff --git a/packages/bigframes/bigframes/session/polars_executor.py b/packages/bigframes/bigframes/session/polars_executor.py index 06c7fcb925c4..3728f488ff7e 100644 --- a/packages/bigframes/bigframes/session/polars_executor.py +++ b/packages/bigframes/bigframes/session/polars_executor.py @@ -34,7 +34,7 @@ numeric_ops, string_ops, ) -from bigframes.session import executor, semi_executor +from bigframes.session import executor, semi_executor, execution_spec if TYPE_CHECKING: import polars as pl @@ -140,20 +140,20 @@ def __init__(self): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: if not self._can_execute(plan): return None - # Note: Ignoring ordered flag, as just executing totally ordered is fine. 
+ if execution_spec.destination_spec is not None: + return None try: lazy_frame: pl.LazyFrame = self._compiler.compile( array_value.ArrayValue(plan).node ) except Exception: return None - if peek is not None: - lazy_frame = lazy_frame.limit(peek) + if execution_spec.peek is not None: + lazy_frame = lazy_frame.limit(execution_spec.peek) pa_table = lazy_frame.collect().to_arrow() return executor.LocalExecuteResult( data=pa_table, diff --git a/packages/bigframes/bigframes/session/read_api_execution.py b/packages/bigframes/bigframes/session/read_api_execution.py index 9f2d196ce8eb..76bf29dc29a1 100644 --- a/packages/bigframes/bigframes/session/read_api_execution.py +++ b/packages/bigframes/bigframes/session/read_api_execution.py @@ -18,7 +18,7 @@ from google.cloud import bigquery_storage_v1 from bigframes.core import bigframe_node, bq_data, nodes, rewrite -from bigframes.session import executor, semi_executor +from bigframes.session import executor, semi_executor, execution_spec class ReadApiSemiExecutor(semi_executor.SemiExecutor): @@ -37,14 +37,16 @@ def __init__( def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: - adapt_result = self._try_adapt_plan(plan, ordered) + if execution_spec.destination_spec is not None: + return None + + adapt_result = self._try_adapt_plan(plan, execution_spec.ordered) if not adapt_result: return None node, limit = adapt_result - if node.explicitly_ordered and ordered: + if node.explicitly_ordered and execution_spec.ordered: return None if not isinstance(node.source.table, bq_data.GbqNativeTable): @@ -53,6 +55,7 @@ def execute( if not node.source.table.is_physically_stored: return None + peek = execution_spec.peek if limit is not None: if peek is None or limit < peek: peek = limit diff --git a/packages/bigframes/bigframes/session/semi_executor.py b/packages/bigframes/bigframes/session/semi_executor.py index c41d7c96d3e9..6f110834fe91 100644 --- a/packages/bigframes/bigframes/session/semi_executor.py +++ b/packages/bigframes/bigframes/session/semi_executor.py @@ -15,7 +15,7 @@ from typing import Optional from bigframes.core import bigframe_node -from bigframes.session import executor +from bigframes.session import executor, execution_spec # Unstable interface, in development @@ -27,7 +27,6 @@ class SemiExecutor(abc.ABC): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: raise NotImplementedError("execute not implemented for this executor") diff --git a/packages/bigframes/bigframes/testing/engine_utils.py b/packages/bigframes/bigframes/testing/engine_utils.py index edb68c3a9b0a..456d6b463840 100644 --- a/packages/bigframes/bigframes/testing/engine_utils.py +++ b/packages/bigframes/bigframes/testing/engine_utils.py @@ -15,7 +15,12 @@ import pandas.testing from bigframes.core import nodes -from bigframes.session import semi_executor +from bigframes.session import semi_executor, execution_spec + + +SPEC = execution_spec.ExecutionSpec( + ordered=True, +) def assert_equivalence_execution( @@ -23,8 +28,8 @@ def assert_equivalence_execution( engine1: semi_executor.SemiExecutor, engine2: semi_executor.SemiExecutor, ): - e1_result = engine1.execute(node, ordered=True) - e2_result = engine2.execute(node, ordered=True) + e1_result = engine1.execute(node, SPEC) + e2_result = engine2.execute(node, SPEC) assert 
e1_result is not None assert e2_result is not None # Convert to pandas, as pandas has better comparison utils than arrow diff --git a/packages/bigframes/tests/system/conftest.py b/packages/bigframes/tests/system/conftest.py index 361d9387bc77..f7565d32d29f 100644 --- a/packages/bigframes/tests/system/conftest.py +++ b/packages/bigframes/tests/system/conftest.py @@ -27,6 +27,7 @@ import google.cloud.bigquery_connection_v1 as bigquery_connection_v1 import google.cloud.exceptions import google.cloud.functions_v2 as functions_v2 +import google.cloud.bigquery_storage_v1 import google.cloud.resourcemanager_v3 as resourcemanager_v3 import google.cloud.storage as storage # type: ignore import numpy as np @@ -103,6 +104,13 @@ def bigquery_client(session: bigframes.Session) -> bigquery.Client: return session.bqclient +@pytest.fixture(scope="session") +def bigquery_storage_read_client( + session: bigframes.Session, +) -> google.cloud.bigquery_storage_v1.BigQueryReadClient: + return session.bqstoragereadclient + + @pytest.fixture(scope="session") def bigquery_client_tokyo(session_tokyo: bigframes.Session) -> bigquery.Client: return session_tokyo.bqclient diff --git a/packages/bigframes/tests/system/small/engines/conftest.py b/packages/bigframes/tests/system/small/engines/conftest.py index cea505dd28a6..10c238fbb4fa 100644 --- a/packages/bigframes/tests/system/small/engines/conftest.py +++ b/packages/bigframes/tests/system/small/engines/conftest.py @@ -19,6 +19,7 @@ from google.cloud import bigquery import bigframes +import google.cloud.bigquery_storage_v1 from bigframes.core import ArrayValue, events, local_data from bigframes.session import ( direct_gbq_execution, @@ -43,22 +44,37 @@ def fake_session() -> Generator[bigframes.Session, None, None]: with bigframes.core.global_session._GlobalSessionContext(session): yield session +@pytest.fixture(scope="session") +def sqlglot_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient): + return direct_gbq_execution.DirectGbqExecutor( + bigquery_client, + bqstoragereadclient=bigquery_storage_read_client, + compiler="sqlglot", + publisher=events.Publisher(), + ) + +@pytest.fixture(scope="session") +def bq_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient): + return direct_gbq_execution.DirectGbqExecutor( + bigquery_client, + bqstoragereadclient=bigquery_storage_read_client, + publisher=events.Publisher(), + ) @pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"]) -def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor: +def engine( + request, + sqlglot_engine, + bq_engine, +) -> semi_executor.SemiExecutor: if request.param == "pyarrow": return local_scan_executor.LocalScanExecutor() if request.param == "polars": return polars_executor.PolarsExecutor() - publisher = events.Publisher() if request.param == "bq": - return direct_gbq_execution.DirectGbqExecutor( - bigquery_client, publisher=publisher - ) + return bq_engine if request.param == "bq-sqlglot": - return direct_gbq_execution.DirectGbqExecutor( - bigquery_client, compiler="sqlglot", publisher=publisher - ) + return sqlglot_engine raise ValueError(f"Unrecognized param: {request.param}") diff --git a/packages/bigframes/tests/system/small/engines/test_aggregation.py b/packages/bigframes/tests/system/small/engines/test_aggregation.py index b95781d21b14..e6e4ac571578 100644 --- 
a/packages/bigframes/tests/system/small/engines/test_aggregation.py +++ b/packages/bigframes/tests/system/small/engines/test_aggregation.py @@ -128,19 +128,14 @@ def test_engines_unary_variance_aggregates( def test_sql_engines_median_op_aggregates( scalars_array_value: array_value.ArrayValue, bigquery_client: bigquery.Client, + bq_engine, + sqlglot_engine, ): node = apply_agg_to_all_valid( scalars_array_value, agg_ops.MedianOp(), ).node - publisher = events.Publisher() - left_engine = direct_gbq_execution.DirectGbqExecutor( - bigquery_client, publisher=publisher - ) - right_engine = direct_gbq_execution.DirectGbqExecutor( - bigquery_client, compiler="sqlglot", publisher=publisher - ) - assert_equivalence_execution(node, left_engine, right_engine) + assert_equivalence_execution(node, bq_engine, sqlglot_engine) @pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True) diff --git a/packages/bigframes/tests/system/small/engines/test_sorting.py b/packages/bigframes/tests/system/small/engines/test_sorting.py index 920282bc9319..bcab19f324dd 100644 --- a/packages/bigframes/tests/system/small/engines/test_sorting.py +++ b/packages/bigframes/tests/system/small/engines/test_sorting.py @@ -16,7 +16,7 @@ import bigframes.operations as bf_ops from bigframes.core import array_value, nodes, ordering -from bigframes.session import polars_executor +from bigframes.session import polars_executor, execution_spec from bigframes.testing.engine_utils import assert_equivalence_execution pytest.importorskip("polars") @@ -96,7 +96,7 @@ def test_polars_engines_skips_unrecognized_order_expr( ), ) node = nodes.OrderByNode(node, ORDER_EXPRESSIONS) - assert engine.execute(node, ordered=True) is None + assert engine.execute(node, execution_spec.ExecutionSpec(ordered=True)) is None def apply_reverse(node: nodes.BigFrameNode) -> nodes.BigFrameNode: diff --git a/packages/bigframes/tests/system/small/engines/test_windowing.py b/packages/bigframes/tests/system/small/engines/test_windowing.py index c748947d997b..fcc1dad928f8 100644 --- a/packages/bigframes/tests/system/small/engines/test_windowing.py +++ b/packages/bigframes/tests/system/small/engines/test_windowing.py @@ -46,8 +46,9 @@ def test_engines_with_offsets( @pytest.mark.parametrize("agg_op", [agg_ops.sum_op, agg_ops.count_op]) def test_engines_with_rows_window( scalars_array_value: array_value.ArrayValue, - bigquery_client: bigquery.Client, agg_op, + bq_engine, + sqlglot_engine, ): window = window_spec.WindowSpec( bounds=window_spec.RowsWindowBounds.from_window_size(3, "left"), @@ -62,12 +63,4 @@ def test_engines_with_rows_window( ), window_spec=window, ) - - publisher = events.Publisher() - bq_executor = direct_gbq_execution.DirectGbqExecutor( - bigquery_client, publisher=publisher - ) - bq_sqlgot_executor = direct_gbq_execution.DirectGbqExecutor( - bigquery_client, compiler="sqlglot", publisher=publisher - ) - assert_equivalence_execution(window_node, bq_executor, bq_sqlgot_executor) + assert_equivalence_execution(window_node, bq_engine, sqlglot_engine) diff --git a/packages/bigframes/tests/system/small/test_session.py b/packages/bigframes/tests/system/small/test_session.py index e4101d4e941b..7f620daf887e 100644 --- a/packages/bigframes/tests/system/small/test_session.py +++ b/packages/bigframes/tests/system/small/test_session.py @@ -118,7 +118,7 @@ def test_read_gbq_tokyo( exec_result = session_tokyo._executor.execute( df._block.expr, bigframes.session.execution_spec.ExecutionSpec( - bigframes.session.execution_spec.CacheSpec(()), 
promise_under_10gb=False + bigframes.session.execution_spec.EphemeralTableSpec(), promise_under_10gb=False ), ) assert exec_result.query_job is not None @@ -951,7 +951,7 @@ def test_read_pandas_tokyo( result = session_tokyo._executor.execute( df._block.expr, bigframes.session.execution_spec.ExecutionSpec( - bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False + bigframes.session.execution_spec.EphemeralTableSpec(), promise_under_10gb=False ), ) assert result.query_job is not None diff --git a/packages/bigframes/tests/unit/session/test_local_scan_executor.py b/packages/bigframes/tests/unit/session/test_local_scan_executor.py index fc59253153b6..a76cdcd49a67 100644 --- a/packages/bigframes/tests/unit/session/test_local_scan_executor.py +++ b/packages/bigframes/tests/unit/session/test_local_scan_executor.py @@ -17,9 +17,13 @@ import pytest from bigframes.core import identifiers, local_data, nodes -from bigframes.session import local_scan_executor +from bigframes.session import local_scan_executor, execution_spec from bigframes.testing import mocks +SPEC = execution_spec.ExecutionSpec( + ordered=True, +) + @pytest.fixture def object_under_test(): @@ -72,7 +76,7 @@ def test_local_scan_executor_with_slice(start, stop, expected_rows, object_under stop=stop, ) - result = object_under_test.execute(plan, ordered=True) + result = object_under_test.execute(plan, SPEC) result_table = pyarrow.Table.from_batches(result.batches().arrow_batches) assert result_table.num_rows == expected_rows @@ -98,4 +102,4 @@ def test_local_scan_executor_with_slice_unsupported_inputs( stop=stop, step=step, ) - assert object_under_test.execute(plan, ordered=True) is None + assert object_under_test.execute(plan, SPEC) is None
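
[Editor's note] Taken together, _execute_bigquery reduces specialized destinations step by step with dataclasses.replace until a plain table execution remains. A self-contained, simplified model of that recursion (types and flow are condensed from the real method, not the actual implementation; the real code routes possibly-large results through SessionTableSpec):

    import dataclasses
    from typing import Union


    @dataclasses.dataclass(frozen=True)
    class EphemeralTableSpec:
        pass


    @dataclasses.dataclass(frozen=True)
    class GcsOutputSpec:
        uri: str


    @dataclasses.dataclass(frozen=True)
    class Spec:
        destination_spec: Union[EphemeralTableSpec, GcsOutputSpec, None] = None
        promise_under_10gb: bool = False


    def execute(spec: Spec) -> str:
        dest = spec.destination_spec
        if isinstance(dest, GcsOutputSpec):
            # Stage to an ephemeral table first, then export the staged result.
            staged = execute(dataclasses.replace(spec, destination_spec=EphemeralTableSpec()))
            return f"{staged}, exported to {dest.uri}"
        if dest is None and not spec.promise_under_10gb:
            # Possibly-large results are forced through a table destination.
            return execute(dataclasses.replace(spec, destination_spec=EphemeralTableSpec()))
        return "table result"


    print(execute(Spec(destination_spec=GcsOutputSpec("gs://bucket/out/*.parquet"))))
    # -> "table result, exported to gs://bucket/out/*.parquet"
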