Examples
This section provides practical examples of using tzst, from basic command-line and Python operations to real-world integrations.
Basic Operations
Creating Your First Archive
from tzst import create_archive
# Simple archive creation
files_to_archive = ["document.pdf", "photos/", "config.json"]
create_archive("my-archive.tzst", files_to_archive)
# With custom compression level
create_archive("high-compression.tzst", files_to_archive, compression_level=9)
Command Line Equivalent
# Create archive
tzst a my-archive.tzst document.pdf photos/ config.json
# With high compression
tzst a high-compression.tzst document.pdf photos/ config.json -l 9
Basic Extraction
from tzst import extract_archive
# Extract to current directory
extract_archive("my-archive.tzst")
# Extract to specific directory
extract_archive("my-archive.tzst", "extracted/")
# Extract specific files only
extract_archive("my-archive.tzst", "output/", members=["document.pdf", "config.json"])
# Extract with conflict resolution
extract_archive("my-archive.tzst", "output/", conflict_resolution="skip")
Listing Archive Contents
from tzst import list_archive
# Simple listing
contents = list_archive("my-archive.tzst")
for item in contents:
print(f"{item['name']} ({item['size']} bytes)")
# Detailed listing with timestamps and permissions
contents = list_archive("my-archive.tzst", verbose=True)
for item in contents:
print(f"{item['name']:30} {item['size']:>10} bytes {item['mtime']}")
Command Line Usage
Note: Download the standalone binary for the best performance and no Python dependency. Alternatively, use uvx tzst to run without installation; see the uv documentation for details.
Archive Creation Commands
# Basic archive creation
tzst a backup.tzst documents/ photos/
# Or with uvx (no installation needed)
uvx tzst a backup.tzst documents/ photos/
# Create with specific compression level
tzst a backup.tzst documents/ photos/ -l 6
uvx tzst a backup.tzst documents/ photos/ -l 6
# Create from multiple sources
tzst a complete-backup.tzst /home/user/documents /home/user/photos /etc/config
# Create with verbose output
tzst a backup.tzst documents/ photos/ -v
Extraction Commands
# Extract to current directory
tzst x backup.tzst
# Or with uvx
uvx tzst x backup.tzst
# Extract to specific directory
tzst x backup.tzst --output /restore/
# Extract specific files only
tzst x backup.tzst documents/report.pdf photos/vacation.jpg
# Extract with conflict resolution
tzst x backup.tzst --conflict-resolution skip
# Extract flattening directory structure
tzst e backup.tzst --output flat-restore/
Archive Inspection Commands
# List archive contents
tzst l backup.tzst
# Or with uvx
uvx tzst l backup.tzst
# List with detailed information
tzst l backup.tzst --verbose
# Test archive integrity
tzst t backup.tzst
# Stream large archives efficiently
tzst l huge-archive.tzst --streaming
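Each inspection command has a Python counterpart; for example, test_archive returns True when the archive is intact:
from tzst import test_archive
# Verify archive integrity from Python
if test_archive("backup.tzst"):
    print("Archive is valid")
# Streaming variant for very large archives
is_valid = test_archive("huge-archive.tzst", streaming=True)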
Advanced Archive Creation
Working with the TzstArchive Class
from tzst import TzstArchive
from pathlib import Path
# Create archive with fine-grained control
with TzstArchive("project-backup.tzst", "w", compression_level=6) as archive:
# Add individual files
archive.add("README.md")
archive.add("LICENSE")
# Add directories recursively
archive.add("src/", recursive=True)
archive.add("tests/", recursive=True)
# Add with custom archive names
archive.add("config/production.yaml", arcname="config.yaml")
archive.add("/tmp/build-info.json", arcname="build-info.json")
Conditional File Addition
from pathlib import Path
from tzst import TzstArchive
def backup_project(project_path, output_archive):
"""Create a project backup excluding certain files."""
project_path = Path(project_path)
# Define exclusion patterns
exclude_patterns = {
"*.pyc", "*.pyo", "__pycache__",
".git", ".svn", "node_modules",
"*.tmp", "*.log", ".DS_Store"
}
with TzstArchive(output_archive, "w", compression_level=5) as archive:
for item in project_path.rglob("*"):
# Skip excluded patterns
if any(item.match(pattern) for pattern in exclude_patterns):
continue
# Skip if it's a directory (will be created automatically)
if item.is_dir():
continue
# Add file with relative path
rel_path = item.relative_to(project_path)
archive.add(str(item), arcname=str(rel_path))
print(f"Added: {rel_path}")
# Usage
backup_project("/home/user/myproject", "project-clean.tzst")
Atomic Archive Creation
from tzst import create_archive
# Safe atomic creation (default behavior)
# Creates in temporary file first, then moves to final location
create_archive("important-data.tzst", ["critical/"], use_temp_file=True)
# Direct creation (faster but not atomic)
create_archive("temp-data.tzst", ["temp/"], use_temp_file=False)
Flexible Extraction
Extracting with Different Structures
from tzst import extract_archive, TzstArchive
# Standard extraction (preserves directory structure)
extract_archive("archive.tzst", "output/")
# Flatten all files to single directory
extract_archive("archive.tzst", "flat-output/", flatten=True)
# Extract with streaming for large archives
extract_archive("huge-archive.tzst", "output/", streaming=True)
Selective Extraction
from tzst import TzstArchive
def extract_by_extension(archive_path, output_dir, extensions):
"""Extract only files with specific extensions."""
with TzstArchive(archive_path, "r") as archive:
members = archive.getmembers()
# Filter members by extension
filtered_members = [
member.name for member in members
if any(member.name.endswith(ext) for ext in extensions)
]
if filtered_members:
archive.extractall(output_dir, members=filtered_members)
print(f"Extracted {len(filtered_members)} files")
else:
print("No matching files found")
# Extract only images
extract_by_extension("photos.tzst", "images/", [".jpg", ".png", ".gif"])
# Extract only documents
extract_by_extension("backup.tzst", "docs/", [".pdf", ".docx", ".txt"])
Custom Extraction Logic
from tzst import TzstArchive
import os
def extract_large_files_only(archive_path, output_dir, min_size_mb=10):
"""Extract only files larger than specified size."""
min_size_bytes = min_size_mb * 1024 * 1024
with TzstArchive(archive_path, "r") as archive:
large_files = []
for member in archive.getmembers():
if member.isfile() and member.size > min_size_bytes:
large_files.append(member.name)
size_mb = member.size / (1024 * 1024)
print(f"Will extract: {member.name} ({size_mb:.1f} MB)")
if large_files:
os.makedirs(output_dir, exist_ok=True)
for filename in large_files:
archive.extract(filename, output_dir)
print(f"Extracted {len(large_files)} large files")
extract_large_files_only("mixed-content.tzst", "large-files/", min_size_mb=5)
Security and Filtering
Safe Extraction Practices
from tzst import extract_archive
# Always use secure filters (default behavior)
extract_archive("untrusted.tzst", "safe-output/", filter="data")
# For trusted archives with special tar features
extract_archive("trusted.tzst", "output/", filter="tar")
# Only for completely trusted archives
extract_archive("internal.tzst", "output/", filter="fully_trusted")
Custom Security Filter
import os
import tarfile
from tzst import TzstArchive
def secure_data_filter(member, path):
"""Custom filter that only allows regular files and directories."""
# Only allow regular files and directories
if not (member.isfile() or member.isdir()):
return None
# Prevent path traversal
if os.path.isabs(member.name) or ".." in member.name:
return None
# Limit file size (100MB max)
if member.isfile() and member.size > 100 * 1024 * 1024:
return None
return member
# Use custom filter
with TzstArchive("archive.tzst", "r") as archive:
archive.extractall("secure-output/", filter=secure_data_filter)
Handling File Conflicts
from tzst import extract_archive, ConflictResolution
# Skip existing files
extract_archive("archive.tzst", "output/",
conflict_resolution=ConflictResolution.SKIP_ALL)
# Replace all existing files
extract_archive("archive.tzst", "output/",
conflict_resolution=ConflictResolution.REPLACE_ALL)
# Auto-rename conflicting files (adds suffix like "_1", "_2", etc.)
extract_archive("archive.tzst", "output/",
conflict_resolution=ConflictResolution.AUTO_RENAME_ALL)
# Interactive resolution (command line only)
extract_archive("archive.tzst", "output/",
conflict_resolution=ConflictResolution.ASK)
Custom Conflict Resolution
import time
from pathlib import Path
from tzst import extract_archive, ConflictResolution
def custom_conflict_handler(target_path: Path) -> ConflictResolution:
"""Custom logic for handling file conflicts."""
# Check file age
if target_path.exists():
file_age_days = (time.time() - target_path.stat().st_mtime) / (24 * 3600)
if file_age_days > 30:
print(f"Replacing old file: {target_path}")
return ConflictResolution.REPLACE
else:
print(f"Keeping newer file: {target_path}")
return ConflictResolution.SKIP
return ConflictResolution.REPLACE
# Use custom callback
extract_archive("archive.tzst", "output/",
conflict_resolution=ConflictResolution.ASK,
interactive_callback=custom_conflict_handler)
Performance Optimization
Streaming for Large Archives
from tzst import TzstArchive, list_archive, test_archive
# Memory-efficient operations for large archives
large_archive = "backup-500gb.tzst"
# Test integrity with streaming
is_valid = test_archive(large_archive, streaming=True)
# List contents with streaming
contents = list_archive(large_archive, streaming=True, verbose=True)
# Extract with streaming
with TzstArchive(large_archive, "r", streaming=True) as archive:
archive.extractall("restore/")
Compression Level Optimization
import time
from pathlib import Path
from tzst import create_archive
def benchmark_compression_levels(files, output_prefix="test"):
"""Compare different compression levels."""
levels_to_test = [1, 3, 6, 9, 15, 22]
results = []
for level in levels_to_test:
output_file = f"{output_prefix}_level_{level}.tzst"
# Measure compression time
start_time = time.time()
create_archive(output_file, files, compression_level=level)
compress_time = time.time() - start_time
# Get file size
file_size = Path(output_file).stat().st_size
results.append({
'level': level,
'time': compress_time,
'size': file_size,
'size_mb': file_size / (1024 * 1024)
})
print(f"Level {level}: {compress_time:.2f}s, {file_size/1024/1024:.1f} MB")
return results
# Test different compression levels
results = benchmark_compression_levels(["large-directory/"])
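Zstandard levels range from 1 (fastest) to 22 (smallest output); mid-range levels such as 3-6 usually give the best speed-to-size trade-off, so reserve the highest levels for archives that are written once and stored long-term.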
Parallel Processing
import concurrent.futures
from tzst import create_archive
from pathlib import Path
def create_archive_batch(file_groups, output_dir="archives/", compression_level=6):
"""Create multiple archives in parallel."""
Path(output_dir).mkdir(exist_ok=True)
def create_single_archive(args):
group_name, files = args
output_path = Path(output_dir) / f"{group_name}.tzst"
create_archive(output_path, files, compression_level=compression_level)
return f"Created {output_path}"
# Use ThreadPoolExecutor for I/O-bound operations
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future_to_group = {
executor.submit(create_single_archive, item): item[0]
for item in file_groups.items()
}
for future in concurrent.futures.as_completed(future_to_group):
group_name = future_to_group[future]
try:
result = future.result()
print(result)
except Exception as e:
print(f"Archive {group_name} failed: {e}")
# Example usage
file_groups = {
"documents": ["docs/", "papers/"],
"projects": ["src/", "tests/"],
"media": ["photos/", "videos/"]
}
create_archive_batch(file_groups)
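ThreadPoolExecutor is a sensible default here on the assumption that most time is spent in file I/O and native compression code; if a workload proves CPU-bound, a ProcessPoolExecutor may parallelize better.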
Error Handling
Comprehensive Error Handling
from tzst import TzstArchive, TzstArchiveError, TzstDecompressionError
import logging
def safe_archive_operation(operation, *args, **kwargs):
"""Wrapper for safe archive operations with logging."""
try:
return operation(*args, **kwargs)
except TzstDecompressionError as e:
logging.error(f"Decompression error: {e}")
print("The archive appears to be corrupted or not a valid tzst file.")
return None
except TzstArchiveError as e:
logging.error(f"Archive error: {e}")
print(f"Archive operation failed: {e}")
return None
except PermissionError as e:
logging.error(f"Permission error: {e}")
print("Permission denied. Check file/directory permissions.")
return None
except FileNotFoundError as e:
logging.error(f"File not found: {e}")
print(f"File or directory not found: {e}")
return None
except Exception as e:
logging.error(f"Unexpected error: {e}")
print(f"An unexpected error occurred: {e}")
return None
# Example usage
def create_backup_safely(files, output_archive):
def create_operation():
from tzst import create_archive
return create_archive(output_archive, files)
result = safe_archive_operation(create_operation)
if result is not None:
print(f"Backup created successfully: {output_archive}")
else:
print("Backup creation failed!")
Validation and Recovery
from tzst import test_archive, list_archive, TzstArchive
from pathlib import Path
def validate_and_repair_archive(archive_path):
"""Validate archive and attempt basic recovery."""
archive_path = Path(archive_path)
print(f"Validating {archive_path}...")
# Test basic integrity
try:
if test_archive(archive_path):
print("Archive integrity test passed")
return True
except Exception as e:
print(f"Integrity test failed: {e}")
# Try to list contents
try:
contents = list_archive(archive_path)
print(f"Archive contains {len(contents)} items")
# Cross-check the listing against streaming mode
contents_streaming = list_archive(archive_path, streaming=True)
if len(contents_streaming) != len(contents):
print("Different results between modes - possible corruption")
except Exception as e:
print(f"Cannot list contents: {e}")
return False
# Try partial extraction
try:
backup_dir = archive_path.parent / f"{archive_path.stem}_recovery"
backup_dir.mkdir(exist_ok=True)
with TzstArchive(archive_path, "r") as archive:
extracted_count = 0
for member in archive.getmembers():
try:
if member.isfile():
archive.extract(member.name, backup_dir)
extracted_count += 1
except Exception as e:
print(f"Failed to extract {member.name}: {e}")
print(f"Recovered {extracted_count} files to {backup_dir}")
return True
except Exception as e:
print(f"Recovery failed: {e}")
return False
# Example usage
validate_and_repair_archive("potentially-corrupted.tzst")
Real-World Scenarios
Automated Backup System
#!/usr/bin/env python3
"""
Daily backup script with rotation and validation.
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from tzst import create_archive, test_archive
class BackupManager:
def __init__(self, source_dirs, backup_dir, retention_days=30):
self.source_dirs = [Path(d) for d in source_dirs]
self.backup_dir = Path(backup_dir)
self.retention_days = retention_days
self.backup_dir.mkdir(parents=True, exist_ok=True)
def create_backup(self):
"""Create a new backup with timestamp."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}.tzst"
backup_path = self.backup_dir / backup_name
print(f"Creating backup: {backup_name}")
# Collect all existing files
files_to_backup = []
for source_dir in self.source_dirs:
if source_dir.exists():
files_to_backup.append(str(source_dir))
else:
print(f"Warning: Source directory not found: {source_dir}")
if not files_to_backup:
print("No files to backup!")
return None
try:
# Create backup with high compression for storage efficiency
create_archive(backup_path, files_to_backup, compression_level=9)
# Validate the backup
if test_archive(backup_path):
file_size = backup_path.stat().st_size / (1024 * 1024)
print(f"Backup created and validated: {file_size:.1f} MB")
return backup_path
else:
print("Backup validation failed!")
backup_path.unlink() # Remove invalid backup
return None
except Exception as e:
print(f"Backup failed: {e}")
return None
def cleanup_old_backups(self):
"""Remove backups older than retention period."""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
removed_count = 0
for backup_file in self.backup_dir.glob("backup_*.tzst"):
# Extract timestamp from filename
try:
timestamp_str = backup_file.stem.split("_", 1)[1]
file_date = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
if file_date < cutoff_date:
backup_file.unlink()
removed_count += 1
print(f"Removed old backup: {backup_file.name}")
except (ValueError, IndexError):
print(f"Warning: Could not parse backup date: {backup_file.name}")
print(f"Cleaned up {removed_count} old backups")
def run_backup(self):
"""Run complete backup process."""
print("Starting backup process...")
backup_path = self.create_backup()
if backup_path:
self.cleanup_old_backups()
print("Backup process completed successfully!")
return True
else:
print("Backup process failed!")
return False
# Configuration
if __name__ == "__main__":
# Customize these paths for your setup
BACKUP_SOURCES = [
"~/Documents",
"~/Projects",
"~/Pictures",
"/etc", # System configs (Linux/macOS)
]
BACKUP_DESTINATION = "~/Backups"
RETENTION_DAYS = 30
# Expand user paths
sources = [os.path.expanduser(path) for path in BACKUP_SOURCES]
destination = os.path.expanduser(BACKUP_DESTINATION)
# Run backup
backup_manager = BackupManager(sources, destination, RETENTION_DAYS)
success = backup_manager.run_backup()
sys.exit(0 if success else 1)
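To run the script nightly, a crontab entry along these lines works (the script and log paths are illustrative):
# Run the backup every day at 02:00
0 2 * * * /usr/bin/python3 /home/user/scripts/daily_backup.py >> /var/log/daily_backup.log 2>&1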
Log File Archiver
#!/usr/bin/env python3
"""
Archive and compress log files by date.
"""
import re
from datetime import datetime, timedelta
from pathlib import Path
from tzst import create_archive
def archive_logs_by_date(log_dir, archive_dir, days_old=7):
"""Archive log files older than specified days."""
log_dir = Path(log_dir)
archive_dir = Path(archive_dir)
archive_dir.mkdir(parents=True, exist_ok=True)
cutoff_date = datetime.now() - timedelta(days=days_old)
# Group log files by date
log_groups = {}
log_pattern = re.compile(r"(\d{4}-\d{2}-\d{2})")
for log_file in log_dir.glob("*.log"):
# Try to extract date from filename or modification time
date_match = log_pattern.search(log_file.name)
if date_match:
file_date_str = date_match.group(1)
try:
file_date = datetime.strptime(file_date_str, "%Y-%m-%d")
except ValueError:
# Fall back to modification time
file_date = datetime.fromtimestamp(log_file.stat().st_mtime)
else:
# Use modification time
file_date = datetime.fromtimestamp(log_file.stat().st_mtime)
# Skip recent files
if file_date >= cutoff_date:
continue
# Group by date
date_key = file_date.strftime("%Y-%m-%d")
if date_key not in log_groups:
log_groups[date_key] = []
log_groups[date_key].append(log_file)
# Create archives for each date group
archived_files = []
for date_key, files in log_groups.items():
archive_name = f"logs_{date_key}.tzst"
archive_path = archive_dir / archive_name
# Skip if archive already exists
if archive_path.exists():
print(f"Archive already exists: {archive_name}")
continue
print(f"Archiving {len(files)} log files for {date_key}")
try:
# Create archive with maximum compression (logs compress well)
create_archive(archive_path, [str(f) for f in files], compression_level=22)
# Verify archive
from tzst import test_archive
if test_archive(archive_path):
# Remove original files after successful archiving
for log_file in files:
log_file.unlink()
archived_files.append(log_file)
file_size = archive_path.stat().st_size / 1024
print(f"Created {archive_name} ({file_size:.1f} KB)")
else:
print(f"Archive validation failed for {archive_name}")
archive_path.unlink()
except Exception as e:
print(f"Failed to archive logs for {date_key}: {e}")
print(f"Archived {len(archived_files)} log files")
# Usage
if __name__ == "__main__":
archive_logs_by_date("/var/log", "/var/archives", days_old=7)
Data Migration Tool
#!/usr/bin/env python3
"""
Migrate data between systems using tzst archives.
"""
import hashlib
from pathlib import Path
from tzst import create_archive, extract_archive, test_archive
class DataMigrator:
def __init__(self, source_dir, staging_dir):
self.source_dir = Path(source_dir)
self.staging_dir = Path(staging_dir)
self.staging_dir.mkdir(parents=True, exist_ok=True)
def calculate_checksum(self, file_path):
"""Calculate SHA256 checksum of a file."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
def create_migration_package(self, package_name):
"""Create a migration package with checksums."""
package_path = self.staging_dir / f"{package_name}.tzst"
checksum_file = self.staging_dir / f"{package_name}.sha256"
print(f"Creating migration package: {package_name}")
# Create the archive
create_archive(
package_path,
[str(self.source_dir)],
compression_level=6 # Balanced for network transfer
)
# Verify archive
if not test_archive(package_path):
raise RuntimeError("Archive validation failed")
# Calculate and save checksum
checksum = self.calculate_checksum(package_path)
with open(checksum_file, "w") as f:
f.write(f"{checksum} {package_path.name}\n")
        package_size = package_path.stat().st_size / (1024 * 1024)
        print(f"Package created: {package_size:.1f} MB")
print(f"Checksum: {checksum}")
return package_path, checksum_file
def verify_and_extract_package(self, package_path, checksum_path, destination):
"""Verify package integrity and extract."""
package_path = Path(package_path)
checksum_path = Path(checksum_path)
destination = Path(destination)
print(f"Verifying package: {package_path.name}")
# Verify checksum
expected_checksum = checksum_path.read_text().strip().split()[0]
actual_checksum = self.calculate_checksum(package_path)
if expected_checksum != actual_checksum:
raise RuntimeError(f"Checksum mismatch! Expected: {expected_checksum}, Got: {actual_checksum}")
print("Checksum verification passed")
# Test archive integrity
if not test_archive(package_path):
raise RuntimeError("Archive integrity check failed")
print("Archive integrity verified")
# Extract with conflict resolution
destination.mkdir(parents=True, exist_ok=True)
extract_archive(
package_path,
destination,
conflict_resolution="replace_all" # Overwrite for migration
)
print(f"Package extracted to: {destination}")
# Example usage
if __name__ == "__main__":
# Create migration package
migrator = DataMigrator("/home/user/important-data", "/tmp/migration")
package_path, checksum_path = migrator.create_migration_package("data-migration-v1")
# Simulate transfer and extraction on target system
migrator.verify_and_extract_package(
package_path,
checksum_path,
"/home/user/restored-data"
)
Integration Examples
Django Management Command
# management/commands/backup_media.py
from django.core.management.base import BaseCommand
from django.conf import settings
from tzst import create_archive
from datetime import datetime
import os
class Command(BaseCommand):
help = 'Create a backup of media files'
def add_arguments(self, parser):
parser.add_argument(
'--output-dir',
default='/backups',
help='Output directory for backup files'
)
parser.add_argument(
'--compression-level',
type=int,
default=6,
help='Compression level (1-22)'
)
def handle(self, *args, **options):
media_root = settings.MEDIA_ROOT
output_dir = options['output_dir']
compression_level = options['compression_level']
if not os.path.exists(media_root):
self.stdout.write(
self.style.ERROR(f'Media directory not found: {media_root}')
)
return
# Create backup filename with timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_filename = f'media_backup_{timestamp}.tzst'
backup_path = os.path.join(output_dir, backup_filename)
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
try:
self.stdout.write(f'Creating media backup: {backup_filename}')
create_archive(backup_path, [media_root], compression_level=compression_level)
# Verify backup
from tzst import test_archive
if test_archive(backup_path):
file_size = os.path.getsize(backup_path) / (1024 * 1024)
self.stdout.write(
self.style.SUCCESS(
f'Backup created successfully: {backup_filename} ({file_size:.1f} MB)'
)
)
else:
self.stdout.write(
self.style.ERROR('Backup validation failed!')
)
except Exception as e:
self.stdout.write(
self.style.ERROR(f'Backup failed: {e}')
)
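Assuming the file is saved as management/commands/backup_media.py inside an installed app, run it like any other management command:
python manage.py backup_media --output-dir /backups --compression-level 9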
Flask Application Integration
from flask import Flask, request, send_file, jsonify
from tzst import create_archive, extract_archive
import tempfile
import os
from pathlib import Path
app = Flask(__name__)
@app.route('/api/backup', methods=['POST'])
def create_backup():
"""API endpoint to create backups."""
try:
data = request.get_json()
paths = data.get('paths', [])
compression_level = data.get('compression_level', 6)
if not paths:
return jsonify({'error': 'No paths specified'}), 400
# Create temporary archive
with tempfile.NamedTemporaryFile(suffix='.tzst', delete=False) as tmp:
temp_path = tmp.name
create_archive(temp_path, paths, compression_level=compression_level)
# Return archive file
return send_file(
temp_path,
as_attachment=True,
download_name='backup.tzst',
mimetype='application/octet-stream'
)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
# Clean up temporary file
if 'temp_path' in locals() and os.path.exists(temp_path):
os.unlink(temp_path)
@app.route('/api/extract', methods=['POST'])
def extract_files():
"""API endpoint to extract archives."""
try:
if 'file' not in request.files:
return jsonify({'error': 'No file uploaded'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(suffix='.tzst', delete=False) as tmp:
temp_archive = tmp.name
file.save(temp_archive)
# Create extraction directory
extract_dir = tempfile.mkdtemp()
# Extract archive
extract_archive(temp_archive, extract_dir)
# List extracted files
extracted_files = []
for root, dirs, files in os.walk(extract_dir):
for file in files:
rel_path = os.path.relpath(os.path.join(root, file), extract_dir)
extracted_files.append(rel_path)
return jsonify({
'success': True,
'extracted_files': extracted_files,
'extract_path': extract_dir
})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
# Clean up temporary archive
if 'temp_archive' in locals() and os.path.exists(temp_archive):
os.unlink(temp_archive)
if __name__ == '__main__':
app.run(debug=True)
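A quick way to exercise the backup endpoint from the command line (URL and payload are illustrative):
curl -X POST http://localhost:5000/api/backup \
     -H "Content-Type: application/json" \
     -d '{"paths": ["docs/"], "compression_level": 6}' \
     -o backup.tzst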
Jupyter Notebook Integration
# Cell 1: Setup
import pandas as pd
from tzst import create_archive, extract_archive, list_archive
from pathlib import Path
import matplotlib.pyplot as plt
# Cell 2: Create dataset archive
def archive_datasets(data_dir="./data", archive_name="datasets.tzst"):
"""Archive all dataset files for sharing."""
data_path = Path(data_dir)
if not data_path.exists():
print(f"Creating sample data directory: {data_dir}")
data_path.mkdir(exist_ok=True)
# Create sample datasets
sample_data = pd.DataFrame({
'A': range(100),
'B': range(100, 200),
'C': range(200, 300)
})
sample_data.to_csv(data_path / "sample.csv", index=False)
sample_data.to_parquet(data_path / "sample.parquet")
# Create archive
create_archive(archive_name, [str(data_path)], compression_level=9)
# Show archive contents
contents = list_archive(archive_name, verbose=True)
df = pd.DataFrame(contents)
print(f"Created archive: {archive_name}")
return df
# Execute
archive_contents = archive_datasets()
display(archive_contents)
# Cell 3: Analyze archive
def analyze_archive(archive_path="datasets.tzst"):
"""Analyze archive contents and compression."""
contents = list_archive(archive_path, verbose=True)
df = pd.DataFrame(contents)
# File type analysis
df['extension'] = df['name'].str.split('.').str[-1]
file_types = df.groupby('extension')['size'].agg(['count', 'sum']).reset_index()
# Visualization
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# File count by type
ax1.bar(file_types['extension'], file_types['count'])
ax1.set_title('File Count by Type')
ax1.set_xlabel('File Extension')
ax1.set_ylabel('Count')
# Size by type
ax2.bar(file_types['extension'], file_types['sum'] / 1024) # KB
ax2.set_title('Total Size by Type (KB)')
ax2.set_xlabel('File Extension')
ax2.set_ylabel('Size (KB)')
plt.tight_layout()
plt.show()
return df, file_types
# Execute
contents_df, file_summary = analyze_archive()
print("File Summary:")
display(file_summary)
These examples demonstrate the flexibility and power of tzst for various real-world scenarios. The library’s clean API and robust error handling make it suitable for everything from simple backup scripts to complex enterprise applications.