# ollama_server / app.py
# APINOW-service's picture
# Update app.py
# fac5b9e verified
import json
import time
import threading
import uuid
import requests
import tiktoken
# from ollama import ollama as Client
import os
from functools import wraps
from collections import defaultdict
from flask import Flask, request, jsonify, Response
import subprocess
import sys
import requests
import json
import time
import platform
def run_command(command):
    """Run *command* in a shell, echoing its combined stdout/stderr live.

    Exits the interpreter with the command's return code on failure, so a
    normal return means the command succeeded.

    NOTE(review): ``shell=True`` hands the string to the shell — only pass
    trusted, hard-coded commands (all current callers do).
    """
    process = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    # Lines read from the pipe keep their trailing newline; end="" avoids
    # double-spacing the echoed output.
    for line in process.stdout:
        print(line, end="")
    process.wait()
    if process.returncode != 0:
        sys.exit(process.returncode)
# ================================
# OLLAMA SETUP
# ================================
class ollama():
    """Minimal client for a local Ollama daemon.

    Installs and starts the daemon on demand, then talks to its HTTP API
    for pulling models, listing local models, and (streaming) chat.
    """

    def __init__(self, base_url='http://localhost:11434'):
        self.base_url = base_url       # daemon HTTP endpoint
        self.models = []               # cache filled by get_model()
        self.os = platform.system()    # 'Linux' / 'Windows' / 'Darwin' ...

    def check_install_ollama(self):
        """Ensure the ``ollama`` binary exists and the daemon is answering.

        Exits the process when the daemon cannot be started within ~20s.
        """
        try:
            subprocess.check_output(["ollama", "--version"])
        except (OSError, subprocess.CalledProcessError):
            # Binary missing or broken: attempt a platform-specific install.
            try:
                if self.os == 'Linux':
                    run_command('apt update -y')
                    run_command('apt-get install zstd')
                    run_command("curl -fsSL https://ollama.com/install.sh | sh")
                elif self.os == 'Windows':
                    run_command(
                        'powershell -Command "iwr https://ollama.com/install.ps1 -UseBasicParsing | iex"'
                    )
            except Exception as e:
                print(e)
        # Start (or re-start) the daemon; if one is already listening, this
        # extra process exits harmlessly with its output discarded.
        subprocess.Popen(
            ["ollama", "serve"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
        # Poll for up to ~20 seconds until the daemon responds.
        for _ in range(10):
            try:
                subprocess.check_output(["ollama", "list"])
                return
            except (OSError, subprocess.CalledProcessError):
                time.sleep(2)
        print("❌ Ollama failed to start")
        sys.exit(1)

    def pull_model(self, model):
        """Yield progress dicts while pulling *model* from the registry."""
        self.check_install_ollama()
        url = self.base_url + "/api/pull"
        with requests.post(url, json={"model": model}, stream=True) as response:
            for line in response.iter_lines():
                if line:
                    yield json.loads(line.decode())

    def get_model(self):
        """Return the names of all locally available models."""
        self.check_install_ollama()
        for m in requests.get(self.base_url + "/api/tags").json()["models"]:
            self.models.append(str(m['name']))
        return list(self.models)

    def ollama_chat(self, payloads=None):
        """Stream chat text from ``/api/chat``.

        Yields plain-text chunks; "thinking" output is wrapped in
        ``<think> ... </think>`` markers.  Raises RuntimeError on an
        in-band error reported by the daemon.
        """
        self.check_install_ollama()
        url = self.base_url + "/api/chat"
        payload = {
            "stream": True,
        }
        # BUG FIX: the original tested `if payload:` (always true, since the
        # base payload is non-empty) and then crashed with a TypeError when
        # `payloads` was None.
        if payloads:
            payload.update(payloads)
        in_thinking = False
        response = requests.post(url, json=payload, stream=True)
        for line in response.iter_lines():
            if not line:
                continue
            data = json.loads(line.decode("utf-8"))
            msg = data.get("message", {})
            # Surface daemon-side errors to the caller.
            if "error" in data:
                error = data["error"]
                raise RuntimeError(error)
            # Thinking phase: emit the opening marker once, then the tokens.
            if msg.get("thinking"):
                if not in_thinking:
                    in_thinking = True
                    yield "<think>\n"
                yield msg["thinking"]
            # Final answer tokens: close the marker if still open.
            elif msg.get("content"):
                if in_thinking:
                    yield "</think>\n"
                    in_thinking = False
                yield msg["content"]
            # End of stream.
            if data.get("done"):
                if in_thinking:
                    yield "</think>"
                break

    def create(self, stream: bool = True, **kwords):
        """Chat entry point: a generator when *stream*, else the full text."""
        if stream:
            return self.ollama_chat(**kwords)
        txt = ''
        for chunk in self.ollama_chat(**kwords):
            txt += chunk
        return txt


Client = ollama
# ==========================================
# CONFIGURATION
# ==========================================
# Base URL of the local Ollama daemon all endpoints forward to.
OLLAMA_URL = "http://localhost:11434"
# Bind address / port for this proxy.
# NOTE(review): the __main__ block runs on port 7860, not PORT — confirm
# which is intended.
HOST = "0.0.0.0"
PORT = 8000
# ---- Multi-user API keys ----
API_KEYS = {
    "sk-user1": {"rate_limit": 60},  # 60 requests per minute
    "sk-user2": {"rate_limit": 30},
}
# ---- In-memory rate tracking ----
# key -> list of request timestamps (seconds); pruned to a 1-minute window.
request_log = defaultdict(list)
# Guards request_log across concurrent request threads.
lock = threading.Lock()
app = Flask(__name__)
# ==========================================
# UTILITY FUNCTIONS
# ==========================================
def count_tokens(messages):
    """
    Approximate OpenAI-style token counting over chat *messages*.

    Returns 0 when tiktoken (or its encoding data) is unavailable.
    Messages whose "content" is not a string (e.g. multimodal parts, None)
    are skipped instead of crashing ``enc.encode``.
    """
    enc = getattr(count_tokens, "_enc", None)
    if enc is None:
        try:
            enc = tiktoken.get_encoding("cl100k_base")
        except Exception:
            return 0
        # Cache the encoding: building it per call is expensive.
        count_tokens._enc = enc
    total = 0
    for m in messages:
        content = m.get("content", "")
        if isinstance(content, str):
            total += len(enc.encode(content))
    return total
def enforce_stop(text, stop):
    """
    Strict stop enforcement like OpenAI: truncate *text* at the earliest
    occurrence of any stop sequence.

    ``stop`` may be None/empty, a single string, or a list of strings.
    """
    if not stop:
        return text
    if isinstance(stop, str):
        stop = [stop]
    # Cut at the leftmost match in the text, not at the first sequence
    # that happens to be listed — matches OpenAI's multi-stop behaviour.
    cut = min((text.index(s) for s in stop if s in text), default=None)
    if cut is None:
        return text
    return text[:cut]
def error_response(message, status=400, error_type="invalid_request_error"):
    """Build an OpenAI-style error body paired with an HTTP status code."""
    body = {
        "error": {
            "message": message,
            "type": error_type,
            "param": None,
            "code": None,
        }
    }
    return jsonify(body), status
# ==========================================
# AUTH MIDDLEWARE
# ==========================================
def require_api_key(f):
    """Decorator: only run *f* when the request carries a valid Bearer key."""
    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization", "")
        if header.startswith("Bearer "):
            candidate = header.split(" ")[1]
            if candidate in API_KEYS:
                # Stash the key so handlers can rate-limit per caller.
                request.api_key = candidate
                return f(*args, **kwargs)
            return error_response("Invalid API key", 401)
        return error_response("Missing API key", 401)
    return decorated
# ==========================================
# RATE LIMITING (per key / per minute)
# ==========================================
def check_rate_limit(key):
    """Sliding one-minute window: True when *key* may make another request."""
    allowed = API_KEYS[key]["rate_limit"]
    now = time.time()
    with lock:
        # Drop timestamps older than 60s, keep the rest as the new log.
        recent = [stamp for stamp in request_log[key] if now - stamp < 60]
        request_log[key] = recent
        if len(recent) >= allowed:
            return False
        recent.append(now)
        return True
# ==========================================
# MODELS ENDPOINT
# ==========================================
@app.route("/v1/models", methods=["GET"])
# @require_api_key
def list_models():
    """OpenAI-compatible ``GET /v1/models`` backed by Ollama's tag list."""
    try:
        # timeout keeps a hung Ollama daemon from stalling this request forever
        response = requests.get(f"{OLLAMA_URL}/api/tags", timeout=30)
        data = response.json()
        models = [{
            "id": m["name"],
            "object": "model",
            "created": 0,
            "owned_by": "local"
        } for m in data.get("models", [])]
        return jsonify({
            "object": "list",
            "data": models
        })
    except Exception as e:
        return error_response(str(e), 500)
# ==========================================
# CHAT COMPLETIONS
# ==========================================
@app.route("/v1/chat/completions", methods=["POST"])
# @require_api_key
def chat_completions():
    """OpenAI-compatible ``POST /v1/chat/completions`` proxied to Ollama.

    Supports both SSE streaming (``stream: true``) and a single JSON
    response, mapping OpenAI parameters onto Ollama ``options``.
    """
    # if not check_rate_limit(request.api_key):
    #     return error_response("Rate limit exceeded", 429, "rate_limit_error")
    # silent=True: malformed or missing JSON yields None instead of the
    # 415/400 Flask would otherwise raise, so we can answer with an
    # OpenAI-style error body.
    data = request.get_json(silent=True)
    if not data:
        return error_response("Invalid JSON body")
    model = data.get("model")
    messages = data.get("messages")
    stream = data.get("stream", False)
    if not model or not messages:
        return error_response("model and messages are required")
    # ---- OpenAI param mapping -> Ollama "options" ----
    options = {}
    if data.get("max_tokens"):
        options["num_predict"] = data["max_tokens"]
    if data.get("temperature") is not None:
        options["temperature"] = data["temperature"]
    if data.get("top_p") is not None:
        options["top_p"] = data["top_p"]
    if data.get("stop"):
        stop = data["stop"]
        options["stop"] = stop if isinstance(stop, list) else [stop]
    else:
        stop = None
    payload = {
        "model": model,
        "messages": messages,
        "stream": stream,
        "options": options
    }
    request_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
    # ======================================
    # STREAM MODE (server-sent events)
    # ======================================
    if stream:
        def generate():
            try:
                for chunk in Client().create(payloads=payload):
                    chunk_data = {
                        'id': request_id,
                        'object': 'chat.completion.chunk',
                        'choices': [{
                            'delta': {'content': chunk},
                            'index': 0,
                            'finish_reason': None
                        }]
                    }
                    yield f"data: {json.dumps(chunk_data)}\n\n"
                # End of stream
                yield "data: [DONE]\n\n"
            except Exception as e:
                # Surface the failure in-band, then terminate the stream.
                yield f"data: {json.dumps({'error': str(e)})}\n\n"
                yield "data: [DONE]\n\n"
        return Response(generate(), mimetype="text/event-stream")
    # ======================================
    # NON-STREAM MODE
    # ======================================
    try:
        full_text = Client().create(stream=False, payloads=payload)
        # Ollama honours options["stop"], but trim again so a trailing stop
        # sequence never leaks into the response (previously `stop` was
        # computed and never used).
        full_text = enforce_stop(full_text, stop)
        prompt_tokens = count_tokens(messages)
        completion_tokens = len(full_text.split())  # rough word-count estimate
        total_tokens = prompt_tokens + completion_tokens
        return jsonify({
            "id": request_id,
            "object": "chat.completion",
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": full_text
                },
                "finish_reason": "stop"
            }],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": total_tokens
            }
        })
    except Exception as e:
        return error_response(str(e), 500)
# ==========================================
# EMBEDDINGS
# ==========================================
@app.route("/v1/embeddings", methods=["POST"])
# @require_api_key
def embeddings():
    """OpenAI-compatible ``POST /v1/embeddings`` proxied to Ollama."""
    # if not check_rate_limit(request.api_key):
    #     return error_response("Rate limit exceeded", 429, "rate_limit_error")
    # silent=True: bad/absent JSON becomes None instead of a raised 415/400.
    data = request.get_json(silent=True)
    if not data or not data.get("model") or not data.get("input"):
        return error_response("model and input are required")
    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={
                "model": data["model"],
                "prompt": data["input"]
            },
            timeout=60,  # don't hang forever on a stuck daemon
        )
        embedding = response.json().get("embedding")
        return jsonify({
            "object": "list",
            "data": [{
                "object": "embedding",
                "embedding": embedding,
                "index": 0
            }],
            "model": data["model"]
        })
    except Exception as e:
        return error_response(str(e), 500)
# ==========================================
# HEALTH CHECK
# ==========================================
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: reports the proxy process itself as up."""
    status = {"status": "ok"}
    return jsonify(status)
# ==========================================
# MAIN
# ==========================================
if __name__ == "__main__":
    client = Client()
    # Ensure the ollama binary is installed and the daemon is answering
    # before issuing any `ollama pull` (previously `client` was created
    # but never used, so pulls could run against a missing daemon).
    client.check_install_ollama()
    # Models to pre-fetch so the API can serve them immediately.
    models = [
        'qwen3-vl:2b',
        'deepseek-r1:1.5b',
        'vortex/helpingai-lite:latest',
        'qwen3:0.6b',
        "gemma3:4b"
    ]
    for model in models:
        run_command(f"ollama pull {model}")
    # NOTE(review): serves on 7860, not the PORT constant (8000) — confirm
    # which one is intended before unifying.
    app.run(host=HOST, port=7860)