Spaces:
Sleeping
Sleeping
| """ | |
| Background worker for processing ingest jobs. | |
| Consumes jobs from Redis queue and processes them. | |
| """ | |
| import sqlite3 | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime | |
| from filelock import FileLock | |
| from typing import Dict | |
| import traceback | |
| from .text_extractor import extract_text | |
| # Setup logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Paths | |
| BASE_DIR = Path(__file__).parent.parent.parent | |
| DB_PATH = BASE_DIR / "data" / "invoices.db" | |
| LOCK_PATH = BASE_DIR / "data" / "invoices.db.lock" | |
| def update_job_status(job_id: str, status: str, error_message: str = None): | |
| """Update job status in database.""" | |
| with FileLock(str(LOCK_PATH), timeout=10): | |
| conn = sqlite3.connect(str(DB_PATH)) | |
| cursor = conn.cursor() | |
| if status == "processing": | |
| cursor.execute(""" | |
| UPDATE ingest_jobs | |
| SET status = ?, started_at = CURRENT_TIMESTAMP | |
| WHERE job_id = ? | |
| """, (status, job_id)) | |
| elif status == "completed": | |
| cursor.execute(""" | |
| UPDATE ingest_jobs | |
| SET status = ?, completed_at = CURRENT_TIMESTAMP | |
| WHERE job_id = ? | |
| """, (status, job_id)) | |
| elif status == "failed": | |
| cursor.execute(""" | |
| UPDATE ingest_jobs | |
| SET status = ?, error_message = ?, completed_at = CURRENT_TIMESTAMP | |
| WHERE job_id = ? | |
| """, (status, error_message, job_id)) | |
| conn.commit() | |
| conn.close() | |
| def save_extraction(document_id: int, raw_text: str, metadata: Dict): | |
| """Save extracted text to database.""" | |
| with FileLock(str(LOCK_PATH), timeout=10): | |
| conn = sqlite3.connect(str(DB_PATH)) | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO extractions ( | |
| document_id, | |
| raw_text, | |
| page_count, | |
| extraction_method, | |
| confidence_score | |
| ) VALUES (?, ?, ?, ?, ?) | |
| """, ( | |
| document_id, | |
| raw_text, | |
| metadata.get('page_count'), | |
| metadata.get('extraction_method'), | |
| metadata.get('confidence_score') | |
| )) | |
| conn.commit() | |
| conn.close() | |
| def process_job(job_data: Dict): | |
| """ | |
| Process a single ingest job. | |
| Args: | |
| job_data: Dict with job_id, document_id, file_path, mime_type | |
| """ | |
| job_id = job_data['job_id'] | |
| document_id = job_data['document_id'] | |
| file_path = Path(job_data['file_path']) | |
| mime_type = job_data['mime_type'] | |
| logger.info(f"Processing job {job_id} for document {document_id}") | |
| try: | |
| # Update status to processing | |
| update_job_status(job_id, "processing") | |
| # Extract text | |
| logger.info(f"Extracting text from {file_path}") | |
| raw_text, metadata = extract_text(file_path, mime_type) | |
| if not raw_text or len(raw_text.strip()) < 10: | |
| raise ValueError("No text extracted or text too short") | |
| logger.info(f"Extracted {len(raw_text)} characters, {metadata['page_count']} pages") | |
| # Save to database | |
| save_extraction(document_id, raw_text, metadata) | |
| # Update status to completed | |
| update_job_status(job_id, "completed") | |
| logger.info(f"Job {job_id} completed successfully") | |
| except Exception as e: | |
| error_msg = f"{type(e).__name__}: {str(e)}" | |
| logger.error(f"Job {job_id} failed: {error_msg}") | |
| logger.error(traceback.format_exc()) | |
| # Update status to failed | |
| update_job_status(job_id, "failed", error_msg) | |
| raise |