fix(resource_manager): reinitialize consumer threads after os.fork() #1658
Signed-off-by: 박영규 <pyg410@naver.com>
…r reinitialization Signed-off-by: 박영규 <pyg410@naver.com>
…ock in child process Signed-off-by: 박영규 <pyg410@naver.com>
…ocess-unsafe socket sharing Signed-off-by: 박영규 <pyg410@naver.com>
…d of replacing it Signed-off-by: 박영규 <pyg410@naver.com>
@pyg410 One more thing: I tried gunicorn with Also had a look at how others do it, and it seems they all lazily reinitialize the connection. |
…ault Signed-off-by: 박영규 <pyg410@naver.com>
@wochinge |
@pyg410 Thanks! Just tried it and it still breaks. I checked how others do it, and the simplest option seems to be:
This deactivates proxy discovery on macOS and makes the PR fully functional. Other libraries' issues on this:
…nt segfault Signed-off-by: 박영규 <pyg410@naver.com>
# Conflicts: # langfuse/_client/resource_manager.py
@claude review |
@claude review |
The prior concern about Langfuse.api / Langfuse.async_api being cached on __init__ (bypassing the post-fork reinit) has been correctly addressed by converting them to @property delegates on the resource manager — test coverage (assert client.api is rm.api) confirms it. Only a minor ordering nit remains in _init_consumer_threads (see inline).
Overview
This PR reinitializes the Langfuse resource manager after os.fork() so Gunicorn --preload workers don't lose their consumer threads and hang on flush(). It has evolved substantially since first submission: it now (1) recreates internally-managed httpx.Client / API clients on fork while preserving caller-supplied clients, (2) resets the class-level RLock to avoid a fork-inherited stuck-mutex deadlock, (3) works around the macOS SystemConfiguration post-fork segfault via no_proxy="*", and (4) exposes Langfuse.api / Langfuse.async_api as @property delegates so post-fork reinit is visible to callers.
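The fork-handler pattern summarized above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `Manager` class, its single queue, and its method bodies are hypothetical stand-ins for `LangfuseResourceManager`; only `os.register_at_fork` and `weakref.WeakMethod` are real standard-library calls.

```python
import os
import queue
import threading
import weakref


class Manager:
    """Hypothetical stand-in for a manager that owns one consumer thread."""

    def __init__(self) -> None:
        self._shutdown = False
        self._init_consumer_threads()
        # The at-fork registry cannot be cleared, so register a weak
        # reference to avoid pinning this instance through the registry.
        weak_reinit = weakref.WeakMethod(self._at_fork_reinit)

        def _after_fork_in_child() -> None:
            reinit = weak_reinit()  # None once the instance is collected
            if reinit is not None:
                reinit()

        if hasattr(os, "register_at_fork"):  # absent on Windows
            os.register_at_fork(after_in_child=_after_fork_in_child)

    def _init_consumer_threads(self) -> None:
        # Fresh queue first, then a fresh thread bound to that queue.
        self._queue: queue.Queue = queue.Queue()
        self._consumer = threading.Thread(
            target=self._consume, args=(self._queue,), daemon=True
        )
        self._consumer.start()

    @staticmethod
    def _consume(q: queue.Queue) -> None:
        while True:
            q.get()
            q.task_done()

    def _at_fork_reinit(self) -> None:
        if self._shutdown:
            return  # never revive an instance that was already stopped
        try:
            self._init_consumer_threads()
        except Exception:
            # Assumption: losing telemetry is preferable to failing here.
            pass

    def flush(self) -> None:
        self._queue.join()
```

Calling `_at_fork_reinit()` directly, as a test might, replaces both the queue and the thread, so a later `flush()` waits on a queue that actually has a live consumer.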
Prior feedback resolved
My previous review flagged that Langfuse.__init__ cached self.api = self._resources.api as plain attributes, so under --preload all langfuse.api.* calls (e.g. get_prompt, dataset ops) would keep routing through the parent's httpx.Client even after reinit. The current diff converts both to properties (client.py:422-448), and the added test_at_fork_reinit_recreates_httpx_client_by_default asserts client.api is rm.api after the reinit — the gap is closed.
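The difference between a cached attribute and a property delegate can be shown with a small sketch. All class names here are hypothetical and `object()` stands in for a real API client; the sketch only illustrates why a snapshot taken in `__init__` goes stale after reinitialization.

```python
class ResourceManager:
    """Hypothetical manager whose API client is rebuilt after a fork."""

    def __init__(self) -> None:
        self.api = object()

    def reinit_after_fork(self) -> None:
        self.api = object()  # stands in for constructing a fresh client


class CachedClient:
    def __init__(self, resources: ResourceManager) -> None:
        self.api = resources.api  # snapshot: keeps pointing at the old client


class DelegatingClient:
    def __init__(self, resources: ResourceManager) -> None:
        self._resources = resources

    @property
    def api(self) -> object:
        return self._resources.api  # resolved on every access
```

After `reinit_after_fork()`, `CachedClient.api` still refers to the pre-fork object, while `DelegatingClient.api` returns whatever the manager currently holds.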
Security risks
No new attack surface: the code paths added are internal lifecycle management (thread startup, client recreation, lock reset). The no_proxy="*" write only takes effect when no proxy env vars are already configured, and it's confined to the darwin branch — so users who rely on HTTPS_PROXY in workers are not silently deproxied.
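The guard described above might look roughly like this. It is a sketch under stated assumptions: the function name, the list of proxy variables checked, and the injectable `env` and `platform` parameters are all hypothetical, chosen so the behavior can be exercised without touching the real environment. The PR's actual check is not shown in this conversation.

```python
import sys
from typing import MutableMapping

# Assumption: which variables count as "already configured" is illustrative.
PROXY_VARS = ("http_proxy", "https_proxy", "all_proxy", "no_proxy")


def disable_proxy_discovery(
    env: MutableMapping[str, str], platform: str = sys.platform
) -> bool:
    """Set no_proxy="*" on macOS unless proxy settings exist. True if written."""
    if platform != "darwin":
        return False
    configured = any(env.get(n) or env.get(n.upper()) for n in PROXY_VARS)
    if configured:
        return False  # leave user-supplied proxy configuration untouched
    env["no_proxy"] = "*"
    return True
```

In real use the mapping would be `os.environ`; passing a plain dict keeps the example side-effect free.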
Level of scrutiny
High. Fork-safety plus lock reset plus platform-conditional env-var mutation is inherently subtle: correctness depends on ordering (lock before shutdown check), on which state is class-level vs instance-level, and on the behavior of weakref.WeakMethod under GC. The test suite is well-designed — it exercises the stuck-lock scenario explicitly and asserts the outer Langfuse client sees fresh api objects — but real Gunicorn --preload behavior on Linux and macOS is worth spot-checking before merge given that this exact codepath has already regressed once (the macOS segfault). I'd defer to a human familiar with the deployment topology.
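The stuck-lock scenario mentioned above can be illustrated with a short sketch. This shows one possible way to reset a class-level lock, by swapping in a new `RLock`; the `Registry` class and method name are hypothetical, and the mechanism the PR actually uses is not visible in this conversation.

```python
import threading


class Registry:
    """Hypothetical class guarding shared state with a class-level lock."""

    _lock = threading.RLock()

    @classmethod
    def _reset_lock_after_fork(cls) -> None:
        # In a forked child, the thread that held the old lock no longer
        # exists, so the old lock could never be released there.
        cls._lock = threading.RLock()
```

A test can simulate the inherited state without forking: hold the lock in a helper thread, confirm a non-blocking acquire fails, reset, and confirm the acquire now succeeds.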
Other factors
Only remaining finding is a nit about ordering inside _init_consumer_threads: if the media consumer.start() loop raises (e.g. RuntimeError: can't start new thread), the score queue never gets reset and the child inherits the parent's queue with non-zero unfinished_tasks, causing the exact flush() hang this PR prevents. It's guarded by the outer try/except but the safety net still leaves stale queue state — a one-line hoist of the score-queue reset above the media loop closes it. Not a blocker.
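The suggested hoist can be sketched as follows. The `Consumers` class and the injectable `start_thread` parameter are hypothetical, added only so a thread-start failure can be simulated; the point is that queue state is reset before anything that might raise.

```python
import queue
import threading


class Consumers:
    """Hypothetical holder of media consumer threads and a score queue."""

    def __init__(self, start_thread=threading.Thread.start) -> None:
        self._start_thread = start_thread
        self.score_queue: queue.Queue = queue.Queue()
        self.media_threads: list = []

    def init_consumer_threads(self, media_workers: int = 1) -> None:
        # Reset queue state first, so a failure below cannot leave the
        # inherited queue (and its unfinished_tasks count) in place.
        self.score_queue = queue.Queue()
        self.media_threads = []
        for _ in range(media_workers):
            t = threading.Thread(target=lambda: None, daemon=True)
            self._start_thread(t)  # may raise RuntimeError
            self.media_threads.append(t)
```

With this ordering, even if starting a media thread fails, a later `join()` on the score queue sees zero unfinished tasks instead of waiting on work that no consumer will ever complete.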
Thanks for the fix @pyg410 ! 🚀 Will do a release with it now. |
What does this PR do?
When using Gunicorn with --preload, os.fork() copies memory but not threads (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html).
LangfuseResourceManager starts media upload and score ingestion consumer threads on initialization, but these threads are not inherited by forked worker processes.
As a result, all media upload and score ingestion events are silently lost, and calling flush() blocks forever on queue.join() → Gunicorn worker timeout (SIGABRT).
Note: span export is already fork-safe via LangfuseSpanProcessor (BatchSpanProcessor inheritance). This fix covers the remaining background threads managed by LangfuseResourceManager.
Related PR
Changes:
- _init_consumer_threads() for reuse
- score_ingestion_client as an instance variable for access in _at_fork_reinit()
- _at_fork_reinit() to reinitialize queues and consumer threads after fork
- os.register_at_fork(after_in_child=...) using weakref.WeakMethod to avoid permanent strong references that would prevent garbage collection
- _shutdown flag to skip reinitialization on already-stopped instances
Type of change
Verification
List the main commands you ran:
Checklist
code_review.md.
.env.template if needed.
Greptile Summary
This PR fixes silent data loss and flush() deadlocks in Gunicorn --preload deployments by registering an os.register_at_fork(after_in_child=...) handler that reinitializes the media upload and score ingestion consumer threads in each forked worker.
- _init_consumer_threads() so it can be called both during initial setup and after fork; queues are intentionally recreated in the child to prevent duplicating pre-fork events across workers.
- WeakMethod-based fork handler prevents the os.register_at_fork registry from holding a strong reference that would block garbage collection, and a _shutdown flag guards against reinitializing already-torn-down instances.
- httpx.Client / LangfuseClient instances after fork (acknowledged in comments), leaving inherited connection-pool FDs that could cause initial connection errors in workers; the after_in_child callback also has no exception handling, so a thread-creation failure propagates out of os.fork() and crashes the worker rather than degrading gracefully.
Confidence Score: 3/5
The core fix is sound and addresses a real production issue, but the after_in_child callback has no exception handling, meaning a thread-creation failure would crash the Gunicorn worker rather than falling back gracefully.
The fork-reinitialization logic is correct for the happy path: queues are replaced, old dead thread references are discarded, new consumer threads are started, and the WeakMethod approach is appropriate. The main gap is that _init_consumer_threads() is called directly inside the after_in_child callback without a try/except: if the OS refuses to create a thread at worker startup time, the exception surfaces from os.fork() and kills the worker instead of allowing it to continue with degraded telemetry. The inherited httpx connection pool FDs are an acknowledged limitation that could surface as transient connection errors in workers.
langfuse/_client/resource_manager.py warrants a second look around the _at_fork_reinit method, specifically exception handling and whether the HTTP clients need to be recreated for full fork safety.
Sequence Diagram
sequenceDiagram
    participant Master as Master Process (Gunicorn)
    participant Fork as os.fork()
    participant Parent as Parent Process
    participant Child as Child Worker Process
    Master->>Master: Langfuse() → LangfuseResourceManager.__new__()
    Master->>Master: _initialize_instance()
    Master->>Master: _init_consumer_threads() → start media + score threads
    Master->>Master: "os.register_at_fork(after_in_child=weak_reinit_lambda)"
    Master->>Fork: os.fork()
    Fork-->>Parent: returns child PID (old threads alive)
    Fork-->>Child: returns 0 (threads NOT inherited)
    Child->>Child: after_in_child: weak_reinit() → _at_fork_reinit()
    Child->>Child: _shutdown? → False → proceed
    Child->>Child: _init_consumer_threads()
    Child->>Child: new _media_upload_queue (fresh Queue)
    Child->>Child: new _score_ingestion_queue (fresh Queue)
    Child->>Child: start fresh MediaUploadConsumer threads
    Child->>Child: start fresh ScoreIngestionConsumer thread
    Child->>Child: child ready to handle requests
    Parent->>Parent: continues with original threads unchanged
Reviews (1): Last reviewed commit: "fix(resource_manager): reinitialize cons..."