test: add fuzz fallback + canonicalization suites for #3949 [experimental]#3977
andygrove wants to merge 2 commits into apache:main
Adds a diagnostic harness that randomly vetoes Comet shuffle and operator conversions so the rule pipeline produces irregular Spark/Comet boundaries, plus tests that probe canonicalization of Comet plans. None of the suites reproduce apache#3949 yet — the bug requires post-stage-materialization plan shapes that don't appear in the initial physical plan — but the infrastructure is useful as a general regression guard and documents the hypotheses we've ruled out.

Changes:

- `FuzzFallback`: seeded, node-identity-keyed decision cache.
- Inject points: `CometShuffleExchangeExec.{nativeShuffleSupported, columnarShuffleSupported}` and the generic operator branch of `CometExecRule.convertNode`. Four new testing-category configs, all default-off.
- `CometFuzzFallbackSuite`: 129 TPC-DS queries × N seeds, plan-only (no data required).
- `CometFuzzDppSuite`: small partitioned fact/dim data; actually executes DPP-flavored queries across SMJ/BHJ/coalesce variants with AQE on.
- `CometCanonicalizationSuite` / `CometCanonicalizationTpcdsSuite`: walk each Comet plan node and assert `p.canonicalized.supportsColumnar == true`, and that `ColumnarToRowExec(p).canonicalized` does not throw (mirrors the apache#3949 stack trace).
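The seeded, node-identity-keyed decision cache can be sketched as a minimal self-contained model (the class and method names here mirror the description above but are assumptions, not the actual Comet source):

```scala
import java.util.IdentityHashMap
import scala.util.Random

// Sketch of the FuzzFallback idea: veto decisions are drawn from a seeded RNG
// and cached by node *identity*, so asking twice about the same plan node
// always returns the same answer. This keeps paired call sites (e.g. a
// "supported?" probe and the later "create exec" step) mutually consistent.
final class FuzzFallbackSketch(seed: Long, vetoProbability: Double) {
  private val rng = new Random(seed)
  private val decisions = new IdentityHashMap[AnyRef, java.lang.Boolean]()

  /** True if `node` should be forced to fall back; stable per node identity. */
  def shouldVeto(node: AnyRef): Boolean = {
    val cached = decisions.get(node)
    if (cached != null) cached.booleanValue()
    else {
      val decision = rng.nextDouble() < vetoProbability
      decisions.put(node, decision)
      decision
    }
  }
}
```

Because the RNG is seeded, a failing seed can be replayed exactly, which is what makes the fuzzed plans reproducible across runs.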
…stage-prep

At stage-prep, `stageContainsDPPScan` walks `s.child.exists` looking for a `FileSourceScanExec` with a `PlanExpression` partition filter. In AQE, once the inner stage materializes, its subtree is replaced by a `ShuffleQueryStageExec` whose `children` is `Seq.empty` — `.exists` no longer descends into it and the DPP scan becomes invisible. The same shuffle therefore falls back at initial planning (`columnarShuffleSupported = false`) but is converted to Comet at stage-prep (`columnarShuffleSupported = true`). That plan-shape flip is the suspected trigger for apache#3949. The test builds a DPP-shaped query, observes the initial decision, then swaps the shuffle's child for an opaque `LeafExecNode` stub that mimics how `ShuffleQueryStageExec` presents to tree walks, and observes the decision flip. A fix requires `stageContainsDPPScan` to descend into `QueryStageExec.plan`.
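The tree-walk bug and the suggested fix can be modeled without Spark. All types below are stand-ins for `SparkPlan` / `ShuffleQueryStageExec` / the DPP scan, not the real classes; the point is only that a children-based walk stops at a stage boundary while a walk that descends into the stage's `plan` does not:

```scala
// Minimal model of the plan tree. `Stage` mimics ShuffleQueryStageExec:
// it is a leaf to ordinary tree walks (children = Nil) but keeps the real
// materialized subtree behind a `plan` field.
sealed trait Plan { def children: Seq[Plan] }
final case class Inner(children: Plan*) extends Plan
case object DppScan extends Plan { def children: Seq[Plan] = Nil }
final case class Stage(plan: Plan) extends Plan { def children: Seq[Plan] = Nil }

// Buggy walk (shape of the current stageContainsDPPScan): stops at the
// stage boundary, so the DPP scan vanishes after stage materialization.
def buggyContainsDpp(p: Plan): Boolean =
  p == DppScan || p.children.exists(buggyContainsDpp)

// Fixed walk (the PR's suggestion): descend into the stage's `plan`.
def fixedContainsDpp(p: Plan): Boolean = p match {
  case DppScan      => true
  case Stage(inner) => fixedContainsDpp(inner)
  case other        => other.children.exists(fixedContainsDpp)
}
```

Before materialization both walks see the scan (`Inner(DppScan)`); after materialization (`Stage(DppScan)`) only the fixed walk does — which is exactly the initial-plan-vs-stage-prep decision flip described above.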
Update: @andygrove's initial-plan-vs-AQE inconsistency theory is confirmed. A new commit adds a test demonstrating the mechanism: it swaps the shuffle's child for an opaque `LeafExecNode` stub that presents to tree walks the way `ShuffleQueryStageExec` does, and observes the conversion decision flip. Suggested fix: make `stageContainsDPPScan` descend into `QueryStageExec.plan`. Once fixed, the test's decision-flip expectation will need to be inverted into a regression check.
Honest update: the decision flip is real, but it is not sufficient to reproduce the full #3949 crash. I tried many DPP-shaped queries that match the general pattern (inner DPP join + outer broadcast join + aggregate/topK/intersect/IN-subquery, across BHJ/SMJ/coalesce/localRead variants). Every query executed cleanly — the assertion never fired. Where that leaves us: the flip is a confirmed inconsistency, but the crash apparently needs a post-stage-materialization plan shape these queries do not produce.
Which issue does this PR close?
Related to #3949.
Rationale for this change
Issue #3949 reports an intermittent `AssertionError` from `ColumnarToRowExec.<init>` in the awslabs TPC-DS benchmark (q14a, q14b, q31, q47, q57 on Spark 4.0). Prior repro attempts using small DPP queries failed, and 1TB local runs did not reproduce either. This PR adds infrastructure to stress the Spark/Comet boundary and probe the canonicalization invariants the stack trace points at, so future regressions around mixed-mode plan shapes have automated coverage.

The full stack trace shared on the issue shows the assertion fires from `QueryPlan.doCanonicalize` → `withNewChildren(canonicalizedChildren)` → `ColumnarToRowExec.copy(...)` inside `AdaptiveSparkPlanExec.createQueryStages`. That means `cometPlan.canonicalized.supportsColumnar` is `false` for some Comet plan produced during post-stage-materialization re-planning. None of the suites in this PR reproduce the bug yet — the bad canonical form does not appear in the initial physical plan — but the tools are useful regardless.

What changes are included in this PR?
- `FuzzFallback` utility (`spark/src/main/scala/org/apache/comet/FuzzFallback.scala`) — seeded, node-identity-keyed decision cache. Calling `shouldVetoShuffle(s)` or `shouldVetoExec(op)` twice for the same node returns the same answer, so `getSupportLevel` and `createExec` stay consistent.
- Inject points: `CometShuffleExchangeExec.nativeShuffleSupported` and `columnarShuffleSupported`, plus the generic operator branch of `CometExecRule.convertNode`.
- Four new testing-category configs, all default-off:
  - `spark.comet.fuzz.fallback.enabled`
  - `spark.comet.fuzz.fallback.seed`
  - `spark.comet.fuzz.fallback.shuffleVetoProbability` (default 0.5)
  - `spark.comet.fuzz.fallback.execVetoProbability` (default 0.0)
- `CometFuzzFallbackSuite` — all 129 TPC-DS queries (v1.4 + v2.7) × N seeds, plan-only (no data required; uses `TPCDSBase` schemas).
- `CometFuzzDppSuite` — small partitioned fact/dim data; actually executes DPP-flavored queries across SMJ/BHJ/coalesce variants with AQE on.
- `CometCanonicalizationSuite` — targeted node-level canonicalization checks for scan/filter/project/aggregate/BHJ/DPP shapes, both directly (`p.canonicalized.supportsColumnar`) and wrapped (`ColumnarToRowExec(p).canonicalized` — mirrors the #3949 stack exactly).
- `CometCanonicalizationTpcdsSuite` — same checks across every TPC-DS query plan.

How are these changes tested?
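The invariant both canonicalization suites assert has roughly the following shape (a sketch reconstructed from the description above; the `nodeName` filter and helper name are assumptions, and this fragment requires a Spark runtime, so it is illustrative rather than standalone):

```scala
// Sketch: for every Comet node in a plan, check the two invariants the
// #3949 stack trace depends on. ColumnarToRowExec asserts its child's
// supportsColumnar, and canonicalization re-invokes copy(...) on it, so a
// canonical form that loses supportsColumnar reproduces the crash path.
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}

def checkCanonicalization(plan: SparkPlan): Unit = plan.foreach {
  case p: SparkPlan if p.nodeName.startsWith("Comet") && p.supportsColumnar =>
    // Direct invariant: the canonical form must still report columnar support.
    assert(p.canonicalized.supportsColumnar, s"${p.nodeName} lost supportsColumnar")
    // Wrapped invariant: canonicalizing through ColumnarToRowExec must not
    // throw — this mirrors the #3949 stack trace exactly.
    ColumnarToRowExec(p).canonicalized
  case _ => // non-Comet or row-based nodes are out of scope
}
```

Running this over every TPC-DS plan is what `CometCanonicalizationTpcdsSuite` does; the targeted suite applies the same check to hand-built scan/filter/project/aggregate/BHJ/DPP shapes.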
All four suites run green locally (Spark 4.0):
- `CometFuzzFallbackSuite`: 129 tests × 2 seeds pass in ~20s.
- `CometFuzzDppSuite`: 900 iterations (3 variants × 3 queries × 100 seeds) pass in ~2 min.
- `CometCanonicalizationSuite`: 6 tests, ~10s.
- `CometCanonicalizationTpcdsSuite`: 129 TPC-DS queries, ~8s.

All new configs default to off; no existing behavior changes when the fuzz harness is disabled.
Draft because the primary goal — reproducing #3949 — is not met by any of these suites. Leaving it in draft until either (a) one of the suites catches the bug after further tuning, (b) a real failing plan arrives from awslabs so we can tighten the canonicalization tests, or (c) we agree to merge the infrastructure as-is as a regression guard even though it doesn't demonstrate the repro today.