"""Complete guide to file storage and management with the VerbalisAI Python SDK."""
from verbalisai import VerbalisAI
import asyncio
async def upload_local_file():
    """Upload a single local audio file and print the stored file's details."""
    client = VerbalisAI()

    # Upload a single file; the handle stays open for the duration of the await.
    with open("audio.mp3", "rb") as fh:
        uploaded = await client.files.upload(
            file=fh,
            filename="my-audio.mp3",
        )

    print("File uploaded successfully!")
    print(f"URL: {uploaded.url}")
    print(f"File ID: {uploaded.id}")
    print(f"Size: {uploaded.size_bytes} bytes")
    print(f"Upload date: {uploaded.created_at}")

asyncio.run(upload_local_file())
async def upload_with_metadata():
    """Upload a recording with organizational metadata and retention settings."""
    client = VerbalisAI()

    with open("meeting-recording.mp3", "rb") as fh:
        uploaded = await client.files.upload(
            file=fh,
            filename="meeting-recording.mp3",
            # Optional metadata
            tags=["meeting", "quarterly", "2025"],
            description="Q2 2025 team meeting recording",
            folder="meetings/2025/q2",
            # File settings
            public=False,  # Keep file private
            auto_delete_days=30,  # Delete after 30 days
        )

    print(f"File uploaded to: {uploaded.folder}")
    print(f"Tags: {', '.join(uploaded.tags)}")
    print(f"Auto-delete date: {uploaded.auto_delete_date}")

asyncio.run(upload_with_metadata())
import os
import asyncio
from pathlib import Path
async def batch_upload():
    """Upload every .mp3 in ./audio_files concurrently and report the results.

    Bug fixed: the original opened each file in a ``with`` block but only
    *created* the upload coroutine there, so every file handle was already
    closed by the time ``asyncio.gather`` actually awaited the uploads.
    Each upload now opens its file inside the coroutine that awaits it, so
    the handle stays open for the full duration of the transfer.
    """
    client = VerbalisAI()

    async def _upload_one(path):
        # One coroutine per file: open, upload, close — all within the await.
        with open(path, "rb") as f:
            return await client.files.upload(
                file=f,
                filename=path.name,
                folder="batch_upload",
            )

    audio_folder = Path("./audio_files")
    audio_files = list(audio_folder.glob("*.mp3"))

    # Upload all files concurrently; exceptions are returned, not raised,
    # so one failure does not abort the whole batch.
    results = await asyncio.gather(
        *(_upload_one(p) for p in audio_files),
        return_exceptions=True,
    )

    successful_uploads = 0
    failed_uploads = 0
    for path, result in zip(audio_files, results):
        if isinstance(result, Exception):
            print(f"Failed to upload {path.name}: {result}")
            failed_uploads += 1
        else:
            print(f"Uploaded {result.filename}: {result.url}")
            successful_uploads += 1

    print(f"\nBatch upload complete:")
    print(f" Successful: {successful_uploads}")
    print(f" Failed: {failed_uploads}")

asyncio.run(batch_upload())
async def resumable_upload():
    """Upload a large file in fixed-size chunks via the resumable-upload API.

    Bug fixed: the original progress formula
    ``(chunk_num + 1) * chunk_size / file_size`` over-reports on the final
    (usually partial) chunk and can print more than 100%. Progress is now
    computed from the bytes actually sent and capped at 100.
    """
    client = VerbalisAI()
    large_file_path = "very-large-audio.wav"  # e.g., 2GB file

    # Start resumable upload
    with open(large_file_path, "rb") as f:
        uploader = await client.files.create_resumable_upload(
            filename="very-large-audio.wav",
            file_size=os.path.getsize(large_file_path),
            chunk_size=10 * 1024 * 1024,  # 10MB chunks
        )
        print(f"Starting resumable upload: {uploader.upload_id}")

        chunk_num = 0
        bytes_sent = 0
        while chunk := f.read(uploader.chunk_size):
            # Upload chunk with progress tracking
            await uploader.upload_chunk(chunk, chunk_num)
            bytes_sent += len(chunk)
            progress = min(bytes_sent / uploader.file_size * 100, 100.0)
            print(f"Progress: {progress:.1f}%")
            chunk_num += 1

        # Complete the upload
        file_info = await uploader.complete()
        print(f"Upload completed: {file_info.url}")

asyncio.run(resumable_upload())
async def resume_interrupted_upload():
    """Pick up a previously interrupted resumable upload where it left off."""
    client = VerbalisAI()
    upload_id = "your-interrupted-upload-id"

    # Resume existing upload
    uploader = await client.files.get_resumable_upload(upload_id)
    print(f"Resuming upload from chunk {uploader.last_chunk}")

    with open("large-file.wav", "rb") as src:
        # Seek to the position where upload was interrupted
        src.seek(uploader.last_chunk * uploader.chunk_size)

        chunk_num = uploader.last_chunk
        while data := src.read(uploader.chunk_size):
            await uploader.upload_chunk(data, chunk_num)
            print(f"Uploaded chunk {chunk_num}")
            chunk_num += 1

    file_info = await uploader.complete()
    print(f"Resume completed: {file_info.url}")

asyncio.run(resume_interrupted_upload())
async def list_files():
    """List every stored file, then a filtered subset of meeting files."""
    client = VerbalisAI()

    # List all files
    all_files = await client.files.list()
    print(f"Total files: {len(all_files)}")
    for entry in all_files:
        print(f" {entry.filename} ({entry.size_bytes} bytes) - {entry.created_at}")

    # List files with filtering
    meeting_files = await client.files.list(
        folder="meetings",
        tags=["quarterly"],
        created_after="2025-01-01",
        limit=50,
    )
    print(f"\nMeeting files: {len(meeting_files)}")

asyncio.run(list_files())
async def search_files():
    """Search stored files by text query, then with advanced filters.

    Fix: the original computed ``advanced_results`` but never used it, so
    the advanced-search example produced no visible output; its match
    count is now reported like the first search's.
    """
    client = VerbalisAI()

    # Search by filename
    results = await client.files.search(
        query="meeting",
        search_in=["filename", "description", "tags"],
    )
    print(f"Found {len(results)} files matching 'meeting':")
    for file in results:
        print(f" {file.filename}: {file.description}")

    # Advanced search
    advanced_results = await client.files.search(
        query="quarterly team",
        folder="meetings",
        file_type="audio/mpeg",
        size_min=1000000,  # At least 1MB
        created_after="2025-01-01",
    )
    print(f"Advanced search matched {len(advanced_results)} files")

asyncio.run(search_files())
async def get_file_info():
    """Fetch and display the full metadata record for a single stored file."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Get detailed file information
    info = await client.files.get(file_id)

    # Build every display line first, then print in order.
    details = [
        f"Filename: {info.filename}",
        f"Size: {info.size_bytes} bytes ({info.size_human})",
        f"Type: {info.mime_type}",
        f"Duration: {info.duration_seconds} seconds",
        f"Sample rate: {info.sample_rate} Hz",
        f"Channels: {info.channels}",
        f"Folder: {info.folder}",
        f"Tags: {', '.join(info.tags)}",
        f"Public: {info.public}",
        f"Downloads: {info.download_count}",
    ]
    for line in details:
        print(line)

asyncio.run(get_file_info())
async def manage_folders():
    """Create a nested folder hierarchy, list it, and move a file into it."""
    client = VerbalisAI()

    # Create folder structure, parents before children
    for path in (
        "projects",
        "projects/client-a",
        "projects/client-a/meetings",
        "projects/client-a/interviews",
    ):
        await client.files.create_folder(path)

    # List folders
    folders = await client.files.list_folders()
    print("Folder structure:")
    for folder in folders:
        print(f" {folder.path} ({folder.file_count} files)")

    # Move files to folders
    file_id = "your-file-id"
    await client.files.move(file_id, "projects/client-a/meetings")
    print("File moved to new folder")

asyncio.run(manage_folders())
async def manage_tags():
    """Demonstrate adding, removing, replacing, and listing file tags."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Add tags to file
    await client.files.add_tags(file_id, ["important", "Q2-2025", "client-meeting"])
    # Remove specific tags
    await client.files.remove_tags(file_id, ["Q2-2025"])
    # Replace all tags
    await client.files.set_tags(file_id, ["archived", "processed"])

    # Get all available tags
    print("Available tags:")
    for tag in await client.files.list_tags():
        print(f" {tag.name} ({tag.file_count} files)")

asyncio.run(manage_tags())
async def copy_move_files():
    """Copy a file to a backup folder, archive the original, then rename it."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Copy file to another folder
    duplicate = await client.files.copy(
        file_id,
        new_folder="backup",
        new_filename="backup-copy.mp3",
    )
    print(f"File copied: {duplicate.id}")

    # Move file to different folder
    await client.files.move(file_id, "archive/2025")
    print("File moved to archive")

    # Rename file
    await client.files.rename(file_id, "new-filename.mp3")
    print("File renamed")

asyncio.run(copy_move_files())
async def share_files():
    """Toggle a file's visibility and create an expiring, limited share link."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Make file public
    public_url = await client.files.make_public(file_id)
    print(f"Public URL: {public_url}")

    # Create temporary sharing link
    link = await client.files.create_share_link(
        file_id,
        expires_in_hours=24,
        password="optional-password",
        download_limit=10,
    )
    print(f"Temporary link: {link.url}")
    print(f"Expires: {link.expires_at}")

    # Make file private again
    await client.files.make_private(file_id)
    print("File is now private")

asyncio.run(share_files())
async def storage_analytics():
    """Print overall storage usage plus per-type and per-folder breakdowns."""
    client = VerbalisAI()

    def _print_breakdown(title, usage):
        # Both breakdown endpoints return the same {key: info} mapping shape.
        print(title)
        for key, info in usage.items():
            print(f" {key}: {info.count} files, {info.size_human}")

    # Get storage overview
    storage = await client.files.get_storage_info()
    print(f"Storage used: {storage.used_bytes} bytes ({storage.used_human})")
    print(f"Storage limit: {storage.limit_bytes} bytes ({storage.limit_human})")
    print(f"Usage percentage: {storage.usage_percentage:.1f}%")
    print(f"Files count: {storage.file_count}")
    print(f"Folders count: {storage.folder_count}")

    # Get usage by file type
    _print_breakdown("\nUsage by file type:", await client.files.get_usage_by_type())
    # Get usage by folder
    _print_breakdown("\nUsage by folder:", await client.files.get_usage_by_folder())

asyncio.run(storage_analytics())
async def usage_history():
    """Report daily (last 30 days) and monthly (2025) upload/storage history."""
    client = VerbalisAI()

    # Get daily usage for the last 30 days
    daily = await client.files.get_daily_usage(days=30)
    print("Daily storage usage (last 30 days):")
    for day in daily:
        print(f" {day.date}: {day.uploads} uploads, {day.size_human} added")

    # Get monthly summary
    monthly = await client.files.get_monthly_usage(year=2025)
    print("\nMonthly usage summary for 2025:")
    for month in monthly:
        print(f" {month.month}: {month.uploads} uploads, {month.total_size_human}")

asyncio.run(usage_history())
async def lifecycle_management():
    """Configure retention policies and inspect/cancel scheduled deletions."""
    client = VerbalisAI()

    # Set auto-delete for old files
    await client.files.set_auto_delete_policy(
        folder="temp",
        delete_after_days=7,
    )

    # Set archival policy for large files
    await client.files.set_archive_policy(
        folder="archive",
        archive_after_days=90,
        delete_after_days=365,
    )

    # Get files scheduled for deletion
    pending = await client.files.list_scheduled_deletions()
    print("Files scheduled for deletion:")
    for entry in pending:
        print(f" {entry.filename}: {entry.delete_date}")

    # Cancel deletion for specific file
    file_id = "file-to-keep"
    await client.files.cancel_deletion(file_id)
    print("Deletion cancelled")

asyncio.run(lifecycle_management())
async def backup_restore():
    """Create a folder backup, list available backups, and restore one."""
    client = VerbalisAI()

    # Create backup of specific folder
    backup_id = await client.files.create_backup(
        folder="important-files",
        backup_name="monthly-backup-june-2025",
    )
    print(f"Backup created: {backup_id}")

    # List available backups
    for backup in await client.files.list_backups():
        print(f"Backup: {backup.name} ({backup.created_at}) - {backup.file_count} files")

    # Restore from backup
    restored = await client.files.restore_backup(
        backup_id,
        restore_to_folder="restored-files",
    )
    print(f"Restored {len(restored)} files")

asyncio.run(backup_restore())
async def process_files():
    """Analyze an audio file's quality and create a transcription-optimized copy.

    Fix: the size-reduction line read ``analysis.size_bytes``, but
    ``analysis`` is the audio-quality report returned by ``analyze_audio``
    (quality/noise/volume/speech metrics), not the file record. The
    original file's size is now fetched explicitly for the comparison.
    """
    client = VerbalisAI()
    file_id = "your-audio-file-id"

    # Get audio analysis
    analysis = await client.files.analyze_audio(file_id)
    print(f"Audio quality score: {analysis.quality_score}/10")
    print(f"Noise level: {analysis.noise_level}%")
    print(f"Volume level: {analysis.volume_level} dB")
    print(f"Speech percentage: {analysis.speech_percentage}%")

    # Optimize audio for transcription
    optimized_file = await client.files.optimize_for_transcription(
        file_id,
        reduce_noise=True,
        normalize_volume=True,
        remove_silence=True,
    )
    print(f"Optimized file: {optimized_file.id}")

    # Compare against the original file record, not the analysis object.
    original = await client.files.get(file_id)
    print(f"Size reduction: {original.size_bytes - optimized_file.size_bytes} bytes")

asyncio.run(process_files())
async def convert_files():
    """Convert one file into several target formats with per-format options."""
    client = VerbalisAI()
    file_id = "your-file-id"

    # Target formats mapped to their encoder settings
    conversions = {
        "mp3": {"bitrate": "128k", "quality": "standard"},
        "wav": {"sample_rate": 16000, "channels": 1},  # Mono, 16kHz
        "flac": {"compression_level": 5},
    }

    for fmt, opts in conversions.items():
        result = await client.files.convert(
            file_id,
            target_format=fmt,
            options=opts,
        )
        print(f"Converted to {fmt}: {result.filename}")
        print(f"Size: {result.size_bytes} bytes")

asyncio.run(convert_files())
from verbalisai import VerbalisAI, VerbalisAIError
import asyncio
async def robust_file_upload():
    """Upload a file with retries and exponential backoff on server errors.

    Fixes over the original:
    - If every attempt failed with a retryable (5xx) error the loop ended
      silently; a ``for``/``else`` now reports exhausted retries.
    - The 413 branch retried immediately with the identical payload; it
      still continues, but the missing compression step is flagged.
    """
    client = VerbalisAI()
    max_retries = 3
    retry_delay = 2.0
    file_path = "important-audio.mp3"

    for attempt in range(max_retries):
        try:
            with open(file_path, "rb") as f:
                file_info = await client.files.upload(
                    file=f,
                    filename="important-audio.mp3",
                )
            print(f"Upload successful on attempt {attempt + 1}")
            print(f"File URL: {file_info.url}")
            break
        except VerbalisAIError as e:
            print(f"Upload attempt {attempt + 1} failed: {e.message}")
            if e.status_code == 413:  # File too large
                print("File is too large, compressing...")
                # TODO: actually compress before retrying, or the next
                # attempt will fail with 413 again.
                continue
            elif e.status_code == 507:  # Storage full
                print("Storage quota exceeded")
                break
            elif e.status_code in (500, 502, 503):  # Server errors
                if attempt < max_retries - 1:
                    wait_time = retry_delay * (2 ** attempt)  # exponential backoff
                    print(f"Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                    continue
            else:
                print("Non-retryable error")
                break
        except Exception as e:
            # Anything unexpected (I/O errors, bugs) aborts immediately.
            print(f"Unexpected error: {e}")
            break
    else:
        # Loop ran out of attempts without a break: every retry failed.
        print("Upload failed after all retry attempts")

asyncio.run(robust_file_upload())
async def upload_and_transcribe():
    """Upload a recording, transcribe it, and write the result back as metadata."""
    client = VerbalisAI()

    # Upload local file
    with open("meeting-recording.mp3", "rb") as fh:
        uploaded = await client.files.upload(
            file=fh,
            filename="meeting-recording.mp3",
            folder="meetings/june-2025",
            tags=["meeting", "team", "planning"],
        )
    print(f"File uploaded: {uploaded.url}")

    # Transcribe uploaded file
    transcription = await client.transcriptions.create(
        audio_url=uploaded.url,
        model="pro",
        diarize=True,
        topics=True,
        summarization=True,
        summary_type="bullets",
    )
    print(f"Transcription completed: {transcription.id}")

    # Update file with transcription metadata
    await client.files.update_metadata(
        uploaded.id,
        transcription_id=transcription.id,
        transcription_status="completed",
        topics=transcription.topics,
    )
    print("File metadata updated with transcription info")

asyncio.run(upload_and_transcribe())