#!/usr/bin/env python3
"""
BlitzKode backend server.

Serves the bundled frontend and proxies prompts to a local GGUF model
through llama.cpp. Model is loaded lazily so the module stays importable
in tests and environments where the model artifact is not present yet.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import queue
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from contextlib import asynccontextmanager, suppress
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from pathlib import Path
from typing import Any, Literal, cast

import llama_cpp
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from starlette.middleware.base import BaseHTTPMiddleware

APP_NAME = "BlitzKode"
APP_VERSION = "2.0"
CREATOR = "Sajad"

ROOT_DIR = Path(__file__).resolve().parent
DEFAULT_MODEL_PATH = ROOT_DIR / "blitzkode.gguf"
DEFAULT_FRONTEND_DIST_PATH = ROOT_DIR / "frontend" / "dist" / "index.html"
DEFAULT_CONTEXT = 2048
DEFAULT_MAX_PROMPT_LENGTH = 4000
DEFAULT_MAX_TOKENS = 512
DEFAULT_RATE_LIMIT_MAX = 30
DEFAULT_MAX_SEARCH_RESULTS = 5
DEFAULT_SEARCH_TIMEOUT_SECONDS = 8
DEFAULT_MAX_MESSAGES = 20

STOP_TOKENS = ["<|im_end|>", "<|im_start|>user"]

SYSTEM_PROMPT = (
    "<|im_start|>system\n"
    "You are BlitzKode, an AI coding assistant created by Sajad. "
    "You are an expert in Python, JavaScript, Java, C++, and other programming languages. "
    "For coding work, first understand the user's goal and constraints, then provide a short plan before code when useful. "
    "Do not invent APIs, file contents, citations, or execution results. "
    "If evidence is missing, say what is unknown and give a safe next step. "
    "Write clean, efficient, and well-documented code. Keep responses concise and practical.<|im_end|>"
)

logger = logging.getLogger("blitzkode")

def _bool_from_env(name: str, default: bool = False) -> bool:
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "on"}


def _int_from_env(name: str, default: int) -> int:
    value = os.getenv(name)
    if not value:
        return default
    try:
        return int(value)
    except ValueError:
        return default


def _path_from_env(name: str, default: Path) -> Path:
    value = os.getenv(name)
    return Path(value) if value else default


def _frontend_path_from_env() -> Path:
    value = os.getenv("BLITZKODE_FRONTEND_PATH")
    if value:
        return Path(value)
    return DEFAULT_FRONTEND_DIST_PATH


def _validate_prompt(prompt: str, max_length: int) -> tuple[str, JSONResponse | None]:
    prompt = prompt.strip()
    if not prompt:
        return prompt, JSONResponse({"error": "Prompt is required"}, status_code=400)
    if len(prompt) > max_length:
        return prompt, JSONResponse(
            {"error": f"Prompt too long. Max {max_length} chars."},
            status_code=400,
        )
    return prompt, None

# The dataclass decorator is required: without it the dataclass_field(...)
# defaults below would stay raw Field objects instead of resolved paths.
@dataclass
class Settings:
    root_dir: Path = ROOT_DIR
    model_path: Path = dataclass_field(default_factory=lambda: _path_from_env("BLITZKODE_MODEL_PATH", DEFAULT_MODEL_PATH))
    frontend_path: Path = dataclass_field(default_factory=_frontend_path_from_env)
    host: str = os.getenv("BLITZKODE_HOST", "0.0.0.0")
    port: int = _int_from_env("BLITZKODE_PORT", 7860)
    n_gpu_layers: int = _int_from_env("BLITZKODE_GPU_LAYERS", 0)
    n_ctx: int = _int_from_env("BLITZKODE_N_CTX", DEFAULT_CONTEXT)
    n_threads: int = _int_from_env("BLITZKODE_THREADS", max(1, min(8, os.cpu_count() or 1)))
    n_batch: int = _int_from_env("BLITZKODE_BATCH", 128)
    max_prompt_length: int = _int_from_env("BLITZKODE_MAX_PROMPT_LENGTH", DEFAULT_MAX_PROMPT_LENGTH)
    preload_model: bool = _bool_from_env("BLITZKODE_PRELOAD_MODEL", default=False)
    cors_origins: str = os.getenv("BLITZKODE_CORS_ORIGINS", "http://localhost:7860")
    api_key: str = os.getenv("BLITZKODE_API_KEY", "")
    web_search_enabled: bool = _bool_from_env("BLITZKODE_WEB_SEARCH", default=True)
    search_timeout_seconds: int = _int_from_env("BLITZKODE_SEARCH_TIMEOUT", DEFAULT_SEARCH_TIMEOUT_SECONDS)
    max_search_results: int = _int_from_env("BLITZKODE_MAX_SEARCH_RESULTS", DEFAULT_MAX_SEARCH_RESULTS)

class MessageItem(BaseModel):
    role: Literal["user", "assistant"]
    content: str = Field(min_length=1, max_length=DEFAULT_MAX_PROMPT_LENGTH)


class GenerateRequest(BaseModel):
    prompt: str
    messages: list[MessageItem] = Field(default_factory=list, max_length=DEFAULT_MAX_MESSAGES)
    temperature: float = Field(default=0.5, ge=0.0, le=2.0)
    max_tokens: int = Field(default=256, ge=1, le=DEFAULT_MAX_TOKENS)
    top_p: float = Field(default=0.95, gt=0.0, le=1.0)
    top_k: int = Field(default=20, ge=1, le=200)
    repeat_penalty: float = Field(default=1.05, ge=0.8, le=2.0)


class SearchRequest(BaseModel):
    query: str = Field(min_length=1, max_length=500)
    max_results: int = Field(default=DEFAULT_MAX_SEARCH_RESULTS, ge=1, le=10)
    deep: bool = False


class ResearchGenerateRequest(GenerateRequest):
    search_query: str | None = Field(default=None, max_length=500)
    search_results: int = Field(default=DEFAULT_MAX_SEARCH_RESULTS, ge=1, le=10)
    deep_search: bool = False

# Instances are built with keyword arguments, so this must be a dataclass.
@dataclass
class SearchResult:
    title: str
    url: str
    snippet: str
    source: str = "DuckDuckGo"

    def as_dict(self) -> dict[str, str]:
        return {
            "title": self.title,
            "url": self.url,
            "snippet": self.snippet,
            "source": self.source,
        }

class WebSearchService:
    def __init__(self, settings: Settings):
        self.settings = settings

    # Read as an attribute (`self.enabled`, `search_service.enabled`) everywhere,
    # so it has to be a property; a bare method object would always be truthy.
    @property
    def enabled(self) -> bool:
        return self.settings.web_search_enabled

    def _query_variants(self, query: str, deep: bool) -> list[str]:
        query = " ".join(query.split())
        if not deep:
            return [query]
        return [
            query,
            f"{query} official documentation",
            f"{query} best practices",
        ]

    def _append_result(
        self, results: list[SearchResult], seen_urls: set[str], title: str, url: str, snippet: str, max_results: int
    ) -> None:
        # Normalise whitespace, cap field lengths, and skip duplicates or overflow.
        title = " ".join((title or "Untitled").split())[:200]
        url = (url or "").strip()
        snippet = " ".join((snippet or "").split())[:500]
        if not url or url in seen_urls or len(results) >= max_results:
            return
        seen_urls.add(url)
        results.append(SearchResult(title=title, url=url, snippet=snippet))

    def _collect_related_topics(self, topics: list[dict], results: list[SearchResult], seen_urls: set[str], max_results: int) -> None:
        for topic in topics:
            if len(results) >= max_results:
                return
            # Grouped topics nest their entries under a "Topics" key.
            if "Topics" in topic:
                self._collect_related_topics(topic.get("Topics", []), results, seen_urls, max_results)
                continue
            text = topic.get("Text", "")
            url = topic.get("FirstURL", "")
            if text and url:
                title = text.split(" - ", 1)[0]
                self._append_result(results, seen_urls, title, url, text, max_results)

    def search(self, query: str, max_results: int = DEFAULT_MAX_SEARCH_RESULTS, deep: bool = False) -> list[dict[str, str]]:
        if not self.enabled:
            raise RuntimeError("Web search is disabled. Set BLITZKODE_WEB_SEARCH=true to enable it.")
        query = " ".join(query.split())
        if not query:
            raise ValueError("Search query is required")
        limit = min(max_results, max(1, self.settings.max_search_results), 10)
        results: list[SearchResult] = []
        seen_urls: set[str] = set()
        for variant in self._query_variants(query, deep):
            if len(results) >= limit:
                break
            params = urllib.parse.urlencode(
                {
                    "q": variant,
                    "format": "json",
                    "no_html": "1",
                    "skip_disambig": "1",
                }
            )
            request = urllib.request.Request(
                f"https://api.duckduckgo.com/?{params}",
                headers={"User-Agent": f"{APP_NAME}/{APP_VERSION}"},
            )
            with urllib.request.urlopen(request, timeout=self.settings.search_timeout_seconds) as response:
                payload = json.loads(response.read().decode("utf-8"))
            self._append_result(
                results,
                seen_urls,
                payload.get("Heading") or variant,
                payload.get("AbstractURL", ""),
                payload.get("AbstractText", ""),
                limit,
            )
            self._collect_related_topics(payload.get("RelatedTopics", []), results, seen_urls, limit)
        return [result.as_dict() for result in results]

class ModelService:
    def __init__(self, settings: Settings):
        self.settings = settings
        self._llm: llama_cpp.Llama | None = None
        self._init_lock = threading.Lock()
        self._load_time_seconds: float | None = None
        self._last_error: str | None = None
        self._busy: bool = False

    # The accessors below are read as attributes by the endpoints and are
    # serialised to JSON, so they must be properties rather than plain methods.
    @property
    def model_loaded(self) -> bool:
        return self._llm is not None

    @property
    def model_exists(self) -> bool:
        return self.settings.model_path.exists()

    @property
    def last_error(self) -> str | None:
        return self._last_error

    @property
    def load_time_seconds(self) -> float | None:
        return self._load_time_seconds

    @property
    def busy(self) -> bool:
        return self._busy

    def load_model(self):
        # Fast path without the lock, then re-check inside it (double-checked locking).
        if self._llm is not None:
            return self._llm
        with self._init_lock:
            if self._llm is not None:
                return self._llm
            if not self.model_exists:
                self._last_error = f"Model not found at {self.settings.model_path}"
                raise FileNotFoundError(self._last_error)
            start_time = time.perf_counter()
            try:
                self._llm = llama_cpp.Llama(
                    model_path=str(self.settings.model_path),
                    n_gpu_layers=self.settings.n_gpu_layers,
                    n_ctx=self.settings.n_ctx,
                    n_threads=self.settings.n_threads,
                    n_batch=self.settings.n_batch,
                    verbose=False,
                    use_mmap=True,
                    use_mlock=False,
                    seed=-1,
                )
                self._load_time_seconds = time.perf_counter() - start_time
                self._last_error = None
                logger.info("Model loaded in %.2fs (gpu_layers=%d)", self._load_time_seconds, self.settings.n_gpu_layers)
            except Exception as exc:
                self._last_error = str(exc)
                logger.error("Model load failed: %s", exc)
                raise
            return self._llm

    def build_prompt(self, req: GenerateRequest) -> str:
        # ChatML layout: system turn, prior history, the new user turn,
        # then an open assistant turn for the model to complete.
        parts = [SYSTEM_PROMPT]
        for msg in req.messages:
            if msg.role in ("user", "assistant"):
                parts.append(f"<|im_start|>{msg.role}\n{msg.content}<|im_end|>")
        parts.append(f"<|im_start|>user\n{req.prompt}<|im_end|>")
        parts.append("<|im_start|>assistant\n")
        return "\n".join(parts)

    def with_research_context(self, req: ResearchGenerateRequest, search_results: list[dict[str, str]], max_length: int) -> GenerateRequest:
        if not search_results:
            return req
        formatted_results = []
        for index, item in enumerate(search_results, start=1):
            formatted_results.append(
                f"[{index}] {item.get('title', 'Untitled')}\nURL: {item.get('url', '')}\nSummary: {item.get('snippet', '')}"
            )
        joined_results = "\n\n".join(formatted_results)
        research_prompt = (
            "Use the following live web search results as untrusted background context. "
            "Cite URLs when you rely on them. If the results are weak or irrelevant, say so rather than fabricating details.\n\n"
            "Search results:\n"
            f"{joined_results}\n\n"
            "User task:\n"
            f"{req.prompt.strip()}"
        )
        if len(research_prompt) > max_length:
            research_prompt = research_prompt[: max_length - 120].rstrip() + "\n\n[Context truncated to fit prompt limit.]"
        return req.model_copy(update={"prompt": research_prompt})

    def _gen_params(self, req: GenerateRequest) -> dict:
        return {
            "max_tokens": req.max_tokens,
            "temperature": req.temperature,
            "top_p": req.top_p,
            "top_k": req.top_k,
            "repeat_penalty": req.repeat_penalty,
            "frequency_penalty": 0.0,
            "presence_penalty": 0.0,
            "stop": STOP_TOKENS,
        }

    def generate_once(self, req: GenerateRequest) -> dict[str, object]:
        llm = self.load_model()
        self._busy = True
        try:
            start = time.perf_counter()
            result = cast(dict[str, Any], llm(self.build_prompt(req), **self._gen_params(req)))
            response = result["choices"][0]["text"].strip()
            elapsed = time.perf_counter() - start
            logger.info("Generated %d chars in %.2fs", len(response), elapsed)
            return {"response": response, "creator": CREATOR, "model": APP_NAME, "version": APP_VERSION}
        finally:
            self._busy = False

    def _run_stream(self, req: GenerateRequest, out_q: queue.Queue):
        """Runs streaming inference in a worker thread, puts tokens into out_q."""
        try:
            llm = self.load_model()
            self._busy = True
            start = time.perf_counter()
            token_count = 0
            stream = cast(Any, llm(self.build_prompt(req), stream=True, **self._gen_params(req)))
            for token in stream:
                if not token.get("choices"):
                    continue
                text = token["choices"][0].get("text", "")
                if text:
                    token_count += 1
                    out_q.put(f"data: {json.dumps({'token': text})}\n\n")
            elapsed = time.perf_counter() - start
            logger.info("Streamed %d tokens in %.2fs", token_count, elapsed)
            out_q.put("data: [DONE]\n\n")
        except Exception as exc:
            logger.error("Stream error: %s", exc)
            out_q.put(f"data: {json.dumps({'error': str(exc)})}\n\n")
        finally:
            self._busy = False
            # None is the sentinel that tells the consumer the stream is finished.
            out_q.put(None)

def _check_api_key(request: Request, settings: Settings) -> JSONResponse | None:
    if not settings.api_key:
        return None
    auth = request.headers.get("Authorization", "")
    token = auth[7:] if auth.startswith("Bearer ") else auth
    # Timing-safe comparison (prevent timing attacks). Compare bytes, because
    # hmac.compare_digest raises TypeError for str values with non-ASCII characters.
    import hmac

    if not hmac.compare_digest(token.encode("utf-8"), settings.api_key.encode("utf-8")):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    return None

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, max_requests: int = DEFAULT_RATE_LIMIT_MAX, window_seconds: int = 60):
        super().__init__(app)
        self._max = max_requests
        self._window = window_seconds
        self._clients: dict[str, list[float]] = {}
        self._lock = threading.Lock()
        self._cleanup_done = 0

    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host if request.client else "unknown"
        now = time.monotonic()
        # Cleanup old entries periodically (every 1000 requests)
        self._cleanup_done += 1
        if self._cleanup_done > 1000:
            self._cleanup_done = 0
            with self._lock:
                cutoff = now - self._window
                self._clients = {ip: [t for t in ts if t >= cutoff] for ip, ts in self._clients.items() if ts}
        with self._lock:
            # Sliding window: keep only timestamps newer than the window.
            timestamps = self._clients.get(client_ip, [])
            timestamps = [t for t in timestamps if now - t < self._window]
            if len(timestamps) >= self._max:
                return JSONResponse(
                    {"error": "Rate limit exceeded. Try again later."},
                    status_code=429,
                    headers={"Retry-After": str(self._window)},
                )
            timestamps.append(now)
            self._clients[client_ip] = timestamps
        return await call_next(request)

class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, max_bytes: int = 50_000):
        super().__init__(app)
        self._max = max_bytes

    async def dispatch(self, request: Request, call_next):
        content_length = request.headers.get("content-length")
        if content_length:
            try:
                if int(content_length) > self._max:
                    return JSONResponse({"error": "Request body too large"}, status_code=413)
            except ValueError:
                return JSONResponse({"error": "Invalid Content-Length header"}, status_code=400)
        return await call_next(request)

def create_app(settings: Settings | None = None) -> FastAPI:
    settings = settings or Settings()
    model_service = ModelService(settings)
    search_service = WebSearchService(settings)
    # Serialises inference: only one request may use the model at a time.
    model_lock = asyncio.Lock()
    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s", datefmt="%H:%M:%S")

    # FastAPI expects `lifespan` to be an async context manager factory.
    @asynccontextmanager
    async def lifespan(_: FastAPI):
        if settings.preload_model:
            # A failed preload must not stop startup; the error stays in last_error.
            with suppress(Exception):
                await asyncio.to_thread(model_service.load_model)
        yield

    app = FastAPI(title=f"{APP_NAME} API", version=APP_VERSION, lifespan=lifespan)
    app.state.settings = settings
    app.state.model_service = model_service
    app.state.search_service = search_service

    frontend_assets_path = settings.frontend_path.parent / "assets"
    if frontend_assets_path.exists():
        app.mount("/assets", StaticFiles(directory=str(frontend_assets_path)), name="frontend-assets")

    cors_origins = [o.strip() for o in settings.cors_origins.split(",") if o.strip()]
    app.add_middleware(
        CORSMiddleware,
        allow_origins=cors_origins,
        allow_methods=["POST", "GET", "OPTIONS"],
        allow_headers=["Content-Type", "Authorization"],
    )
    if _bool_from_env("BLITZKODE_RATE_LIMIT", default=True):
        app.add_middleware(RateLimitMiddleware, max_requests=_int_from_env("BLITZKODE_RATE_LIMIT_MAX", DEFAULT_RATE_LIMIT_MAX))
    app.add_middleware(RequestSizeLimitMiddleware, max_bytes=_int_from_env("BLITZKODE_MAX_REQUEST_BYTES", 50_000))

    @app.get("/")
    async def root():
        if not settings.frontend_path.exists():
            raise HTTPException(status_code=404, detail="Frontend build is missing. Run `npm install` and `npm run build` in frontend/.")
        return FileResponse(str(settings.frontend_path))

    @app.get("/health")
    async def health():
        status = "healthy" if model_service.model_exists else "degraded"
        return JSONResponse(
            {
                "status": status,
                "model_loaded": model_service.model_loaded,
                "model_path": str(settings.model_path),
                "model_exists": model_service.model_exists,
                "frontend_exists": settings.frontend_path.exists(),
                "version": APP_VERSION,
                "gpu_layers": settings.n_gpu_layers,
                "last_error": model_service.last_error,
                "busy": model_service.busy,
            }
        )

    @app.post("/generate")
    async def generate(req: GenerateRequest, request: Request):
        auth_err = _check_api_key(request, settings)
        if auth_err:
            return auth_err
        prompt, err = _validate_prompt(req.prompt, settings.max_prompt_length)
        if err:
            return err
        async with model_lock:
            try:
                sanitized = req.model_copy(update={"prompt": prompt})
                # Inference blocks, so run it in a worker thread.
                payload = await asyncio.to_thread(model_service.generate_once, sanitized)
                return JSONResponse(payload)
            except FileNotFoundError as exc:
                return JSONResponse({"error": str(exc)}, status_code=503)
            except Exception as exc:
                return JSONResponse({"error": str(exc)}, status_code=500)

    @app.post("/generate/research")
    async def generate_research(req: ResearchGenerateRequest, request: Request):
        auth_err = _check_api_key(request, settings)
        if auth_err:
            return auth_err
        prompt, err = _validate_prompt(req.prompt, settings.max_prompt_length)
        if err:
            return err
        if not search_service.enabled:
            return JSONResponse({"error": "Web search is disabled"}, status_code=503)
        search_query = (req.search_query or prompt).strip()
        try:
            results = await asyncio.to_thread(search_service.search, search_query, req.search_results, req.deep_search)
            sanitized = req.model_copy(update={"prompt": prompt})
            enriched = model_service.with_research_context(sanitized, results, settings.max_prompt_length)
            async with model_lock:
                payload = await asyncio.to_thread(model_service.generate_once, enriched)
            payload["search_results"] = results
            return JSONResponse(payload)
        except FileNotFoundError as exc:
            return JSONResponse({"error": str(exc)}, status_code=503)
        except (RuntimeError, urllib.error.URLError, TimeoutError) as exc:
            return JSONResponse({"error": f"Search failed: {exc}"}, status_code=502)
        except Exception as exc:
            return JSONResponse({"error": str(exc)}, status_code=500)

    @app.post("/generate/stream")
    async def generate_stream(req: GenerateRequest, request: Request):
        auth_err = _check_api_key(request, settings)
        if auth_err:
            return auth_err
        prompt, err = _validate_prompt(req.prompt, settings.max_prompt_length)
        if err:
            return err
        if not model_service.model_exists:
            return JSONResponse({"error": f"Model not found at {settings.model_path}"}, status_code=503)
        sanitized = req.model_copy(update={"prompt": prompt})

        async def _locked_stream():
            async with model_lock:
                token_q: queue.Queue = queue.Queue()
                thread = threading.Thread(
                    target=model_service._run_stream,
                    args=(sanitized, token_q),
                    daemon=True,
                )
                thread.start()
                # Wait on the thread-safe queue in a worker thread so the
                # blocking get() never stalls the event loop.
                while True:
                    chunk = await asyncio.to_thread(token_q.get)
                    if chunk is None:
                        break
                    yield chunk

        return StreamingResponse(
            _locked_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
        )

    @app.post("/search/web")
    async def search_web(req: SearchRequest, request: Request):
        auth_err = _check_api_key(request, settings)
        if auth_err:
            return auth_err
        if not search_service.enabled:
            return JSONResponse({"error": "Web search is disabled"}, status_code=503)
        try:
            results = await asyncio.to_thread(search_service.search, req.query, req.max_results, req.deep)
            return JSONResponse({"query": req.query.strip(), "deep": req.deep, "results": results})
        except (RuntimeError, urllib.error.URLError, TimeoutError) as exc:
            return JSONResponse({"error": f"Search failed: {exc}"}, status_code=502)
        except Exception as exc:
            return JSONResponse({"error": str(exc)}, status_code=500)

    @app.get("/info")
    async def info():
        return JSONResponse(
            {
                "name": APP_NAME,
                "creator": CREATOR,
                "version": APP_VERSION,
                "status": "ready" if model_service.model_exists else "model-missing",
                "mode": f"{'GPU' if settings.n_gpu_layers > 0 else 'CPU'} (llama.cpp)",
                "gpu_layers": settings.n_gpu_layers,
                "context_window": settings.n_ctx,
                "model_loaded": model_service.model_loaded,
                "load_time_seconds": model_service.load_time_seconds,
                "busy": model_service.busy,
                "web_search_enabled": search_service.enabled,
                "endpoints": {
                    "generate": "POST /generate",
                    "research_generate": "POST /generate/research",
                    "stream": "POST /generate/stream",
                    "search": "POST /search/web",
                    "health": "GET /health",
                    "info": "GET /info",
                },
            }
        )

    return app

app = create_app()


def main() -> None:
    s = Settings()
    print(f"\n{'=' * 50}")
    print(f"{APP_NAME.upper()} v{APP_VERSION}")
    print(f"Creator: {CREATOR}")
    print(f"{'=' * 50}")
    print(f"Model: {s.model_path}")
    print(f"GPU: {s.n_gpu_layers} layers")
    print(f"Ctx: {s.n_ctx} | Threads: {s.n_threads}")
    print(f"URL: http://localhost:{s.port}\n")
    uvicorn.run(app, host=s.host, port=s.port, log_level="warning")


if __name__ == "__main__":
    main()
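
The server above exposes `POST /generate` and `POST /generate/stream`, and the streaming worker emits `data: {"token": ...}` events followed by `data: [DONE]`. A client for those two endpoints might look roughly like the sketch below. It is not part of the server: `build_generate_request` and `parse_sse_line` are hypothetical helper names, and `BASE_URL` assumes the default port 7860 on the local machine. The request is only built, never sent, so the snippet runs without a server.

```python
import json
import urllib.request

# Assumption: the server runs locally on its default BLITZKODE_PORT.
BASE_URL = "http://localhost:7860"


def build_generate_request(prompt, api_key="", stream=False, base_url=BASE_URL, **params):
    """Build (but do not send) a POST request for /generate or /generate/stream.

    Extra keyword arguments (temperature, max_tokens, ...) are passed through
    as fields of the JSON body, matching the GenerateRequest model.
    """
    path = "/generate/stream" if stream else "/generate"
    body = {"prompt": prompt, **params}
    headers = {"Content-Type": "application/json"}
    if api_key:
        # The server accepts the key as "Bearer <key>" in the Authorization header.
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(
        base_url + path,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


def parse_sse_line(line):
    """Decode one event line into (kind, text), or None for non-data lines.

    kind is "token", "error", or "done".
    """
    line = line.strip()
    if not line.startswith("data: "):
        return None  # blank separator lines between events
    payload = line[len("data: "):]
    if payload == "[DONE]":
        return ("done", "")
    event = json.loads(payload)
    if "error" in event:
        return ("error", event["error"])
    return ("token", event.get("token", ""))
```

Passing the built request to `urllib.request.urlopen` and feeding each decoded response line to `parse_sse_line` would be one way to consume the stream; any HTTP client with streaming support should work equally well.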