"""Embedding, reranking and vector-similarity helpers.

Embeddings can be produced by SiliconFlow, Ollama, or any OpenAI-compatible
server, selected through ``EMBEDDING_PROVIDER``. Every provider degrades to a
deterministic pseudo-embedding when the remote call cannot be completed, so
callers always receive one vector per input text.
"""
import hashlib
import json
import os
from typing import Dict, List, Optional

import httpx
import numpy as np
from dotenv import load_dotenv

# Load the ".env" file located one directory above the one holding this module.
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".env"))

EMBEDDING_API_URL = os.environ.get("EMBEDDING_API_URL", "https://api.siliconflow.cn/v1/embeddings")
# NOTE: an alternative configuration previously used here was
# EMBEDDING_MODEL="Qwen/Qwen3-VL-Embedding-8B" with EMBEDDING_DIM=4096.
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "BAAI/bge-m3")
EMBEDDING_DIM = int(os.environ.get("EMBEDDING_DIM", "1024"))
EMBEDDING_PROVIDER = os.environ.get("EMBEDDING_PROVIDER", "siliconflow")
SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "")

RERANKER_MODEL = os.environ.get("RERANKER_MODEL", "Qwen/Qwen3-VL-Reranker-8B")
RERANKER_API_URL = os.environ.get("RERANKER_API_URL", "https://api.siliconflow.cn/v1/rerank")
RERANKER_ENABLED = os.environ.get("RERANKER_ENABLED", "true").lower() == "true"

# Bypass system proxy for SiliconFlow API calls made by anything in this
# process that honours NO_PROXY. An existing-but-empty NO_PROXY is treated
# like an unset one so the value never starts with a stray comma.
_no_proxy = os.environ.get("NO_PROXY", "")
if "siliconflow" not in _no_proxy:
    os.environ["NO_PROXY"] = (
        _no_proxy + ",api.siliconflow.cn" if _no_proxy else "api.siliconflow.cn"
    )

# Request timeout, in seconds, shared by every outbound HTTP call below.
_HTTP_TIMEOUT_SECONDS = 120.0
# Maximum number of texts sent to SiliconFlow in a single request.
_SILICONFLOW_BATCH_SIZE = 64


def _new_client() -> httpx.AsyncClient:
    """Create the HTTP client used for all embedding / reranker requests.

    ``trust_env=False`` makes httpx ignore the proxy environment variables,
    which is how the system proxy is bypassed. The older ``proxies={}``
    spelling is rejected with ``TypeError`` by httpx >= 0.28, and because the
    callers catch broad exceptions that error silently turned every request
    into a pseudo-embedding fallback.

    NOTE: ``trust_env=False`` also disables httpx's other environment-driven
    settings (e.g. ``SSL_CERT_FILE``).
    """
    return httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SECONDS, trust_env=False)


def _build_simple_embedding(text: str, dim: int = 768) -> np.ndarray:
    """Return a deterministic, unit-length pseudo-embedding for ``text``.

    Fallback only, for testing when no real embedding API is available. The
    first four bytes of the SHA-512 digest of the UTF-8 text seed a random
    generator, so the same text always maps to the same vector; the vectors
    carry no semantic meaning.

    Args:
        text: Text to derive the vector from.
        dim: Number of components in the returned vector.

    Returns:
        A ``float32`` array of length ``dim``, L2-normalised unless its norm
        is zero.
    """
    digest = hashlib.sha512(text.encode("utf-8")).digest()
    seed = int.from_bytes(digest[:4], "big")
    rng = np.random.RandomState(seed)
    vec = rng.randn(dim).astype(np.float32)
    norm = np.linalg.norm(vec)
    if norm > 0:
        vec = vec / norm
    return vec


def _fallback_embeddings(texts: List[str]) -> List[np.ndarray]:
    """Return one pseudo-embedding of size ``EMBEDDING_DIM`` per text."""
    return [_build_simple_embedding(t, EMBEDDING_DIM) for t in texts]


def _vectors_from_openai_payload(payload: dict, expected: int) -> List[np.ndarray]:
    """Extract embeddings from an OpenAI-format response, in input order.

    Args:
        payload: Decoded JSON body containing a ``"data"`` list whose items
            hold an ``"embedding"`` and, normally, an ``"index"``.
        expected: Number of texts that were sent in the request.

    Returns:
        ``float32`` vectors ordered by each item's ``"index"`` (items without
        one keep their position in the response).

    Raises:
        ValueError: If the response does not contain exactly ``expected``
            items, since the vectors could no longer be paired with inputs.
    """
    items = payload["data"]
    if len(items) != expected:
        raise ValueError(f"expected {expected} embeddings, got {len(items)}")
    ordered = sorted(enumerate(items), key=lambda pair: pair[1].get("index", pair[0]))
    return [np.array(item["embedding"], dtype=np.float32) for _, item in ordered]


async def get_embeddings_batch(texts: List[str], model: Optional[str] = None) -> List[np.ndarray]:
    """Generate embeddings for a batch of texts.

    Args:
        texts: Texts to embed.
        model: Model name; defaults to ``EMBEDDING_MODEL``.

    Returns:
        One vector per input text, in input order. With an unrecognised
        ``EMBEDDING_PROVIDER`` the vectors are deterministic pseudo-embeddings.
    """
    if not texts:
        return []

    model = model or EMBEDDING_MODEL
    provider = EMBEDDING_PROVIDER.lower()
    if provider == "siliconflow":
        return await _siliconflow_embeddings(texts, model)
    elif provider == "ollama":
        return await _ollama_embeddings(texts, model)
    elif provider == "openai":
        return await _openai_embeddings(texts, model)
    else:
        return _fallback_embeddings(texts)


async def _siliconflow_embeddings(texts: List[str], model: str) -> List[np.ndarray]:
    """Call the SiliconFlow embedding API.

    API docs: https://docs.siliconflow.cn/api-reference/embeddings
    Compatible with the OpenAI format and supports batch input.

    Texts are sent in chunks of ``_SILICONFLOW_BATCH_SIZE``. A chunk answered
    with a non-200 status gets pseudo-embeddings for that chunk only; any
    exception (network error, malformed body, wrong item count) replaces the
    whole result with pseudo-embeddings. Both cases are reported on stdout.

    Returns:
        One vector per input text, in input order.
    """
    api_url = EMBEDDING_API_URL or "https://api.siliconflow.cn/v1/embeddings"
    api_key = SILICONFLOW_API_KEY
    if not api_key:
        print("[WARN] SILICONFLOW_API_KEY not set, falling back to pseudo embeddings")
        return _fallback_embeddings(texts)

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    results: List[np.ndarray] = []
    try:
        async with _new_client() as client:
            for start in range(0, len(texts), _SILICONFLOW_BATCH_SIZE):
                batch = texts[start : start + _SILICONFLOW_BATCH_SIZE]
                resp = await client.post(
                    api_url,
                    headers=headers,
                    json={"model": model, "input": batch, "encoding_format": "float"},
                )
                if resp.status_code == 200:
                    results.extend(_vectors_from_openai_payload(resp.json(), len(batch)))
                else:
                    print(f"[ERROR] SiliconFlow API returned {resp.status_code}: {resp.text[:200]}")
                    results.extend(_fallback_embeddings(batch))
    except Exception as e:
        # Deliberate best-effort boundary: never propagate, always return
        # one vector per text.
        print(f"[ERROR] SiliconFlow API call failed: {e}")
        results = _fallback_embeddings(texts)
    return results


async def _ollama_embeddings(texts: List[str], model: str) -> List[np.ndarray]:
    """Call the Ollama embedding API, one request per text.

    Requests go to ``EMBEDDING_API_URL``. Both response shapes are accepted:
    an ``"embeddings"`` list (first entry used) or a single ``"embedding"``.
    A text whose response is non-200 or has neither key gets a
    pseudo-embedding; any exception replaces the whole result with
    pseudo-embeddings.

    Returns:
        One vector per input text, in input order.
    """
    results: List[np.ndarray] = []
    try:
        async with _new_client() as client:
            for text in texts:
                resp = await client.post(
                    EMBEDDING_API_URL, json={"model": model, "input": text}
                )
                if resp.status_code == 200:
                    data = resp.json()
                    if "embeddings" in data:
                        vec = np.array(data["embeddings"][0], dtype=np.float32)
                    elif "embedding" in data:
                        vec = np.array(data["embedding"], dtype=np.float32)
                    else:
                        vec = _build_simple_embedding(text, EMBEDDING_DIM)
                    results.append(vec)
                else:
                    print(f"[ERROR] Ollama API returned {resp.status_code}: {resp.text[:200]}")
                    results.append(_build_simple_embedding(text, EMBEDDING_DIM))
    except Exception as e:
        print(f"[ERROR] Ollama API call failed: {e}")
        results = _fallback_embeddings(texts)
    return results


async def _openai_embeddings(texts: List[str], model: str) -> List[np.ndarray]:
    """Call an OpenAI-compatible embedding API (e.g., vLLM).

    The endpoint is ``$OPENAI_API_BASE/v1/embeddings`` (default base
    ``http://localhost:8000``) and the bearer token is ``$OPENAI_API_KEY``
    (default ``"no-key"``); both are read at call time. All texts are sent in
    one request. Any failure yields pseudo-embeddings for every text.

    Returns:
        One vector per input text, in input order.
    """
    if not texts:
        return []

    # Strip a trailing slash so the joined URL never contains "//v1".
    base = os.environ.get("OPENAI_API_BASE", "http://localhost:8000").rstrip("/")
    api_url = base + "/v1/embeddings"
    api_key = os.environ.get("OPENAI_API_KEY", "no-key")
    try:
        async with _new_client() as client:
            resp = await client.post(
                api_url,
                headers={"Authorization": f"Bearer {api_key}"},
                json={"model": model, "input": texts},
            )
        if resp.status_code == 200:
            # Order by "index" so vectors line up with ``texts``.
            return _vectors_from_openai_payload(resp.json(), len(texts))
        print(f"[ERROR] OpenAI-compatible API returned {resp.status_code}: {resp.text[:200]}")
    except Exception as e:
        print(f"[ERROR] OpenAI-compatible API call failed: {e}")
    return _fallback_embeddings(texts)


def _neutral_ranking(count: int) -> List[Dict]:
    """Return ``count`` entries in input order, each with score ``0.0``."""
    return [{"index": i, "relevance_score": 0.0} for i in range(count)]


async def rerank_candidates(
    query: str,
    documents: List[str],
    top_n: Optional[int] = None,
    model: Optional[str] = None,
) -> List[Dict]:
    """Call the SiliconFlow Reranker API (Qwen/Qwen3-VL-Reranker-8B by default).

    Args:
        query: Query the documents are ranked against.
        documents: Candidate documents.
        top_n: Number of results requested from the API; a falsy value means
            all documents.
        model: Reranker model name; defaults to ``RERANKER_MODEL``.

    Returns:
        List of ``{"index": int, "relevance_score": float}`` sorted by score
        descending. When the reranker is disabled, no API key is set, or the
        call fails, every document is returned in input order with score
        ``0.0`` (``top_n`` is not applied in that case). An empty
        ``documents`` list yields ``[]``.
    """
    if not documents:
        return []

    model = model or RERANKER_MODEL
    api_key = SILICONFLOW_API_KEY
    if not api_key or not RERANKER_ENABLED:
        return _neutral_ranking(len(documents))

    top_n = top_n or len(documents)
    try:
        async with _new_client() as client:
            resp = await client.post(
                RERANKER_API_URL,
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": model,
                    "query": query,
                    "documents": documents,
                    "top_n": top_n,
                    "return_documents": False,
                },
            )
        if resp.status_code == 200:
            results = resp.json().get("results", [])
            return sorted(results, key=lambda x: x["relevance_score"], reverse=True)
        print(f"[ERROR] Reranker API returned {resp.status_code}: {resp.text[:200]}")
    except Exception as e:
        print(f"[ERROR] Reranker API call failed: {e}")
    return _neutral_ranking(len(documents))


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Compute cosine similarity between two vectors.

    Returns ``0.0`` when either vector has zero norm.
    """
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))


def batch_cosine_similarity(source_vecs: np.ndarray, target_vecs: np.ndarray) -> np.ndarray:
    """Compute the pairwise cosine similarity matrix.

    Args:
        source_vecs: Array of shape (M, D).
        target_vecs: Array of shape (N, D).

    Returns:
        (M, N) similarity matrix. Rows with zero norm are divided by 1
        instead, so their similarities come out as 0 rather than NaN.
    """
    source_norms = np.linalg.norm(source_vecs, axis=1, keepdims=True)
    target_norms = np.linalg.norm(target_vecs, axis=1, keepdims=True)
    source_norms = np.where(source_norms == 0, 1, source_norms)
    target_norms = np.where(target_norms == 0, 1, target_norms)
    source_normed = source_vecs / source_norms
    target_normed = target_vecs / target_norms
    return source_normed @ target_normed.T


def embedding_to_bytes(vec: np.ndarray) -> bytes:
    """Serialise a vector as raw ``float32`` bytes."""
    return vec.astype(np.float32).tobytes()


def bytes_to_embedding(data: bytes) -> np.ndarray:
    """Interpret raw bytes as a 1-D ``float32`` vector.

    The array is a view over ``data`` (no copy); for an immutable ``bytes``
    input NumPy returns it read-only, so copy it before modifying in place.
    """
    return np.frombuffer(data, dtype=np.float32)


def get_match_level(score: float) -> str:
    """Map a similarity score to a match label.

    ``>= 0.90`` is ``"high"``, ``>= 0.80`` is ``"possible"``, ``>= 0.70`` is
    ``"low_confidence"``, anything else is ``"no_match"``.
    """
    if score >= 0.90:
        return "high"
    elif score >= 0.80:
        return "possible"
    elif score >= 0.70:
        return "low_confidence"
    else:
        return "no_match"