fix: replace mutex with single flight pattern#1193
Open
ferhatelmas wants to merge 2 commits into
Open
Conversation
Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR refactors several concurrency-sensitive code paths to replace the deprecated semaphore-based per-key mutex pattern with a “single-flight” coalescing pattern, while also hardening pg-boss queue lifecycle behavior (ack/fail/complete) to reduce jobs getting stuck “active” due to transient DB issues.
Changes:
- Introduces
createSingleFlightByKeyand updates tenant/JWKS/S3-credentials/migrations flows to coalesce same-key concurrent work. - Replaces queue worker semaphore-based concurrency control with an in-repo
createConcurrencyLimiterand adds retry/backoff around pg-boss completion/failure acknowledgements. - Updates observability: removes semaphore OTEL class instrumentation and adds a new queue metric for completion-ack failures; adds/extends unit tests for the new behaviors.
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| src/test/utils/cache-metrics.ts | Updates test comments to reflect single-flight cache-fill behavior. |
| src/storage/protocols/s3/credentials/manager.ts | Switches S3 credential cache fill from per-key mutex to single-flight. |
| src/internal/queue/queue.ts | Replaces semaphore with concurrency limiter; adds ack retry/backoff and shutdown fail-fast for fetched jobs. |
| src/internal/queue/queue.test.ts | Expands worker tests to cover concurrency limiting, retry budget restoration, and ack retry behavior. |
| src/internal/queue/event.ts | Logs and exits early when pg-boss drops an enqueue due to policy (null insert result). |
| src/internal/monitoring/otel-class-instrumentations.ts | Removes OTEL class instrumentation for semaphore/permit (no longer used directly). |
| src/internal/monitoring/metrics.ts | Adds queue_job_complete_failed metric for completion-ack failures. |
| src/internal/database/tenant.ts | Switches tenant config cache fill from per-key mutex to single-flight. |
| src/internal/concurrency/single-flight.ts | Adds the single-flight implementation keyed by string. |
| src/internal/concurrency/single-flight.test.ts | Adds unit tests validating single-flight success/failure sharing and cleanup semantics. |
| src/internal/concurrency/mutex.ts | Removes the deprecated semaphore-based per-key mutex implementation. |
| src/internal/concurrency/limit-concurrency.ts | Adds a lightweight in-repo concurrency limiter utility. |
| src/internal/concurrency/limit-concurrency.test.ts | Adds unit tests for limiter validation, scheduling, and slot release behavior. |
| src/internal/concurrency/index.ts | Re-exports new concurrency utilities and removes the mutex export. |
| src/internal/auth/jwks/manager.ts | Switches JWKS cache fill from per-key mutex to single-flight. |
| src/http/plugins/db.ts | Coalesces ON_REQUEST migrations per tenant via single-flight and reduces repeated checks. |
| src/http/plugins/db.test.ts | Adds coverage for on-request migration single-flight behavior across concurrent requests/scopes and failure retry. |
| package.json | Removes direct dependency on @shopify/semaphore. |
| package-lock.json | Updates lockfile to reflect dependency graph changes after removing direct semaphore dependency. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Coverage Report for CI Build 28601863430Coverage increased (+0.04%) to 78.861%Details
Uncovered Changes
Coverage Regressions2 previously-covered lines in 2 files lost coverage.
Coverage Stats💛 - Coveralls |
Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What kind of change does this PR introduce?
Refactor for perf and some cleanup
What is the current behavior?
We use deprecated semaphore package as a mutex. Concurrent requests enter into critical section one by one.
What is the new behavior?
Pattern is read through cache, inflight pattern is a better fit. One request does the job and caches and others receive the same result. Drop the dependency after all usage is refactored.
Additional context
Harden queue lifecycle with ack for transient db error, instead of depending on expiration.