# File size: 5,130 Bytes (64d7fdf)
from workers.celery_app import celery_app
from app.services.document_service import document_service
from app.utils.logger import logger
from typing import Dict
import asyncio
@celery_app.task(bind=True, name="process_document_async")
def process_document_task(self, file_path: str, metadata: Dict = None) -> Dict:
    """Process a document in the background via ``document_service``.

    The task reports a ``PROCESSING`` state, runs
    ``document_service.process_document`` to completion on a dedicated
    event loop, and returns a summary of the result.

    Args:
        file_path: Path of the document, forwarded to the document service.
        metadata: Optional metadata forwarded unchanged to the document
            service.

    Returns:
        A dict with ``status`` (``"completed"``), ``doc_id``, ``file_name``
        and ``chunk_count`` (the service result's ``num_chunks``).

    Raises:
        Exception: Any failure is logged, recorded on the task via
            ``update_state(state="FAILURE", ...)`` and re-raised.
    """
    try:
        self.update_state(state="PROCESSING", meta={"status": "Loading document..."})
        # The task body is a plain function, so the awaitable is driven on a
        # private event loop created just for this call.
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            result = loop.run_until_complete(
                document_service.process_document(file_path, metadata)
            )
        finally:
            # Close the loop on the failure path too, so it is never leaked.
            loop.close()
        logger.info(f"Background processing completed for: {file_path}")
        return {
            "status": "completed",
            "doc_id": result["doc_id"],
            "file_name": result["file_name"],
            "chunk_count": result["num_chunks"],
        }
    except Exception as e:
        logger.error(f"Background processing failed: {str(e)}")
        self.update_state(state="FAILURE", meta={"error": str(e)})
        raise
@celery_app.task(name="cleanup_old_documents")
def cleanup_old_documents_task(days: int = 30) -> Dict:
    """Delete documents older than ``days`` days, with their embeddings.

    Documents in the ``documents`` collection whose ``processed_at`` is
    earlier than the cutoff are removed one by one: first their vectors are
    deleted from the ``rag_embeddings`` vector collection (matched on
    ``doc_id``), then the document record itself.

    Args:
        days: Age threshold in days; defaults to 30.

    Returns:
        A dict with ``status`` (``"completed"``), ``deleted_count`` and
        ``cutoff_date`` (ISO-8601 string of the cutoff timestamp).

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        from datetime import datetime, timedelta
        from app.db.mongodb import MongoDB
        from app.db.vector_store import vector_store

        # Naive UTC timestamp; presumably ``processed_at`` is stored the same
        # way -- confirm against the writer of that field.
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        mongodb = MongoDB()
        deleted_count = 0
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(
                mongodb.get_collection("documents")
            )
            # The matching documents are loaded into a list before any
            # deletion starts.
            old_docs = list(collection.find({
                "processed_at": {"$lt": cutoff_date}
            }))
            for doc in old_docs:
                doc_id = doc["doc_id"]
                loop.run_until_complete(
                    vector_store.delete_by_metadata(
                        collection_name="rag_embeddings",
                        metadata_key="doc_id",
                        metadata_value=doc_id,
                    )
                )
                collection.delete_one({"doc_id": doc_id})
                deleted_count += 1
        finally:
            # Close the loop even if connecting, querying or deleting fails.
            loop.close()
        logger.info(f"Cleaned up {deleted_count} old documents")
        return {
            "status": "completed",
            "deleted_count": deleted_count,
            "cutoff_date": cutoff_date.isoformat(),
        }
    except Exception as e:
        logger.error(f"Cleanup task failed: {str(e)}")
        raise
@celery_app.task(name="generate_embeddings_batch")
def generate_embeddings_batch_task(texts: list) -> list:
    """Embed a batch of texts with the project embedder.

    Args:
        texts: The texts passed to ``embedder.embed_documents``.

    Returns:
        Whatever ``embedder.embed_documents`` returns for ``texts``.

    Raises:
        Exception: Any failure (including importing the embedder) is logged
            and re-raised.
    """
    try:
        # Imported inside the task so an import failure is logged like any
        # other error.
        from ingestion.embedder import embedder

        vectors = embedder.embed_documents(texts)
        logger.info(f"Generated {len(vectors)} embeddings")
    except Exception as exc:
        logger.error(f"Batch embedding failed: {exc!s}")
        raise
    return vectors
@celery_app.task(name="reindex_documents")
def reindex_documents_task() -> Dict:
    """Re-run document processing for every stored document with a file path.

    All records of the ``documents`` collection are loaded, and each one with
    a truthy ``file_path`` is passed to ``document_service.process_document``.
    A failure on one document is logged and does not stop the remaining
    documents from being processed.

    Returns:
        A dict with ``status`` (``"completed"``), ``reindexed_count`` (number
        of documents processed without error) and ``total_documents`` (number
        of records read, including those skipped or failed).

    Raises:
        Exception: Failures outside the per-document processing (for example
            connecting or querying) are logged and re-raised.
    """
    try:
        from app.db.mongodb import MongoDB

        reindexed_count = 0
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            mongodb = MongoDB()
            loop.run_until_complete(mongodb.connect())
            collection = loop.run_until_complete(
                mongodb.get_collection("documents")
            )
            documents = list(collection.find({}))
            for doc in documents:
                file_path = doc.get("file_path")
                if not file_path:
                    # Records without a file path cannot be re-processed.
                    continue
                try:
                    loop.run_until_complete(
                        document_service.process_document(file_path)
                    )
                    reindexed_count += 1
                except Exception as e:
                    # Best effort: log and carry on with the next document.
                    logger.error(f"Reindex failed for {file_path}: {str(e)}")
        finally:
            # Close the loop even if connecting or querying fails.
            loop.close()
        logger.info(f"Reindexed {reindexed_count} documents")
        return {
            "status": "completed",
            "reindexed_count": reindexed_count,
            "total_documents": len(documents),
        }
    except Exception as e:
        logger.error(f"Reindex task failed: {str(e)}")
        raise
@celery_app.task(name="optimize_vector_store")
def optimize_vector_store_task() -> Dict:
    """Report on the configured vector collection.

    NOTE(review): despite the task name and the log message, this body only
    reads the collection info; no optimization call is visible here.

    Returns:
        A dict with ``status`` (``"completed"``), ``collection`` (the
        configured collection name) and ``vectors_count`` (read from the
        collection info).

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        from app.db.vector_store import vector_store
        from app.config import config

        qdrant_client = vector_store.get_client()
        target = config["database"]["qdrant"]["collection_name"]
        collection_info = qdrant_client.get_collection(target)
        logger.info(f"Vector store optimized: {target}")
        summary = {
            "status": "completed",
            "collection": target,
            "vectors_count": collection_info.vectors_count,
        }
        return summary
    except Exception as exc:
        logger.error(f"Optimize task failed: {exc!s}")
        raise
|