# BrejBala's picture
# Deploy backend Docker app
# e63c592
from __future__ import annotations
from typing import Any, Dict, List, Optional
from pinecone import Pinecone
from app.core.config import Settings, get_settings
from app.core.errors import PineconeIndexConfigError
from app.core.logging import get_logger
# Module-level logger used by the helpers in this file.
logger = get_logger(__name__)

# Client state; both handles stay ``None`` until ``init_pinecone`` assigns them.
_index: Optional[Any] = None
_pc: Optional[Pinecone] = None

# Value returned by ``get_default_namespace``; ``init_pinecone`` replaces it
# with ``settings.PINECONE_NAMESPACE``.
_default_namespace: str = "dev"
def init_pinecone(settings: Optional[Settings] = None) -> None:
    """Create the Pinecone client and index handle used by this module.

    Intended to be called once at application startup. The index named by
    ``PINECONE_INDEX_NAME`` is described through the control plane and must
    expose an ``embed`` configuration and report a ready status, so that
    `upsert_records` and `search` can rely on server-side embeddings. The
    index handle itself is opened against ``PINECONE_HOST``.

    Args:
        settings: Configuration to use. When ``None``, ``get_settings()`` is
            called to obtain it.

    Raises:
        ValueError: If ``PINECONE_TEXT_FIELD`` is empty or whitespace only.
        PineconeIndexConfigError: If the index has no ``embed`` configuration
            or does not report itself as ready.
    """
    global _index, _pc, _default_namespace

    settings = settings if settings is not None else get_settings()

    stripped_text_field = settings.PINECONE_TEXT_FIELD.strip()
    if stripped_text_field == "":
        raise ValueError("PINECONE_TEXT_FIELD must not be empty")

    logger.info(
        "Initialising Pinecone client (host targeting). host=%s text_field=%s",
        settings.PINECONE_HOST,
        stripped_text_field,
    )
    client = Pinecone(api_key=settings.PINECONE_API_KEY)

    # Control-plane lookup by index name, used here only for validation.
    description = client.describe_index(settings.PINECONE_INDEX_NAME)

    if not getattr(description, "embed", None):
        raise PineconeIndexConfigError(
            "The configured Pinecone index is not an integrated embedding index.\n"
            "Create or reconfigure an index using Pinecone's integrated inference "
            "(e.g. via `create_index_for_model` or `configure_index(embed=...)`) so "
            "that embeddings are generated server-side. This keeps the backend "
            "lightweight without local embedding models."
        )

    status = getattr(description, "status", None)
    if not (status and getattr(status, "ready", False)):
        raise PineconeIndexConfigError(
            f"Pinecone index '{settings.PINECONE_INDEX_NAME}' is not ready. "
            "Please wait for the index to become ready in the Pinecone console."
        )

    host = settings.PINECONE_HOST
    logger.info("Connecting to Pinecone index via host %s", host)
    handle = client.Index(host=host)

    # Module state is assigned only after every check above has passed.
    _pc, _index = client, handle
    _default_namespace = settings.PINECONE_NAMESPACE
    logger.info(
        "Pinecone initialised successfully with namespace=%s",
        _default_namespace,
    )
def get_index() -> Any:
    """Return the module-level Pinecone Index handle.

    Raises:
        RuntimeError: If the handle is still ``None``, i.e. nothing has
            assigned it yet.
    """
    if _index is not None:
        return _index
    raise RuntimeError("Pinecone index has not been initialised")
def get_default_namespace() -> str:
    """Return the namespace currently stored in the module-level default."""
    return _default_namespace
def upsert_records(
    namespace: str, records: List[Dict[str, Any]], batch_size: int = 64
) -> int:
    """Upsert records into Pinecone in batches using the records API.

    Args:
        namespace: Namespace passed to every ``index.upsert_records`` call.
        records: Record dicts, forwarded unchanged in slices of at most
            ``batch_size`` items.
        batch_size: Maximum number of records per request; must be positive.

    Returns:
        The total number of records reported as upserted. When a response
        carries no integer ``upserted_count`` (neither as an attribute nor as
        a dict key), the whole batch is assumed to have been written.

    Raises:
        ValueError: If ``records`` is non-empty and ``batch_size`` is not
            positive.
        RuntimeError: Propagated from ``get_index`` when the index has not
            been initialised.
    """
    if not records:
        return 0
    if batch_size <= 0:
        # A zero step makes ``range`` fail with an unrelated-looking error and
        # a negative step yields no batches, silently dropping every record.
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    index = get_index()
    total_batches = (len(records) + batch_size - 1) // batch_size
    total_upserted = 0

    for batch_number, start in enumerate(
        range(0, len(records), batch_size), start=1
    ):
        batch = records[start : start + batch_size]
        logger.info(
            "Upserting %d records into namespace='%s' (batch %d/%d)",
            len(batch),
            namespace,
            batch_number,
            total_batches,
        )
        response = index.upsert_records(namespace=namespace, records=batch)

        # The response may be a model object or dict-like; probe both shapes.
        upserted_count = getattr(response, "upserted_count", None)
        if upserted_count is None and isinstance(response, dict):
            upserted_count = response.get("upserted_count")

        if isinstance(upserted_count, int):
            total_upserted += upserted_count
        else:
            # No usable count in the response: assume the whole batch landed.
            total_upserted += len(batch)

    logger.info(
        "Finished upserting %d records into namespace='%s'", total_upserted, namespace
    )
    return total_upserted
def _search_response_to_dict(response: Any) -> Dict[str, Any]:
    """Convert a search response object into a plain dict (empty if impossible)."""
    if hasattr(response, "to_dict"):
        data = response.to_dict()
    elif hasattr(response, "model_dump"):
        data = response.model_dump()
    elif isinstance(response, dict):
        data = response
    else:
        # Last resort: use the instance attributes, if there are any.
        data = getattr(response, "__dict__", {})
    return data if isinstance(data, dict) else {}


def _extract_search_hits(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pull the hit list out of a dict-shaped search response."""
    # Hits may sit under a "result" envelope or directly at the top level.
    result = data.get("result", data)
    if not isinstance(result, dict):
        # e.g. "result" is None; there is nothing to read hits from.
        return []
    hits = result.get("hits", []) or result.get("matches", [])
    return hits if isinstance(hits, list) else []


def search(
    namespace: str,
    query_text: str,
    top_k: int,
    filters: Optional[Dict[str, Any]] = None,
    fields: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Search Pinecone using integrated embedding search.

    Args:
        namespace: Namespace to search in.
        query_text: Text sent as the query input.
        top_k: Number of results requested.
        filters: Optional filter, added to the query only when truthy.
        fields: Record fields to request. When ``None``, the configured text
            field plus ``title``, ``source``, ``url``, ``published``,
            ``doc_id`` and ``chunk_id`` are requested.

    Returns:
        The list found under ``hits`` (or ``matches`` when ``hits`` is empty
        or absent) in the response, or an empty list when the response does
        not have that shape.

    Raises:
        RuntimeError: Propagated from ``get_index`` when the index has not
            been initialised.
    """
    index = get_index()

    if fields is None:
        # Strip the configured name, matching how ``init_pinecone`` validates it.
        text_field = get_settings().PINECONE_TEXT_FIELD.strip()
        fields = [
            text_field,
            "title",
            "source",
            "url",
            "published",
            "doc_id",
            "chunk_id",
        ]

    query: Dict[str, Any] = {
        "inputs": {"text": query_text},
        "top_k": top_k,
    }
    if filters:
        query["filter"] = filters

    logger.info(
        "Searching Pinecone namespace='%s' top_k=%d filters=%s",
        namespace,
        top_k,
        filters,
    )
    response = index.search(namespace=namespace, query=query, fields=fields)
    return _extract_search_hits(_search_response_to_dict(response))
def describe_index_stats(namespace_filter: Optional[str] = None) -> Dict[str, Any]:
    """Summarise per-namespace vector counts for the initialised index.

    Args:
        namespace_filter: When truthy, only the namespace with exactly this
            name is reported. ``None`` or an empty string reports every
            namespace.

    Returns:
        A mapping of namespace name to ``{"vector_count": <int>}``; a missing
        or falsy count is reported as ``0``.
    """
    stats = get_index().describe_index_stats()

    # ``namespaces`` maps each namespace name to an object (or dict) that
    # carries ``vector_count``.
    per_namespace: Dict[str, Any] = getattr(stats, "namespaces", {}) or {}

    summary: Dict[str, Any] = {}
    for ns_name, info in per_namespace.items():
        if not namespace_filter or ns_name == namespace_filter:
            count = getattr(info, "vector_count", None)
            if count is None and isinstance(info, dict):
                count = info.get("vector_count", 0)
            summary[ns_name] = {"vector_count": int(count or 0)}
    return summary