# NOTE: The following lines are residue from the page this file was copied
# from (author caption, commit title "Deploy Invoice Digitization Agent",
# commit hash 8a859a8) — they are not part of the module source.
"""
Background worker for processing ingest jobs.
Consumes jobs from Redis queue and processes them.
"""
import logging
import sqlite3
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

from filelock import FileLock

from .text_extractor import extract_text
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Paths
BASE_DIR = Path(__file__).parent.parent.parent
DB_PATH = BASE_DIR / "data" / "invoices.db"
LOCK_PATH = BASE_DIR / "data" / "invoices.db.lock"
def update_job_status(job_id: str, status: str, error_message: Optional[str] = None):
    """Update an ingest job's status (and the matching timestamps) in SQLite.

    Args:
        job_id: Identifier of the row in ``ingest_jobs`` to update.
        status: One of ``"processing"``, ``"completed"``, or ``"failed"``.
            Any other value updates nothing (matching the original behavior),
            but is now logged instead of being silently ignored.
        error_message: Stored in ``error_message`` only when *status* is
            ``"failed"``; ignored otherwise.
    """
    # Each status sets a different timestamp/error column; map status ->
    # (SQL, params) so the lock/connect/commit boilerplate exists once.
    queries = {
        "processing": (
            """
            UPDATE ingest_jobs
            SET status = ?, started_at = CURRENT_TIMESTAMP
            WHERE job_id = ?
            """,
            (status, job_id),
        ),
        "completed": (
            """
            UPDATE ingest_jobs
            SET status = ?, completed_at = CURRENT_TIMESTAMP
            WHERE job_id = ?
            """,
            (status, job_id),
        ),
        "failed": (
            """
            UPDATE ingest_jobs
            SET status = ?, error_message = ?, completed_at = CURRENT_TIMESTAMP
            WHERE job_id = ?
            """,
            (status, error_message, job_id),
        ),
    }
    if status not in queries:
        logger.warning("update_job_status called with unknown status %r", status)
        return
    sql, params = queries[status]
    with FileLock(str(LOCK_PATH), timeout=10):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            conn.execute(sql, params)
            conn.commit()
        finally:
            # Close even when execute/commit raises; the original leaked the
            # connection on any SQLite error.
            conn.close()
def save_extraction(document_id: int, raw_text: str, metadata: Dict):
    """Persist extracted text plus its metadata as a new ``extractions`` row.

    Args:
        document_id: Foreign key of the source document row.
        raw_text: Full text pulled from the document.
        metadata: Dict whose optional keys ``page_count``,
            ``extraction_method``, and ``confidence_score`` are stored;
            missing keys are inserted as NULL.
    """
    with FileLock(str(LOCK_PATH), timeout=10):
        conn = sqlite3.connect(str(DB_PATH))
        try:
            conn.execute(
                """
                INSERT INTO extractions (
                    document_id,
                    raw_text,
                    page_count,
                    extraction_method,
                    confidence_score
                ) VALUES (?, ?, ?, ?, ?)
                """,
                (
                    document_id,
                    raw_text,
                    metadata.get('page_count'),
                    metadata.get('extraction_method'),
                    metadata.get('confidence_score'),
                ),
            )
            conn.commit()
        finally:
            # Close even when the INSERT fails; the original leaked the
            # connection on any SQLite error.
            conn.close()
def process_job(job_data: Dict):
    """Process a single ingest job end to end.

    Marks the job as processing, extracts text from the job's file, saves
    the extraction, then marks the job completed. On any failure the job
    row is marked failed and the exception is re-raised to the caller.

    Args:
        job_data: Dict with keys ``job_id``, ``document_id``, ``file_path``,
            and ``mime_type``.

    Raises:
        ValueError: When extraction yields no text (or fewer than 10
            non-whitespace characters).
        Exception: Re-raises whatever the extraction/persistence step
            raised, after recording the failure on the job row.
    """
    job_id = job_data['job_id']
    document_id = job_data['document_id']
    file_path = Path(job_data['file_path'])
    mime_type = job_data['mime_type']
    # Lazy %-style args instead of f-strings: formatting is skipped when the
    # log level filters the record out.
    logger.info("Processing job %s for document %s", job_id, document_id)
    try:
        update_job_status(job_id, "processing")
        logger.info("Extracting text from %s", file_path)
        raw_text, metadata = extract_text(file_path, mime_type)
        # Guard against extractors that "succeed" with empty/garbage output.
        if not raw_text or len(raw_text.strip()) < 10:
            raise ValueError("No text extracted or text too short")
        logger.info(
            "Extracted %d characters, %s pages",
            len(raw_text),
            metadata['page_count'],
        )
        save_extraction(document_id, raw_text, metadata)
        update_job_status(job_id, "completed")
        logger.info("Job %s completed successfully", job_id)
    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}"
        # logger.exception emits the message and the full traceback in one
        # record (replaces the separate traceback.format_exc() call).
        logger.exception("Job %s failed: %s", job_id, error_msg)
        try:
            update_job_status(job_id, "failed", error_msg)
        except Exception:
            # A DB/lock failure here must not mask the original error.
            logger.exception("Could not mark job %s as failed", job_id)
        raise