Fix duplicate job execution despite limits_concurrency#689
Fix duplicate job execution despite limits_concurrency#689mhenrixon wants to merge 7 commits intorails:mainfrom
Conversation
When a job runs longer than its concurrency_duration, the semaphore expires and gets deleted by the dispatcher's concurrency maintenance. Previously, BlockedExecution.releasable would consider a concurrency key releasable when its semaphore was missing, causing blocked jobs to be released and run concurrently with the still-executing job. This violated the concurrency guarantee: with limits_concurrency set to 1, two jobs with the same concurrency key could run simultaneously if the first job exceeded its concurrency_duration. Fix: Check for claimed executions before marking a concurrency key as releasable. A key is only releasable when: - Its semaphore is missing OR has available slots, AND - No jobs with that key are currently being executed (claimed) This ensures concurrency limits are respected even when jobs exceed their configured duration.
…hutdown When a worker gracefully shuts down, claimed executions are released back to ready state via ClaimedExecution#release. Previously, this always used dispatch_bypassing_concurrency_limits, which ignored whether another job with the same concurrency key was already running. This could cause duplicate concurrent executions in the following scenario: 1. Job A starts running with concurrency key "X" (semaphore value = 0) 2. Time passes, semaphore expires and is deleted 3. Job B with same key "X" enqueues, creates new semaphore, starts running 4. Worker running Job A receives shutdown signal 5. Job A is released via dispatch_bypassing_concurrency_limits 6. Job A goes to ready state, gets picked up by another worker 7. Both Job A and Job B now running concurrently (violates limit!) Fix: Before releasing a job, check if any other jobs with the same concurrency key are currently executing. If so, go through normal dispatch which respects the concurrency policy (block or discard). If not, continue to bypass limits for performance. This ensures that graceful shutdown doesn't violate concurrency guarantees, even when semaphores have expired during long-running job execution.
|
Is the issue that it violates the expectation? Because this seems like expected behavior - if the duration elapsed, then you don't lock the job anymore. That job just exists outside of the concurrency limits now? |
Exactly, even with a 12 hour limit on the concurrency I saw endless duplicate jobs for two different reasons, both fixed here and one being exactly what you just insinuated. Retry, restarts, OOM errors, etc cause a number of duplicates which this PR resolves. Ultimately, that worker will be rewritten anyway but it seemed like the locking mechanism has some flaws/bleeds when I looked at it. |
|
Hey @mhenrixon, sorry for the delay! I'm just catching up on PRs and issues this week. I'm also sorry that I don't follow the issue here. What you describe as flawed is exactly how the system should behave. From the README:
That is, if the expiry time finishes, the lock is no longer valid, and other jobs are allowed to run concurrently. This is what |
|
I get that but this isn't what I see in the running app where duplicates still show up. Maybe because of restarts, maybe because of other reasons but within that expiry I still see duplicates without these changes. This PR fixes that so maybe have a deeper look on the code changes. |
Proxy.signal_all previously did an unconditional `value = value + 1` without the `value < limit` guard that attempt_increment has. If called on a job whose semaphore slot was already returned, the value could exceed the limit, theoretically allowing concurrent executions beyond the configured maximum. Fix: Group jobs by their concurrency_limit and apply the same `value < limit` guard used by attempt_increment.
Previously, `finished` and `failed_with` committed their transactions (destroying claimed_execution and marking the job) separately from `unblock_next_job` (signaling semaphore + releasing blocked job). If a process crashed between the two, the signal would be lost, causing blocked jobs to wait until semaphore expiry (concurrency_duration). Fix: Move `unblock_next_job` inside the same transaction as `finished` and `failed_with`. If the process crashes, the whole transaction rolls back, the claimed_execution survives, and the supervisor's process pruning will properly signal via `fail_all_with`. Also remove the redundant `unblock_next_job` call in `fail_all_with` since `failed_with` now handles it internally.
Verify that `perform` and `failed_with` correctly unblock the next blocked job as part of their transaction. These tests ensure that the signal (semaphore increment + blocked execution release) happens within the same transaction as the job completion, preventing signal loss if the process crashes.
When releasing a claimed execution during shutdown while other jobs with the same concurrency key are executing, the previous code called job.dispatch without first returning the semaphore slot. This caused the semaphore to under-count available slots (value < actual free slots), reducing effective parallelism until the semaphore expired. Fix: Signal the semaphore before re-dispatching, so the slot is returned to the pool. The subsequent dispatch will re-acquire a slot if available or block appropriately. This maintains accurate semaphore accounting across shutdown cycles.
Summary
Fixes multiple race conditions that allow duplicate concurrent job execution despite
limits_concurrency. These were discovered while debugging very long-running jobs crashing our production system.Root Causes & Fixes
1. Blocked jobs released while job is still executing
Root cause: When a job exceeds
concurrency_duration, the semaphore expires and gets deleted by concurrency maintenance.BlockedExecution.releasableconsidered a key releasable when its semaphore was missing — causing blocked jobs to run concurrently with the still-executing job.Fix: Check for claimed executions before marking a key as releasable. A key is only releasable when its semaphore is missing or has available slots AND no jobs with that key are currently executing.
2. Race condition between job enqueue and concurrency unblock
Root cause: Without a
FOR UPDATElock inSemaphore#wait, this interleaving caused permanently stuck jobs:release_one→ finds nothing (B's BlockedExecution not committed yet)Fix:
Semaphore.lock.find_by(key: key)acquires aFOR UPDATElock, serializing the enqueue path with the signal path. The signal UPDATE must wait for the enqueue transaction to commit, guaranteeing the BlockedExecution is visible torelease_one.3. Concurrency bypass during graceful shutdown
Root cause:
ClaimedExecution#releaseunconditionally calleddispatch_bypassing_concurrency_limits, ignoring whether another job with the same key was already running. After semaphore expiry, the released job could run concurrently with an existing execution.Fix:
other_executions_holding_concurrency_lock?checks for other claimed executions with the same key. If found, dispatch goes through normal concurrency controls. If not, bypass is safe.4.
signal_allcould push semaphore above limitRoot cause:
Proxy.signal_alldidUPDATE SET value = value + 1without thevalue < limitguard that individualsignalhas. If called on a job whose slot was already returned, the semaphore value could exceed the limit, allowing over-admission.Fix: Group jobs by their
concurrency_limitand apply the sameWHERE value < limitguard used byattempt_increment.5. Process crash between finish and unblock loses signal
Root cause:
finishedandfailed_withcommitted their transactions (destroying claimed_execution) separately fromunblock_next_job(signaling semaphore + releasing blocked job). A process crash between them lost the signal, causing blocked jobs to wait until semaphore expiry (concurrency_duration).Fix: Move
unblock_next_jobinside the same transaction asfinishedandfailed_with. If the process crashes, the whole transaction rolls back — the claimed_execution survives, and the supervisor's process pruning will properly signal viafail_all_with.6. Semaphore under-count during shutdown release
Root cause: When releasing a claimed execution while other jobs with the same key are executing, the code re-dispatched without first returning the held semaphore slot. This caused the semaphore to permanently under-count available slots, reducing effective parallelism until expiry.
Fix:
Semaphore.signal(job)before re-dispatch returns the slot to the pool. The subsequentdispatchre-acquires or blocks as appropriate.Test Plan
signal_alldoes not increment semaphore beyond limitsignal_allincrements when below limitperformunblocks next job atomically with finishfailed_withunblocks next job atomicallyrelease_oneafter signal