Real-time streaming transcription with VerbalisAI SDKs
Learn how to implement real-time streaming transcription for live audio processing with both Python and JavaScript SDKs.
Streaming transcription lets you process audio in real time as it's being recorded or received, returning results immediately instead of waiting for the entire file to be processed. This is ideal for:

- Live captions and subtitles
- Meeting and call transcription
- Extended, ongoing conversations
- Progressive processing of large audio files
VerbalisAI supports several streaming modes:
| Mode | Description | Use Case |
|------|-------------|----------|
| Real-time | Process audio chunks as they arrive | Live transcription |
| Near real-time | Small buffering for better accuracy | Live captions with slight delay |
| Progressive | Process long files in chunks | Large file processing |
| Continuous | Ongoing transcription session | Extended conversations |
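Whatever mode you choose, the session lifecycle is the same: create a session, push audio, react to partial and final results, and close. The sketch below shows that minimal flow with the JavaScript SDK, reusing the `createSession` / `sendAudio` / `onResult` / `close` calls that appear in the full examples further down; `getAudioChunks` and the `VERBALISAI_API_KEY` environment variable are hypothetical stand-ins for your own audio source and key management. The sections that follow build this out into complete examples, starting with real-time microphone transcription in Python.

```javascript
import { VerbalisAI } from '@verbalisai/sdk';

// Minimal session lifecycle sketch. `getAudioChunks` is a hypothetical async
// generator standing in for whatever audio source you use (microphone, file,
// RTP feed, ...).
async function transcribeStream(getAudioChunks) {
  const client = new VerbalisAI({ apiKey: process.env.VERBALISAI_API_KEY });

  const session = await client.streaming.createSession({
    sampleRate: 16000,
    encoding: 'LINEAR16',
    language: 'auto',
    interimResults: true,
    model: 'mini'
  });

  // Handle partial and final results as they arrive
  session.onResult((result) => {
    console.log(result.isFinal ? 'Final:' : 'Partial:', result.transcript);
  });

  // Push audio until the source is exhausted, then close the session
  for await (const chunk of getAudioChunks()) {
    await session.sendAudio(chunk);
  }
  await session.close();
}
```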
import asyncio
import pyaudio
from verbalisai import VerbalisAI
class RealTimeTranscriber:
def __init__(self, api_key=None):
self.client = VerbalisAI(api_key=api_key)
self.audio_format = pyaudio.paInt16
self.channels = 1
self.rate = 16000
self.chunk_size = 1024
self.audio = pyaudio.PyAudio()
async def start_streaming(self, on_transcript=None, on_partial=None):
"""Start real-time transcription from microphone"""
# Initialize streaming session
stream_session = await self.client.streaming.create_session({
'sample_rate': self.rate,
'encoding': 'LINEAR16',
'language': 'auto',
'interim_results': True,
'single_utterance': False,
'model': 'mini'
})
# Set up audio stream
audio_stream = self.audio.open(
format=self.audio_format,
channels=self.channels,
rate=self.rate,
input=True,
frames_per_buffer=self.chunk_size
)
print("Starting real-time transcription... (Press Ctrl+C to stop)")
        async def pump_audio():
            # PyAudio's read() blocks, so run it in a worker thread to keep
            # the event loop responsive while results stream back.
            loop = asyncio.get_running_loop()
            while True:
                audio_chunk = await loop.run_in_executor(
                    None,
                    lambda: audio_stream.read(self.chunk_size, exception_on_overflow=False)
                )
                await stream_session.send_audio(audio_chunk)

        # Capture and send audio in the background so that waiting for results
        # never stalls the microphone (and vice versa)
        sender_task = asyncio.create_task(pump_audio())

        try:
            async for result in stream_session.stream():
                if result.is_final:
                    print(f"Final: {result.transcript}")
                    if on_transcript:
                        await on_transcript(result.transcript)
                else:
                    print(f"Partial: {result.transcript}")
                    if on_partial:
                        await on_partial(result.transcript)
        except KeyboardInterrupt:
            print("\nStopping transcription...")
        finally:
            sender_task.cancel()
            audio_stream.stop_stream()
            audio_stream.close()
            await stream_session.close()
def __del__(self):
if hasattr(self, 'audio'):
self.audio.terminate()
# Usage
async def main():
transcriber = RealTimeTranscriber()
async def handle_transcript(text):
print(f"FINAL TRANSCRIPT: {text}")
# Save to database, send to API, etc.
async def handle_partial(text):
print(f"PARTIAL: {text}")
# Update UI in real-time
await transcriber.start_streaming(
on_transcript=handle_transcript,
on_partial=handle_partial
)
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import os

import aiofiles
from verbalisai import VerbalisAI
class ProgressiveTranscriber:
def __init__(self, api_key=None):
self.client = VerbalisAI(api_key=api_key)
self.chunk_size = 5 * 1024 * 1024 # 5MB chunks
async def transcribe_large_file(self, file_path, on_progress=None, on_segment=None):
"""Transcribe large file progressively"""
# Create progressive streaming session
stream_session = await self.client.streaming.create_progressive_session({
'model': 'pro',
'language': 'auto',
'diarize': True,
'topics': True,
'return_segments': True
})
file_size = os.path.getsize(file_path)
bytes_processed = 0
try:
async with aiofiles.open(file_path, 'rb') as f:
while True:
chunk = await f.read(self.chunk_size)
if not chunk:
break
# Send chunk to processing
result = await stream_session.send_chunk(chunk)
bytes_processed += len(chunk)
progress = (bytes_processed / file_size) * 100
if on_progress:
await on_progress(progress)
# Process any available segments
if result.segments:
for segment in result.segments:
if on_segment:
await on_segment(segment)
print(f"[{segment.start:.1f}s]: {segment.text}")
# Finalize and get complete results
final_result = await stream_session.finalize()
return {
'text': final_result.text,
'topics': final_result.topics,
'segments': final_result.segments,
'duration': final_result.duration
}
finally:
await stream_session.close()
# Usage
async def process_large_file():
transcriber = ProgressiveTranscriber()
async def progress_callback(progress):
print(f"Progress: {progress:.1f}%")
async def segment_callback(segment):
print(f"New segment: [{segment.start:.1f}s] {segment.text}")
# Process segment immediately (save to DB, update UI, etc.)
result = await transcriber.transcribe_large_file(
'large_audio_file.mp3',
on_progress=progress_callback,
on_segment=segment_callback
)
print("Final result:", result)
asyncio.run(process_large_file())
import asyncio
import websockets
from verbalisai import VerbalisAI
class LiveMeetingTranscriber:
def __init__(self, api_key=None):
self.client = VerbalisAI(api_key=api_key)
self.active_sessions = {}
async def start_meeting_session(self, meeting_id, participants):
"""Start transcription for a live meeting"""
session = await self.client.streaming.create_meeting_session({
'meeting_id': meeting_id,
'participants': participants,
'model': 'pro',
'diarize': True,
'real_time_captions': True,
'language': 'auto',
'format': 'enhanced'
})
self.active_sessions[meeting_id] = session
# Start processing loop
asyncio.create_task(self._process_meeting_audio(meeting_id, session))
return session
async def _process_meeting_audio(self, meeting_id, session):
"""Process audio stream for meeting"""
async for result in session.stream():
# Broadcast real-time captions
await self.broadcast_caption(meeting_id, {
'speaker': result.speaker_id,
'text': result.transcript,
'timestamp': result.timestamp,
'is_final': result.is_final
})
# Store final transcripts
if result.is_final:
await self.store_transcript_segment(meeting_id, result)
async def broadcast_caption(self, meeting_id, caption):
"""Send captions to all meeting participants"""
# Send via WebSocket to meeting participants
message = {
'type': 'live_caption',
'meeting_id': meeting_id,
'caption': caption
}
# Broadcast to all connected clients
await self.websocket_broadcast(meeting_id, message)
async def send_audio_chunk(self, meeting_id, audio_data, speaker_id=None):
"""Send audio chunk from meeting participant"""
if meeting_id not in self.active_sessions:
raise ValueError(f"No active session for meeting {meeting_id}")
session = self.active_sessions[meeting_id]
await session.send_audio(audio_data, speaker_id=speaker_id)
async def end_meeting_session(self, meeting_id):
"""End meeting transcription and get final results"""
if meeting_id not in self.active_sessions:
return None
session = self.active_sessions[meeting_id]
final_result = await session.finalize()
del self.active_sessions[meeting_id]
return {
'meeting_id': meeting_id,
'full_transcript': final_result.text,
'speakers': final_result.speakers,
'topics': final_result.topics,
'summary': final_result.summary,
'duration': final_result.duration,
'segments': final_result.segments
}
# Usage
async def run_meeting_transcription():
transcriber = LiveMeetingTranscriber()
# Start meeting
participants = ['John Doe', 'Jane Smith', 'Bob Johnson']
session = await transcriber.start_meeting_session('meeting_123', participants)
# Simulate receiving audio chunks (in real app, this would come from audio stream)
# await transcriber.send_audio_chunk('meeting_123', audio_chunk, 'john_doe')
# End meeting after some time
await asyncio.sleep(60) # Meeting duration
final_result = await transcriber.end_meeting_session('meeting_123')
print("Meeting completed:", final_result)
asyncio.run(run_meeting_transcription())
import { VerbalisAI } from '@verbalisai/sdk';
class BrowserRealTimeTranscriber {
constructor(apiKey) {
this.client = new VerbalisAI({ apiKey });
this.mediaRecorder = null;
this.audioStream = null;
this.streamSession = null;
}
async startMicrophoneTranscription(options = {}) {
try {
// Get microphone access
this.audioStream = await navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: 16000,
channelCount: 1,
echoCancellation: true,
noiseSuppression: true
}
});
// Create streaming session
this.streamSession = await this.client.streaming.createSession({
sampleRate: 16000,
encoding: 'WEBM_OPUS',
language: 'auto',
interimResults: true,
model: 'mini',
...options
});
// Set up MediaRecorder
this.mediaRecorder = new MediaRecorder(this.audioStream, {
mimeType: 'audio/webm;codecs=opus',
audioBitsPerSecond: 16000
});
// Handle audio data
this.mediaRecorder.ondataavailable = async (event) => {
if (event.data.size > 0) {
const audioBuffer = await event.data.arrayBuffer();
await this.streamSession.sendAudio(audioBuffer);
}
};
// Start recording in small chunks
this.mediaRecorder.start(250); // 250ms chunks
// Listen for transcription results
this.streamSession.onResult((result) => {
if (result.isFinal) {
this.onFinalTranscript(result.transcript);
} else {
this.onPartialTranscript(result.transcript);
}
});
console.log('Real-time transcription started');
} catch (error) {
console.error('Failed to start transcription:', error);
throw error;
}
}
async stopTranscription() {
if (this.mediaRecorder && this.mediaRecorder.state !== 'inactive') {
this.mediaRecorder.stop();
}
if (this.audioStream) {
this.audioStream.getTracks().forEach(track => track.stop());
}
if (this.streamSession) {
await this.streamSession.close();
}
console.log('Transcription stopped');
}
onFinalTranscript(text) {
console.log('Final:', text);
// Override this method to handle final transcripts
}
onPartialTranscript(text) {
console.log('Partial:', text);
// Override this method to handle partial transcripts
}
}
// Usage
const transcriber = new BrowserRealTimeTranscriber('your-api-key');
// Custom handlers
transcriber.onFinalTranscript = (text) => {
document.getElementById('final-transcript').textContent += text + ' ';
};
transcriber.onPartialTranscript = (text) => {
document.getElementById('partial-transcript').textContent = text;
};
// Start transcription
document.getElementById('start-btn').onclick = () => {
transcriber.startMicrophoneTranscription({
language: 'en',
topics: true
});
};
// Stop transcription
document.getElementById('stop-btn').onclick = () => {
transcriber.stopTranscription();
};
import { VerbalisAI } from '@verbalisai/sdk';
import WebSocket from 'ws';
import express from 'express';
class StreamingTranscriptionServer {
constructor(port = 8080) {
this.app = express();
this.server = null;
this.wss = null;
this.port = port;
this.client = new VerbalisAI();
this.activeSessions = new Map();
}
start() {
// HTTP server for health checks
this.app.get('/health', (req, res) => {
res.json({ status: 'healthy' });
});
this.server = this.app.listen(this.port, () => {
console.log(`Streaming server listening on port ${this.port}`);
});
// WebSocket server for real-time communication
this.wss = new WebSocket.Server({ server: this.server });
this.wss.on('connection', (ws, req) => {
console.log('Client connected');
this.handleConnection(ws, req);
});
}
async handleConnection(ws, req) {
const sessionId = this.generateSessionId();
let streamSession = null;
ws.on('message', async (message) => {
try {
const data = JSON.parse(message);
switch (data.type) {
case 'start_transcription':
streamSession = await this.startTranscription(sessionId, data.config);
this.activeSessions.set(sessionId, { ws, streamSession });
ws.send(JSON.stringify({
type: 'transcription_started',
sessionId: sessionId
}));
break;
case 'audio_data':
if (streamSession) {
const audioBuffer = Buffer.from(data.audio, 'base64');
await streamSession.sendAudio(audioBuffer);
}
break;
case 'stop_transcription':
if (streamSession) {
const finalResult = await streamSession.finalize();
ws.send(JSON.stringify({
type: 'transcription_complete',
result: finalResult
}));
await streamSession.close();
}
break;
}
} catch (error) {
ws.send(JSON.stringify({
type: 'error',
message: error.message
}));
}
});
ws.on('close', async () => {
console.log('Client disconnected');
if (this.activeSessions.has(sessionId)) {
const { streamSession } = this.activeSessions.get(sessionId);
if (streamSession) {
await streamSession.close();
}
this.activeSessions.delete(sessionId);
}
});
}
async startTranscription(sessionId, config) {
const streamSession = await this.client.streaming.createSession({
sampleRate: config.sampleRate || 16000,
encoding: config.encoding || 'LINEAR16',
language: config.language || 'auto',
model: config.model || 'mini',
interimResults: true
});
// Handle transcription results
streamSession.onResult((result) => {
const session = this.activeSessions.get(sessionId);
if (session) {
session.ws.send(JSON.stringify({
type: 'transcription_result',
result: {
transcript: result.transcript,
isFinal: result.isFinal,
confidence: result.confidence,
timestamp: result.timestamp
}
}));
}
});
return streamSession;
}
generateSessionId() {
return Math.random().toString(36).substring(2, 15);
}
stop() {
if (this.wss) {
this.wss.close();
}
if (this.server) {
this.server.close();
}
}
}
// Usage
const server = new StreamingTranscriptionServer(8080);
server.start();
// Graceful shutdown
process.on('SIGINT', () => {
console.log('Shutting down server...');
server.stop();
process.exit(0);
});
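A client drives this server with the three message types it handles: `start_transcription`, `audio_data` (base64-encoded audio), and `stop_transcription`. Here is a minimal browser-side sketch, assuming the server above is reachable at `ws://localhost:8080`; how you capture PCM chunks is left to your audio pipeline.

```javascript
// Minimal browser client for the streaming server above.
// Assumes the server is listening on ws://localhost:8080.
const ws = new WebSocket('ws://localhost:8080');

ws.onopen = () => {
  // Ask the server to open a transcription session
  ws.send(JSON.stringify({
    type: 'start_transcription',
    config: { sampleRate: 16000, encoding: 'LINEAR16', language: 'auto', model: 'mini' }
  }));
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  switch (message.type) {
    case 'transcription_started':
      console.log('Session started:', message.sessionId);
      break;
    case 'transcription_result':
      console.log(message.result.isFinal ? 'Final:' : 'Partial:', message.result.transcript);
      break;
    case 'transcription_complete':
      console.log('Full result:', message.result);
      break;
    case 'error':
      console.error('Server error:', message.message);
      break;
  }
};

// Send a raw PCM chunk; the server expects base64 in the `audio` field
function sendAudioChunk(pcmArrayBuffer) {
  const bytes = new Uint8Array(pcmArrayBuffer);
  let binary = '';
  for (const byte of bytes) binary += String.fromCharCode(byte);
  ws.send(JSON.stringify({ type: 'audio_data', audio: btoa(binary) }));
}

// Ask the server to finalize and return the complete transcript
function stopTranscription() {
  ws.send(JSON.stringify({ type: 'stop_transcription' }));
}
```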
import React, { useState, useRef, useEffect } from 'react';
import { VerbalisAI } from '@verbalisai/sdk';
const LiveTranscriptionComponent = ({ apiKey }) => {
const [isRecording, setIsRecording] = useState(false);
const [finalTranscript, setFinalTranscript] = useState('');
const [partialTranscript, setPartialTranscript] = useState('');
const [error, setError] = useState(null);
const mediaRecorderRef = useRef(null);
const streamSessionRef = useRef(null);
const audioStreamRef = useRef(null);
const clientRef = useRef(new VerbalisAI({ apiKey }));
const startRecording = async () => {
try {
setError(null);
// Get microphone access
audioStreamRef.current = await navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: 16000,
channelCount: 1,
echoCancellation: true,
noiseSuppression: true
}
});
// Create streaming session
streamSessionRef.current = await clientRef.current.streaming.createSession({
sampleRate: 16000,
encoding: 'WEBM_OPUS',
language: 'auto',
interimResults: true,
model: 'mini'
});
// Set up MediaRecorder
mediaRecorderRef.current = new MediaRecorder(audioStreamRef.current, {
mimeType: 'audio/webm;codecs=opus'
});
// Handle audio data
mediaRecorderRef.current.ondataavailable = async (event) => {
if (event.data.size > 0 && streamSessionRef.current) {
const audioBuffer = await event.data.arrayBuffer();
await streamSessionRef.current.sendAudio(audioBuffer);
}
};
// Handle transcription results
streamSessionRef.current.onResult((result) => {
if (result.isFinal) {
setFinalTranscript(prev => prev + result.transcript + ' ');
setPartialTranscript('');
} else {
setPartialTranscript(result.transcript);
}
});
// Start recording
mediaRecorderRef.current.start(250); // 250ms chunks
setIsRecording(true);
} catch (err) {
setError(`Failed to start recording: ${err.message}`);
}
};
const stopRecording = async () => {
if (mediaRecorderRef.current && mediaRecorderRef.current.state !== 'inactive') {
mediaRecorderRef.current.stop();
}
if (audioStreamRef.current) {
audioStreamRef.current.getTracks().forEach(track => track.stop());
}
if (streamSessionRef.current) {
await streamSessionRef.current.close();
}
setIsRecording(false);
setPartialTranscript('');
};
  useEffect(() => {
    // Cleanup on unmount. stopRecording() already checks the recorder and
    // stream state, so call it unconditionally here; the `isRecording` value
    // captured by this empty-dependency effect would always be stale (false).
    return () => {
      stopRecording();
    };
  }, []);
return (
<div className="live-transcription">
<div className="controls">
{!isRecording ? (
<button
onClick={startRecording}
className="start-btn"
disabled={!apiKey}
>
🎤 Start Recording
</button>
) : (
<button
onClick={stopRecording}
className="stop-btn"
>
⏹️ Stop Recording
</button>
)}
</div>
{error && (
<div className="error">
❌ {error}
</div>
)}
{isRecording && (
<div className="recording-indicator">
🔴 Recording...
</div>
)}
<div className="transcription-display">
<div className="final-transcript">
<h3>Final Transcript:</h3>
<p>{finalTranscript}</p>
</div>
{partialTranscript && (
<div className="partial-transcript">
<h3>Partial (Live):</h3>
<p className="partial-text">{partialTranscript}</p>
</div>
)}
</div>
</div>
);
};
export default LiveTranscriptionComponent;
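Rendering the component only requires passing an API key prop. A minimal usage sketch follows; the environment variable name is just an example, and in production you should prefer a short-lived token fetched from your backend over exposing a long-lived key in the browser.

```jsx
import LiveTranscriptionComponent from './LiveTranscriptionComponent';

export default function App() {
  // REACT_APP_VERBALISAI_KEY is an example name for wherever you keep the key
  return <LiveTranscriptionComponent apiKey={process.env.REACT_APP_VERBALISAI_KEY} />;
}
```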
import asyncio
from verbalisai import VerbalisAI
class MultiSpeakerStreamProcessor:
def __init__(self, api_key=None):
self.client = VerbalisAI(api_key=api_key)
self.speaker_sessions = {}
async def create_speaker_session(self, speaker_id, speaker_config):
"""Create a dedicated session for each speaker"""
session = await self.client.streaming.create_session({
'speaker_id': speaker_id,
'model': 'pro',
'diarize': False, # We're handling speakers separately
'language': speaker_config.get('language', 'auto'),
'accent': speaker_config.get('accent'),
'vocabulary': speaker_config.get('custom_vocab', [])
})
self.speaker_sessions[speaker_id] = {
'session': session,
'config': speaker_config
}
        # Consume this speaker's results in the background, using the same
        # async-iteration pattern as the other Python examples
        asyncio.create_task(self._consume_results(speaker_id, session))
        return session

    async def _consume_results(self, speaker_id, session):
        """Forward streaming results to the per-speaker handler"""
        async for result in session.stream():
            self.handle_speaker_result(speaker_id, result)
async def send_speaker_audio(self, speaker_id, audio_data):
"""Send audio data for specific speaker"""
if speaker_id not in self.speaker_sessions:
raise ValueError(f"No session for speaker {speaker_id}")
session = self.speaker_sessions[speaker_id]['session']
        await session.send_audio(audio_data)
def handle_speaker_result(self, speaker_id, result):
"""Handle transcription result for specific speaker"""
        if result.is_final:
print(f"{speaker_id}: {result.transcript}")
# Store final transcript with speaker attribution
self.store_speaker_transcript(speaker_id, result.transcript)
async def close_all_sessions(self):
"""Close all speaker sessions"""
for speaker_id, session_info in self.speaker_sessions.items():
await session_info['session'].close()
self.speaker_sessions.clear()
# Usage
async def multi_speaker_example():
processor = MultiSpeakerStreamProcessor()
# Set up speakers
speakers = {
'john': {'language': 'en', 'accent': 'us'},
'maria': {'language': 'es', 'accent': 'mx'},
'pierre': {'language': 'fr', 'accent': 'fr'}
}
# Create sessions for each speaker
for speaker_id, config in speakers.items():
await processor.create_speaker_session(speaker_id, config)
# Simulate sending audio for different speakers
# In real application, you'd have speaker identification/separation
# await processor.send_speaker_audio('john', john_audio_data)
# await processor.send_speaker_audio('maria', maria_audio_data)
# Clean up
await processor.close_all_sessions()
asyncio.run(multi_speaker_example())
class StreamQualityMonitor {
constructor(streamSession) {
this.session = streamSession;
this.metrics = {
audioLevel: 0,
noiseLevel: 0,
speechRate: 0,
confidence: 0,
latency: 0
};
this.startMonitoring();
}
startMonitoring() {
// Monitor audio quality
this.session.onAudioAnalysis((analysis) => {
this.metrics.audioLevel = analysis.volumeLevel;
this.metrics.noiseLevel = analysis.noiseLevel;
// Provide quality feedback
if (analysis.volumeLevel < 0.1) {
this.onQualityIssue('low_volume', 'Microphone volume is too low');
}
if (analysis.noiseLevel > 0.7) {
this.onQualityIssue('high_noise', 'Background noise is too high');
}
});
// Monitor transcription quality
this.session.onResult((result) => {
this.metrics.confidence = result.confidence;
this.metrics.latency = result.processingTime;
if (result.confidence < 0.5) {
this.onQualityIssue('low_confidence', 'Transcription confidence is low');
}
});
// Monitor speech rate
this.session.onSpeechAnalysis((analysis) => {
this.metrics.speechRate = analysis.wordsPerMinute;
if (analysis.wordsPerMinute > 200) {
this.onQualityIssue('fast_speech', 'Speech rate is very fast');
}
});
}
onQualityIssue(type, message) {
console.warn(`Quality issue [${type}]: ${message}`);
// Send quality feedback to UI
this.session.emit('quality_warning', {
type: type,
message: message,
metrics: this.metrics
});
}
getQualityReport() {
return {
...this.metrics,
overallQuality: this.calculateOverallQuality()
};
}
calculateOverallQuality() {
const weights = {
audioLevel: 0.3,
noiseLevel: -0.2, // Negative because high noise is bad
confidence: 0.4,
latency: -0.1 // Negative because high latency is bad
};
let score = 0;
score += this.metrics.audioLevel * weights.audioLevel;
score += (1 - this.metrics.noiseLevel) * Math.abs(weights.noiseLevel);
score += this.metrics.confidence * weights.confidence;
score += (1 - Math.min(this.metrics.latency / 1000, 1)) * Math.abs(weights.latency);
return Math.max(0, Math.min(1, score));
}
}
// Usage
const qualityMonitor = new StreamQualityMonitor(streamSession);
qualityMonitor.session.on('quality_warning', (warning) => {
// Display warning to user
showQualityWarning(warning.message);
});
// Get quality report
setInterval(() => {
const report = qualityMonitor.getQualityReport();
updateQualityIndicator(report.overallQuality);
}, 1000);
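`showQualityWarning` and `updateQualityIndicator` are left to the application. A minimal DOM-based sketch might look like the following (the element IDs are assumptions):

```javascript
// Illustrative implementations of the UI helpers referenced above.
function showQualityWarning(message) {
  const banner = document.getElementById('quality-warning');
  banner.textContent = `⚠️ ${message}`;
  banner.style.display = 'block';
  // Hide the warning again after a few seconds
  setTimeout(() => { banner.style.display = 'none'; }, 5000);
}

function updateQualityIndicator(overallQuality) {
  // overallQuality is the 0–1 score returned by getQualityReport()
  const bar = document.getElementById('quality-indicator');
  bar.style.width = `${Math.round(overallQuality * 100)}%`;
  bar.title = `Stream quality: ${Math.round(overallQuality * 100)}%`;
}
```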
class OptimizedAudioBuffer {
constructor(options = {}) {
this.bufferSize = options.bufferSize || 4096;
this.sampleRate = options.sampleRate || 16000;
this.channels = options.channels || 1;
this.buffer = new Float32Array(this.bufferSize);
this.writeIndex = 0;
this.readIndex = 0;
}
write(audioData) {
const remaining = this.bufferSize - this.writeIndex;
if (audioData.length <= remaining) {
// Fits in remaining space
this.buffer.set(audioData, this.writeIndex);
this.writeIndex += audioData.length;
} else {
// Need to wrap around
this.buffer.set(audioData.slice(0, remaining), this.writeIndex);
this.buffer.set(audioData.slice(remaining), 0);
this.writeIndex = audioData.length - remaining;
}
}
read(length) {
const available = this.available();
const readLength = Math.min(length, available);
if (readLength === 0) return new Float32Array(0);
const result = new Float32Array(readLength);
if (this.readIndex + readLength <= this.bufferSize) {
// No wrap around
result.set(this.buffer.slice(this.readIndex, this.readIndex + readLength));
} else {
// Wrap around
const firstPart = this.bufferSize - this.readIndex;
const secondPart = readLength - firstPart;
result.set(this.buffer.slice(this.readIndex, this.bufferSize), 0);
result.set(this.buffer.slice(0, secondPart), firstPart);
}
this.readIndex = (this.readIndex + readLength) % this.bufferSize;
return result;
}
available() {
if (this.writeIndex >= this.readIndex) {
return this.writeIndex - this.readIndex;
} else {
return this.bufferSize - this.readIndex + this.writeIndex;
}
}
clear() {
this.writeIndex = 0;
this.readIndex = 0;
}
}
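Here is a sketch of how the ring buffer might sit between an audio capture callback and a streaming session: the capture side writes whatever it receives, and a steady timer drains fixed-size frames to send. `streamSession.sendAudio` matches the session API used earlier; converting Float32 samples to the session's expected encoding is omitted for brevity.

```javascript
// Hypothetical wiring: capture callback -> ring buffer -> streaming session.
const ringBuffer = new OptimizedAudioBuffer({ bufferSize: 16000, sampleRate: 16000 });
const FRAME_SIZE = 1600; // 100 ms of audio at 16 kHz

// Called by your capture pipeline (e.g. an AudioWorklet) with Float32 samples
function onAudioCaptured(float32Samples) {
  ringBuffer.write(float32Samples);
}

// Drain complete frames on a fixed interval and forward them to the session
setInterval(async () => {
  while (ringBuffer.available() >= FRAME_SIZE) {
    const frame = ringBuffer.read(FRAME_SIZE);
    // Format conversion (Float32 -> LINEAR16, etc.) omitted here
    await streamSession.sendAudio(frame);
  }
}, 100);
```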
import asyncio
from contextlib import asynccontextmanager
class StreamingConnectionPool:
def __init__(self, client, max_connections=10):
self.client = client
self.max_connections = max_connections
self.available_connections = asyncio.Queue(maxsize=max_connections)
        self.active_connections = set()

    async def start(self):
        """Pre-create connections; call this once from a running event loop."""
        for _ in range(self.max_connections):
            connection = await self._create_connection()
            await self.available_connections.put(connection)
async def _create_connection(self):
"""Create a new streaming connection"""
return await self.client.streaming.create_session({
'model': 'mini',
'language': 'auto',
'keep_alive': True
})
@asynccontextmanager
async def get_connection(self):
"""Get a connection from the pool"""
try:
# Try to get available connection
connection = await asyncio.wait_for(
self.available_connections.get(),
timeout=5.0
)
self.active_connections.add(connection)
yield connection
except asyncio.TimeoutError:
# Create new connection if pool is exhausted
connection = await self._create_connection()
self.active_connections.add(connection)
yield connection
finally:
# Return connection to pool
if connection in self.active_connections:
self.active_connections.remove(connection)
if not connection.is_closed():
await self.available_connections.put(connection)
async def close_all(self):
"""Close all connections in the pool"""
# Close available connections
while not self.available_connections.empty():
connection = await self.available_connections.get()
await connection.close()
# Close active connections
for connection in self.active_connections:
await connection.close()
self.active_connections.clear()
# Usage
pool = StreamingConnectionPool(client, max_connections=5)
# Pre-create connections once from inside your event loop:
# await pool.start()
async def process_audio_stream(audio_data):
async with pool.get_connection() as connection:
result = await connection.send_audio(audio_data)
return result
Ready to learn about comprehensive error handling? Check out the Error Handling guide for robust error management strategies across all SDK features.