Spaces:
Build error
Build error
| """Vector store utilities using Weaviate cloud for knowledge base retrieval.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| import numpy as np | |
| import weaviate | |
| from sentence_transformers import SentenceTransformer | |
| from weaviate.collections.classes.config import DataType | |
| from ..config import settings | |
| from ..logger import get_logger | |
| logger = get_logger(__name__) | |
class VectorStore:
    """Wrapper around a Weaviate collection used for knowledge-base retrieval.

    The store owns a connected Weaviate client and a sentence encoder. On
    construction it resolves (or creates) the configured collection and
    detects which property names the collection actually uses, because two
    schemas are supported:

    * new schema:    ``question`` / ``answer`` / ``source``
    * legacy schema: ``input`` / ``label`` / ``source_file``
    """

    # Logical field -> candidate property names, preferred (new schema) first.
    # The last candidate of each tuple is the legacy name used as fallback.
    _PROPERTY_CANDIDATES = {
        "question": ("question", "input"),
        "answer": ("answer", "label"),
        "source": ("source", "source_file"),
    }
    _LEGACY_PROPERTY_MAP = {
        "question": "input",
        "answer": "label",
        "source": "source_file",
    }

    def __init__(self, client: weaviate.WeaviateClient, encoder: SentenceTransformer) -> None:
        """Bind the client/encoder, then resolve the collection and its schema."""
        self.client = client
        self.encoder = encoder
        self.collection = self._ensure_collection()
        self._property_map = self._detect_property_names()

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------
    @staticmethod
    def _match_collection_name(existing_names, wanted: str):
        """Return the existing collection name matching *wanted*, or ``None``.

        An exact match wins; otherwise the first case-insensitive match is
        returned (Weaviate may store e.g. ``Mathvectors`` for ``MathVectors``).
        """
        names = list(existing_names)
        if wanted in names:
            return wanted
        wanted_lower = wanted.lower()
        return next((name for name in names if name.lower() == wanted_lower), None)

    @staticmethod
    def _extract_property_names(props) -> List[str]:
        """Pull property names out of whatever shape the config exposes.

        Iterables (other than ``str``/``bytes``) yield ``item.name`` when the
        item has one, else ``str(item)``. If that produces nothing, the
        attribute names of *props* itself are used as a last resort.
        """
        names: List[str] = []
        if props and hasattr(props, "__iter__") and not isinstance(props, (str, bytes)):
            names = [p.name if hasattr(p, "name") else str(p) for p in props]
        if not names:
            names = list(getattr(props, "__dict__", None) or {})
        return names

    @classmethod
    def _map_properties(cls, prop_names) -> Dict[str, str]:
        """Map logical names (question/answer/source) to actual property names.

        For each logical field the new-schema name is preferred, then the
        legacy name; when neither is present the legacy name is used, so an
        empty *prop_names* yields the full legacy mapping.
        """
        mapping: Dict[str, str] = {}
        for logical, candidates in cls._PROPERTY_CANDIDATES.items():
            mapping[logical] = next(
                (name for name in candidates if name in prop_names),
                candidates[-1],
            )
        return mapping

    @staticmethod
    def _normalize(vector) -> np.ndarray:
        """Scale *vector* to unit length; a zero/non-finite norm is left as is.

        Guarding the division avoids filling the vector with NaNs when the
        encoder returns an all-zero embedding.
        """
        arr = np.asarray(vector)
        norm = np.linalg.norm(arr)
        if norm == 0 or not np.isfinite(norm):
            return arr
        return arr / norm

    @staticmethod
    def _similarity(metadata) -> float:
        """Derive a similarity score from query metadata.

        Uses ``certainty`` when present; otherwise ``1 - distance`` clamped at
        zero (lower distance means more similar); otherwise ``0.0``.
        """
        if not metadata:
            return 0.0
        certainty = getattr(metadata, "certainty", None)
        if certainty is not None:
            return float(certainty)
        distance = getattr(metadata, "distance", None)
        if distance is not None:
            return max(0.0, 1.0 - float(distance))
        return 0.0

    @staticmethod
    def _read_property(properties, name: str):
        """Read *name* from a properties dict or object, defaulting to ``""``."""
        if isinstance(properties, dict):
            return properties.get(name, "")
        return getattr(properties, name, "")

    # ------------------------------------------------------------------
    # Collection / schema resolution
    # ------------------------------------------------------------------
    def _ensure_collection(self) -> weaviate.collections.Collection:
        """Return the configured collection, creating it if it doesn't exist.

        Names are matched exactly or case-insensitively to avoid a 422 when
        the class already exists with different casing. If creation still
        fails with an "already exists"-style error, the list is refreshed and
        the existing collection is returned; any other failure is re-raised.
        """
        collection_name = settings.weaviate_collection
        existing = self._match_collection_name(
            self.client.collections.list_all(), collection_name
        )
        if existing is not None:
            collection = self.client.collections.get(existing)
            logger.info("vector_store.collection.exists", name=existing)
            return collection

        try:
            collection = self.client.collections.create(
                name=collection_name,
                vectorizer_config=weaviate.classes.config.Configure.Vectorizer.none(),
                vector_index_config=weaviate.classes.config.Configure.VectorIndex.hnsw(),
                properties=[
                    {"name": "question", "data_type": DataType.TEXT},
                    {"name": "answer", "data_type": DataType.TEXT},
                    {"name": "source", "data_type": DataType.TEXT},
                ],
            )
        except weaviate.exceptions.UnexpectedStatusCodeError as exc:
            msg = str(exc)
            if "already exists" not in msg and "class name" not in msg:
                raise
            logger.warning("vector_store.collection.create_conflict", msg=msg)
            # Refresh the list and look for the class that caused the conflict.
            existing = self._match_collection_name(
                self.client.collections.list_all(), collection_name
            )
            if existing is None:
                # Can't resolve the conflict: surface the original error.
                raise
            collection = self.client.collections.get(existing)
            logger.info("vector_store.collection.exists", name=existing)
            return collection

        logger.info("vector_store.collection.created", name=collection_name)
        return collection

    def _detect_property_names(self) -> Dict[str, str]:
        """Detect the actual property names in the collection schema.

        Returns a mapping from logical names (question, answer, source) to
        actual property names. Detection errors are logged, not raised; any
        logical name that cannot be resolved falls back to the legacy schema.
        """
        prop_names: List[str] = []
        try:
            config = self.collection.config.get()
            prop_names = self._extract_property_names(getattr(config, "properties", None))
        except Exception as exc:
            logger.warning("vector_store.schema.detection_error", error=str(exc))

        if not prop_names:
            logger.warning(
                "vector_store.schema.detection_failed",
                msg="Could not detect properties, using fallback",
            )
        property_map = self._map_properties(prop_names)
        logger.info(
            "vector_store.schema.detected",
            mapping=property_map,
            found_properties=prop_names,
        )
        return property_map

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def _near_vector(self, vector: List[float], limit: int, property_map: Dict[str, str]):
        """Run a ``near_vector`` query returning the mapped properties."""
        return self.collection.query.near_vector(
            near_vector=vector,
            certainty=settings.similarity_threshold,
            limit=limit,
            return_metadata=["certainty"],
            return_properties=[
                property_map.get("question", "input"),
                property_map.get("answer", "label"),
                property_map.get("source", "source_file"),
            ],
        )

    def search(self, query: str, top_k: int | None = None) -> List[Dict[str, Any]]:
        """Search for similar questions and return contexts.

        Args:
            query: Free-text query to embed and look up.
            top_k: Maximum number of hits; defaults to ``settings.top_k``.

        Returns:
            A list of dicts with ``document_id``, ``question``, ``answer``,
            ``source`` and ``similarity``. Query failures are logged and
            yield an empty list; encoder failures propagate.
        """
        if top_k is None:
            top_k = settings.top_k

        vector = self._normalize(self.encoder.encode(query)).tolist()

        try:
            response = self._near_vector(vector, top_k, self._property_map)
        except Exception as exc:  # pragma: no cover - defensive
            error_msg = str(exc).lower()
            if "no such prop" not in error_msg and "property" not in error_msg:
                logger.exception("vector_store.search.error", error=str(exc))
                return []
            # The detected schema was wrong: retry once with the legacy names.
            logger.warning(
                "vector_store.search.schema_mismatch",
                error=str(exc),
                trying="old_schema",
            )
            try:
                response = self._near_vector(vector, top_k, self._LEGACY_PROPERTY_MAP)
            except Exception as fallback_exc:
                logger.exception("vector_store.search.fallback_failed", error=str(fallback_exc))
                return []
            # Remember the schema that worked for subsequent queries.
            self._property_map = dict(self._LEGACY_PROPERTY_MAP)

        # Property names that were successfully used in the query
        # (either from detection or from the legacy fallback).
        question_prop = self._property_map.get("question", "input")
        answer_prop = self._property_map.get("answer", "label")
        source_prop = self._property_map.get("source", "source_file")

        results: List[Dict[str, Any]] = []
        # Weaviate v4 returns a QueryReturn with an ``objects`` attribute.
        for obj in response.objects:
            answer = self._read_property(obj.properties, answer_prop)
            # In the legacy schema the answer may be a numeric label.
            if answer_prop == "label" and isinstance(answer, (int, float)):
                answer = str(answer)
            results.append({
                "document_id": str(obj.uuid),
                "question": self._read_property(obj.properties, question_prop),
                "answer": answer,
                "source": self._read_property(obj.properties, source_prop),
                "similarity": self._similarity(obj.metadata),
            })
        return results

    def add_entry(self, question: str, answer: str, source: str = "kb") -> str:
        """Add a new entry to the vector store and return its id as a string.

        The embedding is computed over ``question + "\\n" + answer`` and
        normalised before insertion.

        NOTE(review): entries are always written under the new-schema names
        (question/answer/source) while ``search`` reads through
        ``self._property_map``; on a legacy-schema collection the two will not
        line up - confirm whether that is intended.
        """
        vector = self._normalize(self.encoder.encode(question + "\n" + answer)).tolist()
        result = self.collection.data.insert(
            properties={
                "question": question,
                "answer": answer,
                "source": source,
            },
            vector=vector,
        )
        # The v4 client returns the UUID itself; tolerate wrappers exposing
        # ``.uuid`` as well, and always honour the ``-> str`` contract.
        return str(getattr(result, "uuid", result))
| _vector_store: VectorStore | None = None | |
| class _NullVectorStore: | |
| """Fallback vector store used when Weaviate is unavailable at startup. | |
| Provides the minimal API consumed by the workflow. | |
| """ | |
| def search(self, query: str, top_k: int | None = None) -> List[Dict[str, Any]]: # type: ignore[override] | |
| return [] | |
| def add_entry(self, question: str, answer: str, source: str = "kb") -> str: # pragma: no cover | |
| logger.warning("vector_store.null.add_entry_ignored") | |
| return "" | |
def load_vector_store(force_reload: bool = False) -> VectorStore:
    """Lazy-load the Weaviate vector store.

    The store is cached in the module-level ``_vector_store``; pass
    ``force_reload=True`` to rebuild it. When the Weaviate URL or API key is
    missing, or when connecting/initialising fails, a ``_NullVectorStore`` is
    cached and returned instead so the application can start without a KB
    (searches then return empty results).

    Args:
        force_reload: Rebuild the store even if one is already cached.

    Returns:
        The cached ``VectorStore`` (or the null fallback, typed as one).
    """
    global _vector_store

    if _vector_store is not None and not force_reload:
        return _vector_store

    if not settings.weaviate_url or not settings.weaviate_api_key:
        logger.warning(
            "vector_store.config.missing",
            msg="Starting without KB; searches will return empty.",
        )
        _vector_store = _NullVectorStore()  # type: ignore[assignment]
        return _vector_store  # type: ignore[return-value]

    logger.info("vector_store.load.start", url=settings.weaviate_url)

    # Use the v4 helper to connect to Weaviate Cloud. The older v3-style
    # `weaviate.Client(url=..., auth_client_secret=...)` constructor is not
    # supported by the installed weaviate client and raises a TypeError.
    # The API key is passed as a plain string; the helper turns it into the
    # proper credentials object, which avoids touching `weaviate.Auth`
    # (not available in some installs).
    client = None
    try:
        client = weaviate.connect_to_weaviate_cloud(
            cluster_url=settings.weaviate_url,
            auth_credentials=settings.weaviate_api_key,
            skip_init_checks=True,
        )
        encoder = SentenceTransformer(settings.embedding_model_name)
        _vector_store = VectorStore(client=client, encoder=encoder)
    except Exception as exc:  # pragma: no cover
        logger.warning(
            "vector_store.load.failed_starting_without_kb",
            error=str(exc),
        )
        # The connection may already be open when the encoder or the store
        # fails to initialise; close it so it is not leaked.
        if client is not None:
            try:
                client.close()
            except Exception as close_exc:
                logger.warning("vector_store.load.client_close_failed", error=str(close_exc))
        _vector_store = _NullVectorStore()  # type: ignore[assignment]
        return _vector_store  # type: ignore[return-value]

    logger.info("vector_store.load.success")
    return _vector_store
| def save_feedback_to_queue(feedback_record: dict) -> None: | |
| """Append feedback to feedback database.""" | |
| feedback_path = Path(settings.feedback_store_path) | |
| feedback_path.parent.mkdir(parents=True, exist_ok=True) | |
| if feedback_path.exists(): | |
| data = json.loads(feedback_path.read_text(encoding="utf-8")) | |
| else: | |
| data = [] | |
| data.append(feedback_record) | |
| feedback_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") | |
| def queue_candidate_kb_entry(question: str, solution: str, source: str) -> None: | |
| """Save a candidate knowledge base entry for human approval.""" | |
| queue_path = Path("backend/data/kb_candidate_queue.json") | |
| queue_path.parent.mkdir(parents=True, exist_ok=True) | |
| if queue_path.exists(): | |
| queue = json.loads(queue_path.read_text(encoding="utf-8")) | |
| else: | |
| queue = [] | |
| queue.append({"question": question, "answer": solution, "source": source}) | |
| queue_path.write_text(json.dumps(queue, ensure_ascii=False, indent=2), encoding="utf-8") | |