File size: 6,801 Bytes
1df0e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b249d06
 
 
 
1df0e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b249d06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1df0e33
 
 
 
 
 
b249d06
 
1df0e33
 
b249d06
 
 
 
1df0e33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import time
import uuid
import json
import asyncio
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
from aetheris.api.schemas import (
    ChatCompletionRequest, ChatCompletionResponse, ChatCompletionChunk,
    ChatCompletionChoice, ChatMessage, ChatCompletionChunkChoice, ChatCompletionChunkDelta,
    CompletionRequest, CompletionResponse, CompletionChoice,
    ModelList, ModelCard
)
from aetheris.inference import InferenceEngine

# ASGI application object; the route handlers in this module register on it
# through the @app.get / @app.post decorators.
app = FastAPI(title="Aetheris API", version="0.1.0")

# CORS is fully open: every origin, method and header is accepted, and
# credentialed requests are allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended outside of local/demo use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global engine instance. Starts as None and is filled in by get_engine() the
# first time it is called; handlers should go through get_engine() rather than
# reading this variable directly.
engine: InferenceEngine = None

def get_engine():
    """Return the shared InferenceEngine, building it on first use.

    The instance is kept in the module-level ``engine`` variable, so every
    later call returns the same object. The engine is constructed with its
    default arguments.
    """
    global engine
    if engine is not None:
        return engine
    # Defaults for now; ideally these would be loaded from config/env.
    engine = InferenceEngine()
    return engine

@app.on_event("startup")
async def startup_event():
    """Startup hook that creates the shared engine before requests arrive.

    Calling get_engine() here means the engine is constructed while the
    application starts instead of inside the first request handler.
    """
    get_engine()

@app.get("/")
async def root():
    """Report that the service is up and point callers at the chat endpoint."""
    payload = {
        "status": "running",
        "message": "Aetheris API is active. Use /v1/chat/completions for inference.",
    }
    return payload

@app.get("/v1/models", response_model=ModelList)
async def list_models():
    """Return the model listing, which holds one fixed model card."""
    available = [ModelCard(id="aetheris-hybrid-mamba-moe")]
    return ModelList(data=available)

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """Handle a chat completion request, either streamed or as one response.

    The messages are flattened into a plain ``"<role>: <content>"`` transcript
    (one line per message) ending with ``"assistant: "``, and that transcript
    is passed to the inference engine as the prompt.

    Args:
        request: Parsed request body. This handler reads ``messages``,
            ``model``, ``stream``, ``max_tokens``, ``temperature``, ``top_p``
            and ``frequency_penalty``.

    Returns:
        When ``request.stream`` is truthy, an ``EventSourceResponse`` that
        emits a role chunk, one JSON chunk per generated token, a final chunk
        with ``finish_reason="stop"`` and then ``[DONE]``. Otherwise a
        ``ChatCompletionResponse`` with the full generated text.

    Notes:
        * If generation raises while streaming, the error is logged with its
          traceback and the stream is still closed with the usual final
          chunk and ``[DONE]``.
        * ``usage`` is an approximation based on ``len()`` of the prompt and
          of the generated text, not on tokenizer counts.
    """
    import logging
    import threading

    engine = get_engine()

    # Simple prompt construction from messages.
    prompt = "".join(f"{msg.role}: {msg.content}\n" for msg in request.messages)
    prompt += "assistant: "

    request_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())

    # Shared by both branches so streaming and non-streaming cannot drift.
    generation_kwargs = dict(
        prompt=prompt,
        max_new_tokens=request.max_tokens or 100,
        temperature=request.temperature,
        top_p=request.top_p,
        # A missing frequency_penalty means "no penalty"; without the guard
        # None + float would raise TypeError.
        repetition_penalty=1.0 + (request.frequency_penalty or 0.0),
    )

    def chunk_json(delta, finish_reason=None):
        """Serialize one streaming chunk carrying *delta* to a JSON string."""
        return json.dumps(ChatCompletionChunk(
            id=request_id,
            created=created_time,
            model=request.model,
            choices=[ChatCompletionChunkChoice(
                index=0,
                delta=delta,
                finish_reason=finish_reason,
            )],
        ).model_dump())

    if request.stream:
        async def event_generator():
            # First chunk only announces the assistant role.
            yield chunk_json(ChatCompletionChunkDelta(role="assistant"))

            # Offload synchronous generation to a thread to avoid blocking
            # the event loop; tokens travel back through this queue.
            queue = asyncio.Queue()
            loop = asyncio.get_running_loop()
            stop_event = threading.Event()

            def producer():
                try:
                    for token in engine.generate(stream=True, **generation_kwargs):
                        if stop_event.is_set():
                            break
                        # The queue is unbounded, so put_nowait cannot fail;
                        # call_soon_threadsafe hands it to the loop thread.
                        loop.call_soon_threadsafe(queue.put_nowait, token)
                except Exception:
                    # Keep the stream best-effort, but record the traceback
                    # instead of only printing the message.
                    logging.getLogger(__name__).exception("Generation error")
                finally:
                    try:
                        # None tells the consumer that generation is over.
                        loop.call_soon_threadsafe(queue.put_nowait, None)
                    except RuntimeError:
                        # The loop is already closed (e.g. shutdown), so no
                        # consumer is left to notify.
                        pass

            thread = threading.Thread(target=producer, daemon=True)
            thread.start()

            try:
                while True:
                    token = await queue.get()
                    if token is None:
                        break
                    yield chunk_json(ChatCompletionChunkDelta(content=token))

                yield chunk_json(ChatCompletionChunkDelta(), finish_reason="stop")
                yield "[DONE]"
            finally:
                # Runs on normal completion and when the generator is closed
                # early; asks the producer to stop at its next token.
                stop_event.set()

        return EventSourceResponse(event_generator())

    # generate_full is a synchronous call; run it in the default executor so
    # a long generation does not stall every other request on the event loop.
    loop = asyncio.get_running_loop()
    generated_text = await loop.run_in_executor(
        None, lambda: engine.generate_full(**generation_kwargs)
    )

    prompt_size = len(prompt)
    completion_size = len(generated_text)
    return ChatCompletionResponse(
        id=request_id,
        created=created_time,
        model=request.model,
        choices=[ChatCompletionChoice(
            index=0,
            message=ChatMessage(role="assistant", content=generated_text),
            finish_reason="stop",
        )],
        # Approximated: lengths of the prompt and output, not token counts.
        usage={
            "prompt_tokens": prompt_size,
            "completion_tokens": completion_size,
            "total_tokens": prompt_size + completion_size,
        },
    )

@app.post("/v1/completions")
async def completions(request: CompletionRequest):
    """Handle a plain text completion request.

    Args:
        request: Parsed request body. This handler reads ``prompt``,
            ``model``, ``max_tokens``, ``temperature``, ``top_p`` and
            ``frequency_penalty``. When ``prompt`` is a list only its first
            element is used.

    Returns:
        A ``CompletionResponse`` with a single choice. ``finish_reason`` is
        always reported as ``"length"`` and ``usage`` is an approximation
        based on ``len()`` of the prompt and of the generated text.

    Raises:
        HTTPException: 400 when ``prompt`` is an empty list.

    Notes:
        Streaming is not implemented for this endpoint; ``request.stream`` is
        currently ignored and a regular response is returned.
    """
    engine = get_engine()

    prompt = request.prompt
    if isinstance(prompt, list):
        if not prompt:
            # Reject explicitly; indexing an empty list would otherwise
            # surface as an unhandled IndexError (HTTP 500).
            raise HTTPException(status_code=400, detail="prompt must not be an empty list")
        prompt = prompt[0]  # Handle single prompt for now

    request_id = f"cmpl-{uuid.uuid4()}"
    created_time = int(time.time())

    # TODO: Implement streaming for completions. Until then a request with
    # stream=True falls through to the non-streaming path below.

    # generate_full is a synchronous call; run it in the default executor so
    # a long generation does not stall every other request on the event loop.
    loop = asyncio.get_running_loop()
    generated_text = await loop.run_in_executor(
        None,
        lambda: engine.generate_full(
            prompt=prompt,
            max_new_tokens=request.max_tokens or 16,
            temperature=request.temperature,
            top_p=request.top_p,
            # A missing frequency_penalty means "no penalty"; without the
            # guard None + float would raise TypeError.
            repetition_penalty=1.0 + (request.frequency_penalty or 0.0),
        ),
    )

    prompt_size = len(prompt)
    completion_size = len(generated_text)
    return CompletionResponse(
        id=request_id,
        created=created_time,
        model=request.model,
        choices=[CompletionChoice(
            text=generated_text,
            index=0,
            logprobs=None,
            finish_reason="length",  # or stop
        )],
        # Approximated: lengths of the prompt and output, not token counts.
        usage={
            "prompt_tokens": prompt_size,
            "completion_tokens": completion_size,
            "total_tokens": prompt_size + completion_size,
        },
    )