Error Handling

Learn how to implement robust error handling across all VerbalisAI SDK features for both Python and JavaScript.

Overview

Effective error handling is crucial for building reliable applications with the VerbalisAI API. This guide covers:

  • Understanding VerbalisAI error types
  • Implementing retry strategies
  • Handling network issues
  • Managing rate limits
  • Graceful degradation patterns
  • Monitoring and alerting

Error Types

API Errors

VerbalisAI returns structured error responses with specific error codes:

Error CodeHTTP StatusDescriptionRetry Strategy
INVALID_API_KEY401API key is invalid or missingDon’t retry
INSUFFICIENT_CREDITS402Not enough credits for operationDon’t retry
RATE_LIMIT_EXCEEDED429Too many requestsRetry with backoff
AUDIO_FORMAT_UNSUPPORTED400Unsupported audio formatDon’t retry
AUDIO_TOO_LARGE413Audio file exceeds size limitDon’t retry
AUDIO_TOO_SHORT400Audio file too short to processDon’t retry
TRANSCRIPTION_FAILED422Transcription processing failedRetry once
SERVICE_UNAVAILABLE503Service temporarily unavailableRetry with backoff
INTERNAL_ERROR500Internal server errorRetry with backoff

Python SDK Error Handling

Basic Error Handling

from verbalisai import VerbalisAI, VerbalisAIError
import asyncio

async def basic_error_handling():
    client = VerbalisAI()
    
    try:
        transcription = await client.transcriptions.create(
            audio_url="https://example.com/audio.mp3",
            model="mini"
        )
        
        print("Transcription successful:", transcription.text)
        
    except VerbalisAIError as e:
        print(f"VerbalisAI API Error: {e.error_code}")
        print(f"Message: {e.message}")
        print(f"HTTP Status: {e.status_code}")
        
        # Handle specific error types
        if e.error_code == "INSUFFICIENT_CREDITS":
            print("Please add credits to your account")
        elif e.error_code == "AUDIO_FORMAT_UNSUPPORTED":
            print("Please use a supported audio format (MP3, WAV, FLAC)")
        elif e.error_code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded, please wait before retrying")
            
    except Exception as e:
        print(f"Unexpected error: {e}")

asyncio.run(basic_error_handling())

Advanced Error Handler Class

import asyncio
import logging
from typing import Optional, Callable, Any
from verbalisai import VerbalisAI, VerbalisAIError

class VerbalisAIErrorHandler:
    def __init__(self, 
                 max_retries: int = 3,
                 base_delay: float = 1.0,
                 max_delay: float = 60.0,
                 backoff_factor: float = 2.0,
                 logger: Optional[logging.Logger] = None):
        
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
        self.logger = logger or logging.getLogger(__name__)
        
        # Retryable error codes
        self.retryable_errors = {
            "RATE_LIMIT_EXCEEDED",
            "SERVICE_UNAVAILABLE", 
            "INTERNAL_ERROR",
            "TRANSCRIPTION_FAILED"
        }
        
        # Non-retryable error codes
        self.non_retryable_errors = {
            "INVALID_API_KEY",
            "INSUFFICIENT_CREDITS",
            "AUDIO_FORMAT_UNSUPPORTED",
            "AUDIO_TOO_LARGE",
            "AUDIO_TOO_SHORT"
        }
    
    async def execute_with_retry(self, 
                               operation: Callable,
                               *args,
                               on_retry: Optional[Callable] = None,
                               **kwargs) -> Any:
        """Execute operation with retry logic"""
        
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                return await operation(*args, **kwargs)
                
            except VerbalisAIError as e:
                last_exception = e
                
                self.logger.warning(f"Attempt {attempt + 1} failed: {e.error_code} - {e.message}")
                
                # Don't retry non-retryable errors
                if e.error_code in self.non_retryable_errors:
                    self.logger.error(f"Non-retryable error: {e.error_code}")
                    raise e
                
                # Don't retry if not a known retryable error and not server error
                if (e.error_code not in self.retryable_errors and 
                    e.status_code < 500):
                    self.logger.error(f"Client error, not retrying: {e.error_code}")
                    raise e
                
                # Calculate delay for next retry
                if attempt < self.max_retries:
                    delay = self.calculate_delay(attempt, e)
                    
                    self.logger.info(f"Retrying in {delay:.1f} seconds...")
                    
                    if on_retry:
                        await on_retry(attempt + 1, e, delay)
                    
                    await asyncio.sleep(delay)
                
            except Exception as e:
                # Handle non-VerbalisAI errors
                self.logger.error(f"Unexpected error: {e}")
                
                if attempt < self.max_retries:
                    delay = self.calculate_delay(attempt)
                    await asyncio.sleep(delay)
                    last_exception = e
                else:
                    raise e
        
        # All retries exhausted
        self.logger.error(f"All {self.max_retries} retries exhausted")
        raise last_exception
    
    def calculate_delay(self, attempt: int, error: Optional[VerbalisAIError] = None) -> float:
        """Calculate delay with exponential backoff"""
        
        # Use retry-after header if available (for rate limiting)
        if error and hasattr(error, 'retry_after') and error.retry_after:
            return min(error.retry_after, self.max_delay)
        
        # Exponential backoff with jitter
        delay = self.base_delay * (self.backoff_factor ** attempt)
        
        # Add jitter (random factor between 0.5 and 1.5)
        import random
        jitter = 0.5 + random.random()
        delay *= jitter
        
        return min(delay, self.max_delay)

# Usage
async def robust_transcription():
    client = VerbalisAI()
    error_handler = VerbalisAIErrorHandler(max_retries=3)
    
    async def transcribe_operation():
        return await client.transcriptions.create(
            audio_url="https://example.com/audio.mp3",
            model="mini"
        )
    
    async def on_retry_callback(attempt, error, delay):
        print(f"Retry attempt {attempt} after {delay:.1f}s due to: {error.error_code}")
    
    try:
        transcription = await error_handler.execute_with_retry(
            transcribe_operation,
            on_retry=on_retry_callback
        )
        
        print("Transcription successful:", transcription.text)
        
    except VerbalisAIError as e:
        print(f"Final error after retries: {e.error_code} - {e.message}")

asyncio.run(robust_transcription())

Circuit Breaker Pattern

import asyncio
import time
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"  
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: float = 60.0,
                 expected_exception: type = VerbalisAIError):
        
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN - service unavailable")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        return (time.time() - self.last_failure_time) >= self.recovery_timeout
    
    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
async def circuit_breaker_example():
    client = VerbalisAI()
    circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)
    
    async def transcribe_with_circuit_breaker(audio_url):
        return await circuit_breaker.call(
            client.transcriptions.create,
            audio_url=audio_url,
            model="mini"
        )
    
    # Multiple attempts
    for i in range(10):
        try:
            result = await transcribe_with_circuit_breaker("https://example.com/audio.mp3")
            print(f"Attempt {i+1}: Success")
            
        except Exception as e:
            print(f"Attempt {i+1}: Failed - {e}")
            await asyncio.sleep(5)

asyncio.run(circuit_breaker_example())

JavaScript SDK Error Handling

Basic Error Handling

import { VerbalisAI, VerbalisAIError } from '@verbalisai/sdk';

async function basicErrorHandling() {
  const client = new VerbalisAI();
  
  try {
    const transcription = await client.transcriptions.create({
      audioUrl: 'https://example.com/audio.mp3',
      model: 'mini'
    });
    
    console.log('Transcription successful:', transcription.text);
    
  } catch (error) {
    if (error instanceof VerbalisAIError) {
      console.log(`VerbalisAI API Error: ${error.errorCode}`);
      console.log(`Message: ${error.message}`);
      console.log(`HTTP Status: ${error.statusCode}`);
      
      // Handle specific error types
      switch (error.errorCode) {
        case 'INSUFFICIENT_CREDITS':
          console.log('Please add credits to your account');
          break;
        case 'AUDIO_FORMAT_UNSUPPORTED':
          console.log('Please use a supported audio format (MP3, WAV, FLAC)');
          break;
        case 'RATE_LIMIT_EXCEEDED':
          console.log('Rate limit exceeded, please wait before retrying');
          break;
        default:
          console.log('Unknown API error');
      }
    } else {
      console.log(`Unexpected error: ${error.message}`);
    }
  }
}

basicErrorHandling();

Advanced Error Handler Class

import { VerbalisAIError } from '@verbalisai/sdk';

class VerbalisAIErrorHandler {
  constructor(options = {}) {
    this.maxRetries = options.maxRetries || 3;
    this.baseDelay = options.baseDelay || 1000;
    this.maxDelay = options.maxDelay || 60000;
    this.backoffFactor = options.backoffFactor || 2.0;
    
    this.retryableErrors = new Set([
      'RATE_LIMIT_EXCEEDED',
      'SERVICE_UNAVAILABLE',
      'INTERNAL_ERROR',
      'TRANSCRIPTION_FAILED'
    ]);
    
    this.nonRetryableErrors = new Set([
      'INVALID_API_KEY',
      'INSUFFICIENT_CREDITS',
      'AUDIO_FORMAT_UNSUPPORTED',
      'AUDIO_TOO_LARGE',
      'AUDIO_TOO_SHORT'
    ]);
  }
  
  async executeWithRetry(operation, options = {}) {
    const { onRetry, context } = options;
    let lastError = null;
    
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await operation();
        
      } catch (error) {
        lastError = error;
        
        console.warn(`Attempt ${attempt + 1} failed: ${error.message}`);
        
        if (error instanceof VerbalisAIError) {
          // Don't retry non-retryable errors
          if (this.nonRetryableErrors.has(error.errorCode)) {
            console.error(`Non-retryable error: ${error.errorCode}`);
            throw error;
          }
          
          // Don't retry unknown client errors
          if (!this.retryableErrors.has(error.errorCode) && error.statusCode < 500) {
            console.error(`Client error, not retrying: ${error.errorCode}`);
            throw error;
          }
        }
        
        // Calculate delay for next retry
        if (attempt < this.maxRetries) {
          const delay = this.calculateDelay(attempt, error);
          
          console.info(`Retrying in ${delay}ms...`);
          
          if (onRetry) {
            await onRetry(attempt + 1, error, delay, context);
          }
          
          await this.sleep(delay);
        }
      }
    }
    
    // All retries exhausted
    console.error(`All ${this.maxRetries} retries exhausted`);
    throw lastError;
  }
  
  calculateDelay(attempt, error) {
    // Use retry-after header if available
    if (error instanceof VerbalisAIError && error.retryAfter) {
      return Math.min(error.retryAfter * 1000, this.maxDelay);
    }
    
    // Exponential backoff with jitter
    let delay = this.baseDelay * Math.pow(this.backoffFactor, attempt);
    
    // Add jitter
    const jitter = 0.5 + Math.random();
    delay *= jitter;
    
    return Math.min(delay, this.maxDelay);
  }
  
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage
async function robustTranscription() {
  const client = new VerbalisAI();
  const errorHandler = new VerbalisAIErrorHandler({ maxRetries: 3 });
  
  const transcribeOperation = () => client.transcriptions.create({
    audioUrl: 'https://example.com/audio.mp3',
    model: 'mini'
  });
  
  const onRetryCallback = (attempt, error, delay, context) => {
    console.log(`Retry attempt ${attempt} after ${delay}ms due to: ${error.errorCode || error.message}`);
  };
  
  try {
    const transcription = await errorHandler.executeWithRetry(
      transcribeOperation,
      { onRetry: onRetryCallback }
    );
    
    console.log('Transcription successful:', transcription.text);
    
  } catch (error) {
    console.error('Final error after retries:', error.message);
  }
}

robustTranscription();

Promise-based Circuit Breaker

class CircuitBreaker {
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5;
    this.recoveryTimeout = options.recoveryTimeout || 60000;
    this.monitoringPeriod = options.monitoringPeriod || 10000;
    
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.lastFailureTime = null;
    this.successCount = 0;
  }
  
  async execute(operation) {
    if (this.state === 'OPEN') {
      if (this.shouldAttemptReset()) {
        this.state = 'HALF_OPEN';
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is OPEN - service unavailable');
      }
    }
    
    try {
      const result = await operation();
      this.onSuccess();
      return result;
      
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }
  
  shouldAttemptReset() {
    return Date.now() - this.lastFailureTime >= this.recoveryTimeout;
  }
  
  onSuccess() {
    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= 3) { // Require 3 successes to close
        this.state = 'CLOSED';
        this.failureCount = 0;
      }
    } else {
      this.failureCount = 0;
      this.state = 'CLOSED';
    }
  }
  
  onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    
    if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }
  
  getStatus() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      successCount: this.successCount,
      lastFailureTime: this.lastFailureTime
    };
  }
}

// Usage
const circuitBreaker = new CircuitBreaker({
  failureThreshold: 3,
  recoveryTimeout: 30000
});

async function transcribeWithCircuitBreaker(audioUrl) {
  const client = new VerbalisAI();
  
  return await circuitBreaker.execute(() => 
    client.transcriptions.create({
      audioUrl: audioUrl,
      model: 'mini'
    })
  );
}

// Test circuit breaker
async function testCircuitBreaker() {
  for (let i = 0; i < 10; i++) {
    try {
      const result = await transcribeWithCircuitBreaker('https://example.com/audio.mp3');
      console.log(`Attempt ${i + 1}: Success`);
      
    } catch (error) {
      console.log(`Attempt ${i + 1}: Failed - ${error.message}`);
      console.log('Circuit Breaker Status:', circuitBreaker.getStatus());
    }
    
    await new Promise(resolve => setTimeout(resolve, 5000));
  }
}

testCircuitBreaker();

Rate Limiting

Rate Limit Handler

class RateLimitHandler {
  constructor(options = {}) {
    this.requestsPerSecond = options.requestsPerSecond || 10;
    this.burstSize = options.burstSize || 20;
    this.tokens = this.burstSize;
    this.lastRefill = Date.now();
    this.queue = [];
  }
  
  async execute(operation) {
    return new Promise((resolve, reject) => {
      this.queue.push({ operation, resolve, reject });
      this.processQueue();
    });
  }
  
  async processQueue() {
    if (this.queue.length === 0) return;
    
    this.refillTokens();
    
    if (this.tokens > 0) {
      this.tokens--;
      const { operation, resolve, reject } = this.queue.shift();
      
      try {
        const result = await operation();
        resolve(result);
      } catch (error) {
        reject(error);
      }
      
      // Process next item if available
      if (this.queue.length > 0) {
        setTimeout(() => this.processQueue(), 0);
      }
    } else {
      // Wait for token refill
      const waitTime = 1000 / this.requestsPerSecond;
      setTimeout(() => this.processQueue(), waitTime);
    }
  }
  
  refillTokens() {
    const now = Date.now();
    const timePassed = now - this.lastRefill;
    const tokensToAdd = Math.floor(timePassed / (1000 / this.requestsPerSecond));
    
    if (tokensToAdd > 0) {
      this.tokens = Math.min(this.burstSize, this.tokens + tokensToAdd);
      this.lastRefill = now;
    }
  }
  
  getStatus() {
    this.refillTokens();
    return {
      tokens: this.tokens,
      queueLength: this.queue.length,
      burstSize: this.burstSize
    };
  }
}

// Usage
const rateLimiter = new RateLimitHandler({
  requestsPerSecond: 5,
  burstSize: 10
});

async function rateLimitedTranscription(audioUrl) {
  const client = new VerbalisAI();
  
  return await rateLimiter.execute(() =>
    client.transcriptions.create({
      audioUrl: audioUrl,
      model: 'mini'
    })
  );
}

// Test rate limiting
async function testRateLimit() {
  const audioUrls = Array.from({ length: 20 }, (_, i) => 
    `https://example.com/audio${i}.mp3`
  );
  
  const promises = audioUrls.map(url => 
    rateLimitedTranscription(url).catch(error => ({
      error: error.message,
      url: url
    }))
  );
  
  const results = await Promise.all(promises);
  console.log('Rate limited results:', results);
  console.log('Rate limiter status:', rateLimiter.getStatus());
}

testRateLimit();

Error Monitoring & Alerting

Error Metrics Collection

import asyncio
import time
from collections import defaultdict, deque
from typing import Dict, List
import json

class ErrorMetricsCollector:
    def __init__(self, window_size: int = 3600):  # 1 hour window
        self.window_size = window_size
        self.error_counts = defaultdict(int)
        self.error_timestamps = defaultdict(deque)
        self.response_times = deque()
        self.success_count = 0
        self.total_requests = 0
    
    def record_success(self, response_time: float):
        """Record successful request"""
        self.success_count += 1
        self.total_requests += 1
        self.response_times.append((time.time(), response_time))
        self._cleanup_old_data()
    
    def record_error(self, error_code: str, response_time: float = None):
        """Record error occurrence"""
        timestamp = time.time()
        self.error_counts[error_code] += 1
        self.error_timestamps[error_code].append(timestamp)
        self.total_requests += 1
        
        if response_time:
            self.response_times.append((timestamp, response_time))
        
        self._cleanup_old_data()
    
    def _cleanup_old_data(self):
        """Remove data outside the time window"""
        cutoff_time = time.time() - self.window_size
        
        # Clean up response times
        while self.response_times and self.response_times[0][0] < cutoff_time:
            self.response_times.popleft()
        
        # Clean up error timestamps
        for error_code in list(self.error_timestamps.keys()):
            timestamps = self.error_timestamps[error_code]
            while timestamps and timestamps[0] < cutoff_time:
                timestamps.popleft()
                self.error_counts[error_code] -= 1
            
            if not timestamps:
                del self.error_timestamps[error_code]
                del self.error_counts[error_code]
    
    def get_metrics(self) -> Dict:
        """Get current metrics"""
        self._cleanup_old_data()
        
        total_errors = sum(self.error_counts.values())
        error_rate = total_errors / max(self.total_requests, 1)
        
        avg_response_time = 0
        if self.response_times:
            avg_response_time = sum(rt for _, rt in self.response_times) / len(self.response_times)
        
        return {
            'total_requests': self.total_requests,
            'success_count': self.success_count,
            'error_count': total_errors,
            'error_rate': error_rate,
            'avg_response_time': avg_response_time,
            'error_breakdown': dict(self.error_counts),
            'timestamp': time.time()
        }
    
    def should_alert(self, thresholds: Dict) -> List[str]:
        """Check if any alert thresholds are exceeded"""
        metrics = self.get_metrics()
        alerts = []
        
        if metrics['error_rate'] > thresholds.get('error_rate', 0.1):
            alerts.append(f"High error rate: {metrics['error_rate']:.2%}")
        
        if metrics['avg_response_time'] > thresholds.get('response_time', 5.0):
            alerts.append(f"High response time: {metrics['avg_response_time']:.2f}s")
        
        # Check specific error types
        for error_code, count in metrics['error_breakdown'].items():
            threshold_key = f'error_{error_code.lower()}'
            if count > thresholds.get(threshold_key, 10):
                alerts.append(f"High {error_code} count: {count}")
        
        return alerts

# Usage with VerbalisAI client
metrics_collector = ErrorMetricsCollector()

async def monitored_transcription(client, audio_url):
    """Transcription with error monitoring"""
    start_time = time.time()
    
    try:
        result = await client.transcriptions.create(
            audio_url=audio_url,
            model="mini"
        )
        
        response_time = time.time() - start_time
        metrics_collector.record_success(response_time)
        
        return result
        
    except VerbalisAIError as e:
        response_time = time.time() - start_time
        metrics_collector.record_error(e.error_code, response_time)
        raise e

# Alert checking
async def check_alerts():
    """Periodically check for alert conditions"""
    thresholds = {
        'error_rate': 0.05,  # 5% error rate
        'response_time': 10.0,  # 10 seconds
        'error_rate_limit_exceeded': 5,  # Max 5 rate limit errors
        'error_service_unavailable': 3   # Max 3 service unavailable errors
    }
    
    while True:
        alerts = metrics_collector.should_alert(thresholds)
        
        if alerts:
            print("🚨 ALERTS:")
            for alert in alerts:
                print(f"  - {alert}")
            
            # Send alerts (email, Slack, etc.)
            await send_alert_notifications(alerts)
        
        # Print current metrics
        metrics = metrics_collector.get_metrics()
        print(f"📊 Metrics: {json.dumps(metrics, indent=2)}")
        
        await asyncio.sleep(60)  # Check every minute

async def send_alert_notifications(alerts: List[str]):
    """Send alert notifications (implement based on your needs)"""
    print(f"Sending {len(alerts)} alerts to notification channels...")
    # Implement email, Slack, webhook notifications here

Best Practices

Error Context & Logging

import logging
import traceback
from contextlib import contextmanager

class TranscriptionContext:
    def __init__(self, audio_url: str, user_id: str = None):
        self.audio_url = audio_url
        self.user_id = user_id
        self.start_time = time.time()
        self.metadata = {}
    
    def add_metadata(self, key: str, value: any):
        self.metadata[key] = value
    
    def get_context_info(self) -> Dict:
        return {
            'audio_url': self.audio_url,
            'user_id': self.user_id,
            'duration': time.time() - self.start_time,
            'metadata': self.metadata
        }

@contextmanager
def transcription_context(audio_url: str, user_id: str = None):
    """Context manager for transcription operations"""
    context = TranscriptionContext(audio_url, user_id)
    logger = logging.getLogger(__name__)
    
    try:
        logger.info(f"Starting transcription", extra=context.get_context_info())
        yield context
        logger.info(f"Transcription completed", extra=context.get_context_info())
        
    except VerbalisAIError as e:
        context.add_metadata('error_code', e.error_code)
        context.add_metadata('error_message', e.message)
        logger.error(f"Transcription failed: {e.error_code}", extra=context.get_context_info())
        raise
        
    except Exception as e:
        context.add_metadata('error_type', type(e).__name__)
        context.add_metadata('error_message', str(e))
        context.add_metadata('traceback', traceback.format_exc())
        logger.error(f"Unexpected error: {e}", extra=context.get_context_info())
        raise

# Usage
async def contextual_transcription():
    client = VerbalisAI()
    
    with transcription_context("https://example.com/audio.mp3", user_id="user123") as ctx:
        ctx.add_metadata('model', 'pro')
        ctx.add_metadata('language', 'en')
        
        result = await client.transcriptions.create(
            audio_url=ctx.audio_url,
            model="pro",
            language="en"
        )
        
        ctx.add_metadata('result_length', len(result.text))
        return result

Graceful Degradation

class GracefulDegradationHandler {
  constructor(client) {
    this.client = client;
    this.fallbackStrategies = {
      'RATE_LIMIT_EXCEEDED': this.queueForLater.bind(this),
      'SERVICE_UNAVAILABLE': this.useLocalCache.bind(this),
      'AUDIO_FORMAT_UNSUPPORTED': this.convertAudioFormat.bind(this),
      'INSUFFICIENT_CREDITS': this.notifyUserAndFail.bind(this)
    };
  }
  
  async transcribeWithFallback(audioUrl, options = {}) {
    try {
      return await this.client.transcriptions.create({
        audioUrl,
        ...options
      });
      
    } catch (error) {
      if (error instanceof VerbalisAIError) {
        const fallbackStrategy = this.fallbackStrategies[error.errorCode];
        
        if (fallbackStrategy) {
          console.log(`Using fallback strategy for: ${error.errorCode}`);
          return await fallbackStrategy(audioUrl, options, error);
        }
      }
      
      throw error;
    }
  }
  
  async queueForLater(audioUrl, options, error) {
    // Queue the request for later processing
    console.log('Queueing transcription for later due to rate limit');
    
    return {
      status: 'queued',
      message: 'Transcription queued due to rate limiting',
      retryAfter: error.retryAfter || 60,
      audioUrl: audioUrl
    };
  }
  
  async useLocalCache(audioUrl, options, error) {
    // Check if we have a cached result
    const cachedResult = await this.getCachedTranscription(audioUrl);
    
    if (cachedResult) {
      console.log('Using cached transcription due to service unavailability');
      return {
        ...cachedResult,
        fromCache: true,
        warning: 'Service temporarily unavailable, using cached result'
      };
    }
    
    throw error; // No cache available
  }
  
  async convertAudioFormat(audioUrl, options, error) {
    // Attempt to convert audio format
    console.log('Attempting audio format conversion');
    
    try {
      const convertedUrl = await this.convertAudio(audioUrl);
      return await this.client.transcriptions.create({
        audioUrl: convertedUrl,
        ...options
      });
    } catch (conversionError) {
      throw error; // Conversion failed, throw original error
    }
  }
  
  async notifyUserAndFail(audioUrl, options, error) {
    // Notify user about insufficient credits
    await this.notifyUser({
      type: 'insufficient_credits',
      message: 'Please add credits to your account to continue',
      audioUrl: audioUrl
    });
    
    throw error;
  }
  
  async getCachedTranscription(audioUrl) {
    // Implement caching logic
    return null;
  }
  
  async convertAudio(audioUrl) {
    // Implement audio conversion logic
    throw new Error('Audio conversion not implemented');
  }
  
  async notifyUser(notification) {
    console.log('User notification:', notification);
    // Implement user notification logic
  }
}

// Usage
const client = new VerbalisAI();
const gracefulHandler = new GracefulDegradationHandler(client);

async function robustTranscription(audioUrl) {
  try {
    const result = await gracefulHandler.transcribeWithFallback(audioUrl, {
      model: 'mini',
      topics: true
    });
    
    if (result.fromCache) {
      console.log('⚠️ Using cached result due to service issues');
    } else if (result.status === 'queued') {
      console.log('📋 Request queued for later processing');
    } else {
      console.log('✅ Transcription completed successfully');
    }
    
    return result;
    
  } catch (error) {
    console.error('❌ All fallback strategies failed:', error.message);
    throw error;
  }
}

Summary

Effective error handling with VerbalisAI SDKs involves:

  1. Understanding error types and implementing appropriate retry strategies
  2. Using exponential backoff with jitter for retryable errors
  3. Implementing circuit breakers to prevent cascading failures
  4. Managing rate limits with token buckets or queuing
  5. Monitoring errors and setting up alerting thresholds
  6. Providing context in error logs for debugging
  7. Graceful degradation with fallback strategies
  8. Testing error scenarios to ensure robustness

By following these patterns, you can build resilient applications that handle API failures gracefully and provide a good user experience even when things go wrong.

This completes the comprehensive SDK documentation. You now have all the tools needed to build robust, production-ready applications with the VerbalisAI API.