teryryy's picture
Upload 13 files
010f0b1 verified
import os
import json
import hashlib
import numpy as np
from typing import List, Optional, Dict
import httpx
from dotenv import load_dotenv
# Load settings from the ".env" file located one directory above this module's folder.
load_dotenv(
    os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        ".env",
    )
)

# Embedding service settings; each one can be overridden through the environment.
EMBEDDING_API_URL = os.getenv("EMBEDDING_API_URL", "https://api.siliconflow.cn/v1/embeddings")
# Alternative model configuration, currently disabled:
# EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "Qwen/Qwen3-VL-Embedding-8B")
# EMBEDDING_DIM = int(os.environ.get("EMBEDDING_DIM", "4096"))
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "BAAI/bge-m3")
EMBEDDING_DIM = int(os.getenv("EMBEDDING_DIM", "1024"))
EMBEDDING_PROVIDER = os.getenv("EMBEDDING_PROVIDER", "siliconflow")
SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "")

# Reranker settings; the feature is on only when RERANKER_ENABLED is "true" (any letter case).
RERANKER_MODEL = os.getenv("RERANKER_MODEL", "Qwen/Qwen3-VL-Reranker-8B")
RERANKER_API_URL = os.getenv("RERANKER_API_URL", "https://api.siliconflow.cn/v1/rerank")
RERANKER_ENABLED = os.getenv("RERANKER_ENABLED", "true").lower() == "true"

# Bypass the system proxy for SiliconFlow API calls: make sure NO_PROXY lists
# the API host, creating the variable if it is absent and appending otherwise.
if "siliconflow" not in os.environ.get("NO_PROXY", ""):
    os.environ["NO_PROXY"] = (
        os.environ["NO_PROXY"] + ",api.siliconflow.cn"
        if "NO_PROXY" in os.environ
        else "api.siliconflow.cn"
    )
def _build_simple_embedding(text: str, dim: int = 768) -> np.ndarray:
    """Return a deterministic, unit-length pseudo-embedding for ``text``.

    Fallback intended only for testing when no real embedding API is
    available: the vector is random noise seeded from a hash of the text,
    so the same text always yields the same vector.

    Args:
        text: Input string; its UTF-8 bytes are hashed with SHA-512.
        dim: Number of components in the returned vector.

    Returns:
        A float32 array of length ``dim``, scaled to unit L2 norm unless its
        norm is zero.
    """
    digest = hashlib.sha512(text.encode("utf-8")).digest()
    # Only the first four digest bytes (big-endian) are used as the RNG seed.
    generator = np.random.RandomState(int.from_bytes(digest[:4], "big"))
    noise = generator.randn(dim).astype(np.float32)
    length = np.linalg.norm(noise)
    return noise / length if length > 0 else noise
async def get_embeddings_batch(texts: List[str], model: Optional[str] = None) -> List[np.ndarray]:
    """Embed a batch of texts with the configured provider.

    Args:
        texts: Strings to embed.
        model: Model name to request; EMBEDDING_MODEL is used when falsy.

    Returns:
        One vector per input text. When EMBEDDING_PROVIDER (compared
        case-insensitively) is not "siliconflow", "ollama" or "openai", the
        vectors are deterministic pseudo-embeddings of size EMBEDDING_DIM.
    """
    chosen_model = model if model else EMBEDDING_MODEL
    backends = {
        "siliconflow": _siliconflow_embeddings,
        "ollama": _ollama_embeddings,
        "openai": _openai_embeddings,
    }
    handler = backends.get(EMBEDDING_PROVIDER.lower())
    if handler is None:
        # Unknown provider: fall back to hash-seeded pseudo embeddings.
        return [_build_simple_embedding(t, EMBEDDING_DIM) for t in texts]
    return await handler(texts, chosen_model)
async def _siliconflow_embeddings(texts: List[str], model: str) -> List[np.ndarray]:
    """Embed ``texts`` through the SiliconFlow embedding API.

    API docs: https://docs.siliconflow.cn/api-reference/embeddings
    The request uses the OpenAI-compatible format with batched input.

    This function is best-effort and does not raise: any failure is printed
    and replaced by deterministic pseudo-embeddings of size EMBEDDING_DIM.

    Args:
        texts: Strings to embed; sent in requests of at most 64 texts each.
        model: Model name placed in the request body.

    Returns:
        One float32 vector per input text, in input order. A batch whose
        request returns a non-200 status, or a number of embeddings different
        from the number of inputs, gets pseudo-embeddings for that batch
        only; an exception replaces the whole result with pseudo-embeddings.
    """
    api_url = EMBEDDING_API_URL or "https://api.siliconflow.cn/v1/embeddings"
    api_key = SILICONFLOW_API_KEY
    if not api_key:
        print("[WARN] SILICONFLOW_API_KEY not set, falling back to pseudo embeddings")
        return [_build_simple_embedding(t, EMBEDDING_DIM) for t in texts]
    if not texts:
        # Nothing to embed, so no HTTP client is needed.
        return []

    results: List[np.ndarray] = []
    try:
        # The `proxies` keyword was removed in httpx 0.28; passing it raises
        # TypeError, which the handler below would turn into pseudo
        # embeddings for every call. trust_env=False keeps the original
        # intent of ignoring system proxy settings on all httpx versions.
        async with httpx.AsyncClient(timeout=120.0, trust_env=False) as client:
            # SiliconFlow supports batch input, but limit to 64 per request.
            for i in range(0, len(texts), 64):
                batch = texts[i : i + 64]
                resp = await client.post(
                    api_url,
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json={"model": model, "input": batch, "encoding_format": "float"},
                )
                if resp.status_code != 200:
                    print(f"[ERROR] SiliconFlow API returned {resp.status_code}: {resp.text[:200]}")
                    results.extend(_build_simple_embedding(t, EMBEDDING_DIM) for t in batch)
                    continue
                # Order by the reported index so vectors line up with `batch`.
                items = sorted(resp.json()["data"], key=lambda x: x["index"])
                if len(items) != len(batch):
                    # A short or long response would shift every later vector
                    # onto the wrong text, so this batch is replaced instead.
                    print(
                        f"[ERROR] SiliconFlow API returned {len(items)} embeddings "
                        f"for {len(batch)} inputs"
                    )
                    results.extend(_build_simple_embedding(t, EMBEDDING_DIM) for t in batch)
                    continue
                results.extend(np.array(item["embedding"], dtype=np.float32) for item in items)
    except Exception as e:
        # Deliberately broad: callers always receive one vector per text.
        print(f"[ERROR] SiliconFlow API call failed: {e}")
        results = [_build_simple_embedding(t, EMBEDDING_DIM) for t in texts]
    return results
async def _ollama_embeddings(texts: List[str], model: str) -> List[np.ndarray]:
    """Embed ``texts`` through an Ollama embedding endpoint.

    One POST per text is sent to EMBEDDING_API_URL. The response may carry
    the vector under "embeddings" (a list, whose first entry is used) or
    under "embedding".

    This function is best-effort and does not raise: failures are printed
    and replaced by deterministic pseudo-embeddings of size EMBEDDING_DIM.

    Args:
        texts: Strings to embed.
        model: Model name placed in the request body.

    Returns:
        One float32 vector per input text, in input order. A text whose
        request fails (non-200 status or no embedding field) gets a
        pseudo-embedding; an exception replaces the whole result.
    """
    results: List[np.ndarray] = []
    try:
        # The `proxies` keyword was removed in httpx 0.28; passing it raises
        # TypeError, which the handler below would turn into pseudo
        # embeddings for every call. trust_env=False keeps the original
        # intent of ignoring system proxy settings on all httpx versions.
        async with httpx.AsyncClient(timeout=120.0, trust_env=False) as client:
            for text in texts:
                resp = await client.post(
                    EMBEDDING_API_URL,
                    json={"model": model, "input": text},
                )
                if resp.status_code != 200:
                    print(f"[ERROR] Ollama API returned {resp.status_code}: {resp.text[:200]}")
                    results.append(_build_simple_embedding(text, EMBEDDING_DIM))
                    continue
                data = resp.json()
                if "embeddings" in data:
                    vec = np.array(data["embeddings"][0], dtype=np.float32)
                elif "embedding" in data:
                    vec = np.array(data["embedding"], dtype=np.float32)
                else:
                    print("[ERROR] Ollama API response contains no embedding field")
                    vec = _build_simple_embedding(text, EMBEDDING_DIM)
                results.append(vec)
    except Exception as e:
        # Deliberately broad: callers always receive one vector per text.
        # The failure is reported rather than swallowed silently.
        print(f"[ERROR] Ollama API call failed: {e}")
        results = [_build_simple_embedding(t, EMBEDDING_DIM) for t in texts]
    return results
async def _openai_embeddings(texts: List[str], model: str) -> List[np.ndarray]:
    """Embed ``texts`` through an OpenAI-compatible embedding API (e.g., vLLM).

    The endpoint is built from the OPENAI_API_BASE environment variable
    (default "http://localhost:8000"); a trailing "/" is ignored and a base
    that already ends in "/v1" is not given a second "/v1". The bearer token
    comes from OPENAI_API_KEY (default "no-key"). All texts are sent in a
    single request.

    This function is best-effort and does not raise: failures are printed
    and replaced by deterministic pseudo-embeddings of size EMBEDDING_DIM.

    Args:
        texts: Strings to embed.
        model: Model name placed in the request body.

    Returns:
        One float32 vector per input text, in input order.
    """
    if not texts:
        # Nothing to embed, so no request is sent.
        return []

    base_url = os.environ.get("OPENAI_API_BASE", "http://localhost:8000").rstrip("/")
    # Avoid ".../v1/v1/embeddings" when the configured base already has /v1.
    api_url = base_url + ("/embeddings" if base_url.endswith("/v1") else "/v1/embeddings")
    api_key = os.environ.get("OPENAI_API_KEY", "no-key")

    def _fallback() -> List[np.ndarray]:
        # Pseudo-embeddings for every input text.
        return [_build_simple_embedding(t, EMBEDDING_DIM) for t in texts]

    try:
        # The `proxies` keyword was removed in httpx 0.28; passing it raises
        # TypeError, which the handler below would turn into pseudo
        # embeddings for every call. trust_env=False keeps the original
        # intent of ignoring system proxy settings on all httpx versions.
        async with httpx.AsyncClient(timeout=120.0, trust_env=False) as client:
            resp = await client.post(
                api_url,
                headers={"Authorization": f"Bearer {api_key}"},
                json={"model": model, "input": texts},
            )
        if resp.status_code != 200:
            print(f"[ERROR] OpenAI-compatible API returned {resp.status_code}: {resp.text[:200]}")
            return _fallback()
        # Order by the reported index so vectors line up with `texts`; items
        # lacking an index keep their response order (the sort is stable).
        items = sorted(resp.json()["data"], key=lambda item: item.get("index", 0))
        if len(items) != len(texts):
            print(
                f"[ERROR] OpenAI-compatible API returned {len(items)} embeddings "
                f"for {len(texts)} inputs"
            )
            return _fallback()
        return [np.array(item["embedding"], dtype=np.float32) for item in items]
    except Exception as e:
        # Deliberately broad: callers always receive one vector per text.
        # The failure is reported rather than swallowed silently.
        print(f"[ERROR] OpenAI-compatible API call failed: {e}")
        return _fallback()
async def rerank_candidates(
    query: str,
    documents: List[str],
    top_n: Optional[int] = None,
    model: Optional[str] = None,
) -> List[Dict]:
    """Score ``documents`` against ``query`` with the SiliconFlow Reranker API.

    This function is best-effort and does not raise: API failures are
    printed and answered with neutral scores.

    Args:
        query: Text the documents are ranked against.
        documents: Candidate texts.
        top_n: Number of results requested from the API; all documents when
            falsy.
        model: Reranker model name; RERANKER_MODEL is used when falsy.

    Returns:
        A list of {"index": int, "relevance_score": float} sorted by score
        descending. When there is no API key, the reranker is disabled, or
        the call fails, every document index is returned in input order with
        a score of 0.0 (``top_n`` is not applied to that fallback). An empty
        ``documents`` list yields an empty list.
    """
    if not documents:
        return []

    # Neutral answer used whenever real scores cannot be obtained.
    fallback = [{"index": i, "relevance_score": 0.0} for i in range(len(documents))]

    api_key = SILICONFLOW_API_KEY
    if not api_key or not RERANKER_ENABLED:
        return fallback

    model = model or RERANKER_MODEL
    top_n = top_n or len(documents)
    try:
        # The `proxies` keyword was removed in httpx 0.28; passing it raises
        # TypeError, which the handler below would turn into all-zero scores
        # for every call. trust_env=False keeps the original intent of
        # ignoring system proxy settings on all httpx versions.
        async with httpx.AsyncClient(timeout=120.0, trust_env=False) as client:
            resp = await client.post(
                RERANKER_API_URL,
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": model,
                    "query": query,
                    "documents": documents,
                    "top_n": top_n,
                    "return_documents": False,
                },
            )
        if resp.status_code != 200:
            print(f"[ERROR] Reranker API returned {resp.status_code}: {resp.text[:200]}")
            return fallback
        ranked = resp.json().get("results", [])
        return sorted(ranked, key=lambda x: x["relevance_score"], reverse=True)
    except Exception as e:
        # Deliberately broad: ranking is optional, so callers get neutral
        # scores instead of an exception.
        print(f"[ERROR] Reranker API call failed: {e}")
        return fallback
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine of the angle between vectors ``a`` and ``b``.

    Args:
        a: First vector.
        b: Second vector.

    Returns:
        The dot product divided by the product of the two norms, as a Python
        float; 0.0 when either vector has zero norm.
    """
    magnitudes = (np.linalg.norm(a), np.linalg.norm(b))
    # A zero-length vector has no direction; report no similarity.
    if any(m == 0 for m in magnitudes):
        return 0.0
    return float(np.dot(a, b) / (magnitudes[0] * magnitudes[1]))
def batch_cosine_similarity(source_vecs: np.ndarray, target_vecs: np.ndarray) -> np.ndarray:
    """Return the pairwise cosine similarity matrix of two sets of row vectors.

    Args:
        source_vecs: Array of shape (M, D).
        target_vecs: Array of shape (N, D).

    Returns:
        An (M, N) matrix whose entry [i, j] is the cosine similarity between
        source row i and target row j. Rows with zero norm produce zeros.
    """

    def _unit_rows(matrix: np.ndarray) -> np.ndarray:
        # Scale each row to unit length; a zero-norm row is divided by 1
        # instead, which leaves it all-zero and avoids division by zero.
        lengths = np.linalg.norm(matrix, axis=1, keepdims=True)
        return matrix / np.where(lengths == 0, 1, lengths)

    return _unit_rows(source_vecs) @ _unit_rows(target_vecs).T
def embedding_to_bytes(vec: np.ndarray) -> bytes:
    """Serialize ``vec`` as the raw bytes of its float32 representation."""
    # Cast first so the byte layout is float32 whatever the input dtype was.
    single_precision = vec.astype(np.float32)
    return single_precision.tobytes()
def bytes_to_embedding(data: bytes) -> np.ndarray:
    """Interpret ``data`` as a flat array of float32 values."""
    # The array is built with np.frombuffer directly over the given buffer.
    decoded = np.frombuffer(data, dtype=np.float32, count=-1, offset=0)
    return decoded
def get_match_level(score: float) -> str:
    """Map a similarity score to a match-level label.

    Args:
        score: Similarity score to classify.

    Returns:
        "high" for scores of at least 0.90, "possible" for at least 0.80,
        "low_confidence" for at least 0.70, and "no_match" otherwise.
    """
    # Lower bounds are checked from highest to lowest; the first hit wins.
    tiers = (
        (0.90, "high"),
        (0.80, "possible"),
        (0.70, "low_confidence"),
    )
    for floor, label in tiers:
        if score >= floor:
            return label
    return "no_match"