"""Queue worker that turns submitted URLs into embedded, stored text chunks.

Each job taken from ``queue_manager`` is fetched and chunked by
``text_processor``, embedded by ``embedding_service``, written to
``vector_store``, and its progress is recorded through ``db``.
"""

import logging
import time

from src.queue_manager import queue_manager
from src.database import db
from src.text_processor import text_processor
from src.embeddings import embedding_service
from src.vector_store import vector_store

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class Worker:
    """Pull URL jobs off the queue and index them one at a time."""

    # Passed as ``timeout`` to ``queue_manager.dequeue`` on every poll.
    dequeue_timeout = 5
    # Seconds to sleep after an unexpected error in the main loop, so a
    # persistent failure does not turn the loop into a hot spin.
    error_backoff = 5

    def __init__(self):
        # Loop-control flag: ``run`` keeps polling while this is True.
        self.running = False

    def stop(self) -> None:
        """Ask ``run`` to exit once its current iteration finishes."""
        self.running = False

    def process_job(self, job_data: dict) -> None:
        """Fetch, chunk, embed, and store one URL, recording its status in ``db``.

        Args:
            job_data: Job payload; must contain the keys ``"url_id"`` and
                ``"url"``.

        The URL's status is set to ``"processing"`` first, then to
        ``"completed"`` on success, or to ``"failed"`` (with an error message)
        when no chunks are extracted or any step raises. Exceptions from the
        processing steps are logged with their traceback and not re-raised.

        Raises:
            KeyError: If ``job_data`` lacks ``"url_id"`` or ``"url"``.
            Exception: Whatever ``db.update_url_status`` raises while
                recording the ``"failed"`` status; it propagates to the caller.
        """
        url_id = job_data["url_id"]
        url = job_data["url"]
        logger.info("Processing URL: %s (ID: %s)", url, url_id)

        try:
            db.update_url_status(url_id, "processing")

            logger.info("Fetching and cleaning webpage content...")
            result = text_processor.process_url(url)
            chunks = result["chunks"]
            chunk_count = result["chunk_count"]
            word_count = result["word_count"]
            logger.info("Extracted %s chunks from %s words", chunk_count, word_count)

            if chunk_count == 0:
                db.update_url_status(
                    url_id,
                    "failed",
                    error_message="No content extracted from URL",
                )
                logger.warning("No content extracted from %s", url)
                return

            logger.info("Generating embeddings for %s chunks...", chunk_count)
            chunk_texts = [chunk["text"] for chunk in chunks]
            embeddings = embedding_service.embed_batch(chunk_texts)

            logger.info("Storing chunks in vector database...")
            stored_count = vector_store.store_chunks(url_id, url, chunks, embeddings)

            metadata = {
                "word_count": word_count,
                "chunk_count": chunk_count,
            }
            db.update_url_status(
                url_id,
                "completed",
                chunk_count=chunk_count,
                metadata=metadata,
            )
            logger.info(
                "Successfully processed URL: %s (%s chunks stored)", url, stored_count
            )

        except Exception as e:
            # logger.exception records the traceback; logging only str(e)
            # made failures inside the pipeline hard to diagnose.
            logger.exception("Error processing URL %s: %s", url, e)
            db.update_url_status(
                url_id,
                "failed",
                error_message=str(e),
            )

    def run(self) -> None:
        """Poll the queue and process jobs until stopped.

        Loops while ``self.running`` is True, so it ends after ``stop()`` is
        called, or when a ``KeyboardInterrupt`` arrives. An unexpected error
        in one iteration is logged and followed by an ``error_backoff``-second
        pause before polling again.
        """
        self.running = True
        logger.info("Worker started. Listening for jobs...")
        try:
            while self.running:
                try:
                    job = queue_manager.dequeue(timeout=self.dequeue_timeout)
                    if job:
                        self.process_job(job)
                except Exception as e:
                    logger.exception("Error in worker loop: %s", e)
                    # This sleep sits inside the outer try, so Ctrl+C during
                    # the back-off still takes the clean shutdown path below.
                    time.sleep(self.error_backoff)
        except KeyboardInterrupt:
            logger.info("Worker shutting down...")
        finally:
            self.running = False


if __name__ == "__main__":
    worker = Worker()
    worker.run()