Complete guide to transcription features in the VerbalisAI Python SDK
```python
from verbalisai import VerbalisAI
import asyncio

async def basic_transcription():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/audio.mp3"
    )

    print(f"Transcription: {transcription.text}")
    print(f"Duration: {transcription.duration} seconds")

asyncio.run(basic_transcription())
```
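If an application transcribes audio from several places, the call above is easy to wrap in a small helper. A minimal sketch using only the `create` call shown in this guide; the function name and signature are illustrative:

```python
async def transcribe(url: str) -> str:
    """Return the plain-text transcription for a remote audio file."""
    client = VerbalisAI()
    transcription = await client.transcriptions.create(audio_url=url)
    return transcription.text
```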
```python
async def model_comparison():
    client = VerbalisAI()
    audio_url = "https://example.com/audio.mp3"

    # Nano model - fastest, English only
    nano_result = await client.transcriptions.create(
        audio_url=audio_url,
        model="nano"  # 3x faster than mini
    )

    # Mini model - balanced speed/accuracy
    mini_result = await client.transcriptions.create(
        audio_url=audio_url,
        model="mini"  # Default, good for most use cases
    )

    # Pro model - highest accuracy
    pro_result = await client.transcriptions.create(
        audio_url=audio_url,
        model="pro"  # Best accuracy, slower processing
    )

    print(f"Nano: {nano_result.text}")
    print(f"Mini: {mini_result.text}")
    print(f"Pro: {pro_result.text}")

asyncio.run(model_comparison())
```
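The three requests above run sequentially. Because they are independent, they can also be issued concurrently with `asyncio.gather`, the same pattern used for batch processing later in this guide; a sketch:

```python
async def model_comparison_concurrent():
    client = VerbalisAI()
    audio_url = "https://example.com/audio.mp3"

    # Fire all three requests at once and wait for them together
    nano_result, mini_result, pro_result = await asyncio.gather(
        client.transcriptions.create(audio_url=audio_url, model="nano"),
        client.transcriptions.create(audio_url=audio_url, model="mini"),
        client.transcriptions.create(audio_url=audio_url, model="pro"),
    )

    for name, result in [("Nano", nano_result), ("Mini", mini_result), ("Pro", pro_result)]:
        print(f"{name}: {result.text}")

asyncio.run(model_comparison_concurrent())
```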
```python
async def auto_language():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/multilingual.mp3",
        language="auto"  # Default - detects language automatically
    )

    print(f"Detected language: {transcription.detected_language}")
    print(f"Confidence: {transcription.language_confidence}")
    print(f"Text: {transcription.text}")

asyncio.run(auto_language())
```
```python
async def specific_languages():
    client = VerbalisAI()

    # English transcription
    english = await client.transcriptions.create(
        audio_url="https://example.com/english.mp3",
        language="en"
    )

    # Spanish transcription
    spanish = await client.transcriptions.create(
        audio_url="https://example.com/spanish.mp3",
        language="es"
    )

    # French transcription
    french = await client.transcriptions.create(
        audio_url="https://example.com/french.mp3",
        language="fr"
    )

asyncio.run(specific_languages())
```
```python
async def list_supported_languages():
    client = VerbalisAI()

    # Get list of supported languages
    languages = await client.languages.list()

    for lang in languages:
        print(f"{lang.code}: {lang.name} ({lang.accuracy}% avg accuracy)")

asyncio.run(list_supported_languages())
```
```python
async def speaker_diarization():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/meeting.mp3",
        model="mini",
        diarize=True,           # Enable speaker identification
        timestamp_style="word"  # Get word-level timestamps
    )

    # Group segments by speaker
    speakers = {}
    for segment in transcription.segments:
        speaker_id = segment.speaker_id or "Unknown"
        if speaker_id not in speakers:
            speakers[speaker_id] = []
        speakers[speaker_id].append(segment)

    # Print the conversation grouped by speaker
    for speaker_id, segments in speakers.items():
        print(f"\n{speaker_id}:")
        for segment in segments:
            print(f"  [{segment.start:.1f}s] {segment.text}")

asyncio.run(speaker_diarization())
```
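Grouped segments are also easy to persist. A minimal sketch that writes a speaker-labelled transcript to disk, using only the `speaker_id`, `start`, and `text` fields shown above; the file layout is an arbitrary choice:

```python
def write_speaker_transcript(transcription, path="meeting-transcript.txt"):
    """Write one line per segment, labelled with speaker and start time."""
    with open(path, "w", encoding="utf-8") as f:
        for segment in transcription.segments:
            speaker = segment.speaker_id or "Unknown"
            f.write(f"[{segment.start:7.1f}s] {speaker}: {segment.text}\n")
```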
```python
async def topic_detection():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/business-call.mp3",
        topics=True,  # Enable topic detection
        model="pro"   # Pro model gives better topic accuracy
    )

    print("Detected Topics:")
    for topic in transcription.topics:
        print(f"  - {topic}")

    # Topics are also available with confidence scores
    if hasattr(transcription, 'topic_details'):
        for topic_detail in transcription.topic_details:
            print(f"  {topic_detail.topic}: {topic_detail.confidence:.2f}")

asyncio.run(topic_detection())
```
```python
async def text_summarization():
    client = VerbalisAI()

    # Different summary formats
    formats = ["bullets", "paragraphs", "markdown"]

    for format_type in formats:
        transcription = await client.transcriptions.create(
            audio_url="https://example.com/long-meeting.mp3",
            summarization=True,
            summary_type=format_type,
            summary_language="en"
        )

        print(f"\n{format_type.upper()} SUMMARY:")
        print(transcription.summary.text)
        print(f"Summary length: {len(transcription.summary.text)} chars")

asyncio.run(text_summarization())
```
```python
async def entity_detection():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/business-call.mp3",
        entity_detection=True,
        entity_types=[
            "person",
            "organization",
            "location",
            "phone_number",
            "email",
            "date",
            "product"
        ]
    )

    # Group entities by type
    entities_by_type = {}
    for entity in transcription.entities:
        entity_type = entity.type
        if entity_type not in entities_by_type:
            entities_by_type[entity_type] = []
        entities_by_type[entity_type].append(entity)

    # Print entities by type
    for entity_type, entities in entities_by_type.items():
        print(f"\n{entity_type.upper()}:")
        for entity in entities:
            print(f"  - {entity.text} (confidence: {entity.confidence:.2f})")

asyncio.run(entity_detection())
```
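The manual grouping loop above (and the similar one in the diarization example) can be written more compactly with `collections.defaultdict`; a drop-in alternative:

```python
from collections import defaultdict

entities_by_type = defaultdict(list)
for entity in transcription.entities:
    entities_by_type[entity.type].append(entity)
```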
```python
async def basic_pii_redaction():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/customer-call.mp3",
        redact_pii=True,
        redact_pii_policies=[
            "person",
            "phone_number",
            "email",
            "ssn",
            "credit_card"
        ],
        redact_pii_sub="hash"  # Options: hash, mask, remove
    )

    print("The original audio contained PII; here is the redacted version:")
    print(transcription.text)
    print(f"\nPII types found and redacted: {transcription.redacted_pii_types}")

asyncio.run(basic_pii_redaction())
```
```python
async def advanced_pii_redaction():
    client = VerbalisAI()

    # Different redaction methods
    redaction_methods = {
        "hash": "Replace with [REDACTED_HASH_123456]",
        "mask": "Replace with [***]",
        "remove": "Remove completely"
    }

    for method, description in redaction_methods.items():
        transcription = await client.transcriptions.create(
            audio_url="https://example.com/sensitive-call.mp3",
            redact_pii=True,
            redact_pii_policies=[
                "person",
                "phone_number",
                "email",
                "credit_card",
                "bank_account"
            ],
            redact_pii_sub=method
        )

        print(f"\nREDACTION METHOD: {method} ({description})")
        print(transcription.text)

asyncio.run(advanced_pii_redaction())
```
```python
async def healthcare_pii():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/patient-consultation.mp3",
        redact_pii=True,
        redact_pii_policies=[
            "person",        # Patient names
            "medical_id",    # Medical record numbers
            "phone_number",  # Contact information
            "email",         # Email addresses
            "address",       # Home addresses
            "date",          # Birth dates, appointment dates
            "insurance_id"   # Insurance information
        ],
        redact_pii_sub="hash"
    )

    print("Healthcare transcription with PII redacted:")
    print(transcription.text)

asyncio.run(healthcare_pii())
```
```python
async def timestamp_comparison():
    client = VerbalisAI()
    audio_url = "https://example.com/speech.mp3"

    # Segment-level timestamps (default)
    segment_transcription = await client.transcriptions.create(
        audio_url=audio_url,
        timestamp_style="segment"
    )

    print("SEGMENT TIMESTAMPS:")
    for segment in segment_transcription.segments:
        print(f"[{segment.start:.1f}s - {segment.end:.1f}s]: {segment.text}")

    # Word-level timestamps (more precise)
    word_transcription = await client.transcriptions.create(
        audio_url=audio_url,
        timestamp_style="word"
    )

    print("\nWORD TIMESTAMPS:")
    for segment in word_transcription.segments:
        print(f"[{segment.start:.1f}s]: ", end="")
        for word in segment.words:
            print(f"{word.text}({word.start:.1f}s) ", end="")
        print()

asyncio.run(timestamp_comparison())
```
```python
async def audio_slicing():
    client = VerbalisAI()

    # Transcribe only a portion of the audio
    transcription = await client.transcriptions.create(
        audio_url="https://example.com/long-audio.mp3",
        audio_start_from=60,  # Start from 1 minute
        audio_end_at=300,     # End at 5 minutes
        model="mini"
    )

    print("Transcribed audio from 1:00 to 5:00:")
    print(transcription.text)
    print(f"Duration of transcribed portion: {transcription.duration}s")

asyncio.run(audio_slicing())
```
```python
async def content_safety():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/user-content.mp3",
        content_safety=True,  # Enable content safety filtering
        model="pro"
    )

    if transcription.content_flags:
        print("Content flags detected:")
        for flag in transcription.content_flags:
            print(f"  - {flag.type}: {flag.description} (confidence: {flag.confidence})")
    else:
        print("No content safety issues detected")

    print(f"\nTranscription: {transcription.text}")

asyncio.run(content_safety())
```
```python
async def async_transcription():
    client = VerbalisAI()

    # Start transcription without waiting
    transcription = await client.transcriptions.create(
        audio_url="https://example.com/very-long-audio.mp3",
        model="pro",
        wait_until_complete=False  # Don't wait for completion
    )

    print(f"Transcription started: {transcription.id}")
    print(f"Status: {transcription.status}")

    # Poll for completion
    while transcription.status == "processing":
        await asyncio.sleep(5)  # Wait 5 seconds
        transcription = await client.transcriptions.get(transcription.id)
        print(f"Status: {transcription.status}")

    if transcription.status == "completed":
        print(f"Transcription completed: {transcription.text}")
    else:
        print(f"Transcription failed: {transcription.error}")

asyncio.run(async_transcription())
```
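The loop above polls indefinitely; in production you usually want an upper bound. A sketch of a reusable helper built on the same `client.transcriptions.get` call, with the timeout and interval values as arbitrary examples:

```python
async def wait_for_transcription(client, transcription_id, timeout=600.0, interval=5.0):
    """Poll until the job leaves the "processing" state or the timeout expires."""
    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        transcription = await client.transcriptions.get(transcription_id)
        if transcription.status != "processing":
            return transcription
        if asyncio.get_running_loop().time() >= deadline:
            raise TimeoutError(
                f"Transcription {transcription_id} still processing after {timeout}s"
            )
        await asyncio.sleep(interval)
```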
```python
async def webhook_transcription():
    client = VerbalisAI()

    # Start transcription with webhook notification
    transcription = await client.transcriptions.create(
        audio_url="https://example.com/audio.mp3",
        model="pro",
        # Webhook configuration
        webhook_url="https://yoursite.com/webhooks/transcription",
        webhook_auth_header_name="Authorization",
        webhook_auth_header_value="Bearer your-webhook-secret",
        # Additional features
        topics=True,
        summarization=True,
        entity_detection=True,
        wait_until_complete=False  # Use webhook instead of waiting
    )

    print(f"Transcription started: {transcription.id}")
    print("You'll receive a webhook notification when complete")

asyncio.run(webhook_transcription())
```
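On the receiving side you need an HTTP endpoint that validates the auth header configured above. A minimal sketch using FastAPI (not part of the VerbalisAI SDK); the `id` and `status` payload fields are assumptions here, so check the webhook payload schema in the API reference:

```python
from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()

@app.post("/webhooks/transcription")
async def transcription_webhook(request: Request, authorization: str = Header(None)):
    # Verify the header set via webhook_auth_header_name/value above
    if authorization != "Bearer your-webhook-secret":
        raise HTTPException(status_code=401, detail="Invalid webhook credentials")

    payload = await request.json()
    transcription_id = payload.get("id")  # assumed field name
    status = payload.get("status")        # assumed field name
    print(f"Transcription {transcription_id} finished with status {status}")
    return {"ok": True}
```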
```python
async def batch_transcription():
    client = VerbalisAI()

    audio_files = [
        "https://example.com/audio1.mp3",
        "https://example.com/audio2.mp3",
        "https://example.com/audio3.mp3",
        "https://example.com/audio4.mp3",
        "https://example.com/audio5.mp3"
    ]

    # Process all files concurrently
    tasks = []
    for url in audio_files:
        task = client.transcriptions.create(
            audio_url=url,
            model="mini",
            topics=True
        )
        tasks.append(task)

    # Wait for all to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results
    successful = 0
    failed = 0

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"File {i+1} failed: {result}")
            failed += 1
        else:
            print(f"File {i+1} completed: {len(result.text)} characters")
            if result.topics:
                print(f"  Topics: {', '.join(result.topics)}")
            successful += 1

    print(f"\nBatch complete: {successful} successful, {failed} failed")

asyncio.run(batch_transcription())
```
```python
import asyncio
from asyncio import Semaphore

async def rate_limited_batch():
    client = VerbalisAI()
    audio_files = [f"https://example.com/audio{i}.mp3" for i in range(1, 21)]

    # Limit concurrent requests to avoid rate limits
    semaphore = Semaphore(5)  # Max 5 concurrent requests

    async def process_single_file(url):
        async with semaphore:
            try:
                result = await client.transcriptions.create(
                    audio_url=url,
                    model="mini"
                )
                return {"url": url, "success": True, "result": result}
            except Exception as e:
                return {"url": url, "success": False, "error": str(e)}

    # Process all files with rate limiting
    tasks = [process_single_file(url) for url in audio_files]
    results = await asyncio.gather(*tasks)

    # Analyze results
    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]

    print(f"Processed {len(audio_files)} files:")
    print(f"  Successful: {len(successful)}")
    print(f"  Failed: {len(failed)}")

    if failed:
        print("\nFailed files:")
        for failure in failed:
            print(f"  {failure['url']}: {failure['error']}")

asyncio.run(rate_limited_batch())
```
```python
from verbalisai import VerbalisAI, VerbalisAIError
import asyncio

async def robust_transcription():
    client = VerbalisAI()
    max_retries = 3
    retry_delay = 2.0

    for attempt in range(max_retries):
        try:
            transcription = await client.transcriptions.create(
                audio_url="https://example.com/audio.mp3",
                model="mini"
            )
            print(f"Success on attempt {attempt + 1}")
            print(transcription.text)
            break
        except VerbalisAIError as e:
            print(f"Attempt {attempt + 1} failed: {e.message}")

            # Don't retry on client errors
            if e.status_code in [400, 401, 403]:
                print("Non-retryable error, giving up")
                break

            # Retry on server errors and rate limits
            if attempt < max_retries - 1:
                wait_time = retry_delay * (2 ** attempt)  # Exponential backoff
                print(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
            else:
                print("Max retries exceeded")
        except Exception as e:
            print(f"Unexpected error: {e}")
            break

asyncio.run(robust_transcription())
```
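The same policy can be factored into a reusable helper so each call site stays small. A sketch assuming the `VerbalisAIError.status_code` semantics shown above; the jitter term is a common refinement to spread out simultaneous retries:

```python
import random

async def with_retries(make_call, max_retries=3, base_delay=2.0):
    """Run a zero-argument coroutine factory with exponential backoff.

    Usage: await with_retries(lambda: client.transcriptions.create(audio_url=url))
    """
    for attempt in range(max_retries):
        try:
            return await make_call()
        except VerbalisAIError as e:
            # Client errors won't succeed on retry; re-raise immediately
            if e.status_code in (400, 401, 403) or attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(delay)
```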
```python
async def memory_efficient_processing():
    client = VerbalisAI()

    # Use context manager for automatic cleanup
    async with client:
        # Process large files efficiently
        transcription = await client.transcriptions.create(
            audio_url="https://example.com/large-audio.mp3",
            model="mini",
            # Don't load full segments into memory at once
            stream_segments=True
        )

        # Process segments as they arrive
        async for segment in transcription.segments_stream():
            print(f"[{segment.start:.1f}s]: {segment.text}")
            # Process each segment immediately
            # (save to database, analyze, etc.)
            await process_segment(segment)

async def process_segment(segment):
    # Your segment processing logic here
    pass

asyncio.run(memory_efficient_processing())
```
```python
async def export_formats():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/audio.mp3",
        timestamp_style="word",
        diarize=True
    )

    # Export as SRT subtitle file
    srt_content = transcription.to_srt()
    with open("transcription.srt", "w", encoding="utf-8") as f:
        f.write(srt_content)

    # Export as VTT subtitle file
    vtt_content = transcription.to_vtt()
    with open("transcription.vtt", "w", encoding="utf-8") as f:
        f.write(vtt_content)

    # Export as plain text
    txt_content = transcription.to_text()
    with open("transcription.txt", "w", encoding="utf-8") as f:
        f.write(txt_content)

    # Export as JSON
    json_content = transcription.to_json()
    with open("transcription.json", "w", encoding="utf-8") as f:
        f.write(json_content)

asyncio.run(export_formats())
```
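Since all four exports follow the same pattern, the writes can be table-driven. A condensed alternative for the body of `export_formats`, using the same `to_*` methods shown above:

```python
exports = {
    "transcription.srt": transcription.to_srt,
    "transcription.vtt": transcription.to_vtt,
    "transcription.txt": transcription.to_text,
    "transcription.json": transcription.to_json,
}
for filename, render in exports.items():
    with open(filename, "w", encoding="utf-8") as f:
        f.write(render())
```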
```python
async def search_transcription():
    client = VerbalisAI()

    transcription = await client.transcriptions.create(
        audio_url="https://example.com/meeting.mp3",
        timestamp_style="word"
    )

    # Search for specific terms
    search_terms = ["action items", "deadline", "budget"]

    for term in search_terms:
        matches = transcription.search(term, case_sensitive=False)

        if matches:
            print(f"\nFound '{term}' {len(matches)} times:")
            for match in matches:
                context_start = max(0, match.start_index - 50)
                context_end = min(len(transcription.text), match.end_index + 50)
                context = transcription.text[context_start:context_end]
                print(f"  [{match.timestamp:.1f}s]: ...{context}...")

asyncio.run(search_transcription())
```