Spaces:
Running
Running
File size: 3,096 Bytes
import logging
import time
from src.queue_manager import queue_manager
from src.database import db
from src.text_processor import text_processor
from src.embeddings import embedding_service
from src.vector_store import vector_store
# Configure root logging once at import time: INFO and above, with every
# record prefixed by its timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by everything in this file.
logger = logging.getLogger(__name__)
class Worker:
    """Queue consumer that turns URL jobs into stored, embedded text chunks.

    For each dequeued job the worker calls ``text_processor.process_url``,
    embeds the resulting chunk texts with ``embedding_service.embed_batch``,
    writes them through ``vector_store.store_chunks`` and records progress
    with ``db.update_url_status``.
    """

    def __init__(self) -> None:
        # Loop flag: set by run(), cleared by stop() or by Ctrl-C.
        self.running = False

    def stop(self) -> None:
        """Request that :meth:`run` exit.

        Only clears the ``running`` flag; the loop notices it the next time
        it evaluates its ``while`` condition, i.e. after the current dequeue
        or job finishes.
        """
        self.running = False

    def process_job(self, job_data: dict) -> None:
        """Process one URL job end to end and record the outcome in the DB.

        Args:
            job_data: Job payload; the keys ``"url_id"`` and ``"url"`` are
                read from it.

        Raises:
            KeyError: If ``job_data`` lacks ``"url_id"`` or ``"url"`` (these
                are read before the error-handling ``try`` begins).

        Any ``Exception`` raised while processing is logged with its
        traceback and the URL is marked ``"failed"``; it is not re-raised.
        An exception raised by that final status update itself does
        propagate to the caller.
        """
        url_id = job_data["url_id"]
        url = job_data["url"]
        logger.info("Processing URL: %s (ID: %s)", url, url_id)

        try:
            db.update_url_status(url_id, "processing")

            logger.info("Fetching and cleaning webpage content...")
            result = text_processor.process_url(url)
            chunks = result["chunks"]
            chunk_count = result["chunk_count"]
            word_count = result["word_count"]
            logger.info(
                "Extracted %s chunks from %s words", chunk_count, word_count
            )

            # Nothing to embed: record the failure and stop early.
            if chunk_count == 0:
                db.update_url_status(
                    url_id,
                    "failed",
                    error_message="No content extracted from URL",
                )
                logger.warning("No content extracted from %s", url)
                return

            logger.info("Generating embeddings for %s chunks...", chunk_count)
            chunk_texts = [chunk["text"] for chunk in chunks]
            embeddings = embedding_service.embed_batch(chunk_texts)

            logger.info("Storing chunks in vector database...")
            stored_count = vector_store.store_chunks(
                url_id, url, chunks, embeddings
            )

            metadata = {
                "word_count": word_count,
                "chunk_count": chunk_count,
            }
            db.update_url_status(
                url_id,
                "completed",
                chunk_count=chunk_count,
                metadata=metadata,
            )
            logger.info(
                "Successfully processed URL: %s (%s chunks stored)",
                url,
                stored_count,
            )
        except Exception as e:
            # NOTE: KeyboardInterrupt is not an Exception subclass, so it
            # bypasses this handler and reaches run(); in that case the
            # status written above is not updated here.
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Error processing URL %s: %s", url, e)
            db.update_url_status(
                url_id,
                "failed",
                error_message=str(e),
            )

    def run(self) -> None:
        """Poll the queue and process jobs until stopped or interrupted.

        Blocks the calling thread. Each iteration waits up to 5 seconds for
        a job via ``queue_manager.dequeue``. Unexpected errors are logged
        and followed by a 5-second back-off before polling resumes.
        ``KeyboardInterrupt`` ends the loop cleanly wherever it arrives,
        including during the back-off sleep.
        """
        self.running = True
        logger.info("Worker started. Listening for jobs...")

        # The KeyboardInterrupt handler wraps the whole loop: an interrupt
        # raised inside the `except Exception` handler below (during the
        # sleep) would not be caught by a sibling except clause.
        try:
            while self.running:
                try:
                    job = queue_manager.dequeue(timeout=5)
                    if job:
                        self.process_job(job)
                except Exception as e:
                    logger.exception("Error in worker loop: %s", e)
                    time.sleep(5)
        except KeyboardInterrupt:
            logger.info("Worker shutting down...")
        finally:
            # Keep the flag truthful however the loop was left.
            self.running = False
if __name__ == "__main__":
    # Script entry point: build one worker and block in its polling loop.
    Worker().run()
|