diff --git a/sdk/arch/events.mdx b/sdk/arch/events.mdx
index 2d37f966..371926f4 100644
--- a/sdk/arch/events.mdx
+++ b/sdk/arch/events.mdx
@@ -199,6 +199,61 @@ Two distinct error events exist in the SDK, with different purpose and visibilit
 - Effect: Run loop transitions to ERROR and run() raises ConversationRunError; surface top-level error to client applications
 - Code: https://github.com/OpenHands/software-agent-sdk/blob/main/openhands-sdk/openhands/sdk/event/conversation_error.py
 
+## Event Stream Subscription Mechanism
+
+This section documents how clients subscribe to the event stream and how events are emitted, then walks through a practical example of intercepting a headless run to inject instructions.
+
+### Subscription Model
+- The server maintains an in-process publish/subscribe registry per conversation. Subscribers implement an async callable interface and receive events pushed by the server (source: openhands-agent-server/openhands/agent_server/pub_sub.py — Subscriber.__call__, PubSub.subscribe, PubSub.__call__).
+- When a new subscriber registers, the service emits an initial ConversationStateUpdateEvent snapshot so clients have current state immediately (source: openhands-agent-server/openhands/agent_server/event_service.py — subscribe_to_events()).
+- Event emission is thread-safe: server code schedules emission on the main loop, acquiring the conversation lock to persist and publish (source: openhands-agent-server/openhands/agent_server/event_service.py — _emit_event_from_thread()).
+
+### WebSocket Endpoint
+- Path: /sockets/events/{conversation_id}
+- Auth: session_api_key query parameter is checked against server config (source: openhands-agent-server/openhands/agent_server/sockets.py — events_socket()).
+- Behavior:
+  - On connect, subscribes the socket to the conversation’s PubSub
+  - Optional resend_all=true replays historical events via paginated search (source: openhands-agent-server/openhands/agent_server/sockets.py — resend_all loop; event_service.search_events())
+  - Incoming messages over the socket are validated as Message and routed to the conversation; if run=True, the run loop is triggered (source: openhands-agent-server/openhands/agent_server/sockets.py — Message.model_validate + event_service.send_message())
+
+```mermaid
+sequenceDiagram
+    participant Client
+    participant WS as WebSocket /sockets/events/{id}
+    participant Svc as EventService
+    participant PS as PubSub
+
+    Client->>WS: Connect(session_api_key, resend_all)
+    WS->>Svc: subscribe_to_events(_WebSocketSubscriber)
+    Svc->>PS: subscribe(subscriber)
+    Note over Svc: Emit initial ConversationStateUpdateEvent to subscriber
+    loop if resend_all
+        WS->>Svc: search_events(page_id)
+        Svc-->>WS: Event batch
+    end
+    Note over Svc: Agent run creates events
+    Svc->>PS: __call__(event)
+    PS-->>WS: send_json(event)
+    Client->>WS: send_json(Message)
+    WS->>Svc: send_message(message, run=True)
+```
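+
+A minimal client sketch of this flow follows. It is illustrative rather than part of the SDK: the endpoint path, the query parameters, and the fact that a Message with run=True can be sent over the socket are documented above, while the third-party `websockets` package, the localhost URL, and the concrete payload field names are assumptions.
+
+```python
+import asyncio
+import json
+
+import websockets  # third-party client library, not an SDK dependency
+
+SERVER = "ws://localhost:8000"   # assumed local agent server address
+CONVERSATION_ID = "..."          # id of an existing conversation
+SESSION_API_KEY = "..."          # must match the server configuration
+
+
+async def stream_events() -> None:
+    url = (
+        f"{SERVER}/sockets/events/{CONVERSATION_ID}"
+        f"?session_api_key={SESSION_API_KEY}&resend_all=false"
+    )
+    async with websockets.connect(url) as ws:
+        # Optionally push a user Message over the same socket; run=True asks
+        # the server to start the run loop (the payload field names are
+        # assumptions based on the Message model, not a verified schema).
+        await ws.send(json.dumps({
+            "role": "user",
+            "content": [{"type": "text", "text": "List the repository files."}],
+            "run": True,
+        }))
+        # Each published event arrives as one JSON message; the fields
+        # printed here are illustrative.
+        async for raw in ws:
+            event = json.loads(raw)
+            print(event.get("kind"), event.get("id"))
+
+
+asyncio.run(stream_events())
+```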
+
+### HTTP Event APIs
+- Search: GET /conversations/{conversation_id}/events/search with filters and pagination (source: openhands-agent-server/openhands/agent_server/event_router.py — search_conversation_events()).
+- Count: GET /conversations/{conversation_id}/events/count (source: openhands-agent-server/openhands/agent_server/event_router.py — count_conversation_events()).
+- Fetch one: GET /conversations/{conversation_id}/events/{event_id} (source: openhands-agent-server/openhands/agent_server/event_router.py — get_conversation_event()).
+- Batch fetch: GET /conversations/{conversation_id}/events?event_ids=... (source: openhands-agent-server/openhands/agent_server/event_router.py — batch_get_conversation_events()).
+- Inject message: POST /conversations/{conversation_id}/events with SendMessageRequest to add a Message and optionally trigger a run (source: openhands-agent-server/openhands/agent_server/event_router.py — send_message()).
+
+### Real-world example: Intercept and steer a headless run
+- Subscribe to /sockets/events/{conversation_id} with resend_all=false to stream live events (source: openhands-agent-server/openhands/agent_server/sockets.py — events_socket()).
+- Watch for ActionEvent and ObservationEvent to monitor progress; ConversationStateUpdateEvent provides status and stats (source: openhands-sdk/openhands/sdk/event/llm_convertible/observation.py; openhands-sdk/openhands/sdk/event/conversation_state.py).
+- When a risky operation appears, send a Message via POST /conversations/{id}/events with instructions (e.g., “use a safer command”) and set run=true to resume (source: openhands-agent-server/openhands/agent_server/event_router.py — send_message()).
+
+---
+Last updated: 2025-12-09 UTC
+Source commits: software-agent-sdk@93d405c, OpenHands@9b57a0b
+
 ## See Also
 
 - **[Agent Architecture](/sdk/arch/agent)** - How agents read and write events
diff --git a/sdk/arch/llm.mdx b/sdk/arch/llm.mdx
index 66feaf46..e5775965 100644
--- a/sdk/arch/llm.mdx
+++ b/sdk/arch/llm.mdx
@@ -189,6 +189,11 @@ flowchart TB
 4. **Telemetry:** Record tokens, cost, latency
 5. **Response:** Return completion or raise error
 
+#### Retry listener and telemetry updates
+- The retry listener callback signature includes the exception: Callable[[int, int, BaseException | None], None]. This allows listeners to inspect the error that triggered the retry (source: openhands-sdk/openhands/sdk/llm/utils/retry_mixin.py; type alias and invocation with exception).
+- LLM internally wraps the configured listener to also emit telemetry errors when a retryable error occurs (source: openhands-sdk/openhands/sdk/llm/llm.py; _retry_listener_fn forwards to listener and telemetry.on_error).
+- Telemetry on_error now accepts BaseException to record both Exception and BaseException subclasses (source: openhands-sdk/openhands/sdk/llm/utils/telemetry.py; on_error signature).
+
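+A hedged sketch of a listener matching this signature is shown below; it is illustrative only. The parameter meanings (attempt number, maximum attempts, triggering exception) and the commented-out wiring are assumptions inferred from the bullets above, not a verified API surface.
+
+```python
+from typing import Callable
+
+# Illustrative alias mirroring the documented callback signature
+# Callable[[int, int, BaseException | None], None].
+RetryListener = Callable[[int, int, BaseException | None], None]
+
+
+def log_retry(attempt: int, max_attempts: int, exc: BaseException | None) -> None:
+    # The third argument carries the error that triggered the retry, when available.
+    reason = repr(exc) if exc is not None else "unknown"
+    print(f"LLM retry {attempt}/{max_attempts}: {reason}")
+
+
+# Assumed wiring (field name and import path are assumptions, shown for context only):
+#   from openhands.sdk.llm import LLM
+#   llm = LLM(model="gpt-5-mini", retry_listener=log_retry)
+```
+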
 ### Responses API Support
 
 In addition to the standard chat completion API, the LLM system supports [OpenAI's Responses API](https://platform.openai.com/docs/api-reference/responses) as an alternative invocation path for models that benefit from this newer interface (e.g., GPT-5-Codex only supports Responses API). The Responses API provides enhanced reasoning capabilities with encrypted thinking and detailed reasoning summaries.
@@ -242,7 +247,11 @@ Models that automatically use the Responses API path:
 |---------|----------|---------------|
 | **gpt-5*** | `gpt-5`, `gpt-5-mini`, `gpt-5-codex` | OpenAI GPT-5 family |
 
-**Detection:** The SDK automatically detects if a model supports the Responses API using pattern matching in [`model_features.py`](https://github.com/OpenHands/software-agent-sdk/blob/main/openhands-sdk/openhands/sdk/llm/utils/model_features.py).
+**Detection:** The SDK automatically detects if a model supports the Responses API using pattern matching in [`model_features.py`](https://github.com/OpenHands/software-agent-sdk/blob/main/openhands-sdk/openhands/sdk/llm/utils/model_features.py). Patterns currently include substrings like "gpt-5" and "codex-mini-latest" (source: openhands-sdk/openhands/sdk/llm/utils/model_features.py — RESPONSES_API_PATTERNS).
+
+---
+Last updated: 2025-12-09 UTC
+Source commit: software-agent-sdk@93d405c
 
 ## Provider Integration