Skip to content

test: add fuzz fallback + canonicalization suites for #3949 [experimental]#3977

Closed
andygrove wants to merge 2 commits intoapache:mainfrom
andygrove:fuzz-fallback-3949
Closed

test: add fuzz fallback + canonicalization suites for #3949 [experimental]#3977
andygrove wants to merge 2 commits intoapache:mainfrom
andygrove:fuzz-fallback-3949

Conversation

@andygrove
Copy link
Copy Markdown
Member

Which issue does this PR close?

Related to #3949.

Rationale for this change

Issue #3949 reports an intermittent AssertionError from ColumnarToRowExec.<init> in the awslabs TPC-DS benchmark (q14a, q14b, q31, q47, q57 on Spark 4.0). Prior repro attempts using small DPP queries failed, and 1TB local runs did not reproduce either. This PR adds infrastructure to stress the Spark/Comet boundary and probe the canonicalization invariants the stack trace points at, so future regressions around mixed-mode plan shapes have automated coverage.

The full stack trace shared on the issue shows the assertion fires from QueryPlan.doCanonicalizewithNewChildren(canonicalizedChildren)ColumnarToRowExec.copy(...) inside AdaptiveSparkPlanExec.createQueryStages. That means cometPlan.canonicalized.supportsColumnar is false for some Comet plan produced during post-stage-materialization re-planning. None of the suites in this PR reproduce the bug yet — the bad canonical form does not appear in the initial physical plan — but the tools are useful regardless.

What changes are included in this PR?

  • FuzzFallback utility (spark/src/main/scala/org/apache/comet/FuzzFallback.scala) — seeded, node-identity-keyed decision cache. Calling shouldVetoShuffle(s) or shouldVetoExec(op) twice for the same node returns the same answer, so getSupportLevel and createExec stay consistent.
  • Injection points:
    • CometShuffleExchangeExec.nativeShuffleSupported and columnarShuffleSupported
    • The generic operator branch of CometExecRule.convertNode
  • Four new testing-category configs, all default-off:
    • spark.comet.fuzz.fallback.enabled
    • spark.comet.fuzz.fallback.seed
    • spark.comet.fuzz.fallback.shuffleVetoProbability (default 0.5)
    • spark.comet.fuzz.fallback.execVetoProbability (default 0.0)
  • CometFuzzFallbackSuite — all 129 TPC-DS queries (v1.4 + v2.7) × N seeds, plan-only (no data required; uses TPCDSBase schemas).
  • CometFuzzDppSuite — small partitioned fact/dim data, actually executes DPP-flavored queries across SMJ/BHJ/coalesce variants with AQE on.
  • CometCanonicalizationSuite — targeted node-level canonicalization checks for scan/filter/project/aggregate/BHJ/DPP shapes, both directly (p.canonicalized.supportsColumnar) and wrapped (ColumnarToRowExec(p).canonicalized — mirrors the [INTERNAL_ERROR] The "collect" action failed. #3949 stack exactly).
  • CometCanonicalizationTpcdsSuite — same checks across every TPC-DS query plan.

How are these changes tested?

All four suites run green locally (Spark 4.0):

  • CometFuzzFallbackSuite: 129 tests × 2 seeds pass in ~20s.
  • CometFuzzDppSuite: 900 iterations (3 variants × 3 queries × 100 seeds) pass in ~2 min.
  • CometCanonicalizationSuite: 6 tests, ~10s.
  • CometCanonicalizationTpcdsSuite: 129 TPC-DS queries, ~8s.

All new configs default to off; no existing behavior changes when the fuzz harness is disabled.

Draft because the primary goal — reproducing #3949 — is not met by any of these suites. Leaving it in draft until either (a) one of the suites catches the bug after further tuning, (b) a real failing plan arrives from awslabs so we can tighten the canonicalization tests, or (c) we agree to merge the infrastructure as-is as a regression guard even though it doesn't demonstrate the repro today.

Adds a diagnostic harness that randomly vetoes Comet shuffle and operator
conversions so the rule pipeline produces irregular Spark/Comet boundaries,
plus tests that probe canonicalization of Comet plans. None of the suites
reproduce apache#3949 yet — the bug requires post-stage-materialization plan
shapes that don't appear in the initial physical plan — but the
infrastructure is useful as a general regression guard and documents the
hypotheses we've ruled out.

Changes:
- FuzzFallback: seeded, node-identity-keyed decision cache.
- Inject points: CometShuffleExchangeExec.{nativeShuffleSupported,
  columnarShuffleSupported} and the generic operator branch of
  CometExecRule.convertNode. Four new testing-category configs, all
  default-off.
- CometFuzzFallbackSuite: 129 TPC-DS queries x N seeds, plan-only
  (no data required).
- CometFuzzDppSuite: small partitioned fact/dim data, actually executes
  DPP-flavored queries across SMJ/BHJ/coalesce variants with AQE on.
- CometCanonicalizationSuite / CometCanonicalizationTpcdsSuite: walk
  each Comet plan node and assert p.canonicalized.supportsColumnar ==
  true, and that ColumnarToRowExec(p).canonicalized does not throw
  (mirrors the apache#3949 stack trace).
…stage-prep

stageContainsDPPScan walks s.child.exists for a FileSourceScanExec with a
PlanExpression partition filter. In AQE, once the inner stage materializes,
its subtree is replaced by a ShuffleQueryStageExec whose children is
Seq.empty — .exists no longer descends into it and the DPP scan becomes
invisible. The same shuffle therefore falls back at initial planning
(columnarShuffleSupported=false) but is converted to Comet at stage-prep
(columnarShuffleSupported=true). That plan-shape flip is the suspected
trigger for apache#3949.

The test builds a DPP-shaped query, observes the initial decision, then
swaps the shuffle's child for an opaque LeafExecNode stub that mimics how
ShuffleQueryStageExec presents to tree walks, and observes the decision
flip. A fix requires stageContainsDPPScan to descend into
QueryStageExec.plan.
@andygrove
Copy link
Copy Markdown
Member Author

Update: @andygrove's initial-plan-vs-AQE inconsistency theory is confirmed.

New commit adds CometDppFallbackConsistencySuite, which shows that columnarShuffleSupported (via stageContainsDPPScan) gives two different answers for the same shuffle:

initialDppVisible=true,  initialDecision=false   (correctly falls back — DPP scan visible)
postAqeDppVisible=false, postAqeDecision=true    (converts to Comet — DPP scan hidden)

Mechanism

stageContainsDPPScan in CometShuffleExchangeExec walks s.child.exists(...) looking for a FileSourceScanExec with a PlanExpression partition filter. In AQE, once the inner stage materializes, its subtree is replaced by a ShuffleQueryStageExec, whose children is Seq.empty. .exists no longer descends through it, so the DPP scan becomes invisible and the #3879 fallback stops firing. The same shuffle flips from Spark to Comet on the second planning pass.

The test demonstrates this by swapping the shuffle's child for an opaque LeafExecNode stub that matches the tree-walking behavior of a materialized stage.

Suggested fix

stageContainsDPPScan needs to descend into QueryStageExec.plan when walking for DPP filters. The same care is probably needed anywhere else we use .exists on a shuffle's child to reason about what's underneath.

Once fixed, the test's TODO(#3949) assertions flip to the symmetric form (initialDecision == postAqeDecision).

@andygrove
Copy link
Copy Markdown
Member Author

Honest update: the decision flip is real, but it is not sufficient to reproduce the full #3949 crash.

I tried many DPP-shaped queries that match the general pattern (inner DPP join + outer broadcast join + aggregate/topK/intersect/IN-subquery, across BHJ/SMJ/coalesce/localRead variants). Every query executed cleanly — no ColumnarToRowExec canonicalization assertion. So the decision flip demonstrated in CometDppFallbackConsistencySuite is a real inconsistency, but something else — larger scale, specific stats, or a plan shape unique to q14a/q14b/q31/q47/q57 — is needed to actually crash.

Where that leaves us:

  1. stageContainsDPPScan descending into QueryStageExec.plan is likely still worth doing as a correctness fix, but I can no longer claim it will close [INTERNAL_ERROR] The "collect" action failed. #3949 until we have a real repro.
  2. The fuzz + canonicalization infrastructure in this PR is still useful as regression coverage going forward.
  3. To make progress, we likely need the actual plan from a failing awslabs run, or a diagnostic build that logs the plan at the moment AdaptiveSparkPlanExec.createQueryStages crashes.

@andygrove andygrove changed the title test: add fuzz fallback + canonicalization suites for #3949 test: add fuzz fallback + canonicalization suites for #3949 [experimental] Apr 17, 2026
@andygrove andygrove closed this Apr 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant