# mshauri-fedha/src/transform/download_files.py
import os
import json
import zipfile
import time
import gc
import threading
import shutil
import requests
import subprocess
import pymupdf
import pdfplumber
import pandas as pd
import urllib3
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Import shared config
from config import ProcessingConfig
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
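
# This module assumes the shared ProcessingConfig (defined in config.py)
# exposes roughly the surface below (a sketch for the reader, not the
# actual definition):
#
#   class ProcessingConfig:
#       root_dir: str             # base scratch directory
#       source_name: str          # e.g. 'cbk'
#       batch_size: int           # rows per zip-and-ship batch
#       max_workers: int          # download/extract threads
#       local_work_dir: Path      # scratch tree, recreated by setup()
#       local_dirs: dict          # {'pdfs': Path, 'texts': Path}
#       logs: dict                # {'docs': Path}, a JSONL append log
#       drive_zip_dir: str        # durable destination for batch zips
#       def setup(self) -> None: ...  # wipes/recreates local_work_dir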


class UniversalProcessor:
    def __init__(self, config: ProcessingConfig):
        self.config = config
        # Retry transient failures (rate limiting, gateway errors) with backoff.
        retry = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 504])
        adapter = HTTPAdapter(max_retries=retry)
        self.session = requests.Session()
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def download(self, url, safe_title) -> Path:
        try:
            response = self.session.get(url, timeout=30, stream=True, verify=False)
            response.raise_for_status()
            ext = Path(url).suffix.lower() or '.pdf'
            safe_name = safe_title[:50].replace(' ', '_').replace('/', '_')
            filepath = self.config.local_dirs['pdfs'] / f"{safe_name}{ext}"
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return filepath
        except Exception:
            return None

    def process(self, filepath: Path, url: str, safe_title: str):
        # Only does basic validation/extraction for the initial pass.
        try:
            # Probe the PDF in a subprocess so a malformed file that crashes
            # the interpreter cannot take down this worker.
            res = subprocess.run(["python", "pdf_canary.py", str(filepath)],
                                 capture_output=True, timeout=15)
            if res.returncode != 0:
                return None
            # Basic PyMuPDF extraction for quick preview.
            doc = pymupdf.open(filepath)
            text = "".join(page.get_text() for page in doc)
            page_count = len(doc)  # read before close(); the handle is unusable after
            doc.close()
            return {
                'text': text,
                'tables': [],
                'images': [],
                'metadata': {'pages': page_count},
            }
        except Exception:
            return None
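
# pdf_canary.py is not part of this file. A minimal sketch of the canary
# pattern assumed above (open the PDF in a throwaway process, exit 0 on
# success and non-zero on failure) could look like:
#
#     import sys
#     import pymupdf
#
#     if __name__ == "__main__":
#         try:
#             doc = pymupdf.open(sys.argv[1])
#             _ = doc.page_count  # forces header/xref parsing
#             doc.close()
#             sys.exit(0)
#         except Exception:
#             sys.exit(1)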


class BatchPipeline:
    def __init__(self, config: ProcessingConfig, processor: UniversalProcessor):
        self.config = config
        self.processor = processor
        self.lock = threading.Lock()
        self.config.setup()

    def _append_log(self, log_key, record):
        # Workers share one JSONL log per key, so writes are serialised.
        with self.lock:
            with open(self.config.logs[log_key], 'a', encoding='utf-8') as f:
                f.write(json.dumps(record) + '\n')

    def _worker(self, item):
        row = item['row']
        title = str(row.get('text', 'untitled'))
        url = row['file_url']
        path = self.processor.download(url, title)
        # Skip failed downloads and stubs too small to be real documents.
        if not path or path.stat().st_size < 500:
            return None
        data = self.processor.process(path, url, title)
        if not data:
            return None
        # Save preview text alongside the PDF.
        with open(self.config.local_dirs['texts'] / f"{path.stem}.txt", 'w',
                  encoding='utf-8') as f:
            f.write(data['text'])
        self._append_log('docs', {'url': url, 'file': path.name, 'status': 'downloaded'})
        return True

    def _zip_and_ship(self, batch_id):
        ts = datetime.now().strftime("%H%M%S")
        zname = f"{self.config.source_name}_{batch_id}_{ts}.zip"
        local_z = Path(f"/tmp/{zname}")
        drive_z = Path(self.config.drive_zip_dir) / zname
        with zipfile.ZipFile(local_z, 'w', zipfile.ZIP_DEFLATED) as z:
            for root, _, files in os.walk(self.config.local_work_dir):
                for f in files:
                    fp = os.path.join(root, f)
                    z.write(fp, os.path.relpath(fp, self.config.local_work_dir))
        shutil.copy(local_z, drive_z)
        self.config.setup()  # Wipe local
        os.remove(local_z)
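
    # Design note: the archive is built under /tmp (fast local disk) and only
    # then copied to drive_zip_dir, which is presumably slower or networked
    # storage; wiping the local tree after each batch bounds disk usage.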

    def run(self, df, ignore_history=False):
        # Resume support: skip URLs already recorded in the docs log.
        done = set()
        if not ignore_history and os.path.exists(self.config.logs['docs']):
            with open(self.config.logs['docs']) as f:
                for line in f:
                    try:
                        done.add(json.loads(line)['url'])
                    except (json.JSONDecodeError, KeyError):
                        continue
        queue = [r for _, r in df.iterrows() if r['file_url'] not in done]
        print(f"Queued: {len(queue)} files.")
        bs = self.config.batch_size
        for i in range(0, len(queue), bs):
            batch = queue[i:i + bs]
            bid = f"batch_{i // bs + 1}"
            print(f"Processing {bid}...")
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as ex:
                futures = [ex.submit(self._worker, {'row': item}) for item in batch]
                for _ in tqdm(as_completed(futures), total=len(batch)):
                    pass
            self._zip_and_ship(bid)
            gc.collect()


if __name__ == "__main__":
    # Example usage; ROOT_DIR is illustrative, point it at a real scratch dir.
    ROOT_DIR = "/scratch/user/mshauri_data"
    conf = ProcessingConfig(root_dir=ROOT_DIR, source_name='cbk')
    pipe = BatchPipeline(conf, UniversalProcessor(conf))
    # run() expects a DataFrame with 'file_url' and 'text' columns, e.g.:
    # df = pd.read_csv("cbk_manifest.csv")  # hypothetical manifest file
    # pipe.run(df)