Spaces:
Sleeping
Sleeping
update
Browse files
- core/collection_router_retriever.py +17 -13
- main.py +29 -19
core/collection_router_retriever.py
CHANGED
|
@@ -141,19 +141,23 @@ class CollectionRouterRetriever:
|
|
| 141 |
break
|
| 142 |
return deduplicated[:k]
|
| 143 |
|
| 144 |
-
|
| 145 |
-
|
| 146 |
-
|
| 147 |
-
|
| 148 |
-
|
| 149 |
-
|
| 150 |
-
|
| 151 |
-
|
| 152 |
-
|
| 153 |
-
|
| 154 |
-
|
| 155 |
-
|
| 156 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 157 |
|
| 158 |
deduplicated = []
|
| 159 |
seen = set()
|
|
|
|
| 141 |
break
|
| 142 |
return deduplicated[:k]
|
| 143 |
|
| 144 |
+
fallback_docs = []
|
| 145 |
+
if self.base_retriever is not None:
|
| 146 |
+
try:
|
| 147 |
+
fallback_docs = self.base_retriever.search(
|
| 148 |
+
query,
|
| 149 |
+
k=candidate_k,
|
| 150 |
+
alpha=alpha,
|
| 151 |
+
year_scope=year_scope,
|
| 152 |
+
)
|
| 153 |
+
except TypeError:
|
| 154 |
+
fallback_docs = self.base_retriever.search(
|
| 155 |
+
query,
|
| 156 |
+
k=candidate_k,
|
| 157 |
+
alpha=alpha,
|
| 158 |
+
)
|
| 159 |
+
except Exception:
|
| 160 |
+
logger.exception("Base retriever fallback failed")
|
| 161 |
|
| 162 |
deduplicated = []
|
| 163 |
seen = set()
|
main.py
CHANGED
|
@@ -16,7 +16,6 @@ from core.config import (
|
|
| 16 |
COLLECTION_ROUTER_TOP_N,
|
| 17 |
DATABASE_URL,
|
| 18 |
QDRANT_API_KEY,
|
| 19 |
-
QDRANT_COLLECTION,
|
| 20 |
QDRANT_URL,
|
| 21 |
SUPABASE_ADMIN_SYNC_TOKEN,
|
| 22 |
SUPABASE_SERVICE_ROLE_KEY,
|
|
@@ -28,10 +27,8 @@ from core.config import (
|
|
| 28 |
)
|
| 29 |
from core.document_db import init_document_db
|
| 30 |
from core.supabase_sync_service import SupabaseStorageSyncService, SupabaseSyncCoordinator
|
| 31 |
-
from core.vectorstore import build_vectorstore_improved, load_vectorstore_improved
|
| 32 |
from core.collection_router_retriever import CollectionRouterRetriever
|
| 33 |
from core.models import embeddings
|
| 34 |
-
from core.retriever import HybridRetriever
|
| 35 |
from core.qa_pipeline import ask_ai_improved, ask_ai_stream_delta
|
| 36 |
from api.admin_documents_router import router as admin_documents_router
|
| 37 |
from api.admin_sync_router import router as admin_sync_router
|
|
@@ -143,22 +140,12 @@ async def lifespan(app: FastAPI):
|
|
| 143 |
app.state.db_pool = pool
|
| 144 |
await init_db_asyncpg(pool)
|
| 145 |
|
| 146 |
-
client = QdrantClient(url
|
| 147 |
-
|
| 148 |
-
if not client.collection_exists(collection_name):
|
| 149 |
-
logger.warning(f"Chưa có collection {collection_name} trên Qdrant Cloud. Đang xây dựng vectorstore mới...")
|
| 150 |
-
db, all_chunks= build_vectorstore_improved()
|
| 151 |
-
else :
|
| 152 |
-
logger.info(f"Đã tìm thấy collection {collection_name} trên Qdrant Cloud. Đang tải vectorstore...")
|
| 153 |
-
db, all_chunks = load_vectorstore_improved()
|
| 154 |
|
| 155 |
-
|
| 156 |
-
raise RuntimeError("Không thể khởi tạo vectorstore. Kiểm tra log để biết chi tiết.")
|
| 157 |
-
logger.info("Đang khởi tạo retriever ...")
|
| 158 |
-
|
| 159 |
-
base_retriever = HybridRetriever(db, all_chunks)
|
| 160 |
app.state.retriever = CollectionRouterRetriever(
|
| 161 |
-
base_retriever=
|
| 162 |
qdrant_client=client,
|
| 163 |
embeddings_model=embeddings,
|
| 164 |
top_n_collections=COLLECTION_ROUTER_TOP_N,
|
|
@@ -177,13 +164,36 @@ async def lifespan(app: FastAPI):
|
|
| 177 |
sync_service=sync_service,
|
| 178 |
poll_interval_seconds=SUPABASE_SYNC_INTERVAL_SECONDS,
|
| 179 |
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 180 |
sync_stop_event = asyncio.Event()
|
| 181 |
sync_task = asyncio.create_task(
|
| 182 |
sync_coordinator.run_polling_loop(stop_event=sync_stop_event)
|
| 183 |
)
|
| 184 |
|
| 185 |
-
app.state.supabase_sync_service = sync_service
|
| 186 |
-
app.state.supabase_sync_coordinator = sync_coordinator
|
| 187 |
app.state.supabase_sync_stop_event = sync_stop_event
|
| 188 |
app.state.supabase_sync_task = sync_task
|
| 189 |
logger.info(
|
|
|
|
| 16 |
COLLECTION_ROUTER_TOP_N,
|
| 17 |
DATABASE_URL,
|
| 18 |
QDRANT_API_KEY,
|
|
|
|
| 19 |
QDRANT_URL,
|
| 20 |
SUPABASE_ADMIN_SYNC_TOKEN,
|
| 21 |
SUPABASE_SERVICE_ROLE_KEY,
|
|
|
|
| 27 |
)
|
| 28 |
from core.document_db import init_document_db
|
| 29 |
from core.supabase_sync_service import SupabaseStorageSyncService, SupabaseSyncCoordinator
|
|
|
|
| 30 |
from core.collection_router_retriever import CollectionRouterRetriever
|
| 31 |
from core.models import embeddings
|
|
|
|
| 32 |
from core.qa_pipeline import ask_ai_improved, ask_ai_stream_delta
|
| 33 |
from api.admin_documents_router import router as admin_documents_router
|
| 34 |
from api.admin_sync_router import router as admin_sync_router
|
|
|
|
| 140 |
app.state.db_pool = pool
|
| 141 |
await init_db_asyncpg(pool)
|
| 142 |
|
| 143 |
+
client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
|
| 144 |
+
client.get_collections()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 145 |
|
| 146 |
+
logger.info("Đang khởi tạo retriever (Qdrant collection router)...")
|
|
|
|
|
|
|
|
|
|
|
|
|
| 147 |
app.state.retriever = CollectionRouterRetriever(
|
| 148 |
+
base_retriever=None,
|
| 149 |
qdrant_client=client,
|
| 150 |
embeddings_model=embeddings,
|
| 151 |
top_n_collections=COLLECTION_ROUTER_TOP_N,
|
|
|
|
| 164 |
sync_service=sync_service,
|
| 165 |
poll_interval_seconds=SUPABASE_SYNC_INTERVAL_SECONDS,
|
| 166 |
)
|
| 167 |
+
|
| 168 |
+
app.state.supabase_sync_service = sync_service
|
| 169 |
+
app.state.supabase_sync_coordinator = sync_coordinator
|
| 170 |
+
|
| 171 |
+
initial_sync = await sync_coordinator.run_sync(
|
| 172 |
+
trigger="startup:initial_sync",
|
| 173 |
+
queue_if_locked=False,
|
| 174 |
+
)
|
| 175 |
+
|
| 176 |
+
if initial_sync.get("status") == "failed":
|
| 177 |
+
logger.warning(
|
| 178 |
+
"Supabase initial sync failed at startup. service will continue and retry in scheduler. error=%s",
|
| 179 |
+
initial_sync.get("error"),
|
| 180 |
+
)
|
| 181 |
+
else:
|
| 182 |
+
summary = initial_sync.get("result") if isinstance(initial_sync.get("result"), dict) else {}
|
| 183 |
+
logger.info(
|
| 184 |
+
"Supabase initial sync completed. added=%s updated=%s deleted=%s failed=%s total_objects=%s",
|
| 185 |
+
summary.get("added", 0),
|
| 186 |
+
summary.get("updated", 0),
|
| 187 |
+
summary.get("deleted", 0),
|
| 188 |
+
summary.get("failed", 0),
|
| 189 |
+
summary.get("total_objects", 0),
|
| 190 |
+
)
|
| 191 |
+
|
| 192 |
sync_stop_event = asyncio.Event()
|
| 193 |
sync_task = asyncio.create_task(
|
| 194 |
sync_coordinator.run_polling_loop(stop_event=sync_stop_event)
|
| 195 |
)
|
| 196 |
|
|
|
|
|
|
|
| 197 |
app.state.supabase_sync_stop_event = sync_stop_event
|
| 198 |
app.state.supabase_sync_task = sync_task
|
| 199 |
logger.info(
|