Streaming

Learn how to implement real-time streaming transcription for live audio processing with both the Python and JavaScript SDKs.

Overview

Streaming transcription allows you to process audio in real time as it is being recorded or received, returning results immediately instead of waiting for an entire file to finish processing. This is ideal for:

  • Live meetings and conferences
  • Phone call transcription
  • Real-time captions
  • Voice assistants
  • Interactive applications

Streaming Modes

VerbalisAI supports several streaming modes:

Mode             Description                           Use Case
Real-time        Process audio chunks as they arrive   Live transcription
Near real-time   Small buffering for better accuracy   Live captions with slight delay
Progressive      Process long files in chunks          Large file processing
Continuous       Ongoing transcription session         Extended conversations
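
Each mode maps onto one of the session helpers demonstrated later on this page. As a rough orientation, here is a sketch of that mapping (not an exhaustive API reference; the option dicts are trimmed to essentials):

import asyncio
from verbalisai import VerbalisAI

async def pick_session(client: VerbalisAI, mode: str):
    """Sketch: map a streaming mode to the session helpers used on this page."""
    if mode in ('real-time', 'near-real-time'):
        # Live session fed by audio chunks; near real-time just buffers longer
        return await client.streaming.create_session(
            {'model': 'mini', 'interim_results': True}
        )
    if mode == 'progressive':
        # Chunked processing of a large pre-recorded file
        return await client.streaming.create_progressive_session({'model': 'pro'})
    if mode == 'continuous':
        # Ongoing meeting-style session
        return await client.streaming.create_meeting_session(
            {'meeting_id': 'demo', 'participants': []}
        )
    raise ValueError(f"Unknown mode: {mode}")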

Python SDK Streaming

Basic Real-time Streaming

import asyncio
import pyaudio
from verbalisai import VerbalisAI

class RealTimeTranscriber:
    def __init__(self, api_key=None):
        self.client = VerbalisAI(api_key=api_key)
        self.audio_format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000
        self.chunk_size = 1024
        self.audio = pyaudio.PyAudio()
        
    async def start_streaming(self, on_transcript=None, on_partial=None):
        """Start real-time transcription from microphone"""
        
        # Initialize streaming session
        stream_session = await self.client.streaming.create_session({
            'sample_rate': self.rate,
            'encoding': 'LINEAR16',
            'language': 'auto',
            'interim_results': True,
            'single_utterance': False,
            'model': 'mini'
        })
        
        # Set up audio stream
        audio_stream = self.audio.open(
            format=self.audio_format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk_size
        )
        
        print("Starting real-time transcription... (Press Ctrl+C to stop)")
        
        sender = None
        try:
            loop = asyncio.get_running_loop()

            async def send_audio():
                while True:
                    # PyAudio's read() blocks, so run it in a worker thread
                    # to keep the event loop responsive
                    chunk = await loop.run_in_executor(
                        None,
                        lambda: audio_stream.read(
                            self.chunk_size, exception_on_overflow=False
                        ),
                    )
                    await stream_session.send_audio(chunk)

            # Feed the microphone concurrently; reading and sending inside
            # the result loop below would deadlock, since no result arrives
            # until audio has been sent
            sender = asyncio.create_task(send_audio())

            async for result in stream_session.stream():
                if result.is_final:
                    print(f"Final: {result.transcript}")
                    if on_transcript:
                        await on_transcript(result.transcript)
                else:
                    print(f"Partial: {result.transcript}")
                    if on_partial:
                        await on_partial(result.transcript)
        
        except KeyboardInterrupt:
            print("\nStopping transcription...")
        
        finally:
            if sender:
                sender.cancel()
            audio_stream.stop_stream()
            audio_stream.close()
            await stream_session.close()
    
    def __del__(self):
        if hasattr(self, 'audio'):
            self.audio.terminate()

# Usage
async def main():
    transcriber = RealTimeTranscriber()
    
    async def handle_transcript(text):
        print(f"FINAL TRANSCRIPT: {text}")
        # Save to database, send to API, etc.
    
    async def handle_partial(text):
        print(f"PARTIAL: {text}")
        # Update UI in real-time
    
    await transcriber.start_streaming(
        on_transcript=handle_transcript,
        on_partial=handle_partial
    )

if __name__ == "__main__":
    asyncio.run(main())

Progressive Streaming for Large Files

import asyncio
import os
import aiofiles
from verbalisai import VerbalisAI

class ProgressiveTranscriber:
    def __init__(self, api_key=None):
        self.client = VerbalisAI(api_key=api_key)
        self.chunk_size = 5 * 1024 * 1024  # 5MB chunks
    
    async def transcribe_large_file(self, file_path, on_progress=None, on_segment=None):
        """Transcribe large file progressively"""
        
        # Create progressive streaming session
        stream_session = await self.client.streaming.create_progressive_session({
            'model': 'pro',
            'language': 'auto',
            'diarize': True,
            'topics': True,
            'return_segments': True
        })
        
        file_size = os.path.getsize(file_path)
        bytes_processed = 0
        
        try:
            async with aiofiles.open(file_path, 'rb') as f:
                while True:
                    chunk = await f.read(self.chunk_size)
                    if not chunk:
                        break
                    
                    # Send chunk to processing
                    result = await stream_session.send_chunk(chunk)
                    
                    bytes_processed += len(chunk)
                    progress = (bytes_processed / file_size) * 100
                    
                    if on_progress:
                        await on_progress(progress)
                    
                    # Process any available segments
                    if result.segments:
                        for segment in result.segments:
                            if on_segment:
                                await on_segment(segment)
                            print(f"[{segment.start:.1f}s]: {segment.text}")
            
            # Finalize and get complete results
            final_result = await stream_session.finalize()
            
            return {
                'text': final_result.text,
                'topics': final_result.topics,
                'segments': final_result.segments,
                'duration': final_result.duration
            }
        
        finally:
            await stream_session.close()

# Usage
async def process_large_file():
    transcriber = ProgressiveTranscriber()
    
    async def progress_callback(progress):
        print(f"Progress: {progress:.1f}%")
    
    async def segment_callback(segment):
        print(f"New segment: [{segment.start:.1f}s] {segment.text}")
        # Process segment immediately (save to DB, update UI, etc.)
    
    result = await transcriber.transcribe_large_file(
        'large_audio_file.mp3',
        on_progress=progress_callback,
        on_segment=segment_callback
    )
    
    print("Final result:", result)

asyncio.run(process_large_file())

Live Meeting Transcription

import asyncio
import websockets
from verbalisai import VerbalisAI

class LiveMeetingTranscriber:
    def __init__(self, api_key=None):
        self.client = VerbalisAI(api_key=api_key)
        self.active_sessions = {}
    
    async def start_meeting_session(self, meeting_id, participants):
        """Start transcription for a live meeting"""
        
        session = await self.client.streaming.create_meeting_session({
            'meeting_id': meeting_id,
            'participants': participants,
            'model': 'pro',
            'diarize': True,
            'real_time_captions': True,
            'language': 'auto',
            'format': 'enhanced'
        })
        
        self.active_sessions[meeting_id] = session
        
        # Start processing loop
        asyncio.create_task(self._process_meeting_audio(meeting_id, session))
        
        return session
    
    async def _process_meeting_audio(self, meeting_id, session):
        """Process audio stream for meeting"""
        
        async for result in session.stream():
            # Broadcast real-time captions
            await self.broadcast_caption(meeting_id, {
                'speaker': result.speaker_id,
                'text': result.transcript,
                'timestamp': result.timestamp,
                'is_final': result.is_final
            })
            
            # Store final transcripts (store_transcript_segment is
            # app-specific; a sketch follows this example)
            if result.is_final:
                await self.store_transcript_segment(meeting_id, result)
    
    async def broadcast_caption(self, meeting_id, caption):
        """Send captions to all meeting participants"""
        
        # Send via WebSocket to meeting participants
        message = {
            'type': 'live_caption',
            'meeting_id': meeting_id,
            'caption': caption
        }
        
        # Broadcast to all connected clients (websocket_broadcast is
        # app-specific; a sketch follows this example)
        await self.websocket_broadcast(meeting_id, message)
    
    async def send_audio_chunk(self, meeting_id, audio_data, speaker_id=None):
        """Send audio chunk from meeting participant"""
        
        if meeting_id not in self.active_sessions:
            raise ValueError(f"No active session for meeting {meeting_id}")
        
        session = self.active_sessions[meeting_id]
        await session.send_audio(audio_data, speaker_id=speaker_id)
    
    async def end_meeting_session(self, meeting_id):
        """End meeting transcription and get final results"""
        
        if meeting_id not in self.active_sessions:
            return None
        
        session = self.active_sessions[meeting_id]
        final_result = await session.finalize()
        
        del self.active_sessions[meeting_id]
        
        return {
            'meeting_id': meeting_id,
            'full_transcript': final_result.text,
            'speakers': final_result.speakers,
            'topics': final_result.topics,
            'summary': final_result.summary,
            'duration': final_result.duration,
            'segments': final_result.segments
        }

# Usage
async def run_meeting_transcription():
    transcriber = LiveMeetingTranscriber()
    
    # Start meeting
    participants = ['John Doe', 'Jane Smith', 'Bob Johnson']
    session = await transcriber.start_meeting_session('meeting_123', participants)
    
    # Simulate receiving audio chunks (in real app, this would come from audio stream)
    # await transcriber.send_audio_chunk('meeting_123', audio_chunk, 'john_doe')
    
    # End meeting after some time
    await asyncio.sleep(60)  # Meeting duration
    final_result = await transcriber.end_meeting_session('meeting_123')
    
    print("Meeting completed:", final_result)

asyncio.run(run_meeting_transcription())
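
broadcast_caption and _process_meeting_audio above delegate to websocket_broadcast and store_transcript_segment, which are deliberately left app-specific. Here is a minimal sketch of one way to fill them in, assuming a connections mapping of meeting IDs to connected WebSocket clients and an async datastore (both names are hypothetical):

import json

class MyMeetingTranscriber(LiveMeetingTranscriber):
    """Fills in the app-specific hooks assumed by the example above."""

    def __init__(self, api_key=None, db=None):
        super().__init__(api_key=api_key)
        self.connections = {}  # meeting_id -> set of websocket connections (hypothetical)
        self.db = db           # any async datastore (hypothetical)

    async def websocket_broadcast(self, meeting_id, message):
        # Fan the caption out to every client registered for this meeting
        payload = json.dumps(message)
        for ws in self.connections.get(meeting_id, set()):
            await ws.send(payload)

    async def store_transcript_segment(self, meeting_id, result):
        # Persist the final segment; swap in your own storage layer
        await self.db.insert('transcript_segments', {
            'meeting_id': meeting_id,
            'speaker': result.speaker_id,
            'text': result.transcript,
            'timestamp': result.timestamp,
        })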

JavaScript SDK Streaming

Browser Real-time Transcription

import { VerbalisAI } from '@verbalisai/sdk';

class BrowserRealTimeTranscriber {
  constructor(apiKey) {
    this.client = new VerbalisAI({ apiKey });
    this.mediaRecorder = null;
    this.audioStream = null;
    this.streamSession = null;
  }
  
  async startMicrophoneTranscription(options = {}) {
    try {
      // Get microphone access
      this.audioStream = await navigator.mediaDevices.getUserMedia({
        audio: {
          sampleRate: 16000,
          channelCount: 1,
          echoCancellation: true,
          noiseSuppression: true
        }
      });
      
      // Create streaming session
      this.streamSession = await this.client.streaming.createSession({
        sampleRate: 16000,
        encoding: 'WEBM_OPUS',
        language: 'auto',
        interimResults: true,
        model: 'mini',
        ...options
      });
      
      // Set up MediaRecorder
      this.mediaRecorder = new MediaRecorder(this.audioStream, {
        mimeType: 'audio/webm;codecs=opus',
        audioBitsPerSecond: 16000
      });
      
      // Handle audio data
      this.mediaRecorder.ondataavailable = async (event) => {
        if (event.data.size > 0) {
          const audioBuffer = await event.data.arrayBuffer();
          await this.streamSession.sendAudio(audioBuffer);
        }
      };
      
      // Start recording in small chunks
      this.mediaRecorder.start(250); // 250ms chunks
      
      // Listen for transcription results
      this.streamSession.onResult((result) => {
        if (result.isFinal) {
          this.onFinalTranscript(result.transcript);
        } else {
          this.onPartialTranscript(result.transcript);
        }
      });
      
      console.log('Real-time transcription started');
      
    } catch (error) {
      console.error('Failed to start transcription:', error);
      throw error;
    }
  }
  
  async stopTranscription() {
    if (this.mediaRecorder && this.mediaRecorder.state !== 'inactive') {
      this.mediaRecorder.stop();
    }
    
    if (this.audioStream) {
      this.audioStream.getTracks().forEach(track => track.stop());
    }
    
    if (this.streamSession) {
      await this.streamSession.close();
    }
    
    console.log('Transcription stopped');
  }
  
  onFinalTranscript(text) {
    console.log('Final:', text);
    // Override this method to handle final transcripts
  }
  
  onPartialTranscript(text) {
    console.log('Partial:', text);
    // Override this method to handle partial transcripts
  }
}

// Usage
const transcriber = new BrowserRealTimeTranscriber('your-api-key');

// Custom handlers
transcriber.onFinalTranscript = (text) => {
  document.getElementById('final-transcript').textContent += text + ' ';
};

transcriber.onPartialTranscript = (text) => {
  document.getElementById('partial-transcript').textContent = text;
};

// Start transcription
document.getElementById('start-btn').onclick = () => {
  transcriber.startMicrophoneTranscription({
    language: 'en',
    topics: true
  });
};

// Stop transcription
document.getElementById('stop-btn').onclick = () => {
  transcriber.stopTranscription();
};

Node.js Streaming Server

import { VerbalisAI } from '@verbalisai/sdk';
import { WebSocketServer } from 'ws';
import express from 'express';

class StreamingTranscriptionServer {
  constructor(port = 8080) {
    this.app = express();
    this.server = null;
    this.wss = null;
    this.port = port;
    this.client = new VerbalisAI();
    this.activeSessions = new Map();
  }
  
  start() {
    // HTTP server for health checks
    this.app.get('/health', (req, res) => {
      res.json({ status: 'healthy' });
    });
    
    this.server = this.app.listen(this.port, () => {
      console.log(`Streaming server listening on port ${this.port}`);
    });
    
    // WebSocket server for real-time communication
    this.wss = new WebSocketServer({ server: this.server });
    
    this.wss.on('connection', (ws, req) => {
      console.log('Client connected');
      this.handleConnection(ws, req);
    });
  }
  
  async handleConnection(ws, req) {
    const sessionId = this.generateSessionId();
    let streamSession = null;
    
    ws.on('message', async (message) => {
      try {
        const data = JSON.parse(message);
        
        switch (data.type) {
          case 'start_transcription':
            streamSession = await this.startTranscription(sessionId, data.config);
            this.activeSessions.set(sessionId, { ws, streamSession });
            
            ws.send(JSON.stringify({
              type: 'transcription_started',
              sessionId: sessionId
            }));
            break;
            
          case 'audio_data':
            if (streamSession) {
              const audioBuffer = Buffer.from(data.audio, 'base64');
              await streamSession.sendAudio(audioBuffer);
            }
            break;
            
          case 'stop_transcription':
            if (streamSession) {
              const finalResult = await streamSession.finalize();
              ws.send(JSON.stringify({
                type: 'transcription_complete',
                result: finalResult
              }));
              await streamSession.close();
              // Clear the session so stray audio_data messages are ignored
              this.activeSessions.delete(sessionId);
              streamSession = null;
            }
            break;
        }
      } catch (error) {
        ws.send(JSON.stringify({
          type: 'error',
          message: error.message
        }));
      }
    });
    
    ws.on('close', async () => {
      console.log('Client disconnected');
      if (this.activeSessions.has(sessionId)) {
        const { streamSession } = this.activeSessions.get(sessionId);
        if (streamSession) {
          await streamSession.close();
        }
        this.activeSessions.delete(sessionId);
      }
    });
  }
  
  async startTranscription(sessionId, config) {
    const streamSession = await this.client.streaming.createSession({
      sampleRate: config.sampleRate || 16000,
      encoding: config.encoding || 'LINEAR16',
      language: config.language || 'auto',
      model: config.model || 'mini',
      interimResults: true
    });
    
    // Handle transcription results
    streamSession.onResult((result) => {
      const session = this.activeSessions.get(sessionId);
      if (session) {
        session.ws.send(JSON.stringify({
          type: 'transcription_result',
          result: {
            transcript: result.transcript,
            isFinal: result.isFinal,
            confidence: result.confidence,
            timestamp: result.timestamp
          }
        }));
      }
    });
    
    return streamSession;
  }
  
  generateSessionId() {
    // Fine for a demo; prefer crypto.randomUUID() for real session IDs
    return Math.random().toString(36).substring(2, 15);
  }
  
  stop() {
    if (this.wss) {
      this.wss.close();
    }
    if (this.server) {
      this.server.close();
    }
  }
}

// Usage
const server = new StreamingTranscriptionServer(8080);
server.start();

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('Shutting down server...');
  server.stop();
  process.exit(0);
});
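
For reference, here is a minimal client for this server's message protocol, sketched in Python with the websockets package (the localhost URL and the shape of pcm_chunks are assumptions; the message types mirror the handler above):

import asyncio
import base64
import json
import websockets

async def run_client(pcm_chunks):
    # Connect to the streaming server started above (URL assumed local)
    async with websockets.connect('ws://localhost:8080') as ws:
        await ws.send(json.dumps({
            'type': 'start_transcription',
            'config': {'sampleRate': 16000, 'encoding': 'LINEAR16', 'language': 'en'}
        }))

        # Send raw PCM chunks base64-encoded, matching the 'audio_data' handler
        for chunk in pcm_chunks:
            await ws.send(json.dumps({
                'type': 'audio_data',
                'audio': base64.b64encode(chunk).decode('ascii')
            }))

        await ws.send(json.dumps({'type': 'stop_transcription'}))

        # Print interim and final results as they stream back
        async for raw in ws:
            msg = json.loads(raw)
            if msg['type'] == 'transcription_result':
                prefix = 'Final' if msg['result']['isFinal'] else 'Partial'
                print(f"{prefix}: {msg['result']['transcript']}")
            elif msg['type'] == 'transcription_complete':
                print('Done:', msg['result'])
                break

# asyncio.run(run_client(chunks_from_your_capture_pipeline))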

React Component for Live Transcription

import React, { useState, useRef, useEffect } from 'react';
import { VerbalisAI } from '@verbalisai/sdk';

const LiveTranscriptionComponent = ({ apiKey }) => {
  const [isRecording, setIsRecording] = useState(false);
  const [finalTranscript, setFinalTranscript] = useState('');
  const [partialTranscript, setPartialTranscript] = useState('');
  const [error, setError] = useState(null);
  
  const mediaRecorderRef = useRef(null);
  const streamSessionRef = useRef(null);
  const audioStreamRef = useRef(null);
  const clientRef = useRef(new VerbalisAI({ apiKey }));
  
  const startRecording = async () => {
    try {
      setError(null);
      
      // Get microphone access
      audioStreamRef.current = await navigator.mediaDevices.getUserMedia({
        audio: {
          sampleRate: 16000,
          channelCount: 1,
          echoCancellation: true,
          noiseSuppression: true
        }
      });
      
      // Create streaming session
      streamSessionRef.current = await clientRef.current.streaming.createSession({
        sampleRate: 16000,
        encoding: 'WEBM_OPUS',
        language: 'auto',
        interimResults: true,
        model: 'mini'
      });
      
      // Set up MediaRecorder
      mediaRecorderRef.current = new MediaRecorder(audioStreamRef.current, {
        mimeType: 'audio/webm;codecs=opus'
      });
      
      // Handle audio data
      mediaRecorderRef.current.ondataavailable = async (event) => {
        if (event.data.size > 0 && streamSessionRef.current) {
          const audioBuffer = await event.data.arrayBuffer();
          await streamSessionRef.current.sendAudio(audioBuffer);
        }
      };
      
      // Handle transcription results
      streamSessionRef.current.onResult((result) => {
        if (result.isFinal) {
          setFinalTranscript(prev => prev + result.transcript + ' ');
          setPartialTranscript('');
        } else {
          setPartialTranscript(result.transcript);
        }
      });
      
      // Start recording
      mediaRecorderRef.current.start(250); // 250ms chunks
      setIsRecording(true);
      
    } catch (err) {
      setError(`Failed to start recording: ${err.message}`);
    }
  };
  
  const stopRecording = async () => {
    if (mediaRecorderRef.current && mediaRecorderRef.current.state !== 'inactive') {
      mediaRecorderRef.current.stop();
    }
    
    if (audioStreamRef.current) {
      audioStreamRef.current.getTracks().forEach(track => track.stop());
    }
    
    if (streamSessionRef.current) {
      await streamSessionRef.current.close();
    }
    
    setIsRecording(false);
    setPartialTranscript('');
  };
  
  useEffect(() => {
    // Cleanup on unmount. Use refs directly: a cleanup that checks
    // `isRecording` from this closure would see the stale initial value.
    return () => {
      if (mediaRecorderRef.current && mediaRecorderRef.current.state !== 'inactive') {
        mediaRecorderRef.current.stop();
      }
      if (audioStreamRef.current) {
        audioStreamRef.current.getTracks().forEach(track => track.stop());
      }
      if (streamSessionRef.current) {
        streamSessionRef.current.close();
      }
    };
  }, []);
  
  return (
    <div className="live-transcription">
      <div className="controls">
        {!isRecording ? (
          <button 
            onClick={startRecording}
            className="start-btn"
            disabled={!apiKey}
          >
            🎤 Start Recording
          </button>
        ) : (
          <button 
            onClick={stopRecording}
            className="stop-btn"
          >
            ⏹️ Stop Recording
          </button>
        )}
      </div>
      
      {error && (
        <div className="error">
          {error}
        </div>
      )}
      
      {isRecording && (
        <div className="recording-indicator">
          🔴 Recording...
        </div>
      )}
      
      <div className="transcription-display">
        <div className="final-transcript">
          <h3>Final Transcript:</h3>
          <p>{finalTranscript}</p>
        </div>
        
        {partialTranscript && (
          <div className="partial-transcript">
            <h3>Partial (Live):</h3>
            <p className="partial-text">{partialTranscript}</p>
          </div>
        )}
      </div>
    </div>
  );
};

export default LiveTranscriptionComponent;

Advanced Streaming Features

Multi-speaker Stream Processing

import asyncio
from verbalisai import VerbalisAI

class MultiSpeakerStreamProcessor:
    def __init__(self, api_key=None):
        self.client = VerbalisAI(api_key=api_key)
        self.speaker_sessions = {}
    
    async def create_speaker_session(self, speaker_id, speaker_config):
        """Create a dedicated session for each speaker"""
        
        session = await self.client.streaming.create_session({
            'speaker_id': speaker_id,
            'model': 'pro',
            'diarize': False,  # We're handling speakers separately
            'language': speaker_config.get('language', 'auto'),
            'accent': speaker_config.get('accent'),
            'vocabulary': speaker_config.get('custom_vocab', [])
        })
        
        self.speaker_sessions[speaker_id] = {
            'session': session,
            'config': speaker_config
        }
        
        # Route results to a per-speaker handler
        session.on_result(lambda result: self.handle_speaker_result(speaker_id, result))
        
        return session
    
    async def send_speaker_audio(self, speaker_id, audio_data):
        """Send audio data for specific speaker"""
        
        if speaker_id not in self.speaker_sessions:
            raise ValueError(f"No session for speaker {speaker_id}")
        
        session = self.speaker_sessions[speaker_id]['session']
        await session.send_audio(audio_data)
    
    def handle_speaker_result(self, speaker_id, result):
        """Handle transcription result for specific speaker"""
        
        if result.is_final:
            print(f"{speaker_id}: {result.transcript}")
            # Store final transcript with speaker attribution (app-specific)
            self.store_speaker_transcript(speaker_id, result.transcript)
    
    async def close_all_sessions(self):
        """Close all speaker sessions"""
        
        for speaker_id, session_info in self.speaker_sessions.items():
            await session_info['session'].close()
        
        self.speaker_sessions.clear()

# Usage
async def multi_speaker_example():
    processor = MultiSpeakerStreamProcessor()
    
    # Set up speakers
    speakers = {
        'john': {'language': 'en', 'accent': 'us'},
        'maria': {'language': 'es', 'accent': 'mx'},
        'pierre': {'language': 'fr', 'accent': 'fr'}
    }
    
    # Create sessions for each speaker
    for speaker_id, config in speakers.items():
        await processor.create_speaker_session(speaker_id, config)
    
    # Simulate sending audio for different speakers
    # In real application, you'd have speaker identification/separation
    # await processor.send_speaker_audio('john', john_audio_data)
    # await processor.send_speaker_audio('maria', maria_audio_data)
    
    # Clean up
    await processor.close_all_sessions()

asyncio.run(multi_speaker_example())

Stream Quality Monitoring

class StreamQualityMonitor {
  constructor(streamSession) {
    this.session = streamSession;
    this.metrics = {
      audioLevel: 0,
      noiseLevel: 0,
      speechRate: 0,
      confidence: 0,
      latency: 0
    };
    
    this.startMonitoring();
  }
  
  startMonitoring() {
    // Monitor audio quality
    this.session.onAudioAnalysis((analysis) => {
      this.metrics.audioLevel = analysis.volumeLevel;
      this.metrics.noiseLevel = analysis.noiseLevel;
      
      // Provide quality feedback
      if (analysis.volumeLevel < 0.1) {
        this.onQualityIssue('low_volume', 'Microphone volume is too low');
      }
      
      if (analysis.noiseLevel > 0.7) {
        this.onQualityIssue('high_noise', 'Background noise is too high');
      }
    });
    
    // Monitor transcription quality
    this.session.onResult((result) => {
      this.metrics.confidence = result.confidence;
      this.metrics.latency = result.processingTime;
      
      if (result.confidence < 0.5) {
        this.onQualityIssue('low_confidence', 'Transcription confidence is low');
      }
    });
    
    // Monitor speech rate
    this.session.onSpeechAnalysis((analysis) => {
      this.metrics.speechRate = analysis.wordsPerMinute;
      
      if (analysis.wordsPerMinute > 200) {
        this.onQualityIssue('fast_speech', 'Speech rate is very fast');
      }
    });
  }
  
  onQualityIssue(type, message) {
    console.warn(`Quality issue [${type}]: ${message}`);
    
    // Send quality feedback to UI
    this.session.emit('quality_warning', {
      type: type,
      message: message,
      metrics: this.metrics
    });
  }
  
  getQualityReport() {
    return {
      ...this.metrics,
      overallQuality: this.calculateOverallQuality()
    };
  }
  
  calculateOverallQuality() {
    const weights = {
      audioLevel: 0.3,
      noiseLevel: -0.2, // Negative because high noise is bad
      confidence: 0.4,
      latency: -0.1 // Negative because high latency is bad
    };
    
    let score = 0;
    score += this.metrics.audioLevel * weights.audioLevel;
    score += (1 - this.metrics.noiseLevel) * Math.abs(weights.noiseLevel);
    score += this.metrics.confidence * weights.confidence;
    score += (1 - Math.min(this.metrics.latency / 1000, 1)) * Math.abs(weights.latency);
    
    return Math.max(0, Math.min(1, score));
  }
}

// Usage
const qualityMonitor = new StreamQualityMonitor(streamSession);

qualityMonitor.session.on('quality_warning', (warning) => {
  // Display warning to user (showQualityWarning is your app's UI hook)
  showQualityWarning(warning.message);
});

// Poll for a quality report once per second
setInterval(() => {
  const report = qualityMonitor.getQualityReport();
  updateQualityIndicator(report.overallQuality); // app-defined UI hook
}, 1000);

Performance Optimization

Buffer Management

class OptimizedAudioBuffer {
  constructor(options = {}) {
    this.bufferSize = options.bufferSize || 4096;
    this.sampleRate = options.sampleRate || 16000;
    this.channels = options.channels || 1;
    this.buffer = new Float32Array(this.bufferSize);
    this.writeIndex = 0;
    this.readIndex = 0;
  }
  
  write(audioData) {
    // Note: this simple ring buffer does not guard against overwriting
    // unread samples; size it generously for your chunk cadence
    const remaining = this.bufferSize - this.writeIndex;
    
    if (audioData.length <= remaining) {
      // Fits in remaining space
      this.buffer.set(audioData, this.writeIndex);
      this.writeIndex += audioData.length;
    } else {
      // Need to wrap around
      this.buffer.set(audioData.slice(0, remaining), this.writeIndex);
      this.buffer.set(audioData.slice(remaining), 0);
      this.writeIndex = audioData.length - remaining;
    }
  }
  
  read(length) {
    const available = this.available();
    const readLength = Math.min(length, available);
    
    if (readLength === 0) return new Float32Array(0);
    
    const result = new Float32Array(readLength);
    
    if (this.readIndex + readLength <= this.bufferSize) {
      // No wrap around
      result.set(this.buffer.slice(this.readIndex, this.readIndex + readLength));
    } else {
      // Wrap around
      const firstPart = this.bufferSize - this.readIndex;
      const secondPart = readLength - firstPart;
      
      result.set(this.buffer.slice(this.readIndex, this.bufferSize), 0);
      result.set(this.buffer.slice(0, secondPart), firstPart);
    }
    
    this.readIndex = (this.readIndex + readLength) % this.bufferSize;
    return result;
  }
  
  available() {
    if (this.writeIndex >= this.readIndex) {
      return this.writeIndex - this.readIndex;
    } else {
      return this.bufferSize - this.readIndex + this.writeIndex;
    }
  }
  
  clear() {
    this.writeIndex = 0;
    this.readIndex = 0;
  }
}

Connection Pooling for Streaming

import asyncio
from contextlib import asynccontextmanager

class StreamingConnectionPool:
    def __init__(self, client, max_connections=10):
        self.client = client
        self.max_connections = max_connections
        self.available_connections = asyncio.Queue(maxsize=max_connections)
        self.active_connections = set()
        
        # Pre-create connections (requires a running event loop)
        asyncio.create_task(self._initialize_pool())
    
    async def _initialize_pool(self):
        """Initialize connection pool"""
        for _ in range(self.max_connections):
            connection = await self._create_connection()
            await self.available_connections.put(connection)
    
    async def _create_connection(self):
        """Create a new streaming connection"""
        return await self.client.streaming.create_session({
            'model': 'mini',
            'language': 'auto',
            'keep_alive': True
        })
    
    @asynccontextmanager
    async def get_connection(self):
        """Get a connection from the pool"""
        try:
            # Try to get available connection
            connection = await asyncio.wait_for(
                self.available_connections.get(), 
                timeout=5.0
            )
            
            self.active_connections.add(connection)
            yield connection
            
        except asyncio.TimeoutError:
            # Create new connection if pool is exhausted
            connection = await self._create_connection()
            self.active_connections.add(connection)
            yield connection
            
        finally:
            # Return the connection to the pool if we actually got one
            if connection is not None and connection in self.active_connections:
                self.active_connections.remove(connection)
                
                if not connection.is_closed():
                    await self.available_connections.put(connection)
    
    async def close_all(self):
        """Close all connections in the pool"""
        # Close available connections
        while not self.available_connections.empty():
            connection = await self.available_connections.get()
            await connection.close()
        
        # Close active connections
        for connection in self.active_connections:
            await connection.close()
        
        self.active_connections.clear()

# Usage
pool = StreamingConnectionPool(client, max_connections=5)

async def process_audio_stream(audio_data):
    async with pool.get_connection() as connection:
        result = await connection.send_audio(audio_data)
        return result

Best Practices

Error Handling in Streaming

  • Implement connection retry logic with exponential backoff (see the sketch after this list)
  • Handle network interruptions gracefully
  • Buffer audio data during reconnection attempts
  • Provide user feedback for connection issues
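
A minimal sketch of the retry-and-buffer pattern, using the Python session API shown earlier on this page (create_session, send_audio); the backoff parameters and error-handling granularity are illustrative, not prescriptive:

import asyncio
from collections import deque
from verbalisai import VerbalisAI

class ResilientStreamer:
    def __init__(self, api_key=None, max_retries=5):
        self.client = VerbalisAI(api_key=api_key)
        self.max_retries = max_retries
        self.pending = deque()  # audio buffered while reconnecting
        self.session = None

    async def connect(self):
        delay = 1.0
        for _ in range(self.max_retries):
            try:
                self.session = await self.client.streaming.create_session(
                    {'model': 'mini', 'interim_results': True}
                )
                # Flush audio buffered during the outage, oldest first
                while self.pending:
                    await self.session.send_audio(self.pending.popleft())
                return
            except Exception as exc:
                print(f"Connect failed ({exc}); retrying in {delay:.0f}s")
                await asyncio.sleep(delay)
                delay = min(delay * 2, 30)  # exponential backoff, capped
        raise ConnectionError("Could not establish streaming session")

    async def send_audio(self, chunk):
        if self.session is None:
            self.pending.append(chunk)  # buffer while disconnected
            return
        try:
            await self.session.send_audio(chunk)
        except Exception:
            # Keep the chunk, drop the dead session, and reconnect
            self.pending.append(chunk)
            self.session = None
            await self.connect()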

Performance Optimization

  • Use appropriate buffer sizes (typically 250-1000 ms; see the sketch after this list)
  • Implement audio preprocessing (noise reduction, AGC)
  • Monitor latency and adjust buffer sizes accordingly
  • Use connection pooling for high-throughput scenarios
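
Buffer duration translates to bytes as sample_rate × bytes_per_sample × channels × seconds; for 16 kHz, 16-bit mono PCM, 250 ms is 16000 × 2 × 0.25 = 8,000 bytes. The helper below is an illustrative sketch of sizing chunks in the 250-1000 ms window and nudging the duration based on observed latency; the thresholds are assumptions:

def chunk_bytes(duration_ms, sample_rate=16000, bytes_per_sample=2, channels=1):
    # e.g. 250 ms at 16 kHz mono 16-bit -> 16000 * 2 * 0.25 = 8000 bytes
    return int(sample_rate * bytes_per_sample * channels * duration_ms / 1000)

def adapt_duration(current_ms, observed_latency_ms, lo=250, hi=1000):
    # Widen the buffer when latency climbs, shrink it when the link is healthy
    if observed_latency_ms > 800:
        return min(hi, int(current_ms * 1.5))
    if observed_latency_ms < 200:
        return max(lo, int(current_ms * 0.75))
    return current_ms

print(chunk_bytes(250))   # 8000
print(chunk_bytes(1000))  # 32000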

Quality Assurance

  • Monitor audio quality metrics in real-time
  • Implement adaptive bitrate streaming
  • Provide quality feedback to users
  • Use appropriate models for different quality requirements

Ready to learn about comprehensive error handling? Check out the Error Handling guide for robust error management strategies across all SDK features.