Soumik555 committed on
Commit
e3e5ff8
·
1 Parent(s): b641792

Database chat: external user's API

Browse files
Files changed (1) hide show
  1. controller.py +325 -589
controller.py CHANGED
@@ -5,16 +5,16 @@ import os
5
  import asyncio
6
  import threading
7
  import uuid
8
- import time
9
- from fastapi import FastAPI, HTTPException, Header, Request
10
  from fastapi.encoders import jsonable_encoder
11
- from typing import Dict, List, Optional
 
12
  import numpy as np
13
  import pandas as pd
14
  from pandasai import SmartDataframe
15
  from langchain_groq.chat_models import ChatGroq
16
  from dotenv import load_dotenv
17
- from pydantic import BaseModel, Field
18
  from csv_service import clean_data, extract_chart_filenames, generate_csv_data, get_csv_basic_info
19
  from urllib.parse import unquote
20
  from langchain_groq import ChatGroq
@@ -36,109 +36,43 @@ from fastapi.middleware.cors import CORSMiddleware
36
  import matplotlib
37
  matplotlib.use('Agg')
38
 
39
- # Load environment variables first
40
- load_dotenv()
41
-
42
- # Configure professional logging
43
- logging.basicConfig(
44
- level=logging.INFO,
45
- format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
46
- datefmt='%Y-%m-%d %H:%M:%S'
47
- )
48
- logger = logging.getLogger(__name__)
49
-
50
- # Performance logger for timing critical operations
51
- perf_logger = logging.getLogger("performance")
52
- perf_logger.setLevel(logging.INFO)
53
-
54
- def log_performance(operation_name: str):
55
- """Decorator to log operation performance"""
56
- def decorator(func):
57
- async def async_wrapper(*args, **kwargs):
58
- start_time = time.time()
59
- try:
60
- result = await func(*args, **kwargs)
61
- duration = time.time() - start_time
62
- perf_logger.info(f"{operation_name} completed in {duration:.2f}s")
63
- return result
64
- except Exception as e:
65
- duration = time.time() - start_time
66
- perf_logger.error(f"{operation_name} failed after {duration:.2f}s: {str(e)}")
67
- raise
68
-
69
- def sync_wrapper(*args, **kwargs):
70
- start_time = time.time()
71
- try:
72
- result = func(*args, **kwargs)
73
- duration = time.time() - start_time
74
- perf_logger.info(f"{operation_name} completed in {duration:.2f}s")
75
- return result
76
- except Exception as e:
77
- duration = time.time() - start_time
78
- perf_logger.error(f"{operation_name} failed after {duration:.2f}s: {str(e)}")
79
- raise
80
-
81
- return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
82
- return decorator
83
-
84
  # Initialize FastAPI app
85
- app = FastAPI(title="CSV Chat Service", version="1.0.0")
86
 
87
- logger.info("Initializing CSV Chat Service...")
 
 
88
 
89
  # Initialize the ProcessPoolExecutor
90
  max_cpus = os.cpu_count()
91
- logger.info(f"System CPUs detected: {max_cpus}, using {max_cpus-2} workers")
92
 
93
- # Ensure directories exist
94
- directories = ["/app/cache", "/app", "/app/generated_charts"]
95
- for directory in directories:
96
- os.makedirs(directory, exist_ok=True)
97
- logger.debug(f"Directory ensured: {directory}")
98
 
99
- open("/app/pandasai.log", "a").close()
100
- logger.debug("PandasAI log file initialized")
101
 
102
- # CORS Configuration - Optimized and Fixed
103
- image_file_path = os.getenv("IMAGE_FILE_PATH")
104
- image_not_found = os.getenv("IMAGE_NOT_FOUND")
105
-
106
- # Get allowed hosts from environment variable with fallback
107
- allowed_hosts_env = os.getenv("ALLOWED_HOSTS", "")
108
- if allowed_hosts_env:
109
- allowed_hosts = [host.strip() for host in allowed_hosts_env.split(",") if host.strip()]
110
- else:
111
- # Secure fallback: only allow your specific domain
112
- allowed_hosts = ["https://chatcsvandpdf.vercel.app"]
113
 
114
- logger.info(f"CORS configured for origins: {allowed_hosts}")
115
 
116
- # Add CORS middleware with optimized configuration
 
 
117
  app.add_middleware(
118
  CORSMiddleware,
119
- allow_origins=allowed_hosts,
120
  allow_credentials=True,
121
- allow_methods=["GET", "POST", "OPTIONS"], # Only methods you actually use
122
- allow_headers=["Content-Type", "Authorization", "Accept"], # Specific headers only
123
- expose_headers=["Content-Length"],
124
- max_age=3600, # Cache preflight requests for 1 hour
125
  )
126
 
127
- # Load environment variables with validation
128
- groq_api_keys = os.getenv("GROQ_API_KEYS", "").split(",")
129
  model_name = os.getenv("GROQ_LLM_MODEL")
130
 
131
- if not groq_api_keys or not groq_api_keys[0]:
132
- logger.critical("GROQ_API_KEYS not configured properly")
133
- raise ValueError("GROQ API keys are required")
134
-
135
- if not model_name:
136
- logger.critical("GROQ_LLM_MODEL not configured")
137
- raise ValueError("Model name is required")
138
-
139
- logger.info(f"Service initialized with {len(groq_api_keys)} API keys and model: {model_name}")
140
-
141
- # Pydantic models remain the same
142
# Request body schema used by the CSV endpoints.
class CsvUrlRequest(BaseModel):
    # URL of the CSV file to process; may arrive percent-encoded
    # (the endpoints run it through urllib.parse.unquote before use).
    csv_url: str
144
 
@@ -149,7 +83,7 @@ class ImageRequest(BaseModel):
149
  class FileProps(BaseModel):
150
  fileName: str
151
  filePath: str
152
- fileType: str
153
 
154
  class Files(BaseModel):
155
  csv_files: List[FileProps]
@@ -157,156 +91,118 @@ class Files(BaseModel):
157
 
158
# Wrapper request schema: a single container of uploaded file groups.
class FileBoxProps(BaseModel):
    # `Files` groups the per-type file lists (e.g. csv_files) — see the
    # sibling `Files` model defined just above in the original file.
    files: Files
160
-
161
- # Optimized thread-safe key management
162
- class ApiKeyManager:
163
- def __init__(self, keys: List[str], name: str):
164
- self.keys = keys
165
- self.name = name
166
- self.current_index = 0
167
- self.lock = threading.Lock()
168
- self.exhausted = False
169
 
170
- def get_next_key(self):
171
- with self.lock:
172
- if self.current_index >= len(self.keys):
173
- self.exhausted = True
174
- logger.warning(f"All {self.name} API keys exhausted")
175
- return None
176
- key = self.keys[self.current_index]
177
- self.current_index += 1
178
- logger.debug(f"Using {self.name} API key index: {self.current_index-1}")
179
- return key
180
-
181
- def reset(self):
182
- with self.lock:
183
- self.current_index = 0
184
- self.exhausted = False
185
- logger.info(f"Reset {self.name} API key rotation")
186
-
187
- # Initialize key managers
188
- groq_key_manager = ApiKeyManager(groq_api_keys, "Groq")
189
- langchain_key_manager = ApiKeyManager(groq_api_keys, "Langchain")
190
- chart_key_manager = ApiKeyManager(groq_api_keys, "Chart")
191
-
192
- # Use a process pool for CPU-bound operations
193
- process_executor = ProcessPoolExecutor(max_workers=max_cpus-2)
194
- logger.info(f"ProcessPoolExecutor initialized with {max_cpus-2} workers")
195
-
196
- # Middleware to log requests
197
- @app.middleware("http")
198
- async def log_requests(request: Request, call_next):
199
- start_time = time.time()
200
- client_ip = request.client.host
201
- method = request.method
202
- url = str(request.url)
203
-
204
- logger.info(f"Request started: {method} {url} from {client_ip}")
205
-
206
- response = await call_next(request)
207
-
208
- process_time = time.time() - start_time
209
- logger.info(f"Request completed: {method} {url} - Status: {response.status_code} - Duration: {process_time:.2f}s")
210
-
211
- return response
212
 
213
- # ROOT CHECK
214
- @app.get("/")
215
- async def root():
216
- logger.debug("Health check endpoint accessed")
217
- return {"message": "CSV Chat-Service-1 is running", "status": "healthy"}
218
 
219
  # PING CHECK
220
  @app.get("/ping")
221
- async def ping():
222
- logger.debug("Ping endpoint accessed")
223
- return {"message": "Pong !!", "timestamp": time.time()}
224
-
225
- # CORS test endpoint
226
- @app.get("/api/cors-test")
227
- async def cors_test():
228
- logger.info("CORS test endpoint accessed")
229
- return {
230
- "message": "CORS is working",
231
- "allowed_origins": allowed_hosts,
232
- "timestamp": time.time()
233
- }
234
 
235
  # BASIC KNOWLEDGE BASED ON CSV
 
 
236
  @app.post("/api/basic_csv_data")
237
- @log_performance("basic_csv_data")
238
  async def basic_csv_data(request: CsvUrlRequest):
239
  try:
240
  decoded_url = unquote(request.csv_url)
241
- logger.info(f"Processing basic CSV data request for URL: {decoded_url[:100]}...")
242
-
 
243
  loop = asyncio.get_running_loop()
244
  csv_data = await loop.run_in_executor(
245
  process_executor, get_csv_basic_info, decoded_url
246
  )
247
-
248
- logger.info("CSV basic data retrieved successfully")
249
  return {"data": csv_data}
250
-
251
  except Exception as e:
252
- logger.error(f"Error in basic_csv_data: {str(e)}")
253
  raise HTTPException(status_code=400, detail=f"Failed to retrieve CSV data: {str(e)}")
254
 
 
255
  # GET THE CHART FROM A SPECIFIC FILE PATH
256
  @app.post("/api/get-chart")
257
- @log_performance("get_chart")
258
  async def get_image(request: ImageRequest, authorization: str = Header(None)):
259
- logger.info(f"Chart retrieval request for chat_id: {request.chat_id}")
260
-
261
- # Optimized auth check
262
- if not authorization or not authorization.startswith("Bearer "):
263
- logger.warning("Invalid authorization header format")
264
- raise HTTPException(status_code=401, detail="Authorization header missing or invalid")
265
 
266
- token = authorization[7:] # Remove "Bearer " prefix
267
- if not token or token != os.getenv("AUTH_TOKEN"):
268
- logger.warning("Authentication failed")
 
 
 
 
269
  raise HTTPException(status_code=403, detail="Invalid token")
270
 
271
  try:
 
272
  image_file_path = request.image_path
273
- unique_file_name = f'{str(uuid.uuid4())}.png'
274
-
275
- logger.info(f"Uploading chart to Supabase: {unique_file_name}")
276
- image_public_url = await upload_file_to_supabase(
277
- image_file_path, unique_file_name, chat_id=request.chat_id
278
- )
279
-
280
- # Clean up local file
281
- if os.path.exists(image_file_path):
282
- os.remove(image_file_path)
283
- logger.debug(f"Local file cleaned up: {image_file_path}")
284
-
285
- logger.info("Chart uploaded successfully")
286
  return {"image_url": image_public_url}
287
-
288
  except Exception as e:
289
- logger.error(f"Error in get_image: {str(e)}")
290
  return {"answer": "error"}
 
291
 
292
  # GET CSV DATA FOR GENERATING THE TABLE
293
  @app.post("/api/csv_data")
294
- @log_performance("csv_data_generation")
295
  async def get_csv_data(request: CsvUrlRequest):
296
  try:
297
  decoded_url = unquote(request.csv_url)
298
- logger.info(f"Generating CSV table data for URL: {decoded_url[:100]}...")
299
-
300
  loop = asyncio.get_running_loop()
301
  csv_data = await loop.run_in_executor(
302
  process_executor, generate_csv_data, decoded_url
303
- )
304
-
305
- logger.info("CSV table data generated successfully")
306
  return csv_data
307
-
308
  except Exception as e:
309
- logger.error(f"Error in get_csv_data: {str(e)}")
310
  raise HTTPException(status_code=400, detail=f"Failed to retrieve CSV data: {str(e)}")
311
 
312
  # EXECUTE THE PYTHON CODE
@@ -315,112 +211,109 @@ class ExecutionRequest(BaseModel):
315
  csv_url: str = Field(..., alias="csv_url")
316
  codeExecutionPayload: CsvChatResult
317
 
 
318
  @app.post("/api/code_execution_csv")
319
- @log_performance("code_execution")
320
  async def code_execution_csv(
321
- request_data: ExecutionRequest,
322
  authorization: Optional[str] = Header(None)
323
  ):
324
- logger.info(f"Code execution request for chat_id: {request_data.chat_id}")
325
-
326
  expected_token = os.environ.get("AUTH_TOKEN")
327
  if not authorization or not expected_token or authorization.replace("Bearer ", "") != expected_token:
328
- logger.warning("Unauthorized code execution attempt")
329
  raise HTTPException(status_code=401, detail="Unauthorized")
330
 
331
  try:
 
 
 
 
332
  decoded_url = unquote(request_data.csv_url)
333
- logger.info("Loading and cleaning CSV data for execution")
334
-
335
  df = clean_data(decoded_url)
336
  executor = PythonExecutor(df)
337
-
338
- logger.info("Executing Python code payload")
339
- formatted_output = await executor.process_response(
340
- request_data.codeExecutionPayload, request_data.chat_id
341
- )
342
-
343
- logger.info("Code execution completed successfully")
344
  return {"answer": formatted_output}
345
 
346
  except Exception as e:
347
- logger.error(f"Code execution error: {str(e)}")
348
  return {"error": "Failed to execute request", "message": str(e)}
349
 
 
350
  # CHAT CODING STARTS FROM HERE
351
 
 
352
  def groq_chat(csv_url: str, question: str):
353
- """Optimized groq chat with better error handling"""
354
- logger.info("Starting Groq chat processing")
355
-
356
- while not groq_key_manager.exhausted:
357
- current_api_key = groq_key_manager.get_next_key()
358
- if not current_api_key:
359
- break
360
 
361
  try:
362
- logger.debug("Loading and cleaning CSV data")
363
  data = clean_data(csv_url)
364
-
365
  llm = ChatGroq(model=model_name, api_key=current_api_key)
 
366
  chart_filename = f"chart_{uuid.uuid4()}.png"
367
  chart_path = os.path.join("generated_charts", chart_filename)
368
 
369
- logger.debug(f"Creating SmartDataframe with chart path: {chart_path}")
370
  df = SmartDataframe(
371
  data,
372
  config={
373
  'llm': llm,
374
- 'save_charts': True,
375
  'open_charts': False,
376
- 'save_charts_path': os.path.dirname(chart_path),
377
- 'custom_chart_filename': chart_filename,
378
  'enable_cache': False
379
  }
380
  )
381
 
382
- logger.debug("Processing chat query with SmartDataframe")
383
  answer = df.chat(question)
384
 
385
- # Optimized response processing
386
  if isinstance(answer, pd.DataFrame):
387
  processed = answer.apply(handle_out_of_range_float).to_dict(orient="records")
388
  elif isinstance(answer, pd.Series):
389
  processed = answer.apply(handle_out_of_range_float).to_dict()
390
- elif isinstance(answer, (list, dict)):
391
- processed = jsonable_encoder(answer)
 
 
392
  else:
393
  processed = {"answer": str(handle_out_of_range_float(answer))}
394
 
395
- logger.info("Groq chat processing completed successfully")
396
  return processed
397
 
398
  except Exception as e:
399
  error_message = str(e)
400
- if "429" in error_message or "rate limit" in error_message.lower():
401
- logger.warning(f"Rate limit hit, trying next API key")
402
- continue
 
 
 
403
  else:
404
- logger.error(f"Groq chat error: {error_message}")
405
  return {"error": error_message}
406
 
407
- logger.error("All Groq API keys exhausted")
408
- return {"error": "All API keys exhausted."}
409
-
410
  def langchain_csv_chat(csv_url: str, question: str, chart_required: bool):
411
- """Optimized langchain chat with better key management"""
412
- logger.info(f"Starting Langchain CSV chat (chart_required: {chart_required})")
413
-
414
  data = clean_data(csv_url)
415
  attempts = 0
416
 
417
  while attempts < len(groq_api_keys):
418
- api_key = langchain_key_manager.get_next_key()
419
- if not api_key:
420
- break
421
-
422
- attempts += 1
423
- logger.debug(f"Langchain attempt {attempts} with API key")
 
424
 
425
  try:
426
  llm = ChatGroq(model=model_name, api_key=api_key)
@@ -437,114 +330,95 @@ def langchain_csv_chat(csv_url: str, question: str, chart_required: bool):
437
  llm,
438
  data,
439
  agent_type="tool-calling",
440
- verbose=False, # Reduce verbosity for performance
441
  allow_dangerous_code=True,
442
  extra_tools=[tool],
443
  return_intermediate_steps=True
444
  )
445
 
446
  prompt = _prompt_generator(question, chart_required, csv_url)
447
- logger.debug("Invoking Langchain agent")
448
  result = agent.invoke({"input": prompt})
449
-
450
- logger.info("Langchain processing completed successfully")
451
  return result.get("output")
452
 
453
  except Exception as e:
454
  error_message = str(e)
455
- if "429" in error_message or "rate limit" in error_message.lower():
456
- logger.warning(f"Langchain rate limit hit, attempt {attempts}")
457
- continue
 
458
  else:
459
- logger.error(f"Langchain error on attempt {attempts}: {error_message}")
460
  return {"error": error_message}
461
 
462
- logger.error("All Langchain API keys exhausted")
463
  return {"error": "All API keys exhausted"}
464
 
 
465
  @app.post("/api/csv-chat")
466
- @log_performance("csv_chat")
467
  async def csv_chat(request: Dict, authorization: str = Header(None)):
468
- """Optimized CSV chat endpoint"""
469
- logger.info("Request Body: %s", request)
470
- chat_id = request.get("chat_id", "unknown")
471
- logger.info(f"CSV chat request started for chat_id: {chat_id}")
472
-
473
- # Optimized authorization check
474
  if not authorization or not authorization.startswith("Bearer "):
475
- logger.warning("Invalid authorization format")
476
  raise HTTPException(status_code=401, detail="Invalid authorization")
477
 
478
- token = authorization[7:] # Remove "Bearer " prefix
479
  if token != os.getenv("AUTH_TOKEN"):
480
- logger.warning("Authentication failed")
481
  raise HTTPException(status_code=403, detail="Invalid token")
482
 
483
  try:
484
- # Extract request parameters
485
  query = request.get("query")
486
  csv_url = request.get("csv_url")
487
  decoded_url = unquote(csv_url)
488
- detailed_answer = request.get("detailed_answer", False)
489
  conversation_history = request.get("conversation_history", [])
490
- generate_report = request.get("generate_report", False)
 
491
 
492
- logger.info(f"Processing query: '{query[:50]}...' (detailed: {detailed_answer}, report: {generate_report})")
493
-
494
- # Report generation path
495
- if generate_report:
496
- logger.info("Generating CSV report")
497
  report_files = await generate_csv_report(csv_url, query, chat_id, conversation_history)
498
  if report_files is not None:
499
- logger.info("Report generated successfully")
500
  return {"answer": jsonable_encoder(report_files)}
501
 
502
- # Initial chat question path
503
  if if_initial_chat_question(query):
504
- logger.info("Processing as initial chat question")
505
  answer = await asyncio.to_thread(
506
  langchain_csv_chat, decoded_url, query, False
507
  )
508
- logger.info("Initial chat question processed")
509
  return {"answer": jsonable_encoder(answer)}
510
 
511
- # Detailed answer orchestration
512
- if detailed_answer:
513
- logger.info("Processing with orchestrator for detailed answer")
514
- orchestrator_answer = await asyncio.to_thread(
515
- csv_orchestrator_chat, decoded_url, query, conversation_history, chat_id
516
- )
517
- if orchestrator_answer is not None:
518
- logger.info("Orchestrator processing completed")
519
- return {"answer": jsonable_encoder(orchestrator_answer)}
520
 
521
- # Cerebras CSV agent processing
522
- logger.info("Processing with Cerebras CSV agent")
523
- result = await query_csv_agent(decoded_url, query, chat_id)
524
 
525
- if result is not None and result != "":
526
- logger.info("Cerebras agent processing successful")
 
527
  return {"answer": result}
528
 
529
- # Fallback to langchain
530
- logger.info("Falling back to Langchain processing")
 
 
531
  lang_answer = await asyncio.to_thread(
532
  langchain_csv_chat, decoded_url, query, False
533
  )
534
-
535
- if not process_answer(lang_answer):
536
- logger.info("Langchain fallback successful")
537
- return {"answer": jsonable_encoder(lang_answer)}
538
-
539
- logger.warning("All processing methods failed")
540
- return {"answer": "error"}
541
 
542
  except Exception as e:
543
- logger.error(f"Critical error in csv_chat: {str(e)}")
544
  return {"answer": "error"}
545
 
546
  def handle_out_of_range_float(value):
547
- """Optimized float handling"""
548
  if isinstance(value, float):
549
  if np.isnan(value):
550
  return None
@@ -552,82 +426,125 @@ def handle_out_of_range_float(value):
552
  return "Infinity"
553
  return value
554
 
555
- # CHART GENERATION SECTION
 
 
 
 
 
 
556
 
557
  instructions = """
 
558
  - Please ensure that each value is clearly visible, You may need to adjust the font size, rotate the labels, or use truncation to improve readability (if needed).
559
  - For multiple charts, put all of them in a single file.
560
  - Use colorblind-friendly palette
561
  - Read above instructions and follow them.
 
562
  """
563
 
 
 
 
 
 
 
 
564
  def model():
565
- """Get model instance with current API key"""
566
- api_key = chart_key_manager.get_next_key()
567
- if not api_key:
568
- raise Exception("All API keys exhausted for chart generation")
 
569
  return ChatGroq(model=model_name, api_key=api_key)
570
 
571
  def groq_chart(csv_url: str, question: str):
572
- """Optimized Groq chart generation"""
573
- logger.info("Starting Groq chart generation")
574
 
575
  for attempt in range(len(groq_api_keys)):
576
- api_key = chart_key_manager.get_next_key()
577
- if not api_key:
578
- break
579
-
580
  try:
581
- logger.debug(f"Chart generation attempt {attempt + 1}")
 
 
 
 
 
 
 
582
  data = clean_data(csv_url)
583
- llm = ChatGroq(model=model_name, api_key=api_key)
 
 
 
584
 
 
585
  chart_filename = f"chart_{uuid.uuid4()}.png"
586
  chart_path = os.path.join("generated_charts", chart_filename)
587
 
 
588
  df = SmartDataframe(
589
  data,
590
  config={
591
  'llm': llm,
592
- 'save_charts': True,
593
  'open_charts': False,
594
- 'save_charts_path': os.path.dirname(chart_path),
595
- 'custom_chart_filename': chart_filename,
596
  'enable_cache': False
597
  }
598
  )
599
 
600
  answer = df.chat(question + instructions)
601
 
602
- if not process_answer(answer):
603
- logger.info("Groq chart generated successfully")
604
- return answer
605
-
606
  except Exception as e:
607
  error = str(e)
608
- if "429" in error or "rate limit" in error.lower():
609
- logger.warning(f"Rate limit hit on attempt {attempt + 1}")
610
- continue
 
611
  else:
612
  logger.error(f"Chart generation error: {error}")
613
  return {"error": error}
614
 
615
- logger.error("All API keys exhausted for Groq chart generation")
616
  return {"error": "All API keys exhausted for chart generation"}
617
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
618
  def langchain_csv_chart(csv_url: str, question: str, chart_required: bool):
619
- """Optimized langchain chart generation"""
620
- logger.info("Starting Langchain chart generation")
 
 
 
 
 
 
621
 
622
  data = clean_data(csv_url)
623
 
624
  for attempt in range(len(groq_api_keys)):
625
- api_key = chart_key_manager.get_next_key()
626
- if not api_key:
627
- break
628
-
629
  try:
630
- logger.debug(f"Langchain chart attempt {attempt + 1}")
 
 
 
 
631
  llm = ChatGroq(model=model_name, api_key=api_key)
632
  tool = PythonAstREPLTool(locals={
633
  "df": data,
@@ -643,7 +560,7 @@ def langchain_csv_chart(csv_url: str, question: str, chart_required: bool):
643
  llm,
644
  data,
645
  agent_type="tool-calling",
646
- verbose=False, # Reduced verbosity
647
  allow_dangerous_code=True,
648
  extra_tools=[tool],
649
  return_intermediate_steps=True
@@ -652,43 +569,46 @@ def langchain_csv_chart(csv_url: str, question: str, chart_required: bool):
652
  result = agent.invoke({"input": _prompt_generator(question, True, csv_url)})
653
  output = result.get("output", "")
654
 
655
- # Clean up matplotlib figures
656
  plt.close('all')
657
 
658
- # Extract chart filenames
659
  chart_files = extract_chart_filenames(output)
660
  if len(chart_files) > 0:
661
- full_paths = [os.path.join(image_file_path, f) for f in chart_files]
662
- logger.info(f"Langchain chart generated: {len(chart_files)} files")
663
- return full_paths
 
 
664
 
665
  except Exception as e:
666
  error_message = str(e)
667
- if "429" in error_message or "rate limit" in error_message.lower():
668
- logger.warning(f"Langchain chart rate limit hit on attempt {attempt + 1}")
669
- continue
 
670
  else:
671
- logger.error(f"Langchain chart error: {error_message}")
672
  return {"error": error_message}
673
 
674
- logger.error("All API keys exhausted for Langchain chart generation")
675
  return "Chart generation failed after all retries"
676
 
 
 
677
  @app.post("/api/csv-chart")
678
- @log_performance("csv_chart_generation")
679
  async def csv_chart(request: dict, authorization: str = Header(None)):
680
- """Optimized CSV chart generation endpoint"""
681
- chat_id = request.get("chat_id", "unknown")
682
- logger.info(f"Chart generation request started for chat_id: {chat_id}")
683
-
684
- # Optimized authorization check
 
685
  if not authorization or not authorization.startswith("Bearer "):
686
- logger.warning("Invalid chart authorization")
687
  raise HTTPException(status_code=401, detail="Authorization required")
688
 
689
- token = authorization[7:] # Remove "Bearer " prefix
690
  if token != os.getenv("AUTH_TOKEN"):
691
- logger.warning("Chart authentication failed")
692
  raise HTTPException(status_code=403, detail="Invalid credentials")
693
 
694
  try:
@@ -697,261 +617,77 @@ async def csv_chart(request: dict, authorization: str = Header(None)):
697
  detailed_answer = request.get("detailed_answer", False)
698
  conversation_history = request.get("conversation_history", [])
699
  generate_report = request.get("generate_report", False)
 
700
 
701
- logger.info(f"Chart request: '{query[:50]}...' (detailed: {detailed_answer})")
702
-
703
- # Report generation
704
- if generate_report:
705
- logger.info("Generating chart report")
706
  report_files = await generate_csv_report(csv_url, query, chat_id, conversation_history)
707
  if report_files is not None:
708
- logger.info("Chart report generated successfully")
709
  return {"orchestrator_response": jsonable_encoder(report_files)}
710
 
711
  loop = asyncio.get_running_loop()
712
-
713
- # Initial chart question processing
714
  if if_initial_chart_question(query):
715
- logger.info("Processing as initial chart question")
716
  langchain_result = await loop.run_in_executor(
717
  process_executor, langchain_csv_chart, csv_url, query, True
718
  )
719
-
720
  if isinstance(langchain_result, list) and len(langchain_result) > 0:
721
- unique_file_name = f'{str(uuid.uuid4())}.png'
722
- logger.info("Uploading initial chart to Supabase")
723
-
724
- image_public_url = await upload_file_to_supabase(
725
- langchain_result[0], unique_file_name, chat_id=chat_id
726
- )
727
-
728
- # Clean up local file
729
- if os.path.exists(langchain_result[0]):
730
- os.remove(langchain_result[0])
731
- logger.debug("Local chart file cleaned up")
732
-
733
- logger.info("Initial chart uploaded successfully")
734
  return {"image_url": image_public_url}
 
735
 
736
- # Detailed answer orchestration
737
- if detailed_answer:
738
- logger.info("Processing with orchestrator for detailed chart answer")
739
- orchestrator_answer = await asyncio.to_thread(
740
- csv_orchestrator_chat, csv_url, query, conversation_history, chat_id
741
- )
742
-
743
- if orchestrator_answer is not None:
744
- logger.info("Orchestrator chart processing completed")
745
- return {"orchestrator_response": jsonable_encoder(orchestrator_answer)}
746
-
747
- # Cerebras processing
748
- logger.info("Trying Cerebras AI for chart generation")
 
 
 
 
 
 
 
 
 
 
 
749
  result = await query_csv_agent(csv_url, query, chat_id)
750
-
751
  if result is not None and result != "":
752
- logger.info("Cerebras chart processing successful")
753
- return {"orchestrator_response": jsonable_encoder(result)}
754
 
755
- # Fallback to langchain
756
- logger.info("Falling back to Langchain for chart generation")
757
  langchain_paths = await loop.run_in_executor(
758
  process_executor, langchain_csv_chart, csv_url, query, True
759
  )
760
-
761
  if isinstance(langchain_paths, list) and len(langchain_paths) > 0:
762
- unique_file_name = f'{str(uuid.uuid4())}.png'
763
- logger.info("Uploading fallback chart to Supabase")
764
-
765
- image_public_url = await upload_file_to_supabase(
766
- langchain_paths[0], unique_file_name, chat_id=chat_id
767
- )
768
-
769
- # Clean up local file
770
- if os.path.exists(langchain_paths[0]):
771
- os.remove(langchain_paths[0])
772
- logger.debug("Fallback chart file cleaned up")
773
-
774
- logger.info("Fallback chart uploaded successfully")
775
- return {"image_url": image_public_url}
776
  else:
777
- logger.error("All chart generation methods failed")
778
- return {"answer": "error"}
779
 
780
  except Exception as e:
781
- logger.error(f"Critical chart generation error: {str(e)}")
782
  return {"answer": "error"}
783
 
784
- # Additional optimizations and middleware
785
-
786
- @app.on_event("startup")
787
- async def startup_event():
788
- """Application startup tasks"""
789
- logger.info("=== CSV Chat Service Starting Up ===")
790
- logger.info(f"Environment: {os.getenv('ENVIRONMENT', 'development')}")
791
- logger.info(f"Available API keys: {len(groq_api_keys)}")
792
- logger.info(f"Model: {model_name}")
793
- logger.info(f"Max workers: {max_cpus-2}")
794
- logger.info("=== Startup Complete ===")
795
-
796
- @app.on_event("shutdown")
797
- async def shutdown_event():
798
- """Application shutdown cleanup"""
799
- logger.info("=== CSV Chat Service Shutting Down ===")
800
- try:
801
- # Clean up process executor
802
- process_executor.shutdown(wait=True)
803
- logger.info("ProcessPoolExecutor shutdown complete")
804
-
805
- # Clean up matplotlib
806
- plt.close('all')
807
- logger.info("Matplotlib figures cleaned up")
808
-
809
- except Exception as e:
810
- logger.error(f"Error during shutdown: {str(e)}")
811
-
812
- logger.info("=== Shutdown Complete ===")
813
-
814
- # Health check with detailed status
815
- @app.get("/api/health")
816
- async def health_check():
817
- """Detailed health check endpoint"""
818
- try:
819
- health_status = {
820
- "status": "healthy",
821
- "timestamp": time.time(),
822
- "service": "CSV Chat Service",
823
- "version": "1.0.0",
824
- "system": {
825
- "cpu_count": max_cpus,
826
- "workers": max_cpus-2,
827
- "api_keys_available": len(groq_api_keys),
828
- "model": model_name
829
- },
830
- "directories": {
831
- "cache": os.path.exists("/app/cache"),
832
- "charts": os.path.exists("/app/generated_charts"),
833
- "app": os.path.exists("/app")
834
- }
835
- }
836
-
837
- logger.debug("Health check completed successfully")
838
- return health_status
839
-
840
- except Exception as e:
841
- logger.error(f"Health check failed: {str(e)}")
842
- return {
843
- "status": "unhealthy",
844
- "error": str(e),
845
- "timestamp": time.time()
846
- }
847
-
848
- # API key status endpoint (for monitoring)
849
- @app.get("/api/key-status")
850
- async def api_key_status(authorization: str = Header(None)):
851
- """Monitor API key usage status"""
852
- # Quick auth check
853
- if not authorization or authorization.replace("Bearer ", "") != os.getenv("AUTH_TOKEN"):
854
- raise HTTPException(status_code=401, detail="Unauthorized")
855
-
856
- try:
857
- status = {
858
- "groq_keys": {
859
- "total": len(groq_api_keys),
860
- "current_index": groq_key_manager.current_index,
861
- "exhausted": groq_key_manager.exhausted
862
- },
863
- "langchain_keys": {
864
- "total": len(groq_api_keys),
865
- "current_index": langchain_key_manager.current_index,
866
- "exhausted": langchain_key_manager.exhausted
867
- },
868
- "chart_keys": {
869
- "total": len(groq_api_keys),
870
- "current_index": chart_key_manager.current_index,
871
- "exhausted": chart_key_manager.exhausted
872
- },
873
- "timestamp": time.time()
874
- }
875
-
876
- logger.debug("API key status retrieved")
877
- return status
878
-
879
- except Exception as e:
880
- logger.error(f"Error getting key status: {str(e)}")
881
- return {"error": str(e)}
882
-
883
- # Reset API keys endpoint (for emergency reset)
884
- @app.post("/api/reset-keys")
885
- async def reset_api_keys(authorization: str = Header(None)):
886
- """Reset all API key managers"""
887
- if not authorization or authorization.replace("Bearer ", "") != os.getenv("AUTH_TOKEN"):
888
- raise HTTPException(status_code=401, detail="Unauthorized")
889
-
890
- try:
891
- groq_key_manager.reset()
892
- langchain_key_manager.reset()
893
- chart_key_manager.reset()
894
-
895
- logger.info("All API key managers reset successfully")
896
- return {
897
- "message": "All API key managers reset",
898
- "timestamp": time.time(),
899
- "status": "success"
900
- }
901
-
902
- except Exception as e:
903
- logger.error(f"Error resetting keys: {str(e)}")
904
- return {"error": str(e)}
905
-
906
- # Error handler for better error logging
907
- @app.exception_handler(Exception)
908
- async def global_exception_handler(request: Request, exc: Exception):
909
- """Global exception handler with detailed logging"""
910
- logger.error(f"Unhandled exception on {request.method} {request.url}: {str(exc)}")
911
- return {"error": "Internal server error", "timestamp": time.time()}
912
-
913
- # Optimized utility functions
914
- def safe_file_cleanup(file_path: str):
915
- """Safely remove file with logging"""
916
- try:
917
- if os.path.exists(file_path):
918
- os.remove(file_path)
919
- logger.debug(f"File cleaned up: {file_path}")
920
- return True
921
- except Exception as e:
922
- logger.warning(f"Failed to cleanup file {file_path}: {str(e)}")
923
- return False
924
-
925
- # Memory optimization: Periodic cleanup task
926
- @app.on_event("startup")
927
- async def setup_periodic_cleanup():
928
- """Setup periodic cleanup of temporary files"""
929
- async def cleanup_task():
930
- while True:
931
- try:
932
- await asyncio.sleep(300) # Run every 5 minutes
933
- charts_dir = "/app/generated_charts"
934
- if os.path.exists(charts_dir):
935
- # Clean up files older than 1 hour
936
- current_time = time.time()
937
- cleaned_count = 0
938
-
939
- for filename in os.listdir(charts_dir):
940
- file_path = os.path.join(charts_dir, filename)
941
- try:
942
- if os.path.getmtime(file_path) < current_time - 3600: # 1 hour
943
- os.remove(file_path)
944
- cleaned_count += 1
945
- except Exception:
946
- continue
947
-
948
- if cleaned_count > 0:
949
- logger.info(f"Periodic cleanup: removed {cleaned_count} old chart files")
950
-
951
- except Exception as e:
952
- logger.error(f"Periodic cleanup error: {str(e)}")
953
-
954
- # Start cleanup task in background
955
- asyncio.create_task(cleanup_task())
956
- logger.info("Periodic cleanup task started")
957
-
 
5
  import asyncio
6
  import threading
7
  import uuid
8
+ from fastapi import FastAPI, HTTPException, Header
 
9
  from fastapi.encoders import jsonable_encoder
10
+ from typing import Dict, List, Literal, Optional
11
+ from fastapi.responses import FileResponse
12
  import numpy as np
13
  import pandas as pd
14
  from pandasai import SmartDataframe
15
  from langchain_groq.chat_models import ChatGroq
16
  from dotenv import load_dotenv
17
+ from pydantic import BaseModel, Field, ValidationError
18
  from csv_service import clean_data, extract_chart_filenames, generate_csv_data, get_csv_basic_info
19
  from urllib.parse import unquote
20
  from langchain_groq import ChatGroq
 
36
  import matplotlib
37
  matplotlib.use('Agg')
38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  # Initialize FastAPI app
40
+ app = FastAPI()
41
 
42
+ # Set up logging
43
+ logging.basicConfig(level=logging.INFO)
44
+ logger = logging.getLogger(__name__)
45
 
46
  # Initialize the ProcessPoolExecutor
47
  max_cpus = os.cpu_count()
48
+ logger.info(f"Max CPUs: {max_cpus}")
49
 
50
+ # Ensure the cache directory exists
51
+ os.makedirs("/app/cache", exist_ok=True)
 
 
 
52
 
53
+ os.makedirs("/app", exist_ok=True)
54
+ open("/app/pandasai.log", "a").close() # Create the file if it doesn't exist
55
 
56
+ # Ensure the generated_charts directory exists
57
+ os.makedirs("/app/generated_charts", exist_ok=True)
 
 
 
 
 
 
 
 
 
58
 
59
+ load_dotenv()
60
 
61
+ image_file_path = os.getenv("IMAGE_FILE_PATH")
62
+ image_not_found = os.getenv("IMAGE_NOT_FOUND")
63
+ allowed_hosts = os.getenv("ALLOWED_HOSTS", "").split(",")
64
  app.add_middleware(
65
  CORSMiddleware,
66
+ allow_origins=allowed_hosts,
67
  allow_credentials=True,
68
+ allow_methods=["*"],
69
+ allow_headers=["*"],
 
 
70
  )
71
 
72
+ # Load environment variables
73
+ groq_api_keys = os.getenv("GROQ_API_KEYS").split(",")
74
  model_name = os.getenv("GROQ_LLM_MODEL")
75
 
 
 
 
 
 
 
 
 
 
 
 
76
  class CsvUrlRequest(BaseModel):
77
  csv_url: str
78
 
 
83
  class FileProps(BaseModel):
84
  fileName: str
85
  filePath: str
86
+ fileType: str # 'csv' | 'image'
87
 
88
  class Files(BaseModel):
89
  csv_files: List[FileProps]
 
91
 
92
  class FileBoxProps(BaseModel):
93
  files: Files
 
 
 
 
 
 
 
 
 
94
 
95
+ dummy_response = FileBoxProps(
96
+ files=Files(
97
+ csv_files=[
98
+ FileProps(
99
+ fileName="sales_data.csv",
100
+ filePath="/downloads/sales_data.csv",
101
+ fileType="csv"
102
+ ),
103
+ FileProps(
104
+ fileName="customer_data.csv",
105
+ filePath="/downloads/customer_data.csv",
106
+ fileType="csv"
107
+ )
108
+ ],
109
+ image_files=[
110
+ FileProps(
111
+ fileName="chart.png",
112
+ filePath="/downloads/chart.png",
113
+ fileType="image"
114
+ ),
115
+ FileProps(
116
+ fileName="graph.jpg",
117
+ filePath="/downloads/graph.jpg",
118
+ fileType="image"
119
+ )
120
+ ]
121
+ )
122
+ )
123
+
124
+
125
+ # Thread-safe key management for groq_chat
126
+ current_groq_key_index = 0
127
+ current_groq_key_lock = threading.Lock()
128
+
129
+ # Thread-safe key management for langchain_csv_chat
130
+ current_langchain_key_index = 0
131
+ current_langchain_key_lock = threading.Lock()
 
 
 
 
 
132
 
 
 
 
 
 
133
 
134
  # PING CHECK
135
  @app.get("/ping")
136
+ async def root():
137
+ return {"message": "Pong !!"}
138
+
139
+
 
 
 
 
 
 
 
 
 
140
 
141
  # BASIC KNOWLEDGE BASED ON CSV
142
+
143
+ # Remove trailing slash from the URL otherwise it will redirect to GET method
144
  @app.post("/api/basic_csv_data")
 
145
  async def basic_csv_data(request: CsvUrlRequest):
146
  try:
147
  decoded_url = unquote(request.csv_url)
148
+ logger.info(f"Fetching CSV data from URL: {decoded_url}")
149
+ # csv_data = await get_csv_basic_info(decoded_url)
150
+ # Run the synchronous function in a thread pool executor
151
  loop = asyncio.get_running_loop()
152
  csv_data = await loop.run_in_executor(
153
  process_executor, get_csv_basic_info, decoded_url
154
  )
155
+ logger.info(f"CSV data fetched successfully: {csv_data}")
 
156
  return {"data": csv_data}
 
157
  except Exception as e:
158
+ logger.error(f"Error while fetching CSV data: {e}")
159
  raise HTTPException(status_code=400, detail=f"Failed to retrieve CSV data: {str(e)}")
160
 
161
+
162
  # GET THE CHART FROM A SPECIFIC FILE PATH
163
  @app.post("/api/get-chart")
 
164
  async def get_image(request: ImageRequest, authorization: str = Header(None)):
165
+ if not authorization:
166
+ raise HTTPException(status_code=401, detail="Authorization header missing")
 
 
 
 
167
 
168
+ if not authorization.startswith("Bearer "):
169
+ raise HTTPException(status_code=401, detail="Invalid authorization header format")
170
+
171
+ token = authorization.split(" ")[1]
172
+ if not token:
173
+ raise HTTPException(status_code=401, detail="Token missing")
174
+ if token != os.getenv("AUTH_TOKEN"):
175
  raise HTTPException(status_code=403, detail="Invalid token")
176
 
177
  try:
178
+ logger.info("Groq Chat created a chat for the user query...")
179
  image_file_path = request.image_path
180
+ unique_file_name =f'{str(uuid.uuid4())}.png'
181
+ logger.info("Uploading the chart to supabase...")
182
+ image_public_url = await upload_file_to_supabase(f"{image_file_path}", unique_file_name, chat_id=request.chat_id)
183
+ logger.info("Image uploaded to Supabase and Image URL is... ", {image_public_url})
184
+ os.remove(image_file_path)
 
 
 
 
 
 
 
 
185
  return {"image_url": image_public_url}
186
+ # return FileResponse(image_file_path, media_type="image/png")
187
  except Exception as e:
188
+ logger.error(f"Error: {e}")
189
  return {"answer": "error"}
190
+
191
 
192
  # GET CSV DATA FOR GENERATING THE TABLE
193
  @app.post("/api/csv_data")
 
194
  async def get_csv_data(request: CsvUrlRequest):
195
  try:
196
  decoded_url = unquote(request.csv_url)
197
+ logger.info(f"Fetching CSV data from URL: {decoded_url}")
198
+ # csv_data = await generate_csv_data(decoded_url)
199
  loop = asyncio.get_running_loop()
200
  csv_data = await loop.run_in_executor(
201
  process_executor, generate_csv_data, decoded_url
202
+ )
 
 
203
  return csv_data
 
204
  except Exception as e:
205
+ logger.error(f"Error while fetching CSV data: {e}")
206
  raise HTTPException(status_code=400, detail=f"Failed to retrieve CSV data: {str(e)}")
207
 
208
  # EXECUTE THE PYTHON CODE
 
211
  csv_url: str = Field(..., alias="csv_url")
212
  codeExecutionPayload: CsvChatResult
213
 
214
+
215
  @app.post("/api/code_execution_csv")
 
216
  async def code_execution_csv(
217
+ request_data: ExecutionRequest, # Change from ExecutionRequest to dict to see raw input
218
  authorization: Optional[str] = Header(None)
219
  ):
220
+ # Auth check remains the same
 
221
  expected_token = os.environ.get("AUTH_TOKEN")
222
  if not authorization or not expected_token or authorization.replace("Bearer ", "") != expected_token:
 
223
  raise HTTPException(status_code=401, detail="Unauthorized")
224
 
225
  try:
226
+ # First log the incoming request data
227
+ logger.info("Incoming request data:", request_data)
228
+
229
+ # Rest of your processing logic...
230
  decoded_url = unquote(request_data.csv_url)
 
 
231
  df = clean_data(decoded_url)
232
  executor = PythonExecutor(df)
233
+ formatted_output = await executor.process_response(request_data.codeExecutionPayload, request_data.chat_id)
 
 
 
 
 
 
234
  return {"answer": formatted_output}
235
 
236
  except Exception as e:
237
+ logger.info("Processing error:", str(e))
238
  return {"error": "Failed to execute request", "message": str(e)}
239
 
240
+
241
  # CHAT CODING STARTS FROM HERE
242
 
243
+ # Modified groq_chat function with thread-safe key rotation
244
  def groq_chat(csv_url: str, question: str):
245
+ global current_groq_key_index, current_groq_key_lock
246
+
247
+ while True:
248
+ with current_groq_key_lock:
249
+ if current_groq_key_index >= len(groq_api_keys):
250
+ return {"error": "All API keys exhausted."}
251
+ current_api_key = groq_api_keys[current_groq_key_index]
252
 
253
  try:
254
+
255
  data = clean_data(csv_url)
 
256
  llm = ChatGroq(model=model_name, api_key=current_api_key)
257
+ # Generate unique filename using UUID
258
  chart_filename = f"chart_{uuid.uuid4()}.png"
259
  chart_path = os.path.join("generated_charts", chart_filename)
260
 
261
+ # Configure SmartDataframe with chart settings
262
  df = SmartDataframe(
263
  data,
264
  config={
265
  'llm': llm,
266
+ 'save_charts': True, # Enable chart saving
267
  'open_charts': False,
268
+ 'save_charts_path': os.path.dirname(chart_path), # Directory to save
269
+ 'custom_chart_filename': chart_filename, # Unique filename
270
  'enable_cache': False
271
  }
272
  )
273
 
 
274
  answer = df.chat(question)
275
 
276
+ # Process different response types
277
  if isinstance(answer, pd.DataFrame):
278
  processed = answer.apply(handle_out_of_range_float).to_dict(orient="records")
279
  elif isinstance(answer, pd.Series):
280
  processed = answer.apply(handle_out_of_range_float).to_dict()
281
+ elif isinstance(answer, list):
282
+ processed = [handle_out_of_range_float(item) for item in answer]
283
+ elif isinstance(answer, dict):
284
+ processed = {k: handle_out_of_range_float(v) for k, v in answer.items()}
285
  else:
286
  processed = {"answer": str(handle_out_of_range_float(answer))}
287
 
 
288
  return processed
289
 
290
  except Exception as e:
291
  error_message = str(e)
292
+ if error_message != "":
293
+ logger.warning("Rate limit exceeded. Switching to next API key.")
294
+ with current_groq_key_lock:
295
+ current_groq_key_index += 1
296
+ if current_groq_key_index >= len(groq_api_keys):
297
+ return {"error": "All API keys exhausted."}
298
  else:
299
+ logger.error("Error in groq_chat: %s", e)
300
  return {"error": error_message}
301
 
302
+ # Modified langchain_csv_chat with thread-safe key rotation
 
 
303
  def langchain_csv_chat(csv_url: str, question: str, chart_required: bool):
304
+ global current_langchain_key_index, current_langchain_key_lock, current_langchain_chart_key_index, current_langchain_chart_lock
305
+
 
306
  data = clean_data(csv_url)
307
  attempts = 0
308
 
309
  while attempts < len(groq_api_keys):
310
+ with current_langchain_key_lock:
311
+ if current_langchain_key_index >= len(groq_api_keys):
312
+ current_langchain_key_index = 0
313
+ api_key = groq_api_keys[current_langchain_key_index]
314
+ current_key = current_langchain_key_index
315
+ current_langchain_key_index += 1
316
+ attempts += 1
317
 
318
  try:
319
  llm = ChatGroq(model=model_name, api_key=api_key)
 
330
  llm,
331
  data,
332
  agent_type="tool-calling",
333
+ verbose=True,
334
  allow_dangerous_code=True,
335
  extra_tools=[tool],
336
  return_intermediate_steps=True
337
  )
338
 
339
  prompt = _prompt_generator(question, chart_required, csv_url)
 
340
  result = agent.invoke({"input": prompt})
 
 
341
  return result.get("output")
342
 
343
  except Exception as e:
344
  error_message = str(e)
345
+ if error_message != "":
346
+ with current_langchain_chart_lock:
347
+ current_langchain_chart_key_index = (current_langchain_chart_key_index + 1)
348
+ logger.warning(f"Rate limit exceeded. Switching to next API key: {groq_api_keys[current_langchain_chart_key_index]}")
349
  else:
350
+ logger.error(f"Error with API key {api_key}: {error_message}")
351
  return {"error": error_message}
352
 
 
353
  return {"error": "All API keys exhausted"}
354
 
355
+ # Async endpoint with non-blocking execution
356
  @app.post("/api/csv-chat")
 
357
  async def csv_chat(request: Dict, authorization: str = Header(None)):
358
+ # Authorization checks
 
 
 
 
 
359
  if not authorization or not authorization.startswith("Bearer "):
 
360
  raise HTTPException(status_code=401, detail="Invalid authorization")
361
 
362
+ token = authorization.split(" ")[1]
363
  if token != os.getenv("AUTH_TOKEN"):
 
364
  raise HTTPException(status_code=403, detail="Invalid token")
365
 
366
  try:
 
367
  query = request.get("query")
368
  csv_url = request.get("csv_url")
369
  decoded_url = unquote(csv_url)
370
+ detailed_answer = request.get("detailed_answer")
371
  conversation_history = request.get("conversation_history", [])
372
+ generate_report = request.get("generate_report")
373
+ chat_id = request.get("chat_id")
374
 
375
+ if generate_report is True:
 
 
 
 
376
  report_files = await generate_csv_report(csv_url, query, chat_id, conversation_history)
377
  if report_files is not None:
 
378
  return {"answer": jsonable_encoder(report_files)}
379
 
 
380
  if if_initial_chat_question(query):
 
381
  answer = await asyncio.to_thread(
382
  langchain_csv_chat, decoded_url, query, False
383
  )
384
+ logger.info("langchain_answer:", answer)
385
  return {"answer": jsonable_encoder(answer)}
386
 
387
+ # Orchestrate the execution
388
+ if detailed_answer is True:
389
+ orchestrator_answer = await asyncio.to_thread(
390
+ csv_orchestrator_chat, decoded_url, query, conversation_history, chat_id
391
+ )
392
+ if orchestrator_answer is not None:
393
+ return {"answer": jsonable_encoder(orchestrator_answer)}
 
 
394
 
395
+ # Process with groq_chat first
396
+ # groq_answer = await asyncio.to_thread(groq_chat, decoded_url, query)
397
+ # logger.info("groq_answer:", groq_answer)
398
 
399
+ result = await query_csv_agent(decoded_url, query, chat_id)
400
+ logger.info("cerebras csv answer == >", result)
401
+ if result is not None or result == "":
402
  return {"answer": result}
403
 
404
+ # if process_answer(groq_answer) == "Empty response received.":
405
+ # return {"answer": "Sorry, I couldn't find relevant data..."}
406
+
407
+ # if process_answer(groq_answer):
408
  lang_answer = await asyncio.to_thread(
409
  langchain_csv_chat, decoded_url, query, False
410
  )
411
+ if process_answer(lang_answer):
412
+ return {"answer": "error"}
413
+ return {"answer": jsonable_encoder(lang_answer)}
414
+
415
+ # return {"answer": jsonable_encoder(groq_answer)}
 
 
416
 
417
  except Exception as e:
418
+ logger.error(f"Error processing request: {str(e)}")
419
  return {"answer": "error"}
420
 
421
  def handle_out_of_range_float(value):
 
422
  if isinstance(value, float):
423
  if np.isnan(value):
424
  return None
 
426
  return "Infinity"
427
  return value
428
 
429
+
430
+
431
+
432
+
433
+
434
+
435
+ # CHART CODING STARTS FROM HERE
436
 
437
  instructions = """
438
+
439
  - Please ensure that each value is clearly visible, You may need to adjust the font size, rotate the labels, or use truncation to improve readability (if needed).
440
  - For multiple charts, put all of them in a single file.
441
  - Use colorblind-friendly palette
442
  - Read above instructions and follow them.
443
+
444
  """
445
 
446
+ # Thread-safe configuration for chart endpoints
447
+ current_groq_chart_key_index = 0
448
+ current_groq_chart_lock = threading.Lock()
449
+
450
+ # current_langchain_chart_key_index = 0
451
+ # current_langchain_chart_lock = threading.Lock()
452
+
453
  def model():
454
+ global current_groq_chart_key_index, current_groq_chart_lock
455
+ with current_groq_chart_lock:
456
+ if current_groq_chart_key_index >= len(groq_api_keys):
457
+ raise Exception("All API keys exhausted for chart generation")
458
+ api_key = groq_api_keys[current_groq_chart_key_index]
459
  return ChatGroq(model=model_name, api_key=api_key)
460
 
461
  def groq_chart(csv_url: str, question: str):
462
+ global current_groq_chart_key_index, current_groq_chart_lock
 
463
 
464
  for attempt in range(len(groq_api_keys)):
 
 
 
 
465
  try:
466
+ # Clean cache before processing
467
+ # cache_db_path = "/workspace/cache/cache_db_0.11.db"
468
+ # if os.path.exists(cache_db_path):
469
+ # try:
470
+ # os.remove(cache_db_path)
471
+ # except Exception as e:
472
+ # logger.info(f"Cache cleanup error: {e}")
473
+
474
  data = clean_data(csv_url)
475
+ with current_groq_chart_lock:
476
+ current_api_key = groq_api_keys[current_groq_chart_key_index]
477
+
478
+ llm = ChatGroq(model=model_name, api_key=current_api_key)
479
 
480
+ # Generate unique filename using UUID
481
  chart_filename = f"chart_{uuid.uuid4()}.png"
482
  chart_path = os.path.join("generated_charts", chart_filename)
483
 
484
+ # Configure SmartDataframe with chart settings
485
  df = SmartDataframe(
486
  data,
487
  config={
488
  'llm': llm,
489
+ 'save_charts': True, # Enable chart saving
490
  'open_charts': False,
491
+ 'save_charts_path': os.path.dirname(chart_path), # Directory to save
492
+ 'custom_chart_filename': chart_filename, # Unique filename
493
  'enable_cache': False
494
  }
495
  )
496
 
497
  answer = df.chat(question + instructions)
498
 
499
+ if process_answer(answer):
500
+ return "Chart not generated"
501
+ return answer
502
+
503
  except Exception as e:
504
  error = str(e)
505
+ # if "429" in error:
506
+ if error != "":
507
+ with current_groq_chart_lock:
508
+ current_groq_chart_key_index = (current_groq_chart_key_index + 1)
509
  else:
510
  logger.error(f"Chart generation error: {error}")
511
  return {"error": error}
512
 
 
513
  return {"error": "All API keys exhausted for chart generation"}
514
 
515
+
516
+
517
+
518
+ # Global locks for key rotation (chart endpoints)
519
+ # current_groq_chart_key_index = 0
520
+ # current_groq_chart_lock = threading.Lock()
521
+ current_langchain_chart_key_index = 0
522
+ current_langchain_chart_lock = threading.Lock()
523
+
524
+
525
+ # Use a process pool to run CPU-bound charts generation
526
+ process_executor = ProcessPoolExecutor(max_workers=max_cpus-2)
527
+
528
+ # --- LANGCHAIN-BASED CHART GENERATION ---
529
  def langchain_csv_chart(csv_url: str, question: str, chart_required: bool):
530
+ """
531
+ Generate a chart using the langchain-based method.
532
+ Modifications:
533
+ • No shared deletion of cache.
534
+ • Close matplotlib figures after generation.
535
+ • Return a list of full chart file paths.
536
+ """
537
+ global current_langchain_chart_key_index, current_langchain_chart_lock
538
 
539
  data = clean_data(csv_url)
540
 
541
  for attempt in range(len(groq_api_keys)):
 
 
 
 
542
  try:
543
+ with current_langchain_chart_lock:
544
+ api_key = groq_api_keys[current_langchain_chart_key_index]
545
+ current_key = current_langchain_chart_key_index
546
+ current_langchain_chart_key_index = (current_langchain_chart_key_index + 1) % len(groq_api_keys)
547
+
548
  llm = ChatGroq(model=model_name, api_key=api_key)
549
  tool = PythonAstREPLTool(locals={
550
  "df": data,
 
560
  llm,
561
  data,
562
  agent_type="tool-calling",
563
+ verbose=True,
564
  allow_dangerous_code=True,
565
  extra_tools=[tool],
566
  return_intermediate_steps=True
 
569
  result = agent.invoke({"input": _prompt_generator(question, True, csv_url)})
570
  output = result.get("output", "")
571
 
572
+ # Close figures to avoid interference
573
  plt.close('all')
574
 
575
+ # Extract chart filenames (assuming extract_chart_filenames returns a list)
576
  chart_files = extract_chart_filenames(output)
577
  if len(chart_files) > 0:
578
+ # Return full paths (join with your image_file_path)
579
+ return [os.path.join(image_file_path, f) for f in chart_files]
580
+
581
+ if attempt < len(groq_api_keys) - 1:
582
+ logger.info(f"Langchain chart error (key {current_key}): {output}")
583
 
584
  except Exception as e:
585
  error_message = str(e)
586
+ if error_message != "":
587
+ with current_langchain_chart_lock:
588
+ current_langchain_chart_key_index = (current_langchain_chart_key_index + 1)
589
+ logger.warning(f"Rate limit exceeded. Switching to next API key: {groq_api_keys[current_langchain_chart_key_index]}")
590
  else:
591
+ logger.error(f"Error with API key {api_key}: {error_message}")
592
  return {"error": error_message}
593
 
594
+ logger.error("All API keys exhausted for chart generation")
595
  return "Chart generation failed after all retries"
596
 
597
+
598
+ # --- FASTAPI ENDPOINT FOR CHART GENERATION ---
599
  @app.post("/api/csv-chart")
 
600
  async def csv_chart(request: dict, authorization: str = Header(None)):
601
+ """
602
+ Endpoint for generating a chart from CSV data.
603
+ This endpoint uses a ProcessPoolExecutor to run the (CPU-bound) chart generation
604
+ functions in separate processes so that multiple requests can run in parallel.
605
+ """
606
+ # --- Authorization Check ---
607
  if not authorization or not authorization.startswith("Bearer "):
 
608
  raise HTTPException(status_code=401, detail="Authorization required")
609
 
610
+ token = authorization.split(" ")[1]
611
  if token != os.getenv("AUTH_TOKEN"):
 
612
  raise HTTPException(status_code=403, detail="Invalid credentials")
613
 
614
  try:
 
617
  detailed_answer = request.get("detailed_answer", False)
618
  conversation_history = request.get("conversation_history", [])
619
  generate_report = request.get("generate_report", False)
620
+ chat_id = request.get("chat_id", "")
621
 
622
+ if generate_report is True:
 
 
 
 
623
  report_files = await generate_csv_report(csv_url, query, chat_id, conversation_history)
624
  if report_files is not None:
 
625
  return {"orchestrator_response": jsonable_encoder(report_files)}
626
 
627
  loop = asyncio.get_running_loop()
628
+ # First, try the langchain-based method if the question qualifies
 
629
  if if_initial_chart_question(query):
 
630
  langchain_result = await loop.run_in_executor(
631
  process_executor, langchain_csv_chart, csv_url, query, True
632
  )
633
+ logger.info("Langchain chart result:", langchain_result)
634
  if isinstance(langchain_result, list) and len(langchain_result) > 0:
635
+ unique_file_name =f'{str(uuid.uuid4())}.png'
636
+ logger.info("Uploading the chart to supabase...")
637
+ image_public_url = await upload_file_to_supabase(f"{langchain_result[0]}", unique_file_name, chat_id=chat_id)
638
+ logger.info("Image uploaded to Supabase and Image URL is... ", {image_public_url})
639
+ os.remove(langchain_result[0])
 
 
 
 
 
 
 
 
640
  return {"image_url": image_public_url}
641
+ # return FileResponse(langchain_result[0], media_type="image/png")
642
 
643
+ # Use orchestrator to handle the user's chart query first
644
+ if detailed_answer is True:
645
+ orchestrator_answer = await asyncio.to_thread(
646
+ csv_orchestrator_chat, csv_url, query, conversation_history, chat_id
647
+ )
648
+
649
+ if orchestrator_answer is not None:
650
+ return {"orchestrator_response": jsonable_encoder(orchestrator_answer)}
651
+
652
+ # Next, try the groq-based method
653
+ # groq_result = await loop.run_in_executor(
654
+ # process_executor, groq_chart, csv_url, query
655
+ # )
656
+ # logger.info(f"Groq chart result: {groq_result}")
657
+ # if isinstance(groq_result, str) and groq_result != "Chart not generated":
658
+ # unique_file_name =f'{str(uuid.uuid4())}.png'
659
+ # logger.info("Uploading the chart to supabase...")
660
+ # image_public_url = await upload_file_to_supabase(f"{groq_result}", unique_file_name, chat_id=chat_id)
661
+ # logger.info("Image uploaded to Supabase and Image URL is... ", {image_public_url})
662
+ # os.remove(groq_result)
663
+ # return {"image_url": image_public_url}
664
+ # return FileResponse(groq_result, media_type="image/png")
665
+
666
+ logger.info("Trying cerebras ai llama...")
667
  result = await query_csv_agent(csv_url, query, chat_id)
668
+ logger.info("cerebras ai result ==>", result)
669
  if result is not None and result != "":
670
+ return {"orchestrator_response": jsonable_encoder(result)}
 
671
 
672
+ # Fallback: try langchain-based again
673
+ logger.error("Cerebras ai llama response failed, trying langchain groq....")
674
  langchain_paths = await loop.run_in_executor(
675
  process_executor, langchain_csv_chart, csv_url, query, True
676
  )
677
+ logger.info("Fallback langchain chart result:", langchain_paths)
678
  if isinstance(langchain_paths, list) and len(langchain_paths) > 0:
679
+ unique_file_name =f'{str(uuid.uuid4())}.png'
680
+ logger.info("Uploading the chart to supabase...")
681
+ image_public_url = await upload_file_to_supabase(f"{langchain_paths[0]}", unique_file_name, chat_id=chat_id)
682
+ logger.info("Image uploaded to Supabase and Image URL is... ", {image_public_url})
683
+ os.remove(langchain_paths[0])
684
+ return {"image_url": image_public_url}
685
+ return FileResponse(langchain_paths[0], media_type="image/png")
 
 
 
 
 
 
 
686
  else:
687
+ logger.error("All chart generation methods failed")
688
+ return {"answer": "error"}
689
 
690
  except Exception as e:
691
+ logger.error(f"Critical chart error: {str(e)}")
692
  return {"answer": "error"}
693