File size: 3,096 Bytes
97f9138
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import logging
import time
from src.queue_manager import queue_manager
from src.database import db
from src.text_processor import text_processor
from src.embeddings import embedding_service
from src.vector_store import vector_store

# Process-wide logging setup: INFO and above, each record prefixed with a
# timestamp, logger name and level. Note that logging.basicConfig is a no-op
# if the root logger already has handlers configured.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-scoped logger used by the worker below.
logger = logging.getLogger(__name__)

class Worker:
    """Consume URL jobs from ``queue_manager`` and run them through the pipeline.

    For each job the page is fetched and chunked by ``text_processor``, the
    chunks are embedded by ``embedding_service`` and stored in
    ``vector_store``; progress is recorded through ``db.update_url_status``.
    """

    def __init__(self, dequeue_timeout: float = 5, error_backoff: float = 5):
        """Create an idle worker.

        Args:
            dequeue_timeout: Value passed as ``timeout`` to each
                ``queue_manager.dequeue`` call; also bounds how long ``run``
                may take to notice ``stop()``.
            error_backoff: Seconds to sleep after an unexpected error in the
                worker loop before polling again.
        """
        # True while run() should keep polling; cleared by stop() or Ctrl+C.
        self.running = False
        self.dequeue_timeout = dequeue_timeout
        self.error_backoff = error_backoff

    def stop(self):
        """Ask ``run`` to exit once the current dequeue/job returns."""
        self.running = False

    def process_job(self, job_data: dict):
        """Run a single URL job: fetch -> chunk -> embed -> store.

        Args:
            job_data: Job payload; must contain ``"url_id"`` and ``"url"``.
                A missing key raises ``KeyError`` to the caller.

        The URL's status is set to ``"processing"`` first, then to
        ``"completed"`` on success, or to ``"failed"`` (with an error
        message) when no chunks are extracted or any step raises an
        ``Exception``. Such exceptions are logged with their traceback and
        not re-raised; an error raised by the final ``"failed"`` status
        update itself propagates to the caller.
        """
        url_id = job_data["url_id"]
        url = job_data["url"]

        logger.info("Processing URL: %s (ID: %s)", url, url_id)

        try:
            db.update_url_status(url_id, "processing")

            logger.info("Fetching and cleaning webpage content...")
            result = text_processor.process_url(url)

            chunks = result["chunks"]
            chunk_count = result["chunk_count"]
            word_count = result["word_count"]

            logger.info("Extracted %s chunks from %s words", chunk_count, word_count)

            if chunk_count == 0:
                db.update_url_status(
                    url_id,
                    "failed",
                    error_message="No content extracted from URL",
                )
                logger.warning("No content extracted from %s", url)
                return

            logger.info("Generating embeddings for %s chunks...", chunk_count)
            embeddings = embedding_service.embed_batch([chunk["text"] for chunk in chunks])

            logger.info("Storing chunks in vector database...")
            stored_count = vector_store.store_chunks(url_id, url, chunks, embeddings)

            db.update_url_status(
                url_id,
                "completed",
                chunk_count=chunk_count,
                metadata={"word_count": word_count, "chunk_count": chunk_count},
            )

            logger.info("Successfully processed URL: %s (%s chunks stored)", url, stored_count)

        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Error processing URL %s: %s", url, e)
            db.update_url_status(
                url_id,
                "failed",
                # Some exceptions stringify to ""; fall back to the class name
                # so the stored error message is never blank.
                error_message=str(e) or type(e).__name__,
            )
        # NOTE(review): KeyboardInterrupt is a BaseException, so it bypasses the
        # handler above and leaves this URL in "processing" -- confirm whether
        # something else requeues/reaps such records.

    def run(self):
        """Poll the queue and process jobs until ``stop()`` or Ctrl+C.

        Any ``Exception`` raised while dequeuing or processing is logged with
        its traceback, followed by a ``self.error_backoff`` second pause.
        ``KeyboardInterrupt`` -- including one that arrives during that pause --
        ends the loop cleanly instead of escaping as a traceback.
        """
        self.running = True
        logger.info("Worker started. Listening for jobs...")

        try:
            while self.running:
                try:
                    job = queue_manager.dequeue(timeout=self.dequeue_timeout)
                    if job:
                        self.process_job(job)
                except Exception as e:
                    logger.exception("Error in worker loop: %s", e)
                    # Outside the inner try on purpose; the outer handler below
                    # still catches Ctrl+C pressed during this back-off.
                    time.sleep(self.error_backoff)
        except KeyboardInterrupt:
            logger.info("Worker shutting down...")
        finally:
            self.running = False

if __name__ == "__main__":
    # Script entry point: build one worker and block in its polling loop
    # until that loop returns.
    Worker().run()