from workers.celery_app import celery_app
from app.services.document_service import document_service
from app.utils.logger import logger
from typing import Dict, Optional
import asyncio


@celery_app.task(bind=True, name="process_document_async")
def process_document_task(self, file_path: str, metadata: Optional[Dict] = None) -> Dict:
    """Process one document in the background via ``document_service``.

    The async ``document_service.process_document`` coroutine is driven on a
    fresh, task-private event loop because Celery task bodies are synchronous.

    Args:
        file_path: Path of the document to process.
        metadata: Optional metadata forwarded unchanged to ``process_document``.

    Returns:
        Dict with ``status`` ("completed"), ``doc_id``, ``file_name`` and
        ``chunk_count`` (copied from the service result's ``num_chunks``).

    Raises:
        Exception: any failure is logged, recorded via ``update_state`` and
        re-raised so Celery marks the task as failed.
    """
    try:
        self.update_state(state="PROCESSING", meta={"status": "Loading document..."})

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(
                document_service.process_document(file_path, metadata)
            )
        finally:
            # Release the loop even when processing raises; otherwise every
            # failed task leaks a loop (and its selector fds) in the worker,
            # and a closed loop would stay installed as the thread's loop.
            loop.close()
            asyncio.set_event_loop(None)

        logger.info(f"Background processing completed for: {file_path}")

        return {
            "status": "completed",
            "doc_id": result["doc_id"],
            "file_name": result["file_name"],
            "chunk_count": result["num_chunks"],
        }

    except Exception as e:
        logger.error(f"Background processing failed: {str(e)}")
        # NOTE(review): Celery records FAILURE itself when the exception below
        # propagates; a manual FAILURE state whose meta lacks exception info
        # can confuse result readers in the meantime — confirm it is needed.
        self.update_state(state="FAILURE", meta={"error": str(e)})
        raise


@celery_app.task(name="cleanup_old_documents")
def cleanup_old_documents_task(days: int = 30) -> Dict:
    """Delete documents processed more than ``days`` days ago.

    For every record in the ``documents`` collection whose ``processed_at``
    is older than the cutoff, the matching vectors (metadata ``doc_id``) are
    removed from the ``rag_embeddings`` vector-store collection, then the
    record itself is deleted.

    Args:
        days: Age threshold in days; must be non-negative (a negative value
            would place the cutoff in the future and delete everything).

    Returns:
        Dict with ``status`` ("completed"), ``deleted_count`` and the
        ``cutoff_date`` used, as an ISO-8601 string.

    Raises:
        ValueError: if ``days`` is negative.
        Exception: any other failure is logged and re-raised.
    """
    try:
        from datetime import datetime, timedelta
        from app.db.mongodb import MongoDB
        from app.db.vector_store import vector_store

        if days < 0:
            raise ValueError(f"days must be non-negative, got {days}")

        # Naive UTC timestamp, as before — presumably matches how
        # processed_at is stored; TODO confirm.
        cutoff_date = datetime.utcnow() - timedelta(days=days)

        mongodb = MongoDB()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(
                mongodb.get_collection("documents")
            )

            # NOTE(review): find/delete_one are used synchronously here, which
            # assumes get_collection returns a synchronous (pymongo-style)
            # collection rather than an async one — confirm.
            old_docs = list(collection.find({
                "processed_at": {"$lt": cutoff_date}
            }))

            deleted_count = 0
            for doc in old_docs:
                # Remove the vectors first so a failure here leaves the Mongo
                # record in place for a later retry.
                loop.run_until_complete(
                    vector_store.delete_by_metadata(
                        collection_name="rag_embeddings",
                        metadata_key="doc_id",
                        metadata_value=doc["doc_id"],
                    )
                )
                collection.delete_one({"doc_id": doc["doc_id"]})
                deleted_count += 1
        finally:
            # Always release the task-private loop, even on failure.
            loop.close()
            asyncio.set_event_loop(None)

        logger.info(f"Cleaned up {deleted_count} old documents")

        return {
            "status": "completed",
            "deleted_count": deleted_count,
            "cutoff_date": cutoff_date.isoformat(),
        }

    except Exception as e:
        logger.error(f"Cleanup task failed: {str(e)}")
        raise
@celery_app.task(name="generate_embeddings_batch")
def generate_embeddings_batch_task(texts: list) -> list:
    """Embed a batch of texts with ``ingestion.embedder.embedder``.

    Args:
        texts: Texts passed unchanged to ``embedder.embed_documents``.

    Returns:
        Whatever ``embed_documents`` returns (one embedding per text,
        presumably — TODO confirm against the embedder).

    Raises:
        Exception: any embedding failure is logged and re-raised.
    """
    try:
        from ingestion.embedder import embedder

        embeddings = embedder.embed_documents(texts)
        logger.info(f"Generated {len(embeddings)} embeddings")
        return embeddings

    except Exception as e:
        logger.error(f"Batch embedding failed: {str(e)}")
        raise


@celery_app.task(name="reindex_documents")
def reindex_documents_task() -> Dict:
    """Re-run document processing for every stored document with a file path.

    Reads all records from the ``documents`` collection and calls
    ``document_service.process_document(file_path)`` for each record that has
    a truthy ``file_path``. Per-document failures are logged and skipped so
    one bad file does not abort the whole run.

    Returns:
        Dict with ``status`` ("completed"), ``reindexed_count`` (successful
        documents) and ``total_documents`` (all records read).

    Raises:
        Exception: failures outside the per-document loop are logged and
        re-raised.
    """
    try:
        from app.db.mongodb import MongoDB

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            mongodb = MongoDB()
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(
                mongodb.get_collection("documents")
            )

            # NOTE(review): synchronous find() assumes get_collection returns a
            # pymongo-style collection — confirm.
            documents = list(collection.find({}))

            reindexed_count = 0
            for doc in documents:
                file_path = doc.get("file_path")
                if not file_path:
                    continue
                # NOTE(review): the stored metadata is not forwarded, and
                # whether process_document replaces or duplicates existing
                # entries is not visible here — TODO confirm.
                try:
                    loop.run_until_complete(
                        document_service.process_document(file_path)
                    )
                    reindexed_count += 1
                except Exception as e:
                    logger.error(f"Reindex failed for {file_path}: {str(e)}")
        finally:
            # Always release the task-private loop, even when setup fails.
            loop.close()
            asyncio.set_event_loop(None)

        logger.info(f"Reindexed {reindexed_count} documents")

        return {
            "status": "completed",
            "reindexed_count": reindexed_count,
            "total_documents": len(documents),
        }

    except Exception as e:
        logger.error(f"Reindex task failed: {str(e)}")
        raise


@celery_app.task(name="optimize_vector_store")
def optimize_vector_store_task() -> Dict:
    """Report basic information about the configured vector-store collection.

    NOTE(review): despite the task name and log message, this only reads the
    collection via ``client.get_collection``; no optimization is triggered.

    Returns:
        Dict with ``status`` ("completed"), the ``collection`` name (from
        ``config["database"]["qdrant"]["collection_name"]``) and
        ``vectors_count`` as reported by the collection info.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    try:
        from app.db.vector_store import vector_store
        from app.config import config

        client = vector_store.get_client()
        collection_name = config["database"]["qdrant"]["collection_name"]

        info = client.get_collection(collection_name)

        logger.info(f"Vector store optimized: {collection_name}")

        return {
            "status": "completed",
            "collection": collection_name,
            # NOTE(review): some qdrant-client versions deprecate this field
            # (may be None) — verify against the installed version.
            "vectors_count": info.vectors_count,
        }

    except Exception as e:
        logger.error(f"Optimize task failed: {str(e)}")
        raise