likhonsheikh committed (verified) · ab30b6f · Parent: 9436565

Upload app.py with huggingface_hub

Files changed (1):
  app.py  +525 -147
app.py CHANGED
@@ -1,8 +1,9 @@
  """
  Dual-Compatible API Endpoint (OpenAI + Anthropic)
- llama.cpp powered - Qwen2.5-Coder-7B-Instruct Q4_K_M
- - OpenAI format: /v1/chat/completions
- - Anthropic format: /anthropic/v1/messages
  """

  import os
@@ -11,13 +12,17 @@ import uuid
  import logging
  import re
  import json
  from datetime import datetime
  from logging.handlers import RotatingFileHandler
  from typing import List, Optional, Union, Dict, Any, Literal
  from contextlib import asynccontextmanager
- from threading import Thread

- from fastapi import FastAPI, HTTPException, Header, Request
  from fastapi.responses import StreamingResponse, JSONResponse
  from fastapi.middleware.cors import CORSMiddleware
  from pydantic import BaseModel, Field
@@ -51,42 +56,313 @@ for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
  uv_log.handlers = [file_handler, console_handler]

  logger.info("=" * 60)
- logger.info(f"llama.cpp API (OpenAI + Anthropic) Startup at {datetime.now().isoformat()}")
  logger.info(f"Log file: {LOG_FILE}")
  logger.info("=" * 60)

  # ============== Configuration ==============
- MODEL_PATH = "/app/models/qwen2.5-coder-7b-instruct-q4_k_m.gguf"
- N_CTX = 8192      # Context window
- N_THREADS = 2     # CPU threads
- N_BATCH = 128     # Batch size

- llm = None

  @asynccontextmanager
  async def lifespan(app: FastAPI):
-     global llm
-     logger.info(f"Loading model: {MODEL_PATH}")
-     try:
-         llm = Llama(
-             model_path=MODEL_PATH,
-             n_ctx=N_CTX,
-             n_threads=N_THREADS,
-             n_batch=N_BATCH,
-             verbose=True
-         )
-         logger.info("Model loaded successfully!")
-     except Exception as e:
-         logger.error(f"Failed to load model: {e}", exc_info=True)
-         raise
      yield
      logger.info("Shutting down...")
-     del llm

  app = FastAPI(
-     title="Dual-Compatible API (OpenAI + Anthropic)",
-     description="llama.cpp powered API with dual SDK compatibility",
-     version="2.0.0",
      lifespan=lifespan
  )

@@ -102,16 +378,17 @@ app.add_middleware(
  async def log_requests(request: Request, call_next):
      request_id = str(uuid.uuid4())[:8]
      start_time = time.time()
-     logger.info(f"[{request_id}] {request.method} {request.url.path} - Started")
-     try:
-         response = await call_next(request)
-         duration = (time.time() - start_time) * 1000
-         logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)")
-         return response
-     except Exception as e:
-         duration = (time.time() - start_time) * 1000
-         logger.error(f"[{request_id}] {request.method} {request.url.path} - Error: {e} ({duration:.2f}ms)")
-         raise

  # ============================================================
  # ANTHROPIC-COMPATIBLE MODELS
@@ -177,10 +454,13 @@ AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, Ant
  class AnthropicMetadata(BaseModel):
      user_id: Optional[str] = None

  class AnthropicSystemContent(BaseModel):
      type: Literal["text"] = "text"
      text: str
-     cache_control: Optional[Dict[str, str]] = None

  class AnthropicThinkingConfig(BaseModel):
      type: Literal["enabled", "disabled"] = "enabled"
@@ -335,6 +615,17 @@ def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemCon
          texts.append(block.text)
      return " ".join(texts)

  def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
      if content is None:
          return ""
@@ -429,15 +720,12 @@ def parse_thinking_response(text: str) -> tuple:
  def parse_tool_use(text: str) -> Optional[Dict[str, Any]]:
      """Parse tool use from model response"""
      try:
-         # Try to find JSON with "tool" key - handle nested braces
-         # First try: the entire text might be JSON
          text_stripped = text.strip()
          if text_stripped.startswith("{") and text_stripped.endswith("}"):
              parsed = json.loads(text_stripped)
              if "tool" in parsed:
                  return parsed

-         # Second try: find JSON object containing "tool"
          brace_count = 0
          start_idx = None
          for i, char in enumerate(text):
@@ -469,14 +757,24 @@ def generate_id(prefix: str = "msg") -> str:
  async def root():
      return {
          "status": "healthy",
-         "model": "qwen2.5-coder-7b-instruct-q4_k_m",
          "backend": "llama.cpp",
          "endpoints": {
              "openai": "/v1/chat/completions",
              "anthropic": "/anthropic/v1/messages"
          },
-         "features": ["extended-thinking", "streaming", "tool-use", "dual-compatibility"],
-         "context_length": N_CTX
      }

  @app.get("/logs")
@@ -491,7 +789,35 @@ async def get_logs(lines: int = 100):

  @app.get("/health")
  async def health():
-     return {"status": "ok", "model_loaded": llm is not None, "backend": "llama.cpp"}

  # ============================================================
  # OPENAI-COMPATIBLE ENDPOINTS (/v1)
@@ -499,9 +825,14 @@ async def health():

  @app.get("/v1/models")
  async def openai_list_models():
-     return OpenAIModelList(
-         data=[OpenAIModel(id="qwen2.5-coder-7b", created=int(time.time()), owned_by="qwen")]
-     )

  @app.post("/v1/chat/completions")
  async def openai_chat_completions(
@@ -509,13 +840,18 @@ async def openai_chat_completions(
      authorization: Optional[str] = Header(None)
  ):
      chat_id = generate_id("chatcmpl")
-     logger.info(f"[{chat_id}] OpenAI chat - model: {request.model}, max_tokens: {request.max_tokens}")

      try:
          prompt = format_openai_messages(request.messages)

          if request.stream:
-             return await openai_stream_response(request, prompt, chat_id)

          stop_tokens = ["<|im_end|>", "<|endoftext|>"]
          if request.stop:
@@ -559,56 +895,61 @@ async def openai_chat_completions(
      except Exception as e:
          logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=str(e))

- async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str):
      async def generate():
-         created = int(time.time())
-
-         initial_chunk = {
-             "id": chat_id,
-             "object": "chat.completion.chunk",
-             "created": created,
-             "model": request.model,
-             "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}]
-         }
-         yield f"data: {json.dumps(initial_chunk)}\n\n"
-
-         stop_tokens = ["<|im_end|>", "<|endoftext|>"]
-         if request.stop:
-             if isinstance(request.stop, str):
-                 stop_tokens.append(request.stop)
-             else:
-                 stop_tokens.extend(request.stop)
-
-         for output in llm(
-             prompt,
-             max_tokens=request.max_tokens or 1024,
-             temperature=request.temperature or 0.7,
-             top_p=request.top_p or 0.95,
-             stop=stop_tokens,
-             stream=True,
-             echo=False
-         ):
-             text = output["choices"][0]["text"]
-             if text:
-                 chunk = {
-                     "id": chat_id,
-                     "object": "chat.completion.chunk",
-                     "created": created,
-                     "model": request.model,
-                     "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}]
-                 }
-                 yield f"data: {json.dumps(chunk)}\n\n"
-
-         final_chunk = {
-             "id": chat_id,
-             "object": "chat.completion.chunk",
-             "created": created,
-             "model": request.model,
-             "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
-         }
-         yield f"data: {json.dumps(final_chunk)}\n\n"
-         yield "data: [DONE]\n\n"

      return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})

@@ -618,18 +959,20 @@ async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_i

  @app.get("/anthropic/v1/models")
  async def anthropic_list_models():
-     return {
-         "object": "list",
-         "data": [{
-             "id": "qwen2.5-coder-7b",
              "object": "model",
              "created": int(time.time()),
              "owned_by": "qwen",
-             "display_name": "Qwen2.5 Coder 7B Instruct (Q4_K_M)",
              "supports_thinking": True,
-             "supports_tools": True
-         }]
-     }

  @app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse)
  async def anthropic_create_message(
@@ -640,15 +983,36 @@ async def anthropic_create_message(
  ):
      message_id = generate_id("msg")

      thinking_enabled = False
      budget_tokens = 1024
      if request.thinking:
          thinking_enabled = request.thinking.type == "enabled"
          budget_tokens = request.thinking.budget_tokens or 1024

-     logger.info(f"[{message_id}] Anthropic msg - model: {request.model}, max_tokens: {request.max_tokens}, thinking: {thinking_enabled}, tools: {len(request.tools) if request.tools else 0}")

      try:
          prompt = format_anthropic_messages(
              request.messages,
              request.system,
@@ -657,8 +1021,15 @@ async def anthropic_create_message(
              request.temperature,
              budget_tokens
          )

          if request.stream:
-             return await anthropic_stream_response(request, prompt, message_id, thinking_enabled)

          total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)

@@ -707,7 +1078,7 @@ async def anthropic_create_message(
          if usage["completion_tokens"] >= total_max_tokens:
              stop_reason = "max_tokens"

-         logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}")

          return AnthropicMessageResponse(
              id=message_id,
@@ -716,56 +1087,63 @@ async def anthropic_create_message(
              stop_reason=stop_reason,
              usage=AnthropicUsage(
                  input_tokens=usage["prompt_tokens"],
-                 output_tokens=usage["completion_tokens"]
              )
          )

      except Exception as e:
          logger.error(f"[{message_id}] Error: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=str(e))

- async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool):
      async def generate():
-         start_event = {
-             "type": "message_start",
-             "message": {
-                 "id": message_id, "type": "message", "role": "assistant", "content": [],
-                 "model": request.model, "stop_reason": None, "stop_sequence": None,
-                 "usage": {"input_tokens": 0, "output_tokens": 0}
              }
-         }
-         yield f"event: message_start\ndata: {json.dumps(start_event)}\n\n"
-
-         # Start text block
-         yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n"
-
-         stop_tokens = ["<|im_end|>", "<|endoftext|>"]
-         if request.stop_sequences:
-             stop_tokens.extend(request.stop_sequences)
-
-         total_tokens = 0
-         for output in llm(
-             prompt,
-             max_tokens=request.max_tokens,
-             temperature=request.temperature or 0.7,
-             top_p=request.top_p or 0.95,
-             stop=stop_tokens,
-             stream=True,
-             echo=False
-         ):
-             text = output["choices"][0]["text"]
-             if text:
-                 total_tokens += 1
-                 yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}})}\n\n"
-
-         yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n"
-         yield f"event: message_delta\ndata: {json.dumps({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn'}, 'usage': {'output_tokens': total_tokens}})}\n\n"
-         yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n"

      return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})

  @app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse)
  async def anthropic_count_tokens(request: AnthropicTokenCountRequest):
      prompt = format_anthropic_messages(request.messages, request.system)
      tokens = llm.tokenize(prompt.encode())
      return AnthropicTokenCountResponse(input_tokens=len(tokens))

  """
  Dual-Compatible API Endpoint (OpenAI + Anthropic)
+ llama.cpp powered with advanced features:
+ - Request Queue & Rate Limiting
+ - Prompt Caching (KV Cache)
+ - Multi-Model Hot-Swap
  """

  import os
  import logging
  import re
  import json
+ import asyncio
+ import hashlib
  from datetime import datetime
  from logging.handlers import RotatingFileHandler
  from typing import List, Optional, Union, Dict, Any, Literal
  from contextlib import asynccontextmanager
+ from threading import Thread, Lock
+ from collections import OrderedDict
+ from dataclasses import dataclass, field

+ from fastapi import FastAPI, HTTPException, Header, Request, BackgroundTasks
  from fastapi.responses import StreamingResponse, JSONResponse
  from fastapi.middleware.cors import CORSMiddleware
  from pydantic import BaseModel, Field

  uv_log.handlers = [file_handler, console_handler]

  logger.info("=" * 60)
+ logger.info(f"llama.cpp API v3.0 Startup at {datetime.now().isoformat()}")
  logger.info(f"Log file: {LOG_FILE}")
  logger.info("=" * 60)

  # ============== Configuration ==============
+ MODELS_DIR = "/app/models"
+ N_CTX = 8192
+ N_THREADS = 2
+ N_BATCH = 128
+
+ # Model configurations
+ MODEL_CONFIGS = {
+     "qwen2.5-coder-7b": {
+         "path": f"{MODELS_DIR}/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
+         "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-7B-Instruct-GGUF/resolve/main/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
+         "size": "7B",
+         "quantization": "Q4_K_M",
+         "default": True
+     },
+     "qwen2.5-coder-1.5b": {
+         "path": f"{MODELS_DIR}/qwen2.5-coder-1.5b-instruct-q8_0.gguf",
+         "url": "https://huggingface.co/Qwen/Qwen2.5-Coder-1.5B-Instruct-GGUF/resolve/main/qwen2.5-coder-1.5b-instruct-q8_0.gguf",
+         "size": "1.5B",
+         "quantization": "Q8_0",
+         "default": False
+     }
+ }
+
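The "url" field above records where each GGUF lives, but the download step itself is not in this file. A minimal sketch of pre-fetching the weights with huggingface_hub — repo IDs and filenames are read off the URLs above, so treat the call as an assumption, not something this commit runs:

# Sketch: pre-fetch the default GGUF into MODELS_DIR (not part of the commit).
from huggingface_hub import hf_hub_download

path = hf_hub_download(
    repo_id="Qwen/Qwen2.5-Coder-7B-Instruct-GGUF",
    filename="qwen2.5-coder-7b-instruct-q4_k_m.gguf",
    local_dir="/app/models",
)
print(path)  # /app/models/qwen2.5-coder-7b-instruct-q4_k_m.gguf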
+ # ============== Feature 1: Request Queue ==============
+ @dataclass
+ class QueuedRequest:
+     id: str
+     priority: int = 0  # Higher = more priority
+     created_at: float = field(default_factory=time.time)
+     future: asyncio.Future = field(default_factory=lambda: asyncio.get_event_loop().create_future())
+
+ class RequestQueue:
+     def __init__(self, max_concurrent: int = 1, max_queue_size: int = 50):
+         self.max_concurrent = max_concurrent
+         self.max_queue_size = max_queue_size
+         self.queue: List[QueuedRequest] = []
+         self.active_requests = 0
+         self.lock = asyncio.Lock()
+         self.stats = {
+             "total_requests": 0,
+             "completed_requests": 0,
+             "rejected_requests": 0,
+             "avg_wait_time": 0.0
+         }
+
+     async def acquire(self, request_id: str, priority: int = 0) -> int:
+         """Add request to queue, return position. Raises if queue full."""
+         async with self.lock:
+             if len(self.queue) >= self.max_queue_size:
+                 self.stats["rejected_requests"] += 1
+                 raise HTTPException(status_code=503, detail="Queue full, try again later")
+
+             self.stats["total_requests"] += 1
+
+             if self.active_requests < self.max_concurrent:
+                 self.active_requests += 1
+                 return 0  # Immediate processing
+
+             req = QueuedRequest(id=request_id, priority=priority)
+             self.queue.append(req)
+             self.queue.sort(key=lambda x: (-x.priority, x.created_at))
+             position = self.queue.index(req) + 1
+
+             logger.info(f"[{request_id}] Queued at position {position}")
+             return position
+
+     async def wait_for_turn(self, request_id: str) -> float:
+         """Wait until it's this request's turn. Returns wait time."""
+         start = time.time()
+         while True:
+             async with self.lock:
+                 # Check if we're first in queue and can proceed
+                 if self.queue and self.queue[0].id == request_id:
+                     if self.active_requests < self.max_concurrent:
+                         self.queue.pop(0)
+                         self.active_requests += 1
+                         wait_time = time.time() - start
+                         # Update rolling average
+                         self.stats["avg_wait_time"] = (
+                             self.stats["avg_wait_time"] * 0.9 + wait_time * 0.1
+                         )
+                         return wait_time
+             await asyncio.sleep(0.1)
+
+     async def release(self):
+         """Release a slot when request completes."""
+         async with self.lock:
+             self.active_requests = max(0, self.active_requests - 1)
+             self.stats["completed_requests"] += 1
+
+     def get_status(self) -> Dict:
+         return {
+             "queue_length": len(self.queue),
+             "active_requests": self.active_requests,
+             "max_concurrent": self.max_concurrent,
+             "stats": self.stats
+         }
+
+     def get_position(self, request_id: str) -> Optional[int]:
+         for i, req in enumerate(self.queue):
+             if req.id == request_id:
+                 return i + 1
+         return None
+
+ request_queue = RequestQueue(max_concurrent=1, max_queue_size=50)
+
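The acquire/wait_for_turn/release protocol must be followed exactly (release has to run even on failure), which is how the endpoints below use it. A minimal sketch of the calling pattern; do_inference is a hypothetical stand-in for the actual llama.cpp call:

# Sketch of the intended RequestQueue calling pattern (not part of the commit).
async def handle(request_id: str):
    position = await request_queue.acquire(request_id)  # raises 503 when the queue is full
    if position > 0:
        await request_queue.wait_for_turn(request_id)   # polls every 100ms for a free slot
    try:
        return await do_inference(request_id)           # hypothetical inference call
    finally:
        await request_queue.release()                   # always free the slot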
+ # ============== Feature 2: Prompt Cache ==============
+ class PromptCache:
+     def __init__(self, max_size: int = 10):
+         self.max_size = max_size
+         self.cache: OrderedDict[str, Dict] = OrderedDict()
+         self.lock = Lock()
+         self.stats = {"hits": 0, "misses": 0}
+
+     def _hash_prompt(self, system: str, tools: Optional[List] = None) -> str:
+         """Generate hash for system prompt + tools combination."""
+         content = system or ""
+         if tools:
+             content += json.dumps(tools, sort_keys=True)
+         return hashlib.md5(content.encode()).hexdigest()[:16]
+
+     def get(self, system: str, tools: Optional[List] = None) -> Optional[Dict]:
+         """Get cached prompt prefix."""
+         with self.lock:
+             key = self._hash_prompt(system, tools)
+             if key in self.cache:
+                 self.stats["hits"] += 1
+                 self.cache.move_to_end(key)
+                 logger.debug(f"Prompt cache HIT: {key}")
+                 return self.cache[key]
+             self.stats["misses"] += 1
+             return None
+
+     def set(self, system: str, tools: Optional[List], data: Dict):
+         """Cache prompt prefix data."""
+         with self.lock:
+             key = self._hash_prompt(system, tools)
+             if len(self.cache) >= self.max_size:
+                 oldest = next(iter(self.cache))
+                 del self.cache[oldest]
+                 logger.debug(f"Prompt cache evicted: {oldest}")
+             self.cache[key] = data
+             logger.debug(f"Prompt cache SET: {key}")
+
+     def get_stats(self) -> Dict:
+         total = self.stats["hits"] + self.stats["misses"]
+         hit_rate = (self.stats["hits"] / total * 100) if total > 0 else 0
+         return {
+             "size": len(self.cache),
+             "max_size": self.max_size,
+             "hits": self.stats["hits"],
+             "misses": self.stats["misses"],
+             "hit_rate": f"{hit_rate:.1f}%"
+         }

+ prompt_cache = PromptCache(max_size=10)
+
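The cache keys on an MD5 of the system prompt plus the serialized tool list; get() refreshes recency and set() evicts the oldest entry at capacity. A quick illustration (values are made up):

# Sketch of PromptCache keying and eviction (illustrative values only).
cache = PromptCache(max_size=2)
cache.set("You are a helpful coder.", None, {"tokens": 120})
print(cache.get("You are a helpful coder."))  # {'tokens': 120} -- hit, same key
print(cache.get("You are a terse coder."))    # None -- different system prompt
cache.set("A", None, {})
cache.set("B", None, {})                      # at capacity: evicts the oldest entry
print(cache.get_stats())                      # e.g. {'size': 2, ..., 'hit_rate': '50.0%'}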
+ # ============== Feature 3: Multi-Model Manager ==============
+ class ModelManager:
+     def __init__(self):
+         self.models: Dict[str, Llama] = {}
+         self.current_model: Optional[str] = None
+         self.lock = Lock()
+         self.load_stats: Dict[str, Dict] = {}
+
+     def load_model(self, model_id: str) -> Llama:
+         """Load a model (lazy loading with hot-swap)."""
+         with self.lock:
+             if model_id in self.models:
+                 self.current_model = model_id
+                 return self.models[model_id]
+
+             if model_id not in MODEL_CONFIGS:
+                 raise HTTPException(status_code=400, detail=f"Unknown model: {model_id}")
+
+             config = MODEL_CONFIGS[model_id]
+
+             # Check if model file exists
+             if not os.path.exists(config["path"]):
+                 raise HTTPException(
+                     status_code=503,
+                     detail=f"Model file not found: {model_id}. Available: {list(self.models.keys())}"
+                 )
+
+             logger.info(f"Loading model: {model_id}")
+             start = time.time()
+
+             try:
+                 llm = Llama(
+                     model_path=config["path"],
+                     n_ctx=N_CTX,
+                     n_threads=N_THREADS,
+                     n_batch=N_BATCH,
+                     verbose=False
+                 )
+
+                 load_time = time.time() - start
+                 self.models[model_id] = llm
+                 self.current_model = model_id
+                 self.load_stats[model_id] = {
+                     "loaded_at": datetime.now().isoformat(),
+                     "load_time": f"{load_time:.2f}s"
+                 }
+
+                 logger.info(f"Model {model_id} loaded in {load_time:.2f}s")
+                 return llm
+
+             except Exception as e:
+                 logger.error(f"Failed to load model {model_id}: {e}")
+                 raise HTTPException(status_code=500, detail=f"Failed to load model: {e}")
+
+     def get_model(self, model_id: Optional[str] = None) -> Llama:
+         """Get a model, loading if necessary."""
+         if model_id is None:
+             # Use default or current model
+             model_id = self.current_model or self._get_default_model()
+
+         # Normalize model name
+         model_id = self._normalize_model_id(model_id)
+
+         if model_id in self.models:
+             return self.models[model_id]
+
+         return self.load_model(model_id)
+
+     def _normalize_model_id(self, model_id: str) -> str:
+         """Normalize model ID to match config keys."""
+         model_id = model_id.lower().strip()
+         # Handle common variations
+         if "7b" in model_id and "qwen" in model_id:
+             return "qwen2.5-coder-7b"
+         if "1.5b" in model_id and "qwen" in model_id:
+             return "qwen2.5-coder-1.5b"
+         # Check if exact match
+         if model_id in MODEL_CONFIGS:
+             return model_id
+         # Default to 7B
+         return "qwen2.5-coder-7b"
+
+     def _get_default_model(self) -> str:
+         for model_id, config in MODEL_CONFIGS.items():
+             if config.get("default"):
+                 return model_id
+         return list(MODEL_CONFIGS.keys())[0]
+
+     def list_models(self) -> List[Dict]:
+         """List all available models."""
+         models = []
+         for model_id, config in MODEL_CONFIGS.items():
+             models.append({
+                 "id": model_id,
+                 "size": config["size"],
+                 "quantization": config["quantization"],
+                 "loaded": model_id in self.models,
+                 "available": os.path.exists(config["path"]),
+                 "default": config.get("default", False)
+             })
+         return models
+
+     def get_stats(self) -> Dict:
+         return {
+             "current_model": self.current_model,
+             "loaded_models": list(self.models.keys()),
+             "load_stats": self.load_stats
+         }
+
+     def unload_model(self, model_id: str):
+         """Unload a model to free memory."""
+         with self.lock:
+             if model_id in self.models:
+                 del self.models[model_id]
+                 if self.current_model == model_id:
+                     self.current_model = None
+                 logger.info(f"Model {model_id} unloaded")
+
+ model_manager = ModelManager()
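_normalize_model_id lets clients pass loose SDK-style names and still land on a config key; anything unrecognized falls back to the 7B default. A small sketch of that mapping, inferred from the substring checks above:

# Sketch: how loose model names resolve (not part of the commit).
mm = ModelManager()
print(mm._normalize_model_id("Qwen2.5-Coder-7B-Instruct"))  # qwen2.5-coder-7b
print(mm._normalize_model_id("qwen-1.5b"))                  # qwen2.5-coder-1.5b
print(mm._normalize_model_id("claude-3-5-sonnet"))          # qwen2.5-coder-7b (fallback)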
 
+ # ============== App Initialization ==============
  @asynccontextmanager
  async def lifespan(app: FastAPI):
+     # Load default model on startup
+     default_model = None
+     for model_id, config in MODEL_CONFIGS.items():
+         if config.get("default") and os.path.exists(config["path"]):
+             default_model = model_id
+             break
+
+     if default_model:
+         try:
+             model_manager.load_model(default_model)
+         except Exception as e:
+             logger.error(f"Failed to load default model: {e}")
+     else:
+         logger.warning("No default model found, will load on first request")
+
      yield
      logger.info("Shutting down...")

  app = FastAPI(
+     title="Dual-Compatible API (OpenAI + Anthropic) v3.0",
+     description="llama.cpp API with Queue, Caching, and Multi-Model support",
+     version="3.0.0",
      lifespan=lifespan
  )

  async def log_requests(request: Request, call_next):
      request_id = str(uuid.uuid4())[:8]
      start_time = time.time()
+
+     # Add request ID to headers for tracking
+     response = await call_next(request)
+
+     duration = (time.time() - start_time) * 1000
+     logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)")
+
+     response.headers["X-Request-ID"] = request_id
+     response.headers["X-Processing-Time"] = f"{duration:.2f}ms"
+
+     return response
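Every response now carries the two tracking headers set above. A minimal sketch of reading them back (host and port are assumptions):

# Sketch: observing the tracking headers (values illustrative).
import requests

r = requests.get("http://localhost:7860/health")
print(r.headers.get("X-Request-ID"))       # e.g. "3f9c1a2b"
print(r.headers.get("X-Processing-Time"))  # e.g. "1.42ms"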
 

  # ============================================================
  # ANTHROPIC-COMPATIBLE MODELS

  class AnthropicMetadata(BaseModel):
      user_id: Optional[str] = None

+ class AnthropicCacheControl(BaseModel):
+     type: Literal["ephemeral"] = "ephemeral"
+
  class AnthropicSystemContent(BaseModel):
      type: Literal["text"] = "text"
      text: str
+     cache_control: Optional[AnthropicCacheControl] = None

  class AnthropicThinkingConfig(BaseModel):
      type: Literal["enabled", "disabled"] = "enabled"
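These models define the request surface the handler reads: thinking.type toggles extended thinking, and budget_tokens falls back to 1024 downstream. A sketch of the matching request fragment (field values are illustrative):

# Sketch: an Anthropic-style request body with extended thinking enabled.
payload = {
    "model": "qwen2.5-coder-7b",
    "max_tokens": 512,
    "thinking": {"type": "enabled", "budget_tokens": 2048},
    "messages": [{"role": "user", "content": "Walk through your reasoning."}],
}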
 
          texts.append(block.text)
      return " ".join(texts)

+ def check_cache_control(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> bool:
+     """Check if cache_control is set to ephemeral."""
+     if system is None or isinstance(system, str):
+         return False
+     for block in system:
+         if isinstance(block, dict) and block.get("cache_control", {}).get("type") == "ephemeral":
+             return True
+         elif hasattr(block, "cache_control") and block.cache_control and block.cache_control.type == "ephemeral":
+             return True
+     return False
+
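check_cache_control accepts both raw dict blocks and parsed AnthropicSystemContent models; only an explicit ephemeral marker opts a request into the prompt cache. A sketch of the shapes it accepts:

# Sketch: inputs check_cache_control() treats as cacheable (not part of the commit).
system = [{
    "type": "text",
    "text": "You are a senior Python reviewer.",
    "cache_control": {"type": "ephemeral"},
}]
print(check_cache_control(system))                        # True
print(check_cache_control("plain string system prompt"))  # False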
  def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
      if content is None:
          return ""

  def parse_tool_use(text: str) -> Optional[Dict[str, Any]]:
      """Parse tool use from model response"""
      try:
          text_stripped = text.strip()
          if text_stripped.startswith("{") and text_stripped.endswith("}"):
              parsed = json.loads(text_stripped)
              if "tool" in parsed:
                  return parsed

          brace_count = 0
          start_idx = None
          for i, char in enumerate(text):
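The fast path above is fully visible: if the whole response is a JSON object with a "tool" key it is returned as-is (the brace-scanning fallback for embedded objects continues past this hunk). A sketch of the fast path:

# Sketch: the whole-response tool-call shape parse_tool_use() accepts.
text = '{"tool": "get_weather", "input": {"city": "Dhaka"}}'
print(parse_tool_use(text))  # {'tool': 'get_weather', 'input': {'city': 'Dhaka'}}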
 
  async def root():
      return {
          "status": "healthy",
+         "version": "3.0.0",
          "backend": "llama.cpp",
+         "features": [
+             "request-queue",
+             "prompt-caching",
+             "multi-model",
+             "extended-thinking",
+             "streaming",
+             "tool-use",
+             "dual-compatibility"
+         ],
          "endpoints": {
              "openai": "/v1/chat/completions",
              "anthropic": "/anthropic/v1/messages"
          },
+         "models": model_manager.list_models(),
+         "queue": request_queue.get_status(),
+         "cache": prompt_cache.get_stats()
      }

  @app.get("/logs")

  @app.get("/health")
  async def health():
+     return {
+         "status": "ok",
+         "models": model_manager.get_stats(),
+         "queue": request_queue.get_status(),
+         "cache": prompt_cache.get_stats()
+     }
+
+ @app.get("/queue/status")
+ async def queue_status():
+     return request_queue.get_status()
+
+ @app.get("/models/status")
+ async def models_status():
+     return {
+         "models": model_manager.list_models(),
+         "stats": model_manager.get_stats()
+     }
+
+ @app.post("/models/{model_id}/load")
+ async def load_model(model_id: str):
+     """Manually load a model."""
+     model_manager.load_model(model_id)
+     return {"status": "loaded", "model": model_id}
+
+ @app.post("/models/{model_id}/unload")
+ async def unload_model(model_id: str):
+     """Unload a model to free memory."""
+     model_manager.unload_model(model_id)
+     return {"status": "unloaded", "model": model_id}
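A sketch of driving the new operational endpoints (base URL is an assumption):

# Sketch: queue introspection and manual model management (not part of the commit).
import requests

base = "http://localhost:7860"
print(requests.get(f"{base}/queue/status").json())       # queue depth, active slots, stats
requests.post(f"{base}/models/qwen2.5-coder-1.5b/load")  # warm up the small model
requests.post(f"{base}/models/qwen2.5-coder-7b/unload")  # free memory
print(requests.get(f"{base}/models/status").json())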
 
  # ============================================================
  # OPENAI-COMPATIBLE ENDPOINTS (/v1)


  @app.get("/v1/models")
  async def openai_list_models():
+     models = []
+     for model_id, config in MODEL_CONFIGS.items():
+         models.append(OpenAIModel(
+             id=model_id,
+             created=int(time.time()),
+             owned_by="qwen"
+         ))
+     return OpenAIModelList(data=models)

  @app.post("/v1/chat/completions")

      authorization: Optional[str] = Header(None)
  ):
      chat_id = generate_id("chatcmpl")
+
+     # Queue management
+     position = await request_queue.acquire(chat_id)
+     if position > 0:
+         await request_queue.wait_for_turn(chat_id)

      try:
+         llm = model_manager.get_model(request.model)
          prompt = format_openai_messages(request.messages)

          if request.stream:
+             return await openai_stream_response(request, prompt, chat_id, llm)

          stop_tokens = ["<|im_end|>", "<|endoftext|>"]
          if request.stop:

      except Exception as e:
          logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=str(e))
+     finally:
+         await request_queue.release()

+ async def openai_stream_response(request: OpenAIChatRequest, prompt: str, chat_id: str, llm: Llama):
      async def generate():
+         try:
+             created = int(time.time())
+
+             initial_chunk = {
+                 "id": chat_id,
+                 "object": "chat.completion.chunk",
+                 "created": created,
+                 "model": request.model,
+                 "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}]
+             }
+             yield f"data: {json.dumps(initial_chunk)}\n\n"
+
+             stop_tokens = ["<|im_end|>", "<|endoftext|>"]
+             if request.stop:
+                 if isinstance(request.stop, str):
+                     stop_tokens.append(request.stop)
+                 else:
+                     stop_tokens.extend(request.stop)
+
+             for output in llm(
+                 prompt,
+                 max_tokens=request.max_tokens or 1024,
+                 temperature=request.temperature or 0.7,
+                 top_p=request.top_p or 0.95,
+                 stop=stop_tokens,
+                 stream=True,
+                 echo=False
+             ):
+                 text = output["choices"][0]["text"]
+                 if text:
+                     chunk = {
+                         "id": chat_id,
+                         "object": "chat.completion.chunk",
+                         "created": created,
+                         "model": request.model,
+                         "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}]
+                     }
+                     yield f"data: {json.dumps(chunk)}\n\n"
+
+             final_chunk = {
+                 "id": chat_id,
+                 "object": "chat.completion.chunk",
+                 "created": created,
+                 "model": request.model,
+                 "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
+             }
+             yield f"data: {json.dumps(final_chunk)}\n\n"
+             yield "data: [DONE]\n\n"
+         finally:
+             await request_queue.release()

      return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache"})
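Because the chunks follow the OpenAI chat.completion.chunk shape, the stock openai SDK can consume this stream. A sketch (base_url and api_key are assumptions; the Authorization header is accepted but not validated above):

# Sketch: consuming the /v1/chat/completions SSE stream with the openai SDK.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:7860/v1", api_key="unused")
stream = client.chat.completions.create(
    model="qwen2.5-coder-7b",
    messages=[{"role": "user", "content": "Write a Python hello world."}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)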
 
 

  @app.get("/anthropic/v1/models")
  async def anthropic_list_models():
+     models = []
+     for model_id, config in MODEL_CONFIGS.items():
+         models.append({
+             "id": model_id,
              "object": "model",
              "created": int(time.time()),
              "owned_by": "qwen",
+             "display_name": f"Qwen2.5 Coder {config['size']} ({config['quantization']})",
              "supports_thinking": True,
+             "supports_tools": True,
+             "loaded": model_id in model_manager.models,
+             "available": os.path.exists(config["path"])
+         })
+     return {"object": "list", "data": models}

  @app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse)
  async def anthropic_create_message(

  ):
      message_id = generate_id("msg")

+     # Queue management
+     position = await request_queue.acquire(message_id)
+     if position > 0:
+         await request_queue.wait_for_turn(message_id)
+
      thinking_enabled = False
      budget_tokens = 1024
      if request.thinking:
          thinking_enabled = request.thinking.type == "enabled"
          budget_tokens = request.thinking.budget_tokens or 1024

+     # Check for cache control
+     use_cache = check_cache_control(request.system)
+     cache_hit = False
+     cache_tokens = 0

      try:
+         llm = model_manager.get_model(request.model)
+
+         # Check prompt cache
+         system_text = extract_anthropic_system(request.system)
+         tools_list = [t.model_dump() for t in request.tools] if request.tools else None
+
+         if use_cache:
+             cached = prompt_cache.get(system_text or "", tools_list)
+             if cached:
+                 cache_hit = True
+                 cache_tokens = cached.get("tokens", 0)
+                 logger.info(f"[{message_id}] Prompt cache hit, saved ~{cache_tokens} tokens")
+
          prompt = format_anthropic_messages(
              request.messages,
              request.system,
              budget_tokens
          )

+         # Cache the prompt prefix if cache_control is set
+         if use_cache and not cache_hit:
+             prompt_cache.set(system_text or "", tools_list, {
+                 "tokens": len(llm.tokenize(prompt.encode())) // 2,  # Estimate prefix tokens
+                 "created": time.time()
+             })
+
          if request.stream:
+             return await anthropic_stream_response(request, prompt, message_id, thinking_enabled, llm)

          total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)

          if usage["completion_tokens"] >= total_max_tokens:
              stop_reason = "max_tokens"

+         logger.info(f"[{message_id}] Generated in {gen_time:.2f}s - tokens: {usage['completion_tokens']}, cache_hit: {cache_hit}")

          return AnthropicMessageResponse(
              id=message_id,

              stop_reason=stop_reason,
              usage=AnthropicUsage(
                  input_tokens=usage["prompt_tokens"],
+                 output_tokens=usage["completion_tokens"],
+                 cache_creation_input_tokens=cache_tokens if use_cache and not cache_hit else None,
+                 cache_read_input_tokens=cache_tokens if cache_hit else None
              )
          )

      except Exception as e:
          logger.error(f"[{message_id}] Error: {e}", exc_info=True)
          raise HTTPException(status_code=500, detail=str(e))
+     finally:
+         await request_queue.release()

+ async def anthropic_stream_response(request: AnthropicMessageRequest, prompt: str, message_id: str, thinking_enabled: bool, llm: Llama):
      async def generate():
+         try:
+             start_event = {
+                 "type": "message_start",
+                 "message": {
+                     "id": message_id, "type": "message", "role": "assistant", "content": [],
+                     "model": request.model, "stop_reason": None, "stop_sequence": None,
+                     "usage": {"input_tokens": 0, "output_tokens": 0}
+                 }
              }
+             yield f"event: message_start\ndata: {json.dumps(start_event)}\n\n"
+
+             yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n"
+
+             stop_tokens = ["<|im_end|>", "<|endoftext|>"]
+             if request.stop_sequences:
+                 stop_tokens.extend(request.stop_sequences)
+
+             total_tokens = 0
+             for output in llm(
+                 prompt,
+                 max_tokens=request.max_tokens,
+                 temperature=request.temperature or 0.7,
+                 top_p=request.top_p or 0.95,
+                 stop=stop_tokens,
+                 stream=True,
+                 echo=False
+             ):
+                 text = output["choices"][0]["text"]
+                 if text:
+                     total_tokens += 1
+                     yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}})}\n\n"
+
+             yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n"
+             yield f"event: message_delta\ndata: {json.dumps({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn'}, 'usage': {'output_tokens': total_tokens}})}\n\n"
+             yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n"
+         finally:
+             await request_queue.release()

      return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
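The event sequence (message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop) mirrors Anthropic's SSE protocol, so the anthropic SDK's streaming helper works against it. A sketch (base_url is an assumption; the SDK appends /v1/messages):

# Sketch: streaming from the Anthropic-compatible endpoint with the anthropic SDK.
import anthropic

client = anthropic.Anthropic(base_url="http://localhost:7860/anthropic", api_key="unused")
with client.messages.stream(
    model="qwen2.5-coder-7b",
    max_tokens=256,
    messages=[{"role": "user", "content": "Explain list comprehensions."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)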
 
  @app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse)
  async def anthropic_count_tokens(request: AnthropicTokenCountRequest):
+     llm = model_manager.get_model(request.model)
      prompt = format_anthropic_messages(request.messages, request.system)
      tokens = llm.tokenize(prompt.encode())
      return AnthropicTokenCountResponse(input_tokens=len(tokens))
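A sketch of calling the token counter (URL is an assumption; the count comes from llm.tokenize on the fully formatted prompt):

# Sketch: pre-flight token counting against this server.
import requests

r = requests.post(
    "http://localhost:7860/anthropic/v1/messages/count_tokens",
    json={
        "model": "qwen2.5-coder-7b",
        "messages": [{"role": "user", "content": "def quicksort(xs):"}],
    },
)
print(r.json())  # e.g. {"input_tokens": 12}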