Python SDK Examples

Real-world examples and practical implementations using the VerbalisAI Python SDK.
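
All examples assume the SDK is installed and an API key is configured. A minimal setup sketch — the explicit api_key argument and the VERBALISAI_API_KEY variable name are assumptions; check your account settings for the exact configuration:

import os
from verbalisai import VerbalisAI

# Hypothetical: pass the key explicitly if the client doesn't pick it
# up from the environment on its own
client = VerbalisAI(api_key=os.environ["VERBALISAI_API_KEY"])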

Basic Examples

Simple Transcription Script

#!/usr/bin/env python3
"""
Simple transcription script
Usage: python transcribe.py <audio_url>
"""

import asyncio
import sys
from verbalisai import VerbalisAI

async def main():
    if len(sys.argv) != 2:
        print("Usage: python transcribe.py <audio_url>")
        sys.exit(1)
    
    audio_url = sys.argv[1]
    
    try:
        client = VerbalisAI()
        
        print(f"Starting transcription of: {audio_url}")
        
        transcription = await client.transcriptions.create(
            audio_url=audio_url,
            model="mini"
        )
        
        print(f"\nTranscription completed!")
        print(f"Duration: {transcription.duration} seconds")
        print(f"Text: {transcription.text}")
        
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    asyncio.run(main())
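
Transcription requests can fail transiently (network errors, rate limits). A minimal retry wrapper with exponential backoff — a sketch layered on top of the SDK, not a built-in feature:

import asyncio
from verbalisai import VerbalisAI

async def transcribe_with_retry(client: VerbalisAI, audio_url: str,
                                retries: int = 3, **options):
    """Retry transcriptions.create with exponential backoff."""
    for attempt in range(retries):
        try:
            return await client.transcriptions.create(
                audio_url=audio_url, **options
            )
        except Exception:
            if attempt == retries - 1:
                raise
            # Wait 1s, 2s, 4s, ... between attempts
            await asyncio.sleep(2 ** attempt)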

Batch File Processor

#!/usr/bin/env python3
"""
Batch transcription processor
Processes all audio files in a directory
"""

import asyncio
import os
from pathlib import Path
from verbalisai import VerbalisAI

async def process_directory(directory_path: str, output_dir: str = "transcriptions"):
    """Process all audio files in a directory"""
    
    client = VerbalisAI()
    
    # Create output directory
    Path(output_dir).mkdir(exist_ok=True)
    
    # Find audio files
    audio_extensions = {'.mp3', '.wav', '.flac', '.m4a', '.ogg'}
    audio_files = []
    
    for ext in audio_extensions:
        audio_files.extend(Path(directory_path).glob(f"*{ext}"))
    
    print(f"Found {len(audio_files)} audio files")
    
    # Process files
    results = []
    for audio_file in audio_files:
        try:
            print(f"Processing: {audio_file.name}")
            
            # Upload file
            with open(audio_file, "rb") as f:
                file_info = await client.files.upload(
                    file=f,
                    filename=audio_file.name
                )
            
            # Transcribe
            transcription = await client.transcriptions.create(
                audio_url=file_info.url,
                model="mini",
                topics=True
            )
            
            # Save transcription
            output_file = Path(output_dir) / f"{audio_file.stem}_transcription.txt"
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(f"File: {audio_file.name}\n")
                f.write(f"Duration: {transcription.duration} seconds\n")
                if transcription.topics:
                    f.write(f"Topics: {', '.join(transcription.topics)}\n")
                f.write(f"\nTranscription:\n{transcription.text}")
            
            results.append({
                "file": audio_file.name,
                "status": "success",
                "output": output_file
            })
            
            print(f"✅ Completed: {audio_file.name}")
            
        except Exception as e:
            print(f"❌ Failed: {audio_file.name} - {e}")
            results.append({
                "file": audio_file.name,
                "status": "failed",
                "error": str(e)
            })
    
    # Summary
    successful = len([r for r in results if r["status"] == "success"])
    failed = len([r for r in results if r["status"] == "failed"])
    
    print(f"\nBatch processing complete:")
    print(f"✅ Successful: {successful}")
    print(f"❌ Failed: {failed}")
    
    return results

if __name__ == "__main__":
    import sys
    
    if len(sys.argv) != 2:
        print("Usage: python batch_processor.py <directory_path>")
        sys.exit(1)
    
    directory = sys.argv[1]
    asyncio.run(process_directory(directory))
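
The loop above processes files strictly one at a time. If your plan's rate limits allow it, bounded concurrency keeps a few transcriptions in flight at once — a sketch assuming the same upload-then-transcribe flow as above:

import asyncio
from pathlib import Path
from verbalisai import VerbalisAI

async def process_concurrently(files: list[Path], max_parallel: int = 3):
    client = VerbalisAI()
    semaphore = asyncio.Semaphore(max_parallel)
    
    async def process_one(audio_file: Path):
        async with semaphore:  # At most max_parallel files at once
            with open(audio_file, "rb") as f:
                file_info = await client.files.upload(
                    file=f, filename=audio_file.name
                )
            return await client.transcriptions.create(
                audio_url=file_info.url, model="mini"
            )
    
    # return_exceptions=True so one failure doesn't cancel the rest
    return await asyncio.gather(
        *(process_one(f) for f in files), return_exceptions=True
    )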

Web Applications

Flask Web App

from flask import Flask, request, jsonify, render_template
from werkzeug.utils import secure_filename
import asyncio
import os
from verbalisai import VerbalisAI

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

client = VerbalisAI()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    
    filename = secure_filename(file.filename)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)
    
    # Run the transcription to completion; this blocks the request,
    # so use a task queue for long files in production
    try:
        transcription = asyncio.run(
            transcribe_file(file_path, filename)
        )
        return jsonify(transcription)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Clean up the uploaded file
        os.remove(file_path)

async def transcribe_file(file_path: str, filename: str):
    """Transcribe uploaded file"""
    
    # Upload to VerbalisAI
    with open(file_path, "rb") as f:
        file_info = await client.files.upload(
            file=f,
            filename=filename
        )
    
    # Transcribe with advanced features
    transcription = await client.transcriptions.create(
        audio_url=file_info.url,
        model="pro",
        diarize=True,
        topics=True,
        summarization=True,
        summary_type="bullets"
    )
    
    return {
        'id': transcription.id,
        'text': transcription.text,
        'duration': transcription.duration,
        'topics': transcription.topics,
        'summary': transcription.summary.text if transcription.summary else None,
        'speakers': len(set(s.speaker_id for s in transcription.segments if s.speaker_id))
    }

@app.route('/transcription/<transcription_id>')
def get_transcription(transcription_id):
    """Get transcription details"""
    try:
        transcription = asyncio.run(
            client.transcriptions.get(transcription_id)
        )
        
        return jsonify({
            'id': transcription.id,
            'status': transcription.status,
            'text': transcription.text,
            'segments': [
                {
                    'text': s.text,
                    'start': s.start,
                    'end': s.end,
                    'speaker_id': s.speaker_id
                }
                for s in transcription.segments
            ]
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 404

if __name__ == '__main__':
    app.run(debug=True)
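
To exercise the endpoint locally, a quick test client using the requests library (assumes the app is running on Flask's default port 5000):

import requests

with open("meeting.mp3", "rb") as f:
    response = requests.post(
        "http://127.0.0.1:5000/upload",
        files={"file": ("meeting.mp3", f, "audio/mpeg")}
    )

print(response.status_code)
print(response.json())  # Transcription text, topics, summary, speaker count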

FastAPI Application

from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import HTMLResponse
import asyncio
import tempfile
import os
from verbalisai import VerbalisAI

app = FastAPI(title="VerbalisAI Transcription Service")
client = VerbalisAI()

# In-memory storage for demo (use database in production)
transcription_status = {}

@app.get("/", response_class=HTMLResponse)
async def index():
    return """
    <html>
        <head><title>VerbalisAI Transcription</title></head>
        <body>
            <h1>Audio Transcription Service</h1>
            <form action="/transcribe" method="post" enctype="multipart/form-data">
                <input type="file" name="file" accept="audio/*" required>
                <button type="submit">Upload & Transcribe</button>
            </form>
        </body>
    </html>
    """

@app.post("/transcribe")
async def transcribe_audio(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...)
):
    """Upload and transcribe audio file"""
    
    if not (file.content_type or "").startswith('audio/'):
        raise HTTPException(400, "File must be an audio file")
    
    # Create temporary file
    with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{file.filename}") as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name
    
    try:
        # Upload to VerbalisAI
        with open(tmp_path, "rb") as f:
            file_info = await client.files.upload(
                file=f,
                filename=file.filename
            )
        
        # Start transcription (non-blocking)
        transcription = await client.transcriptions.create(
            audio_url=file_info.url,
            model="pro",
            diarize=True,
            topics=True,
            summarization=True,
            wait_until_complete=False
        )
        
        # Store status
        transcription_status[transcription.id] = {
            "status": "processing",
            "filename": file.filename
        }
        
        # Add background task to check completion
        background_tasks.add_task(monitor_transcription, transcription.id)
        
        return {
            "transcription_id": transcription.id,
            "status": "processing",
            "message": f"Transcription started for {file.filename}"
        }
        
    finally:
        # Clean up temporary file
        os.unlink(tmp_path)

async def monitor_transcription(transcription_id: str):
    """Poll for completion (gives up after ~10 minutes to avoid a stuck task)"""
    
    for _ in range(120):
        try:
            transcription = await client.transcriptions.get(transcription_id)
            
            if transcription.status == "completed":
                transcription_status[transcription_id] = {
                    "status": "completed",
                    "transcription": transcription
                }
                return
            elif transcription.status == "failed":
                transcription_status[transcription_id] = {
                    "status": "failed",
                    "error": transcription.error
                }
                return
            
            await asyncio.sleep(5)  # Check every 5 seconds
            
        except Exception as e:
            transcription_status[transcription_id] = {
                "status": "error",
                "error": str(e)
            }
            return
    
    transcription_status[transcription_id] = {"status": "timeout"}

@app.get("/status/{transcription_id}")
async def get_transcription_status(transcription_id: str):
    """Get transcription status and results"""
    
    if transcription_id not in transcription_status:
        raise HTTPException(404, "Transcription not found")
    
    status_info = transcription_status[transcription_id]
    
    if status_info["status"] == "completed":
        transcription = status_info["transcription"]
        return {
            "status": "completed",
            "text": transcription.text,
            "duration": transcription.duration,
            "topics": transcription.topics,
            "summary": transcription.summary.text if transcription.summary else None,
            "segments": [
                {
                    "text": s.text,
                    "start": s.start,
                    "end": s.end,
                    "speaker_id": s.speaker_id
                }
                for s in transcription.segments
            ]
        }
    else:
        return status_info

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
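
A matching client that uploads a file and polls /status until the background task reports a terminal state — a sketch using httpx (assuming it is installed and the service runs on localhost:8000):

import asyncio
import httpx

async def transcribe_via_service(path: str) -> dict:
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8000") as http:
        with open(path, "rb") as f:
            resp = await http.post(
                "/transcribe", files={"file": (path, f, "audio/mpeg")}
            )
        resp.raise_for_status()
        transcription_id = resp.json()["transcription_id"]
        
        # Poll until the background task reports a terminal status
        while True:
            status = (await http.get(f"/status/{transcription_id}")).json()
            if status["status"] != "processing":
                return status
            await asyncio.sleep(5)

result = asyncio.run(transcribe_via_service("meeting.mp3"))
print(result["status"])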

Data Processing

Meeting Transcription Analyzer

import asyncio
import json
from datetime import datetime
from pathlib import Path
from verbalisai import VerbalisAI

class MeetingAnalyzer:
    def __init__(self):
        self.client = VerbalisAI()
    
    async def analyze_meeting(self, audio_file_path: str) -> dict:
        """Analyze a meeting recording"""
        
        print(f"Analyzing meeting: {audio_file_path}")
        
        # Upload audio file
        with open(audio_file_path, "rb") as f:
            file_info = await self.client.files.upload(
                file=f,
                filename=Path(audio_file_path).name,
                tags=["meeting", "analysis"]
            )
        
        # Transcribe with full analysis
        transcription = await self.client.transcriptions.create(
            audio_url=file_info.url,
            model="pro",
            diarize=True,
            topics=True,
            summarization=True,
            summary_type="bullets",
            entity_detection=True,
            entity_types=["person", "organization", "date", "phone_number", "email"]
        )
        
        # Extract meeting insights
        analysis = {
            "metadata": {
                "filename": Path(audio_file_path).name,
                "duration": transcription.duration,
                "analyzed_at": datetime.now().isoformat(),
                "transcription_id": transcription.id
            },
            
            "participants": self.extract_participants(transcription),
            "topics": transcription.topics,
            "summary": transcription.summary.text if transcription.summary else None,
            "action_items": self.extract_action_items(transcription.text),
            "key_decisions": self.extract_decisions(transcription.text),
            "entities": self.group_entities(transcription.entities),
            
            "conversation_flow": self.analyze_conversation_flow(transcription.segments),
            "speaking_time": self.calculate_speaking_time(transcription.segments),
            
            "full_transcript": transcription.text,
            "timestamped_segments": [
                {
                    "speaker": s.speaker_id,
                    "text": s.text,
                    "start": s.start,
                    "end": s.end
                }
                for s in transcription.segments
            ]
        }
        
        return analysis
    
    def extract_participants(self, transcription) -> list:
        """Extract unique speakers/participants"""
        speakers = set()
        for segment in transcription.segments:
            if segment.speaker_id:
                speakers.add(segment.speaker_id)
        
        return [
            {
                "id": speaker,
                # Assumes IDs end in a number (e.g. "speaker_1"); the set
                # above only contains non-empty IDs, so no fallback is needed
                "label": f"Participant {speaker[-1]}"
            }
            for speaker in sorted(speakers)
        ]
    
    def extract_action_items(self, text: str) -> list:
        """Extract potential action items using keyword matching"""
        action_keywords = [
            "action item", "todo", "to do", "follow up", "will do",
            "should do", "need to", "must do", "assigned to",
            "deadline", "by next week", "by friday"
        ]
        
        sentences = text.split('. ')
        action_items = []
        
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(keyword in sentence_lower for keyword in action_keywords):
                action_items.append(sentence.strip())
        
        return action_items
    
    def extract_decisions(self, text: str) -> list:
        """Extract key decisions made during the meeting"""
        decision_keywords = [
            "decided", "decision", "agreed", "consensus",
            "approved", "rejected", "concluded", "determined"
        ]
        
        sentences = text.split('. ')
        decisions = []
        
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(keyword in sentence_lower for keyword in decision_keywords):
                decisions.append(sentence.strip())
        
        return decisions
    
    def group_entities(self, entities) -> dict:
        """Group entities by type"""
        grouped = {}
        for entity in entities:
            entity_type = entity.type
            if entity_type not in grouped:
                grouped[entity_type] = []
            grouped[entity_type].append({
                "text": entity.text,
                "confidence": entity.confidence
            })
        
        return grouped
    
    def analyze_conversation_flow(self, segments) -> dict:
        """Analyze conversation flow and speaker transitions"""
        if not segments:
            return {}
        
        speaker_transitions = []
        current_speaker = None
        
        for segment in segments:
            if segment.speaker_id != current_speaker:
                speaker_transitions.append({
                    "from": current_speaker,
                    "to": segment.speaker_id,
                    "timestamp": segment.start
                })
                current_speaker = segment.speaker_id
        
        return {
            "total_transitions": len(speaker_transitions),
            "transitions": speaker_transitions[:10]  # First 10 transitions
        }
    
    def calculate_speaking_time(self, segments) -> dict:
        """Calculate speaking time for each participant"""
        speaking_time = {}
        
        for segment in segments:
            speaker = segment.speaker_id or "Unknown"
            duration = segment.end - segment.start
            
            if speaker not in speaking_time:
                speaking_time[speaker] = 0
            speaking_time[speaker] += duration
        
        # Convert to percentages
        total_time = sum(speaking_time.values())
        if total_time > 0:
            speaking_percentages = {
                speaker: (time / total_time) * 100
                for speaker, time in speaking_time.items()
            }
        else:
            speaking_percentages = {}
        
        return {
            "absolute_time": speaking_time,
            "percentages": speaking_percentages
        }

async def main():
    analyzer = MeetingAnalyzer()
    
    # Analyze a meeting
    audio_file = "meeting-recording.mp3"
    
    try:
        analysis = await analyzer.analyze_meeting(audio_file)
        
        # Save analysis results
        output_file = f"meeting_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(analysis, f, indent=2, ensure_ascii=False)
        
        print(f"Meeting analysis completed!")
        print(f"Results saved to: {output_file}")
        
        # Print summary
        print(f"\n--- Meeting Summary ---")
        print(f"Duration: {analysis['metadata']['duration']} seconds")
        print(f"Participants: {len(analysis['participants'])}")
        print(f"Topics: {', '.join(analysis['topics'][:5])}")
        print(f"Action Items: {len(analysis['action_items'])}")
        print(f"Key Decisions: {len(analysis['key_decisions'])}")
        
    except Exception as e:
        print(f"Error analyzing meeting: {e}")

if __name__ == "__main__":
    asyncio.run(main())
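
The speaking_time breakdown is easiest to sanity-check visually. A tiny text bar chart over the analysis dict returned by analyze_meeting — a sketch with no extra dependencies (speaker IDs in the sample output are illustrative):

def print_speaking_time(analysis: dict, width: int = 40):
    """Render speaking-time percentages as a text bar chart."""
    percentages = analysis["speaking_time"]["percentages"]
    for speaker, pct in sorted(percentages.items(), key=lambda x: -x[1]):
        bar = "█" * int(pct / 100 * width)
        print(f"{speaker:<12} {bar} {pct:.1f}%")

# Example output:
# speaker_1    ████████████████████ 52.3%
# speaker_2    ███████████████ 39.1%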

Podcast Processing Pipeline

import asyncio
import json
from pathlib import Path
from datetime import datetime
from verbalisai import VerbalisAI

class PodcastProcessor:
    def __init__(self):
        self.client = VerbalisAI()
    
    async def process_podcast_episode(self, audio_file: str, metadata: dict | None = None) -> dict:
        """Process a podcast episode with full analysis"""
        
        print(f"Processing podcast episode: {audio_file}")
        
        # Upload audio
        with open(audio_file, "rb") as f:
            file_info = await self.client.files.upload(
                file=f,
                filename=Path(audio_file).name,
                tags=["podcast", "episode"],
                folder="podcasts"
            )
        
        # Transcribe with speaker identification
        transcription = await self.client.transcriptions.create(
            audio_url=file_info.url,
            model="pro",
            diarize=True,
            topics=True,
            summarization=True,
            summary_type="paragraphs",
            entity_detection=True,
            entity_types=["person", "organization", "product", "location"]
        )
        
        # Generate podcast-specific outputs
        result = {
            "episode_info": {
                "filename": Path(audio_file).name,
                "duration": transcription.duration,
                "processed_at": datetime.now().isoformat(),
                **(metadata or {})
            },
            
            "content": {
                "full_transcript": transcription.text,
                "summary": transcription.summary.text if transcription.summary else None,
                "topics": transcription.topics,
                "key_entities": self.extract_key_entities(transcription.entities)
            },
            
            "speakers": self.analyze_speakers(transcription.segments),
            "highlights": self.extract_highlights(transcription.segments),
            "chapters": self.generate_chapters(transcription.segments, transcription.topics),
            
            "seo": {
                "title_suggestions": self.generate_title_suggestions(transcription.topics),
                "description": self.generate_description(transcription.summary),
                "keywords": transcription.topics[:10]
            },
            
            "social_media": {
                "twitter_thread": self.generate_twitter_thread(transcription.text),
                "instagram_caption": self.generate_instagram_caption(transcription.summary),
                "youtube_description": self.generate_youtube_description(transcription)
            }
        }
        
        return result
    
    def extract_key_entities(self, entities) -> dict:
        """Extract and categorize key entities"""
        key_entities = {}
        
        for entity in entities:
            entity_type = entity.type
            if entity_type not in key_entities:
                key_entities[entity_type] = []
            
            # Only include high-confidence entities
            if entity.confidence > 0.8:
                key_entities[entity_type].append({
                    "name": entity.text,
                    "confidence": entity.confidence
                })
        
        return key_entities
    
    def analyze_speakers(self, segments) -> dict:
        """Analyze speaker patterns and characteristics"""
        speakers = {}
        
        for segment in segments:
            speaker_id = segment.speaker_id or "Unknown"
            
            if speaker_id not in speakers:
                speakers[speaker_id] = {
                    "total_time": 0,
                    "segment_count": 0,
                    "sample_quotes": []
                }
            
            speakers[speaker_id]["total_time"] += segment.end - segment.start
            speakers[speaker_id]["segment_count"] += 1
            
            # Collect sample quotes (longer segments)
            if len(segment.text) > 100 and len(speakers[speaker_id]["sample_quotes"]) < 3:
                speakers[speaker_id]["sample_quotes"].append(segment.text)
        
        return speakers
    
    def extract_highlights(self, segments) -> list:
        """Extract potential highlight clips"""
        highlights = []
        
        # Look for longer, engaging segments
        for segment in segments:
            if (len(segment.text) > 200 and 
                segment.end - segment.start > 30):  # At least 30 seconds
                
                highlights.append({
                    "text": segment.text,
                    "start_time": segment.start,
                    "end_time": segment.end,
                    "duration": segment.end - segment.start,
                    "speaker": segment.speaker_id
                })
        
        # Sort by duration (longest first)
        highlights.sort(key=lambda x: x["duration"], reverse=True)
        
        return highlights[:5]  # Top 5 highlights
    
    def generate_chapters(self, segments, topics) -> list:
        """Generate chapter markers based on content flow"""
        if not segments:
            return []
        
        chapters = []
        chapter_duration = 300  # 5 minutes per chapter
        
        current_time = 0
        chapter_num = 1
        
        while current_time < segments[-1].end:
            # Find segments around this time
            chapter_segments = [
                s for s in segments 
                if current_time <= s.start < current_time + chapter_duration
            ]
            
            if chapter_segments:
                # Generate chapter title from content
                chapter_text = " ".join([s.text for s in chapter_segments[:3]])
                title = self.generate_chapter_title(chapter_text, topics)
                
                chapters.append({
                    "number": chapter_num,
                    "title": title,
                    "start_time": current_time,
                    "end_time": min(current_time + chapter_duration, segments[-1].end)
                })
                
                chapter_num += 1
            
            current_time += chapter_duration
        
        return chapters
    
    def generate_chapter_title(self, text: str, topics: list) -> str:
        """Generate a chapter title from text content"""
        # Simple approach: use relevant topic or first few words
        words = text.split()[:8]
        
        # Check if any topics appear in this text
        text_lower = text.lower()
        for topic in topics:
            if topic.lower() in text_lower:
                return f"Discussion about {topic}"
        
        # Fallback to first few words
        return " ".join(words) + "..."
    
    def generate_title_suggestions(self, topics: list) -> list:
        """Generate SEO-friendly title suggestions"""
        if not topics:
            return ["Podcast Episode"]
        
        templates = [
            f"The Ultimate Guide to {topics[0]}",
            f"Everything You Need to Know About {topics[0]}",
            f"{topics[0]}: Insights and Tips",
            f"Deep Dive: {topics[0]} Explained",
            f"Mastering {topics[0]}: A Complete Discussion"
        ]
        
        return templates[:3]
    
    def generate_description(self, summary) -> str:
        """Generate podcast episode description"""
        if not summary or not summary.text:
            return "An engaging podcast episode with valuable insights and discussion."
        
        return f"In this episode, we explore {summary.text[:200]}..."
    
    def generate_twitter_thread(self, transcript: str) -> list:
        """Generate Twitter thread from transcript"""
        # Simple approach: break into tweet-sized chunks
        sentences = transcript.split('. ')
        thread = []
        current_tweet = ""
        
        for sentence in sentences:
            # +2 accounts for the ". " appended below
            if len(current_tweet) + len(sentence) + 2 <= 280:
                current_tweet += sentence + ". "
            else:
                if current_tweet.strip():
                    thread.append(current_tweet.strip())
                current_tweet = sentence + ". "
        
        if current_tweet.strip():
            thread.append(current_tweet.strip())
        
        return thread[:5]  # Max 5 tweets
    
    def generate_instagram_caption(self, summary) -> str:
        """Generate Instagram caption"""
        if not summary or not summary.text:
            return "New podcast episode is live! 🎙️ #podcast #newepisode"
        
        return f"New episode is live! 🎙️\n\n{summary.text[:150]}...\n\n#podcast #newepisode #discussion"
    
    def generate_youtube_description(self, transcription) -> str:
        """Generate YouTube video description"""
        description = f"""New podcast episode is now available!

Duration: {transcription.duration} seconds

{transcription.summary.text if transcription.summary else 'An engaging discussion with valuable insights.'}

Topics covered:
"""
        
        for i, topic in enumerate(transcription.topics[:5], 1):
            description += f"{i}. {topic}\n"
        
        description += "\n--- Full Transcript ---\n"
        description += transcription.text[:1000] + "..."
        
        return description

async def main():
    processor = PodcastProcessor()
    
    # Process podcast episode
    audio_file = "podcast-episode-001.mp3"
    metadata = {
        "title": "Episode 1: Getting Started",
        "host": "John Doe",
        "guest": "Jane Smith",
        "episode_number": 1
    }
    
    try:
        result = await processor.process_podcast_episode(audio_file, metadata)
        
        # Save results
        output_file = f"podcast_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
        
        print(f"Podcast processing completed!")
        print(f"Results saved to: {output_file}")
        
        # Print summary
        print(f"\n--- Episode Summary ---")
        print(f"Duration: {result['episode_info']['duration']} seconds")
        print(f"Topics: {', '.join(result['content']['topics'][:3])}")
        print(f"Speakers: {len(result['speakers'])}")
        print(f"Chapters: {len(result['chapters'])}")
        print(f"Highlights: {len(result['highlights'])}")
        
    except Exception as e:
        print(f"Error processing podcast: {e}")

if __name__ == "__main__":
    asyncio.run(main())
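
YouTube reads chapter markers from HH:MM:SS lines in a video description, while generate_chapters stores start times in seconds. A small formatter bridging the two — a sketch over the chapters list produced above:

def format_youtube_chapters(chapters: list) -> str:
    """Convert chapter dicts to YouTube's 'HH:MM:SS Title' description lines."""
    lines = []
    for chapter in chapters:
        total = int(chapter["start_time"])
        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        lines.append(f"{hours:02d}:{minutes:02d}:{seconds:02d} {chapter['title']}")
    return "\n".join(lines)

# Example output:
# 00:00:00 Discussion about onboarding
# 00:05:00 Discussion about pricing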

Automation Scripts

Scheduled Transcription Service

import asyncio
import schedule
import time
from pathlib import Path
from verbalisai import VerbalisAI
import logging

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('transcription_service.log'),
        logging.StreamHandler()
    ]
)

class ScheduledTranscriptionService:
    def __init__(self, watch_folder: str, output_folder: str):
        self.client = VerbalisAI()
        self.watch_folder = Path(watch_folder)
        self.output_folder = Path(output_folder)
        self.processed_files = set()
        
        # Create folders if they don't exist
        self.watch_folder.mkdir(exist_ok=True)
        self.output_folder.mkdir(exist_ok=True)
        
        logging.info(f"Watching folder: {self.watch_folder}")
        logging.info(f"Output folder: {self.output_folder}")
    
    async def process_new_files(self):
        """Process any new audio files in the watch folder"""
        
        audio_extensions = {'.mp3', '.wav', '.flac', '.m4a', '.ogg'}
        
        # Find new audio files
        new_files = []
        for ext in audio_extensions:
            for file_path in self.watch_folder.glob(f"*{ext}"):
                if file_path not in self.processed_files:
                    new_files.append(file_path)
        
        if not new_files:
            logging.info("No new files to process")
            return
        
        logging.info(f"Found {len(new_files)} new files to process")
        
        # Process each file
        for file_path in new_files:
            try:
                await self.process_single_file(file_path)
                self.processed_files.add(file_path)
                logging.info(f"✅ Processed: {file_path.name}")
                
            except Exception as e:
                logging.error(f"❌ Failed to process {file_path.name}: {e}")
    
    async def process_single_file(self, file_path: Path):
        """Process a single audio file"""
        
        logging.info(f"Processing: {file_path.name}")
        
        # Upload file
        with open(file_path, "rb") as f:
            file_info = await self.client.files.upload(
                file=f,
                filename=file_path.name,
                folder="scheduled_uploads"
            )
        
        # Transcribe
        transcription = await self.client.transcriptions.create(
            audio_url=file_info.url,
            model="mini",
            topics=True,
            summarization=True
        )
        
        # Save results
        output_file = self.output_folder / f"{file_path.stem}_transcript.txt"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(f"File: {file_path.name}\n")
            f.write(f"Duration: {transcription.duration} seconds\n")
            f.write(f"Topics: {', '.join(transcription.topics)}\n")
            if transcription.summary:
                f.write(f"Summary: {transcription.summary.text}\n")
            f.write(f"\n--- Transcript ---\n")
            f.write(transcription.text)
        
        # Optionally move processed file to archive
        archive_folder = self.watch_folder / "processed"
        archive_folder.mkdir(exist_ok=True)
        
        archived_file = archive_folder / file_path.name
        file_path.rename(archived_file)
        
        logging.info(f"Archived processed file to: {archived_file}")

def run_scheduled_service():
    """Run the scheduled transcription service"""
    
    service = ScheduledTranscriptionService(
        watch_folder="./watch_folder",
        output_folder="./transcriptions"
    )
    
    def job():
        """Wrapper function for async job"""
        asyncio.run(service.process_new_files())
    
    # Schedule the job to run every 15 minutes
    schedule.every(15).minutes.do(job)
    
    logging.info("Scheduled transcription service started")
    logging.info("Checking for new files every 15 minutes...")
    
    # Keep the service running
    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute

if __name__ == "__main__":
    run_scheduled_service()
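
One caveat: processed_files is held in memory, so a crash before the archive step could reprocess files after a restart. A minimal persistence sketch that records processed names in a JSON file alongside the service:

import json
from pathlib import Path

class ProcessedFileLog:
    """Persist processed filenames so restarts don't repeat work."""
    
    def __init__(self, log_path: str = "processed_files.json"):
        self.log_path = Path(log_path)
        if self.log_path.exists():
            self.names = set(json.loads(self.log_path.read_text()))
        else:
            self.names = set()
    
    def add(self, filename: str):
        self.names.add(filename)
        self.log_path.write_text(json.dumps(sorted(self.names)))
    
    def __contains__(self, filename: str) -> bool:
        return filename in self.names

Swapping this in for the plain set (and tracking file_path.name rather than the Path object) keeps a restarted service from duplicating work.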

Integration Examples

Slack Bot Integration

import asyncio
import os
from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler
from verbalisai import VerbalisAI

# Initialize Slack app
app = AsyncApp(token=os.environ.get("SLACK_BOT_TOKEN"))
client = VerbalisAI()

@app.event("file_shared")
async def handle_file_shared(event, say):
    """Handle audio file uploads to Slack"""
    
    file_info = event.get("file", {})
    
    # Check if it's an audio file
    if not file_info.get("mimetype", "").startswith("audio/"):
        return
    
    await say(f"🎙️ Audio file detected: {file_info['name']}. Starting transcription...")
    
    try:
        # Note: url_private_download requires an "Authorization: Bearer <token>"
        # header, so passing it directly only works if the transcription
        # service can authenticate; see the download-and-upload sketch below
        file_url = file_info["url_private_download"]
        
        # Start transcription
        transcription = await client.transcriptions.create(
            audio_url=file_url,
            model="mini",
            topics=True
        )
        
        # Send results back to Slack (Slack mrkdwn bolds with *single asterisks*)
        await say(
            f"✅ Transcription completed for `{file_info['name']}`:\n\n"
            f"*Duration:* {transcription.duration} seconds\n"
            f"*Topics:* {', '.join(transcription.topics)}\n\n"
            f"*Transcript:*\n```{transcription.text[:1000]}{'...' if len(transcription.text) > 1000 else ''}```"
        )
        
    except Exception as e:
        await say(f"❌ Transcription failed: {str(e)}")

@app.command("/transcribe")
async def transcribe_command(ack, respond, command):
    """Slash command to transcribe audio from URL"""
    
    await ack()
    
    url = command["text"].strip()
    if not url:
        await respond("Please provide an audio URL: `/transcribe https://example.com/audio.mp3`")
        return
    
    await respond("🎙️ Starting transcription...")
    
    try:
        transcription = await client.transcriptions.create(
            audio_url=url,
            model="mini",
            topics=True,
            summarization=True
        )
        
        await respond(
            f"✅ Transcription completed:\n\n"
            f"*Duration:* {transcription.duration} seconds\n"
            f"*Topics:* {', '.join(transcription.topics)}\n"
            f"*Summary:* {transcription.summary.text if transcription.summary else 'N/A'}\n\n"
            f"*Transcript:*\n```{transcription.text[:1500]}{'...' if len(transcription.text) > 1500 else ''}```"
        )
        
    except Exception as e:
        await respond(f"❌ Transcription failed: {str(e)}")

async def main():
    handler = AsyncSocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    await handler.start_async()

if __name__ == "__main__":
    asyncio.run(main())
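
As noted above, Slack's url_private_download URLs require an Authorization: Bearer header with the bot token, so handing them straight to a third-party fetcher usually fails. One workaround is to download the bytes yourself and re-upload them — a sketch using httpx; whether files.upload accepts a BytesIO stream is an assumption to verify against the SDK:

import io
import os
import httpx
from verbalisai import VerbalisAI

client = VerbalisAI()

async def transcribe_slack_file(file_info: dict):
    # Fetch the private file with the bot token
    async with httpx.AsyncClient() as http:
        resp = await http.get(
            file_info["url_private_download"],
            headers={"Authorization": f"Bearer {os.environ['SLACK_BOT_TOKEN']}"}
        )
        resp.raise_for_status()
    
    # Re-upload the bytes so VerbalisAI can fetch them without Slack auth
    uploaded = await client.files.upload(
        file=io.BytesIO(resp.content),
        filename=file_info["name"]
    )
    return await client.transcriptions.create(
        audio_url=uploaded.url, model="mini"
    )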

This concludes the Python SDK examples. Ready to explore the JavaScript SDK? Check out the JavaScript Transcription guide for the same features in JavaScript.