Spaces:
Sleeping
Sleeping
import asyncio
from typing import Dict, Optional

from app.services.document_service import document_service
from app.utils.logger import logger
from workers.celery_app import celery_app
def process_document_task(self, file_path: str, metadata: Optional[Dict] = None) -> Dict:
    """Process a single document in the background and report the result.

    ``self`` is used as a Celery task instance (``self.update_state``), so this
    function is presumably registered with ``bind=True``; the decorator is not
    visible here.

    Args:
        file_path: Path of the document passed to
            ``document_service.process_document``.
        metadata: Optional metadata forwarded unchanged to the service.

    Returns:
        Dict with ``status`` ("completed"), ``doc_id``, ``file_name`` and
        ``chunk_count`` (taken from the service result's ``num_chunks``).

    Raises:
        Exception: Any error is logged, recorded as a FAILURE task state and
            re-raised.
    """
    try:
        self.update_state(state="PROCESSING", meta={"status": "Loading document..."})
        # The task body is synchronous, so drive the async service on a
        # dedicated event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(
                document_service.process_document(file_path, metadata)
            )
        finally:
            # Release the loop even when processing fails, and do not leave a
            # closed loop registered as this thread's current loop.
            loop.close()
            asyncio.set_event_loop(None)
        logger.info(f"Background processing completed for: {file_path}")
        return {
            "status": "completed",
            "doc_id": result["doc_id"],
            "file_name": result["file_name"],
            "chunk_count": result["num_chunks"],
        }
    except Exception as e:
        logger.error(f"Background processing failed: {str(e)}")
        # Celery's result backend decodes FAILURE meta as exception info and
        # requires an "exc_type" key; without it, reading the task state can
        # raise ValueError. "error" is kept for existing consumers.
        self.update_state(
            state="FAILURE",
            meta={
                "exc_type": type(e).__name__,
                "exc_message": str(e),
                "error": str(e),
            },
        )
        raise
def cleanup_old_documents_task(days: int = 30, collection_name: str = "rag_embeddings") -> Dict:
    """Delete documents processed before a cutoff, along with their embeddings.

    For every record in the MongoDB "documents" collection whose
    ``processed_at`` is older than ``days`` days, this deletes the vectors
    whose ``doc_id`` metadata matches, then deletes the MongoDB record.

    Args:
        days: Age threshold in days. Must be non-negative; a negative value
            would move the cutoff into the future and delete everything.
        collection_name: Vector-store collection to delete embeddings from.
            Defaults to the previously hard-coded "rag_embeddings".

    Returns:
        Dict with ``status`` ("completed"), ``deleted_count`` and the
        ``cutoff_date`` in ISO format (naive, UTC-based).

    Raises:
        ValueError: If ``days`` is negative.
        Exception: Any other error is logged and re-raised.
    """
    try:
        from datetime import datetime, timedelta, timezone
        from app.db.mongodb import MongoDB
        from app.db.vector_store import vector_store

        if days < 0:
            raise ValueError(f"days must be non-negative, got {days}")

        # Same naive UTC value datetime.utcnow() returns (utcnow is deprecated
        # since Python 3.12); kept naive so the ISO output is unchanged.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        cutoff_date = now_utc - timedelta(days=days)
        mongodb = MongoDB()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(
                mongodb.get_collection("documents")
            )
            # NOTE(review): find()/delete_one() are used synchronously while
            # connect()/get_collection() are awaited. If the driver is async
            # (e.g. motor), these calls would need awaiting too; confirm.
            old_docs = list(collection.find({
                "processed_at": {"$lt": cutoff_date}
            }))
            deleted_count = 0
            for doc in old_docs:
                doc_id = doc["doc_id"]
                # Remove embeddings first so a failure here leaves the MongoDB
                # record in place for a later retry.
                loop.run_until_complete(
                    vector_store.delete_by_metadata(
                        collection_name=collection_name,
                        metadata_key="doc_id",
                        metadata_value=doc_id,
                    )
                )
                collection.delete_one({"doc_id": doc_id})
                deleted_count += 1
        finally:
            # Always release the loop, including on failure.
            loop.close()
            asyncio.set_event_loop(None)
        logger.info(f"Cleaned up {deleted_count} old documents")
        return {
            "status": "completed",
            "deleted_count": deleted_count,
            "cutoff_date": cutoff_date.isoformat(),
        }
    except Exception as e:
        logger.error(f"Cleanup task failed: {str(e)}")
        raise
def generate_embeddings_batch_task(texts: list) -> list:
    """Embed a batch of texts with the ingestion embedder.

    Args:
        texts: Items passed as-is to ``embedder.embed_documents``.

    Returns:
        Whatever ``embed_documents`` returns, after logging its length.

    Raises:
        Exception: Import or embedding errors are logged and re-raised.
    """
    try:
        # Imported lazily so an import failure is logged as a task failure.
        from ingestion.embedder import embedder

        vectors = embedder.embed_documents(texts)
        logger.info(f"Generated {len(vectors)} embeddings")
        return vectors
    except Exception as err:
        logger.error(f"Batch embedding failed: {str(err)}")
        raise
def reindex_documents_task() -> Dict:
    """Re-run document processing for every stored document that has a file path.

    Reads all records from the MongoDB "documents" collection and calls
    ``document_service.process_document(file_path)`` for each one that has a
    truthy ``file_path``. A failure on one document is logged and skipped.

    Returns:
        Dict with ``status`` ("completed"), ``reindexed_count`` (successful
        re-processings) and ``total_documents`` (records read).

    Raises:
        Exception: Errors outside the per-document loop (connecting, reading
            the collection) are logged and re-raised.
    """
    try:
        from app.db.mongodb import MongoDB
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            mongodb = MongoDB()
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(
                mongodb.get_collection("documents")
            )
            # Materialized up front so processing cannot disturb the cursor.
            documents = list(collection.find({}))
            reindexed_count = 0
            for doc in documents:
                file_path = doc.get("file_path")
                if not file_path:
                    continue
                try:
                    # NOTE(review): any metadata stored on the record is not
                    # forwarded to process_document; confirm this is intended.
                    loop.run_until_complete(
                        document_service.process_document(file_path)
                    )
                    reindexed_count += 1
                except Exception as e:
                    logger.error(f"Reindex failed for {file_path}: {str(e)}")
        finally:
            # Always release the loop, including when connecting or reading fails.
            loop.close()
            asyncio.set_event_loop(None)
        logger.info(f"Reindexed {reindexed_count} documents")
        return {
            "status": "completed",
            "reindexed_count": reindexed_count,
            "total_documents": len(documents),
        }
    except Exception as e:
        logger.error(f"Reindex task failed: {str(e)}")
        raise
def optimize_vector_store_task() -> Dict:
    """Fetch and report statistics for the configured Qdrant collection.

    NOTE(review): despite the name and the log message, this only reads
    collection info via ``client.get_collection``; it does not trigger any
    optimization.

    Returns:
        Dict with ``status`` ("completed"), ``collection`` (the name from
        ``config["database"]["qdrant"]["collection_name"]``), ``vectors_count``
        and ``points_count``. Either count is None when the client does not
        report it.

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        from app.db.vector_store import vector_store
        from app.config import config
        client = vector_store.get_client()
        collection_name = config["database"]["qdrant"]["collection_name"]
        info = client.get_collection(collection_name)
        logger.info(f"Vector store optimized: {collection_name}")
        return {
            "status": "completed",
            "collection": collection_name,
            # vectors_count is deprecated in Qdrant and may be absent or None
            # in newer clients; read it defensively instead of crashing.
            "vectors_count": getattr(info, "vectors_count", None),
            # Additive, non-deprecated count of stored points.
            "points_count": getattr(info, "points_count", None),
        }
    except Exception as e:
        logger.error(f"Optimize task failed: {str(e)}")
        raise