| | """ |
| | Code Search API β v3.0 |
| | ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| | Key architecture changes from v2: |
| | |
| | β’ Model : ONNX fp16 via sentence-transformers backend="onnx" |
| | β ONNX Runtime replaces PyTorch for every forward pass. |
| | β Pre-built onnx/model_fp16.onnx from the HF repo is used |
| | directly β no export step, no trust_remote_code issues. |
| | β All three transformers-compatibility patches removed. |
| | |
| | β’ Storage : LanceDB (disk-backed, columnar, mmap) |
| | β Vectors live on disk, not in Python RAM. |
| | β Chunks stored alongside vectors in the same table β |
| | no separate pickle files. |
| | β FAISS removed entirely. |
| | |
| | β’ Indexing: Streaming pipeline |
| | β Chunks are produced, encoded in micro-batches, and written |
| | to LanceDB immediately. The full embeddings array is never |
| | held in RAM. |
| | |
| | β’ Retrieval: On-demand table loading + LRU cache |
| | β Tables are opened from disk per request. |
| | β An LRU cache (default: 5 tables, TTL: 10 min) keeps |
| | recently used handles warm without pinning everything. |
| | |
| | β’ RAM budget (approximate, CPU-only HF Space): |
     Model weights (ONNX)        ~275 MB
| | Encoding peak (batch=8) ~100 MB transient |
| | LanceDB per query ~10-50 MB transient |
| | Python overhead ~150 MB |
| | βββββββββββββββββββββββββββββββββββββ |
| | Total steady-state ~425 MB (vs ~16 GB before) |
| | """ |
| |
|
| | import os |
| | import ast |
| | import re |
| | import gc |
| | import time |
| | import pathlib |
| | import asyncio |
| | from collections import OrderedDict |
| | from concurrent.futures import ThreadPoolExecutor |
| | from contextlib import asynccontextmanager |
| | from threading import Lock |
| | from typing import Annotated |
| |
|
# Quiet Hugging Face Hub output and disable the tokenizers thread pool.
# These must be set before the HF libraries are imported below.
os.environ.update({
    "HF_HUB_DISABLE_PROGRESS_BARS": "1",
    "TOKENIZERS_PARALLELISM": "false",
    "HF_HUB_VERBOSITY": "error",
})
# Keep BLAS/OMP threading modest on small CPU hosts, but honor an
# operator-provided value if one is already set.
os.environ.setdefault("OMP_NUM_THREADS", "2")
| |
|
| | import numpy as np |
| | import lancedb |
| | import pyarrow as pa |
| | from fastapi import FastAPI, HTTPException, UploadFile, File, Form |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from pydantic import BaseModel, Field |
| | from sentence_transformers import SentenceTransformer |
| |
|
| |
|
| | |
def _int_env(name: str, default: str) -> int:
    """Read an integer tuning knob from the environment."""
    return int(os.getenv(name, default))


DIM = 768                                              # embedding width of the model
ENCODE_BATCH_SIZE = _int_env("ENCODE_BATCH_SIZE", "8")  # texts per encode micro-batch
MAX_UPLOAD_BYTES = _int_env("MAX_UPLOAD_MB", "50") * 1024 * 1024
MAX_CHUNKS = _int_env("MAX_CHUNKS", "10000")            # per-document chunk budget
LRU_MAXSIZE = _int_env("LRU_TABLE_CACHE", "5")          # open table handles kept warm
LRU_TTL = _int_env("LRU_TTL_SECONDS", "600")            # handle inactivity expiry
| |
|
# File extension → language tag; drives the choice of chunking strategy.
LANGUAGE_MAP = {
    ".py": "python",
    ".js": "javascript",
    ".jsx": "javascript",
    ".ts": "typescript",
    ".tsx": "typescript",
    ".go": "go",
    ".rs": "rust",
    ".java": "java",
    ".cpp": "cpp",
    ".c": "c",
    ".cs": "csharp",
    ".rb": "ruby",
    ".php": "php",
    ".md": "markdown",
    ".txt": "text",
}
| |
|
| | |
# Arrow schema for one document's table: chunk id, chunk text, and a
# fixed-size float32 vector column (LanceDB requires the fixed size).
_SCHEMA = pa.schema([
    ("chunk_id", pa.int32()),
    ("text", pa.large_utf8()),
    ("vector", pa.list_(pa.float32(), DIM)),
])
| |
|
| |
|
| | |
| | def _resolve_store_dir() -> pathlib.Path: |
| | primary = pathlib.Path("/data/lancedb") |
| | try: |
| | primary.mkdir(parents=True, exist_ok=True) |
| | probe = primary / ".write_probe" |
| | probe.touch(); probe.unlink() |
| | return primary |
| | except OSError: |
| | fallback = pathlib.Path.home() / ".cache" / "code-search" / "lancedb" |
| | fallback.mkdir(parents=True, exist_ok=True) |
| | print(f"Warning: /data/lancedb not writable β using fallback: {fallback}") |
| | return fallback |
| |
|
| | STORE_DIR = _resolve_store_dir() |
| |
|
| |
|
| | |
| | class _LRUTableCache: |
| | """ |
| | Keeps up to `maxsize` LanceDB table handles open in memory. |
| | Entries expire after `ttl` seconds of inactivity. |
| | Opening a LanceDB table is cheap (no vectors loaded into RAM), so |
| | this is primarily about limiting open file-descriptor churn. |
| | """ |
| | def __init__(self, maxsize: int = 5, ttl: int = 600): |
| | self._cache: OrderedDict = OrderedDict() |
| | self._maxsize = maxsize |
| | self._ttl = ttl |
| | self._lock = Lock() |
| |
|
| | def get(self, key: str): |
| | with self._lock: |
| | entry = self._cache.get(key) |
| | if entry is None: |
| | return None |
| | ts, tbl = entry |
| | if time.monotonic() - ts > self._ttl: |
| | del self._cache[key] |
| | return None |
| | self._cache.move_to_end(key) |
| | self._cache[key] = (time.monotonic(), tbl) |
| | return tbl |
| |
|
| | def set(self, key: str, tbl) -> None: |
| | with self._lock: |
| | if key in self._cache: |
| | self._cache.move_to_end(key) |
| | self._cache[key] = (time.monotonic(), tbl) |
| | while len(self._cache) > self._maxsize: |
| | self._cache.popitem(last=False) |
| |
|
| | def evict(self, key: str) -> None: |
| | with self._lock: |
| | self._cache.pop(key, None) |
| |
|
| | def keys(self): |
| | with self._lock: |
| | now = time.monotonic() |
| | return [k for k, (ts, _) in self._cache.items() |
| | if now - ts <= self._ttl] |
| |
|
| | _table_cache = _LRUTableCache(maxsize=LRU_MAXSIZE, ttl=LRU_TTL) |
| |
|
| |
|
| | |
# Populated by `lifespan` at startup; holds the SentenceTransformer under "model".
models: dict = {}
# Shared pool for blocking encode/search work run off the event loop.
_executor = ThreadPoolExecutor(max_workers=2)
| |
|
| |
|
| | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: load the ONNX embedding model once at startup.

    The loaded model is published via the module-level `models` dict and
    released again when the application shuts down.
    """
    print("Loading jina-embeddings-v2-base-code (ONNX fp32)…")
    import onnxruntime as ort

    session_options = ort.SessionOptions()
    session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    session_options.intra_op_num_threads = int(os.getenv("OMP_NUM_THREADS", "2"))
    # NOTE(review): disables one specific ORT fusion pass — presumably a
    # workaround for an optimizer issue with this model; confirm before removing.
    session_options.add_session_config_entry(
        "session.disable_optimizers", "SimplifiedLayerNormFusion"
    )

    embedder = SentenceTransformer(
        "jinaai/jina-embeddings-v2-base-code",
        backend="onnx",
        model_kwargs={
            "file_name": "onnx/model.onnx",
            "provider": "CPUExecutionProvider",
            "session_options": session_options,
        },
        trust_remote_code=True,
    )
    embedder.max_seq_length = 8192
    models["model"] = embedder
    print(f"Model ready [backend={embedder.backend}]")
    yield
    # Shutdown: drop the model reference so it can be collected.
    models.clear()
| |
|
| |
|
| | |
# FastAPI application. The description no longer claims "fp16": the startup
# code loads onnx/model.onnx and logs it as fp32, so the old wording was wrong.
app = FastAPI(
    title="Code Search API",
    description="Semantic code search — jina-embeddings-v2-base-code ONNX + LanceDB",
    version="3.0.0",
    lifespan=lifespan,
)
# NOTE(review): wide-open CORS — acceptable for a public demo, revisit for prod.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
| |
|
| | |
def _encode_sync(texts: list[str]) -> np.ndarray:
    """
    Synchronous encode via ONNX Runtime.

    Processes ENCODE_BATCH_SIZE texts at a time, collecting garbage between
    batches to keep the transient peak small. Returns a float32 array of
    shape (len(texts), DIM). No torch.no_grad() is needed — ONNX Runtime
    has no autograd.
    """
    # Guard: np.vstack([]) raises ValueError, so short-circuit empty input.
    if not texts:
        return np.empty((0, DIM), dtype=np.float32)

    model = models["model"]  # hoisted: one dict lookup instead of one per batch
    parts = []
    for start in range(0, len(texts), ENCODE_BATCH_SIZE):
        batch = texts[start : start + ENCODE_BATCH_SIZE]
        embs = model.encode(
            batch,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=False,
        )
        parts.append(np.asarray(embs, dtype=np.float32))
        gc.collect()
    return np.vstack(parts)
| |
|
| |
|
async def _encode_async(texts: list[str]) -> np.ndarray:
    """Run the blocking encode in the shared pool without stalling the event loop."""
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() has been deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, _encode_sync, texts)
| |
|
| |
|
| | def _normalize(embs: np.ndarray) -> np.ndarray: |
| | norms = np.linalg.norm(embs, axis=1, keepdims=True) |
| | return embs / np.maximum(norms, 1e-9) |
| |
|
| |
|
| | |
def _db() -> lancedb.DBConnection:
    """Open a LanceDB connection rooted at STORE_DIR."""
    return lancedb.connect(os.fspath(STORE_DIR))
| |
|
| |
|
def _table_exists(doc_id: str) -> bool:
    """True when a table named `doc_id` exists in the store."""
    names = _db().table_names()
    return doc_id in names
| |
|
| |
|
def _open_table(doc_id: str):
    """Return a table handle, preferring the LRU cache over a disk open."""
    cached = _table_cache.get(doc_id)
    if cached is not None:
        return cached
    handle = _db().open_table(doc_id)
    _table_cache.set(doc_id, handle)
    return handle
| |
|
| |
|
async def _build_table_streaming(doc_id: str, chunks: list[str]) -> None:
    """
    Streaming index build — the heart of the memory optimisation.

    Instead of: chunk_all → encode_all → build_index (full array in RAM)
    We do:      for each micro-batch → encode → write to LanceDB → free

    Peak RAM = one micro-batch of embeddings (8 × 768 × 4 bytes ≈ 24 KB).
    LanceDB stores vectors as a memory-mapped Lance file on disk; only
    the pages touched during a query are paged into RAM at search time.
    """
    db = _db()

    # Rebuild from scratch: drop any existing table and its cached handle
    # so readers cannot see a half-built table through the cache.
    if doc_id in db.table_names():
        db.drop_table(doc_id)
        _table_cache.evict(doc_id)

    tbl = None
    for i in range(0, len(chunks), ENCODE_BATCH_SIZE):
        batch = chunks[i : i + ENCODE_BATCH_SIZE]
        # Encode off the event loop, then L2-normalize so the "dot" metric
        # used at search time behaves as cosine similarity.
        embs = await _encode_async(batch)
        embs = _normalize(embs)

        # chunk_id = i + j keeps ids globally sequential across batches.
        records = [
            {
                "chunk_id": i + j,
                "text": text,
                "vector": vec.tolist(),
            }
            for j, (text, vec) in enumerate(zip(batch, embs))
        ]

        # First batch creates the table; subsequent batches append.
        if tbl is None:
            tbl = db.create_table(doc_id, data=records,
                                  schema=_SCHEMA, mode="overwrite")
        else:
            tbl.add(records)

        # Drop the batch before encoding the next one to cap peak RSS.
        del embs, records
        gc.collect()

    # Small tables are served by brute-force scan; only build an ANN (IVF-PQ)
    # index once there are enough rows for the partitioning to make sense.
    if tbl is not None and len(chunks) >= 256:
        try:
            tbl.create_index(
                metric="dot",
                vector_column_name="vector",
                num_partitions=max(1, min(256, len(chunks) // 40)),
                num_sub_vectors=96,
            )
        except Exception as e:
            # Best-effort: search still works without the ANN index.
            print(f"Warning: ANN index creation skipped for '{doc_id}': {e}")

    # Warm the cache with the freshly built handle (None only if chunks == []).
    if tbl is not None:
        _table_cache.set(doc_id, tbl)
| |
|
| |
|
def _search_table(doc_id: str, query: str, top_k: int) -> list[dict]:
    """
    On-demand search. Opens the table handle (from LRU cache or disk),
    runs a vector search, returns top_k results. Only the pages of the
    Lance file containing the nearest vectors are paged into RAM.
    """
    # Encode and normalize the query the same way indexed vectors were,
    # so the "dot" metric behaves as cosine similarity.
    q_emb = _encode_sync([query])
    q_emb = _normalize(q_emb)[0]

    tbl = _open_table(doc_id)
    results = (
        tbl.search(q_emb.tolist(), vector_column_name="vector")
        .metric("dot")
        .limit(top_k)
        .to_list()
    )

    # NOTE(review): "_distance" is LanceDB's distance field (lower = closer;
    # for the dot metric it is presumably the negated dot product) — confirm
    # the sign convention before presenting "score" as a similarity.
    return [
        {
            "rank": i + 1,
            "score": round(float(r.get("_distance", r.get("score", 0.0))), 4),
            "text": r["text"],
        }
        for i, r in enumerate(results)
    ]
| |
|
| |
|
| | |
def detect_language(filename: str) -> str:
    """Map a filename's extension to a language tag; unknown → "text"."""
    _, ext = os.path.splitext(filename)
    return LANGUAGE_MAP.get(ext.lower(), "text")
| |
|
| |
|
def chunk_text(text: str, chunk_size: int = 3, overlap: int = 1) -> list[str]:
    """Split prose into overlapping windows of `chunk_size` sentences.

    Sentences end at ./!/? followed by whitespace; consecutive windows
    share `overlap` sentences. Blank input yields an empty list.
    """
    pieces = [p.strip() for p in re.split(r'(?<=[.!?])\s+', text.strip()) if p.strip()]
    step = max(1, chunk_size - overlap)
    return [
        " ".join(pieces[start : start + chunk_size])
        for start in range(0, len(pieces), step)
    ]
| |
|
| |
|
def chunk_fallback(source: str, max_lines: int = 40, overlap: int = 5) -> list[str]:
    """Language-agnostic fallback: fixed-size overlapping windows of lines."""
    rows = source.splitlines()
    step = max(1, max_lines - overlap)
    return [
        "\n".join(rows[start : start + max_lines])
        for start in range(0, len(rows), step)
    ]
| |
|
| |
|
def chunk_python(source: str, filepath: str = "") -> list[str]:
    """Chunk Python source into one snippet per function/class definition.

    Nested definitions produce their own chunks in addition to appearing
    inside their parent's chunk. Falls back to line-window chunking when
    the source does not parse or defines nothing.
    """
    try:
        module = ast.parse(source)
    except SyntaxError:
        return chunk_fallback(source)

    src_lines = source.splitlines()
    header = f"# {filepath}\n" if filepath else ""
    wanted = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    snippets = [
        header + "\n".join(src_lines[node.lineno - 1 : node.end_lineno])
        for node in ast.walk(module)
        if isinstance(node, wanted)
    ]
    return snippets or chunk_fallback(source)
| |
|
| |
|
def chunk_generic(source: str, filepath: str = "") -> list[str]:
    """Heuristic chunker for non-Python code.

    Splits immediately before lines that look like function/class/const
    declarations (JS/TS/Go/Rust/Java-ish); falls back to line-window
    chunking when nothing matches.
    """
    decl_boundary = re.compile(
        r'(?:^|\n)(?='
        r'(?:export\s+)?(?:async\s+)?'
        r'(?:function|class|const\s+\w+\s*=\s*(?:async\s+)?(?:\(|function)|'
        r'(?:public|private|protected|static|\s)*(?:fn|func|def)\s+\w+)'
        r')',
        re.MULTILINE,
    )
    header = f"// {filepath}\n" if filepath else ""
    pieces = []
    for segment in decl_boundary.split(source):
        trimmed = segment.strip()
        if trimmed:
            pieces.append(header + trimmed)
    return pieces if pieces else chunk_fallback(source)
| |
|
| |
|
def chunk_code(source: str, filename: str = "") -> list[str]:
    """Dispatch to the chunker appropriate for the file's detected language."""
    lang = detect_language(filename)
    if lang == "python":
        return chunk_python(source, filepath=filename)
    if lang in ("markdown", "text"):
        return chunk_text(source)
    return chunk_generic(source, filepath=filename)
| |
|
| |
|
| | |
class IndexResponse(BaseModel):
    # Response for a successful single-file /index call.
    doc_id: str
    chunks_indexed: int
    message: str
| |
|
class SearchRequest(BaseModel):
    # Body of /search: target document, query text, and result count (1-20).
    doc_id: str = Field(...)
    query: str = Field(...)
    top_k: int = Field(5, ge=1, le=20)
| |
|
class SearchResult(BaseModel):
    # One ranked hit: 1-based rank, similarity score, and the chunk text.
    rank: int
    score: float
    text: str
| |
|
class SearchResponse(BaseModel):
    # Echoes the request identifiers plus the ranked result list.
    doc_id: str
    query: str
    results: list[SearchResult]
| |
|
class EmbedRequest(BaseModel):
    # Body of /embed: the texts to encode.
    texts: list[str] = Field(...)
| |
|
class EmbedResponse(BaseModel):
    # Raw embeddings (one row per input text) and their dimensionality.
    embeddings: list[list[float]]
    dimensions: int
| |
|
class FileEntry(BaseModel):
    # One source file in a batch-index request.
    filename: str
    content: str
| |
|
class BatchIndexRequest(BaseModel):
    # Body of /index/batch: all files are indexed into one table.
    doc_id: str
    files: list[FileEntry]
    replace: bool = True
| |
|
class BatchIndexResponse(BaseModel):
    # Summary counts for a completed batch index.
    doc_id: str
    files_indexed: int
    chunks_indexed: int
| |
|
| |
|
| | |
@app.get("/", tags=["health"])
def root():
    # Liveness probe; points callers at the interactive OpenAPI docs.
    payload = {"status": "ok", "docs": "/docs"}
    return payload
| |
|
| |
|
@app.get("/health", tags=["health"])
def health():
    # Reports whether the model finished loading and which backend is active.
    loaded = bool(models)
    return {
        "status": "ok",
        "models_loaded": loaded,
        "backend": models["model"].backend if loaded else None,
    }
| |
|
| |
|
@app.post("/index", response_model=IndexResponse, tags=["search"])
async def index_document(
    file: Annotated[UploadFile, File(description="Source file to index")],
    doc_id: Annotated[str, Form(description="Unique ID (defaults to filename)")] = "",
):
    """Index one uploaded source file under `doc_id`.

    The upload is decoded as UTF-8 (bad bytes replaced), chunked by
    language, and streamed into a LanceDB table. Raises 503 before the
    model loads, 413 for oversized uploads or chunk counts, and 400 when
    no chunks are produced.
    """
    if not models:
        raise HTTPException(503, "Model not loaded yet.")

    content = await file.read()
    if len(content) > MAX_UPLOAD_BYTES:
        raise HTTPException(413,
            f"File too large ({len(content)/1024/1024:.1f} MB). "
            f"Max: {MAX_UPLOAD_BYTES//1024//1024} MB.")

    source = content.decode("utf-8", errors="replace")
    filename = file.filename or "unknown"
    resolved_id = doc_id.strip() or os.path.splitext(filename)[0]
    # NOTE(review): resolved_id is used verbatim as a LanceDB table name and
    # comes from the client — consider validating/sanitizing it.

    chunks = chunk_code(source, filename=filename)
    if not chunks:
        raise HTTPException(400, "Document produced no chunks.")
    # Consistency fix: /index/batch enforces MAX_CHUNKS but this endpoint
    # did not, so a single large file could bypass the indexing budget.
    if len(chunks) > MAX_CHUNKS:
        raise HTTPException(413,
            f"Too many chunks ({len(chunks):,}). Max: {MAX_CHUNKS:,}.")

    await _build_table_streaming(resolved_id, chunks)
    gc.collect()

    return IndexResponse(
        doc_id=resolved_id,
        chunks_indexed=len(chunks),
        message=f"Document '{resolved_id}' indexed successfully.",
    )
| |
|
| |
|
@app.post("/index/batch", response_model=BatchIndexResponse, tags=["search"])
async def index_batch(req: BatchIndexRequest):
    # Chunk every file, then stream the combined list into one table.
    if not models:
        raise HTTPException(503, "Model not loaded yet.")

    collected: list[str] = [
        chunk
        for entry in req.files
        for chunk in chunk_code(entry.content, filename=entry.filename)
    ]

    if not collected:
        raise HTTPException(400, "No chunks produced from provided files.")
    if len(collected) > MAX_CHUNKS:
        raise HTTPException(413,
            f"Too many chunks ({len(collected):,}). Max: {MAX_CHUNKS:,}.")

    await _build_table_streaming(req.doc_id, collected)
    gc.collect()

    return BatchIndexResponse(
        doc_id=req.doc_id,
        files_indexed=len(req.files),
        chunks_indexed=len(collected),
    )
| |
|
| |
|
@app.post("/search", response_model=SearchResponse, tags=["search"])
async def search_document(req: SearchRequest):
    """Vector-search an indexed document; 404 when `doc_id` is unknown."""
    if not _table_exists(req.doc_id):
        raise HTTPException(404, f"doc_id '{req.doc_id}' not found. Call /index first.")

    # Encoding + LanceDB search are blocking; run them off the event loop.
    # get_running_loop() replaces get_event_loop(), which is deprecated
    # inside a coroutine since Python 3.10.
    loop = asyncio.get_running_loop()
    results = await loop.run_in_executor(
        _executor, _search_table, req.doc_id, req.query, req.top_k
    )
    return SearchResponse(
        doc_id=req.doc_id,
        query=req.query,
        results=[SearchResult(**r) for r in results],
    )
| |
|
| |
|
@app.post("/embed", response_model=EmbedResponse, tags=["embeddings"])
async def embed_texts(req: EmbedRequest):
    """Return raw (unnormalized) embeddings for up to 64 texts."""
    if not models:
        raise HTTPException(503, "Model not loaded yet.")
    if not req.texts:
        # Guard: an empty list would crash np.vstack deep in the encoder
        # with an opaque 500; fail fast with a clear client error instead.
        raise HTTPException(400, "At least one text is required.")
    if len(req.texts) > 64:
        raise HTTPException(400, "Maximum 64 texts per request.")

    embs = await _encode_async(req.texts)
    return EmbedResponse(embeddings=embs.tolist(), dimensions=embs.shape[1])
| |
|
| |
|
@app.get("/documents", tags=["search"])
def list_documents():
    # Enumerate every table with its row count; -1 marks tables that could
    # not be opened (best-effort listing, never fails the whole request).
    db = _db()
    summaries = []
    for name in db.table_names():
        try:
            rows = db.open_table(name).count_rows()
        except Exception:
            rows = -1
        summaries.append({"doc_id": name, "chunks": rows})
    return {"documents": summaries}
| |
|
| |
|
@app.delete("/documents/{doc_id}", tags=["search"])
def delete_document(doc_id: str):
    # 404 for unknown ids; otherwise drop the table and its cached handle.
    exists = _table_exists(doc_id)
    if not exists:
        raise HTTPException(404, f"doc_id '{doc_id}' not found.")
    _db().drop_table(doc_id)
    _table_cache.evict(doc_id)
    return {"deleted": doc_id}