"""Background worker for processing ingest jobs.

Consumes jobs from Redis queue and processes them.
"""
import logging
import sqlite3
import traceback
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

from filelock import FileLock

from .text_extractor import extract_text

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Paths — DB and its companion lock file live under <repo root>/data.
BASE_DIR = Path(__file__).parent.parent.parent
DB_PATH = BASE_DIR / "data" / "invoices.db"
LOCK_PATH = BASE_DIR / "data" / "invoices.db.lock"


def update_job_status(job_id: str, status: str,
                      error_message: Optional[str] = None):
    """Update a job's status row in the ingest_jobs table.

    Args:
        job_id: Identifier of the row in ingest_jobs.
        status: One of "processing" (stamps started_at), "completed"
            (stamps completed_at), or "failed" (stamps completed_at and
            records error_message). Any other value updates nothing.
        error_message: Stored only when status is "failed".
    """
    with FileLock(str(LOCK_PATH), timeout=10):
        # closing() guarantees the connection is released even if the
        # UPDATE raises; the original leaked the handle on error.
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            cursor = conn.cursor()
            if status == "processing":
                cursor.execute("""
                    UPDATE ingest_jobs
                    SET status = ?, started_at = CURRENT_TIMESTAMP
                    WHERE job_id = ?
                """, (status, job_id))
            elif status == "completed":
                cursor.execute("""
                    UPDATE ingest_jobs
                    SET status = ?, completed_at = CURRENT_TIMESTAMP
                    WHERE job_id = ?
                """, (status, job_id))
            elif status == "failed":
                cursor.execute("""
                    UPDATE ingest_jobs
                    SET status = ?, error_message = ?,
                        completed_at = CURRENT_TIMESTAMP
                    WHERE job_id = ?
                """, (status, error_message, job_id))
            else:
                # Previously an unknown status was silently ignored;
                # surface it so callers notice the typo.
                logger.warning("Unknown job status %r for job %s",
                               status, job_id)
            conn.commit()


def save_extraction(document_id: int, raw_text: str, metadata: Dict):
    """Save extracted text to the extractions table.

    Args:
        document_id: Foreign key of the source document.
        raw_text: Full extracted text.
        metadata: Extraction metadata; page_count, extraction_method and
            confidence_score are read via .get() so missing keys store NULL.
    """
    with FileLock(str(LOCK_PATH), timeout=10):
        # closing() releases the connection even if the INSERT raises.
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO extractions (
                    document_id, raw_text, page_count,
                    extraction_method, confidence_score
                ) VALUES (?, ?, ?, ?, ?)
            """, (
                document_id,
                raw_text,
                metadata.get('page_count'),
                metadata.get('extraction_method'),
                metadata.get('confidence_score')
            ))
            conn.commit()


def process_job(job_data: Dict):
    """Process a single ingest job.

    Args:
        job_data: Dict with job_id, document_id, file_path, mime_type

    Raises:
        Exception: re-raised after the job is marked "failed", so the
            queue consumer can apply its own retry/dead-letter policy.
    """
    job_id = job_data['job_id']
    document_id = job_data['document_id']
    file_path = Path(job_data['file_path'])
    mime_type = job_data['mime_type']

    logger.info("Processing job %s for document %s", job_id, document_id)

    try:
        # Update status to processing
        update_job_status(job_id, "processing")

        # Extract text
        logger.info("Extracting text from %s", file_path)
        raw_text, metadata = extract_text(file_path, mime_type)

        # Guard against blank/scanned documents that yield no usable text.
        if not raw_text or len(raw_text.strip()) < 10:
            raise ValueError("No text extracted or text too short")

        logger.info("Extracted %d characters, %s pages",
                    len(raw_text), metadata['page_count'])

        # Save to database
        save_extraction(document_id, raw_text, metadata)

        # Update status to completed
        update_job_status(job_id, "completed")
        logger.info("Job %s completed successfully", job_id)

    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}"
        # logger.exception logs at ERROR level and appends the current
        # traceback, replacing the manual traceback.format_exc() call.
        logger.exception("Job %s failed: %s", job_id, error_msg)

        # Update status to failed
        update_job_status(job_id, "failed", error_msg)
        raise