Implement stats endpoint #84

@oto-macenauer-absa

Description

Background

The service stores the history of events in a Postgres database. Currently, there is no API endpoint to query and retrieve historical events with filtering, sorting, and pagination.

Feature

Add a /stats/{topic_name} endpoint that returns a paginated list of events for the specified topic, filtered and sorted according to the parameters in the request body.

API Specification

Endpoint: POST /stats/{topic_name}

Request Body:

{
  "filter": {
    "source_app": "unify",
    "timestamp_start_between": ["2025-01-01", "2025-01-31"],
    "run_id": 12345
  },
  "sort": ["-timestamp_start", "source_app"],
  "cursor": null,
  "limit": 50
}

Response (200 OK):

{
  "success": true,
  "statusCode": 200,
  "data": [
    {
      "id": "uuid-123",
      "topic": "public.cps.za.runs",
      "timestamp": "2025-01-15T10:30:00Z",
      "payload": { "..." }
    }
  ],
  "pagination": {
    "cursor": "eyJpZCI6MTIzLCJ0aW1lc3RhbXAiOiIyMDI1LTAxLTE1VDEwOjMwOjAwWiJ9",
    "has_more": true,
    "limit": 50
  }
}

Error Responses:

  • 400: Invalid filter parameters or validation error
  • 401: Invalid or missing authentication token
  • 403: User not authorized for topic
  • 404: Topic not found
  • 500: Database query error

Filter Capabilities

  • Equality filters: field_name: value
  • Range filters: field_name_between: [start, end]
  • In filters: field_name_in: [value1, value2, ...]
  • Support for nested JSON field filtering using dot notation (e.g., payload.source_app)
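
For illustration, a single filter object can combine all three forms (field names below follow the request example above; the run_id_in values are illustrative):

{
  "filter": {
    "source_app": "unify",
    "timestamp_start_between": ["2025-01-01", "2025-01-31"],
    "run_id_in": [12345, 12346],
    "payload.source_app": "unify"
  }
}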

Sorting

  • Multiple sort fields supported
  • Prefix with - for descending order (e.g., -timestamp_start)
  • Default sort: ["-timestamp"]

Pagination

  • Cursor-based pagination for efficient queries
  • cursor: opaque token from previous response
  • limit: max records per page (default: 50, max: 1000)
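
The cursor is opaque to clients; in the proposed implementation below it is a base64-encoded JSON object holding the keyset fields of the last returned row. Decoding the sample cursor from the response above:

import base64
import json

cursor = "eyJpZCI6MTIzLCJ0aW1lc3RhbXAiOiIyMDI1LTAxLTE1VDEwOjMwOjAwWiJ9"
print(json.loads(base64.b64decode(cursor)))
# {'id': 123, 'timestamp': '2025-01-15T10:30:00Z'}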

Proposed Solution

Separate Stats Service (Recommended)

Why this approach:

  • Separation of concerns: Read operations are isolated from write operations, reducing risk of impacting event ingestion performance
  • Scalability: Analytics/reporting queries can scale independently without affecting write throughput
  • Security: Read-only database connection prevents accidental data modification
  • Future-proofing: Easy to add caching layer (Redis/ElastiCache) or move to read replicas later
  • Monitoring: Separate CloudWatch metrics and logs for analytics workload

Implementation Steps:

  1. Create new Lambda function: src/event_stats_lambda.py

    • Reuse authentication/authorization logic from event_gate_lambda.py
    • Import shared configuration and topic schemas
    • Implement dedicated query handler
  2. Add query builder module: src/query_builder.py

    • QueryBuilder class for safe SQL construction
    • Schema-based field validation against TOPICS
    • Parameterized query generation to prevent SQL injection
    • Cursor encoding/decoding logic
  3. Add read-only database interface: src/reader_postgres.py

    • Similar to writer_postgres.py but with read-only connection
    • Execute parameterized SELECT queries with filters
    • Return paginated results with cursor information
  4. Infrastructure changes:

    • New API Gateway resource: POST /stats/{topic_name}
    • New Lambda function deployment with separate IAM role
    • Configure read-only database credentials (separate from writer)
    • Set up CloudWatch alarms for query performance
  5. Shared configuration:

    • Reuse TOPICS, ACCESS, and JWT validation logic
    • Load from same S3 configuration bucket or local files
    • Maintain consistent authentication flow

Detailed Implementation

New file: src/event_stats_lambda.py

"""Event Stats Lambda - Read-only event history queries."""
import json
import logging
import os
from typing import Any, Dict

import jwt

# Import shared modules
try:
    from . import reader_postgres
    from . import query_builder
    from .conf_path import CONF_DIR
except ImportError:
    import reader_postgres
    import query_builder
    from conf_path import CONF_DIR

logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

# Load same configuration as event_gate_lambda
with open(os.path.join(CONF_DIR, "config.json"), "r", encoding="utf-8") as file:
    CONFIG = json.load(file)

# Load topics and access (same as event_gate_lambda.py)
TOPICS = {}
ACCESS = {}
TOKEN_PUBLIC_KEY = None  # Load same way as event_gate_lambda

reader_postgres.init(logger, CONFIG)


def _error_response(status: int, err_type: str, message: str) -> Dict[str, Any]:
    """Build standardized error response."""
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({
            "success": False,
            "statusCode": status,
            "errors": [{"type": err_type, "message": message}]
        })
    }


def get_topic_stats(topic_name: str, query: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
    """Query historical events with filters, sorting, and pagination.
    
    Args:
        topic_name: Target topic name.
        query: Query parameters (filter, sort, cursor, limit).
        token_encoded: Encoded bearer JWT token string.
    """
    logger.debug("Handling POST /stats/%s", topic_name)
    
    # Validate token (same as event_gate_lambda)
    try:
        token = jwt.decode(token_encoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
    except jwt.PyJWTError:
        return _error_response(401, "auth", "Invalid or missing token")
    
    # Check topic exists
    if topic_name not in TOPICS:
        return _error_response(404, "topic", f"Topic '{topic_name}' not found")
    
    # Check authorization (same as event_gate_lambda)
    user = token.get("sub")
    if topic_name not in ACCESS or user not in ACCESS[topic_name]:
        return _error_response(403, "auth", "User not authorized for topic")
    
    # Validate query parameters
    filters = query.get("filter", {})
    sort = query.get("sort", ["-timestamp"])
    cursor = query.get("cursor")
    # Clamp limit to the documented bounds (default 50, max 1000)
    limit = max(1, min(query.get("limit", 50), 1000))
    
    try:
        # Build and execute query
        builder = query_builder.QueryBuilder(TOPICS[topic_name])
        data, next_cursor, has_more = reader_postgres.read_events(
            topic_name, filters, sort, cursor, limit, builder
        )
        
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({
                "success": True,
                "statusCode": 200,
                "data": data,
                "pagination": {
                    "cursor": next_cursor,
                    "has_more": has_more,
                    "limit": limit
                }
            })
        }
    except ValueError as exc:
        return _error_response(400, "validation", str(exc))
    except Exception:
        logger.exception("Query execution failed")
        return _error_response(500, "database", "Query execution failed")


def extract_token(event_headers: Dict[str, str]) -> str:
    """Extract bearer token - same implementation as event_gate_lambda.

    Sketched here assuming a standard "Authorization: Bearer <token>" header;
    reuse the actual implementation from event_gate_lambda.extract_token().
    """
    auth_header = event_headers.get("Authorization") or event_headers.get("authorization", "")
    return auth_header.removeprefix("Bearer ").strip()


def lambda_handler(event: Dict[str, Any], context: Any):
    """AWS Lambda entry point for stats queries."""
    try:
        resource = event.get("resource", "").lower()
        if resource == "/stats/{topic_name}":
            method = event.get("httpMethod")
            if method == "POST":
                return get_topic_stats(
                    event["pathParameters"]["topic_name"].lower(),
                    json.loads(event.get("body") or "{}"),
                    extract_token(event.get("headers", {}))
                )
        return _error_response(404, "route", "Resource not found")
    except Exception as exc:
        logger.error("Unexpected exception: %s", exc)
        return _error_response(500, "internal", "Unexpected server error")
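
For local testing, the handler can be exercised with a minimal API Gateway Lambda-proxy event; the values below are illustrative:

# Illustrative proxy test event; the shape matches what lambda_handler reads
sample_event = {
    "resource": "/stats/{topic_name}",
    "httpMethod": "POST",
    "pathParameters": {"topic_name": "public.cps.za.runs"},
    "headers": {"Authorization": "Bearer <token>"},
    "body": json.dumps({
        "filter": {"source_app": "unify"},
        "sort": ["-timestamp"],
        "cursor": None,
        "limit": 50
    })
}

response = lambda_handler(sample_event, None)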

New file: src/reader_postgres.py

"""Read-only PostgreSQL interface for event history queries."""
import json
import logging
from typing import Any, Dict, List, Tuple

import psycopg2
import psycopg2.extras

logger = None
connection = None


def init(log: logging.Logger, config: Dict[str, Any]) -> None:
    """Initialize read-only database connection."""
    global logger, connection
    logger = log
    
    # Use read-only credentials from config
    db_config = config.get("postgres_read", config.get("postgres"))
    connection = psycopg2.connect(
        host=db_config["host"],
        port=db_config["port"],
        database=db_config["database"],
        user=db_config["user"],
        password=db_config["password"]
    )
    connection.set_session(readonly=True, autocommit=True)


def read_events(
    topic_name: str,
    filters: Dict[str, Any],
    sort: List[str],
    cursor: str,
    limit: int,
    builder: Any
) -> Tuple[List[Dict[str, Any]], str, bool]:
    """Execute query and return paginated results.
    
    Returns:
        Tuple of (events_list, next_cursor, has_more)
    """
    # Fetch one extra row beyond the requested limit to detect has_more
    sql_query, parameters = builder.build_select(topic_name, filters, sort, cursor, limit + 1)
    
    with connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as db_cursor:
        db_cursor.execute(sql_query, parameters)
        rows = db_cursor.fetchall()
    
    has_more = len(rows) > limit
    events = rows[:limit]
    
    # Generate next cursor from last event
    next_cursor = builder.encode_cursor(events[-1]) if has_more else None
    
    return events, next_cursor, has_more

New file: src/query_builder.py

"""SQL query builder with schema validation and cursor pagination."""
import base64
import json
from typing import Any, Dict, List, Tuple


class QueryBuilder:
    """Build safe parameterized SQL queries with filter validation."""
    
    def __init__(self, topic_schema: Dict[str, Any]):
        self.topic_schema = topic_schema
        self.allowed_fields = self._extract_allowed_fields(topic_schema)
    
    def _extract_allowed_fields(self, schema: Dict[str, Any]) -> set:
        """Extract filterable fields from JSON schema.

        Minimal sketch: collects top-level property names plus nested payload
        properties, exposed via dot notation (e.g. payload.source_app).
        """
        properties = schema.get("properties", {})
        fields = set(properties)
        for name in properties.get("payload", {}).get("properties", {}):
            fields.add(f"payload.{name}")
        return fields
    
    def build_select(
        self,
        topic_name: str,
        filters: Dict[str, Any],
        sort: List[str],
        cursor: str,
        limit: int
    ) -> Tuple[str, List[Any]]:
        """Build parameterized SELECT query.

        Minimal sketch: assumes a single "events" table with a "topic" column
        and keyset pagination on (timestamp, id) matching the default
        descending sort; names must be aligned with the real schema.

        Returns:
            Tuple of (sql_query, parameters)
        """
        conditions = ["topic = %s"]
        parameters: List[Any] = [topic_name]

        # Validate every filter field against the schema, stripping the
        # operator suffixes (_between, _in) before the lookup
        for key, value in filters.items():
            if key.endswith("_between"):
                field = self._checked(key[: -len("_between")])
                if not isinstance(value, (list, tuple)) or len(value) != 2:
                    raise ValueError(f"{key} expects [start, end]")
                conditions.append(f"{self._column(field)} BETWEEN %s AND %s")
                parameters.extend(value)
            elif key.endswith("_in"):
                field = self._checked(key[: -len("_in")])
                conditions.append(f"{self._column(field)} = ANY(%s)")
                parameters.append(list(value))
            else:
                field = self._checked(key)
                conditions.append(f"{self._column(field)} = %s")
                parameters.append(value)

        # Keyset pagination: resume strictly after the cursor row
        if cursor:
            last = self.decode_cursor(cursor)
            conditions.append("(timestamp, id) < (%s, %s)")
            parameters.extend([last["timestamp"], last["id"]])

        order_terms = []
        for spec in sort:
            direction = "DESC" if spec.startswith("-") else "ASC"
            field = self._checked(spec.lstrip("-"))
            order_terms.append(f"{self._column(field)} {direction}")

        parameters.append(limit)
        sql_query = (
            "SELECT * FROM events"
            f" WHERE {' AND '.join(conditions)}"
            f" ORDER BY {', '.join(order_terms) or 'timestamp DESC'}"
            " LIMIT %s"
        )
        return sql_query, parameters

    def _checked(self, field: str) -> str:
        """Raise ValueError unless the field is declared in the topic schema."""
        if field not in self.allowed_fields:
            raise ValueError(f"Invalid filter field: {field}")
        return field

    def _column(self, field: str) -> str:
        """Map a validated field to a SQL expression; dot notation addresses
        nested JSON (e.g. payload.source_app -> payload->>'source_app')."""
        if "." in field:
            parent, child = field.split(".", 1)
            return f"{parent}->>'{child}'"
        return field
    
    def encode_cursor(self, last_event: Dict[str, Any]) -> str:
        """Encode last event fields into base64 cursor."""
        cursor_data = {
            "id": last_event["id"],
            "timestamp": last_event["timestamp"]
        }
        # default=str handles datetime values returned by RealDictCursor
        return base64.b64encode(json.dumps(cursor_data, default=str).encode()).decode()
    
    def decode_cursor(self, cursor: str) -> Dict[str, Any]:
        """Decode cursor back to field values."""
        return json.loads(base64.b64decode(cursor).decode())
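
Expected usage, assuming a JSON-schema-like topic definition with top-level properties (the schema shape here is an assumption to be aligned with TOPICS):

schema = {
    "properties": {
        "id": {"type": "string"},
        "timestamp": {"type": "string"},
        "source_app": {"type": "string"},
        "payload": {"type": "object", "properties": {"source_app": {"type": "string"}}}
    }
}

builder = QueryBuilder(schema)
sql_query, parameters = builder.build_select(
    "public.cps.za.runs", {"source_app": "unify"}, ["-timestamp"], None, 51
)
# SELECT * FROM events WHERE topic = %s AND source_app = %s
#   ORDER BY timestamp DESC LIMIT %s
# parameters: ['public.cps.za.runs', 'unify', 51]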

Infrastructure Requirements

AWS Lambda Configuration:

  • Function name: event-stats-lambda
  • Runtime: Python 3.11 (same as event_gate_lambda)
  • Memory: 512 MB (adjust based on query complexity)
  • Timeout: 30 seconds
  • Environment variables:
    • LOG_LEVEL=INFO
    • CONF_DIR=/var/task/conf (or S3 path)

IAM Role Permissions:

  • Read access to configuration S3 bucket
  • CloudWatch Logs write permissions
  • VPC execution role (if database in VPC)

API Gateway:

  • New resource: /stats/{topic_name}
  • Method: POST
  • Lambda proxy integration
  • Same authorizer as /topics/{topic_name}

Database Configuration:

{
  "postgres_read": {
    "host": "your-db-read-replica.rds.amazonaws.com",
    "port": 5432,
    "database": "events",
    "user": "reader_user",
    "password": "read_only_password"
  }
}

Security Considerations

  • Separate read-only database user with SELECT-only privileges
  • Validate all filter fields against topic schemas from TOPICS
  • Use parameterized queries exclusively (prevent SQL injection)
  • Apply same ACL checks as write operations (user must be in ACCESS[topic_name])
  • Validate JWT token using existing TOKEN_PUBLIC_KEY
  • Limit maximum query result size (max limit: 1000)
  • Consider rate limiting for expensive queries
  • Set read query timeout at database level
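
For the query-timeout bullet, one option is a server-side statement timeout applied to the read-only connection at connect time (a sketch; the 5000 ms value is illustrative):

import psycopg2

def connect_read_only(db_config):
    """Open a read-only connection with a server-side statement timeout."""
    conn = psycopg2.connect(
        host=db_config["host"],
        port=db_config["port"],
        database=db_config["database"],
        user=db_config["user"],
        password=db_config["password"],
        options="-c statement_timeout=5000"  # milliseconds; illustrative value
    )
    conn.set_session(readonly=True, autocommit=True)
    return conn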

Testing Requirements

  • Unit tests for QueryBuilder with various filter combinations
  • Unit tests for cursor encoding/decoding
  • Integration tests with test Postgres database
  • Authorization tests (401, 403 scenarios using existing ACCESS config)
  • Pagination tests (cursor handling, boundary conditions, empty results)
  • Performance tests with large datasets (10k+ events)
  • Validate filter fields against topic schemas from TOPICS
  • Test query timeout handling
  • Test concurrent query execution
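
A starting point for the QueryBuilder and cursor unit tests (a sketch against the QueryBuilder above):

import pytest
from query_builder import QueryBuilder

SCHEMA = {"properties": {"id": {}, "timestamp": {}, "source_app": {}}}

def test_cursor_round_trip():
    builder = QueryBuilder(SCHEMA)
    row = {"id": "uuid-123", "timestamp": "2025-01-15T10:30:00Z"}
    assert builder.decode_cursor(builder.encode_cursor(row)) == row

def test_unknown_filter_field_rejected():
    builder = QueryBuilder(SCHEMA)
    with pytest.raises(ValueError):
        builder.build_select("topic", {"bogus_field": 1}, ["-timestamp"], None, 50)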

Migration Path

  1. Deploy stats Lambda alongside existing event_gate_lambda
  2. Update API Gateway with new /stats/{topic_name} resource
  3. Configure read-only database credentials
  4. Test thoroughly in non-production environment
  5. Monitor query performance and adjust timeouts/memory
  6. Consider adding read replica if query load increases

Future Enhancements

  • Add Redis/ElastiCache for cursor caching
  • Implement query result caching for common filters
  • Add aggregation endpoints (count, groupBy)
  • Support full-text search on payload fields
  • Add query cost estimation and limiting
