# rag-chatbot / workers/document_tasks.py
# Author: Abeshith
# Revision 64d7fdf: "RAG Chatbot with LangChain, FastAPI, and service layer architecture"
from workers.celery_app import celery_app
from app.services.document_service import document_service
from app.utils.logger import logger
from typing import Dict
import asyncio
@celery_app.task(bind=True, name="process_document_async")
def process_document_task(self, file_path: str, metadata: Dict = None) -> Dict:
    """Process a single document in the background via ``document_service``.

    Runs the async ``document_service.process_document`` coroutine to
    completion on a fresh event loop and returns a short summary.

    Args:
        file_path: Path of the document to process.
        metadata: Optional metadata, forwarded unchanged to
            ``document_service.process_document``.

    Returns:
        A dict with ``status`` ("completed"), ``doc_id``, ``file_name`` and
        ``chunk_count`` (taken from the service result's ``num_chunks``).

    Raises:
        Exception: Any processing error is logged, written to the task state,
        and re-raised so Celery records the task as failed.
    """
    try:
        self.update_state(state="PROCESSING", meta={"status": "Loading document..."})
        # asyncio.run creates a fresh loop and always closes it (and unsets it
        # as the thread's current loop), even if processing raises. The former
        # manual new_event_loop()/close() pair leaked the loop on error and left
        # a closed loop registered on the worker thread.
        result = asyncio.run(document_service.process_document(file_path, metadata))
        logger.info(f"Background processing completed for: {file_path}")
        return {
            "status": "completed",
            "doc_id": result["doc_id"],
            "file_name": result["file_name"],
            "chunk_count": result["num_chunks"],
        }
    except Exception as e:
        logger.error(f"Background processing failed: {str(e)}")
        # NOTE(review): Celery stores the real exception as FAILURE once it is
        # re-raised below. A manual FAILURE state whose meta is not exception
        # info may be unreadable by AsyncResult in the meantime — confirm this
        # update is still wanted.
        self.update_state(state="FAILURE", meta={"error": str(e)})
        raise
@celery_app.task(name="cleanup_old_documents")
def cleanup_old_documents_task(days: int = 30) -> Dict:
    """Delete documents processed more than ``days`` days ago, plus their vectors.

    For every record in the Mongo ``documents`` collection whose
    ``processed_at`` is older than the cutoff, this task does two things:
    it deletes the vector-store entries whose ``doc_id`` metadata matches,
    then it deletes the Mongo record itself.

    Args:
        days: Age threshold in days. Must be non-negative; a negative value
            would put the cutoff in the future and match every document.

    Returns:
        A dict with ``status``, ``deleted_count`` (the number of records
        processed) and ``cutoff_date`` (ISO-8601 string of a naive UTC
        datetime).

    Raises:
        ValueError: If ``days`` is negative.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        from datetime import datetime, timedelta
        from app.db.mongodb import MongoDB
        from app.db.vector_store import vector_store

        if days < 0:
            # Guard the destructive path: a future cutoff would wipe everything.
            raise ValueError(f"days must be non-negative, got {days}")

        cutoff_date = datetime.utcnow() - timedelta(days=days)
        mongodb = MongoDB()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(mongodb.get_collection("documents"))
            # Presumably materialized up front so the deletions below do not
            # run against an open cursor over the same collection.
            old_docs = list(collection.find({"processed_at": {"$lt": cutoff_date}}))
            deleted_count = 0
            for doc in old_docs:
                doc_id = doc["doc_id"]
                # NOTE(review): the collection name is hard-coded here, while
                # optimize_vector_store reads it from config — confirm they agree.
                loop.run_until_complete(
                    vector_store.delete_by_metadata(
                        collection_name="rag_embeddings",
                        metadata_key="doc_id",
                        metadata_value=doc_id,
                    )
                )
                collection.delete_one({"doc_id": doc_id})
                deleted_count += 1
        finally:
            # Always release the loop and detach it from this worker thread,
            # even if a Mongo or vector-store call failed midway.
            loop.close()
            asyncio.set_event_loop(None)

        logger.info(f"Cleaned up {deleted_count} old documents")
        return {
            "status": "completed",
            "deleted_count": deleted_count,
            "cutoff_date": cutoff_date.isoformat(),
        }
    except Exception as e:
        logger.error(f"Cleanup task failed: {str(e)}")
        raise
@celery_app.task(name="generate_embeddings_batch")
def generate_embeddings_batch_task(texts: list) -> list:
    """Embed a batch of texts with the ingestion embedder.

    Args:
        texts: The texts to embed, passed straight to
            ``embedder.embed_documents``.

    Returns:
        Whatever ``embedder.embed_documents`` returns for the batch.

    Raises:
        Exception: Import or embedding errors are logged and re-raised.
    """
    try:
        # Imported lazily so the embedder is loaded only inside the worker task.
        from ingestion.embedder import embedder as batch_embedder

        vectors = batch_embedder.embed_documents(texts)
        logger.info(f"Generated {len(vectors)} embeddings")
    except Exception as exc:
        logger.error(f"Batch embedding failed: {exc!s}")
        raise
    return vectors
@celery_app.task(name="reindex_documents")
def reindex_documents_task() -> Dict:
    """Re-run document processing for every stored document that has a file path.

    Loads all records from the Mongo ``documents`` collection. For each record
    with a truthy ``file_path``, it calls ``document_service.process_document``.
    A failure on one document is logged and skipped, so the rest are still
    processed.

    Returns:
        A dict with ``status``, ``reindexed_count`` (successful runs) and
        ``total_documents`` (all records loaded, including ones without a path).

    Raises:
        Exception: Failures outside the per-document loop (connecting,
        querying) are logged and re-raised.
    """
    try:
        from app.db.mongodb import MongoDB

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            mongodb = MongoDB()
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(mongodb.get_collection("documents"))
            documents = list(collection.find({}))
            reindexed_count = 0
            for doc in documents:
                file_path = doc.get("file_path")
                if not file_path:
                    continue
                try:
                    # NOTE(review): no metadata is passed, and existing chunks
                    # for this document are not removed here — confirm that
                    # process_document avoids duplicating embeddings.
                    loop.run_until_complete(document_service.process_document(file_path))
                    reindexed_count += 1
                except Exception as e:
                    # Best-effort per document: one bad file must not abort
                    # the whole reindex.
                    logger.error(f"Reindex failed for {file_path}: {str(e)}")
        finally:
            # Previously the loop leaked if connect/find raised; always close
            # it and detach it from this worker thread.
            loop.close()
            asyncio.set_event_loop(None)

        logger.info(f"Reindexed {reindexed_count} documents")
        return {
            "status": "completed",
            "reindexed_count": reindexed_count,
            "total_documents": len(documents),
        }
    except Exception as e:
        logger.error(f"Reindex task failed: {str(e)}")
        raise
@celery_app.task(name="optimize_vector_store")
def optimize_vector_store_task() -> Dict:
    """Fetch collection info for the configured Qdrant collection and report it.

    Despite its name, this task only calls ``get_collection`` on the vector
    store client. It does not itself request any optimization.

    Returns:
        A dict with ``status``, ``collection`` (the configured collection
        name) and ``vectors_count`` (read from the collection info).

    Raises:
        Exception: Any client or config error is logged and re-raised.
    """
    try:
        from app.db.vector_store import vector_store
        from app.config import config

        qdrant_client = vector_store.get_client()
        target = config["database"]["qdrant"]["collection_name"]
        collection_info = qdrant_client.get_collection(target)
        logger.info(f"Vector store optimized: {target}")
        # NOTE(review): some qdrant-client versions deprecate vectors_count
        # (it may be None); points_count may be the reliable field — confirm
        # against the installed version.
        summary = {
            "status": "completed",
            "collection": target,
            "vectors_count": collection_info.vectors_count,
        }
        return summary
    except Exception as exc:
        logger.error(f"Optimize task failed: {exc!s}")
        raise