# WebRAG / src/worker.py
# Arun21102003
# Initial clean commit
# 97f9138
import logging
import time
from src.queue_manager import queue_manager
from src.database import db
from src.text_processor import text_processor
from src.embeddings import embedding_service
from src.vector_store import vector_store
# Root logging configuration applied at import time: INFO level, with each
# record carrying a timestamp, the logger name and the level.
# (basicConfig only takes effect if the root logger has no handlers yet.)
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)
class Worker:
    """Pull URL-ingestion jobs from the queue and index each one.

    For every job, the page content is fetched and chunked, the chunks are
    embedded and written to the vector store, and the URL's status in the
    database is moved to ``processing`` and then to ``completed`` or ``failed``.
    """

    def __init__(self, poll_timeout: float = 5, error_backoff: float = 5):
        """Create an idle worker.

        Args:
            poll_timeout: Value passed as ``timeout`` to
                ``queue_manager.dequeue`` on each poll. The unit is whatever
                ``dequeue`` expects, presumably seconds; confirm against
                queue_manager.
            error_backoff: Seconds to sleep after an unexpected error in the
                polling loop before polling again.
        """
        self.running = False
        self.poll_timeout = poll_timeout
        self.error_backoff = error_backoff

    def process_job(self, job_data: dict) -> None:
        """Fetch, chunk, embed and store one URL, recording the outcome in the DB.

        Args:
            job_data: Job payload. It must contain ``"url_id"`` and ``"url"``.

        Raises:
            KeyError: If ``job_data`` lacks ``"url_id"`` or ``"url"``. These keys
                are read before the guarded section because no status row can
                be updated without the id.

        Any ``Exception`` raised while processing is logged with its traceback
        and stored as a ``failed`` status with the error message. It is not
        re-raised. If writing that failure status itself raises, the new
        exception propagates to the caller.
        """
        url_id = job_data["url_id"]
        url = job_data["url"]
        logger.info("Processing URL: %s (ID: %s)", url, url_id)
        try:
            db.update_url_status(url_id, "processing")

            logger.info("Fetching and cleaning webpage content...")
            result = text_processor.process_url(url)
            chunks = result["chunks"]
            chunk_count = result["chunk_count"]
            word_count = result["word_count"]
            logger.info("Extracted %s chunks from %s words", chunk_count, word_count)

            if chunk_count == 0:
                # Nothing to embed: record a failure instead of an empty "completed".
                db.update_url_status(
                    url_id,
                    "failed",
                    error_message="No content extracted from URL",
                )
                logger.warning("No content extracted from %s", url)
                return

            logger.info("Generating embeddings for %s chunks...", chunk_count)
            chunk_texts = [chunk["text"] for chunk in chunks]
            embeddings = embedding_service.embed_batch(chunk_texts)

            logger.info("Storing chunks in vector database...")
            stored_count = vector_store.store_chunks(url_id, url, chunks, embeddings)

            metadata = {
                "word_count": word_count,
                "chunk_count": chunk_count,
            }
            db.update_url_status(
                url_id,
                "completed",
                chunk_count=chunk_count,
                metadata=metadata,
            )
            logger.info("Successfully processed URL: %s (%s chunks stored)", url, stored_count)
        except Exception as e:
            # logger.exception keeps the traceback, which str(e) alone would discard.
            logger.exception("Error processing URL %s: %s", url, e)
            db.update_url_status(
                url_id,
                "failed",
                error_message=str(e),
            )

    def run(self) -> None:
        """Poll the queue and process jobs until stopped.

        The loop exits when ``stop()`` clears ``running``, which is checked
        between polls, or on ``KeyboardInterrupt``. Any other ``Exception``
        from polling or from ``process_job`` is logged with its traceback and
        followed by an ``error_backoff`` sleep, so a persistent failure does
        not spin the loop.
        """
        self.running = True
        logger.info("Worker started. Listening for jobs...")
        try:
            while self.running:
                try:
                    job = queue_manager.dequeue(timeout=self.poll_timeout)
                    if job:
                        self.process_job(job)
                except Exception as e:
                    logger.exception("Error in worker loop: %s", e)
                    time.sleep(self.error_backoff)
        except KeyboardInterrupt:
            # Caught outside the loop so Ctrl-C during the backoff sleep, which
            # runs inside the inner except handler, also shuts down cleanly.
            logger.info("Worker shutting down...")
        finally:
            self.running = False

    def stop(self) -> None:
        """Ask ``run()`` to exit once the current poll or job finishes."""
        self.running = False
if __name__ == "__main__":
    # Script entry point: build a worker and block in its polling loop.
    Worker().run()