File Storage with Python SDK
Comprehensive guide to uploading, managing, and organizing audio files using the VerbalisAI Python SDK.

File Upload
Upload Local Files
Copy
from verbalisai import VerbalisAI
import asyncio
async def upload_local_file():
    """Upload one local audio file and print the stored file's metadata."""
    client = VerbalisAI()

    # The context manager keeps the handle open for the whole upload
    # and guarantees it is closed afterwards.
    with open("audio.mp3", "rb") as audio_file:
        file_info = await client.files.upload(
            file=audio_file,
            filename="my-audio.mp3",
        )

    print(f"File uploaded successfully!")
    print(f"URL: {file_info.url}")
    print(f"File ID: {file_info.id}")
    print(f"Size: {file_info.size_bytes} bytes")
    print(f"Upload date: {file_info.created_at}")

asyncio.run(upload_local_file())
Upload with Metadata
Copy
async def upload_with_metadata():
    """Upload a recording together with organizational metadata and retention settings."""
    client = VerbalisAI()

    with open("meeting-recording.mp3", "rb") as recording:
        file_info = await client.files.upload(
            file=recording,
            filename="meeting-recording.mp3",
            # Organizational metadata (all optional).
            tags=["meeting", "quarterly", "2025"],
            description="Q2 2025 team meeting recording",
            folder="meetings/2025/q2",
            # File settings.
            public=False,          # Keep file private
            auto_delete_days=30,   # Delete after 30 days
        )

    print(f"File uploaded to: {file_info.folder}")
    print(f"Tags: {', '.join(file_info.tags)}")
    print(f"Auto-delete date: {file_info.auto_delete_date}")

asyncio.run(upload_with_metadata())
Batch File Upload
Copy
import os
import asyncio
from pathlib import Path
async def batch_upload():
    """Upload every .mp3 in ./audio_files concurrently and report per-file results.

    Bug fixed: the original opened each file in a ``with`` block but only
    *created* the upload coroutine inside it — by the time
    ``asyncio.gather`` actually awaited the uploads, every file handle was
    already closed. Each upload now opens (and closes) its own file inside
    the coroutine that performs it.
    """
    client = VerbalisAI()

    audio_folder = Path("./audio_files")
    # Sort for deterministic ordering of the progress output.
    audio_files = sorted(audio_folder.glob("*.mp3"))

    async def _upload_one(path):
        # Opened inside the coroutine so the handle stays valid for the
        # entire upload and is released as soon as this upload finishes.
        with open(path, "rb") as f:
            return await client.files.upload(
                file=f,
                filename=path.name,
                folder="batch_upload",
            )

    # Upload all files concurrently; exceptions are returned, not raised,
    # so one failure does not abort the whole batch.
    results = await asyncio.gather(
        *(_upload_one(p) for p in audio_files),
        return_exceptions=True,
    )

    successful_uploads = 0
    failed_uploads = 0
    for path, result in zip(audio_files, results):
        if isinstance(result, Exception):
            print(f"Failed to upload {path.name}: {result}")
            failed_uploads += 1
        else:
            print(f"Uploaded {result.filename}: {result.url}")
            successful_uploads += 1

    print(f"\nBatch upload complete:")
    print(f" Successful: {successful_uploads}")
    print(f" Failed: {failed_uploads}")

asyncio.run(batch_upload())
Resumable Uploads
Large File Upload
Copy
async def resumable_upload():
    """Upload a very large file in fixed-size chunks via a resumable session.

    Bug fixed: the original progress formula ``(chunk_num + 1) * chunk_size
    / file_size`` overshoots 100% on the final chunk (which is usually
    smaller than ``chunk_size``); the value is now clamped.
    """
    client = VerbalisAI()

    large_file_path = "very-large-audio.wav"  # e.g., 2GB file

    with open(large_file_path, "rb") as f:
        # Start a server-side resumable session sized to the local file.
        uploader = await client.files.create_resumable_upload(
            filename="very-large-audio.wav",
            file_size=os.path.getsize(large_file_path),
            chunk_size=10 * 1024 * 1024,  # 10MB chunks
        )
        print(f"Starting resumable upload: {uploader.upload_id}")

        chunk_num = 0
        # read() returns b"" at EOF, which ends the loop.
        while chunk := f.read(uploader.chunk_size):
            await uploader.upload_chunk(chunk, chunk_num)
            # Clamp so the final (partial) chunk reports at most 100%.
            progress = min(
                100.0,
                (chunk_num + 1) * uploader.chunk_size / uploader.file_size * 100,
            )
            print(f"Progress: {progress:.1f}%")
            chunk_num += 1

        # Finalize the session; the server assembles the chunks.
        file_info = await uploader.complete()
        print(f"Upload completed: {file_info.url}")

asyncio.run(resumable_upload())
Resume Interrupted Upload
Copy
async def resume_interrupted_upload():
    """Re-attach to an interrupted resumable upload and push the remaining chunks."""
    client = VerbalisAI()
    upload_id = "your-interrupted-upload-id"

    # Recover the server-side session state (last confirmed chunk, chunk size).
    uploader = await client.files.get_resumable_upload(upload_id)
    print(f"Resuming upload from chunk {uploader.last_chunk}")

    with open("large-file.wav", "rb") as source:
        # Skip the bytes that were already delivered before the interruption.
        source.seek(uploader.last_chunk * uploader.chunk_size)

        index = uploader.last_chunk
        while piece := source.read(uploader.chunk_size):
            await uploader.upload_chunk(piece, index)
            print(f"Uploaded chunk {index}")
            index += 1

        file_info = await uploader.complete()
        print(f"Resume completed: {file_info.url}")

asyncio.run(resume_interrupted_upload())
File Management
List Files
Copy
async def list_files():
    """Print every stored file, then a server-side-filtered subset."""
    client = VerbalisAI()

    # Unfiltered listing of everything in the account.
    all_files = await client.files.list()
    print(f"Total files: {len(all_files)}")
    for item in all_files:
        print(f" {item.filename} ({item.size_bytes} bytes) - {item.created_at}")

    # Filtered listing: folder, tag, creation-date floor, and page size.
    meeting_files = await client.files.list(
        folder="meetings",
        tags=["quarterly"],
        created_after="2025-01-01",
        limit=50,
    )
    print(f"\nMeeting files: {len(meeting_files)}")

asyncio.run(list_files())
Search Files
Copy
async def search_files():
    """Search stored files by text query, then with advanced structural filters.

    Bug fixed: the original computed ``advanced_results`` and never used it;
    the advanced search now reports its matches too.
    """
    client = VerbalisAI()

    # Simple text search across several metadata fields.
    results = await client.files.search(
        query="meeting",
        search_in=["filename", "description", "tags"],
    )
    print(f"Found {len(results)} files matching 'meeting':")
    for file in results:
        print(f" {file.filename}: {file.description}")

    # Advanced search: free-text query combined with structural filters.
    advanced_results = await client.files.search(
        query="quarterly team",
        folder="meetings",
        file_type="audio/mpeg",
        size_min=1000000,  # At least 1MB
        created_after="2025-01-01",
    )
    print(f"\nAdvanced search matched {len(advanced_results)} files:")
    for file in advanced_results:
        print(f" {file.filename}")

asyncio.run(search_files())
Get File Information
Copy
async def get_file_info():
    """Fetch one file's full metadata record and display it."""
    client = VerbalisAI()
    file_id = "your-file-id"

    info = await client.files.get(file_id)

    # Identity and media properties.
    print(f"Filename: {info.filename}")
    print(f"Size: {info.size_bytes} bytes ({info.size_human})")
    print(f"Type: {info.mime_type}")
    print(f"Duration: {info.duration_seconds} seconds")
    print(f"Sample rate: {info.sample_rate} Hz")
    print(f"Channels: {info.channels}")
    # Organization and access state.
    print(f"Folder: {info.folder}")
    print(f"Tags: {', '.join(info.tags)}")
    print(f"Public: {info.public}")
    print(f"Downloads: {info.download_count}")

asyncio.run(get_file_info())
File Organization
Folder Management
Copy
async def manage_folders():
    """Create a nested folder hierarchy, list it, and move a file into it."""
    client = VerbalisAI()

    # Build the hierarchy top-down; each call creates one level.
    for folder_path in (
        "projects",
        "projects/client-a",
        "projects/client-a/meetings",
        "projects/client-a/interviews",
    ):
        await client.files.create_folder(folder_path)

    # Inspect the resulting structure.
    folders = await client.files.list_folders()
    print("Folder structure:")
    for folder in folders:
        print(f" {folder.path} ({folder.file_count} files)")

    # Relocate an existing file into the new hierarchy.
    file_id = "your-file-id"
    await client.files.move(file_id, "projects/client-a/meetings")
    print("File moved to new folder")

asyncio.run(manage_folders())
Tagging System
Copy
async def manage_tags():
    """Demonstrate add / remove / replace tag operations and tag listing."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Append tags without touching existing ones.
    await client.files.add_tags(file_id, ["important", "Q2-2025", "client-meeting"])

    # Remove only the listed tags.
    await client.files.remove_tags(file_id, ["Q2-2025"])

    # Overwrite the entire tag set.
    await client.files.set_tags(file_id, ["archived", "processed"])

    # Enumerate every tag in the account with its usage count.
    all_tags = await client.files.list_tags()
    print("Available tags:")
    for tag in all_tags:
        print(f" {tag.name} ({tag.file_count} files)")

asyncio.run(manage_tags())
File Operations
Copy and Move Files
Copy
async def copy_move_files():
    """Copy, move, and rename a stored file."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Duplicate into another folder under a new name; returns the new record.
    duplicate = await client.files.copy(
        file_id,
        new_folder="backup",
        new_filename="backup-copy.mp3",
    )
    print(f"File copied: {duplicate.id}")

    # Relocate the original (id is unchanged, only the folder moves).
    await client.files.move(file_id, "archive/2025")
    print("File moved to archive")

    # Rename in place.
    await client.files.rename(file_id, "new-filename.mp3")
    print("File renamed")

asyncio.run(copy_move_files())
File Sharing
Copy
async def share_files():
    """Toggle a file between public and private, with a temporary share link in between."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Expose the file at a stable public URL.
    public_url = await client.files.make_public(file_id)
    print(f"Public URL: {public_url}")

    # Time-boxed, optionally password-protected link with a download cap.
    temp_link = await client.files.create_share_link(
        file_id,
        expires_in_hours=24,
        password="optional-password",
        download_limit=10,
    )
    print(f"Temporary link: {temp_link.url}")
    print(f"Expires: {temp_link.expires_at}")

    # Revoke public access again.
    await client.files.make_private(file_id)
    print("File is now private")

asyncio.run(share_files())
Storage Analytics
Storage Usage
Copy
async def storage_analytics():
    """Report overall storage usage, then break it down by file type and folder."""
    client = VerbalisAI()

    # Account-level overview.
    storage = await client.files.get_storage_info()
    print(f"Storage used: {storage.used_bytes} bytes ({storage.used_human})")
    print(f"Storage limit: {storage.limit_bytes} bytes ({storage.limit_human})")
    print(f"Usage percentage: {storage.usage_percentage:.1f}%")
    print(f"Files count: {storage.file_count}")
    print(f"Folders count: {storage.folder_count}")

    # Breakdown keyed by MIME type.
    usage_by_type = await client.files.get_usage_by_type()
    print("\nUsage by file type:")
    for file_type, info in usage_by_type.items():
        print(f" {file_type}: {info.count} files, {info.size_human}")

    # Breakdown keyed by folder path.
    usage_by_folder = await client.files.get_usage_by_folder()
    print("\nUsage by folder:")
    for folder, info in usage_by_folder.items():
        print(f" {folder}: {info.count} files, {info.size_human}")

asyncio.run(storage_analytics())
Usage History
Copy
async def usage_history():
    """Print daily upload activity for the last 30 days and a monthly summary for 2025."""
    client = VerbalisAI()

    # Per-day activity over a rolling window.
    daily_usage = await client.files.get_daily_usage(days=30)
    print("Daily storage usage (last 30 days):")
    for day in daily_usage:
        print(f" {day.date}: {day.uploads} uploads, {day.size_human} added")

    # Per-month rollup for one calendar year.
    monthly_usage = await client.files.get_monthly_usage(year=2025)
    print("\nMonthly usage summary for 2025:")
    for month in monthly_usage:
        print(f" {month.month}: {month.uploads} uploads, {month.total_size_human}")

asyncio.run(usage_history())
File Lifecycle Management
Auto-Delete Policies
Copy
async def lifecycle_management():
    """Configure auto-delete/archive policies and manage pending deletions."""
    client = VerbalisAI()

    # Short retention for scratch files.
    await client.files.set_auto_delete_policy(
        folder="temp",
        delete_after_days=7,
    )

    # Two-stage policy: archive first, delete much later.
    await client.files.set_archive_policy(
        folder="archive",
        archive_after_days=90,
        delete_after_days=365,
    )

    # Review what the policies are about to remove.
    scheduled_deletions = await client.files.list_scheduled_deletions()
    print("Files scheduled for deletion:")
    for file in scheduled_deletions:
        print(f" {file.filename}: {file.delete_date}")

    # Exempt one file from its scheduled deletion.
    file_id = "file-to-keep"
    await client.files.cancel_deletion(file_id)
    print("Deletion cancelled")

asyncio.run(lifecycle_management())
Backup and Restore
Copy
async def backup_restore():
    """Create a named folder backup, list existing backups, and restore one."""
    client = VerbalisAI()

    # Snapshot one folder under a human-readable name.
    backup_id = await client.files.create_backup(
        folder="important-files",
        backup_name="monthly-backup-june-2025",
    )
    print(f"Backup created: {backup_id}")

    # Enumerate existing snapshots.
    backups = await client.files.list_backups()
    for backup in backups:
        print(f"Backup: {backup.name} ({backup.created_at}) - {backup.file_count} files")

    # Restore into a separate folder so current files are not overwritten.
    restored_files = await client.files.restore_backup(
        backup_id,
        restore_to_folder="restored-files",
    )
    print(f"Restored {len(restored_files)} files")

asyncio.run(backup_restore())
Advanced File Operations
File Processing
Copy
async def process_files():
    """Analyze an audio file's quality, then create a transcription-optimized copy.

    Bug fixed: the original computed the size reduction from
    ``analysis.size_bytes``, but ``analysis`` is the audio-analysis result
    (quality/noise/volume metrics), not the stored-file record. The file's
    size is fetched from its own metadata record instead.
    """
    client = VerbalisAI()
    file_id = "your-audio-file-id"

    # Fetch the original file record up front — we need its size_bytes to
    # report the size reduction after optimization.
    original = await client.files.get(file_id)

    # Quality metrics for the raw audio.
    analysis = await client.files.analyze_audio(file_id)
    print(f"Audio quality score: {analysis.quality_score}/10")
    print(f"Noise level: {analysis.noise_level}%")
    print(f"Volume level: {analysis.volume_level} dB")
    print(f"Speech percentage: {analysis.speech_percentage}%")

    # Produce a cleaned-up derivative better suited to transcription.
    optimized_file = await client.files.optimize_for_transcription(
        file_id,
        reduce_noise=True,
        normalize_volume=True,
        remove_silence=True,
    )
    print(f"Optimized file: {optimized_file.id}")
    print(f"Size reduction: {original.size_bytes - optimized_file.size_bytes} bytes")

asyncio.run(process_files())
File Conversion
Copy
async def convert_files():
    """Convert one stored file into several target formats with per-format options."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Target formats mapped to their encoder options.
    conversions = {
        "mp3": {"bitrate": "128k", "quality": "standard"},
        "wav": {"sample_rate": 16000, "channels": 1},  # Mono, 16kHz
        "flac": {"compression_level": 5},
    }

    for format_type, options in conversions.items():
        converted_file = await client.files.convert(
            file_id,
            target_format=format_type,
            options=options,
        )
        print(f"Converted to {format_type}: {converted_file.filename}")
        print(f"Size: {converted_file.size_bytes} bytes")

asyncio.run(convert_files())
Error Handling for File Operations
Robust File Upload
Copy
from verbalisai import VerbalisAI, VerbalisAIError
import asyncio
async def robust_file_upload():
    """Upload a file with retries tailored to specific API status codes.

    413 → compress and retry; 507 → give up (quota exhausted);
    500/502/503 → exponential backoff; anything else → non-retryable.
    """
    client = VerbalisAI()

    MAX_RETRIES = 3
    BASE_DELAY = 2.0
    RETRYABLE = (500, 502, 503)  # transient server-side failures
    file_path = "important-audio.mp3"

    for attempt in range(MAX_RETRIES):
        try:
            with open(file_path, "rb") as source:
                file_info = await client.files.upload(
                    file=source,
                    filename="important-audio.mp3",
                )
            print(f"Upload successful on attempt {attempt + 1}")
            print(f"File URL: {file_info.url}")
            break
        except VerbalisAIError as e:
            print(f"Upload attempt {attempt + 1} failed: {e.message}")
            if e.status_code == 413:  # File too large
                print("File is too large, compressing...")
                # Implement compression logic here
                continue
            elif e.status_code == 507:  # Storage full
                print("Storage quota exceeded")
                break
            elif e.status_code in RETRYABLE:
                if attempt < MAX_RETRIES - 1:
                    wait_time = BASE_DELAY * (2 ** attempt)  # exponential backoff
                    print(f"Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                    continue
            else:
                print("Non-retryable error")
                break
        except Exception as e:
            # Anything outside the SDK's error hierarchy is not retried.
            print(f"Unexpected error: {e}")
            break

asyncio.run(robust_file_upload())
Integration with Transcription
Upload and Transcribe Workflow
Copy
async def upload_and_transcribe():
    """End-to-end workflow: upload a recording, transcribe it, link the two records."""
    client = VerbalisAI()

    # Step 1: upload the local recording with organizational metadata.
    with open("meeting-recording.mp3", "rb") as recording:
        file_info = await client.files.upload(
            file=recording,
            filename="meeting-recording.mp3",
            folder="meetings/june-2025",
            tags=["meeting", "team", "planning"],
        )
    print(f"File uploaded: {file_info.url}")

    # Step 2: transcribe straight from the stored file's URL.
    transcription = await client.transcriptions.create(
        audio_url=file_info.url,
        model="pro",
        diarize=True,
        topics=True,
        summarization=True,
        summary_type="bullets",
    )
    print(f"Transcription completed: {transcription.id}")

    # Step 3: write the transcription results back onto the file record
    # so later file searches can find them.
    await client.files.update_metadata(
        file_info.id,
        transcription_id=transcription.id,
        transcription_status="completed",
        topics=transcription.topics,
    )
    print("File metadata updated with transcription info")

asyncio.run(upload_and_transcribe())
Ready to learn about SDK configuration? Check out the Configuration guide to learn about advanced client settings and customization options.