Merged
32 changes: 32 additions & 0 deletions example_config.yml
@@ -1,4 +1,36 @@
---
# =============================================================================
# vCon Server Configuration
# =============================================================================
#
# WORKER CONFIGURATION (via environment variables):
#
# CONSERVER_WORKERS: Number of worker processes (default: 1)
# - Set to 1 for single-threaded mode (original behavior)
# - Set to 2+ for multi-worker mode (parallel vCon processing)
# - Each worker independently consumes from Redis queues
# - Redis BLPOP provides atomic work distribution
# Example: CONSERVER_WORKERS=4
#
# CONSERVER_PARALLEL_STORAGE: Enable parallel storage writes (default: true)
# - When true, writes to multiple storage backends happen concurrently
# - Improves throughput when using multiple storage backends (e.g., S3 + MongoDB)
# - Set to "false" to use sequential storage writes
# Example: CONSERVER_PARALLEL_STORAGE=true
#
# CONSERVER_START_METHOD: Multiprocessing start method (default: platform default)
# - "fork": Copy-on-write memory sharing, fastest startup (Unix only)
# - Lower memory usage due to copy-on-write
# - Can cause issues with threads and some libraries (OpenSSL, CUDA)
# - "spawn": Fresh Python interpreter per worker
# - Higher memory but safer and more predictable
# - Required on Windows, recommended for macOS
# - "forkserver": Hybrid approach using a clean forked server process
# - "" or unset: Use Python's platform default
# Example: CONSERVER_START_METHOD=spawn
#
# =============================================================================

# Ingress-specific API key authentication
# This section defines API keys that are authorized to push vCons to specific ingress lists
# Each key grants access only to the specified ingress list, providing secure isolation
28 changes: 23 additions & 5 deletions prod_mgt/02_CORE_FUNCTIONALITY.md
@@ -76,9 +76,25 @@ The vCon (Voice Conversation) is the fundamental data structure that represents
### Chain Architecture
- **Ingress Lists**: Entry points for vCons
- **Processing Links**: Sequential processing steps
- **Storage Operations**: Persistence to backends
- **Storage Operations**: Persistence to backends (with parallel write support)
- **Egress Lists**: Output queues for processed vCons

### Multi-Worker Architecture
The server supports parallel vCon processing through multiple worker processes:

- **Single Worker Mode** (default): Traditional single-threaded processing
- **Multi-Worker Mode**: Multiple processes consuming from Redis queues concurrently
- **Atomic Work Distribution**: Redis BLPOP ensures each vCon is processed by exactly one worker (see the worker-loop sketch after this list)
- **Graceful Shutdown**: Workers complete current vCon before exiting on SIGTERM/SIGINT
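
A minimal sketch of what each worker's consume loop can look like, assuming the `redis-py` client and a hypothetical `process_vcon` callable (illustrative names, not the server's actual internals):

```python
import signal
import redis

shutdown_requested = False

def request_shutdown(signum, frame):
    # Graceful shutdown: finish the vCon in progress, then leave the loop
    global shutdown_requested
    shutdown_requested = True

def worker_loop(queue_names, process_vcon):
    signal.signal(signal.SIGTERM, request_shutdown)
    signal.signal(signal.SIGINT, request_shutdown)
    client = redis.Redis()
    while not shutdown_requested:
        # BLPOP pops atomically across competing workers, so each vCon
        # is handed to exactly one worker
        item = client.blpop(queue_names, timeout=1)
        if item is None:
            continue  # timed out; re-check the shutdown flag
        queue, vcon_id = item
        process_vcon(queue, vcon_id)
```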

### Parallel Storage Operations
When multiple storage backends are configured, writes can execute concurrently:

- **ThreadPoolExecutor**: Concurrent writes to all storage backends
- **Independent Failures**: One backend failure doesn't block others (sketched below)
- **Significant Speedup**: 3-4x faster with multiple I/O-bound backends
- **Configurable**: Can be disabled for sequential writes if needed
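
A sketch of how failure isolation can work under concurrent writes, assuming each backend exposes a `save(vcon)` method (an illustrative interface, not necessarily the server's):

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

def save_to_all(storages, vcon):
    if not storages:
        return
    # One thread per backend; a slow or failing backend never blocks the rest
    with ThreadPoolExecutor(max_workers=len(storages)) as executor:
        futures = {executor.submit(s.save, vcon): s for s in storages}
        for future in as_completed(futures):
            backend = futures[future]
            try:
                future.result()
            except Exception:
                # e.g. an S3 outage is logged while the PostgreSQL and
                # Milvus writes in flight still complete
                logging.exception("storage write failed: %r", backend)
```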

### Chain Configuration Example
```yaml
chains:
@@ -91,6 +107,7 @@
    storages:
      - postgres # Primary storage
      - s3 # Backup storage
      - milvus # Vector storage (all written in parallel)
    ingress_lists:
      - main_ingress
    egress_lists:
@@ -100,10 +117,11 @@ chains:

### Processing Flow
1. vCon enters via ingress list
2. Each link processes sequentially
3. Storage operations execute
4. vCon moves to egress lists
5. Dead letter queue for failures
2. Worker atomically pops vCon from queue (BLPOP)
3. Each link processes sequentially within the worker
4. Storage operations execute (parallel or sequential based on config)
5. vCon moves to egress lists
6. Dead letter queue for failures

## API Functionality

74 changes: 65 additions & 9 deletions prod_mgt/06_DEPLOYMENT_OPERATIONS.md
@@ -140,8 +140,32 @@ GROQ_API_KEY=gsk-key
VCON_REDIS_EXPIRY=3600
VCON_INDEX_EXPIRY=86400
TICK_INTERVAL=5000

# Worker configuration (parallel processing)
CONSERVER_WORKERS=4 # Number of worker processes (default: 1)
CONSERVER_PARALLEL_STORAGE=true # Enable parallel storage writes (default: true)
CONSERVER_START_METHOD=fork # Multiprocessing method: fork, spawn, forkserver (default: platform)
```

### Worker Configuration Details

#### CONSERVER_WORKERS
- **Default**: 1 (single-threaded mode)
- **Recommended**: Number of CPU cores for I/O-bound workloads
- Workers atomically consume from Redis queues via BLPOP
- Each worker processes vCons independently

#### CONSERVER_PARALLEL_STORAGE
- **Default**: true (enabled)
- When multiple storage backends are configured, writes execute concurrently
- Set to "false" for sequential storage writes

#### CONSERVER_START_METHOD
- **"fork"**: Copy-on-write memory sharing (Unix only, fastest startup)
- **"spawn"**: Fresh Python interpreter per worker (safer, higher memory)
- **"forkserver"**: Hybrid approach using a clean forked server
- **Empty/unset**: Use platform default (fork on Unix, spawn on Windows/macOS); see the sketch below
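
A minimal sketch of how this setting can be applied at startup through Python's standard `multiprocessing` API (the environment-variable plumbing is illustrative):

```python
import multiprocessing
import os

def configure_start_method():
    # Empty or unset falls through to Python's platform default
    method = os.environ.get("CONSERVER_START_METHOD", "").strip()
    if method:
        # Must run once, before any worker is created; raises ValueError
        # for a method the platform doesn't support (e.g. fork on Windows)
        multiprocessing.set_start_method(method)
```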

### Configuration File Structure
```yaml
# config.yml
@@ -167,31 +191,63 @@ chains:
## Scaling Strategies

### Horizontal Scaling
1. **Stateless Workers**: Scale conserver instances
2. **Queue Distribution**: Redis-based load balancing
3. **Storage Scaling**: Distributed storage backends
4. **Cache Scaling**: Redis cluster mode
1. **Multi-Worker Mode**: Scale workers within a single instance via CONSERVER_WORKERS
2. **Multi-Instance**: Scale conserver instances across hosts
3. **Queue Distribution**: Redis BLPOP provides atomic load balancing
4. **Storage Scaling**: Distributed storage backends
5. **Cache Scaling**: Redis cluster mode

### Vertical Scaling
1. **Resource Allocation**: CPU and memory limits
2. **Connection Pooling**: Database connections
3. **Batch Processing**: Larger batch sizes
4. **Caching**: Increase cache sizes
2. **Worker Count**: Increase CONSERVER_WORKERS for more parallelism
3. **Connection Pooling**: Database connections
4. **Parallel Storage**: Enable concurrent storage writes
5. **Caching**: Increase cache sizes

### Performance Optimization
```bash
# High-throughput configuration
CONSERVER_WORKERS=8 # 8 parallel workers
CONSERVER_PARALLEL_STORAGE=true # Concurrent storage writes
CONSERVER_START_METHOD=fork # Memory-efficient on Unix
```

```yaml
# Optimized configuration
# Optimized chain configuration
chains:
  high_performance:
    links:
      - sampler: # Process 10% sample
          rate: 0.1
      - deepgram_link:
          batch_size: 50
          workers: 8
    storages:
      - postgres # All written in parallel
      - s3
      - milvus
    timeout: 300
```

### Memory Optimization for Multi-Worker

When running multiple workers, memory management is important:

| Start Method | Memory Usage | Best For |
|-------------|--------------|----------|
| fork | Lower (copy-on-write) | Unix servers with stable libraries |
| spawn | Higher (fresh interpreter) | macOS, Windows, or when using CUDA/OpenSSL |
| forkserver | Medium (clean fork) | Balance of memory and safety |

```bash
# Memory-efficient configuration (Unix)
CONSERVER_WORKERS=4
CONSERVER_START_METHOD=fork

# Safe configuration (any platform)
CONSERVER_WORKERS=4
CONSERVER_START_METHOD=spawn
```

## Monitoring & Observability

### Metrics Collection
109 changes: 84 additions & 25 deletions prod_mgt/09_ARCHITECTURE_DESIGN.md
@@ -28,23 +28,33 @@
└────────┬────────┘
┌─────────────────────────────────────────────────────────────┐
│ Processing Pipeline │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────────┐ │
│ │Chain Manager│ │Link Executor│ │Storage Dispatcher│ │
│ └─────────────┘ └─────────────┘ └──────────────────┘ │
└────────┬───────────────┬───────────────────┬───────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────────┐ ┌──────────────────┐
│Redis Queue │ │ Link Modules │ │Storage Adapters │
│ & Cache │ │ ┌───────────┐ │ │ ┌──────────────┐ │
│┌───────────┐│ │ │Deepgram │ │ │ │ PostgreSQL │ │
││ Ingress ││ │ │Analyze │ │ │ │ S3 │ │
││ Egress ││ │ │Webhook │ │ │ │ Elasticsearch│ │
││ DLQ ││ │ │Tag Router │ │ │ │ Milvus │ │
│└───────────┘│ │ └───────────┘ │ │ └──────────────┘ │
└─────────────┘ └─────────────────┘ └──────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Processing Pipeline (Multi-Worker) │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────────────────┐ │
│ │Chain Manager│ │Link Executor│ │Storage Dispatcher │ │
│ └─────────────┘ └─────────────┘ │(ThreadPoolExecutor) │ │
│ └──────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Worker Processes (CONSERVER_WORKERS) │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Worker-1 │ │Worker-2 │ │Worker-3 │ │Worker-N │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ └────────────┴────────────┴────────────┘ │ │
│ │ │ BLPOP (atomic) │ │
│ └─────────────────────────┼───────────────────────────────────┘ │
└────────────────────────────┼──────────────────────────────────────┘
┌───────────────────┴───────────────────┐
▼ ▼
┌─────────────┐ ┌─────────────────┐ ┌──────────────────────────┐
│Redis Queue │ │ Link Modules │ │Storage Adapters │
│ & Cache │ │ ┌───────────┐ │ │(Parallel Writes) │
│┌───────────┐│ │ │Deepgram │ │ │ ┌────────┬────────────┐ │
││ Ingress ││ │ │Analyze │ │ │ │Postgres│ S3 │ │
││ Egress ││ │ │Webhook │ │ │ ├────────┼────────────┤ │
││ DLQ ││ │ │Tag Router │ │ │ │Elastic │ Milvus │ │
│└───────────┘│ │ └───────────┘ │ │ └────────┴────────────┘ │
└─────────────┘ └─────────────────┘ └──────────────────────────┘
```

### Component Architecture
@@ -133,17 +143,66 @@ Processed vCon → Egress Queue → External Systems

## Scalability Design

### Multi-Worker Processing
The server supports multiple parallel worker processes for improved throughput:

```
┌─────────────────────────────────────────────────────────┐
│ Main Process │
│ - Spawns N worker processes (CONSERVER_WORKERS) │
│ - Monitors worker health, restarts on failure │
│ - Handles graceful shutdown (SIGTERM/SIGINT) │
└────────────────────────┬────────────────────────────────┘
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Worker-1 │ │Worker-2 │ │Worker-N │
│ (PID A) │ │ (PID B) │ │ (PID N) │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└──────────────────┴──────────────────┘
▼ BLPOP (atomic)
┌──────────────┐
│ Redis Queues │
└──────────────┘
```

**Key Features:**
- **Atomic Distribution**: Redis BLPOP ensures each vCon is processed by exactly one worker
- **Process Isolation**: Worker crashes don't affect other workers
- **Graceful Shutdown**: Workers complete current vCon before exiting
- **Auto-Restart**: Main process restarts failed workers (see the supervisor sketch below)
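
A sketch of the supervisor pattern described above, assuming a signal-aware `worker_loop` entry point (illustrative, not the server's exact code):

```python
import multiprocessing
import os
import time

def supervise(worker_loop):
    count = int(os.environ.get("CONSERVER_WORKERS", "1"))
    workers = {}
    for i in range(count):
        workers[i] = multiprocessing.Process(target=worker_loop, name=f"worker-{i}")
        workers[i].start()
    try:
        while True:
            for i, proc in list(workers.items()):
                if not proc.is_alive():
                    # Auto-restart: replace a crashed worker with a fresh process
                    workers[i] = multiprocessing.Process(
                        target=worker_loop, name=f"worker-{i}"
                    )
                    workers[i].start()
            time.sleep(1)
    except KeyboardInterrupt:
        # terminate() delivers SIGTERM on Unix; a signal-aware worker loop
        # finishes its current vCon before exiting
        for proc in workers.values():
            proc.terminate()
        for proc in workers.values():
            proc.join()
```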

### Parallel Storage Operations
When multiple storage backends are configured, writes execute concurrently:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Sequential (old): ~400ms for 4 backends @ 100ms each
for storage in storages:
    storage.save(vcon)

# Parallel (new): ~100ms for 4 backends @ 100ms each
with ThreadPoolExecutor(max_workers=len(storages)) as executor:
    futures = [executor.submit(s.save, vcon) for s in storages]
    for future in as_completed(futures):
        future.result()
```

### Horizontal Scaling
1. **Stateless Workers**: No session state
2. **Queue-Based Distribution**: Redis list operations
3. **Shared Nothing Architecture**: Independent instances
4. **Load Balancing**: Round-robin or least-connections
1. **Multi-Worker Mode**: Scale within instance via CONSERVER_WORKERS
2. **Multi-Instance**: Scale instances across hosts
3. **Queue-Based Distribution**: Redis BLPOP for atomic work distribution
4. **Shared Nothing Architecture**: Independent worker processes
5. **Load Balancing**: Redis queues provide natural load balancing

### Vertical Scaling
1. **Resource Pooling**: Connection pools
2. **Batch Processing**: Bulk operations
3. **Caching Strategy**: Multi-tier caching
4. **Async Operations**: Non-blocking I/O
1. **Worker Count**: Increase CONSERVER_WORKERS for more parallelism
2. **Resource Pooling**: Connection pools per worker
3. **Parallel Storage**: Concurrent writes to multiple backends
4. **Caching Strategy**: Multi-tier caching
5. **Start Method Selection**: Choose fork/spawn based on memory needs

### Database Design

10 changes: 9 additions & 1 deletion prod_mgt/10_ROADMAP_FUTURE.md
@@ -1,6 +1,6 @@
# Roadmap & Future Development

## Current State (v1.0)
## Current State (v1.1)

### Established Features
- ✅ Core vCon data model implementation
@@ -13,6 +13,14 @@
- ✅ Dead letter queue handling
- ✅ Search functionality

### New in v1.1: Parallel Processing
- ✅ **Multi-Worker Support**: Configurable worker processes (CONSERVER_WORKERS)
- ✅ **Parallel Storage Writes**: Concurrent storage operations (CONSERVER_PARALLEL_STORAGE)
- ✅ **Configurable Start Method**: fork/spawn/forkserver selection (CONSERVER_START_METHOD)
- ✅ **Graceful Shutdown**: Workers complete current vCon on SIGTERM/SIGINT
- ✅ **Worker Auto-Restart**: Main process monitors and restarts failed workers
- ✅ **Memory Optimization**: Deferred module imports for spawn start method (illustrated below)
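
A sketch of the deferred-import idea: under `spawn`, every worker re-imports the main module, so keeping heavy dependencies out of module scope trims per-worker startup cost (the heavy module here is a stand-in):

```python
# Module scope stays light: with "spawn", each new worker re-imports this file
import os

def worker_main():
    # Deferred import: the heavy dependency loads only inside the worker,
    # after the process has been spawned
    import json  # stand-in for a heavy ML or storage SDK
    print(f"worker {os.getpid()} ready, loaded {json.__name__}")

if __name__ == "__main__":
    import multiprocessing
    multiprocessing.set_start_method("spawn")
    proc = multiprocessing.Process(target=worker_main)
    proc.start()
    proc.join()
```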

## Short-Term Roadmap (Q1-Q2 2024)

### Performance Enhancements
18 changes: 14 additions & 4 deletions prod_mgt/README.md
@@ -45,10 +45,11 @@ Product roadmap, planned features, community requests, and long-term vision for

### Core Capabilities
- 🎯 **Flexible Processing Pipeline**: Chain-based architecture with modular components
- 🗄️ **Multi-Storage Support**: 10+ backend storage options
- 🗄️ **Multi-Storage Support**: 10+ backend storage options with parallel write support
- 🤖 **AI Integration**: Built-in support for transcription and analysis
- 🔍 **Advanced Search**: Query by phone, email, name, and metadata
- 📊 **Scalable Architecture**: Horizontal and vertical scaling options
- 📊 **Scalable Architecture**: Multi-worker processing with configurable parallelism
- ⚡ **High Performance**: Parallel storage writes and multi-process vCon processing

### Processing Links
- **Transcription**: Deepgram, Whisper (Groq/Hugging Face)
@@ -65,6 +66,14 @@

## Configuration Examples

### Worker Configuration (Environment Variables)
```bash
# Enable parallel processing
CONSERVER_WORKERS=4 # Run 4 worker processes
CONSERVER_PARALLEL_STORAGE=true # Parallel writes to storage backends
CONSERVER_START_METHOD=fork # Memory-efficient forking (Unix)
```

### Basic Pipeline
```yaml
chains:
@@ -77,7 +86,7 @@ chains:
      - postgres # Store
```

### Advanced Pipeline
### Advanced Pipeline with Parallel Storage
```yaml
chains:
  advanced_chain:
@@ -93,10 +102,11 @@ chains:
          rules:
            - sentiment: positive
              queue: satisfied_customers
    storages:
    storages: # All written in parallel
      - postgres # Primary
      - s3 # Archive
      - elasticsearch # Search
      - milvus # Vector search
```

## Support & Resources