Background
The service stores the history of events in a Postgres database. There is currently no API endpoint to query historical events with filtering, sorting, and pagination.
Feature
Add a /stats/{topic_name} endpoint that returns a paginated list of events for the specified topic based on provided query parameters.
API Specification
Endpoint: POST /stats/{topic_name}
Request Body:

```json
{
  "filter": {
    "source_app": "unify",
    "timestamp_start_between": ["2025-01-01", "2025-01-31"],
    "run_id": 12345
  },
  "sort": ["-timestamp_start", "source_app"],
  "cursor": null,
  "limit": 50
}
```

Response (200 OK):

```json
{
  "success": true,
  "statusCode": 200,
  "data": [
    {
      "id": "uuid-123",
      "topic": "public.cps.za.runs",
      "timestamp": "2025-01-15T10:30:00Z",
      "payload": { "..." }
    }
  ],
  "pagination": {
    "cursor": "eyJpZCI6MTIzLCJ0aW1lc3RhbXAiOiIyMDI1LTAxLTE1VDEwOjMwOjAwWiJ9",
    "has_more": true,
    "limit": 50
  }
}
```

Error Responses:
- 401: Invalid or missing authentication token
- 403: User not authorized for topic
- 404: Topic not found
- 400: Invalid filter parameters or validation error
- 500: Database query error
Filter Capabilities
- Equality filters: `field_name: value`
- Range filters: `field_name_between: [start, end]`
- In filters: `field_name_in: [value1, value2, ...]`
- Nested JSON field filtering using dot notation (e.g., `payload.source_app`)
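
To make the grammar concrete, here is a sketch of how each filter form could map to a parameterized SQL predicate (the JSONB `->>` access for dot notation is an assumption; the actual translation lives in the `QueryBuilder` proposed below):

```python
# Hypothetical filter-to-predicate mapping; values are always bound as parameters:
#   {"source_app": "unify"}                -> "source_app = %s"
#   {"timestamp_start_between": [t0, t1]}  -> "timestamp_start BETWEEN %s AND %s"
#   {"run_id_in": [1, 2, 3]}               -> "run_id = ANY(%s)"
#   {"payload.source_app": "unify"}        -> "payload->>'source_app' = %s"
```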
Sorting
- Multiple sort fields supported
- Prefix with `-` for descending order (e.g., `-timestamp_start`)
- Default sort: `["-timestamp"]`
Pagination
- Cursor-based pagination for efficient queries
- `cursor`: opaque token from the previous response
- `limit`: max records per page (default: 50, max: 1000)
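
The cursor itself is just a base64-encoded JSON snapshot of the last row's keyset fields (see `query_builder.py` below); clients treat it as opaque, but the round trip looks like this:

```python
import base64
import json

# Encode the keyset of the last returned row into an opaque cursor token...
cursor = base64.b64encode(
    json.dumps({"id": 123, "timestamp": "2025-01-15T10:30:00Z"}).encode()
).decode()

# ...and decode it server-side to resume the scan where the previous page ended.
print(json.loads(base64.b64decode(cursor).decode()))
# {'id': 123, 'timestamp': '2025-01-15T10:30:00Z'}
```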
Proposed Solution
Separate Stats Service (Recommended)
Why this approach:
- Separation of concerns: Read operations are isolated from write operations, reducing risk of impacting event ingestion performance
- Scalability: Analytics/reporting queries can scale independently without affecting write throughput
- Security: Read-only database connection prevents accidental data modification
- Future-proofing: Easy to add caching layer (Redis/ElastiCache) or move to read replicas later
- Monitoring: Separate CloudWatch metrics and logs for analytics workload
Implementation Steps:
- Create new Lambda function: `src/event_stats_lambda.py`
  - Reuse authentication/authorization logic from `event_gate_lambda.py`
  - Import shared configuration and topic schemas
  - Implement dedicated query handler
- Add query builder module: `src/query_builder.py`
  - `QueryBuilder` class for safe SQL construction
  - Schema-based field validation against `TOPICS`
  - Parameterized query generation to prevent SQL injection
  - Cursor encoding/decoding logic
- Add read-only database interface: `src/reader_postgres.py`
  - Similar to `writer_postgres.py` but with a read-only connection
  - Execute parameterized SELECT queries with filters
  - Return paginated results with cursor information
- Infrastructure changes:
  - New API Gateway resource: `POST /stats/{topic_name}`
  - New Lambda function deployment with separate IAM role
  - Configure read-only database credentials (separate from writer)
  - Set up CloudWatch alarms for query performance
- Shared configuration:
  - Reuse `TOPICS`, `ACCESS`, and JWT validation logic
  - Load from the same S3 configuration bucket or local files
  - Maintain consistent authentication flow
Detailed Implementation
New file: `src/event_stats_lambda.py`

```python
"""Event Stats Lambda - Read-only event history queries."""
import json
import logging
import os
from typing import Any, Dict

import jwt

# Import shared modules
try:
    from . import reader_postgres
    from . import query_builder
    from .conf_path import CONF_DIR
except ImportError:
    import reader_postgres
    import query_builder
    from conf_path import CONF_DIR

logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

# Load the same configuration as event_gate_lambda
with open(os.path.join(CONF_DIR, "config.json"), "r", encoding="utf-8") as file:
    CONFIG = json.load(file)

# Load topics and access (same as event_gate_lambda.py)
TOPICS = {}
ACCESS = {}
TOKEN_PUBLIC_KEY = None  # Load the same way as event_gate_lambda

reader_postgres.init(logger, CONFIG)


def _error_response(status: int, err_type: str, message: str) -> Dict[str, Any]:
    """Build a standardized error response."""
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({
            "success": False,
            "statusCode": status,
            "errors": [{"type": err_type, "message": message}]
        })
    }


def get_topic_stats(topic_name: str, query: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
    """Query historical events with filters, sorting, and pagination.

    Args:
        topic_name: Target topic name.
        query: Query parameters (filter, sort, cursor, limit).
        token_encoded: Encoded bearer JWT token string.
    """
    logger.debug("Handling POST /stats/%s", topic_name)

    # Validate token (same as event_gate_lambda)
    try:
        token = jwt.decode(token_encoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
    except jwt.PyJWTError:
        return _error_response(401, "auth", "Invalid or missing token")

    # Check topic exists
    if topic_name not in TOPICS:
        return _error_response(404, "topic", f"Topic '{topic_name}' not found")

    # Check authorization (same as event_gate_lambda)
    user = token.get("sub")
    if topic_name not in ACCESS or user not in ACCESS[topic_name]:
        return _error_response(403, "auth", "User not authorized for topic")

    # Validate query parameters
    filters = query.get("filter", {})
    sort = query.get("sort", ["-timestamp"])
    cursor = query.get("cursor")
    limit = min(query.get("limit", 50), 1000)

    try:
        # Build and execute the query
        builder = query_builder.QueryBuilder(TOPICS[topic_name])
        data, next_cursor, has_more = reader_postgres.read_events(
            topic_name, filters, sort, cursor, limit, builder
        )
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({
                "success": True,
                "statusCode": 200,
                "data": data,
                "pagination": {
                    "cursor": next_cursor,
                    "has_more": has_more,
                    "limit": limit
                }
            })
        }
    except ValueError as exc:
        return _error_response(400, "validation", str(exc))
    except Exception:
        logger.exception("Query execution failed")
        return _error_response(500, "database", "Query execution failed")


def extract_token(event_headers: Dict[str, str]) -> str:
    """Extract the bearer token - same behavior as event_gate_lambda.extract_token().

    Minimal sketch; the real code should copy the shared implementation.
    """
    auth = event_headers.get("Authorization", event_headers.get("authorization", ""))
    # Strip the "Bearer " scheme prefix if present
    if auth.lower().startswith("bearer "):
        return auth[len("Bearer "):]
    return auth


def lambda_handler(event: Dict[str, Any], context: Any):
    """AWS Lambda entry point for stats queries."""
    try:
        resource = event.get("resource", "").lower()
        if resource == "/stats/{topic_name}":
            method = event.get("httpMethod")
            if method == "POST":
                return get_topic_stats(
                    event["pathParameters"]["topic_name"].lower(),
                    json.loads(event["body"]),
                    extract_token(event.get("headers", {}))
                )
        return _error_response(404, "route", "Resource not found")
    except Exception as exc:
        logger.error("Unexpected exception: %s", exc)
        return _error_response(500, "internal", "Unexpected server error")
```
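
For a quick smoke test, the handler can be invoked locally with a fabricated API Gateway proxy event (a sketch: the token, topic, and filter values are placeholders, and the configuration stubs above must be filled in first):

```python
# Hypothetical local invocation with a minimal API Gateway proxy event.
fake_event = {
    "resource": "/stats/{topic_name}",
    "httpMethod": "POST",
    "pathParameters": {"topic_name": "public.cps.za.runs"},
    "headers": {"Authorization": "Bearer <jwt-token>"},
    "body": json.dumps({
        "filter": {"source_app": "unify"},
        "sort": ["-timestamp_start"],
        "cursor": None,
        "limit": 50
    })
}
print(lambda_handler(fake_event, context=None))
```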
"""Read-only PostgreSQL interface for event history queries."""
import json
import logging
from typing import Any, Dict, List, Tuple
import psycopg2
import psycopg2.extras
logger = None
connection = None
def init(log: logging.Logger, config: Dict[str, Any]) -> None:
"""Initialize read-only database connection."""
global logger, connection
logger = log
# Use read-only credentials from config
db_config = config.get("postgres_read", config.get("postgres"))
connection = psycopg2.connect(
host=db_config["host"],
port=db_config["port"],
database=db_config["database"],
user=db_config["user"],
password=db_config["password"]
)
connection.set_session(readonly=True, autocommit=True)
def read_events(
topic_name: str,
filters: Dict[str, Any],
sort: List[str],
cursor: str,
limit: int,
builder: Any
) -> Tuple[List[Dict[str, Any]], str, bool]:
"""Execute query and return paginated results.
Returns:
Tuple of (events_list, next_cursor, has_more)
"""
sql_query, parameters = builder.build_select(topic_name, filters, sort, cursor, limit + 1)
with connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
cursor.execute(sql_query, parameters)
rows = cursor.fetchall()
has_more = len(rows) > limit
events = rows[:limit]
# Generate next cursor from last event
next_cursor = builder.encode_cursor(events[-1]) if has_more else None
return events, next_cursor, has_moreNew file: src/query_builder.py
"""SQL query builder with schema validation and cursor pagination."""
import base64
import json
from typing import Any, Dict, List, Tuple
class QueryBuilder:
"""Build safe parameterized SQL queries with filter validation."""
def __init__(self, topic_schema: Dict[str, Any]):
self.topic_schema = topic_schema
self.allowed_fields = self._extract_allowed_fields(topic_schema)
def _extract_allowed_fields(self, schema: Dict[str, Any]) -> set:
"""Extract filterable fields from JSON schema."""
# Parse schema properties and nested payload fields
pass
def build_select(
self,
topic_name: str,
filters: Dict[str, Any],
sort: List[str],
cursor: str,
limit: int
) -> Tuple[str, List[Any]]:
"""Build parameterized SELECT query.
Returns:
Tuple of (sql_query, parameters)
"""
# Validate all filter fields exist in schema
for field in filters.keys():
if field not in self.allowed_fields:
raise ValueError(f"Invalid filter field: {field}")
# Build WHERE clause with parameter placeholders
# Build ORDER BY clause
# Apply cursor pagination
# Return (sql_query, [param1, param2, ...])
pass
def encode_cursor(self, last_event: Dict[str, Any]) -> str:
"""Encode last event fields into base64 cursor."""
cursor_data = {
"id": last_event["id"],
"timestamp": last_event["timestamp"]
}
return base64.b64encode(json.dumps(cursor_data).encode()).decode()
def decode_cursor(self, cursor: str) -> Dict[str, Any]:
"""Decode cursor back to field values."""
return json.loads(base64.b64decode(cursor).decode())Infrastructure Requirements
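
As a usage sketch (with a hypothetical topic schema; the `events` table name is the assumption baked into `build_select` above), the builder turns the sample request into a parameterized statement:

```python
schema = {"properties": {"id": {}, "timestamp": {}, "source_app": {},
                         "payload": {"properties": {"source_app": {}}}}}
builder = QueryBuilder(schema)
sql, params = builder.build_select(
    topic_name="public.cps.za.runs",
    filters={"source_app": "unify"},
    sort=["-timestamp"],
    cursor=None,
    limit=51,  # limit + 1, as passed in by reader_postgres.read_events
)
print(sql)
# SELECT * FROM events WHERE topic = %s AND source_app = %s ORDER BY timestamp DESC LIMIT %s
print(params)
# ['public.cps.za.runs', 'unify', 51]
```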
Infrastructure Requirements
AWS Lambda Configuration:
- Function name: `event-stats-lambda`
- Runtime: Python 3.11 (same as event_gate_lambda)
- Memory: 512 MB (adjust based on query complexity)
- Timeout: 30 seconds
- Environment variables: `LOG_LEVEL=INFO`, `CONF_DIR=/var/task/conf` (or an S3 path)
IAM Role Permissions:
- Read access to configuration S3 bucket
- CloudWatch Logs write permissions
- VPC execution role (if database in VPC)
API Gateway:
- New resource: `/stats/{topic_name}`
- Method: `POST`
- Lambda proxy integration
- Same authorizer as `/topics/{topic_name}`
Database Configuration:

```json
{
  "postgres_read": {
    "host": "your-db-read-replica.rds.amazonaws.com",
    "port": 5432,
    "database": "events",
    "user": "reader_user",
    "password": "read_only_password"
  }
}
```

Security Considerations
- Separate read-only database user with SELECT-only privileges
- Validate all filter fields against topic schemas from `TOPICS`
- Use parameterized queries exclusively (prevent SQL injection)
- Apply the same ACL checks as write operations (user must be in `ACCESS[topic_name]`)
- Validate the JWT token using the existing `TOKEN_PUBLIC_KEY`
- Limit maximum query result size (max `limit`: 1000)
- Consider rate limiting for expensive queries
- Set read query timeout at database level
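
For the last point, one option (a sketch of a change to `reader_postgres.init`, not the only way to do this) is to cap statement time through libpq's `options` parameter when opening the read-only connection:

```python
# Inside reader_postgres.init: cap every statement at 5 seconds (assumed budget)
# so a runaway analytics query cannot hold the connection indefinitely.
connection = psycopg2.connect(
    host=db_config["host"],
    port=db_config["port"],
    database=db_config["database"],
    user=db_config["user"],
    password=db_config["password"],
    options="-c statement_timeout=5000"  # milliseconds
)
connection.set_session(readonly=True, autocommit=True)
```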
Testing Requirements
- Unit tests for `QueryBuilder` with various filter combinations
- Unit tests for cursor encoding/decoding
- Integration tests with a test Postgres database
- Authorization tests (401, 403 scenarios using the existing `ACCESS` config)
- Pagination tests (cursor handling, boundary conditions, empty results)
- Performance tests with large datasets (10k+ events)
- Validation of filter fields against topic schemas from `TOPICS`
- Test query timeout handling
- Test concurrent query execution
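
A starting point for the `QueryBuilder` and cursor tests might look like this (a pytest sketch; the minimal schema is a placeholder):

```python
import pytest
from query_builder import QueryBuilder


@pytest.fixture
def builder():
    # Minimal placeholder schema: just enough fields for these tests
    return QueryBuilder({"properties": {"id": {}, "timestamp": {}}})


def test_cursor_roundtrip(builder):
    event = {"id": "uuid-123", "timestamp": "2025-01-15T10:30:00Z"}
    cursor = builder.encode_cursor(event)
    assert builder.decode_cursor(cursor) == event


def test_invalid_filter_field_rejected(builder):
    with pytest.raises(ValueError):
        builder.build_select("topic", {"no_such_field": 1}, ["-timestamp"], None, 50)
```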
Migration Path
- Deploy stats Lambda alongside existing event_gate_lambda
- Update API Gateway with the new `/stats/{topic_name}` resource
- Configure read-only database credentials
- Test thoroughly in non-production environment
- Monitor query performance and adjust timeouts/memory
- Consider adding read replica if query load increases
Future Enhancements
- Add Redis/ElastiCache for cursor caching
- Implement query result caching for common filters
- Add aggregation endpoints (count, groupBy)
- Support full-text search on payload fields
- Add query cost estimation and limiting