diff --git a/.gitignore b/.gitignore index be0c739..063361a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .vscode .idea +.claude # Binaries for programs and plugins *.exe diff --git a/.golangci.yml b/.golangci.yml index b3863f0..5903fa7 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -53,6 +53,12 @@ linters: # - wsl linters-settings: + depguard: + rules: + main: + allow: + - $gostd + - github.com/databricks/databricks-sql-go gosec: exclude-generated: true severity: "low" diff --git a/telemetry/DESIGN.md b/telemetry/DESIGN.md index e1509c9..157a16a 100644 --- a/telemetry/DESIGN.md +++ b/telemetry/DESIGN.md @@ -4,7 +4,7 @@ This document outlines a **telemetry design** for the Databricks SQL Go driver that collects usage metrics and exports them to the Databricks telemetry service. The design leverages Go's `context.Context` and middleware patterns to instrument driver operations without impacting performance. -**Important Note:** Telemetry is **disabled by default** and will be enabled only after full testing and validation is complete. +**Important Note:** Telemetry is **disabled by default** and requires explicit opt-in. A gradual rollout strategy will be used to ensure reliability and user control. 
**Key Objectives:**
- Collect driver usage metrics and performance data
@@ -14,9 +14,10 @@ This document outlines a **telemetry design** for the Databricks SQL Go driver t
- Follow Go best practices and idiomatic patterns

**Design Principles:**
+- **Opt-in first**: Explicit user consent required, disabled by default
- **Non-blocking**: All operations async using goroutines
- **Privacy-first**: No PII or query data collected
-- **Server-controlled**: Feature flag support for enable/disable
+- **Server-controlled**: Feature flag support for gradual rollout
- **Fail-safe**: All telemetry errors swallowed silently
- **Idiomatic Go**: Use standard library patterns and interfaces
@@ -44,12 +45,17 @@ This document outlines a **telemetry design** for the Databricks SQL Go driver t
4. [Data Collection](#4-data-collection)
5. [Export Mechanism](#5-export-mechanism)
6. [Configuration](#6-configuration)
+   - 6.1 [Configuration Structure](#61-configuration-structure)
+   - 6.2 [Configuration from Connection Parameters](#62-configuration-from-connection-parameters)
+   - 6.3 [Feature Flag Integration](#63-feature-flag-integration)
+   - 6.4 [Opt-In Control & Priority](#64-opt-in-control--priority)
7. [Privacy & Compliance](#7-privacy--compliance)
8. [Error Handling](#8-error-handling)
9. [Graceful Shutdown](#9-graceful-shutdown)
10. [Testing Strategy](#10-testing-strategy)
-11. [Implementation Checklist](#11-implementation-checklist)
-12. [References](#12-references)
+11. [Partial Launch Strategy](#11-partial-launch-strategy)
+12. [Implementation Checklist](#12-implementation-checklist)
+13. [References](#13-references)

---

@@ -373,7 +379,7 @@ func (m *clientManager) releaseClient(host string) error {
### 3.3 circuitBreaker

-**Purpose**: Implement circuit breaker pattern to protect against failing telemetry endpoint.
+**Purpose**: Implement circuit breaker pattern to protect against failing telemetry endpoint using a sliding window and failure rate percentage algorithm (matching JDBC's Resilience4j implementation).
**Location**: `telemetry/circuitbreaker.go` @@ -382,11 +388,28 @@ func (m *clientManager) releaseClient(host string) error { - **Not just rate limiting**: Protects against 5xx errors, timeouts, network failures - **Resource efficiency**: Prevents wasting resources on a failing endpoint - **Auto-recovery**: Automatically detects when endpoint becomes healthy again +- **JDBC alignment**: Uses sliding window with failure rate percentage, matching JDBC driver behavior exactly + +#### Algorithm: Sliding Window with Failure Rate +The circuit breaker tracks recent calls in a **sliding window** (ring buffer) and calculates the **failure rate percentage**: +- Tracks the last N calls (default: 30) +- Opens circuit when failure rate >= threshold (default: 50%) +- Requires minimum calls before evaluation (default: 20) +- Uses percentage-based evaluation instead of consecutive failures + +**Example**: With 30 calls in window, if 15 or more fail (50%), circuit opens. This is more robust than consecutive-failure counting as it considers overall reliability. #### States 1. **Closed**: Normal operation, requests pass through -2. **Open**: After threshold failures, all requests rejected immediately (drop events) -3. **Half-Open**: After timeout, allows test requests to check if endpoint recovered +2. **Open**: After failure rate exceeds threshold, all requests rejected immediately (drop events) +3. 
**Half-Open**: After wait duration, allows test requests to check if endpoint recovered + +#### Configuration (matching JDBC defaults) +- **failureRateThreshold**: 50% - Opens circuit if failure rate >= 50% +- **minimumNumberOfCalls**: 20 - Minimum calls before evaluating failure rate +- **slidingWindowSize**: 30 - Track last 30 calls in sliding window +- **waitDurationInOpenState**: 30s - Wait before transitioning to half-open +- **permittedCallsInHalfOpen**: 3 - Test with 3 successful calls before closing #### Interface @@ -410,32 +433,53 @@ const ( stateHalfOpen ) +// callResult represents the result of a call (success or failure). +type callResult bool + +const ( + callSuccess callResult = true + callFailure callResult = false +) + // circuitBreaker implements the circuit breaker pattern. +// It protects against failing telemetry endpoints by tracking failures +// using a sliding window and failure rate percentage. type circuitBreaker struct { mu sync.RWMutex state atomic.Int32 // circuitState - failureCount int - successCount int - lastFailTime time.Time lastStateTime time.Time + // Sliding window for tracking calls + window []callResult + windowIndex int + windowFilled bool + totalCalls int + failureCount int + + // Half-open state tracking + halfOpenSuccesses int + config circuitBreakerConfig } // circuitBreakerConfig holds circuit breaker configuration. 
type circuitBreakerConfig struct { - failureThreshold int // Open after N failures - successThreshold int // Close after N successes in half-open - timeout time.Duration // Try again after timeout + failureRateThreshold int // Open if failure rate >= this percentage (0-100) + minimumNumberOfCalls int // Minimum calls before evaluating failure rate + slidingWindowSize int // Number of recent calls to track + waitDurationInOpenState time.Duration // Wait before transitioning to half-open + permittedCallsInHalfOpen int // Number of test calls in half-open state } -// defaultCircuitBreakerConfig returns default configuration. +// defaultCircuitBreakerConfig returns default configuration matching JDBC. func defaultCircuitBreakerConfig() circuitBreakerConfig { return circuitBreakerConfig{ - failureThreshold: 5, - successThreshold: 2, - timeout: 1 * time.Minute, + failureRateThreshold: 50, // 50% failure rate + minimumNumberOfCalls: 20, // Minimum sample size + slidingWindowSize: 30, // Keep recent 30 calls + waitDurationInOpenState: 30 * time.Second, + permittedCallsInHalfOpen: 3, // Test with 3 calls } } @@ -444,6 +488,7 @@ func newCircuitBreaker(cfg circuitBreakerConfig) *circuitBreaker { cb := &circuitBreaker{ config: cfg, lastStateTime: time.Now(), + window: make([]callResult, cfg.slidingWindowSize), } cb.state.Store(int32(stateClosed)) return cb @@ -458,9 +503,9 @@ func (cb *circuitBreaker) execute(ctx context.Context, fn func() error) error { switch state { case stateOpen: - // Check if timeout has passed + // Check if wait duration has passed cb.mu.RLock() - shouldRetry := time.Since(cb.lastStateTime) > cb.config.timeout + shouldRetry := time.Since(cb.lastStateTime) > cb.config.waitDurationInOpenState cb.mu.RUnlock() if shouldRetry { @@ -485,48 +530,96 @@ func (cb *circuitBreaker) tryExecute(ctx context.Context, fn func() error) error err := fn() if err != nil { - cb.recordFailure() + cb.recordCall(callFailure) return err } - cb.recordSuccess() + 
cb.recordCall(callSuccess) return nil } -// recordFailure records a failure and potentially opens the circuit. -func (cb *circuitBreaker) recordFailure() { +// recordCall records a call result in the sliding window and evaluates state transitions. +func (cb *circuitBreaker) recordCall(result callResult) { cb.mu.Lock() defer cb.mu.Unlock() - cb.failureCount++ - cb.successCount = 0 - cb.lastFailTime = time.Now() - state := circuitState(cb.state.Load()) + // Handle half-open state specially if state == stateHalfOpen { - // Failure in half-open immediately opens circuit - cb.setStateUnlocked(stateOpen) - } else if cb.failureCount >= cb.config.failureThreshold { - cb.setStateUnlocked(stateOpen) + if result == callFailure { + // Any failure in half-open immediately reopens circuit + cb.resetWindowUnlocked() + cb.setStateUnlocked(stateOpen) + return + } + + cb.halfOpenSuccesses++ + if cb.halfOpenSuccesses >= cb.config.permittedCallsInHalfOpen { + // Enough successes to close circuit + cb.resetWindowUnlocked() + cb.setStateUnlocked(stateClosed) + } + return + } + + // Record in sliding window + // Remove old value from count if window is full + if cb.windowFilled && cb.window[cb.windowIndex] == callFailure { + cb.failureCount-- + } + + // Add new value + cb.window[cb.windowIndex] = result + if result == callFailure { + cb.failureCount++ + } + + // Move to next position + cb.windowIndex = (cb.windowIndex + 1) % cb.config.slidingWindowSize + if cb.windowIndex == 0 { + cb.windowFilled = true + } + + cb.totalCalls++ + + // Evaluate if we should open the circuit + if state == stateClosed { + cb.evaluateStateUnlocked() } } -// recordSuccess records a success and potentially closes the circuit. -func (cb *circuitBreaker) recordSuccess() { - cb.mu.Lock() - defer cb.mu.Unlock() +// evaluateStateUnlocked checks if the circuit should open based on failure rate. +// Caller must hold cb.mu lock. 
+func (cb *circuitBreaker) evaluateStateUnlocked() { + // Need minimum number of calls before evaluating + windowSize := cb.totalCalls + if cb.windowFilled { + windowSize = cb.config.slidingWindowSize + } - cb.failureCount = 0 - cb.successCount++ + if windowSize < cb.config.minimumNumberOfCalls { + return + } - state := circuitState(cb.state.Load()) + // Calculate failure rate + failureRate := (cb.failureCount * 100) / windowSize - if state == stateHalfOpen && cb.successCount >= cb.config.successThreshold { - cb.setStateUnlocked(stateClosed) + if failureRate >= cb.config.failureRateThreshold { + cb.setStateUnlocked(stateOpen) } } +// resetWindowUnlocked clears the sliding window. +// Caller must hold cb.mu lock. +func (cb *circuitBreaker) resetWindowUnlocked() { + cb.windowIndex = 0 + cb.windowFilled = false + cb.totalCalls = 0 + cb.failureCount = 0 + cb.halfOpenSuccesses = 0 +} + // setState transitions to a new state. func (cb *circuitBreaker) setState(newState circuitState) { cb.mu.Lock() @@ -535,6 +628,7 @@ func (cb *circuitBreaker) setState(newState circuitState) { } // setStateUnlocked transitions to a new state without locking. +// Caller must hold cb.mu lock. func (cb *circuitBreaker) setStateUnlocked(newState circuitState) { oldState := circuitState(cb.state.Load()) if oldState == newState { @@ -543,14 +637,13 @@ func (cb *circuitBreaker) setStateUnlocked(newState circuitState) { cb.state.Store(int32(newState)) cb.lastStateTime = time.Now() - cb.failureCount = 0 - cb.successCount = 0 // Log state transition at DEBUG level // logger.Debug().Msgf("circuit breaker: %v -> %v", oldState, newState) } // circuitBreakerManager manages circuit breakers per host. +// Each host gets its own circuit breaker to provide isolation. type circuitBreakerManager struct { mu sync.RWMutex breakers map[string]*circuitBreaker @@ -572,6 +665,7 @@ func getCircuitBreakerManager() *circuitBreakerManager { } // getCircuitBreaker gets or creates a circuit breaker for the host. 
+// Thread-safe for concurrent access. func (m *circuitBreakerManager) getCircuitBreaker(host string) *circuitBreaker { m.mu.RLock() cb, exists := m.breakers[host] @@ -1343,6 +1437,14 @@ type Config struct { // Enabled controls whether telemetry is active Enabled bool + // ForceEnableTelemetry bypasses server-side feature flag checks + // When true, telemetry is always enabled regardless of server flags + ForceEnableTelemetry bool + + // EnableTelemetry indicates user wants telemetry enabled if server allows + // Respects server-side feature flags and rollout percentage + EnableTelemetry bool + // BatchSize is the number of metrics to batch before flushing BatchSize int @@ -1366,11 +1468,13 @@ type Config struct { } // DefaultConfig returns default telemetry configuration. -// Note: Telemetry is disabled by default and will be enabled after full testing and validation. +// Note: Telemetry is disabled by default and requires explicit opt-in. func DefaultConfig() *Config { return &Config{ - Enabled: false, // Disabled by default until testing is complete - BatchSize: 100, + Enabled: false, // Disabled by default, requires explicit opt-in + ForceEnableTelemetry: false, + EnableTelemetry: false, + BatchSize: 100, FlushInterval: 5 * time.Second, MaxRetries: 3, RetryDelay: 100 * time.Millisecond, @@ -1381,15 +1485,27 @@ func DefaultConfig() *Config { } ``` -### 6.2 Configuration from DSN +### 6.2 Configuration from Connection Parameters ```go -// ParseTelemetryConfig extracts telemetry config from DSN query parameters. +// ParseTelemetryConfig extracts telemetry config from connection parameters. 
func ParseTelemetryConfig(params map[string]string) *Config { cfg := DefaultConfig() - if v, ok := params["telemetry"]; ok { - cfg.Enabled = v == "true" || v == "1" + // Check for forceEnableTelemetry flag (bypasses server feature flags) + if v, ok := params["forceEnableTelemetry"]; ok { + if v == "true" || v == "1" { + cfg.ForceEnableTelemetry = true + } + } + + // Check for enableTelemetry flag (respects server feature flags) + if v, ok := params["enableTelemetry"]; ok { + if v == "true" || v == "1" { + cfg.EnableTelemetry = true + } else if v == "false" || v == "0" { + cfg.EnableTelemetry = false + } } if v, ok := params["telemetry_batch_size"]; ok { @@ -1442,10 +1558,101 @@ func checkFeatureFlag(ctx context.Context, host string, httpClient *http.Client) return false, err } - return result.Flags["databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver"], nil + // Parse flag response + flagValue := result.Flags["databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver"] + + response := &featureFlagResponse{ + Enabled: false, + RolloutPercentage: 0, + } + + // Handle both boolean and object responses for backward compatibility + switch v := flagValue.(type) { + case bool: + response.Enabled = v + if v { + response.RolloutPercentage = 100 + } + case map[string]interface{}: + if enabled, ok := v["enabled"].(bool); ok { + response.Enabled = enabled + } + if rollout, ok := v["rollout_percentage"].(float64); ok { + response.RolloutPercentage = int(rollout) + } + } + + return response, nil +} + +// isInRollout checks if this connection is in the rollout percentage. +// Uses consistent hashing based on workspace ID for stable rollout. 
+func isInRollout(workspaceID string, rolloutPercentage int) bool {
+	if rolloutPercentage >= 100 {
+		return true
+	}
+	if rolloutPercentage <= 0 {
+		return false
+	}
+
+	// Use consistent hashing based on workspace ID
+	h := fnv.New32a()
+	h.Write([]byte(workspaceID))
+	hash := h.Sum32()
+
+	return int(hash%100) < rolloutPercentage
+}
+```
+
+### 6.4 Opt-In Control & Priority
+
+The telemetry system supports multiple layers of control for gradual rollout with a clear priority order:
+
+**Opt-In Priority (highest to lowest):**
+1. **forceEnableTelemetry=true** - Bypasses all server-side feature flag checks, always enables
+2. **enableTelemetry=false** - Explicit opt-out, always disables telemetry
+3. **enableTelemetry=true + Server Feature Flag** - User wants telemetry, respects server decision
+4. **Server-Side Feature Flag Only** - Databricks-controlled when user hasn't specified preference
+5. **Default** - Disabled (`false`)
+
+```go
+// isTelemetryEnabled checks if telemetry should be enabled for this connection.
+// Implements the priority-based decision tree for telemetry enablement.
+func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, httpClient *http.Client) bool {
+	// Priority 1: Force enable bypasses all server checks
+	if cfg.ForceEnableTelemetry {
+		return true
+	}
+
+	// Priority 2: Explicit opt-out always disables.
+	// NOTE: a plain bool cannot distinguish "unset" from an explicit
+	// false; this sketch assumes Config also tracks whether the user set
+	// the flag (e.g. an unexported enableTelemetrySet field populated in
+	// ParseTelemetryConfig, or EnableTelemetry declared as *bool).
+	if cfg.enableTelemetrySet && !cfg.EnableTelemetry {
+		return false
+	}
+
+	// Priority 3 & 4: Check server-side feature flag
+	flagCache := getFeatureFlagCache()
+	serverEnabled, err := flagCache.isTelemetryEnabled(ctx, host, httpClient)
+	if err != nil {
+		// On error, respect default (disabled)
+		return false
+	}
+
+	return serverEnabled
+}
+```
+
+**Note**: Rollout percentage and gradual enablement can be added in a future phase after basic opt-in is validated.
+ +**Configuration Flag Summary:** + +| Flag | Behavior | Use Case | +|------|----------|----------| +| `forceEnableTelemetry=true` | Bypass server flags, always enable | Testing, internal users, debugging | +| `enableTelemetry=true` | Enable if server allows | User opt-in during beta phase | +| `enableTelemetry=false` | Always disable telemetry | User wants to opt-out | +| *(no flags set)* | Respect server feature flag | Default behavior | + --- ## 7. Privacy & Compliance @@ -1771,67 +1978,188 @@ func BenchmarkInterceptor_Disabled(b *testing.B) { ## 11. Implementation Checklist -### Phase 1: Core Infrastructure ✅ COMPLETED (PECOBLR-1145) +**Strategy**: Build infrastructure bottom-up: Circuit Breaker → Export (POST to endpoint) → Opt-In Configuration → Collection & Aggregation → Driver Integration. This allows unit testing each layer before adding metric collection. + +**JIRA Tickets**: +- **PECOBLR-1143**: Phases 1-5 (Core Infrastructure → Opt-In Configuration) + - **PECOBLR-1381**: Phase 6 (Collection & Aggregation) - subtask + - **PECOBLR-1382**: Phase 7 (Driver Integration) - subtask + +### Phase 1: Core Infrastructure ✅ COMPLETED - [x] Create `telemetry` package structure -- [x] Implement `config.go` with configuration types +- [x] Implement `config.go` with configuration types (basic structure) - [x] Implement `tags.go` with tag definitions and filtering - [x] Add unit tests for configuration and tags ### Phase 2: Per-Host Management ✅ COMPLETED -- [x] Implement `featureflag.go` with caching and reference counting (PECOBLR-1146) -- [x] Implement `manager.go` for client management (PECOBLR-1147) +- [x] Implement `featureflag.go` with caching and reference counting +- [x] Implement `manager.go` for client management - [x] Thread-safe singleton pattern with per-host client holders - [x] Reference counting for automatic cleanup - [x] Error handling for client start failures - [x] Shutdown method for graceful application shutdown - [x] Comprehensive documentation 
on thread-safety and connection sharing -- [x] Implement `client.go` with minimal telemetryClient stub (PECOBLR-1147) +- [x] Implement `client.go` with minimal telemetryClient stub - [x] Thread-safe start() and close() methods - [x] Mutex protection for state flags - [x] Detailed documentation on concurrent access requirements -- [x] Add comprehensive unit tests for all components (PECOBLR-1147) +- [x] Add comprehensive unit tests for all components - [x] Singleton pattern verification - [x] Reference counting (increment/decrement/cleanup) - [x] Concurrent access tests (100+ goroutines) - [x] Shutdown scenarios (empty, with active refs, multiple hosts) - [x] Race detector tests passing -- [ ] Implement `circuitbreaker.go` with state machine (PECOBLR-1148) -### Phase 3: Collection & Aggregation +### Phase 3: Circuit Breaker (PECOBLR-1143) +- [ ] Implement `circuitbreaker.go` with state machine + - [ ] Implement circuit breaker states (Closed, Open, Half-Open) + - [ ] Implement circuitBreakerManager singleton per host + - [ ] Add configurable thresholds and timeout + - [ ] Implement execute() method with state transitions + - [ ] Implement failure/success tracking +- [ ] Add comprehensive unit tests + - [ ] Test state transitions (Closed → Open → Half-Open → Closed) + - [ ] Test failure/success counting + - [ ] Test timeout and retry logic + - [ ] Test per-host circuit breaker isolation + - [ ] Test concurrent access + +### Phase 4: Export Infrastructure (PECOBLR-1143) +- [ ] Implement `exporter.go` with retry logic + - [ ] Implement HTTP POST to telemetry endpoint (/api/2.0/telemetry-ext) + - [ ] Implement retry logic with exponential backoff + - [ ] Implement tag filtering for export (shouldExportToDatabricks) + - [ ] Integrate with circuit breaker + - [ ] Add error swallowing + - [ ] Implement toExportedMetric() conversion + - [ ] Implement telemetryPayload JSON structure +- [ ] Add unit tests for export logic + - [ ] Test HTTP request construction + - [ ] Test 
retry logic (with mock HTTP responses) + - [ ] Test circuit breaker integration + - [ ] Test tag filtering + - [ ] Test error swallowing +- [ ] Add integration tests with mock HTTP server + - [ ] Test successful export + - [ ] Test error scenarios (4xx, 5xx) + - [ ] Test retry behavior + - [ ] Test circuit breaker opening/closing + +### Phase 5: Opt-In Configuration Integration (PECOBLR-1143) +- [ ] Implement `isTelemetryEnabled()` with priority-based logic in config.go + - [ ] Priority 1: ForceEnableTelemetry=true bypasses all checks → return true + - [ ] Priority 2: EnableTelemetry=false explicit opt-out → return false + - [ ] Priority 3: EnableTelemetry=true + check server feature flag + - [ ] Priority 4: Server-side feature flag only (default behavior) + - [ ] Priority 5: Default disabled if no flags set and server check fails +- [ ] Integrate feature flag cache with opt-in logic + - [ ] Wire up isTelemetryEnabled() to call featureFlagCache.isTelemetryEnabled() + - [ ] Implement fallback behavior on errors (return cached value or false) + - [ ] Add proper error handling and logging +- [ ] Add unit tests for opt-in priority logic + - [ ] Test forceEnableTelemetry=true (always enabled, bypasses server) + - [ ] Test enableTelemetry=false (always disabled, explicit opt-out) + - [ ] Test enableTelemetry=true with server flag enabled + - [ ] Test enableTelemetry=true with server flag disabled + - [ ] Test default behavior (server flag controls) + - [ ] Test error scenarios (server unreachable, use cached value) +- [ ] Add integration tests with mock feature flag server + - [ ] Test opt-in priority with mock server + - [ ] Test cache expiration and refresh + - [ ] Test concurrent connections with shared cache + +### Phase 6: Collection & Aggregation (PECOBLR-1381) - [ ] Implement `interceptor.go` for metric collection + - [ ] Implement beforeExecute() and afterExecute() hooks + - [ ] Implement context-based metric tracking with metricContext + - [ ] Implement latency 
measurement (startTime, latencyMs calculation) + - [ ] Add tag collection methods (addTag) + - [ ] Implement error swallowing with panic recovery - [ ] Implement `aggregator.go` for batching + - [ ] Implement statement-level aggregation (statementMetrics) + - [ ] Implement batch size and flush interval logic + - [ ] Implement background flush goroutine (flushLoop) + - [ ] Add thread-safe metric recording + - [ ] Implement completeStatement() for final aggregation - [ ] Implement error classification in `errors.go` + - [ ] Implement error type classification (terminal vs retryable) + - [ ] Implement HTTP status code classification + - [ ] Add error pattern matching + - [ ] Implement isTerminalError() function +- [ ] Update `client.go` to integrate aggregator + - [ ] Wire up aggregator with exporter + - [ ] Implement background flush timer + - [ ] Update start() and close() methods - [ ] Add unit tests for collection and aggregation - -### Phase 4: Export -- [ ] Implement `exporter.go` with retry logic -- [ ] Implement `client.go` for telemetry client with full functionality -- [ ] Wire up circuit breaker with exporter -- [ ] Integrate shutdown method into driver lifecycle: - - [ ] Option 1: Export public `Shutdown()` API for applications to call - - [ ] Option 2: Hook into `sql.DB.Close()` or driver cleanup - - [ ] Option 3: Integrate with connection pool shutdown logic - - [ ] Document shutdown integration points and usage patterns -- [ ] Add unit tests for export logic - -### Phase 5: Driver Integration -- [ ] Add telemetry to `connection.go` -- [ ] Add telemetry to `statement.go` + - [ ] Test interceptor metric collection and latency tracking + - [ ] Test aggregation logic + - [ ] Test batch flushing (size-based and time-based) + - [ ] Test error classification + - [ ] Test client with aggregator integration + +### Phase 7: Driver Integration (PECOBLR-1382) +- [ ] Add telemetry initialization to `connection.go` + - [ ] Call isTelemetryEnabled() at connection open 
+ - [ ] Initialize telemetry client via clientManager.getOrCreateClient() + - [ ] Increment feature flag cache reference count + - [ ] Store telemetry interceptor in connection +- [ ] Add telemetry hooks to `statement.go` + - [ ] Add beforeExecute() hook at statement start + - [ ] Add afterExecute() hook at statement completion + - [ ] Add tag collection during execution (result format, chunk count, bytes, etc.) + - [ ] Call completeStatement() at statement end - [ ] Add cleanup in `Close()` methods + - [ ] Release client manager reference in connection.Close() + - [ ] Release feature flag cache reference + - [ ] Flush pending metrics before close - [ ] Add integration tests - -### Phase 6: Testing & Validation + - [ ] Test telemetry enabled via forceEnableTelemetry=true + - [ ] Test telemetry disabled by default + - [ ] Test metric collection and export end-to-end + - [ ] Test multiple concurrent connections + - [ ] Test latency measurement accuracy + - [ ] Test opt-in priority in driver context + +### Phase 8: Testing & Validation - [ ] Run benchmark tests + - [ ] Measure overhead when enabled + - [ ] Measure overhead when disabled + - [ ] Ensure <1% overhead when enabled - [ ] Perform load testing with concurrent connections + - [ ] Test 100+ concurrent connections + - [ ] Verify per-host client sharing + - [ ] Verify no rate limiting with per-host clients - [ ] Validate graceful shutdown + - [ ] Test reference counting cleanup + - [ ] Test final flush on shutdown + - [ ] Test shutdown method works correctly - [ ] Test circuit breaker behavior -- [ ] Verify privacy compliance (no PII) - -### Phase 7: Documentation + - [ ] Test circuit opening on repeated failures + - [ ] Test circuit recovery after timeout + - [ ] Test metrics dropped when circuit open +- [ ] Test opt-in priority logic end-to-end + - [ ] Verify forceEnableTelemetry works in real driver + - [ ] Verify enableTelemetry works in real driver + - [ ] Verify server flag integration works +- [ ] Verify 
privacy compliance + - [ ] Verify no SQL queries collected + - [ ] Verify no PII collected + - [ ] Verify tag filtering works (shouldExportToDatabricks) + +### Phase 9: Partial Launch Preparation +- [ ] Document `forceEnableTelemetry` and `enableTelemetry` flags +- [ ] Create internal testing plan for Phase 1 (use forceEnableTelemetry=true) +- [ ] Prepare beta opt-in documentation for Phase 2 (use enableTelemetry=true) +- [ ] Set up monitoring for rollout health metrics +- [ ] Document rollback procedures (set server flag to false) + +### Phase 10: Documentation - [ ] Document configuration options in README -- [ ] Add examples for enabling/disabling telemetry +- [ ] Add examples for opt-in flags +- [ ] Document partial launch strategy and phases - [ ] Document metric tags and their meanings - [ ] Create troubleshooting guide +- [ ] Document architecture and design decisions --- diff --git a/telemetry/circuitbreaker.go b/telemetry/circuitbreaker.go new file mode 100644 index 0000000..e399f03 --- /dev/null +++ b/telemetry/circuitbreaker.go @@ -0,0 +1,302 @@ +package telemetry + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" +) + +// circuitState represents the state of the circuit breaker. +type circuitState int32 + +const ( + stateClosed circuitState = iota + stateOpen + stateHalfOpen +) + +// callResult represents the result of a call (success or failure). +type callResult bool + +const ( + callSuccess callResult = true + callFailure callResult = false +) + +// circuitBreaker implements the circuit breaker pattern. +// It protects against failing telemetry endpoints by tracking failures +// using a sliding window and failure rate percentage. 
+// +// State transitions: +// - Closed → Open: When failure rate exceeds threshold after minimum calls +// - Open → Half-Open: After wait duration +// - Half-Open → Closed: After successful test calls +// - Half-Open → Open: On any failure in half-open state +type circuitBreaker struct { + mu sync.RWMutex + + state atomic.Int32 // circuitState + lastStateTime time.Time + + // Sliding window for tracking calls + window []callResult + windowIndex int + windowFilled bool + totalCalls int + failureCount int + + // Half-open state tracking + halfOpenSuccesses int + + config circuitBreakerConfig +} + +// circuitBreakerConfig holds circuit breaker configuration. +type circuitBreakerConfig struct { + failureRateThreshold int // Open if failure rate >= this percentage (0-100) + minimumNumberOfCalls int // Minimum calls before evaluating failure rate + slidingWindowSize int // Number of recent calls to track + waitDurationInOpenState time.Duration // Wait before transitioning to half-open + permittedCallsInHalfOpen int // Number of test calls in half-open state +} + +// defaultCircuitBreakerConfig returns default configuration matching JDBC. +func defaultCircuitBreakerConfig() circuitBreakerConfig { + return circuitBreakerConfig{ + failureRateThreshold: 50, // 50% failure rate + minimumNumberOfCalls: 20, // Minimum sample size + slidingWindowSize: 30, // Keep recent 30 calls + waitDurationInOpenState: 30 * time.Second, + permittedCallsInHalfOpen: 3, // Test with 3 calls + } +} + +// newCircuitBreaker creates a new circuit breaker. +func newCircuitBreaker(cfg circuitBreakerConfig) *circuitBreaker { + cb := &circuitBreaker{ + config: cfg, + lastStateTime: time.Now(), + window: make([]callResult, cfg.slidingWindowSize), + } + cb.state.Store(int32(stateClosed)) + return cb +} + +// ErrCircuitOpen is returned when circuit is open. +var ErrCircuitOpen = errors.New("circuit breaker is open") + +// execute executes the function if circuit allows. 
+// Returns ErrCircuitOpen if the circuit is open and the wait duration hasn't elapsed. +func (cb *circuitBreaker) execute(ctx context.Context, fn func() error) error { + state := circuitState(cb.state.Load()) + + switch state { + case stateOpen: + // Check if wait duration has passed + cb.mu.RLock() + shouldRetry := time.Since(cb.lastStateTime) > cb.config.waitDurationInOpenState + cb.mu.RUnlock() + + if shouldRetry { + // Transition to half-open + cb.setState(stateHalfOpen) + return cb.tryExecute(ctx, fn) + } + return ErrCircuitOpen + + case stateHalfOpen: + return cb.tryExecute(ctx, fn) + + case stateClosed: + return cb.tryExecute(ctx, fn) + } + + return nil +} + +// tryExecute attempts to execute the function and updates state. +func (cb *circuitBreaker) tryExecute(ctx context.Context, fn func() error) error { + err := fn() + + if err != nil { + cb.recordCall(callFailure) + return err + } + + cb.recordCall(callSuccess) + return nil +} + +// recordCall records a call result in the sliding window and evaluates state transitions. 
+func (cb *circuitBreaker) recordCall(result callResult) { + cb.mu.Lock() + defer cb.mu.Unlock() + + state := circuitState(cb.state.Load()) + + // Handle half-open state specially + if state == stateHalfOpen { + if result == callFailure { + // Any failure in half-open immediately reopens circuit + cb.resetWindowUnlocked() + cb.setStateUnlocked(stateOpen) + return + } + + cb.halfOpenSuccesses++ + if cb.halfOpenSuccesses >= cb.config.permittedCallsInHalfOpen { + // Enough successes to close circuit + cb.resetWindowUnlocked() + cb.setStateUnlocked(stateClosed) + } + return + } + + // Record in sliding window + // Remove old value from count if window is full + if cb.windowFilled && cb.window[cb.windowIndex] == callFailure { + cb.failureCount-- + } + + // Add new value + cb.window[cb.windowIndex] = result + if result == callFailure { + cb.failureCount++ + } + + // Move to next position + cb.windowIndex = (cb.windowIndex + 1) % cb.config.slidingWindowSize + if cb.windowIndex == 0 { + cb.windowFilled = true + } + + cb.totalCalls++ + + // Evaluate if we should open the circuit + if state == stateClosed { + cb.evaluateStateUnlocked() + } +} + +// evaluateStateUnlocked checks if the circuit should open based on failure rate. +// Caller must hold cb.mu lock. +func (cb *circuitBreaker) evaluateStateUnlocked() { + // Need minimum number of calls before evaluating + windowSize := cb.totalCalls + if cb.windowFilled { + windowSize = cb.config.slidingWindowSize + } + + if windowSize < cb.config.minimumNumberOfCalls { + return + } + + // Calculate failure rate + failureRate := (cb.failureCount * 100) / windowSize + + if failureRate >= cb.config.failureRateThreshold { + cb.setStateUnlocked(stateOpen) + } +} + +// resetWindowUnlocked clears the sliding window. +// Caller must hold cb.mu lock. 
+func (cb *circuitBreaker) resetWindowUnlocked() { + cb.windowIndex = 0 + cb.windowFilled = false + cb.totalCalls = 0 + cb.failureCount = 0 + cb.halfOpenSuccesses = 0 +} + +// setState transitions to a new state. +func (cb *circuitBreaker) setState(newState circuitState) { + cb.mu.Lock() + defer cb.mu.Unlock() + cb.setStateUnlocked(newState) +} + +// setStateUnlocked transitions to a new state without locking. +// Caller must hold cb.mu lock. +func (cb *circuitBreaker) setStateUnlocked(newState circuitState) { + oldState := circuitState(cb.state.Load()) + if oldState == newState { + return + } + + cb.state.Store(int32(newState)) + cb.lastStateTime = time.Now() + + // Log state transition at DEBUG level + // logger.Debug().Msgf("circuit breaker: %v -> %v", oldState, newState) +} + +// getState returns the current state (for testing). +func (cb *circuitBreaker) getState() circuitState { + return circuitState(cb.state.Load()) +} + +// getFailureRate returns the current failure rate percentage (for testing). +func (cb *circuitBreaker) getFailureRate() int { + cb.mu.RLock() + defer cb.mu.RUnlock() + + windowSize := cb.totalCalls + if cb.windowFilled { + windowSize = cb.config.slidingWindowSize + } + + if windowSize == 0 { + return 0 + } + + return (cb.failureCount * 100) / windowSize +} + +// circuitBreakerManager manages circuit breakers per host. +// Each host gets its own circuit breaker to provide isolation. +type circuitBreakerManager struct { + mu sync.RWMutex + breakers map[string]*circuitBreaker +} + +var ( + breakerManagerOnce sync.Once + breakerManagerInstance *circuitBreakerManager +) + +// getCircuitBreakerManager returns the singleton instance. +func getCircuitBreakerManager() *circuitBreakerManager { + breakerManagerOnce.Do(func() { + breakerManagerInstance = &circuitBreakerManager{ + breakers: make(map[string]*circuitBreaker), + } + }) + return breakerManagerInstance +} + +// getCircuitBreaker gets or creates a circuit breaker for the host. 
+// Thread-safe for concurrent access. +func (m *circuitBreakerManager) getCircuitBreaker(host string) *circuitBreaker { + m.mu.RLock() + cb, exists := m.breakers[host] + m.mu.RUnlock() + + if exists { + return cb + } + + m.mu.Lock() + defer m.mu.Unlock() + + // Double-check after acquiring write lock + if cb, exists = m.breakers[host]; exists { + return cb + } + + cb = newCircuitBreaker(defaultCircuitBreakerConfig()) + m.breakers[host] = cb + return cb +} diff --git a/telemetry/circuitbreaker_test.go b/telemetry/circuitbreaker_test.go new file mode 100644 index 0000000..92c2ecc --- /dev/null +++ b/telemetry/circuitbreaker_test.go @@ -0,0 +1,481 @@ +package telemetry + +import ( + "context" + "errors" + "sync" + "testing" + "time" +) + +func TestCircuitBreaker_InitialState(t *testing.T) { + cfg := defaultCircuitBreakerConfig() + cb := newCircuitBreaker(cfg) + + if cb.getState() != stateClosed { + t.Errorf("Expected initial state to be Closed, got %v", cb.getState()) + } +} + +func TestCircuitBreaker_ClosedToOpen_FailureRate(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, // 50% failure rate + minimumNumberOfCalls: 10, // Lower minimum for testing + slidingWindowSize: 20, + waitDurationInOpenState: 1 * time.Second, + permittedCallsInHalfOpen: 2, + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + failFunc := func() error { + return errors.New("test error") + } + successFunc := func() error { + return nil + } + + // Execute 10 calls: 6 failures (60% failure rate) should open circuit + for i := 0; i < 6; i++ { + _ = cb.execute(ctx, failFunc) + } + for i := 0; i < 4; i++ { + _ = cb.execute(ctx, successFunc) + } + + // Circuit should be open (60% > 50% threshold) + if cb.getState() != stateOpen { + t.Errorf("Expected state to be Open after 60%% failure rate, got %v (failure rate: %d%%)", cb.getState(), cb.getFailureRate()) + } +} + +func TestCircuitBreaker_MinimumCallsRequired(t *testing.T) { + cfg := circuitBreakerConfig{ + 
failureRateThreshold: 50, + minimumNumberOfCalls: 20, + slidingWindowSize: 30, + waitDurationInOpenState: 1 * time.Second, + permittedCallsInHalfOpen: 2, + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + failFunc := func() error { + return errors.New("test error") + } + + // Execute 10 failures (less than minimum) + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + + // Circuit should still be closed (not enough calls) + if cb.getState() != stateClosed { + t.Errorf("Expected state to remain Closed with insufficient calls, got %v", cb.getState()) + } + + // Execute 10 more failures (now 20 total, 100% failure rate) + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + + // Now circuit should be open + if cb.getState() != stateOpen { + t.Errorf("Expected state to be Open after minimum calls with 100%% failure rate, got %v", cb.getState()) + } +} + +func TestCircuitBreaker_SlidingWindow(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 10, // Small window for testing + waitDurationInOpenState: 1 * time.Second, + permittedCallsInHalfOpen: 2, + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + failFunc := func() error { + return errors.New("test error") + } + successFunc := func() error { + return nil + } + + // Fill window with 10 failures (100% failure rate) + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + + if cb.getState() != stateOpen { + t.Fatalf("Expected circuit to be Open, got %v", cb.getState()) + } + + // Wait and transition to half-open + time.Sleep(cfg.waitDurationInOpenState + 50*time.Millisecond) + + // Successful call to move to half-open + _ = cb.execute(ctx, successFunc) + + if cb.getState() != stateHalfOpen { + t.Fatalf("Expected state to be HalfOpen, got %v", cb.getState()) + } + + // One more success should close it (2 successes needed) + _ = cb.execute(ctx, successFunc) + + if cb.getState() != 
stateClosed { + t.Errorf("Expected state to be Closed after half-open successes, got %v", cb.getState()) + } + + // Window should be reset - now add 10 successes + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, successFunc) + } + + // Should remain closed (0% failure rate) + if cb.getState() != stateClosed { + t.Errorf("Expected state to remain Closed with all successes, got %v", cb.getState()) + } +} + +func TestCircuitBreaker_OpenRejectsRequests(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 20, + waitDurationInOpenState: 1 * time.Hour, // Long wait so it stays open + permittedCallsInHalfOpen: 2, + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + failFunc := func() error { + return errors.New("test error") + } + + // Open the circuit (10 failures = 100% failure rate) + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + + if cb.getState() != stateOpen { + t.Fatalf("Expected circuit to be Open, got %v", cb.getState()) + } + + // Circuit is open, should reject immediately + err := cb.execute(ctx, func() error { + t.Fatal("Function should not be executed when circuit is open") + return nil + }) + + if err != ErrCircuitOpen { + t.Errorf("Expected ErrCircuitOpen, got %v", err) + } +} + +func TestCircuitBreaker_OpenToHalfOpen(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 20, + waitDurationInOpenState: 100 * time.Millisecond, + permittedCallsInHalfOpen: 2, + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + failFunc := func() error { + return errors.New("test error") + } + + // Open the circuit + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + + if cb.getState() != stateOpen { + t.Fatalf("Expected state to be Open, got %v", cb.getState()) + } + + // Wait for wait duration + time.Sleep(cfg.waitDurationInOpenState + 50*time.Millisecond) + + // Next request 
should transition to half-open + successFunc := func() error { + return nil + } + err := cb.execute(ctx, successFunc) + if err != nil { + t.Errorf("Expected no error in half-open state, got %v", err) + } + + // Should be in half-open state + if cb.getState() != stateHalfOpen { + t.Errorf("Expected state to be HalfOpen, got %v", cb.getState()) + } +} + +func TestCircuitBreaker_HalfOpenToClosed(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 20, + waitDurationInOpenState: 100 * time.Millisecond, + permittedCallsInHalfOpen: 3, // Need 3 successes + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + failFunc := func() error { + return errors.New("test error") + } + successFunc := func() error { + return nil + } + + // Open the circuit + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + + // Wait for wait duration + time.Sleep(cfg.waitDurationInOpenState + 50*time.Millisecond) + + // Execute 3 successes to close + for i := 0; i < cfg.permittedCallsInHalfOpen; i++ { + err := cb.execute(ctx, successFunc) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + } + + // Circuit should now be closed + if cb.getState() != stateClosed { + t.Errorf("Expected state to be Closed after %d successes, got %v", cfg.permittedCallsInHalfOpen, cb.getState()) + } +} + +func TestCircuitBreaker_HalfOpenToOpen(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 20, + waitDurationInOpenState: 100 * time.Millisecond, + permittedCallsInHalfOpen: 3, + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + failFunc := func() error { + return errors.New("test error") + } + + // Open the circuit + for i := 0; i < 10; i++ { + _ = cb.execute(ctx, failFunc) + } + + // Wait for wait duration + time.Sleep(cfg.waitDurationInOpenState + 50*time.Millisecond) + + // First request transitions to half-open, 
but fails + err := cb.execute(ctx, failFunc) + if err == nil { + t.Errorf("Expected error, got nil") + } + + // Circuit should immediately reopen on failure in half-open + if cb.getState() != stateOpen { + t.Errorf("Expected state to be Open after failure in HalfOpen, got %v", cb.getState()) + } +} + +func TestCircuitBreaker_SuccessInClosed(t *testing.T) { + cfg := defaultCircuitBreakerConfig() + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + successFunc := func() error { + return nil + } + + // Execute successful requests + for i := 0; i < 50; i++ { + err := cb.execute(ctx, successFunc) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + } + + // Should remain closed (0% failure rate) + if cb.getState() != stateClosed { + t.Errorf("Expected state to remain Closed, got %v", cb.getState()) + } + + // Failure rate should be 0% + if cb.getFailureRate() != 0 { + t.Errorf("Expected 0%% failure rate, got %d%%", cb.getFailureRate()) + } +} + +func TestCircuitBreaker_ConcurrentAccess(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 20, + slidingWindowSize: 50, + waitDurationInOpenState: 100 * time.Millisecond, + permittedCallsInHalfOpen: 3, + } + cb := newCircuitBreaker(cfg) + + ctx := context.Background() + var wg sync.WaitGroup + + // Launch many concurrent goroutines that all fail + for i := 0; i < 30; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = cb.execute(ctx, func() error { + return errors.New("test error") + }) + }() + } + + wg.Wait() + + // Circuit should have opened due to high failure rate + if cb.getState() != stateOpen { + t.Errorf("Expected circuit to be Open after concurrent failures, got %v (failure rate: %d%%)", cb.getState(), cb.getFailureRate()) + } +} + +func TestCircuitBreakerManager_Singleton(t *testing.T) { + mgr1 := getCircuitBreakerManager() + mgr2 := getCircuitBreakerManager() + + if mgr1 != mgr2 { + t.Errorf("Expected singleton instance, got different 
instances") + } +} + +func TestCircuitBreakerManager_PerHostIsolation(t *testing.T) { + mgr := getCircuitBreakerManager() + + host1 := "host1.example.com" + host2 := "host2.example.com" + + cb1 := mgr.getCircuitBreaker(host1) + cb2 := mgr.getCircuitBreaker(host2) + + if cb1 == cb2 { + t.Errorf("Expected different circuit breakers for different hosts") + } + + // Same host should return same circuit breaker + cb1Again := mgr.getCircuitBreaker(host1) + if cb1 != cb1Again { + t.Errorf("Expected same circuit breaker for same host") + } +} + +func TestCircuitBreakerManager_ConcurrentAccess(t *testing.T) { + mgr := getCircuitBreakerManager() + var wg sync.WaitGroup + + // Use unique host names to avoid conflicts with other tests + hosts := []string{"sliding-host1", "sliding-host2", "sliding-host3"} + + // Count how many breakers exist before our test + mgr.mu.RLock() + initialCount := len(mgr.breakers) + mgr.mu.RUnlock() + + // Launch many concurrent goroutines accessing circuit breakers + for i := 0; i < 100; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + host := hosts[index%len(hosts)] + cb := mgr.getCircuitBreaker(host) + if cb == nil { + t.Errorf("Expected non-nil circuit breaker") + } + }(i) + } + + wg.Wait() + + // Verify we added exactly 3 circuit breakers + mgr.mu.RLock() + finalCount := len(mgr.breakers) + mgr.mu.RUnlock() + + if finalCount-initialCount != 3 { + t.Errorf("Expected 3 new circuit breakers, got %d (initial: %d, final: %d)", finalCount-initialCount, initialCount, finalCount) + } + + // Verify all our hosts have circuit breakers + for _, host := range hosts { + cb := mgr.getCircuitBreaker(host) + if cb == nil { + t.Errorf("Expected circuit breaker for host %s", host) + } + } +} + +func TestCircuitBreaker_FailureRateCalculation(t *testing.T) { + cfg := circuitBreakerConfig{ + failureRateThreshold: 50, + minimumNumberOfCalls: 20, + slidingWindowSize: 30, + waitDurationInOpenState: 1 * time.Second, + permittedCallsInHalfOpen: 2, + } + cb 
:= newCircuitBreaker(cfg) + + ctx := context.Background() + + // Execute 30 calls: 15 failures, 15 successes (50% failure rate) + for i := 0; i < 30; i++ { + if i%2 == 0 { + _ = cb.execute(ctx, func() error { + return errors.New("test error") + }) + } else { + _ = cb.execute(ctx, func() error { + return nil + }) + } + } + + // Failure rate should be 50% + failureRate := cb.getFailureRate() + if failureRate != 50 { + t.Errorf("Expected 50%% failure rate, got %d%%", failureRate) + } + + // Circuit should be open (50% >= 50% threshold) + if cb.getState() != stateOpen { + t.Errorf("Expected circuit to be Open at 50%% failure rate, got %v", cb.getState()) + } +} + +func TestCircuitBreaker_ContextCancellation(t *testing.T) { + cfg := defaultCircuitBreakerConfig() + cb := newCircuitBreaker(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + // Circuit breaker should still execute (doesn't check context) + err := cb.execute(ctx, func() error { + return nil + }) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } +} diff --git a/telemetry/config.go b/telemetry/config.go index 00c5604..c7474b0 100644 --- a/telemetry/config.go +++ b/telemetry/config.go @@ -10,6 +10,14 @@ type Config struct { // Enabled controls whether telemetry is active Enabled bool + // ForceEnableTelemetry bypasses server-side feature flag checks + // When true, telemetry is always enabled regardless of server flags + ForceEnableTelemetry bool + + // EnableTelemetry indicates user wants telemetry enabled if server allows + // Respects server-side feature flags and rollout percentage + EnableTelemetry bool + // BatchSize is the number of metrics to batch before flushing BatchSize int @@ -33,10 +41,12 @@ type Config struct { } // DefaultConfig returns default telemetry configuration. -// Note: Telemetry is disabled by default and will be enabled after full testing and validation. 
+// Note: Telemetry is disabled by default and requires explicit opt-in. func DefaultConfig() *Config { return &Config{ - Enabled: false, // Disabled by default until testing is complete + Enabled: false, // Disabled by default, requires explicit opt-in + ForceEnableTelemetry: false, + EnableTelemetry: false, BatchSize: 100, FlushInterval: 5 * time.Second, MaxRetries: 3, @@ -47,12 +57,25 @@ func DefaultConfig() *Config { } } -// ParseTelemetryConfig extracts telemetry config from DSN query parameters. +// ParseTelemetryConfig extracts telemetry config from connection parameters. func ParseTelemetryConfig(params map[string]string) *Config { cfg := DefaultConfig() - if v, ok := params["telemetry"]; ok { - cfg.Enabled = v == "true" || v == "1" + // Check for forceEnableTelemetry flag (bypasses server feature flags) + if v, ok := params["forceEnableTelemetry"]; ok { + if v == "true" || v == "1" { + cfg.ForceEnableTelemetry = true + cfg.Enabled = true // Also set Enabled for backward compatibility + } + } + + // Check for enableTelemetry flag (respects server feature flags) + if v, ok := params["enableTelemetry"]; ok { + if v == "true" || v == "1" { + cfg.EnableTelemetry = true + } else if v == "false" || v == "0" { + cfg.EnableTelemetry = false + } } if v, ok := params["telemetry_batch_size"]; ok { diff --git a/telemetry/config_test.go b/telemetry/config_test.go index 9878492..a696a10 100644 --- a/telemetry/config_test.go +++ b/telemetry/config_test.go @@ -59,34 +59,34 @@ func TestParseTelemetryConfig_EmptyParams(t *testing.T) { func TestParseTelemetryConfig_EnabledTrue(t *testing.T) { params := map[string]string{ - "telemetry": "true", + "enableTelemetry": "true", } cfg := ParseTelemetryConfig(params) - if !cfg.Enabled { - t.Error("Expected telemetry to be enabled when set to 'true'") + if !cfg.EnableTelemetry { + t.Error("Expected EnableTelemetry to be true when set to 'true'") } } func TestParseTelemetryConfig_Enabled1(t *testing.T) { params := map[string]string{ - 
"telemetry": "1", + "enableTelemetry": "1", } cfg := ParseTelemetryConfig(params) - if !cfg.Enabled { - t.Error("Expected telemetry to be enabled when set to '1'") + if !cfg.EnableTelemetry { + t.Error("Expected EnableTelemetry to be true when set to '1'") } } func TestParseTelemetryConfig_EnabledFalse(t *testing.T) { params := map[string]string{ - "telemetry": "false", + "enableTelemetry": "false", } cfg := ParseTelemetryConfig(params) - if cfg.Enabled { - t.Error("Expected telemetry to be disabled when set to 'false'") + if cfg.EnableTelemetry { + t.Error("Expected EnableTelemetry to be false when set to 'false'") } } @@ -162,14 +162,14 @@ func TestParseTelemetryConfig_FlushIntervalInvalid(t *testing.T) { func TestParseTelemetryConfig_MultipleParams(t *testing.T) { params := map[string]string{ - "telemetry": "true", + "enableTelemetry": "true", "telemetry_batch_size": "200", "telemetry_flush_interval": "30s", } cfg := ParseTelemetryConfig(params) - if !cfg.Enabled { - t.Error("Expected telemetry to be enabled") + if !cfg.EnableTelemetry { + t.Error("Expected EnableTelemetry to be true") } if cfg.BatchSize != 200 { diff --git a/telemetry/manager.go b/telemetry/manager.go index ebeb6af..33bfe1c 100644 --- a/telemetry/manager.go +++ b/telemetry/manager.go @@ -54,7 +54,7 @@ func (m *clientManager) getOrCreateClient(host string, httpClient *http.Client, client := newTelemetryClient(host, httpClient, cfg) if err := client.start(); err != nil { // Failed to start client, don't add to map - logger.Logger.Warn().Str("host", host).Err(err).Msg("failed to start telemetry client") + logger.Logger.Debug().Str("host", host).Err(err).Msg("failed to start telemetry client") return nil } holder = &clientHolder{ @@ -100,7 +100,7 @@ func (m *clientManager) shutdown() error { var lastErr error for host, holder := range m.clients { if err := holder.client.close(); err != nil { - logger.Logger.Warn().Str("host", host).Err(err).Msg("error closing telemetry client during shutdown") + 
logger.Logger.Debug().Str("host", host).Err(err).Msg("error closing telemetry client during shutdown") lastErr = err } }
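
---

The trip decision exercised by `evaluateStateUnlocked` and the tests above reduces to two integer checks: the sliding window must hold at least `minimumNumberOfCalls` results, and the integer failure rate (`failures * 100 / windowSize`, truncating) must meet the threshold. A minimal standalone sketch of that decision — function names here are illustrative, not part of the driver:

```go
package main

import "fmt"

// failureRatePercent computes the failure rate the same way the design does:
// failures * 100 / windowSize, using integer division (truncates toward zero).
func failureRatePercent(failures, windowSize int) int {
	if windowSize == 0 {
		return 0
	}
	return (failures * 100) / windowSize
}

// shouldOpen reports whether a closed circuit should trip: the window must
// contain at least minimumCalls recorded results, and the failure rate must
// meet or exceed the threshold (in percent).
func shouldOpen(failures, windowSize, minimumCalls, thresholdPercent int) bool {
	if windowSize < minimumCalls {
		return false // not enough data to judge yet
	}
	return failureRatePercent(failures, windowSize) >= thresholdPercent
}

func main() {
	// 6 failures out of 10 recorded calls, minimum 10, 50% threshold: trips.
	fmt.Println(shouldOpen(6, 10, 10, 50)) // true
	// 6 failures out of only 8 calls: 75% rate, but below the 10-call minimum.
	fmt.Println(shouldOpen(6, 8, 10, 50)) // false
}
```

Note the `>=` comparison: a failure rate exactly equal to the threshold opens the circuit, which is what `TestCircuitBreaker_FailureRateCalculation` asserts and matches Resilience4j's threshold-inclusive semantics.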