Real-world examples and use cases with the VerbalisAI Python SDK
Practical, end-to-end implementations built on the VerbalisAI Python SDK: command-line tools, batch processing, web services, content analysis pipelines, a scheduled folder watcher, and a Slack integration.
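Every example constructs the client with `VerbalisAI()` and assumes the SDK can find your API key through its usual configuration (typically an environment variable; check your SDK setup). The first example is a minimal command-line script that transcribes a single audio file from a URL: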
```python
#!/usr/bin/env python3
"""
Simple transcription script
Usage: python transcribe.py <audio_url>
"""
import asyncio
import sys

from verbalisai import VerbalisAI

async def main():
    if len(sys.argv) != 2:
        print("Usage: python transcribe.py <audio_url>")
        sys.exit(1)

    audio_url = sys.argv[1]

    try:
        client = VerbalisAI()

        print(f"Starting transcription of: {audio_url}")

        transcription = await client.transcriptions.create(
            audio_url=audio_url,
            model="mini"
        )

        print("\nTranscription completed!")
        print(f"Duration: {transcription.duration} seconds")
        print(f"Text: {transcription.text}")

    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    asyncio.run(main())
```
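Once single files work, the next script scales the same calls to a whole directory: it uploads each audio file, requests topic detection, and writes one transcript file per input: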
```python
#!/usr/bin/env python3
"""
Batch transcription processor
Processes all audio files in a directory
"""
import asyncio
from pathlib import Path

from verbalisai import VerbalisAI

async def process_directory(directory_path: str, output_dir: str = "transcriptions"):
    """Process all audio files in a directory"""
    client = VerbalisAI()

    # Create output directory
    Path(output_dir).mkdir(exist_ok=True)

    # Find audio files
    audio_extensions = {'.mp3', '.wav', '.flac', '.m4a', '.ogg'}
    audio_files = []
    for ext in audio_extensions:
        audio_files.extend(Path(directory_path).glob(f"*{ext}"))

    print(f"Found {len(audio_files)} audio files")

    # Process files
    results = []
    for audio_file in audio_files:
        try:
            print(f"Processing: {audio_file.name}")

            # Upload file
            with open(audio_file, "rb") as f:
                file_info = await client.files.upload(
                    file=f,
                    filename=audio_file.name
                )

            # Transcribe
            transcription = await client.transcriptions.create(
                audio_url=file_info.url,
                model="mini",
                topics=True
            )

            # Save transcription
            output_file = Path(output_dir) / f"{audio_file.stem}_transcription.txt"
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(f"File: {audio_file.name}\n")
                f.write(f"Duration: {transcription.duration} seconds\n")
                if transcription.topics:
                    f.write(f"Topics: {', '.join(transcription.topics)}\n")
                f.write(f"\nTranscription:\n{transcription.text}")

            results.append({
                "file": audio_file.name,
                "status": "success",
                "output": output_file
            })
            print(f"✅ Completed: {audio_file.name}")

        except Exception as e:
            print(f"❌ Failed: {audio_file.name} - {e}")
            results.append({
                "file": audio_file.name,
                "status": "failed",
                "error": str(e)
            })

    # Summary
    successful = len([r for r in results if r["status"] == "success"])
    failed = len([r for r in results if r["status"] == "failed"])
    print("\nBatch processing complete:")
    print(f"✅ Successful: {successful}")
    print(f"❌ Failed: {failed}")

    return results

if __name__ == "__main__":
    import sys

    if len(sys.argv) != 2:
        print("Usage: python batch_processor.py <directory_path>")
        sys.exit(1)

    directory = sys.argv[1]
    asyncio.run(process_directory(directory))
```
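The batch script processes files one at a time. If your rate limits allow it, you can run several uploads and transcriptions concurrently. A minimal sketch, assuming the same `client` and file list as above (the concurrency limit of 3 is an arbitrary choice):

```python
import asyncio

async def process_with_limit(audio_files, client, max_concurrent=3):
    """Transcribe files concurrently, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(audio_file):
        async with semaphore:
            with open(audio_file, "rb") as f:
                file_info = await client.files.upload(file=f, filename=audio_file.name)
            return await client.transcriptions.create(
                audio_url=file_info.url,
                model="mini"
            )

    # return_exceptions=True keeps one failure from cancelling the rest
    return await asyncio.gather(
        *(process_one(f) for f in audio_files),
        return_exceptions=True
    )
```

For web applications, the same SDK calls can sit behind an HTTP endpoint. The Flask example below accepts an upload, runs the async SDK on a per-request event loop (Flask views are synchronous), and returns the transcription as JSON: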
```python
from flask import Flask, request, jsonify, render_template
from werkzeug.utils import secure_filename
import asyncio
import os

from verbalisai import VerbalisAI

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

client = VerbalisAI()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    filename = secure_filename(file.filename)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)

    # Flask views are synchronous, so run the async SDK call
    # on a dedicated event loop for this request
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        transcription = loop.run_until_complete(
            transcribe_file(file_path, filename)
        )
        return jsonify(transcription)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        loop.close()
        # Clean up uploaded file
        os.remove(file_path)

async def transcribe_file(file_path: str, filename: str):
    """Transcribe uploaded file"""
    # Upload to VerbalisAI
    with open(file_path, "rb") as f:
        file_info = await client.files.upload(
            file=f,
            filename=filename
        )

    # Transcribe with advanced features
    transcription = await client.transcriptions.create(
        audio_url=file_info.url,
        model="pro",
        diarize=True,
        topics=True,
        summarization=True,
        summary_type="bullets"
    )

    return {
        'id': transcription.id,
        'text': transcription.text,
        'duration': transcription.duration,
        'topics': transcription.topics,
        'summary': transcription.summary.text if transcription.summary else None,
        'speakers': len(set(s.speaker_id for s in transcription.segments if s.speaker_id))
    }

@app.route('/transcription/<transcription_id>')
def get_transcription(transcription_id):
    """Get transcription details"""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        transcription = loop.run_until_complete(
            client.transcriptions.get(transcription_id)
        )
        return jsonify({
            'id': transcription.id,
            'status': transcription.status,
            'text': transcription.text,
            'segments': [
                {
                    'text': s.text,
                    'start': s.start,
                    'end': s.end,
                    'speaker_id': s.speaker_id
                }
                for s in transcription.segments
            ]
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 404
    finally:
        loop.close()

if __name__ == '__main__':
    app.run(debug=True)
```
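Flask's per-request event loop is a workaround; with an async-native framework the SDK can be awaited directly. The FastAPI version below also demonstrates non-blocking transcription: it starts the job with `wait_until_complete=False`, monitors progress in a background task, and exposes a polling endpoint: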
```python
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import HTMLResponse
import asyncio
import tempfile
import os

from verbalisai import VerbalisAI

app = FastAPI(title="VerbalisAI Transcription Service")
client = VerbalisAI()

# In-memory storage for demo (use a database in production)
transcription_status = {}

@app.get("/", response_class=HTMLResponse)
async def index():
    return """
    <html>
        <head><title>VerbalisAI Transcription</title></head>
        <body>
            <h1>Audio Transcription Service</h1>
            <form action="/transcribe" method="post" enctype="multipart/form-data">
                <input type="file" name="file" accept="audio/*" required>
                <button type="submit">Upload & Transcribe</button>
            </form>
        </body>
    </html>
    """

@app.post("/transcribe")
async def transcribe_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...)
):
    """Upload and transcribe audio file"""
    # content_type may be None, so guard before checking the prefix
    if not (file.content_type or "").startswith('audio/'):
        raise HTTPException(400, "File must be an audio file")

    # Create temporary file
    with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{file.filename}") as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name

    try:
        # Upload to VerbalisAI
        with open(tmp_path, "rb") as f:
            file_info = await client.files.upload(
                file=f,
                filename=file.filename
            )

        # Start transcription (non-blocking)
        transcription = await client.transcriptions.create(
            audio_url=file_info.url,
            model="pro",
            diarize=True,
            topics=True,
            summarization=True,
            wait_until_complete=False
        )

        # Store status
        transcription_status[transcription.id] = {
            "status": "processing",
            "filename": file.filename
        }

        # Add background task to check completion
        background_tasks.add_task(monitor_transcription, transcription.id)

        return {
            "transcription_id": transcription.id,
            "status": "processing",
            "message": f"Transcription started for {file.filename}"
        }
    finally:
        # Clean up temporary file
        os.unlink(tmp_path)

async def monitor_transcription(transcription_id: str):
    """Monitor transcription completion"""
    while True:
        try:
            transcription = await client.transcriptions.get(transcription_id)

            if transcription.status == "completed":
                transcription_status[transcription_id] = {
                    "status": "completed",
                    "transcription": transcription
                }
                break
            elif transcription.status == "failed":
                transcription_status[transcription_id] = {
                    "status": "failed",
                    "error": transcription.error
                }
                break

            await asyncio.sleep(5)  # Check every 5 seconds

        except Exception as e:
            transcription_status[transcription_id] = {
                "status": "error",
                "error": str(e)
            }
            break

@app.get("/status/{transcription_id}")
async def get_transcription_status(transcription_id: str):
    """Get transcription status and results"""
    if transcription_id not in transcription_status:
        raise HTTPException(404, "Transcription not found")

    status_info = transcription_status[transcription_id]

    if status_info["status"] == "completed":
        transcription = status_info["transcription"]
        return {
            "status": "completed",
            "text": transcription.text,
            "duration": transcription.duration,
            "topics": transcription.topics,
            "summary": transcription.summary.text if transcription.summary else None,
            "segments": [
                {
                    "text": s.text,
                    "start": s.start,
                    "end": s.end,
                    "speaker_id": s.speaker_id
                }
                for s in transcription.segments
            ]
        }
    else:
        return status_info

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
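The service lets callers poll `/status/{transcription_id}` until the job finishes. In a plain script you may prefer to block instead. A small polling helper, sketched from the same `transcriptions.get` call and the `completed`/`failed` statuses used above (the interval and timeout values are arbitrary):

```python
import asyncio

async def wait_for_completion(client, transcription_id: str,
                              poll_seconds: float = 5.0,
                              timeout_seconds: float = 600.0):
    """Poll a transcription until it completes, fails, or times out."""
    elapsed = 0.0
    while elapsed < timeout_seconds:
        transcription = await client.transcriptions.get(transcription_id)
        if transcription.status in ("completed", "failed"):
            return transcription
        await asyncio.sleep(poll_seconds)
        elapsed += poll_seconds
    raise TimeoutError(f"Transcription {transcription_id} did not finish in {timeout_seconds}s")
```

Beyond serving transcripts, the SDK's diarization, topic detection, summarization, and entity detection can drive domain-specific analysis. The meeting analyzer below combines them with simple keyword heuristics to pull participants, action items, decisions, and speaking-time statistics out of a recording: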
```python
import asyncio
import json
from datetime import datetime
from pathlib import Path

from verbalisai import VerbalisAI

class MeetingAnalyzer:
    def __init__(self):
        self.client = VerbalisAI()

    async def analyze_meeting(self, audio_file_path: str) -> dict:
        """Analyze a meeting recording"""
        print(f"Analyzing meeting: {audio_file_path}")

        # Upload audio file
        with open(audio_file_path, "rb") as f:
            file_info = await self.client.files.upload(
                file=f,
                filename=Path(audio_file_path).name,
                tags=["meeting", "analysis"]
            )

        # Transcribe with full analysis
        transcription = await self.client.transcriptions.create(
            audio_url=file_info.url,
            model="pro",
            diarize=True,
            topics=True,
            summarization=True,
            summary_type="bullets",
            entity_detection=True,
            entity_types=["person", "organization", "date", "phone_number", "email"]
        )

        # Extract meeting insights
        analysis = {
            "metadata": {
                "filename": Path(audio_file_path).name,
                "duration": transcription.duration,
                "analyzed_at": datetime.now().isoformat(),
                "transcription_id": transcription.id
            },
            "participants": self.extract_participants(transcription),
            "topics": transcription.topics,
            "summary": transcription.summary.text if transcription.summary else None,
            "action_items": self.extract_action_items(transcription.text),
            "key_decisions": self.extract_decisions(transcription.text),
            "entities": self.group_entities(transcription.entities),
            "conversation_flow": self.analyze_conversation_flow(transcription.segments),
            "speaking_time": self.calculate_speaking_time(transcription.segments),
            "full_transcript": transcription.text,
            "timestamped_segments": [
                {
                    "speaker": s.speaker_id,
                    "text": s.text,
                    "start": s.start,
                    "end": s.end
                }
                for s in transcription.segments
            ]
        }

        return analysis

    def extract_participants(self, transcription) -> list:
        """Extract unique speakers/participants"""
        speakers = set()
        for segment in transcription.segments:
            if segment.speaker_id:
                speakers.add(segment.speaker_id)

        return [
            {
                "id": speaker,
                "label": f"Participant {speaker[-1]}" if speaker else "Unknown"
            }
            for speaker in sorted(speakers)
        ]

    def extract_action_items(self, text: str) -> list:
        """Extract potential action items using keyword matching"""
        action_keywords = [
            "action item", "todo", "to do", "follow up", "will do",
            "should do", "need to", "must do", "assigned to",
            "deadline", "by next week", "by friday"
        ]

        sentences = text.split('. ')
        action_items = []

        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(keyword in sentence_lower for keyword in action_keywords):
                action_items.append(sentence.strip())

        return action_items

    def extract_decisions(self, text: str) -> list:
        """Extract key decisions made during the meeting"""
        decision_keywords = [
            "decided", "decision", "agreed", "consensus",
            "approved", "rejected", "concluded", "determined"
        ]

        sentences = text.split('. ')
        decisions = []

        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(keyword in sentence_lower for keyword in decision_keywords):
                decisions.append(sentence.strip())

        return decisions

    def group_entities(self, entities) -> dict:
        """Group entities by type"""
        grouped = {}
        for entity in entities:
            entity_type = entity.type
            if entity_type not in grouped:
                grouped[entity_type] = []
            grouped[entity_type].append({
                "text": entity.text,
                "confidence": entity.confidence
            })
        return grouped

    def analyze_conversation_flow(self, segments) -> dict:
        """Analyze conversation flow and speaker transitions"""
        if not segments:
            return {}

        speaker_transitions = []
        current_speaker = None

        for segment in segments:
            if segment.speaker_id != current_speaker:
                speaker_transitions.append({
                    "from": current_speaker,
                    "to": segment.speaker_id,
                    "timestamp": segment.start
                })
                current_speaker = segment.speaker_id

        return {
            "total_transitions": len(speaker_transitions),
            "transitions": speaker_transitions[:10]  # First 10 transitions
        }

    def calculate_speaking_time(self, segments) -> dict:
        """Calculate speaking time for each participant"""
        speaking_time = {}

        for segment in segments:
            speaker = segment.speaker_id or "Unknown"
            duration = segment.end - segment.start
            if speaker not in speaking_time:
                speaking_time[speaker] = 0
            speaking_time[speaker] += duration

        # Convert to percentages
        total_time = sum(speaking_time.values())
        if total_time > 0:
            speaking_percentages = {
                speaker: (time / total_time) * 100
                for speaker, time in speaking_time.items()
            }
        else:
            speaking_percentages = {}

        return {
            "absolute_time": speaking_time,
            "percentages": speaking_percentages
        }

async def main():
    analyzer = MeetingAnalyzer()

    # Analyze a meeting
    audio_file = "meeting-recording.mp3"

    try:
        analysis = await analyzer.analyze_meeting(audio_file)

        # Save analysis results
        output_file = f"meeting_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(analysis, f, indent=2, ensure_ascii=False)

        print("Meeting analysis completed!")
        print(f"Results saved to: {output_file}")

        # Print summary
        print("\n--- Meeting Summary ---")
        print(f"Duration: {analysis['metadata']['duration']} seconds")
        print(f"Participants: {len(analysis['participants'])}")
        print(f"Topics: {', '.join(analysis['topics'][:5])}")
        print(f"Action Items: {len(analysis['action_items'])}")
        print(f"Key Decisions: {len(analysis['key_decisions'])}")

    except Exception as e:
        print(f"Error analyzing meeting: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```
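The same building blocks adapt to content production. The podcast processor below transcribes an episode, then derives chapters, highlight candidates, SEO metadata, and social-media copy from the transcript. The extraction heuristics are deliberately simple keyword and length rules; treat them as starting points for your own logic: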
```python
import asyncio
import json
from pathlib import Path
from datetime import datetime

from verbalisai import VerbalisAI

class PodcastProcessor:
    def __init__(self):
        self.client = VerbalisAI()

    async def process_podcast_episode(self, audio_file: str, metadata: dict = None) -> dict:
        """Process a podcast episode with full analysis"""
        print(f"Processing podcast episode: {audio_file}")

        # Upload audio
        with open(audio_file, "rb") as f:
            file_info = await self.client.files.upload(
                file=f,
                filename=Path(audio_file).name,
                tags=["podcast", "episode"],
                folder="podcasts"
            )

        # Transcribe with speaker identification
        transcription = await self.client.transcriptions.create(
            audio_url=file_info.url,
            model="pro",
            diarize=True,
            topics=True,
            summarization=True,
            summary_type="paragraphs",
            entity_detection=True,
            entity_types=["person", "organization", "product", "location"]
        )

        # Generate podcast-specific outputs
        result = {
            "episode_info": {
                "filename": Path(audio_file).name,
                "duration": transcription.duration,
                "processed_at": datetime.now().isoformat(),
                **(metadata or {})
            },
            "content": {
                "full_transcript": transcription.text,
                "summary": transcription.summary.text if transcription.summary else None,
                "topics": transcription.topics,
                "key_entities": self.extract_key_entities(transcription.entities)
            },
            "speakers": self.analyze_speakers(transcription.segments),
            "highlights": self.extract_highlights(transcription.segments),
            "chapters": self.generate_chapters(transcription.segments, transcription.topics),
            "seo": {
                "title_suggestions": self.generate_title_suggestions(transcription.topics),
                "description": self.generate_description(transcription.summary),
                "keywords": transcription.topics[:10]
            },
            "social_media": {
                "twitter_thread": self.generate_twitter_thread(transcription.text),
                "instagram_caption": self.generate_instagram_caption(transcription.summary),
                "youtube_description": self.generate_youtube_description(transcription)
            }
        }

        return result

    def extract_key_entities(self, entities) -> dict:
        """Extract and categorize key entities"""
        key_entities = {}
        for entity in entities:
            entity_type = entity.type
            if entity_type not in key_entities:
                key_entities[entity_type] = []
            # Only include high-confidence entities
            if entity.confidence > 0.8:
                key_entities[entity_type].append({
                    "name": entity.text,
                    "confidence": entity.confidence
                })
        return key_entities

    def analyze_speakers(self, segments) -> dict:
        """Analyze speaker patterns and characteristics"""
        speakers = {}

        for segment in segments:
            speaker_id = segment.speaker_id or "Unknown"
            if speaker_id not in speakers:
                speakers[speaker_id] = {
                    "total_time": 0,
                    "segment_count": 0,
                    "sample_quotes": []
                }

            speakers[speaker_id]["total_time"] += segment.end - segment.start
            speakers[speaker_id]["segment_count"] += 1

            # Collect sample quotes (longer segments)
            if len(segment.text) > 100 and len(speakers[speaker_id]["sample_quotes"]) < 3:
                speakers[speaker_id]["sample_quotes"].append(segment.text)

        return speakers

    def extract_highlights(self, segments) -> list:
        """Extract potential highlight clips"""
        highlights = []

        # Look for longer, engaging segments
        for segment in segments:
            if (len(segment.text) > 200 and
                    segment.end - segment.start > 30):  # At least 30 seconds
                highlights.append({
                    "text": segment.text,
                    "start_time": segment.start,
                    "end_time": segment.end,
                    "duration": segment.end - segment.start,
                    "speaker": segment.speaker_id
                })

        # Sort by duration (longest first)
        highlights.sort(key=lambda x: x["duration"], reverse=True)
        return highlights[:5]  # Top 5 highlights

    def generate_chapters(self, segments, topics) -> list:
        """Generate chapter markers based on content flow"""
        if not segments:
            return []

        chapters = []
        chapter_duration = 300  # 5 minutes per chapter
        current_time = 0
        chapter_num = 1

        while current_time < segments[-1].end:
            # Find segments around this time
            chapter_segments = [
                s for s in segments
                if current_time <= s.start < current_time + chapter_duration
            ]

            if chapter_segments:
                # Generate chapter title from content
                chapter_text = " ".join([s.text for s in chapter_segments[:3]])
                title = self.generate_chapter_title(chapter_text, topics)

                chapters.append({
                    "number": chapter_num,
                    "title": title,
                    "start_time": current_time,
                    "end_time": min(current_time + chapter_duration, segments[-1].end)
                })
                chapter_num += 1

            current_time += chapter_duration

        return chapters

    def generate_chapter_title(self, text: str, topics: list) -> str:
        """Generate a chapter title from text content"""
        # Simple approach: use a relevant topic or the first few words
        text_lower = text.lower()
        for topic in topics:
            if topic.lower() in text_lower:
                return f"Discussion about {topic}"

        # Fallback to first few words
        words = text.split()[:8]
        return " ".join(words) + "..."

    def generate_title_suggestions(self, topics: list) -> list:
        """Generate SEO-friendly title suggestions"""
        if not topics:
            return ["Podcast Episode"]

        templates = [
            f"The Ultimate Guide to {topics[0]}",
            f"Everything You Need to Know About {topics[0]}",
            f"{topics[0]}: Insights and Tips",
            f"Deep Dive: {topics[0]} Explained",
            f"Mastering {topics[0]}: A Complete Discussion"
        ]
        return templates[:3]

    def generate_description(self, summary) -> str:
        """Generate podcast episode description"""
        if not summary or not summary.text:
            return "An engaging podcast episode with valuable insights and discussion."
        return f"In this episode, we explore {summary.text[:200]}..."

    def generate_twitter_thread(self, transcript: str) -> list:
        """Generate a Twitter thread from the transcript"""
        # Simple approach: break into tweet-sized chunks
        sentences = transcript.split('. ')
        thread = []
        current_tweet = ""

        for sentence in sentences:
            if len(current_tweet + sentence) < 280:
                current_tweet += sentence + ". "
            else:
                if current_tweet.strip():
                    thread.append(current_tweet.strip())
                current_tweet = sentence + ". "

        if current_tweet.strip():
            thread.append(current_tweet.strip())

        return thread[:5]  # Max 5 tweets

    def generate_instagram_caption(self, summary) -> str:
        """Generate an Instagram caption"""
        if not summary or not summary.text:
            return "New podcast episode is live! 🎙️ #podcast #newepisode"
        return f"New episode is live! 🎙️\n\n{summary.text[:150]}...\n\n#podcast #newepisode #discussion"

    def generate_youtube_description(self, transcription) -> str:
        """Generate a YouTube video description"""
        description = f"""New podcast episode is now available!

Duration: {transcription.duration} seconds

{transcription.summary.text if transcription.summary else 'An engaging discussion with valuable insights.'}

Topics covered:
"""
        for i, topic in enumerate(transcription.topics[:5], 1):
            description += f"{i}. {topic}\n"

        description += "\n--- Full Transcript ---\n"
        description += transcription.text[:1000] + "..."

        return description

async def main():
    processor = PodcastProcessor()

    # Process podcast episode
    audio_file = "podcast-episode-001.mp3"
    metadata = {
        "title": "Episode 1: Getting Started",
        "host": "John Doe",
        "guest": "Jane Smith",
        "episode_number": 1
    }

    try:
        result = await processor.process_podcast_episode(audio_file, metadata)

        # Save results
        output_file = f"podcast_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)

        print("Podcast processing completed!")
        print(f"Results saved to: {output_file}")

        # Print summary
        print("\n--- Episode Summary ---")
        print(f"Duration: {result['episode_info']['duration']} seconds")
        print(f"Topics: {', '.join(result['content']['topics'][:3])}")
        print(f"Speakers: {len(result['speakers'])}")
        print(f"Chapters: {len(result['chapters'])}")
        print(f"Highlights: {len(result['highlights'])}")

    except Exception as e:
        print(f"Error processing podcast: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```
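For unattended operation, the next example uses the third-party `schedule` package (`pip install schedule`) to watch a folder, transcribe any new audio files every 15 minutes, and archive each file after processing: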
```python
import asyncio
import logging
import time
from pathlib import Path

import schedule

from verbalisai import VerbalisAI

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('transcription_service.log'),
        logging.StreamHandler()
    ]
)

class ScheduledTranscriptionService:
    def __init__(self, watch_folder: str, output_folder: str):
        self.client = VerbalisAI()
        self.watch_folder = Path(watch_folder)
        self.output_folder = Path(output_folder)
        self.processed_files = set()

        # Create folders if they don't exist
        self.watch_folder.mkdir(exist_ok=True)
        self.output_folder.mkdir(exist_ok=True)

        logging.info(f"Watching folder: {self.watch_folder}")
        logging.info(f"Output folder: {self.output_folder}")

    async def process_new_files(self):
        """Process any new audio files in the watch folder"""
        audio_extensions = {'.mp3', '.wav', '.flac', '.m4a', '.ogg'}

        # Find new audio files
        new_files = []
        for ext in audio_extensions:
            for file_path in self.watch_folder.glob(f"*{ext}"):
                if file_path not in self.processed_files:
                    new_files.append(file_path)

        if not new_files:
            logging.info("No new files to process")
            return

        logging.info(f"Found {len(new_files)} new files to process")

        # Process each file
        for file_path in new_files:
            try:
                await self.process_single_file(file_path)
                self.processed_files.add(file_path)
                logging.info(f"✅ Processed: {file_path.name}")
            except Exception as e:
                logging.error(f"❌ Failed to process {file_path.name}: {e}")

    async def process_single_file(self, file_path: Path):
        """Process a single audio file"""
        logging.info(f"Processing: {file_path.name}")

        # Upload file
        with open(file_path, "rb") as f:
            file_info = await self.client.files.upload(
                file=f,
                filename=file_path.name,
                folder="scheduled_uploads"
            )

        # Transcribe
        transcription = await self.client.transcriptions.create(
            audio_url=file_info.url,
            model="mini",
            topics=True,
            summarization=True
        )

        # Save results
        output_file = self.output_folder / f"{file_path.stem}_transcript.txt"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(f"File: {file_path.name}\n")
            f.write(f"Duration: {transcription.duration} seconds\n")
            f.write(f"Topics: {', '.join(transcription.topics)}\n")
            if transcription.summary:
                f.write(f"Summary: {transcription.summary.text}\n")
            f.write("\n--- Transcript ---\n")
            f.write(transcription.text)

        # Move processed file to an archive subfolder
        archive_folder = self.watch_folder / "processed"
        archive_folder.mkdir(exist_ok=True)
        archived_file = archive_folder / file_path.name
        file_path.rename(archived_file)
        logging.info(f"Archived processed file to: {archived_file}")

def run_scheduled_service():
    """Run the scheduled transcription service"""
    service = ScheduledTranscriptionService(
        watch_folder="./watch_folder",
        output_folder="./transcriptions"
    )

    def job():
        """Wrapper function for async job"""
        asyncio.run(service.process_new_files())

    # Schedule the job to run every 15 minutes
    schedule.every(15).minutes.do(job)

    logging.info("Scheduled transcription service started")
    logging.info("Checking for new files every 15 minutes...")

    # Keep the service running
    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute

if __name__ == "__main__":
    run_scheduled_service()
```
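Finally, the SDK can back a Slack bot built with `slack_bolt` (`pip install slack-bolt`). The bot below runs in Socket Mode, which requires both a bot token (`SLACK_BOT_TOKEN`) and an app-level token (`SLACK_APP_TOKEN`). One caveat: Slack's `url_private_download` links are not public, as they require an Authorization header with the bot token, so the transcription service must be able to fetch them; otherwise, download the file yourself and pass it to `client.files.upload` instead: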
```python
import asyncio
import os

from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler

from verbalisai import VerbalisAI

# Initialize Slack app
app = AsyncApp(token=os.environ.get("SLACK_BOT_TOKEN"))
client = VerbalisAI()

@app.event("file_shared")
async def handle_file_shared(event, say):
    """Handle audio file uploads to Slack"""
    file_info = event.get("file", {})

    # Check if it's an audio file
    if not file_info.get("mimetype", "").startswith("audio/"):
        return

    await say(f"🎙️ Audio file detected: {file_info['name']}. Starting transcription...")

    try:
        # Note: Slack private download URLs require an Authorization
        # header with the bot token; if the transcription service cannot
        # fetch this URL, download the file and upload it via
        # client.files.upload instead
        file_url = file_info["url_private_download"]

        # Start transcription
        transcription = await client.transcriptions.create(
            audio_url=file_url,
            model="mini",
            topics=True
        )

        # Send results back to Slack
        await say(
            f"✅ Transcription completed for `{file_info['name']}`:\n\n"
            f"**Duration:** {transcription.duration} seconds\n"
            f"**Topics:** {', '.join(transcription.topics)}\n\n"
            f"**Transcript:**\n```{transcription.text[:1000]}{'...' if len(transcription.text) > 1000 else ''}```"
        )
    except Exception as e:
        await say(f"❌ Transcription failed: {str(e)}")

@app.command("/transcribe")
async def transcribe_command(ack, respond, command):
    """Slash command to transcribe audio from a URL"""
    await ack()

    url = command["text"].strip()
    if not url:
        await respond("Please provide an audio URL: `/transcribe https://example.com/audio.mp3`")
        return

    await respond("🎙️ Starting transcription...")

    try:
        transcription = await client.transcriptions.create(
            audio_url=url,
            model="mini",
            topics=True,
            summarization=True
        )

        await respond(
            f"✅ Transcription completed:\n\n"
            f"**Duration:** {transcription.duration} seconds\n"
            f"**Topics:** {', '.join(transcription.topics)}\n"
            f"**Summary:** {transcription.summary.text if transcription.summary else 'N/A'}\n\n"
            f"**Transcript:**\n```{transcription.text[:1500]}{'...' if len(transcription.text) > 1500 else ''}```"
        )
    except Exception as e:
        await respond(f"❌ Transcription failed: {str(e)}")

async def main():
    handler = AsyncSocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    await handler.start_async()

if __name__ == "__main__":
    asyncio.run(main())
```
This concludes the Python SDK examples. Ready to explore the JavaScript SDK? Check out the JavaScript Transcription guide for the same features in JavaScript.