| """ | |
| Processing Service - Async document processing | |
| Handles document processing workflow integration with the existing | |
| ingestion pipeline and vector database. Provides async processing | |
| with status tracking and queue management. | |
| """ | |
import logging
import os
import threading
from datetime import datetime
from queue import Empty, Queue
from typing import Any, Callable, Dict, List, Optional

from .document_service import DocumentStatus

class ProcessingJob:
    """Represents a document processing job"""

    def __init__(self, file_info: Dict[str, Any], processing_options: Optional[Dict[str, Any]] = None):
        self.job_id = file_info["file_id"]
        self.file_info = file_info
        self.processing_options = processing_options or {}
        self.status = DocumentStatus.UPLOADED
        self.progress = 0.0
        self.created_at = datetime.utcnow()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.error_message: Optional[str] = None
        self.result: Optional[Dict[str, Any]] = None

class ProcessingService:
    """
    Async document processing service that integrates with the existing RAG pipeline.

    This service manages the document processing queue and coordinates with
    the existing ingestion pipeline for seamless integration.
    """

    def __init__(self, max_workers: int = 2):
        """
        Initialize the processing service.

        Args:
            max_workers: Maximum number of concurrent processing jobs
        """
        self.max_workers = max_workers
        self.job_queue = Queue()
        # Job registries keyed by job_id; jobs move from active to completed/failed
        self.active_jobs = {}
        self.completed_jobs = {}
        self.failed_jobs = {}
        self.workers = []
        self.running = False
        self.status_callbacks = []

        logging.info(f"ProcessingService initialized with {max_workers} workers")

    def start(self):
        """Start the processing service"""
        if self.running:
            return

        self.running = True

        # Start worker threads
        for i in range(self.max_workers):
            worker = threading.Thread(target=self._worker_loop, name=f"ProcessingWorker-{i}")
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

        logging.info(f"ProcessingService started with {len(self.workers)} workers")

    def stop(self):
        """Stop the processing service"""
        self.running = False

        # Add sentinel values to wake up workers
        for _ in range(self.max_workers):
            self.job_queue.put(None)

        # Wait for workers to finish
        for worker in self.workers:
            worker.join(timeout=5.0)

        self.workers.clear()
        logging.info("ProcessingService stopped")

    def submit_job(self, file_info: Dict[str, Any], processing_options: Optional[Dict[str, Any]] = None) -> str:
        """
        Submit a document for processing.

        Args:
            file_info: File information from document service
            processing_options: Processing configuration options

        Returns:
            Job ID for tracking
        """
        job = ProcessingJob(file_info, processing_options)

        # Add to active jobs tracking
        self.active_jobs[job.job_id] = job

        # Add to processing queue
        self.job_queue.put(job)

        original_name = file_info["original_name"]
        logging.info(f"Submitted processing job {job.job_id} for file {original_name}")

        # Notify status callbacks
        self._notify_status_change(job, DocumentStatus.UPLOADED)

        return job.job_id

    def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Get status of a processing job.

        Args:
            job_id: Job ID to check

        Returns:
            Job status information or None if not found
        """
        # Check the active, completed, and failed registries in order
        for registry in (self.active_jobs, self.completed_jobs, self.failed_jobs):
            if job_id in registry:
                return self._job_to_dict(registry[job_id])
        return None

    def get_queue_status(self) -> Dict[str, Any]:
        """
        Get overall queue status.

        Returns:
            Queue status information
        """
        return {
            "queue_size": self.job_queue.qsize(),
            "active_jobs": len(self.active_jobs),
            "completed_jobs": len(self.completed_jobs),
            "failed_jobs": len(self.failed_jobs),
            "workers_running": len(self.workers),
            "service_running": self.running,
        }

    def get_all_jobs(self, status_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get all jobs, optionally filtered by status.

        Args:
            status_filter: Optional status value to filter by

        Returns:
            List of job information, newest first
        """
        all_jobs = (
            list(self.active_jobs.values())
            + list(self.completed_jobs.values())
            + list(self.failed_jobs.values())
        )
        jobs = [
            self._job_to_dict(job)
            for job in all_jobs
            if not status_filter or job.status.value == status_filter
        ]

        # Sort by created time (newest first)
        jobs.sort(key=lambda x: x["created_at"], reverse=True)
        return jobs

    def add_status_callback(self, callback: Callable[[str, DocumentStatus], None]):
        """
        Add a callback for status change notifications.

        Args:
            callback: Function to call when job status changes
        """
        self.status_callbacks.append(callback)
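
    # Example callback (illustrative; the name `on_status` is not part of this
    # codebase). Callbacks receive the job id and the new DocumentStatus:
    #
    #     def on_status(job_id: str, status: DocumentStatus) -> None:
    #         print(f"{job_id} -> {status.value}")
    #
    #     service.add_status_callback(on_status)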

    def _worker_loop(self):
        """Main worker loop for processing jobs"""
        while self.running:
            try:
                # Get next job from queue (blocks until available or timeout)
                job = self.job_queue.get(timeout=1.0)

                # Check for sentinel value (stop signal)
                if job is None:
                    break

                # Process the job
                self._process_job(job)

            except Empty:
                # Normal timeout when no jobs are available - continue polling
                continue
            except Exception as e:
                logging.error(f"Worker error: {e}", exc_info=True)

    def _process_job(self, job: ProcessingJob):
        """
        Process a single document job.

        Args:
            job: ProcessingJob to process
        """
        try:
            job.started_at = datetime.utcnow()
            job.status = DocumentStatus.VALIDATING
            job.progress = 10.0
            self._notify_status_change(job, DocumentStatus.VALIDATING)

            # Step 1: Validation
            if not self._validate_file(job):
                return

            # Step 2: Parse document
            job.status = DocumentStatus.PARSING
            job.progress = 25.0
            self._notify_status_change(job, DocumentStatus.PARSING)

            parsed_content = self._parse_document(job)
            if not parsed_content:
                return

            # Step 3: Chunk document
            job.status = DocumentStatus.CHUNKING
            job.progress = 50.0
            self._notify_status_change(job, DocumentStatus.CHUNKING)

            chunks = self._chunk_document(job, parsed_content)
            if not chunks:
                return

            # Step 4: Generate embeddings
            job.status = DocumentStatus.EMBEDDING
            job.progress = 75.0
            self._notify_status_change(job, DocumentStatus.EMBEDDING)

            embeddings = self._generate_embeddings(job, chunks)
            if not embeddings:
                return

            # Step 5: Index in vector database
            job.status = DocumentStatus.INDEXING
            job.progress = 90.0
            self._notify_status_change(job, DocumentStatus.INDEXING)

            if not self._index_document(job, chunks, embeddings):
                return

            # Completion
            job.status = DocumentStatus.COMPLETED
            job.progress = 100.0
            job.completed_at = datetime.utcnow()

            # Store result
            job.result = {
                "chunks_created": len(chunks),
                "embeddings_generated": len(embeddings),
                "processing_time": (job.completed_at - job.started_at).total_seconds(),
            }

            # Move to completed jobs
            self.completed_jobs[job.job_id] = job
            if job.job_id in self.active_jobs:
                del self.active_jobs[job.job_id]

            self._notify_status_change(job, DocumentStatus.COMPLETED)
            logging.info(f"Successfully processed job {job.job_id}")

        except Exception as e:
            self._handle_job_error(job, str(e))

    def _validate_file(self, job: ProcessingJob) -> bool:
        """Validate file before processing"""
        try:
            file_path = job.file_info["file_path"]

            # Check if file exists
            if not os.path.exists(file_path):
                raise ValueError(f"File not found: {file_path}")

            # Check file size
            file_size = os.path.getsize(file_path)
            if file_size == 0:
                raise ValueError("File is empty")

            return True

        except Exception as e:
            self._handle_job_error(job, f"Validation failed: {e}")
            return False

    def _parse_document(self, job: ProcessingJob) -> Optional[str]:
        """Parse document content"""
        try:
            # This would integrate with existing document parsing logic.
            # For now, simulate parsing based on file type.
            file_path = job.file_info["file_path"]
            file_ext = job.file_info.get("file_extension", "").lower()

            if file_ext in [".txt", ".md"]:
                with open(file_path, "r", encoding="utf-8") as f:
                    return f.read()
            else:
                # For other formats, would use appropriate parsers
                # (PyPDF2 for PDF, python-docx for Word, etc.)
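                # A possible real PDF branch, as a minimal sketch: it assumes the
                # `pypdf` package (the maintained successor to PyPDF2) is
                # installed, which this codebase does not confirm.
                #
                #     from pypdf import PdfReader
                #     reader = PdfReader(file_path)
                #     return "\n".join(page.extract_text() or "" for page in reader.pages)
                #
                # Left commented out so the simulated placeholder below stays in effect.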
| return f"Parsed content from {file_path}" | |
| except Exception as e: | |
| self._handle_job_error(job, f"Parsing failed: {e}") | |
| return None | |

    def _chunk_document(self, job: ProcessingJob, content: str) -> Optional[List[str]]:
        """Chunk document content"""
        try:
            # This would integrate with existing chunking logic from the ingestion
            # pipeline. For now, simulate chunking with a sliding window.
            chunk_size = job.processing_options.get("chunk_size", 1000)
            overlap = job.processing_options.get("overlap", 200)
            chunks = []
            start = 0
            # Guard against overlap >= chunk_size, which would make the window
            # step non-positive and loop forever
            step = max(1, chunk_size - overlap)
            while start < len(content):
                chunks.append(content[start:start + chunk_size])
                start += step

            return chunks

        except Exception as e:
            self._handle_job_error(job, f"Chunking failed: {e}")
            return None

    def _generate_embeddings(self, job: ProcessingJob, chunks: List[str]) -> Optional[List[List[float]]]:
        """Generate embeddings for chunks"""
        try:
            # This would integrate with the existing embedding service.
            # For now, simulate embedding generation.
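            # A possible real implementation, as a minimal sketch: it assumes the
            # `sentence-transformers` package is installed and uses the
            # all-MiniLM-L6-v2 model (384-dimensional vectors); neither is
            # confirmed by this codebase.
            #
            #     from sentence_transformers import SentenceTransformer
            #     model = SentenceTransformer("all-MiniLM-L6-v2")
            #     return model.encode(chunks).tolist()
            #
            # Left commented out so the placeholder below remains the behavior.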
            # Simulated embedding vectors (384 dimensions for sentence-transformers)
            embeddings = [[0.1] * 384 for _ in chunks]  # Placeholder
            return embeddings

        except Exception as e:
            self._handle_job_error(job, f"Embedding generation failed: {e}")
            return None

    def _index_document(self, job: ProcessingJob, chunks: List[str], embeddings: List[List[float]]) -> bool:
        """Index document in vector database"""
        try:
            # This would integrate with the existing vector database.
            # For now, simulate indexing.
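            # A hypothetical integration sketch: `self.vector_store` and its
            # `upsert` signature are illustrative assumptions, not an API that
            # exists in this codebase.
            #
            #     ids = [f"{job.job_id}-{i}" for i in range(len(chunks))]
            #     self.vector_store.upsert(ids=ids, documents=chunks, embeddings=embeddings)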
| logging.info(f"Indexing {len(chunks)} chunks for job {job.job_id}") | |
| return True | |
| except Exception as e: | |
| self._handle_job_error(job, f"Indexing failed: {e}") | |
| return False | |

    def _handle_job_error(self, job: ProcessingJob, error_message: str):
        """Handle job processing error"""
        job.status = DocumentStatus.FAILED
        job.error_message = error_message
        job.completed_at = datetime.utcnow()

        # Move to failed jobs
        self.failed_jobs[job.job_id] = job
        if job.job_id in self.active_jobs:
            del self.active_jobs[job.job_id]

        self._notify_status_change(job, DocumentStatus.FAILED)
        logging.error(f"Job {job.job_id} failed: {error_message}")

    def _notify_status_change(self, job: ProcessingJob, status: DocumentStatus):
        """Notify registered callbacks of status change"""
        for callback in self.status_callbacks:
            try:
                callback(job.job_id, status)
            except Exception as e:
                logging.error(f"Status callback error: {e}")

    def _job_to_dict(self, job: ProcessingJob) -> Dict[str, Any]:
        """Convert ProcessingJob to dictionary"""
        return {
            "job_id": job.job_id,
            "file_info": job.file_info,
            "status": job.status.value,
            "progress": job.progress,
            "created_at": job.created_at.isoformat(),
            "started_at": job.started_at.isoformat() if job.started_at else None,
            "completed_at": job.completed_at.isoformat() if job.completed_at else None,
            "error_message": job.error_message,
            "result": job.result,
            "processing_options": job.processing_options,
        }