Spaces:
Sleeping
Sleeping
| import time | |
| from typing import Any | |
| import base64 | |
| import numpy as np | |
| import torch | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import HTMLResponse | |
| from pydantic import BaseModel, ConfigDict | |
| from sentence_transformers import SentenceTransformer | |
# This process only runs inference, so autograd is switched off globally and
# torch is limited to two threads.
torch.set_grad_enabled(False)
torch.set_num_threads(2)

APP_TITLE = "ollama-code-embed"

# MODEL_ID is the identifier handed to SentenceTransformer; MODEL_NAME is the
# short name the server advertises for it.
MODEL_ID = "jinaai/jina-code-embeddings-0.5b"
MODEL_NAME = "code-embed"

# Every name a client may send in the "model" field: each base name, followed
# by its ":latest" variant.
MODEL_ALIASES = [
    alias
    for base in (MODEL_NAME, MODEL_ID)
    for alias in (base, f"{base}:latest")
]

MODEL_CREATED_AT = "2026-03-11T00:00:00Z"  # reported as "modified_at" by model_card
MODEL_DIMENSIONS = 896  # reported as "embedding.length" by api_show
SERVER_VERSION = "0.11.0"  # reported by api_version

app = FastAPI(title=APP_TITLE, version="1.0.0")

# Lazy-load state, filled in by get_model() on first use.
_model: SentenceTransformer | None = None
_loaded_at_ns: int | None = None
_load_duration_ns: int = 0
def model_card(name: str) -> dict[str, Any]:
    """Build the model description entry for the alias *name*.

    Args:
        name: Alias to report in both the "name" and "model" fields.

    Returns:
        A dict with the keys name, model, modified_at, size, digest and
        details. Everything except the two name fields is constant.
    """
    details: dict[str, Any] = {
        "format": "sentence-transformers",
        "family": "jina",
        "families": ["jina", "embedding"],
        "parameter_size": "0.5B",
        "quantization_level": "F32",
    }
    card: dict[str, Any] = {"name": name, "model": name}
    card["modified_at"] = MODEL_CREATED_AT
    card["size"] = 0
    card["digest"] = MODEL_ID
    card["details"] = details
    return card
# Shared base for request bodies. Fields the models do not declare are allowed
# (extra="allow") instead of being rejected.
class CompatibleRequest(BaseModel):
    model_config = ConfigDict(extra="allow")
# Request body for the Ollama-style embedding handlers.
class EmbedRequest(CompatibleRequest):
    # Requested model alias; checked by validate_model_name.
    model: str = MODEL_NAME

    # Text to embed. normalize_inputs prefers `input` and falls back to `prompt`.
    input: str | list[str] | None = None
    prompt: str | None = None

    # Accepted in the payload; not read by any handler visible in this file.
    truncate: bool = True

    # Optional output size, passed to maybe_truncate by embed_impl.
    dimensions: int | None = None

    # Accepted in the payload; not read by any handler visible in this file.
    options: dict[str, Any] | None = None
    keep_alive: str | int | None = None
# Request body for the OpenAI-style embeddings handler (v1_embeddings).
class OpenAIEmbeddingRequest(CompatibleRequest):
    # Requested model alias; checked by validate_model_name.
    model: str = MODEL_ID

    # Required: one string or a list of strings to embed.
    input: str | list[str]

    # "base64" selects a base64 string per embedding; any other value yields a
    # list of floats (see v1_embeddings).
    encoding_format: str = "float"

    # Optional output size, passed to maybe_truncate.
    dimensions: int | None = None

    # Accepted in the payload; not read by any handler visible in this file.
    user: str | None = None
def get_model() -> SentenceTransformer:
    """Return the shared SentenceTransformer, loading it on the first call.

    The first call constructs the model for MODEL_ID on the CPU and records
    how long that took in ``_load_duration_ns`` and when it finished in
    ``_loaded_at_ns``. Later calls return the cached instance unchanged.
    """
    global _model, _loaded_at_ns, _load_duration_ns

    if _model is not None:
        return _model

    load_start = time.perf_counter_ns()
    _model = SentenceTransformer(MODEL_ID, trust_remote_code=True, device="cpu")
    _load_duration_ns = time.perf_counter_ns() - load_start
    _loaded_at_ns = time.time_ns()
    return _model
def preload_model() -> None:
    """Trigger the model load now; the returned instance is not used here."""
    _ = get_model()
def normalize_inputs(request: EmbedRequest) -> list[str]:
    """Return the texts to embed from an Ollama-style request as a list.

    ``request.input`` wins when it is set: a list is returned as-is and a
    single string is wrapped in a one-element list. Otherwise
    ``request.prompt`` is wrapped in a one-element list.

    Raises:
        HTTPException: 400 when both ``input`` and ``prompt`` are None.
    """
    payload = request.input
    if payload is None:
        fallback = request.prompt
        if fallback is None:
            raise HTTPException(status_code=400, detail="Request must include 'input' or 'prompt'")
        return [fallback]
    if isinstance(payload, list):
        return payload
    return [payload]
def normalize_openai_inputs(request: OpenAIEmbeddingRequest) -> list[str]:
    """Return ``request.input`` as a list, wrapping a single string if needed."""
    payload = request.input
    if isinstance(payload, list):
        return payload
    return [payload]
| def maybe_truncate(vector: np.ndarray, dimensions: int | None) -> np.ndarray: | |
| if dimensions is None or dimensions <= 0 or dimensions >= vector.shape[0]: | |
| return vector | |
| truncated = vector[:dimensions] | |
| norm = np.linalg.norm(truncated) | |
| if norm > 0: | |
| truncated = truncated / norm | |
| return truncated | |
def validate_model_name(model_name: str) -> None:
    """Accept *model_name* only if it is one of MODEL_ALIASES.

    Raises:
        HTTPException: 404 when the name is not a known alias.
    """
    if model_name in MODEL_ALIASES:
        return
    raise HTTPException(status_code=404, detail=f"Model '{model_name}' not found")
def estimate_prompt_eval_count(texts: list[str], model: SentenceTransformer) -> int:
    """Count the prompt tokens across *texts*.

    When *model* exposes a ``tokenizer`` attribute, each text is encoded with
    special tokens included and the lengths are summed. Without a tokenizer
    the count falls back to whitespace-separated words, with a minimum of one
    per text.
    """
    tokenizer = getattr(model, "tokenizer", None)

    total = 0
    if tokenizer is not None:
        for text in texts:
            total += len(tokenizer.encode(text, add_special_tokens=True))
        return total

    for text in texts:
        total += max(1, len(text.split()))
    return total
def root() -> str:
    """Render the landing page: app title, model names and an endpoint list."""
    # Doubled braces in the <style> block are literal CSS braces in the f-string.
    page = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>{APP_TITLE}</title>
<style>
body {{ font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; margin: 32px; line-height: 1.45; }}
code {{ background: #f4f4f4; padding: 2px 6px; border-radius: 4px; }}
</style>
</head>
<body>
<h1>Ollama-Compatible Code Embeddings</h1>
<p>Model: <code>{MODEL_ID}</code></p>
<p>Served name: <code>{MODEL_NAME}</code></p>
<ul>
<li><code>GET /api/version</code></li>
<li><code>GET /api/tags</code></li>
<li><code>POST /api/embed</code></li>
<li><code>POST /api/embeddings</code></li>
<li><code>POST /embed</code></li>
</ul>
</body>
</html>"""
    return page
def health() -> dict[str, float]:
    """Return the current wall-clock time in seconds under the key "unix"."""
    current = time.time()
    return {"unix": current}
def api_version() -> dict[str, str]:
    """Report SERVER_VERSION under the key "version"."""
    body: dict[str, str] = {"version": SERVER_VERSION}
    return body
def api_tags() -> dict[str, Any]:
    """List one model card per entry in MODEL_ALIASES under the key "models"."""
    cards = []
    for alias in MODEL_ALIASES:
        cards.append(model_card(alias))
    return {"models": cards}
def api_ps() -> dict[str, Any]:
    """Describe the running model.

    Calls get_model() first, so the model is loaded by the time the response
    is built. The response holds a single entry for MODEL_ID plus the current
    wall-clock time under "timestamp".
    """
    get_model()
    timestamp = time.time()

    entry: dict[str, Any] = {
        "name": MODEL_ID,
        "model": MODEL_ID,
        "size": 0,
        "digest": MODEL_ID,
        "details": model_card(MODEL_ID)["details"],
        "expires_at": None,
        "size_vram": 0,
    }
    return {"models": [entry], "timestamp": timestamp}
def api_show(request: EmbedRequest) -> dict[str, Any]:
    """Return static metadata about the served model.

    Only ``request.model`` is read, and only to validate it.

    Raises:
        HTTPException: 404 (from validate_model_name) for an unknown model.
    """
    validate_model_name(request.model)

    model_info: dict[str, Any] = {
        "general.architecture": "sentence-transformer",
        "general.name": MODEL_ID,
        "embedding.length": MODEL_DIMENSIONS,
    }
    details = model_card(MODEL_ID)["details"]
    return {
        "license": "cc-by-nc-4.0",
        "modelfile": f"FROM {MODEL_ID}",
        "parameters": "embedding-only",
        "template": "",
        "details": details,
        "model_info": model_info,
    }
def v1_models() -> dict[str, Any]:
    """List every alias in MODEL_ALIASES as an OpenAI-style model object.

    All entries share one "created" value: the current time in whole seconds.
    """
    created = int(time.time())
    entries = []
    for alias in MODEL_ALIASES:
        entries.append(
            {"id": alias, "object": "model", "created": created, "owned_by": "chmielvu"}
        )
    return {"object": "list", "data": entries}
def embed_impl(request: EmbedRequest) -> dict[str, Any]:
    """Embed the texts of an Ollama-style request.

    Returns:
        A dict with the requested model name, one float list per input text
        under "embeddings", the encode time and the model load time in
        nanoseconds, and the prompt token count.

    Raises:
        HTTPException: 404 for an unknown model, 400 when the request carries
            neither ``input`` nor ``prompt``.
    """
    validate_model_name(request.model)
    texts = normalize_inputs(request)
    model = get_model()

    # total_duration covers the encode call only, not truncation or counting.
    encode_start = time.perf_counter_ns()
    vectors = np.asarray(model.encode(texts, convert_to_numpy=True))
    elapsed_ns = time.perf_counter_ns() - encode_start

    embeddings = []
    for vector in vectors:
        shortened = maybe_truncate(vector, request.dimensions)
        embeddings.append(shortened.astype(np.float32).tolist())

    return {
        "model": request.model,
        "embeddings": embeddings,
        "total_duration": elapsed_ns,
        "load_duration": _load_duration_ns,
        "prompt_eval_count": estimate_prompt_eval_count(texts, model),
    }
def api_embed(request: EmbedRequest) -> dict[str, Any]:
    """Return the embed_impl result for *request* unchanged."""
    response = embed_impl(request)
    return response
def api_embeddings(request: EmbedRequest) -> dict[str, Any]:
    """Return a single embedding for *request*.

    Runs embed_impl and keeps only the first vector under "embedding" (an
    empty list when there are none). The model name, timings and token count
    are copied over from the embed_impl result.
    """
    result = embed_impl(request)
    vectors = result["embeddings"]

    response: dict[str, Any] = {"embedding": vectors[0] if vectors else []}
    for key in ("model", "total_duration", "load_duration", "prompt_eval_count"):
        response[key] = result[key]
    return response
def v1_embeddings(request: OpenAIEmbeddingRequest) -> dict[str, Any]:
    """Embed the texts of an OpenAI-style request.

    Each vector is optionally truncated to ``request.dimensions`` and emitted
    either as a base64 string of its raw float32 bytes (when
    ``request.encoding_format`` is "base64") or as a list of floats (for any
    other value).

    Returns:
        A dict with "object", "model", one entry per input under "data",
        token counts under "usage", plus the model load time and the encode
        time in nanoseconds.

    Raises:
        HTTPException: 404 (from validate_model_name) for an unknown model.
    """
    validate_model_name(request.model)
    texts = normalize_openai_inputs(request)
    model = get_model()

    # total_duration covers the encode call only.
    started = time.perf_counter_ns()
    vectors = np.asarray(model.encode(texts, convert_to_numpy=True))
    total_duration = time.perf_counter_ns() - started

    as_base64 = request.encoding_format == "base64"
    data = []
    for idx, raw_vector in enumerate(vectors):
        # Pin the dtype to little-endian float32 so the bytes produced by
        # tobytes() have the same layout on every host. On little-endian
        # machines this is the same as np.float32.
        vector = maybe_truncate(raw_vector, request.dimensions).astype("<f4")
        embedding: list[float] | str
        if as_base64:
            embedding = base64.b64encode(vector.tobytes()).decode("ascii")
        else:
            embedding = vector.tolist()
        data.append({"object": "embedding", "index": idx, "embedding": embedding})

    prompt_tokens = estimate_prompt_eval_count(texts, model)
    return {
        "object": "list",
        "model": request.model,
        "data": data,
        "usage": {
            "prompt_tokens": prompt_tokens,
            "total_tokens": prompt_tokens,
        },
        "load_duration": _load_duration_ns,
        "total_duration": total_duration,
    }