File size: 1,974 Bytes
"""ZIP archive parser - extracts and delegates to txt/epub parsers."""
import logging
import tempfile
import zipfile
from pathlib import Path
from .txt_parser import parse_txt
from .epub_parser import parse_epub
logger = logging.getLogger(__name__)
# Dispatch table: file extension (including the leading dot) -> parser
# callable. parse_zip() looks for files ending in each key and calls the
# mapped parser with the path of the extracted file.
PARSERS = {
    ".txt": parse_txt,
    ".epub": parse_epub,
}
def _parser_for(name: str):
    """Return the parser registered in ``PARSERS`` for ``name``, or ``None``.

    The extension comparison is case-insensitive so that ``BOOK.TXT`` is
    handled the same way as ``book.txt`` on every filesystem.
    """
    lowered = name.lower()
    for ext, parser in PARSERS.items():
        if lowered.endswith(ext.lower()):
            return parser
    return None


def parse_zip(filepath: Path) -> str:
    """Extract a ZIP archive and parse all supported files inside.

    Only archive members whose extension has an entry in ``PARSERS`` are
    extracted, into a temporary directory that is removed before returning.
    Files whose name starts with ``.`` are skipped. Files are parsed in
    order of file name, with the relative path as a tie-breaker so the
    result is deterministic, and every non-blank result is joined with a
    blank line.

    Args:
        filepath: Path to the .zip file.

    Returns:
        Combined text from all supported files in the archive. An empty
        string is returned when the file is not a valid ZIP, extraction
        fails, or no supported files are found. A file whose parser raises
        is logged and skipped.
    """
    if not zipfile.is_zipfile(str(filepath)):
        logger.error("Not a valid ZIP file: %s", filepath.name)
        return ""

    texts = []
    with tempfile.TemporaryDirectory() as tmpdir:
        tmppath = Path(tmpdir)
        try:
            with zipfile.ZipFile(str(filepath), "r") as zf:
                # Extract only what can actually be parsed; unrelated (and
                # possibly large or corrupt) members never touch the disk.
                wanted = [
                    name for name in zf.namelist()
                    if _parser_for(name) is not None
                ]
                zf.extractall(tmppath, members=wanted)
        except Exception as e:
            logger.error("Failed to extract ZIP %s: %s", filepath.name, e)
            return ""

        # Find all supported files recursively
        supported_files = []
        for f in tmppath.rglob("*"):
            if f.name.startswith(".") or not f.is_file():
                continue
            parser = _parser_for(f.name)
            if parser is not None:
                supported_files.append((f, parser))

        if not supported_files:
            logger.warning("No supported files found in ZIP: %s", filepath.name)
            return ""

        # Sort by file name first (as before); the relative path breaks ties
        # between same-named files in different folders so the output does
        # not depend on directory scan order.
        supported_files.sort(
            key=lambda item: (item[0].name, item[0].relative_to(tmppath).as_posix())
        )
        logger.info("Found %d supported files in ZIP %s", len(supported_files), filepath.name)

        # Parsing must happen while the temporary directory still exists.
        for f, parser in supported_files:
            try:
                text = parser(f)
                if text.strip():
                    texts.append(text)
                    logger.info(" Parsed %s (%d chars)", f.name, len(text))
            except Exception as e:
                # Best effort: one bad file must not discard the others.
                logger.error(" Failed to parse %s: %s", f.name, e)

    return "\n\n".join(texts)
|