redhairedshanks1 commited on
Commit
e2c0c6a
·
1 Parent(s): 4a61db5

Update api_routes.py

Browse files
Files changed (1) hide show
  1. api_routes.py +352 -352
api_routes.py CHANGED
@@ -1,352 +1,352 @@
1
- # API Routes - Complete REST API for MasterLLM
2
- # File: api_routes.py
3
-
4
- from fastapi import APIRouter, HTTPException, UploadFile, File, Form
5
- from fastapi.responses import StreamingResponse
6
- from pydantic import BaseModel
7
- from typing import Optional, List, Dict, Any
8
- import json
9
- import os
10
- import uuid
11
- from datetime import datetime
12
-
13
- # Import our services
14
- from services.pipeline_generator import generate_pipeline, format_pipeline_for_display
15
- from services.pipeline_executor import execute_pipeline_streaming, execute_pipeline
16
- from services.session_manager import session_manager
17
-
18
- router = APIRouter(prefix="/api/v1", tags=["MasterLLM API"])
19
-
20
-
21
- # ========================
22
- # REQUEST/RESPONSE MODELS
23
- # ========================
24
-
25
- class PipelineRequest(BaseModel):
26
- user_input: str
27
- file_path: Optional[str] = None
28
- session_id: Optional[str] = None
29
- prefer_bedrock: bool = True
30
-
31
- class ExecuteRequest(BaseModel):
32
- pipeline: Dict[str, Any]
33
- file_path: str
34
- session_id: Optional[str] = None
35
- prefer_bedrock: bool = True
36
-
37
- class SessionCreate(BaseModel):
38
- user_id: Optional[str] = None
39
- metadata: Optional[Dict[str, Any]] = None
40
-
41
- class MessageAdd(BaseModel):
42
- role: str
43
- content: str
44
- metadata: Optional[Dict[str, Any]] = None
45
-
46
-
47
- # ========================
48
- # SESSION ENDPOINTS
49
- # ========================
50
-
51
- @router.post("/sessions")
52
- async def create_session(request: SessionCreate):
53
- """Create a new user session"""
54
- try:
55
- session_id = session_manager.create_session(
56
- user_id=request.user_id,
57
- metadata=request.metadata
58
- )
59
-
60
- return {
61
- "success": True,
62
- "session_id": session_id,
63
- "message": "Session created successfully"
64
- }
65
- except Exception as e:
66
- raise HTTPException(status_code=500, detail=str(e))
67
-
68
-
69
- @router.get("/sessions/{session_id}")
70
- async def get_session(session_id: str):
71
- """Get session data"""
72
- session = session_manager.get_session(session_id)
73
-
74
- if not session:
75
- raise HTTPException(status_code=404, detail="Session not found")
76
-
77
- return {
78
- "success": True,
79
- "session": session
80
- }
81
-
82
-
83
- @router.get("/sessions/{session_id}/stats")
84
- async def get_session_stats(session_id: str):
85
- """Get session statistics"""
86
- stats = session_manager.get_session_stats(session_id)
87
-
88
- if not stats:
89
- raise HTTPException(status_code=404, detail="Session not found")
90
-
91
- return {
92
- "success": True,
93
- "stats": stats
94
- }
95
-
96
-
97
- @router.get("/sessions/{session_id}/history")
98
- async def get_session_history(session_id: str, limit: int = 50):
99
- """Get conversation history for a session"""
100
- history = session_manager.get_session_history(session_id, limit)
101
-
102
- return {
103
- "success": True,
104
- "history": history,
105
- "count": len(history)
106
- }
107
-
108
-
109
- @router.post("/sessions/{session_id}/messages")
110
- async def add_message(session_id: str, message: MessageAdd):
111
- """Add a message to session history"""
112
- success = session_manager.add_message(
113
- session_id=session_id,
114
- role=message.role,
115
- content=message.content,
116
- metadata=message.metadata
117
- )
118
-
119
- if not success:
120
- raise HTTPException(status_code=500, detail="Failed to add message")
121
-
122
- return {
123
- "success": True,
124
- "message": "Message added successfully"
125
- }
126
-
127
-
128
- # ========================
129
- # PIPELINE GENERATION ENDPOINTS
130
- # ========================
131
-
132
- @router.post("/pipeline/generate")
133
- async def generate_pipeline_api(request: PipelineRequest):
134
- """
135
- Generate a pipeline from user input using Bedrock (priority) or Gemini (fallback)
136
- """
137
- try:
138
- pipeline = generate_pipeline(
139
- user_input=request.user_input,
140
- file_path=request.file_path,
141
- prefer_bedrock=request.prefer_bedrock
142
- )
143
-
144
- # Add to session if provided
145
- if request.session_id:
146
- session_manager.update_session(
147
- request.session_id,
148
- {
149
- "proposed_pipeline": pipeline,
150
- "state": "pipeline_proposed"
151
- }
152
- )
153
-
154
- # Format for display
155
- formatted = format_pipeline_for_display(pipeline)
156
-
157
- return {
158
- "success": True,
159
- "pipeline": pipeline,
160
- "formatted_display": formatted,
161
- "generator": pipeline.get("_generator"),
162
- "model": pipeline.get("_model")
163
- }
164
-
165
- except Exception as e:
166
- raise HTTPException(status_code=500, detail=str(e))
167
-
168
-
169
- # ========================
170
- # PIPELINE EXECUTION ENDPOINTS
171
- # ========================
172
-
173
- @router.post("/pipeline/execute")
174
- async def execute_pipeline_api(request: ExecuteRequest):
175
- """
176
- Execute a pipeline (non-streaming) using Bedrock (priority) or CrewAI (fallback)
177
- """
178
- try:
179
- result = execute_pipeline(
180
- pipeline=request.pipeline,
181
- file_path=request.file_path,
182
- session_id=request.session_id,
183
- prefer_bedrock=request.prefer_bedrock
184
- )
185
-
186
- # Save execution to session
187
- if request.session_id:
188
- session_manager.save_pipeline_execution(
189
- session_id=request.session_id,
190
- pipeline=request.pipeline,
191
- result=result,
192
- file_path=request.file_path,
193
- executor=result.get("executor", "unknown")
194
- )
195
-
196
- session_manager.update_session(
197
- request.session_id,
198
- {
199
- "state": "completed",
200
- "last_result": result
201
- }
202
- )
203
-
204
- return {
205
- "success": True,
206
- "result": result,
207
- "executor": result.get("executor")
208
- }
209
-
210
- except Exception as e:
211
- raise HTTPException(status_code=500, detail=str(e))
212
-
213
-
214
- @router.post("/pipeline/execute/stream")
215
- async def execute_pipeline_stream_api(request: ExecuteRequest):
216
- """
217
- Execute a pipeline with streaming updates using Bedrock (priority) or CrewAI (fallback)
218
- """
219
- def event_stream():
220
- try:
221
- for event in execute_pipeline_streaming(
222
- pipeline=request.pipeline,
223
- file_path=request.file_path,
224
- session_id=request.session_id,
225
- prefer_bedrock=request.prefer_bedrock
226
- ):
227
- # Format as Server-Sent Events
228
- yield f"data: {json.dumps(event)}\n\n"
229
-
230
- # Save final result to session
231
- if event.get("type") == "final" and request.session_id:
232
- session_manager.save_pipeline_execution(
233
- session_id=request.session_id,
234
- pipeline=request.pipeline,
235
- result=event.get("data"),
236
- file_path=request.file_path,
237
- executor=event.get("executor", "unknown")
238
- )
239
-
240
- except Exception as e:
241
- error_event = {
242
- "type": "error",
243
- "error": str(e)
244
- }
245
- yield f"data: {json.dumps(error_event)}\n\n"
246
-
247
- return StreamingResponse(
248
- event_stream(),
249
- media_type="text/event-stream"
250
- )
251
-
252
-
253
- # ========================
254
- # FILE UPLOAD ENDPOINT
255
- # ========================
256
-
257
- @router.post("/upload")
258
- async def upload_file(
259
- file: UploadFile = File(...),
260
- session_id: Optional[str] = Form(None)
261
- ):
262
- """
263
- Upload a document for processing
264
- """
265
- try:
266
- # Create uploads directory if it doesn't exist
267
- upload_dir = "uploads"
268
- os.makedirs(upload_dir, exist_ok=True)
269
-
270
- # Generate unique filename
271
- file_ext = os.path.splitext(file.filename)[1]
272
- unique_filename = f"{uuid.uuid4()}{file_ext}"
273
- file_path = os.path.join(upload_dir, unique_filename)
274
-
275
- # Save file
276
- with open(file_path, "wb") as f:
277
- content = await file.read()
278
- f.write(content)
279
-
280
- # Update session if provided
281
- if session_id:
282
- session_manager.update_session(
283
- session_id,
284
- {"current_file": file_path}
285
- )
286
-
287
- return {
288
- "success": True,
289
- "file_path": file_path,
290
- "filename": file.filename,
291
- "size_bytes": len(content)
292
- }
293
-
294
- except Exception as e:
295
- raise HTTPException(status_code=500, detail=str(e))
296
-
297
-
298
- # ========================
299
- # PIPELINE HISTORY ENDPOINTS
300
- # ========================
301
-
302
- @router.get("/pipelines/history")
303
- async def get_pipeline_history(
304
- session_id: Optional[str] = None,
305
- limit: int = 10
306
- ):
307
- """Get pipeline execution history"""
308
- executions = session_manager.get_pipeline_executions(
309
- session_id=session_id,
310
- limit=limit
311
- )
312
-
313
- return {
314
- "success": True,
315
- "executions": executions,
316
- "count": len(executions)
317
- }
318
-
319
-
320
- @router.get("/pipelines/stats")
321
- async def get_pipeline_stats():
322
- """Get overall pipeline execution statistics"""
323
- # This would query the pipeline executions collection
324
- # For now, return basic stats
325
- return {
326
- "success": True,
327
- "stats": {
328
- "total_executions": 0,
329
- "bedrock_executions": 0,
330
- "crewai_executions": 0,
331
- "avg_duration_seconds": 0
332
- }
333
- }
334
-
335
-
336
- # ========================
337
- # HEALTH CHECK
338
- # ========================
339
-
340
- @router.get("/health")
341
- async def health_check():
342
- """API health check"""
343
- return {
344
- "status": "healthy",
345
- "timestamp": datetime.now().isoformat(),
346
- "version": "2.0.0",
347
- "features": {
348
- "bedrock_available": os.getenv("AWS_ACCESS_KEY_ID") is not None,
349
- "gemini_available": os.getenv("GOOGLE_API_KEY") is not None,
350
- "mongodb_connected": session_manager.sessions_col is not None
351
- }
352
- }
 
1
+ # API Routes - Complete REST API for MasterLLM
2
+ # File: api_routes.py
3
+
4
+ from fastapi import APIRouter, HTTPException, UploadFile, File, Form
5
+ from fastapi.responses import StreamingResponse
6
+ from pydantic import BaseModel
7
+ from typing import Optional, List, Dict, Any
8
+ import json
9
+ import os
10
+ import uuid
11
+ from datetime import datetime
12
+
13
+ # Import our services
14
+ from services.pipeline_generator import generate_pipeline, format_pipeline_for_display
15
+ from services.pipeline_executor import execute_pipeline_streaming, execute_pipeline
16
+ from services.session_manager import session_manager
17
+
18
+ router = APIRouter(prefix="/api/v1", tags=["MasterLLM API"])
19
+
20
+
21
+ # ========================
22
+ # REQUEST/RESPONSE MODELS
23
+ # ========================
24
+
25
+ class PipelineRequest(BaseModel):
26
+ user_input: str
27
+ file_path: Optional[str] = None
28
+ session_id: Optional[str] = None
29
+ prefer_bedrock: bool = True
30
+
31
+ class ExecuteRequest(BaseModel):
32
+ pipeline: Dict[str, Any]
33
+ file_path: str
34
+ session_id: Optional[str] = None
35
+ prefer_bedrock: bool = True
36
+
37
+ class SessionCreate(BaseModel):
38
+ user_id: Optional[str] = None
39
+ metadata: Optional[Dict[str, Any]] = None
40
+
41
+ class MessageAdd(BaseModel):
42
+ role: str
43
+ content: str
44
+ metadata: Optional[Dict[str, Any]] = None
45
+
46
+
47
+ # ========================
48
+ # SESSION ENDPOINTS
49
+ # ========================
50
+
51
+ @router.post("/sessions")
52
+ async def create_session(request: SessionCreate):
53
+ """Create a new user session"""
54
+ try:
55
+ session_id = session_manager.create_session(
56
+ user_id=request.user_id,
57
+ metadata=request.metadata
58
+ )
59
+
60
+ return {
61
+ "success": True,
62
+ "session_id": session_id,
63
+ "message": "Session created successfully"
64
+ }
65
+ except Exception as e:
66
+ raise HTTPException(status_code=500, detail=str(e))
67
+
68
+
69
+ @router.get("/sessions/{session_id}")
70
+ async def get_session(session_id: str):
71
+ """Get session data"""
72
+ session = session_manager.get_session(session_id)
73
+
74
+ if not session:
75
+ raise HTTPException(status_code=404, detail="Session not found")
76
+
77
+ return {
78
+ "success": True,
79
+ "session": session
80
+ }
81
+
82
+
83
+ @router.get("/sessions/{session_id}/stats")
84
+ async def get_session_stats(session_id: str):
85
+ """Get session statistics"""
86
+ stats = session_manager.get_session_stats(session_id)
87
+
88
+ if not stats:
89
+ raise HTTPException(status_code=404, detail="Session not found")
90
+
91
+ return {
92
+ "success": True,
93
+ "stats": stats
94
+ }
95
+
96
+
97
+ @router.get("/sessions/{session_id}/history")
98
+ async def get_session_history(session_id: str, limit: int = 50):
99
+ """Get conversation history for a session"""
100
+ history = session_manager.get_session_history(session_id, limit)
101
+
102
+ return {
103
+ "success": True,
104
+ "history": history,
105
+ "count": len(history)
106
+ }
107
+
108
+
109
+ @router.post("/sessions/{session_id}/messages")
110
+ async def add_message(session_id: str, message: MessageAdd):
111
+ """Add a message to session history"""
112
+ success = session_manager.add_message(
113
+ session_id=session_id,
114
+ role=message.role,
115
+ content=message.content,
116
+ metadata=message.metadata
117
+ )
118
+
119
+ if not success:
120
+ raise HTTPException(status_code=500, detail="Failed to add message")
121
+
122
+ return {
123
+ "success": True,
124
+ "message": "Message added successfully"
125
+ }
126
+
127
+
128
+ # ========================
129
+ # PIPELINE GENERATION ENDPOINTS
130
+ # ========================
131
+
132
+ @router.post("/pipeline/generate")
133
+ async def generate_pipeline_api(request: PipelineRequest):
134
+ """
135
+ Generate a pipeline from user input using Bedrock (priority) or Gemini (fallback)
136
+ """
137
+ try:
138
+ pipeline = generate_pipeline(
139
+ user_input=request.user_input,
140
+ file_path=request.file_path,
141
+ prefer_bedrock=request.prefer_bedrock
142
+ )
143
+
144
+ # Add to session if provided
145
+ if request.session_id:
146
+ session_manager.update_session(
147
+ request.session_id,
148
+ {
149
+ "proposed_pipeline": pipeline,
150
+ "state": "pipeline_proposed"
151
+ }
152
+ )
153
+
154
+ # Format for display
155
+ formatted = format_pipeline_for_display(pipeline)
156
+
157
+ return {
158
+ "success": True,
159
+ "pipeline": pipeline,
160
+ "formatted_display": formatted,
161
+ "generator": pipeline.get("_generator"),
162
+ "model": pipeline.get("_model")
163
+ }
164
+
165
+ except Exception as e:
166
+ raise HTTPException(status_code=500, detail=str(e))
167
+
168
+
169
+ # ========================
170
+ # PIPELINE EXECUTION ENDPOINTS
171
+ # ========================
172
+
173
+ @router.post("/pipeline/execute")
174
+ async def execute_pipeline_api(request: ExecuteRequest):
175
+ """
176
+ Execute a pipeline (non-streaming) using Bedrock (priority) or CrewAI (fallback)
177
+ """
178
+ try:
179
+ result = execute_pipeline(
180
+ pipeline=request.pipeline,
181
+ file_path=request.file_path,
182
+ session_id=request.session_id,
183
+ prefer_bedrock=request.prefer_bedrock
184
+ )
185
+
186
+ # Save execution to session
187
+ if request.session_id:
188
+ session_manager.save_pipeline_execution(
189
+ session_id=request.session_id,
190
+ pipeline=request.pipeline,
191
+ result=result,
192
+ file_path=request.file_path,
193
+ executor=result.get("executor", "unknown")
194
+ )
195
+
196
+ session_manager.update_session(
197
+ request.session_id,
198
+ {
199
+ "state": "completed",
200
+ "last_result": result
201
+ }
202
+ )
203
+
204
+ return {
205
+ "success": True,
206
+ "result": result,
207
+ "executor": result.get("executor")
208
+ }
209
+
210
+ except Exception as e:
211
+ raise HTTPException(status_code=500, detail=str(e))
212
+
213
+
214
+ @router.post("/pipeline/execute/stream")
215
+ async def execute_pipeline_stream_api(request: ExecuteRequest):
216
+ """
217
+ Execute a pipeline with streaming updates using Bedrock (priority) or CrewAI (fallback)
218
+ """
219
+ def event_stream():
220
+ try:
221
+ for event in execute_pipeline_streaming(
222
+ pipeline=request.pipeline,
223
+ file_path=request.file_path,
224
+ session_id=request.session_id,
225
+ prefer_bedrock=request.prefer_bedrock
226
+ ):
227
+ # Format as Server-Sent Events
228
+ yield f"data: {json.dumps(event)}\n\n"
229
+
230
+ # Save final result to session
231
+ if event.get("type") == "final" and request.session_id:
232
+ session_manager.save_pipeline_execution(
233
+ session_id=request.session_id,
234
+ pipeline=request.pipeline,
235
+ result=event.get("data"),
236
+ file_path=request.file_path,
237
+ executor=event.get("executor", "unknown")
238
+ )
239
+
240
+ except Exception as e:
241
+ error_event = {
242
+ "type": "error",
243
+ "error": str(e)
244
+ }
245
+ yield f"data: {json.dumps(error_event)}\n\n"
246
+
247
+ return StreamingResponse(
248
+ event_stream(),
249
+ media_type="text/event-stream"
250
+ )
251
+
252
+
253
+ # ========================
254
+ # FILE UPLOAD ENDPOINT
255
+ # ========================
256
+
257
+ @router.post("/upload")
258
+ async def upload_file(
259
+ file: UploadFile = File(...),
260
+ session_id: Optional[str] = Form(None)
261
+ ):
262
+ """
263
+ Upload a document for processing
264
+ """
265
+ try:
266
+ # Create uploads directory if it doesn't exist
267
+ upload_dir = "uploads"
268
+ os.makedirs(upload_dir, exist_ok=True)
269
+
270
+ # Generate unique filename
271
+ file_ext = os.path.splitext(file.filename)[1]
272
+ unique_filename = f"{uuid.uuid4()}{file_ext}"
273
+ file_path = os.path.join(upload_dir, unique_filename)
274
+
275
+ # Save file
276
+ with open(file_path, "wb") as f:
277
+ content = await file.read()
278
+ f.write(content)
279
+
280
+ # Update session if provided
281
+ if session_id:
282
+ session_manager.update_session(
283
+ session_id,
284
+ {"current_file": file_path}
285
+ )
286
+
287
+ return {
288
+ "success": True,
289
+ "file_path": file_path,
290
+ "filename": file.filename,
291
+ "size_bytes": len(content)
292
+ }
293
+
294
+ except Exception as e:
295
+ raise HTTPException(status_code=500, detail=str(e))
296
+
297
+
298
+ # ========================
299
+ # PIPELINE HISTORY ENDPOINTS
300
+ # ========================
301
+
302
+ @router.get("/pipelines/history")
303
+ async def get_pipeline_history(
304
+ session_id: Optional[str] = None,
305
+ limit: int = 10
306
+ ):
307
+ """Get pipeline execution history"""
308
+ executions = session_manager.get_pipeline_executions(
309
+ session_id=session_id,
310
+ limit=limit
311
+ )
312
+
313
+ return {
314
+ "success": True,
315
+ "executions": executions,
316
+ "count": len(executions)
317
+ }
318
+
319
+
320
+ @router.get("/pipelines/stats")
321
+ async def get_pipeline_stats():
322
+ """Get overall pipeline execution statistics"""
323
+ # This would query the pipeline executions collection
324
+ # For now, return basic stats
325
+ return {
326
+ "success": True,
327
+ "stats": {
328
+ "total_executions": 0,
329
+ "bedrock_executions": 0,
330
+ "crewai_executions": 0,
331
+ "avg_duration_seconds": 0
332
+ }
333
+ }
334
+
335
+
336
+ # ========================
337
+ # HEALTH CHECK
338
+ # ========================
339
+
340
+ @router.get("/health")
341
+ async def health_check():
342
+ """API health check"""
343
+ return {
344
+ "status": "healthy",
345
+ "timestamp": datetime.now().isoformat(),
346
+ "version": "2.0.0",
347
+ "features": {
348
+ "bedrock_available": os.getenv("AWS_ACCESS_KEY_ID") is not None,
349
+ "gemini_available": os.getenv("GOOGLE_API_KEY") is not None,
350
+ "mongodb_connected": session_manager.sessions_col is not None
351
+ }
352
+ }