Spaces:
Sleeping
Sleeping
| import logging | |
| import time | |
| from src.queue_manager import queue_manager | |
| from src.database import db | |
| from src.text_processor import text_processor | |
| from src.embeddings import embedding_service | |
| from src.vector_store import vector_store | |
# Configure the root logger at import time so every record carries a
# timestamp, the emitting logger's name and the severity level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
class Worker:
    """Queue consumer that turns URL jobs into stored, embedded text chunks.

    For every job taken from ``queue_manager`` the worker calls
    ``text_processor.process_url``, ``embedding_service.embed_batch`` and
    ``vector_store.store_chunks`` in that order, recording progress through
    ``db.update_url_status`` ("processing", then "completed" or "failed").
    """

    # Value handed to ``queue_manager.dequeue`` as ``timeout``
    # (presumably seconds -- confirm against queue_manager).
    DEQUEUE_TIMEOUT = 5
    # Seconds to sleep after an unexpected error in the polling loop so a
    # persistently failing dependency is not retried in a tight loop.
    ERROR_BACKOFF_SECONDS = 5

    def __init__(self):
        # True while ``run`` should keep polling; ``run`` sets it on entry
        # and clears it again on every exit path.
        self.running = False

    def process_job(self, job_data: dict):
        """Run the ingestion pipeline for a single job.

        Args:
            job_data: Job payload; must contain the keys ``"url_id"`` and
                ``"url"``.

        Raises:
            KeyError: If ``job_data`` lacks ``"url_id"`` or ``"url"``; both
                are read before the error-handling ``try`` begins.

        Any other ``Exception`` raised by the pipeline is logged with its
        traceback and recorded as a "failed" status instead of propagating.
        An exception raised by that final status update itself does
        propagate to the caller.
        """
        url_id = job_data["url_id"]
        url = job_data["url"]
        logger.info(f"Processing URL: {url} (ID: {url_id})")

        try:
            db.update_url_status(url_id, "processing")

            logger.info("Fetching and cleaning webpage content...")
            result = text_processor.process_url(url)
            chunks = result["chunks"]
            chunk_count = result["chunk_count"]
            logger.info(
                f"Extracted {chunk_count} chunks from {result['word_count']} words"
            )

            # Nothing to embed or store: record the failure and stop early.
            if chunk_count == 0:
                db.update_url_status(
                    url_id,
                    "failed",
                    error_message="No content extracted from URL",
                )
                logger.warning(f"No content extracted from {url}")
                return

            logger.info(f"Generating embeddings for {chunk_count} chunks...")
            chunk_texts = [chunk["text"] for chunk in chunks]
            embeddings = embedding_service.embed_batch(chunk_texts)

            logger.info("Storing chunks in vector database...")
            stored_count = vector_store.store_chunks(url_id, url, chunks, embeddings)

            metadata = {
                "word_count": result["word_count"],
                "chunk_count": chunk_count,
            }
            db.update_url_status(
                url_id,
                "completed",
                chunk_count=chunk_count,
                metadata=metadata,
            )
            logger.info(
                f"Successfully processed URL: {url} ({stored_count} chunks stored)"
            )
        except Exception as e:
            # NOTE: KeyboardInterrupt is not an ``Exception`` subclass, so an
            # interrupt mid-job skips this handler and the status is left as
            # whatever was last written (typically "processing").
            # ``logger.exception`` logs at ERROR level like the previous
            # ``logger.error`` call, but also attaches the traceback.
            logger.exception(f"Error processing URL {url}: {e}")
            db.update_url_status(
                url_id,
                "failed",
                error_message=str(e),
            )

    def run(self):
        """Poll the queue and process jobs until stopped.

        The loop ends when ``self.running`` is cleared or when a
        ``KeyboardInterrupt`` arrives, including one that arrives during the
        error back-off sleep. Any other ``Exception`` raised while dequeuing
        or processing is logged with its traceback, followed by a pause of
        ``ERROR_BACKOFF_SECONDS`` before polling resumes.
        """
        self.running = True
        logger.info("Worker started. Listening for jobs...")

        # The outer ``try`` wraps the whole loop so that Ctrl-C is handled
        # the same way whether it arrives while dequeuing, while processing,
        # or while sleeping after an error.
        try:
            while self.running:
                try:
                    job = queue_manager.dequeue(timeout=self.DEQUEUE_TIMEOUT)
                    if job:
                        self.process_job(job)
                except Exception as e:
                    logger.exception(f"Error in worker loop: {e}")
                    time.sleep(self.ERROR_BACKOFF_SECONDS)
        except KeyboardInterrupt:
            logger.info("Worker shutting down...")
        finally:
            # Leave the flag consistent no matter how the loop was exited.
            self.running = False
# Script entry point: build a worker and block in its polling loop.
if __name__ == "__main__":
    Worker().run()