feat: add adaptive shard routing selector #313
Conversation
Warning: Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 3 minutes and 36 seconds.
⌛ How to resolve this issue? After the wait time has elapsed, a review can be re-triggered. We recommend that you space out your commits to avoid hitting the rate limit.
🚦 How do rate limits work? CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information.
ℹ️ Review info — ⚙️ Run configuration: Path: .coderabbit.yaml · Review profile: ASSERTIVE · Plan: Pro · Run ID:
📒 Files selected for processing (4)
📝 Walkthrough
Adds an adaptive shard-routing selector and runtime planning to the parallel executor, centralizes partition dispatch via workload profiles, introduces resolve/execute adaptive entrypoints, updates benches/docs/xtask to record worker hints and resolved plans, and bakes adaptive benchmark results.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Bench as Benchmark Runner
    participant Analyzer as Workload Analyzer
    participant Selector as AdaptiveShardRoutingSelector
    participant Executor as Parallel Executor
    participant Collector as Result Collector
    Bench->>Analyzer: partition items, derive workload profile
    Analyzer-->>Bench: ParallelExecutionWorkloadProfile
    Bench->>Selector: resolve_adaptive_shard_routing(items, worker_hint)
    Selector-->>Bench: (selected_policy, selected_workers)
    Bench->>Executor: execute_parallel_with_adaptive_routing(view, items, worker_hint)
    Executor->>Executor: partition shards -> execute_partitioned_shards
    Executor->>Executor: dispatch per selected_policy and selected_workers
    Executor-->>Collector: Vec<TickDelta> + execution metadata (selected_policy, selected_workers)
    Collector-->>Bench: benchmark result row (includes worker_hint + selected plan)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 checks passed
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
@coderabbitai review please 🥕

🧠 Learnings used
✅ Actions performed: Full review triggered.

@coderabbitai review please 🥕

🧠 Learnings used
✅ Actions performed: Full review triggered.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
xtask/src/main.rs (1)
1213-1239: ⚠️ Potential issue | 🟠 Major — Conflicting adaptive rows still collapse nondeterministically.
This de-dupe only prefers `selected_policy = Some(_)` over `None`. If the selector changes and two `adaptive_shard_routing__hint_*__selected_*` directories exist for the same `(policy, workers, load)`, both rows have selected metadata and the first one seen wins. Since `std::fs::read_dir` does not guarantee iteration order (it's platform and filesystem dependent), `bench policy-export`/`bench check-artifacts` produce different outputs based on directory traversal order and can preserve stale selected-plan metadata. Either reject conflicting `Some`/`Some` collisions, or sort the input before processing to guarantee consistent results.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xtask/src/main.rs` around lines 1213 - 1239, The collapse is nondeterministic when both existing and candidate have selected_policy=Some(...); update prefer_policy_matrix_row to handle Some/Some deterministically instead of only preferring Some over None: detect the Some/Some case and break ties deterministically (for example, compare candidate.selected_policy.as_ref().cmp(existing.selected_policy.as_ref()) and return true/false accordingly) so the entry.insert behavior is stable regardless of filesystem read_dir order; ensure you reference the selected_policy field and the prefer_policy_matrix_row function used by the by_case insertion logic.
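For reference, a standalone sketch of the deterministic tie-break this comment asks for. `PolicyMatrixRow` here is a minimal hypothetical stand-in carrying only the `selected_policy` field, not the real struct in `xtask/src/main.rs`; the point is that a `Some`/`Some` collision is resolved by comparing payloads rather than by directory traversal order:

```rust
// Simplified stand-in for the real PolicyMatrixRow (hypothetical; only the
// field the tie-break needs).
#[derive(Debug, Clone, PartialEq)]
struct PolicyMatrixRow {
    selected_policy: Option<String>,
}

// Deterministic preference: Some beats None, and a Some/Some collision is
// broken lexicographically, so the winner no longer depends on the order
// read_dir happened to yield the directories.
fn prefer_policy_matrix_row(existing: &PolicyMatrixRow, candidate: &PolicyMatrixRow) -> bool {
    match (&existing.selected_policy, &candidate.selected_policy) {
        (None, Some(_)) => true,
        (Some(_), None) => false,
        (None, None) => false,
        // Replace only if the candidate sorts strictly before the existing row.
        (Some(e), Some(c)) => c < e,
    }
}

fn main() {
    let a = PolicyMatrixRow { selected_policy: Some("dynamic_per_worker".into()) };
    let b = PolicyMatrixRow { selected_policy: Some("static_per_shard".into()) };
    // Whichever order the two rows are seen in, the same row wins.
    let winner_ab = if prefer_policy_matrix_row(&a, &b) { b.clone() } else { a.clone() };
    let winner_ba = if prefer_policy_matrix_row(&b, &a) { a.clone() } else { b.clone() };
    assert_eq!(winner_ab, winner_ba);
    println!("winner: {:?}", winner_ab.selected_policy);
}
```

The alternative mentioned in the comment — sorting all rows before insertion — achieves the same stability at the cost of collecting everything up front.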
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/warp-benches/benches/parallel_baseline.rs`:
- Around line 346-359: The benchmark reimplements profiling in
workload_profile() which duplicates
ParallelExecutionWorkloadProfile::from_shards() and can drift; replace calls to
workload_profile() and the separate adaptive_plan_for_items() with a single
shared API that derives the profile from the actual shard partitioning via
ParallelExecutionWorkloadProfile::from_shards(...) and then call
AdaptiveShardRoutingSelector.select_plan(...) on that profile, and build any
adaptive label from the returned ParallelExecutionPlan so the benchmark and
runtime use the exact same selector inputs and plan.
In `@crates/warp-core/src/parallel/exec.rs`:
- Around line 1331-1361: The test adaptive_selector_matches_serial_oracle
currently only uses make_store_and_items(64) which never triggers the
medium/large adaptive branches; update the test
adaptive_selector_matches_serial_oracle to run the same assertions over multiple
workload sizes (e.g., keep 64 and add larger sizes such as 1024 and a larger one
that exceeds the small-workload cutoff) so that
execute_parallel_with_selector(view, &items, worker_hint(workers), selector)
exercises DYNAMIC_PER_WORKER and DYNAMIC_PER_SHARD paths for
AdaptiveShardRoutingSelector; for each workload size recreate (store, items) via
make_store_and_items(...) and run the existing workers loop and merge
assertions.
- Around line 1113-1153: The current tests only exercise broad buckets; add
focused unit tests for AdaptiveShardRoutingSelector::select_plan using
ParallelExecutionWorkloadProfile that pin the heuristic decision boundaries
exactly: create tests asserting behavior at total_items == 256 and total_items
== 257 (to capture the 256 threshold), total_items == 4096 and 4096+1 (to
capture the 4096 threshold), non_empty_shards == 4 and 3 (to capture the 4-shard
threshold), and cases that exercise the max_shard_len * 2 <= total_items gate
(one where max_shard_len * 2 == total_items and one where it is just one less
and one more) and assert the expected ParallelExecutionPlan (using
ParallelExecutionPlan::new and ParallelExecutionPolicy variants) for each input;
add these tests alongside the existing ones (naming them e.g.
adaptive_selector_boundary_total_256, adaptive_selector_boundary_total_4096,
adaptive_selector_boundary_non_empty_shards_4,
adaptive_selector_boundary_max_shard_len_gate) so the exact edges are locked
down.
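To make the requested boundary tests concrete, here is a self-contained sketch. The `Profile` struct and the policy mapping in `select_policy` are invented stand-ins assembled only from the thresholds the comment names (256, 4096, ≥ 4 non-empty shards, `max_shard_len * 2 <= total_items`); the real heuristic lives in `crates/warp-core/src/parallel/exec.rs` and may branch differently:

```rust
// Toy stand-in for ParallelExecutionWorkloadProfile: just the fields the
// thresholds reference. The policy names are illustrative, not the real enum.
#[derive(Clone, Copy)]
struct Profile {
    total_items: usize,
    non_empty_shards: usize,
    max_shard_len: usize,
}

#[derive(Debug, PartialEq)]
enum Policy {
    StaticPerWorker,
    DynamicPerShard,
    DynamicPerWorker,
}

// Hypothetical heuristic built from the thresholds named in the comment.
fn select_policy(p: Profile) -> Policy {
    if p.total_items <= 256 {
        Policy::StaticPerWorker
    } else if p.total_items <= 4096 {
        if p.non_empty_shards >= 4 && p.max_shard_len * 2 <= p.total_items {
            Policy::DynamicPerShard
        } else {
            Policy::StaticPerWorker
        }
    } else {
        Policy::DynamicPerWorker
    }
}

fn main() {
    let profile = |total, shards, max_len| Profile {
        total_items: total,
        non_empty_shards: shards,
        max_shard_len: max_len,
    };
    // Pin both sides of each decision boundary, as the comment asks.
    assert_eq!(select_policy(profile(256, 8, 32)), Policy::StaticPerWorker);
    assert_eq!(select_policy(profile(257, 8, 32)), Policy::DynamicPerShard);
    assert_eq!(select_policy(profile(4096, 8, 512)), Policy::DynamicPerShard);
    assert_eq!(select_policy(profile(4097, 8, 512)), Policy::DynamicPerWorker);
    // Shard-count edge: 4 qualifying shards pass, 3 do not.
    assert_eq!(select_policy(profile(1024, 4, 256)), Policy::DynamicPerShard);
    assert_eq!(select_policy(profile(1024, 3, 256)), Policy::StaticPerWorker);
    // max_shard_len * 2 <= total_items gate: 512*2 == 1024 passes, 513*2 fails.
    assert_eq!(select_policy(profile(1024, 4, 512)), Policy::DynamicPerShard);
    assert_eq!(select_policy(profile(1024, 4, 513)), Policy::StaticPerWorker);
    println!("all boundary checks passed");
}
```

The value of tests shaped like this is that any later change to a threshold constant flips exactly one paired assertion, making the heuristic's edges explicit in the test names.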
In `@docs/benchmarks/index.html`:
- Around line 940-960: The report mixes raw input fields (row.policy,
row.workers) with migrated accessors, causing inconsistent narratives; finish
the executed-plan migration by routing every summary/path that currently reads
row.policy or row.workers through the new accessors: use selectedPolicy(row) and
selectedWorkers(row) (and selectedPlanLabel(row)) everywhere—update caseLabel,
any tooltip/headline/card renderers, and any POLICY_LABELS or
workerPoolLabel/workerHintLabel lookups so they call
POLICY_LABELS[selectedPolicy(row)] and workerPoolLabel(selectedWorkers(row))
instead of using raw row.* values, ensuring all displayed text consistently
reflects the executed plan.
In `@docs/benchmarks/PARALLEL_POLICY_MATRIX.md`:
- Around line 16-17: Update the paragraph to reflect the exported payload
distinction between workers, worker_hint, and selected_workers: explain that
fixed dynamic_* / static_* rows are concrete runs with explicit worker counts
(e.g., 1w/4w/8w) while adaptive rows supply worker_hint(s) and the selector
resolves a concrete selected_workers plan (pooled-worker policies are hints, not
final counts); ensure mentions of "pooled-worker" and the selector clarify that
the selector picks a policy which is then resolved into selected_workers, and
remove any phrasing that implies the pooled-worker axis is always a concrete
worker count.
In `@docs/benchmarks/parallel-policy-matrix.json`:
- Around line 15-56: The committed benchmark JSON contains rows where the
"series" and "mean_ns" values (e.g., "adaptive_shard_routing:8w" at load 100
with mean_ns ~46651, "adaptive_shard_routing:1w" at load 1000 with mean_ns
~243309, and "adaptive_shard_routing:8w" at load 10000 with mean_ns ~2284241)
contradict the PR summary; either regenerate the benchmark artifacts and replace
the JSON so the "policy"/"series"/"load"/"mean_ns" entries reflect the actual
run used in the writeup, or update the PR summary/conclusions to match the
committed JSON; locate and fix the inconsistent entries by checking the objects
with keys "policy", "series", "load", "mean_ns" (and related
"selected_policy"/"selected_series") and ensure the committed estimates come
from the same run that produced the PR conclusions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: d2c95a39-c2ca-4699-859a-f56a034aeb68
📒 Files selected for processing (9)
- crates/warp-benches/benches/README.md
- crates/warp-benches/benches/parallel_baseline.rs
- crates/warp-core/src/parallel/exec.rs
- crates/warp-core/src/parallel/mod.rs
- docs/benchmarks/PARALLEL_POLICY_MATRIX.md
- docs/benchmarks/index.html
- docs/benchmarks/parallel-policy-matrix.json
- docs/benchmarks/report-inline.html
- xtask/src/main.rs
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 564919abd5
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
xtask/src/main.rs (1)
1171-1223: ⚠️ Potential issue | 🟠 Major — Reject conflicting duplicate rows instead of keeping whichever `read_dir` returns first.
This only distinguishes `selected_policy = Some(_)` from `None`. If two stale adaptive directories exist for the same `(policy, workers, load)` and both already carry selected-plan metadata, the winner is traversal-order dependent. That makes `bench bake`/`check-artifacts` nondeterministic and can silently preserve stale measurements after selector behavior changes.
Possible fix:

```diff
-fn collect_policy_matrix_rows(criterion_root: &Path, repo_root: &Path) -> Vec<PolicyMatrixRow> {
+fn collect_policy_matrix_rows(criterion_root: &Path, repo_root: &Path) -> Result<Vec<PolicyMatrixRow>> {
     if !criterion_root.is_dir() {
-        return Vec::new();
+        return Ok(Vec::new());
     }
     let mut by_case = BTreeMap::new();
@@
             let key = (row.policy.clone(), row.workers.clone(), row.load);
             match by_case.entry(key) {
                 std::collections::btree_map::Entry::Vacant(entry) => {
                     entry.insert(row);
                 }
                 std::collections::btree_map::Entry::Occupied(mut entry) => {
+                    if policy_matrix_rows_conflict(entry.get(), &row) {
+                        bail!(
+                            "conflicting benchmark rows for {}:{} load {}: {} vs {}; clean {} before rebaking",
+                            row.policy,
+                            row.workers,
+                            row.load,
+                            entry.get().path,
+                            row.path,
+                            criterion_root.display(),
+                        );
+                    }
                     if prefer_policy_matrix_row(entry.get(), &row) {
                         entry.insert(row);
                     }
                 }
             }
         }
     }
     let mut results: Vec<_> = by_case.into_values().collect();
@@
-    results
+    Ok(results)
 }

 fn prefer_policy_matrix_row(existing: &PolicyMatrixRow, candidate: &PolicyMatrixRow) -> bool {
     existing.selected_policy.is_none() && candidate.selected_policy.is_some()
 }
+
+fn policy_matrix_rows_conflict(left: &PolicyMatrixRow, right: &PolicyMatrixRow) -> bool {
+    left.selected_series != right.selected_series || left.path != right.path
+}
```

Also applies to: 1237-1239
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xtask/src/main.rs` around lines 1171 - 1223, The code currently keeps whichever PolicyMatrixRow wins traversal order when inserting into by_case; instead, detect true conflicts where both the existing and incoming rows have selected_policy.is_some() but differ (or both are Some for the same key) and reject the pair so neither is kept. In the match for std::collections::btree_map::Entry::Occupied in xtask/src/main.rs (symbols: by_case, PolicyMatrixRow, prefer_policy_matrix_row), add a check before calling entry.insert: if entry.get().selected_policy.is_some() && row.selected_policy.is_some() && entry.get().selected_policy != row.selected_policy { entry.remove(); /* skip inserting new row */ } else continue with the existing prefer_policy_matrix_row logic; apply the same conflict-rejection check at the other similar insertion site referenced in the comment (lines ~1237-1239).

docs/benchmarks/index.html (1)
967-975: ⚠️ Potential issue | 🟡 Minor — `adaptive_shard_routing` missing from `policySortKey`.
`indexOf` returns `-1` for unknown policies, placing adaptive rows before all others in the raw table sort. Add the new policy to the array.

```diff
 function policySortKey(policy) {
   return [
     "dynamic_per_worker",
     "dynamic_per_shard",
     "static_per_worker",
     "static_per_shard",
     "dedicated_per_shard",
+    "adaptive_shard_routing",
   ].indexOf(policy);
 }
```
Verify each finding against the current code and only fix it if needed. In `@docs/benchmarks/index.html` around lines 967 - 975, The sort key array in function policySortKey is missing the new policy "adaptive_shard_routing", causing indexOf to return -1 and put adaptive rows before others; update the array inside policySortKey to include "adaptive_shard_routing" in the desired ordering (e.g., add "adaptive_shard_routing" at the appropriate position among "dynamic_per_worker", "dynamic_per_shard", etc.) so indexOf(policy) returns a valid non-negative sort index for that policy.
♻️ Duplicate comments (4)
crates/warp-core/src/parallel/mod.rs (1)
16-20: ⚠️ Potential issue | 🟠 Major — Keep selector injection off the public `warp-core` surface.
Re-exporting `ParallelExecutionPlanSelector` plus the `*_with_selector` entry points lets downstream callers choose execution topology from arbitrary ambient state. Canonical merge may stabilize final ops, but the public `Vec<TickDelta>` shape and thread topology can still vary across identical engine inputs. Keep this seam crate-private or behind a bench-only/internal feature.
As per coding guidelines, `crates/warp-core/**`: warp-core is the deterministic kernel. Every code path must be deterministic (no HashMap iteration order, no floating-point platform variance, no thread-dependent ordering). Flag any non-determinism risk. Performance matters — flag unnecessary allocations in hot paths.
Verify each finding against the current code and only fix it if needed. In `@crates/warp-core/src/parallel/mod.rs` around lines 16 - 20, The public re-export of selector injection (ParallelExecutionPlanSelector and the execute_parallel_with_selector / execute_parallel_sharded_with_selector entry points) creates a non-deterministic seam; make these crate-private or gate them behind an internal/bench feature: remove ParallelExecutionPlanSelector from the public exports in mod.rs and either (a) move the selector-aware functions execute_parallel_with_selector and execute_parallel_sharded_with_selector into a private module, or (b) add a feature flag (e.g., internal_selector or bench_only) and conditionally compile those functions and the selector type behind that feature so the default public warp-core surface exposes only the deterministic APIs (execute_parallel_with_policy, execute_parallel_sharded_with_policy, etc.).

crates/warp-benches/benches/parallel_baseline.rs (1)
346-359: 🛠️ Refactor suggestion | 🟠 Major — Stop predicting the adaptive label with a second planner.
`benchmark_label` is derived from local `workload_profile()`/`adaptive_plan_for_items()` before the run, but the measured path calls `execute_parallel_with_selector()` and selects its plan again. That gives the artifact name/path two sources of truth for the executed plan, so the next selector/profile change can make the baked metadata lie again. Build the label from the same shared planning API the executor uses, or surface the selected plan from `execute_parallel_with_selector()`.
Also applies to: 431-459
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/warp-benches/benches/parallel_baseline.rs` around lines 346 - 359, The benchmark currently computes benchmark_label using a local workload_profile() and adaptive_plan_for_items() but execute_parallel_with_selector() re-plans, causing a mismatch; update the labeling to use the exact plan chosen by the executor instead of re-running selection: either call the shared planner API used by execute_parallel_with_selector() (e.g., AdaptiveShardRoutingSelector.select_plan with the same inputs) to generate the label, or modify execute_parallel_with_selector() to return or expose the selected ParallelExecutionPlan and use that returned plan to build benchmark_label; update references to workload_profile(), adaptive_plan_for_items(), and benchmark_label accordingly so there is a single source of truth for the selected plan.

docs/benchmarks/PARALLEL_POLICY_MATRIX.md (1)
16-16: ⚠️ Potential issue | 🟡 Minor — Separate fixed worker-count runs from adaptive worker hints.
This section still treats the pooled-worker axis as "worker hints". In the baked matrix, fixed `dynamic_*`/`static_*` rows are concrete `1w`/`4w`/`8w` executions (`worker_hint: null`), while only adaptive rows carry `worker_hint` and then resolve `selected_policy` + `selected_workers`. The selector is also not workload-shape-only: at load `1000`, hint `1w` picks `static_per_worker:1w`, while hints `4w`/`8w` pick `dynamic_per_worker:1w`. Rewrite this as fixed worker-count sweeps plus adaptive hint-driven selection.
As per coding guidelines, `docs/**/*.md`: Documentation accuracy matters — especially anything touching determinism guarantees, hash stability, or canonical ordering. Flag factual errors and stale cross-references. Do not flag stylistic preferences in prose.
Also applies to: 36-51
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/benchmarks/PARALLEL_POLICY_MATRIX.md` at line 16, The paragraph incorrectly mixes fixed worker-count runs with adaptive worker hints; update the text so that rows named dynamic_* / static_* represent concrete fixed worker-count executions with worker_hint: null (e.g., concrete 1w/4w/8w sweeps), and separate those from adaptive rows which carry a worker_hint and are resolved by the selector into selected_policy and selected_workers. Also clarify that the selector is not workload-shape-only (e.g., at load 1000 hint 1w → static_per_worker:1w while hints 4w/8w → dynamic_per_worker:1w) and rewrite the matrix description to present a fixed worker-count sweep plus a distinct adaptive hint-driven selection path, updating references to pooled-worker axis, worker_hint, selected_policy, selected_workers, static_per_worker, and dynamic_per_worker accordingly.

docs/benchmarks/parallel-policy-matrix.json (1)
15-56: ⚠️ Potential issue | 🟠 Major — The baked artifact still contradicts the PR summary.
These rows still say load `100` is fastest at `adaptive_shard_routing:8w` (46651 ns) rather than `static_per_worker:1w` (49395 ns), load `1000` is fastest at `adaptive_shard_routing:1w` (243309 ns) rather than `adaptive_shard_routing:4w` (246259 ns), and load `10000` is fastest at `adaptive_shard_routing:8w` (2284241 ns) rather than `static_per_shard:8w` (2484938 ns). Regenerate the committed artifacts from the run used in the writeup, or update the PR conclusions to match this JSON.
Also applies to: 183-223, 225-266, 435-475, 589-601
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/benchmarks/parallel-policy-matrix.json` around lines 15 - 56, The JSON rows in parallel-policy-matrix (entries with keys "load", "series", and "mean_ns") contradict the PR writeup: verify the benchmark artifacts used in the writeup and either (A) regenerate and re-commit the benchmark artifacts so the entries where series "adaptive_shard_routing:8w", "adaptive_shard_routing:1w", and "adaptive_shard_routing:8w" no longer show lower "mean_ns" than the claimed winners, or (B) update the PR text/conclusions to reflect the actual committed JSON numbers (adjust mentions of "static_per_worker:1w", "adaptive_shard_routing:4w", and "static_per_shard:8w" for loads 100, 1000, 10000 respectively); ensure you fix all duplicated sections flagged (rows around the other ranges referenced) so "load", "series", and "mean_ns" in the committed artifacts match the writeup or vice versa.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/warp-benches/benches/README.md`:
- Around line 36-43: Update the README wording to stop calling fixed
pooled-worker runs “worker hints”: clarify that the fixed dynamic_* and static_*
cases are actual 1w/4w/8w executions (not hints), and only adaptive rows carry a
worker_hint which the selector resolves into a chosen plan; also tighten the
description of the selector input (referencing selector and worker_hint) to
state it can yield different policies for the same load under different hints
rather than implying the policy is derived solely from workload shape or that it
is fixed “for that load” in the policy matrix and adaptive rows.
In `@crates/warp-core/src/parallel/exec.rs`:
- Around line 496-511: execute_partitioned_shards is recomputing the workload
profile and re-capping workers unnecessarily; change its signature to accept a
ParallelExecutionWorkloadProfile (constructed once by
execute_parallel_sharded_with_selector via
ParallelExecutionWorkloadProfile::from_shards) and use that profile instead of
calling from_shards again, and remove repeated
capped_workers(plan.workers()).get() calls inside execute_partitioned_shards so
the caller passes a single capped worker count
(capped_workers(plan.workers()).get()) — update all callers (notably
execute_parallel_sharded_with_selector) to pass the profile and the capped
worker count and adjust the Branch handling for
ShardAssignmentPolicy::DedicatedPerShard to use the provided values.
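The "compute once, pass down" shape this prompt describes can be sketched standalone. Everything here is a simplified stand-in for the types in `crates/warp-core/src/parallel/exec.rs` (toy fields, a hypothetical cap of 8 workers); it only demonstrates the plumbing change, not the real dispatch logic:

```rust
use std::num::NonZeroUsize;

// Toy stand-in for ParallelExecutionWorkloadProfile.
#[derive(Clone, Copy)]
struct WorkloadProfile {
    total_items: usize,
}

impl WorkloadProfile {
    // Stand-in for ParallelExecutionWorkloadProfile::from_shards.
    fn from_shards(shards: &[Vec<u32>]) -> Self {
        Self { total_items: shards.iter().map(Vec::len).sum() }
    }
}

fn capped_workers(requested: NonZeroUsize) -> NonZeroUsize {
    const MAX_WORKERS: usize = 8; // hypothetical cap
    NonZeroUsize::new(requested.get().min(MAX_WORKERS)).unwrap()
}

// The helper now receives both values instead of re-deriving them internally.
fn execute_partitioned_shards(profile: WorkloadProfile, workers: usize, shards: &[Vec<u32>]) -> usize {
    // ... dispatch per policy would go here; this just proves the plumbing.
    assert_eq!(profile.total_items, shards.iter().map(Vec::len).sum::<usize>());
    workers
}

fn main() {
    let shards = vec![vec![1, 2, 3], vec![4, 5]];
    // Entry point: derive the profile and cap the workers once, pass both down.
    let profile = WorkloadProfile::from_shards(&shards);
    let workers = capped_workers(NonZeroUsize::new(16).unwrap()).get();
    let used = execute_partitioned_shards(profile, workers, &shards);
    assert_eq!(used, 8); // 16 requested, capped at the hypothetical max of 8
}
```

Besides avoiding duplicate work, passing the profile down guarantees the dispatcher and the selector see identical inputs, which is the same single-source-of-truth concern raised for the benchmark labels above.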
In `@xtask/src/main.rs`:
- Around line 1284-1296: The parser currently accepts any substring as
worker_hint; update parse_adaptive_policy_case to validate worker_hint matches
the producer format "hint_{n}w" (e.g., hint_1w, hint_10w) before constructing
ParsedPolicyCase: after computing worker_hint from rest, check the pattern
(numeric between "hint_" and trailing "w") and reject (return None) on mismatch
so stray directories like "bad" are ignored; keep using split_policy_case and
preserve setting worker_hint: Some(worker_hint.to_owned()) only when the pattern
passes.
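A minimal sketch of the strict validation this prompt asks for: accept only `hint_{n}w` where `{n}` is a positive, non-zero-padded integer. The function name and rejection rules are assumptions drawn from the prompt (the real parser lives in `xtask/src/main.rs`):

```rust
use std::num::NonZeroUsize;

// Parse a directory-name segment of the form "hint_{n}w".
// Returns None for anything malformed so stray directories are ignored.
fn parse_worker_hint(segment: &str) -> Option<NonZeroUsize> {
    let digits = segment.strip_prefix("hint_")?.strip_suffix('w')?;
    // Reject empty, non-numeric, and zero-padded counts ("hint_01w").
    if digits.is_empty() || digits.starts_with('0') || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse().ok().and_then(NonZeroUsize::new)
}

fn main() {
    assert_eq!(parse_worker_hint("hint_1w").map(NonZeroUsize::get), Some(1));
    assert_eq!(parse_worker_hint("hint_10w").map(NonZeroUsize::get), Some(10));
    // Stray or malformed directory names are ignored.
    assert_eq!(parse_worker_hint("bad"), None);
    assert_eq!(parse_worker_hint("hint_w"), None);
    assert_eq!(parse_worker_hint("hint_0w"), None);
    assert_eq!(parse_worker_hint("hint_01w"), None);
    println!("parser ok");
}
```

Returning `NonZeroUsize` makes the "no zero workers" invariant part of the type, so the same helper can validate both `worker_hint` and `selected_workers` segments.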
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 57cb2710-233b-46ba-84be-6d1e825b0dae
📒 Files selected for processing (9)
- crates/warp-benches/benches/README.md
- crates/warp-benches/benches/parallel_baseline.rs
- crates/warp-core/src/parallel/exec.rs
- crates/warp-core/src/parallel/mod.rs
- docs/benchmarks/PARALLEL_POLICY_MATRIX.md
- docs/benchmarks/index.html
- docs/benchmarks/parallel-policy-matrix.json
- docs/benchmarks/report-inline.html
- xtask/src/main.rs
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/warp-benches/benches/parallel_baseline.rs`:
- Around line 418-426: The Adaptive branch in PolicyMatrixCase::Adaptive
currently calls make_test_store(n) and make_exec_items(&nodes) solely to compute
a label via resolve_adaptive_shard_routing and then drops that store, causing a
redundant allocation; change this by computing the adaptive label from the store
that the benchmark iteration already creates (or compute the label lazily inside
the benchmark) instead of calling make_test_store again — locate the calls to
make_test_store, make_exec_items, resolve_adaptive_shard_routing and
adaptive_case_label in the Adaptive branch and reuse the existing store/nodes or
defer label computation to avoid the duplicate allocation.
In `@xtask/src/main.rs`:
- Around line 1304-1317: The selected_workers value returned by
split_policy_case is not being strictly validated and can accept invalid forms
(zero or zero-padded counts); update parse_adaptive_policy_case to validate
selected_workers with the same strict parser used for worker_hint
(parse_worker_suffix) instead of trusting split_policy_case alone: after
extracting selected_workers (from split_policy_case or the selected_case string)
run the same parse_worker_suffix logic (or adapt split_policy_case to return a
parsed NonZeroUsize via parse_worker_suffix) and reject/return None on failure;
apply the same change to the other occurrence of this pattern noted in the diff
so both places enforce NonZeroUsize and no zero-padding.
- Around line 1215-1228: The code currently collapses two rows with the same key
by choosing one via mtime/path inside the by_case.entry occupied branch;
instead, detect and reject conflicting resolved-plan metadata before calling
prefer_policy_matrix_row: when entry.get() yields an existing tuple whose
existing_row and existing_mtime both indicate a resolved-plan (i.e., both have
truthful selected_policy/selected_workers) and the new row's
selected_policy/selected_workers disagree, return an explicit error or panic
rather than silently inserting; implement this check in the occupied branch
(around the use of entry.get(), existing_row/existing_mtime, and before calling
prefer_policy_matrix_row) and add a regression test that constructs two truthful
conflicting rows for the same key to assert the rejection behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5f3573d0-eb3b-458b-bf41-30431497e1dc
📒 Files selected for processing (9)
- CHANGELOG.md
- crates/warp-benches/benches/README.md
- crates/warp-benches/benches/parallel_baseline.rs
- crates/warp-core/src/parallel/exec.rs
- crates/warp-core/src/parallel/mod.rs
- docs/benchmarks/PARALLEL_POLICY_MATRIX.md
- docs/benchmarks/parallel-policy-matrix.json
- docs/benchmarks/report-inline.html
- xtask/src/main.rs
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/warp-benches/benches/parallel_baseline.rs (1)
347-349: 🧹 Nitpick | 🔵 Trivial — Unreachable fallback in `worker_hint`.
`workers.max(1)` guarantees the value is ≥ 1, so `NonZeroUsize::new(...)` always returns `Some(...)`. The `map_or` fallback is dead code. Simplify to `unwrap`:

```diff
 fn worker_hint(workers: usize) -> NonZeroUsize {
-    NonZeroUsize::new(workers.max(1)).map_or(NonZeroUsize::MIN, |w| w)
+    // SAFETY: workers.max(1) is always ≥ 1
+    NonZeroUsize::new(workers.max(1)).unwrap()
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/warp-benches/benches/parallel_baseline.rs` around lines 347 - 349, The fallback in worker_hint is unreachable because workers.max(1) is always ≥1, so NonZeroUsize::new(...) returns Some; replace the map_or call with a direct unwrap (or expect with a brief message) on NonZeroUsize::new(workers.max(1)) in the worker_hint function to return the NonZeroUsize directly (avoid using unsafe new_unchecked).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@xtask/src/main.rs`:
- Around line 7405-7431: The test creates two truthful PolicyMatrixRow values
with different selected-plan metadata which violates the precondition enforced
by insert_policy_matrix_row() and policy_matrix_rows_conflict(); update the test
inside prefer_policy_matrix_row_row_prefers_newer_truthful_row_for_same_case (or
its fixture) so both rows share identical selected_policy, selected_workers and
selected_series (i.e., same selected-plan metadata) and then vary only their
mtime/path or mark one as stale (e.g., simulate older mtime or a stale flag) so
the helper prefer_policy_matrix_row sees legally comparable rows; ensure
references to prefer_policy_matrix_row, insert_policy_matrix_row, and
policy_matrix_rows_conflict remain consistent.
- Around line 1297-1310: The parser currently collapses both "not a benchmark
case" and "recognized-but-malformed benchmark ids" into None (silent skip);
change the parsing API so malformed but recognized inputs produce an Err with a
clear message instead of None. Concretely, update parse_adaptive_policy_case and
split_policy_case (and the code that matches policy_case.ends_with('w')) to
return Result<Option<ParsedPolicyCase>, E> (or Result<ParsedPolicyCase, E> with
a distinct NotABenchmarkCase error variant) and when you detect cases like "0w",
"01w", or broken "__selected_..." return Err describing the validation failure;
then update collect_policy_matrix_rows (and other callers) to propagate/handle
Err as a fatal error rather than treating it as a skip. Ensure error messages
reference the offending policy_case and the validation rule.
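The `Result<Option<_>, _>` shape the comment proposes can be sketched as below: `Ok(None)` means "not a benchmark case, skip", `Ok(Some(_))` is a well-formed case, and `Err` carries a message naming the offending `policy_case` and the violated rule. The `adaptive_shard_routing__<n>w` id shape and the function's exact signature are assumptions for illustration; the strict digit checks also reject the `0w`/`01w` forms flagged earlier.

```rust
/// Ok(None): not an adaptive benchmark case (caller skips it).
/// Ok(Some(workers)): a well-formed adaptive case id.
/// Err(msg): recognized as adaptive but malformed — fatal, not a skip.
fn parse_adaptive_policy_case(policy_case: &str) -> Result<Option<usize>, String> {
    let Some(rest) = policy_case.strip_prefix("adaptive_shard_routing__") else {
        return Ok(None);
    };
    let Some(digits) = rest.strip_suffix('w') else {
        return Err(format!("policy case `{policy_case}`: missing `w` worker suffix"));
    };
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return Err(format!("policy case `{policy_case}`: worker count must be decimal digits"));
    }
    if digits == "0" || (digits.len() > 1 && digits.starts_with('0')) {
        return Err(format!(
            "policy case `{policy_case}`: worker count must be non-zero and not zero-padded"
        ));
    }
    Ok(Some(digits.parse().expect("validated digits")))
}

fn main() {
    assert_eq!(parse_adaptive_policy_case("static_per_shard__8w"), Ok(None));
    assert_eq!(parse_adaptive_policy_case("adaptive_shard_routing__4w"), Ok(Some(4)));
    assert!(parse_adaptive_policy_case("adaptive_shard_routing__0w").is_err());
    assert!(parse_adaptive_policy_case("adaptive_shard_routing__01w").is_err());
}
```

Callers such as `collect_policy_matrix_rows` would then propagate the `Err` with `?` instead of treating every `None` as a benign skip.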
---
Outside diff comments:
In `@crates/warp-benches/benches/parallel_baseline.rs`:
- Around line 347-349: The fallback in worker_hint is unreachable because
workers.max(1) is always ≥1, so NonZeroUsize::new(...) returns Some; replace the
map_or call with a direct unwrap (or expect with a brief message) on
NonZeroUsize::new(workers.max(1)) in the worker_hint function to return the
NonZeroUsize directly (avoid using unsafe new_unchecked).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 84a2b9e1-c6ca-472d-a4e1-aa8062990fc1
📒 Files selected for processing (4)
- crates/warp-benches/benches/parallel_baseline.rs
- docs/benchmarks/parallel-policy-matrix.json
- docs/benchmarks/report-inline.html
- xtask/src/main.rs
CI is green, the CodeRabbit review passes, and unresolved review threads are down to 0 on head 8e36b6a. There are no pending review requests, but the only CODEOWNER is @flyingrobots, who also authored this PR, so the remaining blocker is formal approval/admin merge rather than code or automation debt.
Summary
- `AdaptiveShardRoutingSelector` as the first selector implementation, targeting `main`

Validation
- `cargo fmt --all --check`
- `cargo test -p warp-core parallel::exec::tests --message-format short`
- `cargo clippy -p warp-core --lib -- -D warnings -D missing_docs`
- `cargo clippy -p warp-benches --bench parallel_baseline -- -D warnings`
- `cargo bench -p warp-benches --bench parallel_baseline -- parallel_policy_matrix`
- `cargo xtask bench bake --out docs/benchmarks/report-inline.html --policy-json-out docs/benchmarks/parallel-policy-matrix.json`
- `cargo xtask bench check-artifacts --html docs/benchmarks/report-inline.html --json docs/benchmarks/parallel-policy-matrix.json`

Current benchmark read
- 100: `static_per_worker / 1w` is the nominal winner, with adaptive variants effectively clustered in the same low-overhead tier
- 1000: `adaptive_shard_routing / 4w` is the current winner
- 10000: `static_per_shard / 8w` is the current winner, with `adaptive_shard_routing / 4w` a very close second

That leaves the selector in a useful position: it does not dominate every load, but it is competitive across the matrix and wins the medium case without collapsing in the large case.
Summary by CodeRabbit
New Features
Documentation
Bug Fixes