Complete guide to file storage and management with the VerbalisAI Python SDK — covering uploading, managing, and organizing audio files.
from verbalisai import VerbalisAI
import asyncio
async def upload_local_file():
    """Upload one local audio file and report the stored file's details."""
    client = VerbalisAI()
    # Stream the file from disk straight into the upload call.
    with open("audio.mp3", "rb") as audio_file:
        file_info = await client.files.upload(
            file=audio_file,
            filename="my-audio.mp3",
        )
    # One print with newline separators — same output as individual prints.
    print(
        f"File uploaded successfully!",
        f"URL: {file_info.url}",
        f"File ID: {file_info.id}",
        f"Size: {file_info.size_bytes} bytes",
        f"Upload date: {file_info.created_at}",
        sep="\n",
    )

asyncio.run(upload_local_file())
async def upload_with_metadata():
    """Upload a recording with organizational metadata and retention settings."""
    client = VerbalisAI()
    with open("meeting-recording.mp3", "rb") as audio_file:
        uploaded = await client.files.upload(
            file=audio_file,
            filename="meeting-recording.mp3",
            # Organizational metadata
            tags=["meeting", "quarterly", "2025"],
            description="Q2 2025 team meeting recording",
            folder="meetings/2025/q2",
            # Access and retention settings
            public=False,  # keep file private
            auto_delete_days=30,  # purge after a month
        )
    print(f"File uploaded to: {uploaded.folder}")
    print(f"Tags: {', '.join(uploaded.tags)}")
    print(f"Auto-delete date: {uploaded.auto_delete_date}")

asyncio.run(upload_with_metadata())
import os
import asyncio
from pathlib import Path
async def batch_upload():
    """Upload every .mp3 under ./audio_files concurrently and report results.

    Fix: the original opened each file in a ``with`` block but only *created*
    the upload coroutine there — by the time ``asyncio.gather`` awaited it,
    the file handle was already closed. Each file is now opened inside the
    coroutine that actually performs the upload, so the handle stays valid
    for the duration of the request.
    """
    client = VerbalisAI()
    audio_folder = Path("./audio_files")
    audio_files = list(audio_folder.glob("*.mp3"))

    async def _upload_one(path):
        # Open inside the coroutine so the handle is alive while awaited.
        with open(path, "rb") as f:
            return await client.files.upload(
                file=f,
                filename=path.name,
                folder="batch_upload",
            )

    upload_tasks = [_upload_one(path) for path in audio_files]
    # Upload all files concurrently; exceptions are returned, not raised,
    # so one failure does not abort the whole batch.
    results = await asyncio.gather(*upload_tasks, return_exceptions=True)
    successful_uploads = 0
    failed_uploads = 0
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Failed to upload {audio_files[i].name}: {result}")
            failed_uploads += 1
        else:
            print(f"Uploaded {result.filename}: {result.url}")
            successful_uploads += 1
    print(f"\nBatch upload complete:")
    print(f"  Successful: {successful_uploads}")
    print(f"  Failed: {failed_uploads}")

asyncio.run(batch_upload())
async def resumable_upload():
    """Upload a large file in fixed-size chunks via the resumable-upload API.

    Fix: the progress estimate multiplied the chunk count by the full chunk
    size, so a short final chunk could report more than 100%; the value is
    now capped at 100.
    """
    client = VerbalisAI()
    large_file_path = "very-large-audio.wav"  # e.g., 2GB file
    # Start resumable upload
    with open(large_file_path, "rb") as f:
        uploader = await client.files.create_resumable_upload(
            filename="very-large-audio.wav",
            file_size=os.path.getsize(large_file_path),
            chunk_size=10 * 1024 * 1024  # 10MB chunks
        )
        print(f"Starting resumable upload: {uploader.upload_id}")
        chunk_num = 0
        while True:
            chunk = f.read(uploader.chunk_size)
            if not chunk:
                break
            # Upload chunk with progress tracking
            await uploader.upload_chunk(chunk, chunk_num)
            # Cap at 100%: the final chunk is usually shorter than chunk_size,
            # so the naive estimate can overshoot.
            progress = min(
                100.0,
                (chunk_num + 1) * uploader.chunk_size / uploader.file_size * 100,
            )
            print(f"Progress: {progress:.1f}%")
            chunk_num += 1
    # Complete the upload
    file_info = await uploader.complete()
    print(f"Upload completed: {file_info.url}")

asyncio.run(resumable_upload())
async def resume_interrupted_upload():
    """Pick up a previously interrupted resumable upload where it left off."""
    client = VerbalisAI()
    upload_id = "your-interrupted-upload-id"
    # Rehydrate the uploader state from the server.
    uploader = await client.files.get_resumable_upload(upload_id)
    print(f"Resuming upload from chunk {uploader.last_chunk}")
    with open("large-file.wav", "rb") as f:
        # Skip the bytes the server already received.
        f.seek(uploader.last_chunk * uploader.chunk_size)
        chunk_num = uploader.last_chunk
        # Walrus loop: read and test each chunk in one step.
        while chunk := f.read(uploader.chunk_size):
            await uploader.upload_chunk(chunk, chunk_num)
            print(f"Uploaded chunk {chunk_num}")
            chunk_num += 1
    file_info = await uploader.complete()
    print(f"Resume completed: {file_info.url}")

asyncio.run(resume_interrupted_upload())
async def list_files():
    """List every stored file, then a filtered subset of meeting files."""
    client = VerbalisAI()
    # Unfiltered listing of the whole account.
    files = await client.files.list()
    print(f"Total files: {len(files)}")
    for entry in files:
        print(f"  {entry.filename} ({entry.size_bytes} bytes) - {entry.created_at}")
    # Narrow the listing with server-side filters.
    meeting_files = await client.files.list(
        folder="meetings",
        tags=["quarterly"],
        created_after="2025-01-01",
        limit=50,
    )
    print(f"\nMeeting files: {len(meeting_files)}")

asyncio.run(list_files())
async def search_files():
    """Search stored files by keyword, then run a filtered advanced search.

    Fix: the advanced-search results were computed but silently discarded;
    the match count is now printed so the example demonstrates the output.
    """
    client = VerbalisAI()
    # Search by filename
    results = await client.files.search(
        query="meeting",
        search_in=["filename", "description", "tags"]
    )
    print(f"Found {len(results)} files matching 'meeting':")
    for file in results:
        print(f"  {file.filename}: {file.description}")
    # Advanced search
    advanced_results = await client.files.search(
        query="quarterly team",
        folder="meetings",
        file_type="audio/mpeg",
        size_min=1000000,  # At least 1MB
        created_after="2025-01-01"
    )
    # Report the advanced results instead of dropping them.
    print(f"\nAdvanced search matched {len(advanced_results)} files")

asyncio.run(search_files())
async def get_file_info():
    """Fetch and display the full metadata record for one stored file."""
    client = VerbalisAI()
    file_id = "your-file-id"
    # Single round-trip for the detailed record.
    info = await client.files.get(file_id)
    # One print call with newline separators — identical output to
    # the line-by-line equivalent.
    print(
        f"Filename: {info.filename}",
        f"Size: {info.size_bytes} bytes ({info.size_human})",
        f"Type: {info.mime_type}",
        f"Duration: {info.duration_seconds} seconds",
        f"Sample rate: {info.sample_rate} Hz",
        f"Channels: {info.channels}",
        f"Folder: {info.folder}",
        f"Tags: {', '.join(info.tags)}",
        f"Public: {info.public}",
        f"Downloads: {info.download_count}",
        sep="\n",
    )

asyncio.run(get_file_info())
async def manage_folders():
    """Create a nested folder hierarchy, list it, and move a file into it."""
    client = VerbalisAI()
    # Build the folder tree one level at a time (parents before children).
    for path in (
        "projects",
        "projects/client-a",
        "projects/client-a/meetings",
        "projects/client-a/interviews",
    ):
        await client.files.create_folder(path)
    # Show the resulting structure with per-folder file counts.
    folders = await client.files.list_folders()
    print("Folder structure:")
    for folder in folders:
        print(f"  {folder.path} ({folder.file_count} files)")
    # Relocate an existing file into the new hierarchy.
    file_id = "your-file-id"
    await client.files.move(file_id, "projects/client-a/meetings")
    print("File moved to new folder")

asyncio.run(manage_folders())
async def manage_tags():
    """Add, remove, replace, and enumerate file tags."""
    client = VerbalisAI()
    file_id = "your-file-id"
    # Append new tags without touching existing ones.
    await client.files.add_tags(file_id, ["important", "Q2-2025", "client-meeting"])
    # Drop a single tag.
    await client.files.remove_tags(file_id, ["Q2-2025"])
    # Overwrite the full tag set in one call.
    await client.files.set_tags(file_id, ["archived", "processed"])
    # Enumerate every tag defined in the account.
    all_tags = await client.files.list_tags()
    print("Available tags:")
    for tag in all_tags:
        print(f"  {tag.name} ({tag.file_count} files)")

asyncio.run(manage_tags())
async def copy_move_files():
    """Copy, move, and rename a stored file."""
    client = VerbalisAI()
    file_id = "your-file-id"
    # Duplicate the file into a backup folder under a new name.
    duplicate = await client.files.copy(
        file_id,
        new_folder="backup",
        new_filename="backup-copy.mp3",
    )
    print(f"File copied: {duplicate.id}")
    # Relocate the original into the archive tree.
    await client.files.move(file_id, "archive/2025")
    print("File moved to archive")
    # Give the original a new filename in place.
    await client.files.rename(file_id, "new-filename.mp3")
    print("File renamed")

asyncio.run(copy_move_files())
async def share_files():
    """Demonstrate public URLs, expiring share links, and re-privatizing."""
    client = VerbalisAI()
    file_id = "your-file-id"
    # Expose the file publicly and grab its permanent URL.
    url = await client.files.make_public(file_id)
    print(f"Public URL: {url}")
    # Hand out a time-boxed, password-protected link instead of the raw URL.
    link = await client.files.create_share_link(
        file_id,
        expires_in_hours=24,
        password="optional-password",
        download_limit=10,
    )
    print(f"Temporary link: {link.url}")
    print(f"Expires: {link.expires_at}")
    # Revoke public access again.
    await client.files.make_private(file_id)
    print("File is now private")

asyncio.run(share_files())
async def storage_analytics():
    """Print account storage totals plus per-type and per-folder usage."""
    client = VerbalisAI()

    def show_breakdown(heading, breakdown):
        # Shared formatter for the per-type / per-folder tables.
        print(heading)
        for key, info in breakdown.items():
            print(f"  {key}: {info.count} files, {info.size_human}")

    # Account-wide totals.
    storage = await client.files.get_storage_info()
    print(f"Storage used: {storage.used_bytes} bytes ({storage.used_human})")
    print(f"Storage limit: {storage.limit_bytes} bytes ({storage.limit_human})")
    print(f"Usage percentage: {storage.usage_percentage:.1f}%")
    print(f"Files count: {storage.file_count}")
    print(f"Folders count: {storage.folder_count}")
    # Per-dimension breakdowns share one formatting path.
    show_breakdown("\nUsage by file type:", await client.files.get_usage_by_type())
    show_breakdown("\nUsage by folder:", await client.files.get_usage_by_folder())

asyncio.run(storage_analytics())
async def usage_history():
    """Report daily upload activity for 30 days and a monthly 2025 summary."""
    client = VerbalisAI()
    # Rolling 30-day daily view.
    daily_usage = await client.files.get_daily_usage(days=30)
    print("Daily storage usage (last 30 days):")
    for day in daily_usage:
        print(f"  {day.date}: {day.uploads} uploads, {day.size_human} added")
    # Calendar-year monthly rollup.
    monthly_usage = await client.files.get_monthly_usage(year=2025)
    print("\nMonthly usage summary for 2025:")
    for month in monthly_usage:
        print(f"  {month.month}: {month.uploads} uploads, {month.total_size_human}")

asyncio.run(usage_history())
async def lifecycle_management():
    """Configure retention policies and manage pending deletions."""
    client = VerbalisAI()
    # Short retention for scratch files.
    await client.files.set_auto_delete_policy(
        folder="temp",
        delete_after_days=7,
    )
    # Cold-storage then purge for archived material.
    await client.files.set_archive_policy(
        folder="archive",
        archive_after_days=90,
        delete_after_days=365,
    )
    # Review what the policies are about to remove.
    scheduled_deletions = await client.files.list_scheduled_deletions()
    print("Files scheduled for deletion:")
    for file in scheduled_deletions:
        print(f"  {file.filename}: {file.delete_date}")
    # Opt a specific file out of its pending deletion.
    file_id = "file-to-keep"
    await client.files.cancel_deletion(file_id)
    print("Deletion cancelled")

asyncio.run(lifecycle_management())
async def backup_restore():
    """Create a folder backup, list backups, and restore one."""
    client = VerbalisAI()
    # Snapshot one folder under a named backup.
    backup_id = await client.files.create_backup(
        folder="important-files",
        backup_name="monthly-backup-june-2025",
    )
    print(f"Backup created: {backup_id}")
    # Enumerate existing backups with their sizes.
    for backup in await client.files.list_backups():
        print(f"Backup: {backup.name} ({backup.created_at}) - {backup.file_count} files")
    # Restore the snapshot into a fresh folder.
    restored_files = await client.files.restore_backup(
        backup_id,
        restore_to_folder="restored-files",
    )
    print(f"Restored {len(restored_files)} files")

asyncio.run(backup_restore())
async def process_files():
    """Analyze audio quality, then produce a transcription-optimized copy.

    Fix: the size-reduction report previously read ``size_bytes`` off the
    audio-analysis result, which carries quality metrics rather than storage
    metadata; the file record itself is now fetched and used for the size
    comparison.
    """
    client = VerbalisAI()
    file_id = "your-audio-file-id"
    # Storage metadata for the source file (needed for the size comparison).
    original_file = await client.files.get(file_id)
    # Get audio analysis
    analysis = await client.files.analyze_audio(file_id)
    print(f"Audio quality score: {analysis.quality_score}/10")
    print(f"Noise level: {analysis.noise_level}%")
    print(f"Volume level: {analysis.volume_level} dB")
    print(f"Speech percentage: {analysis.speech_percentage}%")
    # Optimize audio for transcription
    optimized_file = await client.files.optimize_for_transcription(
        file_id,
        reduce_noise=True,
        normalize_volume=True,
        remove_silence=True
    )
    print(f"Optimized file: {optimized_file.id}")
    print(f"Size reduction: {original_file.size_bytes - optimized_file.size_bytes} bytes")

asyncio.run(process_files())
async def convert_files():
    """Convert one stored file into mp3, wav, and flac variants."""
    client = VerbalisAI()
    file_id = "your-file-id"
    # Target formats mapped to their per-format encoder options.
    conversions = {
        "mp3": {"bitrate": "128k", "quality": "standard"},
        "wav": {"sample_rate": 16000, "channels": 1},  # Mono, 16kHz
        "flac": {"compression_level": 5},
    }
    for target, opts in conversions.items():
        converted = await client.files.convert(
            file_id,
            target_format=target,
            options=opts,
        )
        print(f"Converted to {target}: {converted.filename}")
        print(f"Size: {converted.size_bytes} bytes")

asyncio.run(convert_files())
from verbalisai import VerbalisAI, VerbalisAIError
import asyncio
async def robust_file_upload():
    """Upload a file with retries, exponential backoff, and error triage.

    Retries transient server errors (500/502/503) with exponential backoff,
    treats quota exhaustion (507) and unrecognized API errors as fatal, and
    leaves a hook for compressing files rejected as too large (413).

    Fix: when every retryable attempt was exhausted the original loop simply
    fell off the end without reporting failure; a ``for``/``else`` now prints
    a final outcome message.
    """
    client = VerbalisAI()
    max_retries = 3
    retry_delay = 2.0
    file_path = "important-audio.mp3"
    for attempt in range(max_retries):
        try:
            with open(file_path, "rb") as f:
                file_info = await client.files.upload(
                    file=f,
                    filename="important-audio.mp3"
                )
            print(f"Upload successful on attempt {attempt + 1}")
            print(f"File URL: {file_info.url}")
            break
        except VerbalisAIError as e:
            print(f"Upload attempt {attempt + 1} failed: {e.message}")
            if e.status_code == 413:  # File too large
                print("File is too large, compressing...")
                # Implement compression logic here
                continue
            elif e.status_code == 507:  # Storage full
                print("Storage quota exceeded")
                break
            elif e.status_code in [500, 502, 503]:  # Server errors
                if attempt < max_retries - 1:
                    wait_time = retry_delay * (2 ** attempt)
                    print(f"Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                    continue
            else:
                print("Non-retryable error")
                break
        except Exception as e:
            print(f"Unexpected error: {e}")
            break
    else:
        # Loop exhausted without a break: every attempt failed with a
        # retryable error — report the final outcome instead of exiting silently.
        print("Upload failed after all retry attempts")

asyncio.run(robust_file_upload())
async def upload_and_transcribe():
    """End-to-end flow: upload a recording, transcribe it, link the two."""
    client = VerbalisAI()
    # Step 1: push the local recording into managed storage.
    with open("meeting-recording.mp3", "rb") as f:
        stored = await client.files.upload(
            file=f,
            filename="meeting-recording.mp3",
            folder="meetings/june-2025",
            tags=["meeting", "team", "planning"],
        )
    print(f"File uploaded: {stored.url}")
    # Step 2: transcribe the stored file by URL.
    transcription = await client.transcriptions.create(
        audio_url=stored.url,
        model="pro",
        diarize=True,
        topics=True,
        summarization=True,
        summary_type="bullets",
    )
    print(f"Transcription completed: {transcription.id}")
    # Step 3: record the transcription outcome on the file itself.
    await client.files.update_metadata(
        stored.id,
        transcription_id=transcription.id,
        transcription_status="completed",
        topics=transcription.topics,
    )
    print("File metadata updated with transcription info")

asyncio.run(upload_and_transcribe())
Ready to learn about SDK configuration? Check out the Configuration guide to learn about advanced client settings and customization options.