Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Any, Dict, List, Optional | |
| from pinecone import Pinecone | |
| from app.core.config import Settings, get_settings | |
| from app.core.errors import PineconeIndexConfigError | |
| from app.core.logging import get_logger | |
| logger = get_logger(__name__) | |
| _index: Optional[Any] = None | |
| _pc: Optional[Pinenecone] = None | |
| _default_namespace: str = "dev" | |
| def init_pinecone(settings: Optional[Settings] = None) -> None: | |
| """Initialise the Pinecone client and Index. | |
| This function should be called once on application startup. It validates | |
| that the configured index is an integrated embedding index so that | |
| `upsert_records` and `search` can be used without local embedding models. | |
| """ | |
| global _index, _pc, _default_namespace | |
| if settings is None: | |
| settings = get_settings() | |
| text_field = settings.PINECONE_TEXT_FIELD.strip() | |
| if not text_field: | |
| raise ValueError("PINECONE_TEXT_FIELD must not be empty") | |
| logger.info( | |
| "Initialising Pinecone client (host targeting). host=%s text_field=%s", | |
| settings.PINECONE_HOST, | |
| text_field, | |
| ) | |
| pc = Pinecone(api_key=settings.PINECONE_API_KEY) | |
| # Validate index configuration via control plane using index name. | |
| index_model = pc.describe_index(settings.PINECONE_INDEX_NAME) | |
| embed_config = getattr(index_model, "embed", None) | |
| if not embed_config: | |
| raise PineconeIndexConfigError( | |
| "The configured Pinecone index is not an integrated embedding index.\n" | |
| "Create or reconfigure an index using Pinecone's integrated inference " | |
| "(e.g. via `create_index_for_model` or `configure_index(embed=...)`) so " | |
| "that embeddings are generated server-side. This keeps the backend " | |
| "lightweight without local embedding models." | |
| ) | |
| if not getattr(index_model, "status", None) or not getattr( | |
| index_model.status, "ready", False | |
| ): | |
| raise PineconeIndexConfigError( | |
| f"Pinecone index '{settings.PINECONE_INDEX_NAME}' is not ready. " | |
| "Please wait for the index to become ready in the Pinecone console." | |
| ) | |
| index_host = settings.PINECONE_HOST | |
| logger.info("Connecting to Pinecone index via host %s", index_host) | |
| index = pc.Index(host=index_host) | |
| _pc = pc | |
| _index = index | |
| _default_namespace = settings.PINECONE_NAMESPACE | |
| logger.info( | |
| "Pinecone initialised successfully with namespace=%s", | |
| _default_namespace, | |
| ) | |
| def get_index() -> Any: | |
| """Return the initialised Pinecone Index client.""" | |
| if _index is None: | |
| raise RuntimeError("Pinecone index has not been initialised") | |
| return _index | |
| def get_default_namespace() -> str: | |
| return _default_namespace | |
| def upsert_records( | |
| namespace: str, records: List[Dict[str, Any]], batch_size: int = 64 | |
| ) -> int: | |
| """Upsert records into Pinecone using the RECORDS API. | |
| Returns the total number of records reported as upserted. | |
| """ | |
| if not records: | |
| return 0 | |
| index = get_index() | |
| total_upserted = 0 | |
| for i in range(0, len(records), batch_size): | |
| batch = records[i : i + batch_size] | |
| logger.info( | |
| "Upserting %d records into namespace='%s' (batch %d/%d)", | |
| len(batch), | |
| namespace, | |
| i // batch_size + 1, | |
| (len(records) + batch_size - 1) // batch_size, | |
| ) | |
| response = index.upsert_records(namespace=namespace, records=batch) | |
| # The response type may be a dict-like or model; try to read upserted count. | |
| upserted_count = getattr(response, "upserted_count", None) | |
| if upserted_count is None and isinstance(response, dict): | |
| upserted_count = response.get("upserted_count") | |
| if isinstance(upserted_count, int): | |
| total_upserted += upserted_count | |
| else: | |
| # Fallback: assume all batch records were upserted | |
| total_upserted += len(batch) | |
| logger.info( | |
| "Finished upserting %d records into namespace='%s'", total_upserted, namespace | |
| ) | |
| return total_upserted | |
| def search( | |
| namespace: str, | |
| query_text: str, | |
| top_k: int, | |
| filters: Optional[Dict[str, Any]] = None, | |
| fields: Optional[List[str]] = None, | |
| ) -> List[Dict[str, Any]]: | |
| """Search Pinecone using integrated embedding search. | |
| Returns a list of hits, each containing `_id`, `_score`, and `fields`. | |
| """ | |
| index = get_index() | |
| if fields is None: | |
| settings = get_settings() | |
| text_field = settings.PINECONE_TEXT_FIELD | |
| fields = [ | |
| text_field, | |
| "title", | |
| "source", | |
| "url", | |
| "published", | |
| "doc_id", | |
| "chunk_id", | |
| ] | |
| query: Dict[str, Any] = { | |
| "inputs": {"text": query_text}, | |
| "top_k": top_k, | |
| } | |
| if filters: | |
| query["filter"] = filters | |
| logger.info( | |
| "Searching Pinecone namespace='%s' top_k=%d filters=%s", | |
| namespace, | |
| top_k, | |
| filters, | |
| ) | |
| response = index.search(namespace=namespace, query=query, fields=fields) | |
| # The response should match the SearchRecordsResponse shape. | |
| data: Dict[str, Any] | |
| if hasattr(response, "to_dict"): | |
| data = response.to_dict() # type: ignore[assignment] | |
| elif hasattr(response, "model_dump"): | |
| data = response.model_dump() # type: ignore[assignment] | |
| elif isinstance(response, dict): | |
| data = response | |
| else: | |
| # Fallback to __dict__ | |
| data = getattr(response, "__dict__", {}) | |
| result = data.get("result", data) | |
| hits = result.get("hits", []) or result.get("matches", []) | |
| if not isinstance(hits, list): | |
| return [] | |
| return hits # type: ignore[return-value] | |
| def describe_index_stats(namespace_filter: Optional[str] = None) -> Dict[str, Any]: | |
| """Return index statistics, optionally filtered to a specific namespace.""" | |
| index = get_index() | |
| stats = index.describe_index_stats() | |
| # stats.namespaces is a mapping of namespace -> object with vector_count | |
| namespaces: Dict[str, Any] = getattr(stats, "namespaces", {}) or {} | |
| result: Dict[str, Any] = {} | |
| for name, ns_info in namespaces.items(): | |
| if namespace_filter and name != namespace_filter: | |
| continue | |
| vector_count = getattr(ns_info, "vector_count", None) | |
| if vector_count is None and isinstance(ns_info, dict): | |
| vector_count = ns_info.get("vector_count", 0) | |
| result[name] = {"vector_count": int(vector_count or 0)} | |
| return result |