ChiragPatankar committed on
Commit
0e6edd5
·
verified ·
1 Parent(s): e2ed9d3

Create main.py

Browse files
Files changed (1) hide show
  1. main.py +416 -0
main.py ADDED
@@ -0,0 +1,416 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, HTTPException, Depends, Request, Header, BackgroundTasks
2
+ from fastapi.middleware.cors import CORSMiddleware
3
+ from fastapi.responses import JSONResponse
4
+ from pydantic import BaseModel
5
+ from typing import Optional, List, Dict, Any
6
+ import os
7
+ from dotenv import load_dotenv
8
+ import google.generativeai as genai
9
+ from datetime import datetime
10
+ import json
11
+ import asyncio
12
+ from database import get_db
13
+ from sqlalchemy.orm import Session
14
+ import models
15
+ from mcp_config import mcp_settings
16
+ from middleware import rate_limit_middleware, validate_mcp_request
17
+ import time
18
+
19
# Load environment variables (must run before any os.getenv lookup below)
load_dotenv()

app = FastAPI(
    title="Gemini MCP Server",
    description="AI Customer Support Bot using Google Gemini",
    version="2.0.0"
)

# Add middleware
app.middleware("http")(rate_limit_middleware)
app.middleware("http")(validate_mcp_request)

# Configure CORS
# Allowed origins are read from the comma-separated CORS_ALLOW_ORIGINS
# variable; when it is unset or empty every origin ("*") is allowed.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # A wildcard origin must not be combined with credentialed requests, so
    # credentials are only enabled once explicit origins are configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
40
+
41
+ # MCP Models
42
class MCPRequest(BaseModel):
    # Request body of the single-query endpoint (POST /mcp/process).

    # Text of the customer query; the only required field.
    query: str
    # Caller-supplied context that is merged over the server-built context.
    context: Optional[Dict[str, Any]] = None
    # When set, the interaction is queued for storage in the database.
    user_id: Optional[str] = None
    # Free-form metadata; not read by the handlers visible in this file.
    metadata: Optional[Dict[str, Any]] = None
    # Protocol version; the single-query handler only accepts "1.0".
    mcp_version: Optional[str] = "1.0"
    # One of: high, normal, low (not validated by the model itself).
    priority: Optional[str] = "normal"
49
+
50
class MCPResponse(BaseModel):
    # Successful answer for one query.

    # Generated answer text.
    response: str
    # Context that was used to produce the answer.
    context: Optional[Dict[str, Any]] = None
    # Processing details filled in by the handlers (model, timestamps, ...).
    metadata: Optional[Dict[str, Any]] = None
    # Protocol version of this response.
    mcp_version: str = "1.0"
    # Elapsed handling time in seconds; left unset by the batch handler.
    processing_time: Optional[float] = None
56
+
57
class MCPError(BaseModel):
    # Error envelope serialised into the JSON body of failed requests.

    # Machine-readable error code, e.g. "PROCESSING_ERROR".
    code: str
    # Human-readable description of the failure.
    message: str
    # Optional extra information; the handlers put a timestamp here.
    details: Optional[Dict[str, Any]] = None
61
+
62
class MCPBatchRequest(BaseModel):
    # Request body of the batch endpoint (POST /mcp/batch).

    # Queries to answer; each one is processed independently.
    queries: List[str]
    # Caller-supplied context shared by every query of the batch.
    context: Optional[Dict[str, Any]] = None
    # When set, each successful interaction is queued for storage.
    user_id: Optional[str] = None
    # Free-form metadata; not read by the handlers visible in this file.
    metadata: Optional[Dict[str, Any]] = None
    # Protocol version declared by the caller.
    mcp_version: Optional[str] = "1.0"
68
+
69
class MCPBatchResponse(BaseModel):
    # Result of a batch request.

    # One entry per submitted query, in submission order.
    responses: List[MCPResponse]
    # Batch-level summary (query count, timestamp, success rate).
    batch_metadata: Optional[Dict[str, Any]] = None
    # Protocol version of this response.
    mcp_version: str = "1.0"
73
+
74
# Environment variables
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Initialize Gemini: the model handle stays None unless an API key is set,
# which the request handlers treat as "service not available".
gemini_model = None
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel('gemini-1.5-flash')  # Free tier
83
+
84
+ # MCP Authentication
85
async def verify_mcp_auth(x_mcp_auth: str = Header(...)):
    """Dependency that requires a non-empty MCP auth header.

    The header value itself is not checked against anything yet: any
    non-empty value is accepted.

    Raises:
        HTTPException: 401 when the header value is empty.
    """
    # TODO: Implement proper MCP authentication
    if x_mcp_auth:
        return True
    raise HTTPException(status_code=401, detail="MCP authentication required")
90
+
91
@app.get("/")
async def root():
    """Return a static description of this server."""
    server_info = dict(
        message="Gemini MCP Server",
        version="2.0.0",
        status="active",
        ai_provider="Google Gemini",
    )
    return server_info
99
+
100
@app.get("/mcp/version")
async def mcp_version():
    """Report the MCP protocol versions and the server version (static values)."""
    supported = ["1.0"]
    version_info = {
        "version": "1.0",
        "supported_versions": supported,
        "server_version": "2.0.0",
        "deprecation_notice": None,
    }
    return version_info
108
+
109
@app.get("/mcp/capabilities")
async def mcp_capabilities():
    """Describe the models, context providers, features and rate limits.

    Everything is static except the rate limits, which are read from
    ``mcp_settings``.
    """
    gemini_flash = {
        "version": "1.5",
        "capabilities": ["text-generation", "context-aware", "multi-language"],
        "max_tokens": 8192,
        "supported_languages": ["en", "es", "fr", "de", "it", "pt", "ja", "ko", "zh"],
    }
    internal_provider = {
        "version": "1.0",
        "capabilities": ["basic-context", "conversation-history"],
        "max_context_size": 1000000,  # Gemini's large context window
    }
    feature_names = [
        "context-aware-responses",
        "user-tracking",
        "response-storage",
        "batch-processing",
        "priority-queuing",
        "multi-language-support",
    ]
    limits = {
        "requests_per_period": mcp_settings.RATE_LIMIT_REQUESTS,
        "period_seconds": mcp_settings.RATE_LIMIT_PERIOD,
    }
    return {
        "models": {"gemini-1.5-flash": gemini_flash},
        "context_providers": {"internal": internal_provider},
        "features": feature_names,
        "rate_limits": limits,
    }
140
+
141
@app.post("/mcp/process", response_model=MCPResponse)
async def process_mcp_request(
    request: MCPRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    auth: bool = Depends(verify_mcp_auth)
):
    """Answer a single MCP query with Gemini.

    Args:
        request: Parsed request body.
        background_tasks: Queue used to store the interaction after the
            response has been produced.
        db: Database session handed to the storage task.
        auth: Result of the authentication dependency (unused here).

    Returns:
        An ``MCPResponse`` on success. On failure a ``JSONResponse`` carrying
        an ``MCPError`` body: with the status code of the ``HTTPException``
        that was raised (e.g. 400 for an unsupported version), or 500 for any
        other error.
    """
    # Monotonic clock: wall-clock adjustments must not distort the duration.
    start_time = time.perf_counter()
    try:
        # Validate MCP version
        if request.mcp_version not in ["1.0"]:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported MCP version: {request.mcp_version}"
            )

        # Fetch additional context
        context = await fetch_context(request.query, request.context)

        # Process with Gemini AI
        response = await process_with_gemini(request.query, context, request.priority)

        # Store the interaction in the database if user_id is provided
        # NOTE(review): `db` is request-scoped; confirm that get_db keeps the
        # session usable until background tasks have run.
        if request.user_id:
            background_tasks.add_task(
                store_interaction,
                db,
                request.user_id,
                request.query,
                response,
                context
            )

        processing_time = time.perf_counter() - start_time
        return MCPResponse(
            response=response,
            context=context,
            metadata={
                "processed_at": datetime.utcnow().isoformat(),
                "model": "gemini-1.5-flash",
                "context_provider": "internal",
                "priority": request.priority,
                "ai_provider": "Google Gemini"
            },
            mcp_version="1.0",
            processing_time=processing_time
        )
    except HTTPException as http_error:
        # Keep the status code chosen where the error was raised instead of
        # flattening every failure (bad version, service unavailable) to 500.
        error = MCPError(
            code="PROCESSING_ERROR",
            message=str(http_error.detail),
            details={"timestamp": datetime.utcnow().isoformat()}
        )
        return JSONResponse(
            status_code=http_error.status_code,
            content=error.dict()
        )
    except Exception as e:
        error = MCPError(
            code="PROCESSING_ERROR",
            message=str(e),
            details={"timestamp": datetime.utcnow().isoformat()}
        )
        return JSONResponse(
            status_code=500,
            content=error.dict()
        )
198
+
199
@app.post("/mcp/batch", response_model=MCPBatchResponse)
async def process_batch_request(
    request: MCPBatchRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    auth: bool = Depends(verify_mcp_auth)
):
    """Answer several MCP queries concurrently.

    A failing query does not fail the batch: its slot in ``responses`` holds
    an error message and ``metadata["error"]`` is set. Only successful
    queries are queued for storage.

    Args:
        request: Parsed batch request body.
        background_tasks: Queue used to store interactions afterwards.
        db: Database session handed to the storage tasks.
        auth: Result of the authentication dependency (unused here).

    Returns:
        An ``MCPBatchResponse`` with one entry per query in submission order,
        or a 500 ``JSONResponse`` carrying an ``MCPError`` body when the batch
        as a whole cannot be processed.
    """
    try:
        # Process queries concurrently for better performance
        tasks = [
            process_single_query_async(query, request.context)
            for query in request.queries
        ]

        # Wait for all tasks to complete; failures come back as values
        query_results = await asyncio.gather(*tasks, return_exceptions=True)

        responses = []
        succeeded = 0
        for query, result in zip(request.queries, query_results):
            # BaseException (not just Exception): a cancelled task is reported
            # as CancelledError, which must not be unpacked as a result tuple.
            if isinstance(result, BaseException):
                # Handle individual query errors
                mcp_response = MCPResponse(
                    response=f"Error processing query: {str(result)}",
                    context={},
                    metadata={
                        "processed_at": datetime.utcnow().isoformat(),
                        "model": "gemini-1.5-flash",
                        "error": True
                    },
                    mcp_version="1.0"
                )
            else:
                succeeded += 1
                context, response = result
                mcp_response = MCPResponse(
                    response=response,
                    context=context,
                    metadata={
                        "processed_at": datetime.utcnow().isoformat(),
                        "model": "gemini-1.5-flash",
                        "context_provider": "internal"
                    },
                    mcp_version="1.0"
                )

                # Store interaction if user_id is provided. This stays inside
                # the success branch because `context` and `response` only
                # exist for queries that produced a result.
                if request.user_id:
                    background_tasks.add_task(
                        store_interaction,
                        db,
                        request.user_id,
                        query,
                        response,
                        context
                    )

            responses.append(mcp_response)

        return MCPBatchResponse(
            responses=responses,
            batch_metadata={
                "total_queries": len(request.queries),
                "processed_at": datetime.utcnow().isoformat(),
                "success_rate": f"{succeeded}/{len(request.queries)}"
            },
            mcp_version="1.0"
        )
    except Exception as e:
        error = MCPError(
            code="BATCH_PROCESSING_ERROR",
            message=str(e),
            details={"timestamp": datetime.utcnow().isoformat()}
        )
        return JSONResponse(
            status_code=500,
            content=error.dict()
        )
275
+
276
@app.get("/mcp/health")
async def health_check():
    """Report service health based on a live test call to Gemini.

    The Gemini status is "disconnected" when no model/key is configured,
    "connected" when the test call returns text, and "error" otherwise.
    The database status and the current rate-limit usage are fixed values,
    not measurements.
    """
    gemini_status = "disconnected"
    if gemini_model and GEMINI_API_KEY:
        try:
            # Quick test call, run in a worker thread
            probe = await asyncio.to_thread(
                gemini_model.generate_content,
                "Test",
                generation_config=genai.types.GenerationConfig(max_output_tokens=10)
            )
        except Exception:
            gemini_status = "error"
        else:
            try:
                gemini_status = "connected" if probe.text else "error"
            except Exception:
                # Reading the text of the probe response may itself fail
                gemini_status = "error"

    overall = "healthy" if gemini_status == "connected" else "degraded"
    services = {
        "gemini_ai": gemini_status,
        "database": "connected"  # Assume connected, add actual check if needed
    }
    limits = {
        "current_usage": "0%",
        "requests_per_period": mcp_settings.RATE_LIMIT_REQUESTS,
        "period_seconds": mcp_settings.RATE_LIMIT_PERIOD
    }
    return {
        "status": overall,
        "timestamp": datetime.utcnow().isoformat(),
        "services": services,
        "mcp_version": "1.0",
        "ai_provider": "Google Gemini",
        "model": "gemini-1.5-flash",
        "rate_limits": limits
    }
308
+
309
async def fetch_context(message: str, existing_context: Optional[Dict] = None) -> dict:
    """Build the context dictionary for a query.

    Args:
        message: The query text; only its length is recorded.
        existing_context: Optional caller context. Its entries are applied
            last, so they override the generated ones on key collisions.

    Returns:
        A new dict with ``timestamp``, ``query_length`` and
        ``language_detected`` plus any caller-supplied entries.
    """
    generated = {
        "timestamp": datetime.utcnow().isoformat(),
        "query_length": len(message),
        "language_detected": "en",  # Add language detection if needed
    }
    # Caller context goes last so that it wins over the generated values
    return {**generated, **(existing_context or {})}
322
+
323
async def process_with_gemini(message: str, context: dict, priority: str = "normal") -> str:
    """Process message with Google Gemini

    Args:
        message: The customer query to answer.
        context: Context dictionary; embedded into the prompt as JSON.
        priority: "high" selects a lower temperature and a larger output
            budget; every other value uses the defaults.

    Returns:
        The generated answer with surrounding whitespace stripped.

    Raises:
        HTTPException: 503 when no Gemini model/API key is configured, 500
            when prompt building or generation fails (the original error is
            attached as the cause).
    """
    if not gemini_model or not GEMINI_API_KEY:
        raise HTTPException(
            status_code=503,
            detail="Gemini AI service not available. Please set GEMINI_API_KEY."
        )

    try:
        # Build enhanced prompt for customer support
        enhanced_prompt = f"""
        You are an AI customer support assistant. Provide helpful, accurate, and professional responses.

        Customer Query: {message}

        Context Information:
        - Timestamp: {context.get('timestamp', 'N/A')}
        - Priority: {priority}
        - Previous context: {json.dumps(context, indent=2)}

        Instructions:
        1. Provide a clear, helpful response to the customer's question
        2. Be professional and empathetic
        3. If you don't know something, say so honestly
        4. Offer to escalate to human support if needed
        5. Keep responses concise but complete

        Response:
        """

        # Configure generation parameters based on priority
        temperature = 0.7 if priority == "high" else 0.8
        max_tokens = 1000 if priority == "high" else 500

        # Generate response with Gemini; the client call is blocking, so it
        # runs in a worker thread to keep the event loop free.
        response = await asyncio.to_thread(
            gemini_model.generate_content,
            enhanced_prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=temperature,
                max_output_tokens=max_tokens,
                top_p=0.8,
            )
        )

        return response.text.strip()

    except Exception as e:
        # Chain the cause so the original traceback stays available in logs
        raise HTTPException(
            status_code=500,
            detail=f"Gemini AI processing error: {str(e)}"
        ) from e
375
+
376
async def process_single_query_async(query: str, context: Optional[Dict] = None):
    """Answer one query of a batch.

    Builds the context for ``query`` and asks Gemini with the default
    priority.

    Returns:
        A ``(context, answer)`` tuple.
    """
    query_context = await fetch_context(query, context)
    answer = await process_with_gemini(query, query_context)
    return (query_context, answer)
381
+
382
async def store_interaction(
    db: Session,
    user_id: str,
    message: str,
    response: str,
    context: dict
):
    """Store interaction in database

    Best-effort: failures are reported on stdout and never raised, because
    this runs as a background task after the response was produced.

    Args:
        db: Session used to add and commit the record.
        user_id: User identifier; converted with ``int()`` before storing.
        message: The query text.
        response: The generated answer.
        context: Context dictionary; stored as a JSON string.
    """
    try:
        chat_message = models.ChatMessage(
            user_id=int(user_id),
            message=message,
            response=response,
            context=json.dumps(context)
        )
        db.add(chat_message)
        db.commit()
    except Exception as e:
        # Log error but don't raise it since this is a background task
        print(f"Error storing interaction: {str(e)}")
        # Roll back so a failed add/commit does not leave the session in a
        # broken transaction state for whoever uses it next.
        try:
            db.rollback()
        except Exception as rollback_error:
            print(f"Error rolling back interaction: {str(rollback_error)}")
402
+
403
if __name__ == "__main__":
    import uvicorn

    # Check for required environment variables
    if not GEMINI_API_KEY:
        print("❌ Error: GEMINI_API_KEY environment variable is required")
        print("🔑 Get your FREE Gemini API key at: https://aistudio.google.com/app/apikey")
        # SystemExit instead of exit(): the exit() helper is injected by the
        # site module and is not guaranteed to exist in every interpreter.
        raise SystemExit(1)

    print("🚀 Starting Gemini-Powered MCP Server...")
    print("🤖 Using Google Gemini AI (gemini-1.5-flash)")
    print("🔧 Server: Gemini MCP Server")

    # Listen on all interfaces, port 8000
    uvicorn.run(app, host="0.0.0.0", port=8000)