|
|
|
|
|
|
|
|
import os |
|
|
import json |
|
|
from typing import Optional, Dict, Any, List |
|
|
from datetime import datetime |
|
|
|
|
|
from fastapi import HTTPException |
|
|
|
|
|
from qdrant_client import QdrantClient |
|
|
from qdrant_client.http import models as qdrant_models |
|
|
|
|
|
from langchain_mongodb.chat_message_histories import MongoDBChatMessageHistory |
|
|
from langchain.memory import ConversationBufferMemory |
|
|
from langchain.chains import ConversationalRetrievalChain |
|
|
from langchain_core.retrievers import BaseRetriever |
|
|
from langchain_core.documents import Document |
|
|
|
|
|
from pydantic import ConfigDict |
|
|
|
|
|
from app.page_speed.config import settings |
|
|
from .db import vectorstore_meta_coll, chat_collection_name |
|
|
from .embeddings import embeddings, text_splitter, get_llm |
|
|
from .logging_config import logger |
|
|
from .prompt_library import ( |
|
|
default_user_prompt, |
|
|
page_speed_prompt, |
|
|
seo_prompt, |
|
|
content_relevance_prompt, |
|
|
uiux_prompt, |
|
|
mobile_usability_prompt |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_vectorstore_path(onboarding_id: str, doc_type: str) -> str: |
|
|
""" |
|
|
Returns a non-disk URI-like path for a vectorstore. |
|
|
Example: 'qdrant://<onboarding_id>/<doc_type>' |
|
|
This avoids creating a local folder while preserving a string that identifies |
|
|
the logical vectorstore for other components and logs. |
|
|
""" |
|
|
return f"qdrant://{onboarding_id}/{doc_type}" |
|
|
|
|
|
|
|
|
def save_vectorstore_to_disk( |
|
|
onboarding_id: str, |
|
|
doc_type: str, |
|
|
collection_name: str, |
|
|
qdrant_url: Optional[str], |
|
|
qdrant_api_key: Optional[str] |
|
|
) -> str: |
|
|
""" |
|
|
Previously this created a small local marker file with Qdrant connection details. |
|
|
In the diskless version we simply return a logical vectorstore path (URI-style). |
|
|
Persisting of metadata is done via `upsert_vectorstore_metadata`. |
|
|
""" |
|
|
vs_path = get_vectorstore_path(onboarding_id, doc_type) |
|
|
return vs_path |
|
|
|
|
|
|
|
|
def upsert_vectorstore_metadata( |
|
|
onboarding_id: str, |
|
|
doc_type: str, |
|
|
vectorstore_path: str, |
|
|
chat_id: str, |
|
|
collection_name: Optional[str] = None, |
|
|
qdrant_url: Optional[str] = None, |
|
|
qdrant_api_key: Optional[str] = None |
|
|
) -> None: |
|
|
""" |
|
|
Store metadata in MongoDB. Saves useful fields to allow build_rag_chain to |
|
|
reconstruct a working Qdrant client later. |
|
|
""" |
|
|
update = { |
|
|
"onboarding_id": onboarding_id, |
|
|
"doc_type": doc_type, |
|
|
"vectorstore_path": vectorstore_path, |
|
|
"chat_id": chat_id, |
|
|
"updated_at": datetime.utcnow(), |
|
|
} |
|
|
if collection_name: |
|
|
update["collection_name"] = collection_name |
|
|
if qdrant_url: |
|
|
update["qdrant_url"] = qdrant_url |
|
|
if qdrant_api_key: |
|
|
update["qdrant_api_key"] = qdrant_api_key |
|
|
|
|
|
|
|
|
vectorstore_meta_coll.update_one( |
|
|
{"onboarding_id": onboarding_id, "doc_type": doc_type}, |
|
|
{"$set": update, "$setOnInsert": {"created_at": datetime.utcnow()}}, |
|
|
upsert=True |
|
|
) |
|
|
logger.debug("Upserted vectorstore metadata for %s/%s into Mongo", onboarding_id, doc_type) |
|
|
|
|
|
|
|
|
def get_vectorstore_metadata( |
|
|
onboarding_id: str, |
|
|
doc_type: str |
|
|
) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Read vectorstore metadata from MongoDB (no local files). |
|
|
""" |
|
|
meta = vectorstore_meta_coll.find_one({"onboarding_id": onboarding_id, "doc_type": doc_type}) |
|
|
if meta: |
|
|
|
|
|
return meta |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QdrantTextRetriever(BaseRetriever): |
|
|
""" |
|
|
Minimal retriever that queries Qdrant directly and returns LangChain Documents. |
|
|
Assumes payload stores the raw chunk under key 'text'. |
|
|
""" |
|
|
|
|
|
client: QdrantClient |
|
|
collection_name: str |
|
|
k: int = 5 |
|
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
|
|
|
def _get_relevant_documents(self, query: str, *, run_manager=None) -> List[Document]: |
|
|
|
|
|
query_vec = None |
|
|
for attr in ("embed_query", "embed_documents", "embed_texts", "embed"): |
|
|
fn = getattr(embeddings, attr, None) |
|
|
if callable(fn): |
|
|
try: |
|
|
if attr == "embed_query": |
|
|
query_vec = fn(query) |
|
|
else: |
|
|
q_res = fn([query]) |
|
|
if isinstance(q_res, list) and q_res: |
|
|
query_vec = q_res[0] |
|
|
else: |
|
|
query_vec = q_res |
|
|
break |
|
|
except Exception: |
|
|
continue |
|
|
if query_vec is None: |
|
|
raise RuntimeError("No usable embedding function available on embeddings object.") |
|
|
|
|
|
|
|
|
if isinstance(query_vec, dict) and "embedding" in query_vec: |
|
|
query_vec = query_vec["embedding"] |
|
|
|
|
|
|
|
|
results = self.client.search( |
|
|
collection_name=self.collection_name, |
|
|
query_vector=query_vec, |
|
|
limit=self.k |
|
|
) |
|
|
|
|
|
docs: List[Document] = [] |
|
|
for r in results: |
|
|
payload = r.payload or {} |
|
|
text = payload.get("text") |
|
|
if not isinstance(text, str): |
|
|
logger.warning( |
|
|
"Qdrant payload missing 'text' or not a string; skipping. Payload: %s", |
|
|
payload |
|
|
) |
|
|
continue |
|
|
|
|
|
metadata = {k: v for k, v in payload.items() if k != "text"} |
|
|
metadata["score"] = r.score |
|
|
|
|
|
docs.append(Document(page_content=text, metadata=metadata)) |
|
|
|
|
|
return docs |
|
|
|
|
|
async def _aget_relevant_documents(self, query: str, *, run_manager=None) -> List[Document]: |
|
|
|
|
|
return self._get_relevant_documents(query, run_manager=run_manager) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_rag_chain( |
|
|
onboarding_id: str, |
|
|
doc_type: str, |
|
|
chat_id: str, |
|
|
prompt_type: str |
|
|
) -> ConversationalRetrievalChain: |
|
|
""" |
|
|
Builds a ConversationalRetrievalChain using pure Qdrant as backend. |
|
|
Loads connection details from the MongoDB metadata collection instead of a file. |
|
|
If metadata is missing, tries to detect an existing Qdrant collection named |
|
|
'vs_{onboarding_id}_{doc_type}' and auto-registers it in Mongo. |
|
|
""" |
|
|
meta = get_vectorstore_metadata(onboarding_id, doc_type) |
|
|
|
|
|
|
|
|
if not meta: |
|
|
logger.warning("Vectorstore metadata not found for %s/%s in Mongo; attempting Qdrant fallback detection", onboarding_id, doc_type) |
|
|
|
|
|
|
|
|
qdrant_url = getattr(settings, "qdrant_url", None) |
|
|
qdrant_api_key = getattr(settings, "qdrant_api_key", None) |
|
|
client_kwargs = {} |
|
|
if qdrant_url: |
|
|
client_kwargs["url"] = qdrant_url |
|
|
if qdrant_api_key: |
|
|
client_kwargs["api_key"] = qdrant_api_key |
|
|
|
|
|
qdrant_timeout = getattr(settings, "qdrant_timeout", 60) |
|
|
prefer_grpc = getattr(settings, "qdrant_prefer_grpc", False) |
|
|
|
|
|
try: |
|
|
if client_kwargs: |
|
|
qdrant_client = QdrantClient(**client_kwargs, timeout=qdrant_timeout, prefer_grpc=prefer_grpc) |
|
|
else: |
|
|
qdrant_client = QdrantClient(timeout=qdrant_timeout, prefer_grpc=prefer_grpc) |
|
|
except Exception as e: |
|
|
logger.exception("Failed to create Qdrant client during fallback detection: %s", e) |
|
|
raise HTTPException(status_code=500, detail="Vectorstore metadata not found and failed to connect to Qdrant for fallback detection.") |
|
|
|
|
|
guessed_collection = f"vs_{onboarding_id}_{doc_type}" |
|
|
try: |
|
|
|
|
|
info = None |
|
|
try: |
|
|
info = qdrant_client.get_collection(collection_name=guessed_collection) |
|
|
except Exception: |
|
|
|
|
|
collections_info = qdrant_client.get_collections() |
|
|
|
|
|
found = False |
|
|
for c in collections_info.get("collections", []) if isinstance(collections_info, dict) else collections_info: |
|
|
name = c.get("name") if isinstance(c, dict) else getattr(c, "name", None) |
|
|
if name == guessed_collection: |
|
|
found = True |
|
|
break |
|
|
if not found: |
|
|
info = None |
|
|
else: |
|
|
info = {"name": guessed_collection} |
|
|
|
|
|
if info: |
|
|
logger.info("Detected existing Qdrant collection '%s' via fallback; auto-registering metadata in Mongo", guessed_collection) |
|
|
|
|
|
vs_path = get_vectorstore_path(onboarding_id, doc_type) |
|
|
|
|
|
upsert_vectorstore_metadata(onboarding_id, doc_type, vs_path, chat_id="", collection_name=guessed_collection, qdrant_url=qdrant_url, qdrant_api_key=qdrant_api_key) |
|
|
meta = get_vectorstore_metadata(onboarding_id, doc_type) |
|
|
else: |
|
|
logger.info("Qdrant fallback detection found no collection named '%s'", guessed_collection) |
|
|
except Exception as e: |
|
|
logger.exception("Error while checking Qdrant collections for fallback detection: %s", e) |
|
|
|
|
|
|
|
|
if not meta: |
|
|
|
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail=( |
|
|
"Vectorstore metadata not found; run initialization first. " |
|
|
"Call POST /rag/initialization/{onboarding_id}/{doc_type} with documents to ingest. " |
|
|
"If you already initialized, check server logs for ingestion errors and verify Mongo collection " |
|
|
"'vectorstore_meta_coll' contains the record for this onboarding/doc_type." |
|
|
) |
|
|
) |
|
|
|
|
|
collection_name = meta.get("collection_name") |
|
|
if not collection_name: |
|
|
raise HTTPException(status_code=500, detail="Qdrant collection name missing in metadata.") |
|
|
|
|
|
|
|
|
qdrant_url = meta.get("qdrant_url") or getattr(settings, "qdrant_url", None) |
|
|
qdrant_api_key = meta.get("qdrant_api_key") or getattr(settings, "qdrant_api_key", None) |
|
|
|
|
|
client_kwargs = {} |
|
|
if qdrant_url: |
|
|
client_kwargs["url"] = qdrant_url |
|
|
if qdrant_api_key: |
|
|
client_kwargs["api_key"] = qdrant_api_key |
|
|
|
|
|
qdrant_timeout = getattr(settings, "qdrant_timeout", 60) |
|
|
prefer_grpc = getattr(settings, "qdrant_prefer_grpc", False) |
|
|
|
|
|
try: |
|
|
if client_kwargs: |
|
|
qdrant_client = QdrantClient(**client_kwargs, timeout=qdrant_timeout, prefer_grpc=prefer_grpc) |
|
|
else: |
|
|
qdrant_client = QdrantClient(timeout=qdrant_timeout, prefer_grpc=prefer_grpc) |
|
|
except Exception as e: |
|
|
logger.exception("Failed to construct Qdrant client for retrieval: %s", e) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to connect to Qdrant: {e}") |
|
|
|
|
|
retriever = QdrantTextRetriever(client=qdrant_client, collection_name=collection_name, k=5) |
|
|
|
|
|
|
|
|
chat_history = MongoDBChatMessageHistory( |
|
|
session_id=chat_id, |
|
|
connection_string=settings.mongo_uri, |
|
|
database_name=settings.mongo_db, |
|
|
collection_name=chat_collection_name, |
|
|
) |
|
|
memory = ConversationBufferMemory( |
|
|
memory_key="chat_history", |
|
|
chat_memory=chat_history, |
|
|
return_messages=True, |
|
|
) |
|
|
|
|
|
llm = get_llm() |
|
|
|
|
|
|
|
|
if prompt_type == "page_speed": |
|
|
user_prompt = page_speed_prompt |
|
|
elif prompt_type == "seo": |
|
|
user_prompt = seo_prompt |
|
|
elif prompt_type == "content_relevance": |
|
|
user_prompt = content_relevance_prompt |
|
|
elif prompt_type == "uiux": |
|
|
user_prompt = uiux_prompt |
|
|
elif prompt_type == "mobile_usability": |
|
|
user_prompt = mobile_usability_prompt |
|
|
else: |
|
|
user_prompt = default_user_prompt |
|
|
|
|
|
return ConversationalRetrievalChain.from_llm( |
|
|
llm=llm, |
|
|
retriever=retriever, |
|
|
memory=memory, |
|
|
return_source_documents=False, |
|
|
chain_type="stuff", |
|
|
combine_docs_chain_kwargs={"prompt": user_prompt}, |
|
|
verbose=False |
|
|
) |
|
|
|