# Source code for tzst.core

"""Core functionality for tzst archives."""

import io
import os
import tarfile
import tempfile
import time
from collections.abc import Callable, Sequence
from enum import Enum
from pathlib import Path
from typing import BinaryIO

import zstandard as zstd

from .exceptions import TzstArchiveError, TzstDecompressionError


class ConflictResolution(Enum):
    """Enum for conflict resolution strategies.

    Values are the lowercase strings accepted wherever a resolution may be
    passed as plain text (e.g. ``ConflictResolution("skip")``).
    """

    REPLACE = "replace"  # overwrite the existing file, this conflict only
    SKIP = "skip"  # keep the existing file, this conflict only
    REPLACE_ALL = "replace_all"  # overwrite this and all future conflicts
    SKIP_ALL = "skip_all"  # keep this and all future conflicting files
    AUTO_RENAME = "auto_rename"  # extract under a unique name, this conflict only
    AUTO_RENAME_ALL = "auto_rename_all"  # unique names for all future conflicts
    EXIT = "exit"  # abort the extraction entirely
    ASK = "ask"  # defer to an interactive callback
class ConflictResolutionState:
    """State management for conflict resolution during extraction.

    Remembers a "sticky" resolution (the ``*_ALL`` variants and ``EXIT``)
    so later conflicts in the same extraction reuse it, and tracks whether
    extraction should keep going at all.
    """

    # Resolutions that persist across subsequent conflicts.
    _STICKY = (
        ConflictResolution.REPLACE_ALL,
        ConflictResolution.SKIP_ALL,
        ConflictResolution.AUTO_RENAME_ALL,
    )

    def __init__(self, initial_resolution: ConflictResolution | None = None):
        # Extraction continues until an EXIT resolution is seen.
        self.continue_extraction = True
        self.global_resolution = initial_resolution
        # Starting with EXIT means the extraction never proceeds.
        if initial_resolution == ConflictResolution.EXIT:
            self.continue_extraction = False

    @property
    def current_resolution(self) -> ConflictResolution | None:
        """Get the current resolution state."""
        return self.global_resolution

    def should_continue(self) -> bool:
        """Check if extraction should continue."""
        return self.continue_extraction

    @property
    def apply_to_all(self) -> bool:
        """Check if the current resolution applies to all future conflicts."""
        return self.global_resolution in self._STICKY

    def update_resolution(self, resolution: ConflictResolution) -> None:
        """Update the global resolution state.

        EXIT stops the extraction and is recorded; the ``*_ALL`` variants
        are recorded so they apply to later conflicts; one-shot resolutions
        (REPLACE/SKIP/AUTO_RENAME/ASK) are deliberately not persisted.
        """
        if resolution == ConflictResolution.EXIT:
            self.continue_extraction = False
            self.global_resolution = resolution
        elif resolution in self._STICKY:
            self.global_resolution = resolution
def _get_unique_filename(file_path: Path) -> Path:
    """Generate a unique filename by appending a number if the file exists."""
    if not file_path.exists():
        return file_path
    # Probe stem_1, stem_2, ... until a free name is found.
    folder = file_path.parent
    base = file_path.stem
    ext = file_path.suffix
    attempt = 1
    while True:
        candidate = folder / f"{base}_{attempt}{ext}"
        if not candidate.exists():
            return candidate
        attempt += 1


def _move_file_cross_platform(src: Path, dst: Path) -> None:
    """Move a file from src to dst, handling cross-drive moves on Windows."""
    try:
        # Fast path: a plain rename (atomic on the same filesystem).
        src.rename(dst)
    except OSError as rename_error:
        # On Windows, rename fails across drives; fall back to copy + delete.
        import shutil

        try:
            shutil.copy2(src, dst)
            src.unlink()
        except Exception:
            # Copy failed too: surface the original rename failure.
            raise rename_error from None


def _handle_file_conflict(
    target_path: Path,
    resolution: ConflictResolution | str,
    interactive_callback: Callable[[Path], ConflictResolution] | None = None,
) -> tuple[ConflictResolution, Path | None]:
    """Handle file conflicts during extraction.

    Args:
        target_path: The path where a conflict occurred
        resolution: The conflict resolution strategy (enum member or its
            string value)
        interactive_callback: Optional callback for interactive resolution

    Returns:
        Tuple of (actual_resolution, final_path); ``final_path`` is None
        when nothing should be written (SKIP/EXIT).
    """
    # Accept plain strings; anything unrecognized degrades to ASK so the
    # interactive path (or its REPLACE fallback) decides.
    if isinstance(resolution, str):
        try:
            resolution = ConflictResolution(resolution)
        except ValueError:
            resolution = ConflictResolution.ASK

    if resolution == ConflictResolution.ASK:
        if interactive_callback:
            resolution = interactive_callback(target_path)
        else:
            # Nobody to ask; default to REPLACE for consistency with tests.
            resolution = ConflictResolution.REPLACE

    if resolution in (ConflictResolution.SKIP, ConflictResolution.SKIP_ALL):
        return resolution, None
    if resolution in (
        ConflictResolution.AUTO_RENAME,
        ConflictResolution.AUTO_RENAME_ALL,
    ):
        return resolution, _get_unique_filename(target_path)
    if resolution == ConflictResolution.EXIT:
        return resolution, None
    if resolution in (ConflictResolution.REPLACE, ConflictResolution.REPLACE_ALL):
        return resolution, target_path
    # Unknown resolution object (e.g. from a misbehaving callback):
    # default to REPLACE for robustness.
    return ConflictResolution.REPLACE, target_path
[docs] class TzstArchive: """A class for handling .tzst/.tar.zst archives."""
[docs] def __init__( self, filename: str | Path, mode: str = "r", compression_level: int = 3, streaming: bool = False, ): """ Initialize a TzstArchive. Args: filename: Path to the archive file mode: Open mode ('r', 'w', 'a') compression_level: Zstandard compression level (1-22) streaming: If True, use streaming mode for reading (reduces memory usage for very large archives but may limit some tarfile operations that require seeking. Recommended for archives > 100MB) """ self.filename = Path(filename) self.mode = mode self.compression_level = compression_level self.streaming = streaming self._tarfile: tarfile.TarFile | None = None self._fileobj: BinaryIO | None = None self._compressed_stream: ( zstd.ZstdCompressionWriter | zstd.ZstdDecompressionReader | io.BytesIO | None ) = None # Validate mode valid_modes = ["r", "w", "a"] if mode not in valid_modes: raise ValueError( f"Invalid mode '{mode}'. Must be one of: {', '.join(valid_modes)}" ) # Validate compression level if not 1 <= compression_level <= 22: raise ValueError( f"Invalid compression level '{compression_level}'. Must be between 1 and 22." ) # Check for unsupported modes immediately - provide clear documentation if mode.startswith("a"): raise NotImplementedError( "Append mode is not currently supported for .tzst/.tar.zst archives. " "This would require decompressing the entire archive, adding new files, " "and recompressing, which is complex and potentially slow for large archives. " "Alternatives: 1) Create multiple archives, 2) Recreate the archive with all files, " "3) Use standard tar format for append operations, then compress separately." )
[docs] def __enter__(self): """Enter context manager.""" self.open() return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """Exit context manager.""" try: self.close() except Exception: # Suppress exceptions during cleanup to avoid masking original exceptions pass
[docs] def open(self): """Open the archive. See Also: :meth:`close`: Method to close the archive """ try: if self.mode.startswith("r"): # Read mode self._fileobj = open(self.filename, "rb") dctx = zstd.ZstdDecompressor() if self.streaming: # Streaming mode - use stream reader directly (memory efficient) # Note: This may limit some tarfile operations that require seeking self._compressed_stream = dctx.stream_reader(self._fileobj) self._tarfile = tarfile.open( fileobj=self._compressed_stream, mode="r|" ) else: # Buffer mode - decompress to memory buffer for random access # Better compatibility but higher memory usage for large archives decompressed_chunks = [] with dctx.stream_reader(self._fileobj) as reader: while True: chunk = reader.read(8192) if not chunk: break decompressed_chunks.append(chunk) decompressed_data = b"".join(decompressed_chunks) self._compressed_stream = io.BytesIO(decompressed_data) self._tarfile = tarfile.open( fileobj=self._compressed_stream, mode="r" ) elif self.mode.startswith("w"): # Write mode - use streaming compression self._fileobj = open(self.filename, "wb") cctx = zstd.ZstdCompressor( level=self.compression_level, write_content_size=True ) self._compressed_stream = cctx.stream_writer(self._fileobj) self._tarfile = tarfile.open(fileobj=self._compressed_stream, mode="w|") elif self.mode.startswith("a"): # Append mode - for tar.zst, this is complex as we need to decompress, # add files, and recompress. For simplicity, we'll raise an error for now. raise NotImplementedError( "Append mode is not currently supported for .tzst/.tar.zst archives. " "This would require decompressing the entire archive, adding new files, " "and recompressing, which is complex and potentially slow for large archives. " "Alternatives: 1) Create multiple archives, 2) Recreate the archive with all files, " "3) Use standard tar format for append operations, then compress separately." 
) else: raise ValueError(f"Invalid mode: {self.mode}") except Exception as e: self.close() if "zstd" in str(e).lower(): raise TzstDecompressionError(f"Failed to open archive: {e}") from e else: raise TzstArchiveError(f"Failed to open archive: {e}") from e
[docs] def close(self): """Close the archive.""" if self._tarfile: try: self._tarfile.close() except Exception: pass self._tarfile = None if self._compressed_stream: try: self._compressed_stream.close() except Exception: pass self._compressed_stream = None if self._fileobj: try: self._fileobj.close() except Exception: pass self._fileobj = None
[docs] def add( self, name: str | Path, arcname: str | None = None, recursive: bool = True, ): """ Add a file or directory to the archive. Args: name: Path to file or directory to add arcname: Alternative name for the file in the archive recursive: If True, add directories recursively See Also: :func:`create_archive`: Convenience function for creating archives """ if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("w"): raise RuntimeError("Archive not open for writing") path = Path(name) if not path.exists(): raise FileNotFoundError(f"File not found: {name}") try: self._tarfile.add(str(path), arcname=arcname, recursive=recursive) except PermissionError as e: raise TzstArchiveError(f"Failed to add {name}: {e}") from e
[docs] def extract( self, member: str | None = None, path: str | Path = ".", set_attrs: bool = True, numeric_owner: bool = False, filter: str | Callable | None = "data", ): """ Extract files from the archive. Args: member: Specific member to extract (None for all) path: Destination directory set_attrs: Whether to set file attributes numeric_owner: Whether to use numeric owner filter: Extraction filter for security. Can be: - 'data': Safe filter for cross-platform data archives (default, recommended) - 'tar': Honor most tar features but block dangerous ones - 'fully_trusted': Honor all metadata (use only for trusted archives) - None: Use default behavior (may show deprecation warning in Python 3.12+) - callable: Custom filter function Warning: Never extract archives from untrusted sources without proper filtering. The 'data' filter is recommended for most use cases as it prevents dangerous security issues like path traversal attacks. Note: In streaming mode, extracting specific members is not supported. Some extraction operations may be limited due to the sequential nature of streaming mode. See Also: :func:`extract_archive`: Convenience function for extracting archives """ if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("r"): raise RuntimeError("Archive not open for reading") extract_path = Path(path) extract_path.mkdir(parents=True, exist_ok=True) if self.streaming and member: # Specific member extraction not supported in streaming mode raise RuntimeError( "Extracting specific members is not supported in streaming mode. " "Please use non-streaming mode for selective extraction, or extract all files." 
) # Prepare extraction arguments - different parameters for extract vs extractall try: if member: # extract() accepts set_attrs, numeric_owner, and filter extract_kwargs = { "set_attrs": set_attrs, "numeric_owner": numeric_owner, "filter": filter, } self._tarfile.extract(member, path=extract_path, **extract_kwargs) else: # extractall() only accepts numeric_owner and filter (no set_attrs) extractall_kwargs = { "numeric_owner": numeric_owner, "filter": filter, } self._tarfile.extractall(path=extract_path, **extractall_kwargs) except (tarfile.StreamError, OSError) as e: if self.streaming and ( "seeking" in str(e).lower() or "stream" in str(e).lower() ): raise RuntimeError( "Extraction failed in streaming mode due to archive structure limitations. " "This archive may require non-streaming mode for extraction. " f"Original error: {e}" ) from e else: raise
[docs] def extractfile(self, member: str | tarfile.TarInfo): """ Extract a file-like object from the archive. Args: member: Member name or TarInfo object Returns: File-like object or None if member is not a file """ if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("r"): raise RuntimeError("Archive not open for reading") return self._tarfile.extractfile(member)
[docs] def extractall( self, path: str | Path = ".", members: list[tarfile.TarInfo] | None = None, *, numeric_owner: bool = False, filter: str | Callable | None = "data", ): """ Extract all members from the archive. Args: path: Destination directory (default: current directory) members: Specific members to extract (None for all) numeric_owner: Whether to use numeric owner IDs filter: Extraction filter for security. Can be: - 'data': Safe filter for cross-platform data archives - 'tar': Honor most tar features but block dangerous ones - 'fully_trusted': Honor all metadata (trusted archives only) - None: Use default behavior (may show deprecation warning) - callable: Custom filter function Warning: Never extract archives from untrusted sources without proper filtering. The 'data' filter is recommended for most use cases as it prevents dangerous security issues like path traversal attacks. Note: In streaming mode, extracting specific members is not supported. Some extraction operations may be limited due to the sequential nature of streaming mode. See Also: :meth:`extract`: Extract a single member from the archive :func:`extract_archive`: Convenience function for extracting archives """ if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("r"): raise RuntimeError("Archive not open for reading") if self.streaming and members is not None: # Specific member extraction not supported in streaming mode raise RuntimeError( "Extracting specific members is not supported in streaming mode. " "Please use non-streaming mode for selective extraction, " "or extract all files." 
) extract_path = Path(path) extract_path.mkdir(parents=True, exist_ok=True) try: # extractall() accepts numeric_owner, filter, and members parameters extractall_kwargs = { "numeric_owner": numeric_owner, "filter": filter, } if members is not None: extractall_kwargs["members"] = members self._tarfile.extractall(path=extract_path, **extractall_kwargs) except (tarfile.StreamError, OSError) as e: if self.streaming and ( "seeking" in str(e).lower() or "stream" in str(e).lower() ): raise RuntimeError( "Extraction failed in streaming mode due to archive " "structure limitations. This archive may require " "non-streaming mode for extraction. " f"Original error: {e}" ) from e else: raise
[docs] def getmembers(self) -> list[tarfile.TarInfo]: """Get list of all members in the archive.""" if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("r"): raise RuntimeError("Archive not open for reading") return self._tarfile.getmembers()
[docs] def getnames(self) -> list[str]: """Get list of all member names in the archive.""" if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("r"): raise RuntimeError("Archive not open for reading") return self._tarfile.getnames()
[docs] def list(self, verbose: bool = False) -> list[dict]: """ List contents of the archive. Args: verbose: Include detailed information Returns: List of file information dictionaries See Also: :meth:`getmembers`: Get TarInfo objects for all archive members :meth:`getnames`: Get names of all archive members :func:`list_archive`: Convenience function for listing archives """ if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("r"): raise RuntimeError("Archive not open for reading") members = self.getmembers() result = [] for member in members: info = { "name": member.name, "size": member.size, "is_file": member.isfile(), "is_dir": member.isdir(), "is_link": member.islnk(), "is_symlink": member.issym(), } if verbose: info.update( { "mode": member.mode, "uid": member.uid, "gid": member.gid, "mtime": member.mtime, "mtime_str": time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(member.mtime) ), "linkname": member.linkname, "uname": member.uname, "gname": member.gname, } ) result.append(info) return result
[docs] def test(self) -> bool: """ Test the integrity of the archive. Returns: True if archive is valid, False otherwise See Also: :func:`test_archive`: Convenience function for testing archive integrity """ if not self._tarfile: raise RuntimeError("Archive not open") if not self.mode.startswith("r"): raise RuntimeError("Archive not open for reading") try: # Try to iterate through all members and read file contents for member in self.getmembers(): if member.isfile(): # Try to extract each file to verify integrity fileobj = self.extractfile(member) if fileobj: # Read the entire file to verify decompression while True: chunk = fileobj.read(8192) if not chunk: break return True except Exception: return False
# Convenience functions
[docs] def create_archive( archive_path: str | Path, files: Sequence[str | Path], compression_level: int = 3, use_temp_file: bool = True, ) -> None: """ Create a new .tzst archive with atomic file operations. Args: archive_path: Path for the new archive files: List of files/directories to add compression_level: Zstandard compression level (1-22) use_temp_file: If True, create archive in temporary file first, then move to final location for atomic operation See Also: :meth:`TzstArchive.add`: Method for adding files to an open archive """ # Validate compression level if not 1 <= compression_level <= 22: raise ValueError( f"Invalid compression level '{compression_level}'. Must be between 1 and 22." ) archive_path = Path(archive_path) # Ensure archive has correct extension if archive_path.suffix.lower() not in [".tzst", ".zst"]: if archive_path.suffix.lower() == ".tar": archive_path = archive_path.with_suffix(".tar.zst") else: archive_path = archive_path.with_suffix(archive_path.suffix + ".tzst") # Use temporary file for atomic operation if requested if use_temp_file: temp_fd = None temp_path = None try: # Create temporary file in same directory as target for atomic move temp_fd, temp_path_str = tempfile.mkstemp( suffix=".tmp", prefix=f".{archive_path.name}.", dir=archive_path.parent ) os.close(temp_fd) # Close file descriptor, we'll open with TzstArchive temp_path = Path(temp_path_str) # Create archive in temporary location _create_archive_impl(temp_path, files, compression_level) # Atomic move to final location temp_path.replace(archive_path) except Exception: # Clean up temporary file on error if temp_path and temp_path.exists(): try: temp_path.unlink() except Exception: pass raise else: # Direct creation (non-atomic) _create_archive_impl(archive_path, files, compression_level)
def _create_archive_impl( archive_path: Path, files: Sequence[str | Path], compression_level: int, ) -> None: """Internal implementation for creating archives.""" # Find common parent directory for relative paths if files: file_paths = [Path(f) for f in files if Path(f).exists()] if file_paths: # Special handling for current directory "." current_dir = Path.cwd().resolve() if len(file_paths) == 1 and ( str(file_paths[0]) == "." or file_paths[0].resolve() == current_dir ): # When adding current directory, add its contents without "./" prefix with TzstArchive(archive_path, "w", compression_level) as archive: # Add all items in current directory, excluding archive and temp files archive_abs_path = archive_path.resolve() archive_name = archive_path.name for item in Path(".").iterdir(): item_abs_path = item.resolve() # Skip archives and temp files for consistency if ( item_abs_path == archive_abs_path or item.name == archive_name or ( item.name.startswith(".") and item.name.endswith(".tmp") ) or item.suffix.lower() in [".tzst", ".zst"] or item.name.lower().endswith(".tar.zst") ): continue # Use item name as archive name to avoid "./" prefix item_name = str(item.name).replace("\\", "/") archive.add(str(item), arcname=item_name) else: # Find the common parent directory try: common_parent = Path( os.path.commonpath([p.parent for p in file_paths]) ) except ValueError: # No common path, use parent of first file common_parent = file_paths[ 0 ].parent # Change to common parent directory to get relative paths original_cwd = Path.cwd() # Convert archive path to absolute to avoid issues when changing working directory absolute_archive_path = archive_path.resolve() try: os.chdir(common_parent) with TzstArchive( absolute_archive_path, "w", compression_level ) as archive: for file_path in file_paths: # Calculate relative path from common parent relative_path = file_path.relative_to(common_parent) # Normalize path separators and remove Windows prefixes path_str = 
str(relative_path).replace("\\", "/") if path_str.startswith("./") or path_str.startswith(".\\"): path_str = path_str[2:] # Use arcname to control the name in the archive archive.add(str(relative_path), arcname=path_str) finally: os.chdir(original_cwd) else: raise FileNotFoundError("No valid files found") else: # Empty archive with TzstArchive(archive_path, "w", compression_level) as archive: pass
def extract_archive(
    archive_path: str | Path,
    extract_path: str | Path = ".",
    members: list[str] | None = None,
    flatten: bool = False,
    streaming: bool = False,
    filter: str | Callable | None = "data",
    conflict_resolution: ConflictResolution | str = ConflictResolution.REPLACE,
    interactive_callback: Callable[[Path], ConflictResolution] | None = None,
) -> None:
    """Extract files from a .tzst archive.

    Args:
        archive_path: Path to the archive
        extract_path: Destination directory
        members: Specific members to extract (None for all)
        flatten: If True, extract every file directly into ``extract_path``
            without directory structure
        streaming: If True, use streaming mode (memory efficient for large
            archives)
        filter: Extraction filter for security: 'data' (safe default),
            'tar', 'fully_trusted', None, or a callable
        conflict_resolution: How to handle file conflicts during extraction
        interactive_callback: Function to call for interactive conflict
            resolution

    Warning:
        Never extract archives from untrusted sources without proper
        filtering. The 'data' filter prevents path traversal attacks.

    See Also:
        :meth:`TzstArchive.extract`: Method for extracting from an open archive
    """
    with TzstArchive(archive_path, "r", streaming=streaming) as archive:
        # Accept plain strings for the strategy; unknown values degrade to
        # REPLACE.
        if isinstance(conflict_resolution, str):
            try:
                conflict_resolution = ConflictResolution(conflict_resolution)
            except ValueError:
                conflict_resolution = ConflictResolution.REPLACE

        state = ConflictResolutionState(conflict_resolution)

        if flatten:
            # Flatten: every regular file lands directly in extract_dir.
            extract_dir = Path(extract_path)
            extract_dir.mkdir(parents=True, exist_ok=True)

            if members:
                wanted = [m for m in archive.getmembers() if m.name in members]
            else:
                wanted = archive.getmembers()

            for member in wanted:
                if not state.should_continue():
                    break
                if not member.isfile():
                    continue
                target_path = extract_dir / Path(member.name).name
                if target_path.exists():
                    active = state.global_resolution or conflict_resolution
                    actual, final_path = _handle_file_conflict(
                        target_path, active, interactive_callback
                    )
                    state.update_resolution(actual)
                    if actual in (
                        ConflictResolution.SKIP,
                        ConflictResolution.SKIP_ALL,
                    ):
                        continue
                    if actual == ConflictResolution.EXIT:
                        break
                    target_path = final_path
                fileobj = archive.extractfile(member)
                if fileobj:
                    with open(target_path, "wb") as out:
                        out.write(fileobj.read())
        elif members:
            # Structured extraction of explicitly named members.
            for member in members:
                if not state.should_continue():
                    break
                target_path = Path(extract_path) / member
                if not target_path.exists():
                    archive.extract(member, extract_path, filter=filter)
                    continue
                active = state.global_resolution or conflict_resolution
                actual, final_path = _handle_file_conflict(
                    target_path, active, interactive_callback
                )
                state.update_resolution(actual)
                if actual in (ConflictResolution.SKIP, ConflictResolution.SKIP_ALL):
                    continue
                if actual == ConflictResolution.EXIT:
                    break
                if actual in (
                    ConflictResolution.AUTO_RENAME,
                    ConflictResolution.AUTO_RENAME_ALL,
                ):
                    # Extract to a scratch dir, then move under the new name.
                    if final_path:
                        final_path.parent.mkdir(parents=True, exist_ok=True)
                    scratch = Path(tempfile.mkdtemp())
                    try:
                        archive.extract(member, scratch, filter=filter)
                        extracted = scratch / member
                        if final_path:
                            _move_file_cross_platform(extracted, final_path)
                    finally:
                        import shutil

                        shutil.rmtree(scratch, ignore_errors=True)
                else:
                    # REPLACE/REPLACE_ALL: extract straight over the target.
                    archive.extract(member, extract_path, filter=filter)
        else:
            # Full extraction: unpack everything into a scratch directory
            # first so conflicts can be resolved file by file before moving
            # into place.
            scratch = Path(tempfile.mkdtemp())
            try:
                archive.extractall(scratch, filter=filter)
                for extracted in scratch.rglob("*"):
                    if not state.should_continue():
                        break
                    if not extracted.is_file():
                        continue
                    rel_path = extracted.relative_to(scratch)
                    target_path = Path(extract_path) / rel_path
                    target_path.parent.mkdir(parents=True, exist_ok=True)
                    if target_path.exists():
                        active = state.global_resolution or conflict_resolution
                        actual, final_path = _handle_file_conflict(
                            target_path, active, interactive_callback
                        )
                        state.update_resolution(actual)
                        if actual in (
                            ConflictResolution.SKIP,
                            ConflictResolution.SKIP_ALL,
                        ):
                            continue
                        if actual == ConflictResolution.EXIT:
                            break
                        target_path = final_path
                        # Windows cannot rename over an existing file, so
                        # remove it first when replacing.
                        if target_path and target_path.exists():
                            if actual in (
                                ConflictResolution.REPLACE,
                                ConflictResolution.REPLACE_ALL,
                            ):
                                target_path.unlink()
                    if target_path:
                        _move_file_cross_platform(extracted, target_path)
            finally:
                import shutil

                shutil.rmtree(scratch, ignore_errors=True)
def list_archive(
    archive_path: str | Path, verbose: bool = False, streaming: bool = False
) -> list[dict]:
    """List contents of a .tzst archive.

    Args:
        archive_path: Path to the archive
        verbose: Include detailed information for each entry
        streaming: If True, use streaming mode (memory efficient for large
            archives)

    Returns:
        List of file information dictionaries

    See Also:
        :meth:`TzstArchive.list`: Method for listing an open archive
    """
    with TzstArchive(archive_path, "r", streaming=streaming) as opened:
        return opened.list(verbose=verbose)
[docs] def test_archive(archive_path: str | Path, streaming: bool = False) -> bool: """ Test the integrity of a .tzst archive. Args: archive_path: Path to the archive streaming: If True, use streaming mode (memory efficient for large archives) Returns: True if archive is valid, False otherwise See Also: :meth:`TzstArchive.test`: Method for testing an open archive """ try: # Open a fresh archive instance for testing with TzstArchive(archive_path, "r", streaming=streaming) as archive: # Try to iterate through all members and read file contents for member in archive.getmembers(): if member.isfile(): # In streaming mode, extractfile may not work properly with r| mode # So we'll just check that we can iterate through members if streaming: # For streaming mode, just verify we can read the member info # This tests that the archive structure is valid continue else: # Try to extract each file to verify integrity fileobj = archive.extractfile(member) if fileobj: # Read the entire file to verify decompression while True: chunk = fileobj.read(8192) if not chunk: break return True except Exception: return False