Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections.abc import Iterable
from functools import lru_cache
from typing import Annotated

from fastapi import Depends
Expand Down Expand Up @@ -86,6 +87,19 @@ async def list_resources(
return response["items"]


@lru_cache(maxsize=1)
def _get_cached_agentex_authorization() -> AgentexAuthorizationProxy:
    """Build the AgentexAuthorizationProxy on first call and reuse it afterwards.

    The function takes no arguments, so ``lru_cache(maxsize=1)`` memoizes the
    single proxy it returns.
    """
    # Imported at call time, as in the original
    # (presumably to avoid an import cycle with src.config.dependencies -- TODO confirm).
    from src.config.dependencies import resolve_environment_variable_dependency

    return AgentexAuthorizationProxy(
        agentex_auth_url=resolve_environment_variable_dependency(
            EnvVarKeys.AGENTEX_AUTH_URL
        )
    )


def _get_agentex_authorization() -> AgentexAuthorizationProxy:
    """Dependency provider that hands out the cached AgentexAuthorizationProxy."""
    proxy = _get_cached_agentex_authorization()
    return proxy


# FastAPI dependency alias: the proxy is resolved through the cached provider
# instead of instantiating AgentexAuthorizationProxy for every request.
DAgentexAuthorization = Annotated[
    AgentexAuthorizationProxy, Depends(_get_agentex_authorization)
]
12 changes: 11 additions & 1 deletion agentex/src/adapters/temporal/adapter_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,17 +612,27 @@ async def delete_schedule(self, schedule_id: str) -> None:


# Dependency injection annotation for FastAPI
# Module-level cache for the temporal adapter singleton
_cached_temporal_adapter: TemporalAdapter | None = None


async def get_temporal_adapter() -> TemporalAdapter:
    """
    Factory function for dependency injection.

    Gets the temporal client from global dependencies and wraps it in a
    TemporalAdapter. The adapter is stored in the module-level
    ``_cached_temporal_adapter`` so that later calls return the same instance
    instead of building a new one per request.

    Returns:
        The cached TemporalAdapter, created on the first call.
    """
    global _cached_temporal_adapter
    if _cached_temporal_adapter is None:
        # Imported at call time, as in the original
        # (presumably to avoid an import cycle -- TODO confirm).
        from src.config.dependencies import GlobalDependencies

        global_deps = GlobalDependencies()
        await global_deps.load()
        # Assign to the cache before returning; an early return of a fresh
        # adapter here would leave the cache permanently empty.
        _cached_temporal_adapter = TemporalAdapter(
            temporal_client=global_deps.temporal_client
        )
    return _cached_temporal_adapter


DTemporalAdapter = Annotated[TemporalAdapter, Depends(get_temporal_adapter)]
35 changes: 34 additions & 1 deletion agentex/src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,42 @@ def __init__(
self.message = message


async def _initialize_service_container(app: FastAPI):
    """
    Build the shared AgentsUseCase and publish it, with its agent repository,
    on ``app.state``.

    Route handlers can then read these objects directly from ``app.state``
    rather than resolving them through per-request dependency injection.
    """
    from src.adapters.temporal.adapter_temporal import get_temporal_adapter
    from src.config.dependencies import (
        _get_cached_agent_repository,
        _get_cached_deployment_history_repository,
    )

    # The factories are cached; calling them here makes sure the instances exist.
    agent_repository = _get_cached_agent_repository()
    history_repository = _get_cached_deployment_history_repository()
    adapter = await get_temporal_adapter()

    from src.domain.use_cases.agents_use_case import AgentsUseCase

    # Expose the use case and the repository for direct access in routes.
    app.state.agents_use_case = AgentsUseCase(
        agent_repository=agent_repository,
        deployment_history_repository=history_repository,
        temporal_adapter=adapter,
    )
    app.state.agent_repository = agent_repository

    logger.info("Service container initialized on app.state")


@asynccontextmanager
async def lifespan(_: FastAPI):
async def lifespan(app: FastAPI):
await dependencies.startup_global_dependencies()
await _initialize_service_container(app)
configure_statsd()
yield
await dependencies.async_shutdown()
Expand Down
59 changes: 59 additions & 0 deletions agentex/src/api/authentication_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
verify_agent_identity,
verify_auth_gateway,
)
from src.api.schemas.authorization_types import (
AgentexResourceType,
AuthorizedOperationType,
)
from src.config.dependencies import (
DEnvironmentVariable,
resolve_environment_variable_dependency,
Expand All @@ -44,13 +48,65 @@ def __init__(self, app: ASGIApp) -> None:
),
environment=resolve_environment_variable_dependency(EnvVarKeys.ENVIRONMENT),
)
# Cached authorization proxy for pre-computing authorized resources
self._authz_proxy = None
if self._enabled:
from src.adapters.authorization.adapter_agentex_authz_proxy import (
_get_cached_agentex_authorization,
)

self._authz_proxy = _get_cached_agentex_authorization()

def is_enabled(self) -> bool:
    """Return the ``_enabled`` flag stored on this middleware instance."""
    enabled = self._enabled
    return enabled

async def _cache_authorized_resources(self, request: Request) -> None:
    """
    Pre-compute the agent IDs the caller may read and cache them on request.state.

    ``request.state.authorized_resources`` is reset to an empty dict. When
    authorization applies, it gains one entry keyed by
    ``AgentexResourceType.agent``:

    - the list of authorized IDs (possibly empty) when the lookup succeeds;
    - an empty list when the lookup raises or yields ``None``.

    No entry is written when an agent identity is set, when authentication is
    disabled, or when there is no principal context. Readers treat a missing
    or ``None`` entry as "no filtering", so this method never stores ``None``
    for a principal that is subject to authorization.
    """
    request.state.authorized_resources = {}

    # Requests carrying an agent identity bypass auth: nothing to filter.
    if request.state.agent_identity:
        return

    # Goes through is_enabled() rather than the attribute, for testability.
    if not self.is_enabled():
        return

    # Without a principal context there is nothing to authorize against.
    if not request.state.principal_context:
        return

    # Fall back to the shared cached proxy when none was set in __init__
    # (keeps test mocking working).
    authz_proxy = self._authz_proxy
    if authz_proxy is None:
        from src.adapters.authorization.adapter_agentex_authz_proxy import (
            _get_cached_agentex_authorization,
        )

        authz_proxy = _get_cached_agentex_authorization()

    try:
        agent_ids = await authz_proxy.list_resources(
            request.state.principal_context,
            AgentexResourceType.agent,
            AuthorizedOperationType.read,
        )
    except Exception as e:
        # Broad catch is deliberate: this runs for every request and must not
        # abort it. Fail closed, though -- storing None here would disable
        # filtering and expose every agent whenever the lookup fails.
        logger.warning(f"Failed to pre-compute authorized agents: {e}")
        authorized_agent_ids = []
    else:
        # An empty result means "authorized for nothing" and must stay an
        # empty list; collapsing it to None would mean "no filtering".
        authorized_agent_ids = list(agent_ids) if agent_ids is not None else []

    request.state.authorized_resources[AgentexResourceType.agent] = (
        authorized_agent_ids
    )

async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
request.state.principal_context = None
request.state.agent_identity = None
request.state.authorized_resources = {} # Pre-computed auth, None means no filtering

# Skip authentication for OPTIONS requests (CORS preflight)
if request.method == "OPTIONS":
Expand Down Expand Up @@ -158,6 +214,9 @@ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
headers_dict, request.state.principal_context
)

# Pre-compute and cache authorized resources to avoid DI overhead in routes
await self._cache_authorized_resources(request)

return await call_next(request)


Expand Down
13 changes: 9 additions & 4 deletions agentex/src/api/routes/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from src.utils.authorization_shortcuts import (
DAuthorizedId,
DAuthorizedName,
DAuthorizedResourceIds,
)
from src.utils.logging import make_logger

Expand Down Expand Up @@ -90,22 +89,28 @@ async def get_agent_by_name(
description="List all registered agents, optionally filtered by query parameters.",
)
async def list_agents(
agents_use_case: DAgentsUseCase,
_authorized_ids: DAuthorizedResourceIds(AgentexResourceType.agent),
request: Request,
task_id: str | None = Query(None, description="Task ID"),
limit: int = Query(50, description="Limit", ge=1),
page_number: int = Query(1, description="Page number", ge=1),
order_by: str | None = Query(None, description="Field to order by"),
order_direction: str = Query("desc", description="Order direction (asc or desc)"),
):
"""List all registered agents."""
# Access use case from app.state (bypasses DI overhead)
agents_use_case = request.app.state.agents_use_case

# Get authorized IDs from middleware cache (bypasses DI overhead)
# None means no filtering (auth bypassed or disabled)
authorized_ids = request.state.authorized_resources.get(AgentexResourceType.agent)

agent_entities = await agents_use_case.list(
task_id=task_id,
limit=limit,
page_number=page_number,
order_by=order_by,
order_direction=order_direction,
**{"id": _authorized_ids} if _authorized_ids is not None else {},
**{"id": authorized_ids} if authorized_ids is not None else {},
)
return [Agent.model_validate(agent_entity) for agent_entity in agent_entities]

Expand Down
73 changes: 67 additions & 6 deletions agentex/src/config/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
from functools import lru_cache
from typing import Annotated

import httpx
Expand Down Expand Up @@ -280,12 +281,15 @@ def middleware_async_read_only_engine() -> AsyncEngine:
# DDatabaseAsyncReadOnlyEngine = Annotated[AsyncEngine, Depends(database_async_read_only_engine)]


def database_async_read_write_session_maker(
db_async_read_write_engine: DDatabaseAsyncReadWriteEngine,
) -> async_sessionmaker[AsyncSession]:
return async_sessionmaker(
autoflush=False, bind=db_async_read_write_engine, expire_on_commit=False
)
@lru_cache(maxsize=1)
def _get_cached_session_maker() -> async_sessionmaker[AsyncSession]:
    """Create the read-write async session maker once; later calls reuse it."""
    return async_sessionmaker(
        bind=database_async_read_write_engine(),
        autoflush=False,
        expire_on_commit=False,
    )


def database_async_read_write_session_maker() -> async_sessionmaker[AsyncSession]:
    """Return the shared read-write session maker from the cached factory."""
    session_maker = _get_cached_session_maker()
    return session_maker


# def database_async_read_only_session_maker(
Expand All @@ -306,6 +310,63 @@ def middleware_async_read_only_session_maker() -> async_sessionmaker[AsyncSessio
]


# =============================================================================
# Cached Repository Factories
# =============================================================================
# These factories cache repository instances to avoid per-request instantiation.
# Repositories are stateless - they only hold a reference to the session maker.


@lru_cache(maxsize=1)
def _get_cached_agent_repository():
    """Return the single shared AgentRepository, constructing it on first use."""
    from src.domain.repositories.agent_repository import AgentRepository

    session_maker = _get_cached_session_maker()
    return AgentRepository(session_maker)


@lru_cache(maxsize=1)
def _get_cached_task_repository():
    """Return the single shared TaskRepository, constructing it on first use."""
    from src.domain.repositories.task_repository import TaskRepository

    session_maker = _get_cached_session_maker()
    return TaskRepository(session_maker)


@lru_cache(maxsize=1)
def _get_cached_deployment_history_repository():
    """Return the single shared DeploymentHistoryRepository, built on first use."""
    from src.domain.repositories.deployment_history_repository import (
        DeploymentHistoryRepository,
    )

    session_maker = _get_cached_session_maker()
    return DeploymentHistoryRepository(session_maker)


@lru_cache(maxsize=1)
def _get_cached_span_repository():
    """Return the single shared SpanRepository, constructing it on first use."""
    from src.domain.repositories.span_repository import SpanRepository

    session_maker = _get_cached_session_maker()
    return SpanRepository(session_maker)


@lru_cache(maxsize=1)
def _get_cached_schedule_repository():
    """Return the single shared ScheduleRepository, constructing it on first use."""
    from src.domain.repositories.schedule_repository import ScheduleRepository

    session_maker = _get_cached_session_maker()
    return ScheduleRepository(session_maker)


@lru_cache(maxsize=1)
def _get_cached_agent_api_keys_repository():
    """Return the single shared AgentAPIKeysRepository, built on first use."""
    from src.domain.repositories.agent_api_keys_repository import AgentAPIKeysRepository

    session_maker = _get_cached_session_maker()
    return AgentAPIKeysRepository(session_maker)


# DDatabaseAsyncReadOnlySessionMaker = Annotated[
# async_sessionmaker[AsyncSession], Depends(database_async_read_only_session_maker)
# ]
Expand Down
8 changes: 7 additions & 1 deletion agentex/src/domain/repositories/agent_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,10 @@ async def acquire_advisory_lock(
yield await session.scalar(select(func.pg_try_advisory_xact_lock(lock_key)))


DAgentRepository = Annotated[AgentRepository, Depends(AgentRepository)]
def _get_agent_repository() -> AgentRepository:
    """Dependency provider that returns the cached AgentRepository."""
    # Imported at call time, as in the original
    # (presumably to avoid an import cycle -- TODO confirm).
    from src.config.dependencies import (
        _get_cached_agent_repository as cached_factory,
    )

    return cached_factory()


DAgentRepository = Annotated[AgentRepository, Depends(_get_agent_repository)]
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ async def create_from_agent(


# Type alias for dependency injection
def _get_deployment_history_repository() -> DeploymentHistoryRepository:
    """Dependency provider that returns the cached DeploymentHistoryRepository."""
    # Imported at call time, as in the original
    # (presumably to avoid an import cycle -- TODO confirm).
    from src.config.dependencies import (
        _get_cached_deployment_history_repository as cached_factory,
    )

    return cached_factory()


# FastAPI dependency alias: the repository is resolved through the cached
# provider instead of instantiating DeploymentHistoryRepository per request.
DDeploymentHistoryRepository = Annotated[
    DeploymentHistoryRepository, Depends(_get_deployment_history_repository)
]
8 changes: 7 additions & 1 deletion agentex/src/domain/repositories/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,10 @@ async def update(self, task: TaskEntity) -> TaskEntity:
return TaskEntity.model_validate(modified_orm)


DTaskRepository = Annotated[TaskRepository, Depends(TaskRepository)]
def _get_task_repository() -> TaskRepository:
    """Dependency provider that returns the cached TaskRepository."""
    # Imported at call time, as in the original
    # (presumably to avoid an import cycle -- TODO confirm).
    from src.config.dependencies import (
        _get_cached_task_repository as cached_factory,
    )

    return cached_factory()


DTaskRepository = Annotated[TaskRepository, Depends(_get_task_repository)]
24 changes: 23 additions & 1 deletion agentex/src/domain/use_cases/agents_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,26 @@ async def list(
)


DAgentsUseCase = Annotated[AgentsUseCase, Depends(AgentsUseCase)]
# Module-level cache for the use case singleton
_cached_agents_use_case: AgentsUseCase | None = None


async def _get_agents_use_case(
    agent_repository: DAgentRepository,
    deployment_history_repository: DDeploymentHistoryRepository,
    temporal_adapter: DTemporalAdapter,
) -> AgentsUseCase:
    """Return the module-level AgentsUseCase, building it on the first call.

    The injected collaborators are only used when the singleton does not yet
    exist; later calls return the stored instance and ignore their arguments.
    """
    global _cached_agents_use_case
    if _cached_agents_use_case is None:
        _cached_agents_use_case = AgentsUseCase(
            agent_repository=agent_repository,
            deployment_history_repository=deployment_history_repository,
            temporal_adapter=temporal_adapter,
        )
    return _cached_agents_use_case


DAgentsUseCase = Annotated[AgentsUseCase, Depends(_get_agents_use_case)]
Loading
Loading