Comprehensive guide to using all transcription features with the VerbalisAI Python SDK.
Start with a basic transcription: pass an audio URL and read back the text and duration.

from verbalisai import VerbalisAI
import asyncio

async def basic_transcription():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/audio.mp3"
    )

    print("Transcription:", transcription.text)
    print(f"Duration: {transcription.duration} seconds")

asyncio.run(basic_transcription())
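These examples construct VerbalisAI() with no arguments, which assumes the client picks up credentials implicitly (for example from the environment). If your setup requires passing the key explicitly, a pattern like the sketch below is common in Python SDKs; both the VERBALISAI_API_KEY variable name and the api_key parameter are assumptions here, so check the SDK reference for the actual names.

import os
from verbalisai import VerbalisAI

# Hypothetical: the environment variable name and the api_key parameter
# are assumptions, not confirmed SDK API -- consult the SDK reference.
client = VerbalisAI(api_key=os.environ["VERBALISAI_API_KEY"])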
Three models are available, trading speed for accuracy:

async def model_comparison():
    client = VerbalisAI()
    audio_url = "https://example.com/audio.mp3"

    # Nano model - fastest, English only
    nano_result = await client.transcriptions.create(
        audio_url=audio_url,
        model="nano"  # 3x faster than mini
    )

    # Mini model - balanced speed/accuracy
    mini_result = await client.transcriptions.create(
        audio_url=audio_url,
        model="mini"  # Default, good for most use cases
    )

    # Pro model - highest accuracy
    pro_result = await client.transcriptions.create(
        audio_url=audio_url,
        model="pro"  # Best accuracy, slower processing
    )

    print(f"Nano: {nano_result.text}")
    print(f"Mini: {mini_result.text}")
    print(f"Pro: {pro_result.text}")

asyncio.run(model_comparison())
Language detection is automatic by default; you can also pin a specific language with an ISO 639-1 code, as shown in the next example.

async def auto_language():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/multilingual.mp3",
        language="auto"  # Default - detects language automatically
    )

    print(f"Detected language: {transcription.detected_language}")
    print(f"Confidence: {transcription.language_confidence}")
    print(f"Text: {transcription.text}")

asyncio.run(auto_language())
async def specific_languages():
    client = VerbalisAI()

    # English transcription
    english = await client.transcriptions.create(
        audio_url="https://example.com/english.mp3",
        language="en"
    )

    # Spanish transcription
    spanish = await client.transcriptions.create(
        audio_url="https://example.com/spanish.mp3",
        language="es"
    )

    # French transcription
    french = await client.transcriptions.create(
        audio_url="https://example.com/french.mp3",
        language="fr"
    )

asyncio.run(specific_languages())
To see which languages are supported:

async def list_supported_languages():
    client = VerbalisAI()

    # Get the list of supported languages
    languages = await client.languages.list()

    for lang in languages:
        print(f"{lang.code}: {lang.name} ({lang.accuracy}% avg accuracy)")

asyncio.run(list_supported_languages())
Speaker diarization labels each segment with a speaker, which makes it easy to reconstruct a conversation:

async def speaker_diarization():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/meeting.mp3",
        model="mini",
        diarize=True,           # Enable speaker identification
        timestamp_style="word"  # Get word-level timestamps
    )

    # Group segments by speaker
    speakers = {}
    for segment in transcription.segments:
        speaker_id = segment.speaker_id or "Unknown"
        if speaker_id not in speakers:
            speakers[speaker_id] = []
        speakers[speaker_id].append(segment)

    # Print the conversation by speaker
    for speaker_id, segments in speakers.items():
        print(f"\n{speaker_id}:")
        for segment in segments:
            print(f"  [{segment.start:.1f}s] {segment.text}")

asyncio.run(speaker_diarization())
Topic detection tags the transcription with the subjects discussed:

async def topic_detection():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/business-call.mp3",
        topics=True,  # Enable topic detection
        model="pro"   # Pro model gives better topic accuracy
    )

    print("Detected Topics:")
    for topic in transcription.topics:
        print(f"  - {topic}")

    # Topics are also available with confidence scores
    if hasattr(transcription, 'topic_details'):
        for topic_detail in transcription.topic_details:
            print(f"  {topic_detail.topic}: {topic_detail.confidence:.2f}")

asyncio.run(topic_detection())
Summarization condenses long recordings; three output formats are supported:

async def text_summarization():
    client = VerbalisAI()

    # Different summary formats
    formats = ["bullets", "paragraphs", "markdown"]

    for format_type in formats:
        transcription = await client.transcriptions.create(
            audio_url="https://example.com/long-meeting.mp3",
            summarization=True,
            summary_type=format_type,
            summary_language="en"
        )

        print(f"\n{format_type.upper()} SUMMARY:")
        print(transcription.summary.text)
        print(f"Summary length: {len(transcription.summary.text)} chars")

asyncio.run(text_summarization())
Entity detection extracts named entities such as people, organizations, and contact details:

async def entity_detection():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/business-call.mp3",
        entity_detection=True,
        entity_types=[
            "person",
            "organization",
            "location",
            "phone_number",
            "email",
            "date",
            "product"
        ]
    )

    # Group entities by type
    entities_by_type = {}
    for entity in transcription.entities:
        entity_type = entity.type
        if entity_type not in entities_by_type:
            entities_by_type[entity_type] = []
        entities_by_type[entity_type].append(entity)

    # Print entities by type
    for entity_type, entities in entities_by_type.items():
        print(f"\n{entity_type.upper()}:")
        for entity in entities:
            print(f"  - {entity.text} (confidence: {entity.confidence:.2f})")

asyncio.run(entity_detection())
PII redaction removes sensitive information from the transcript before it reaches your application:

async def basic_pii_redaction():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/customer-call.mp3",
        redact_pii=True,
        redact_pii_policies=[
            "person",
            "phone_number",
            "email",
            "ssn",
            "credit_card"
        ],
        redact_pii_sub="hash"  # Options: hash, mask, remove
    )

    print("The original audio contained PII; here is the redacted version:")
    print(transcription.text)
    print(f"\nPII types found and redacted: {transcription.redacted_pii_types}")

asyncio.run(basic_pii_redaction())
Three substitution methods are available for redacted PII:

async def advanced_pii_redaction():
    client = VerbalisAI()

    # Different redaction methods
    redaction_methods = {
        "hash": "Replace with [REDACTED_HASH_123456]",
        "mask": "Replace with [***]",
        "remove": "Remove completely"
    }

    for method, description in redaction_methods.items():
        transcription = await client.transcriptions.create(
            audio_url="https://example.com/sensitive-call.mp3",
            redact_pii=True,
            redact_pii_policies=[
                "person",
                "phone_number",
                "email",
                "credit_card",
                "bank_account"
            ],
            redact_pii_sub=method
        )

        print(f"\nREDACTION METHOD: {method} ({description})")
        print(transcription.text)

asyncio.run(advanced_pii_redaction())
For regulated domains such as healthcare, combine the policies that match your compliance requirements:

async def healthcare_pii():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/patient-consultation.mp3",
        redact_pii=True,
        redact_pii_policies=[
            "person",        # Patient names
            "medical_id",    # Medical record numbers
            "phone_number",  # Contact information
            "email",         # Email addresses
            "address",       # Home addresses
            "date",          # Birth dates, appointment dates
            "insurance_id"   # Insurance information
        ],
        redact_pii_sub="hash"
    )

    print("Healthcare transcription with PII redacted:")
    print(transcription.text)

asyncio.run(healthcare_pii())
Timestamps come in two granularities: segment-level (the default) and word-level:

async def timestamp_comparison():
    client = VerbalisAI()
    audio_url = "https://example.com/speech.mp3"

    # Segment-level timestamps (default)
    segment_transcription = await client.transcriptions.create(
        audio_url=audio_url,
        timestamp_style="segment"
    )

    print("SEGMENT TIMESTAMPS:")
    for segment in segment_transcription.segments:
        print(f"[{segment.start:.1f}s - {segment.end:.1f}s]: {segment.text}")

    # Word-level timestamps (more precise)
    word_transcription = await client.transcriptions.create(
        audio_url=audio_url,
        timestamp_style="word"
    )

    print("\nWORD TIMESTAMPS:")
    for segment in word_transcription.segments:
        print(f"[{segment.start:.1f}s]: ", end="")
        for word in segment.words:
            print(f"{word.text}({word.start:.1f}s) ", end="")
        print()

asyncio.run(timestamp_comparison())
Audio slicing transcribes only a portion of a file, specified in seconds:

async def audio_slicing():
    client = VerbalisAI()

    # Transcribe only a portion of the audio
    transcription = await client.transcriptions.create(
        audio_url="https://example.com/long-audio.mp3",
        audio_start_from=60,  # Start from 1 minute
        audio_end_at=300,     # End at 5 minutes
        model="mini"
    )

    print("Transcribed audio from 1:00 to 5:00:")
    print(transcription.text)
    print(f"Duration of transcribed portion: {transcription.duration}s")

asyncio.run(audio_slicing())
Content safety flags potentially harmful material in user-submitted audio:

async def content_safety():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/user-content.mp3",
        content_safety=True,  # Enable content safety filtering
        model="pro"
    )

    if transcription.content_flags:
        print("Content flags detected:")
        for flag in transcription.content_flags:
            print(f"  - {flag.type}: {flag.description} (confidence: {flag.confidence})")
    else:
        print("No content safety issues detected")

    print(f"\nTranscription: {transcription.text}")

asyncio.run(content_safety())
For long files, start the job asynchronously and poll for completion instead of blocking:

async def async_transcription():
    client = VerbalisAI()

    # Start the transcription without waiting
    transcription = await client.transcriptions.create(
        audio_url="https://example.com/very-long-audio.mp3",
        model="pro",
        wait_until_complete=False  # Don't wait for completion
    )

    print(f"Transcription started: {transcription.id}")
    print(f"Status: {transcription.status}")

    # Poll for completion
    while transcription.status == "processing":
        await asyncio.sleep(5)  # Wait 5 seconds between polls
        transcription = await client.transcriptions.get(transcription.id)
        print(f"Status: {transcription.status}")

    if transcription.status == "completed":
        print(f"Transcription completed: {transcription.text}")
    else:
        print(f"Transcription failed: {transcription.error}")

asyncio.run(async_transcription())
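A bare polling loop like the one above never gives up if a job stalls. A small wrapper with a deadline keeps it bounded; this sketch uses only the transcriptions.get call shown above and assumes "processing" is the only in-flight status:

import time

async def wait_for_transcription(client, transcription_id, timeout=600, interval=5):
    # Poll until the job leaves the "processing" state or the deadline passes
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        transcription = await client.transcriptions.get(transcription_id)
        if transcription.status != "processing":
            return transcription
        await asyncio.sleep(interval)
    raise TimeoutError(f"Transcription {transcription_id} did not finish within {timeout}s")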
Alternatively, register a webhook and let the API notify you when the job finishes:

async def webhook_transcription():
    client = VerbalisAI()

    # Start the transcription with webhook notification
    transcription = await client.transcriptions.create(
        audio_url="https://example.com/audio.mp3",
        model="pro",
        # Webhook configuration
        webhook_url="https://yoursite.com/webhooks/transcription",
        webhook_auth_header_name="Authorization",
        webhook_auth_header_value="Bearer your-webhook-secret",
        # Additional features
        topics=True,
        summarization=True,
        entity_detection=True,
        wait_until_complete=False  # Use the webhook instead of waiting
    )

    print(f"Transcription started: {transcription.id}")
    print("You'll receive a webhook notification when it completes")

asyncio.run(webhook_transcription())
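On the receiving side you need an endpoint that validates the auth header and reacts to the callback. The payload schema isn't documented in this guide, so the field name below (id) is an assumption; re-fetching the full result through the SDK rather than trusting the payload sidesteps that uncertainty. A minimal sketch with FastAPI:

from fastapi import FastAPI, Header, HTTPException, Request
from verbalisai import VerbalisAI

app = FastAPI()

@app.post("/webhooks/transcription")
async def transcription_webhook(request: Request, authorization: str = Header(default=None)):
    # Verify the secret configured via webhook_auth_header_value above
    if authorization != "Bearer your-webhook-secret":
        raise HTTPException(status_code=401)

    payload = await request.json()
    # Hypothetical field name -- check the webhook reference for the real schema
    transcription_id = payload.get("id")

    # Re-fetch the completed transcription through the SDK
    client = VerbalisAI()
    transcription = await client.transcriptions.get(transcription_id)
    print(transcription.text)

    return {"received": True}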
asyncio.gather makes it easy to transcribe multiple files concurrently:

async def batch_transcription():
    client = VerbalisAI()

    audio_files = [
        "https://example.com/audio1.mp3",
        "https://example.com/audio2.mp3",
        "https://example.com/audio3.mp3",
        "https://example.com/audio4.mp3",
        "https://example.com/audio5.mp3"
    ]

    # Process all files concurrently
    tasks = []
    for url in audio_files:
        task = client.transcriptions.create(
            audio_url=url,
            model="mini",
            topics=True
        )
        tasks.append(task)

    # Wait for all of them to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process the results
    successful = 0
    failed = 0

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"File {i+1} failed: {result}")
            failed += 1
        else:
            print(f"File {i+1} completed: {len(result.text)} characters")
            if result.topics:
                print(f"  Topics: {', '.join(result.topics)}")
            successful += 1

    print(f"\nBatch complete: {successful} successful, {failed} failed")

asyncio.run(batch_transcription())
To stay under rate limits, cap concurrency with a semaphore:

import asyncio
from asyncio import Semaphore

async def rate_limited_batch():
    client = VerbalisAI()
    audio_files = [f"https://example.com/audio{i}.mp3" for i in range(1, 21)]

    # Limit concurrent requests to avoid rate limits
    semaphore = Semaphore(5)  # Max 5 concurrent requests

    async def process_single_file(url):
        async with semaphore:
            try:
                result = await client.transcriptions.create(
                    audio_url=url,
                    model="mini"
                )
                return {"url": url, "success": True, "result": result}
            except Exception as e:
                return {"url": url, "success": False, "error": str(e)}

    # Process all files with rate limiting
    tasks = [process_single_file(url) for url in audio_files]
    results = await asyncio.gather(*tasks)

    # Analyze the results
    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]

    print(f"Processed {len(audio_files)} files:")
    print(f"  Successful: {len(successful)}")
    print(f"  Failed: {len(failed)}")

    if failed:
        print("\nFailed files:")
        for failure in failed:
            print(f"  {failure['url']}: {failure['error']}")

asyncio.run(rate_limited_batch())
For resilience, retry transient failures with exponential backoff and skip errors that will never succeed:

from verbalisai import VerbalisAI, VerbalisAIError
import asyncio

async def robust_transcription():
    client = VerbalisAI()

    max_retries = 3
    retry_delay = 2.0

    for attempt in range(max_retries):
        try:
            transcription = await client.transcriptions.create(
                audio_url="https://example.com/audio.mp3",
                model="mini"
            )
            print(f"Success on attempt {attempt + 1}")
            print(transcription.text)
            break
        except VerbalisAIError as e:
            print(f"Attempt {attempt + 1} failed: {e.message}")

            # Don't retry client errors that will fail again
            if e.status_code in [400, 401, 403]:
                print("Non-retryable error, giving up")
                break

            # Retry on server errors and rate limits
            if attempt < max_retries - 1:
                wait_time = retry_delay * (2 ** attempt)  # Exponential backoff
                print(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
            else:
                print("Max retries exceeded")
        except Exception as e:
            print(f"Unexpected error: {e}")
            break

asyncio.run(robust_transcription())
For very large files, stream segments as they arrive instead of holding the whole result in memory:

async def memory_efficient_processing():
    client = VerbalisAI()

    # Use the client as a context manager for automatic cleanup
    async with client:
        # Process large files efficiently
        transcription = await client.transcriptions.create(
            audio_url="https://example.com/large-audio.mp3",
            model="mini",
            # Don't load all segments into memory at once
            stream_segments=True
        )

        # Process segments as they arrive
        async for segment in transcription.segments_stream():
            print(f"[{segment.start:.1f}s]: {segment.text}")
            # Process each segment immediately
            # (save to a database, analyze, etc.)
            await process_segment(segment)

async def process_segment(segment):
    # Your segment processing logic here
    pass

asyncio.run(memory_efficient_processing())
Completed transcriptions can be exported in several formats:

async def export_formats():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/audio.mp3",
        timestamp_style="word",
        diarize=True
    )

    # Export as an SRT subtitle file
    srt_content = transcription.to_srt()
    with open("transcription.srt", "w") as f:
        f.write(srt_content)

    # Export as a VTT subtitle file
    vtt_content = transcription.to_vtt()
    with open("transcription.vtt", "w") as f:
        f.write(vtt_content)

    # Export as plain text
    txt_content = transcription.to_text()
    with open("transcription.txt", "w") as f:
        f.write(txt_content)

    # Export as JSON
    json_content = transcription.to_json()
    with open("transcription.json", "w") as f:
        f.write(json_content)

asyncio.run(export_formats())
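For orientation, SRT is the standard SubRip format: a numeric cue index, an HH:MM:SS,mmm --> HH:MM:SS,mmm time range, then the cue text. How the SDK segments cues and labels speakers is not specified here, so the sample below is only illustrative:

1
00:00:00,000 --> 00:00:03,200
Thanks everyone for joining today.

2
00:00:03,400 --> 00:00:06,100
Happy to be here.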
Finally, the result object supports searching the transcript, with each match carrying a timestamp:

async def search_transcription():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/meeting.mp3",
        timestamp_style="word"
    )

    # Search for specific terms
    search_terms = ["action items", "deadline", "budget"]

    for term in search_terms:
        matches = transcription.search(term, case_sensitive=False)

        if matches:
            print(f"\nFound '{term}' {len(matches)} times:")
            for match in matches:
                context_start = max(0, match.start_index - 50)
                context_end = min(len(transcription.text), match.end_index + 50)
                context = transcription.text[context_start:context_end]
                print(f"  [{match.timestamp:.1f}s]: ...{context}...")

asyncio.run(search_transcription())
Ready to explore file storage? Check out the File Storage guide to learn about uploading and managing audio files with the Python SDK.