Bc-AI committed
Commit fbb8cc6 · verified · 1 Parent(s): 348c180

Upload 3 files

Files changed (3)
  1. Dockerfile +22 -0
  2. app.py +175 -0
  3. requirements.txt +12 -0
Dockerfile ADDED
@@ -0,0 +1,22 @@
+ FROM python:3.10-slim
+
+ WORKDIR /app
+
+ # Install system dependencies
+ RUN apt-get update && apt-get install -y \
+     gcc \
+     g++ \
+     && rm -rf /var/lib/apt/lists/*
+
+ # Copy requirements first to leverage the Docker layer cache
+ COPY requirements.txt .
+ RUN pip install --no-cache-dir -r requirements.txt
+
+ # Copy application code
+ COPY . .
+
+ # Expose the API port
+ EXPOSE 7860
+
+ # Start the application
+ CMD ["python", "app.py"]
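
The image exposes port 7860 and launches app.py directly. A minimal smoke test, assuming the image has been built and run with the port published (e.g. docker run -p 7860:7860 ...), could poll the gateway's /health endpoint defined in app.py below:

import requests

# Assumes the container is reachable on localhost:7860 (hypothetical local setup)
resp = requests.get("http://localhost:7860/health", timeout=10)
resp.raise_for_status()
print(resp.json())  # e.g. {"status": "healthy", "active_workers": [...], "total_workers": 4}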
app.py ADDED
@@ -0,0 +1,171 @@
+ import os
+ import time
+ import requests
+ from typing import Dict
+ from fastapi import FastAPI, HTTPException, BackgroundTasks
+ import uvicorn
+ from shared.models import ChatRequest, ChatResponse, ChatMessage
+ from shared.chat_history import save_detailed_chat_log, initialize_chat_file
+
+ app = FastAPI(
+     title="Multi-Node Hugging Face API Gateway",
+     description="API Gateway that routes requests to specialized worker nodes",
+     version="1.0.0"
+ )
+
+ # Initialize chat history file
+ initialize_chat_file()
+
+ # Configuration - in production, these would come from environment variables
+ WORKER_NODES = {
+     "sam-x-nano": os.getenv("NANO_WORKER_URL", "http://nano-worker:8000"),
+     "sam-x-mini": os.getenv("MINI_WORKER_URL", "http://mini-worker:8000"),
+     "sam-x-fast": os.getenv("FAST_WORKER_URL", "http://fast-worker:8000"),
+     "sam-x-large": os.getenv("LARGE_WORKER_URL", "http://large-worker:8000"),
+ }
+
+ # In-memory worker status tracking (in production, use Redis or a database)
+ worker_status = {}
+
+ @app.on_event("startup")
+ def startup_event():
+     print("Starting Multi-Node Hugging Face API Gateway...")
+     # Mark all configured workers active until a request fails
+     for model in WORKER_NODES:
+         worker_status[model] = {"active": True, "last_check": time.time(), "load": 0.0}
+
+
+ def route_to_worker(chat_request: ChatRequest) -> Dict:
+     """
+     Route the request to the appropriate worker node based on the model name.
+     Note: this call is synchronous and blocks until the worker responds.
+     """
+     model = chat_request.model.lower()
+
+     if model not in WORKER_NODES:
+         raise HTTPException(status_code=400, detail=f"Model {model} not available")
+
+     worker_url = WORKER_NODES[model]
+
+     # Forward the request to the worker
+     try:
+         response = requests.post(
+             f"{worker_url}/chat/completions",
+             json=chat_request.model_dump(),
+             timeout=300  # 5-minute timeout for long inference
+         )
+         response.raise_for_status()
+         return response.json()
+     except requests.exceptions.RequestException as e:
+         print(f"Error contacting worker {worker_url}: {str(e)}")
+         worker_status[model] = {"active": False, "last_check": time.time(), "load": 0.0}
+         raise HTTPException(status_code=503, detail=f"Worker for model {model} is not available")
+     except Exception as e:
+         print(f"Unexpected error contacting worker {worker_url}: {str(e)}")
+         raise HTTPException(status_code=500, detail="Internal server error")
+
+
+ @app.post("/chat/completions", response_model=ChatResponse)
+ async def chat_completions(request: ChatRequest, background_tasks: BackgroundTasks):
+     """
+     Main chat completions endpoint - routes to the appropriate worker.
+     """
+     start_time = time.time()
+
+     try:
+         # Route to the appropriate worker
+         worker_response = route_to_worker(request)
+
+         # Calculate processing time
+         processing_time = time.time() - start_time
+
+         # Extract response content
+         response_content = ""
+         if "choices" in worker_response and len(worker_response["choices"]) > 0:
+             response_content = worker_response["choices"][0].get("message", {}).get("content", "")
+
+         # Save chat history in the background
+         background_tasks.add_task(
+             save_detailed_chat_log,
+             request.model_dump(),
+             response_content,
+             request.model,
+             processing_time
+         )
+
+         return worker_response
+
+     except HTTPException:
+         # Re-raise HTTP exceptions unchanged
+         raise
+     except Exception as e:
+         print(f"Error in chat_completions: {str(e)}")
+         raise HTTPException(status_code=500, detail="Internal server error")
+
+
+ @app.get("/models")
+ async def list_models():
+     """
+     List available models.
+     """
+     available_models = [model for model in WORKER_NODES
+                         if worker_status.get(model, {}).get("active", True)]
+
+     return {
+         "object": "list",
+         "data": [
+             {
+                 "id": model,
+                 "object": "model",
+                 "created": int(time.time()),
+                 "owned_by": "multinode-hf-api"
+             }
+             for model in available_models
+         ]
+     }
+
+
+ @app.get("/health")
+ async def health_check():
+     """
+     Health check endpoint.
+     """
+     active_workers = {model: status for model, status in worker_status.items()
+                       if status.get("active", False)}
+
+     return {
+         "status": "healthy" if active_workers else "no_active_workers",
+         "active_workers": list(active_workers.keys()),
+         "total_workers": len(WORKER_NODES)
+     }
+
+
+ @app.get("/worker-status")
+ async def get_worker_status():
+     """
+     Get detailed status of all workers.
+     """
+     return worker_status
+
+
+ @app.post("/chat")
+ async def simple_chat(message: str, model: str = "sam-x-nano", max_tokens: int = 512):
+     """
+     Simplified chat endpoint for basic interactions.
+     """
+     chat_request = ChatRequest(
+         messages=[ChatMessage(role="user", content=message)],
+         model=model,
+         max_tokens=max_tokens
+     )
+
+     worker_response = route_to_worker(chat_request)
+
+     if "choices" in worker_response and len(worker_response["choices"]) > 0:
+         return {"response": worker_response["choices"][0]["message"]["content"]}
+     else:
+         raise HTTPException(status_code=500, detail="No response from worker")
+
+
+ if __name__ == "__main__":
+     uvicorn.run(app, host="0.0.0.0", port=7860)
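
app.py imports ChatRequest, ChatResponse, and ChatMessage from shared.models, but the shared package is not part of this commit. A hypothetical sketch of the minimal Pydantic models the gateway would need, inferred only from how the fields are used above:

from typing import List, Optional
from pydantic import BaseModel

# Hypothetical reconstruction - the real shared/models.py is not in this commit
class ChatMessage(BaseModel):
    role: str      # "user", "assistant", or "system"
    content: str

class ChatRequest(BaseModel):
    messages: List[ChatMessage]
    model: str                       # e.g. "sam-x-nano"
    max_tokens: Optional[int] = 512

class ChatResponse(BaseModel):
    # OpenAI-style shape: choices[0]["message"]["content"] holds the reply
    choices: List[dict]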
requirements.txt ADDED
@@ -0,0 +1,12 @@
+ # Requirements for Head Node
+ fastapi==0.104.1
+ uvicorn==0.24.0
+ requests==2.31.0
+ pydantic==2.5.0
+ python-multipart==0.0.6
+ huggingface_hub==0.20.1
+ tokenizers==0.15.0
+ transformers==4.35.2
+ numpy==1.24.3
+ pytz==2023.3.post1
+ aiohttp==3.9.0
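
With the gateway running, both chat endpoints can be exercised from any HTTP client. A short usage sketch (endpoint shapes taken from app.py above; the base URL is an assumption):

import requests

BASE = "http://localhost:7860"  # assumption: gateway running locally

# Simplified endpoint - scalar arguments are sent as query parameters
r = requests.post(f"{BASE}/chat", params={"message": "Hello!", "model": "sam-x-nano"})
print(r.json()["response"])

# OpenAI-style endpoint - full ChatRequest body as JSON
body = {
    "model": "sam-x-nano",
    "messages": [{"role": "user", "content": "Hello!"}],
    "max_tokens": 128,
}
r = requests.post(f"{BASE}/chat/completions", json=body)
print(r.json()["choices"][0]["message"]["content"])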