""" SQL-based Feedback Analysis Service This module implements a SQL-based approach to analyzing feedback: 1. LLM analyzes user query 2. LLM generates 1-5 SQL queries to answer the question 3. Execute SQL queries on the feedback DataFrame 4. LLM synthesizes a comprehensive answer from query + SQL queries + results 5. (Optional) Generate visualizations of the results """ from __future__ import annotations import json import re from dataclasses import dataclass from typing import List, Dict, Any, Optional import pandas as pd import sqlite3 from .config import settings from .data_loader import load_feedback try: from openai import OpenAI # type: ignore except Exception: OpenAI = None try: import google.generativeai as genai # type: ignore except Exception: genai = None @dataclass class SQLQueryResult: """ Result of a single SQL query execution. Attributes: query: The SQL query that was executed result: DataFrame containing the query results (empty if error occurred) error: Error message if query failed, None if successful """ query: str result: pd.DataFrame error: Optional[str] = None @dataclass class AnalysisResult: """ Complete analysis result from processing a user query. Attributes: user_query: The original question asked by the user sql_queries: List of SQL queries that were generated and executed query_results: Results from executing each SQL query summary: Final synthesized answer in natural language visualizations: Optional list of visualization specifications for frontend rendering """ user_query: str sql_queries: List[str] query_results: List[SQLQueryResult] summary: str visualizations: Optional[List[Dict[str, Any]]] = None class SQLFeedbackService: """ Main service for SQL-based feedback analysis. This service implements a 4-stage pipeline: 1. Generate SQL queries from natural language questions (using LLM) 2. Execute SQL queries on feedback data (using SQLite in-memory) 3. Synthesize comprehensive answers from query results (using LLM) 4. Generate visualization specifications for results The service also includes automatic quality evaluation and improvement of generated answers to ensure high-quality responses. """ def __init__(self): """ Initialize the SQL feedback service. Loads feedback data from CSV into memory. If loading fails, the service will still initialize but will raise errors when trying to process queries. """ self.df: Optional[pd.DataFrame] = None self._load_data() def _load_data(self) -> None: """ Load feedback data from CSV file into memory. The data is loaded once at initialization and kept in memory for fast query execution. If the CSV file is missing or invalid, an error is logged but the service continues to initialize. Raises: FileNotFoundError: If CSV file doesn't exist (handled internally) ValueError: If CSV is missing required columns (handled internally) """ try: from .config import settings self.df = load_feedback() csv_path_used = settings.csv_path print(f"✅ Loaded {len(self.df)} feedback records from: {csv_path_used}", flush=True) except Exception as e: print(f"❌ Error loading feedback data: {e}", flush=True) import traceback traceback.print_exc() self.df = None def _get_schema_info(self) -> str: """ Generate comprehensive schema information for the feedback table. This function analyzes the actual CSV file structure and provides detailed information about each field including business meaning, data types, examples, and usage patterns. Returns: A detailed formatted string describing the table schema with business context, examples, and statistics. """ if self.df is None: return "No data available" # Helper function to get sample values def get_sample_values(col_name, n=5): try: samples = self.df[col_name].dropna().head(n).tolist() return [str(s) for s in samples] except: return [] # Helper function to get unique values if not too many def get_unique_values(col_name, max_show=10): try: unique_vals = self.df[col_name].dropna().unique().tolist() if len(unique_vals) <= max_show: return unique_vals return unique_vals[:max_show] except: return [] all_columns_info = "" # Analyze each column that exists in the dataframe for col in self.df.columns: col_info = "" # Get column statistics dtype = str(self.df[col].dtype) non_null_count = self.df[col].notna().sum() null_count = self.df[col].isna().sum() samples = get_sample_values(col, 3) # ID field if col == 'ID': col_info = f"• {col} (UUID/טקסט): מזהה ייחודי גלובלי של כל משוב\n" col_info += f" - משמעות עסקית: מזהה ייחודי לכל משוב במערכת, מאפשר מעקב, קישור בין משובים, ומניעת כפילויות\n" col_info += f" - דוגמאות: {', '.join(samples[:2])}\n" col_info += f" - שימוש בשאילתות: WHERE ID = '...', COUNT(DISTINCT ID), GROUP BY ID\n" # ServiceName field elif col == 'ServiceName': unique_services = self.df[col].nunique() unique_samples = get_unique_values(col, 5) col_info = f"• {col} (טקסט): שם השירות הדיגיטלי הממשלתי\n" col_info += f" - משמעות עסקית: מזהה את השירות שעליו ניתן המשוב. מאפשר ניתוח לפי שירות, השוואה בין שירותים, זיהוי שירותים בעייתיים או מצטיינים\n" col_info += f" - יש {unique_services} שירותים ייחודיים במערכת\n" col_info += f" - דוגמאות: {', '.join(unique_samples[:3])}\n" col_info += f" - שימוש בשאילתות: WHERE ServiceName = '...', GROUP BY ServiceName, COUNT(*) GROUP BY ServiceName\n" # Level field elif col == 'Level': level_dist = self.df[col].value_counts().sort_index().to_dict() avg_level = self.df[col].mean() col_info = f"• {col} (מספר שלם 1-5): דירוג שביעות רצון המשתמש מהשירות\n" col_info += f" - משמעות עסקית: מדד שביעות רצון. 1=גרוע מאוד, 2=גרוע, 3=בינוני, 4=טוב, 5=מעולה. מאפשר מדידת שביעות רצון, זיהוי בעיות, ומעקב אחר שיפורים\n" col_info += f" - דירוג ממוצע: {avg_level:.2f}\n" col_info += f" - חלוקה: {level_dist}\n" col_info += f" - שימוש בשאילתות: WHERE Level >= 4 (משובים חיוביים), WHERE Level <= 2 (משובים שליליים), AVG(Level), GROUP BY Level\n" # Text field elif col == 'Text': sample_text = samples[0][:80] + "..." if samples and len(samples[0]) > 80 else (samples[0] if samples else "") avg_length = self.df[col].str.len().mean() if self.df[col].dtype == 'object' else 0 col_info = f"• {col} (טקסט ארוך): התוכן החופשי של המשוב מהמשתמש\n" col_info += f" - משמעות עסקית: ביקורות, הצעות לשיפור, תלונות, מחמאות. מאפשר ניתוח איכותי, זיהוי נושאים חוזרים, וקבלת תובנות עסקיות\n" col_info += f" - אורך ממוצע: {avg_length:.0f} תווים\n" col_info += f" - דוגמה: '{sample_text}'\n" col_info += f" - שימוש בשאילתות: WHERE Text LIKE '%מילה%', WHERE Text LIKE '%בעיה%', LENGTH(Text), COUNT(*) WHERE Text IS NOT NULL\n" # ReferenceNumber field elif col == 'ReferenceNumber': if non_null_count > 0: ref_min = int(self.df[col].min()) ref_max = int(self.df[col].max()) col_info = f"• {col} (מספר שלם): מספר הפניה פנימי של המשוב\n" col_info += f" - משמעות עסקית: מספר הפניה במערכת. מאפשר קישור למסמכים או בקשות קשורות, מעקב אחר תהליכים, וניהול בקשות\n" col_info += f" - טווח: {ref_min} - {ref_max}\n" col_info += f" - דוגמאות: {', '.join([str(s) for s in samples[:2]])}\n" col_info += f" - NULL: {null_count} רשומות ({null_count/len(self.df)*100:.1f}%)\n" col_info += f" - שימוש בשאילתות: WHERE ReferenceNumber = 6928, WHERE ReferenceNumber IS NOT NULL\n" # RequestID field elif col == 'RequestID': if non_null_count > 0: col_info = f"• {col} (UUID/טקסט): מזהה ייחודי של הבקשה המקורית שקשורה למשוב\n" col_info += f" - משמעות עסקית: מאפשר קישור בין בקשות למשובים, מעקב אחר תהליכים, וניתוח הקשר בין בקשה למשוב\n" col_info += f" - דוגמאות: {samples[0][:30]}...\n" col_info += f" - NULL: {null_count} רשומות ({null_count/len(self.df)*100:.1f}%)\n" col_info += f" - שימוש בשאילתות: WHERE RequestID = '...', COUNT(DISTINCT RequestID)\n" # ProcessID field elif col == 'ProcessID': col_info = f"• {col} (UUID/טקסט): מזהה ייחודי של התהליך העסקי שקשור למשוב\n" col_info += f" - משמעות עסקית: מאפשר ניתוח לפי תהליכים, זיהוי תהליכים בעייתיים, ומעקב אחר ביצועים\n" col_info += f" - NULL: {null_count} רשומות ({null_count/len(self.df)*100:.1f}%)\n" col_info += f" - שימוש בשאילתות: WHERE ProcessID = '...', COUNT(DISTINCT ProcessID)\n" # Year field elif col == 'Year': year_min = int(self.df[col].min()) year_max = int(self.df[col].max()) year_dist = self.df[col].value_counts().sort_index().to_dict() col_info = f"• {col} (מספר שלם): שנה שבה ניתן המשוב\n" col_info += f" - משמעות עסקית: מאפשר ניתוח מגמות לאורך שנים, השוואה בין שנים, זיהוי שיפורים או הידרדרות, ותכנון אסטרטגי\n" col_info += f" - טווח: {year_min} - {year_max}\n" col_info += f" - חלוקה: {year_dist}\n" col_info += f" - שימוש בשאילתות: WHERE Year = 2020, GROUP BY Year, SELECT Year, COUNT(*) GROUP BY Year\n" # Month field elif col == 'Month': month_min = int(self.df[col].min()) month_max = int(self.df[col].max()) month_names = {1: 'ינואר', 2: 'פברואר', 3: 'מרץ', 4: 'אפריל', 5: 'מאי', 6: 'יוני', 7: 'יולי', 8: 'אוגוסט', 9: 'ספטמבר', 10: 'אוקטובר', 11: 'נובמבר', 12: 'דצמבר'} col_info = f"• {col} (מספר שלם 1-12): חודש בשנה שבו ניתן המשוב\n" col_info += f" - משמעות עסקית: מאפשר ניתוח עונתי, זיהוי חודשים בעייתיים או מצטיינים, תכנון משאבים לפי עונות\n" col_info += f" - טווח: {month_min} - {month_max} ({month_names.get(month_min, '')} - {month_names.get(month_max, '')})\n" col_info += f" - דוגמאות: {', '.join([str(s) for s in samples[:3]])}\n" col_info += f" - שימוש בשאילתות: WHERE Month = 1, GROUP BY Month, SELECT Month, COUNT(*) GROUP BY Month ORDER BY Month\n" # DayInMonth field elif col == 'DayInMonth': day_min = int(self.df[col].min()) day_max = int(self.df[col].max()) col_info = f"• {col} (מספר שלם 1-31): יום בחודש שבו ניתן המשוב\n" col_info += f" - משמעות עסקית: מאפשר ניתוח לפי ימים בחודש, זיהוי ימים בעייתיים (למשל סוף חודש), וניתוח דפוסים יומיים\n" col_info += f" - טווח: {day_min} - {day_max}\n" col_info += f" - דוגמאות: {', '.join([str(s) for s in samples[:3]])}\n" col_info += f" - שימוש בשאילתות: WHERE DayInMonth = 1, GROUP BY DayInMonth, SELECT DayInMonth, COUNT(*) GROUP BY DayInMonth\n" # DayOfWeek field elif col == 'DayOfWeek': unique_days = get_unique_values(col, 10) day_names_he = {'Monday': 'שני', 'Tuesday': 'שלישי', 'Wednesday': 'רביעי', 'Thursday': 'חמישי', 'Friday': 'שישי', 'Saturday': 'שבת', 'Sunday': 'ראשון'} col_info = f"• {col} (טקסט): יום בשבוע שבו ניתן המשוב (באנגלית)\n" col_info += f" - משמעות עסקית: מאפשר ניתוח לפי ימי השבוע, זיהוי ימים בעייתיים, תכנון כוח אדם, וזיהוי דפוסים שבועיים\n" col_info += f" - ערכים אפשריים: Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday\n" col_info += f" - דוגמאות: {', '.join(unique_days[:3])}\n" col_info += f" - שימוש בשאילתות: WHERE DayOfWeek = 'Monday', GROUP BY DayOfWeek, SELECT DayOfWeek, COUNT(*) GROUP BY DayOfWeek\n" # Hour field elif col == 'Hour': hour_min = int(self.df[col].min()) hour_max = int(self.df[col].max()) col_info = f"• {col} (מספר שלם 0-23): שעה ביום שבה ניתן המשוב\n" col_info += f" - משמעות עסקית: מאפשר ניתוח לפי שעות היום, זיהוי שעות שיא, תכנון זמינות שירות, וזיהוי דפוסים יומיים\n" col_info += f" - טווח: {hour_min} - {hour_max} (0=חצות, 12=צהריים, 23=23:00)\n" col_info += f" - דוגמאות: {', '.join([str(s) for s in samples[:3]])}\n" col_info += f" - שימוש בשאילתות: WHERE Hour >= 9 AND Hour <= 17 (שעות עבודה), GROUP BY Hour, SELECT Hour, COUNT(*) GROUP BY Hour ORDER BY Hour\n" # DayOrNight field elif col == 'DayOrNight': unique_values = get_unique_values(col, 5) col_info = f"• {col} (טקסט): האם המשוב ניתן בשעות היום או הלילה\n" col_info += f" - משמעות עסקית: מאפשר ניתוח לפי שעות פעילות, זיהוי הבדלים בין יום ללילה, תכנון זמינות שירות\n" col_info += f" - ערכים אפשריים: 'יום' או 'לילה' (יום=6:00-18:00, לילה=18:00-6:00)\n" col_info += f" - דוגמאות: {', '.join(unique_values)}\n" col_info += f" - שימוש בשאילתות: WHERE DayOrNight = 'יום', GROUP BY DayOrNight, SELECT DayOrNight, COUNT(*) GROUP BY DayOrNight\n" # Default for any other columns else: if dtype in ['int64', 'float64']: val_min = self.df[col].min() val_max = self.df[col].max() col_info = f"• {col} ({dtype}): מספר. טווח: {val_min} - {val_max}\n" else: unique_count = self.df[col].nunique() col_info = f"• {col} ({dtype}): טקסט. {unique_count} ערכים ייחודיים\n" col_info += f" - דוגמאות: {', '.join(samples[:2])}\n" all_columns_info += col_info + "\n" # Build final schema info total_records = len(self.df) unique_services = self.df['ServiceName'].nunique() if 'ServiceName' in self.df.columns else 0 avg_level = self.df['Level'].mean() if 'Level' in self.df.columns else 0 schema_info = f"""שם הטבלה: Feedback_transformed (עם אות גדולה F - חובה!) סטטיסטיקות כלליות: - סך הכל משובים: {total_records} - מספר שירותים ייחודיים: {unique_services} - דירוג ממוצע: {avg_level:.2f} שדות בטבלה: {all_columns_info} ⚠️ חשוב: כל שאילתה חייבת להתחיל ב-SELECT ולהשתמש ב-FROM Feedback_transformed! """ return schema_info def analyze_query(self, query: str) -> AnalysisResult: """ Main analysis pipeline: 1. Analyze user query 2. Generate SQL queries 3. Execute SQL queries 4. Synthesize answer """ print(f"🔍 Analyzing query: {query}", flush=True) if self.df is None: error_msg = "No feedback data available. Please ensure feedback_transformed_2.csv exists in 0_preprocessing/ directory." print(f"❌ {error_msg}", flush=True) raise ValueError(error_msg) print(f"✅ Data loaded: {len(self.df)} rows", flush=True) # Check API keys if not settings.gemini_api_key and not settings.openai_api_key: error_msg = "❌ No API keys configured! Please set GEMINI_API_KEY or OPENAI_API_KEY in Repository secrets." print(error_msg, flush=True) return AnalysisResult( user_query=query, sql_queries=[], query_results=[], summary=error_msg, visualizations=None ) print(f"✅ API keys available: Gemini={bool(settings.gemini_api_key)}, OpenAI={bool(settings.openai_api_key)}", flush=True) # Step 1: Generate SQL queries (with gibberish validation) try: print("🔍 Generating SQL queries...", flush=True) sql_queries = self._generate_sql_queries(query) print(f"✅ Generated {len(sql_queries)} SQL queries", flush=True) if len(sql_queries) == 0: error_msg = "לא נוצרו שאילתות SQL. ייתכן שהשאלה לא ברורה או שיש בעיה עם ה-API. נסה לשאול שאלה אחרת או בדוק את ה-API keys." print(f"❌ {error_msg}", flush=True) return AnalysisResult( user_query=query, sql_queries=[], query_results=[], summary=error_msg, visualizations=None ) except ValueError as e: # If query is gibberish, return a friendly error message print(f"❌ Query validation error: {e}", flush=True) return AnalysisResult( user_query=query, sql_queries=[], query_results=[], summary=str(e), visualizations=None ) except Exception as e: error_msg = f"שגיאה ביצירת שאילתות SQL: {str(e)}. בדוק את ה-API keys והחיבור לאינטרנט." print(f"❌ {error_msg}", flush=True) import traceback traceback.print_exc() return AnalysisResult( user_query=query, sql_queries=[], query_results=[], summary=error_msg, visualizations=None ) # Step 2: Execute SQL queries print("🔍 Executing SQL queries...", flush=True) query_results = self._execute_sql_queries(sql_queries) successful_results = [r for r in query_results if not r.error and len(r.result) > 0] print(f"✅ Executed {len(query_results)} queries, {len(successful_results)} successful", flush=True) # Step 3: Synthesize answer print("🔍 Synthesizing answer...", flush=True) summary = self._synthesize_answer(query, sql_queries, query_results) # Step 4: (Optional) Generate visualizations visualizations = self._generate_visualizations(query_results) return AnalysisResult( user_query=query, sql_queries=sql_queries, query_results=query_results, summary=summary, visualizations=visualizations ) def _is_gibberish_query(self, query: str) -> bool: """ Check if the query is gibberish or unintelligible. Returns True if the query appears to be gibberish, False otherwise. """ # Remove extra whitespace query_clean = query.strip() # Check if query is too short or empty if len(query_clean) < 3: return True # Check if query contains only special characters or numbers if not any(c.isalpha() for c in query_clean): return True # Check if query is mostly non-alphabetic characters alpha_count = sum(1 for c in query_clean if c.isalpha()) if alpha_count < len(query_clean) * 0.3: # Less than 30% alphabetic return True return False def _generate_sql_queries(self, query: str) -> List[str]: """ Use LLM to generate 1-5 SQL queries that will help answer the user's question. """ # Check if query is gibberish if self._is_gibberish_query(query): raise ValueError("השאלה לא ברורה. אנא נסח את השאלה בצורה יותר ברורה ומפורטת בעברית.") schema_info = self._get_schema_info() prompt = f"""צור שאילתות SQL לשאלה: {query} {schema_info} כללים חשובים: 1. שם הטבלה: Feedback_transformed (עם אות גדולה F) - חובה בכל שאילתה! 2. שאילתות זמן: השתמש בשדות המחושבים (Year, Month, DayInMonth, DayOfWeek, Hour, DayOrNight) - הם כבר מוכנים! 3. Level: 1-5 (1=גרוע מאוד, 5=מעולה) - השתמש ב-Level לניתוח שביעות רצון 4. Text: חיפוש עם LIKE '%מילה%' - לניתוח תוכן המשובים 5. ServiceName: שם השירות - לניתוח לפי שירותים דוגמאות לשאילתות: - SELECT ServiceName, AVG(Level) as avg_rating, COUNT(*) as count FROM Feedback_transformed GROUP BY ServiceName ORDER BY avg_rating DESC - SELECT COUNT(*) FROM Feedback_transformed WHERE Level >= 4 - SELECT Year, COUNT(*) as count FROM Feedback_transformed GROUP BY Year ORDER BY Year - SELECT DayOfWeek, COUNT(*) as count FROM Feedback_transformed GROUP BY DayOfWeek - SELECT ServiceName, Level, COUNT(*) as count FROM Feedback_transformed GROUP BY ServiceName, Level פורמט התשובה - JSON בלבד: {{ "queries": [ "SELECT ... FROM Feedback_transformed ...", "SELECT ... FROM Feedback_transformed ..." ] }} תן רק את ה-JSON, ללא טקסט נוסף.""" # Try Gemini first if settings.gemini_api_key and genai is not None: try: print("🔍 Using Gemini API for SQL generation...", flush=True) genai.configure(api_key=settings.gemini_api_key) model = genai.GenerativeModel("gemini-2.0-flash") response = model.generate_content(prompt) text = getattr(response, "text", None) if text: print(f"✅ Received response from Gemini: {text[:200]}...", flush=True) queries = self._parse_sql_queries(text) print(f"✅ Parsed {len(queries)} SQL queries from Gemini response", flush=True) return queries else: print("❌ Gemini returned empty response", flush=True) except Exception as e: print(f"❌ Gemini error in SQL generation: {e}", flush=True) import traceback traceback.print_exc() # Fallback to OpenAI if settings.openai_api_key and OpenAI is not None: try: print("🔍 Using OpenAI API for SQL generation...", flush=True) client = OpenAI(api_key=settings.openai_api_key) response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0.3, ) text = response.choices[0].message.content if text: print(f"✅ Received response from OpenAI: {text[:200]}...", flush=True) queries = self._parse_sql_queries(text) print(f"✅ Parsed {len(queries)} SQL queries from OpenAI response", flush=True) return queries else: print("❌ OpenAI returned empty response", flush=True) except Exception as e: print(f"❌ OpenAI error in SQL generation: {e}", flush=True) import traceback traceback.print_exc() # Fallback: return empty list print("❌ No API available or all APIs failed. Returning empty query list.", flush=True) return [] def _parse_sql_queries(self, text: str) -> List[str]: """ Parse SQL queries from LLM response text. The LLM is instructed to return JSON, but sometimes it may include markdown formatting or return SQL directly. This function handles multiple formats for robustness. Args: text: Raw text response from LLM (may be JSON, markdown, or plain SQL) Returns: List of SQL query strings, cleaned and validated. Empty list if parsing fails completely. Strategy: 1. First, try to parse as JSON (expected format) 2. If that fails, try to extract SQL queries using regex 3. Return empty list if both methods fail """ # Try to extract JSON first (expected format) try: # Remove markdown code blocks if present (LLM sometimes adds these) text = re.sub(r'```json\s*', '', text) text = re.sub(r'```\s*', '', text) text = text.strip() # Try to parse as JSON data = json.loads(text) if isinstance(data, dict) and "queries" in data: queries = data["queries"] if isinstance(queries, list): # Filter out empty or invalid queries return [q for q in queries if isinstance(q, str) and q.strip()] except Exception: # JSON parsing failed, try fallback method pass # Fallback: try to extract SQL queries directly using regex # This handles cases where LLM returns SQL without JSON wrapper sql_pattern = r'SELECT\s+.*?(?=\n\n|\nSELECT|$)' matches = re.findall(sql_pattern, text, re.IGNORECASE | re.DOTALL) if matches: return [m.strip() for m in matches] # If all parsing methods fail, return empty list # The calling function will handle this gracefully return [] def _execute_sql_queries(self, sql_queries: List[str]) -> List[SQLQueryResult]: """ Execute SQL queries on the feedback DataFrame using SQLite in-memory database. This method creates a temporary SQLite database in memory, loads the feedback DataFrame into it, and executes each SQL query. Errors are caught per-query so one failing query doesn't stop the others. Args: sql_queries: List of SQL query strings to execute Returns: List of SQLQueryResult objects, one per query. Each result contains either the query results (DataFrame) or an error message. Implementation details: - Uses SQLite in-memory database (':memory:') for fast execution - DataFrame is loaded into table named 'Feedback_transformed' - Each query is executed independently (errors don't cascade) - Connection is always closed in finally block for safety """ if self.df is None: return [] results = [] # Create in-memory SQLite database # Using in-memory is fast and doesn't require disk I/O conn = sqlite3.connect(':memory:') try: # Write DataFrame to SQLite table named 'Feedback_transformed' # if_exists='replace' ensures clean state on each execution self.df.to_sql('Feedback_transformed', conn, index=False, if_exists='replace') # Execute each query independently # This allows partial success - if one query fails, others can still succeed for query in sql_queries: try: # Execute query and get results as DataFrame result_df = pd.read_sql_query(query, conn) results.append(SQLQueryResult( query=query, result=result_df, error=None )) except Exception as e: # Store error but continue with other queries results.append(SQLQueryResult( query=query, result=pd.DataFrame(), # Empty DataFrame on error error=str(e) )) finally: # Always close connection, even if errors occur conn.close() return results def _evaluate_answer_quality(self, query: str, answer: str, sql_queries: List[str] = None, query_results: List = None) -> tuple[float, str]: """ Evaluate the quality of an answer using an LLM reviewer. Args: query: The user's original question answer: The synthesized answer to evaluate sql_queries: List of SQL queries that were executed (optional, for context) query_results: Results from executing those queries (optional, for context) Returns: tuple: (score 0-100, feedback/reasoning) """ # Build context about queries and results if available context_text = "" if sql_queries and query_results: context_text = "\n\nהשאילתות שבוצעו:\n" for i, (q, r) in enumerate(zip(sql_queries, query_results), 1): context_text += f"{i}. {q}\n" if hasattr(r, 'error') and r.error: context_text += f" שגיאה: {r.error}\n" elif hasattr(r, 'result'): context_text += f" תוצאות: {len(r.result) if hasattr(r.result, '__len__') else 'N/A'} שורות\n" evaluation_prompt = f"""אתה בודק איכות תשובות מקצועי. הערך את התשובה הבאה: שאלת המשתמש המקורית: {query} {context_text} התשובה שניתנה: {answer} ⚠️ הערך את התשובה לפי הקריטריונים הבאים (0-100): 1. האם התשובה עונה ישירות על השאלה המקורית? (0-30 נקודות) - האם התשובה מתייחסת ישירות לשאלה: {query}? - אם השאלה מבקשת סיווג/חלוקה לפי שירותים (ServiceName) - האם התשובה כוללת ניתוח נפרד לכל שירות? - אם השאלה מבקשת סיווג/חלוקה לפי דירוגים (Level) - האם התשובה כוללת ניתוח נפרד לכל דירוג? - אם השאלה מבקשת סיווג/חלוקה לפי תאריכים - האם התשובה כוללת ניתוח נפרד לפי תקופות? - האם התשובה היא תשובה מילולית מפורטת ולא רק הודעה ששאילתות בוצעו? 2. האם התשובה מבוססת על הנתונים והשאילתות? (0-25 נקודות) - האם התשובה משתמשת בנתונים מהשאילתות? - האם התשובה מסבירה איך השאילתות עוזרות לענות על השאלה? - האם התשובה כוללת מספרים מדויקים מהתוצאות? 3. האם התשובה מפורטת ומקיפה? (0-20 נקודות) - האם התשובה ארוכה ומפורטת (לפחות 400-600 מילים)? - האם התשובה כוללת ניתוח מעמיק ולא רק רשימת נתונים? 4. האם התשובה ברורה וקוהרנטית? (0-15 נקודות) - האם התשובה כתובה בשפה ברורה ומובנת? - האם התשובה מאורגנת היטב (לא גיבוב של מילים)? 5. האם התשובה כוללת תובנות עסקיות? (0-10 נקודות) - האם התשובה כוללת תובנות על תהליכים דיגיטליים? - האם התשובה כוללת המלצות מעשיות? תן ציון כולל (0-100) והסבר קצר (2-3 משפטים) למה הציון הזה. פורמט התשובה - JSON בלבד: {{ "score": <מספר 0-100>, "reasoning": "<הסבר קצר>" }} תן רק את ה-JSON, ללא טקסט נוסף.""" # Try Gemini first if settings.gemini_api_key and genai is not None: try: genai.configure(api_key=settings.gemini_api_key) model = genai.GenerativeModel("gemini-2.0-flash") response = model.generate_content(evaluation_prompt) text = getattr(response, "text", None) if text: # Try to parse JSON from response # Extract JSON (may be wrapped in markdown or other text) json_match = re.search(r'\{[^}]+\}', text, re.DOTALL) if json_match: try: data = json.loads(json_match.group()) score = float(data.get('score', 0)) reasoning = data.get('reasoning', '') return score, reasoning except (json.JSONDecodeError, ValueError, KeyError): pass except Exception as e: print(f"Gemini error in evaluation: {e}", flush=True) # Fallback to OpenAI if settings.openai_api_key and OpenAI is not None: try: client = OpenAI(api_key=settings.openai_api_key) response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": evaluation_prompt}], temperature=0.3, ) text = response.choices[0].message.content if text: # Try to parse JSON from response json_match = re.search(r'\{[^}]+\}', text, re.DOTALL) if json_match: try: data = json.loads(json_match.group()) score = float(data.get('score', 0)) reasoning = data.get('reasoning', '') return score, reasoning except (json.JSONDecodeError, ValueError, KeyError): pass except Exception as e: print(f"OpenAI error in evaluation: {e}", flush=True) # Default: return high score if evaluation fails (don't block) return 85.0, "לא ניתן להעריך - מחזיר ציון ברירת מחדל" def _synthesize_answer(self, query: str, sql_queries: List[str], query_results: List[SQLQueryResult], max_retries: int = 2) -> str: """ Use LLM to synthesize a comprehensive answer from: - User query - SQL queries that were executed - Results of those queries Includes quality evaluation and automatic improvement if score < 80. Args: query: The user's original question sql_queries: List of SQL queries that were executed query_results: Results from executing those queries max_retries: Maximum number of retry attempts if quality is low Returns: Final synthesized answer """ # Format query results for the prompt results_text = "" for i, qr in enumerate(query_results, 1): results_text += f"\nשאילתה {i}:\n{qr.query}\n\n" if qr.error: results_text += f"שגיאה: {qr.error}\n\n" else: # Format result as table if len(qr.result) == 0: results_text += "תוצאה: אין תוצאות\n\n" else: results_text += f"תוצאה ({len(qr.result)} שורות):\n" results_text += qr.result.to_string(index=False) results_text += "\n\n" prompt = f"""אתה אנליסט נתונים. המשתמש שאל שאלה על משובי משתמשים. שאלת המשתמש: {query} תוצאות השאילתות: {results_text} כתוב תשובה קצרה, מדויקת ומסודרת שמבוססת ישירות על התוצאות. ⚠️ כללים: 1. ענה ישירות על השאלה - לא יותר, לא פחות 2. השתמש במספרים המדויקים מהתוצאות 3. פסקאות קצרות (2-3 משפטים כל אחת) 4. אם יש סיווג - רשום כל קטגוריה עם המספר שלה 5. עברית פשוטה וברורה מבנה: - פסקה ראשונה: תשובה ישירה לשאלה עם המספרים העיקריים - פסקאות נוספות: פירוט לפי הצורך (אם יש סיווג - כל קטגוריה בנפרד) - משפט סיכום: המסקנה העיקרית ⚠️ חשוב: תשובה קצרה ומדויקת. לא פילוסופיה, לא הסברים ארוכים - רק הנתונים והמסקנות. אם יש שגיאות בשאילתות, ציין זאת בקצרה.""" # Try Gemini first if settings.gemini_api_key and genai is not None: try: genai.configure(api_key=settings.gemini_api_key) model = genai.GenerativeModel("gemini-2.0-flash") generation_config = { "temperature": 0.8, "top_p": 0.95, "top_k": 40, "max_output_tokens": 4000, } response = model.generate_content(prompt, generation_config=generation_config) text = getattr(response, "text", None) if text and text.strip(): answer = text.strip() # Evaluate answer quality with context score, reasoning = self._evaluate_answer_quality(query, answer, sql_queries, query_results) print(f"Answer quality score: {score:.1f}/100 - {reasoning}", flush=True) # If score is below 80, try to improve if score < 80 and max_retries > 0: print(f"Answer quality below threshold (80). Attempting improvement...", flush=True) improvement_prompt = f"""התשובה הקודמת קיבלה ציון {score}/100. הסיבה: {reasoning} שאלת המשתמש: {query} התשובה הקודמת (שצריך לשפר): {answer} תוצאות השאילתות: {results_text} כתוב תשובה משופרת - קצרה, מדויקת ומסודרת. ⚠️ כללים: 1. ענה ישירות על השאלה - לא יותר, לא פחות 2. השתמש במספרים המדויקים מהתוצאות 3. פסקאות קצרות (2-3 משפטים כל אחת) 4. אם יש סיווג - רשום כל קטגוריה עם המספר שלה 5. עברית פשוטה וברורה מבנה: - פסקה ראשונה: תשובה ישירה לשאלה עם המספרים העיקריים - פסקאות נוספות: פירוט לפי הצורך (אם יש סיווג - כל קטגוריה בנפרד) - משפט סיכום: המסקנה העיקרית ⚠️ חשוב: תשובה קצרה ומדויקת. לא פילוסופיה, לא הסברים ארוכים - רק הנתונים והמסקנות.""" try: response = model.generate_content(improvement_prompt, generation_config=generation_config) improved_text = getattr(response, "text", None) if improved_text and improved_text.strip(): # Re-evaluate improved answer with context improved_score, improved_reasoning = self._evaluate_answer_quality(query, improved_text.strip(), sql_queries, query_results) print(f"Improved answer quality score: {improved_score:.1f}/100 - {improved_reasoning}", flush=True) if improved_score > score: return improved_text.strip() except Exception as e: print(f"Error improving answer: {e}", flush=True) return answer except Exception as e: print(f"Gemini error in synthesis: {e}", flush=True) # Fallback to OpenAI if settings.openai_api_key and OpenAI is not None: try: client = OpenAI(api_key=settings.openai_api_key) response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], temperature=0.8, max_tokens=3000, ) text = response.choices[0].message.content if text and text.strip(): answer = text.strip() # Evaluate answer quality with context score, reasoning = self._evaluate_answer_quality(query, answer, sql_queries, query_results) print(f"Answer quality score: {score:.1f}/100 - {reasoning}", flush=True) # If score is below 80, try to improve if score < 80 and max_retries > 0: print(f"Answer quality below threshold (80). Attempting improvement...", flush=True) improvement_prompt = f"""התשובה הקודמת קיבלה ציון {score}/100. הסיבה: {reasoning} שאלת המשתמש: {query} התשובה הקודמת (שצריך לשפר): {answer} תוצאות השאילתות: {results_text} כתוב תשובה משופרת - קצרה, מדויקת ומסודרת. ⚠️ כללים: 1. ענה ישירות על השאלה - לא יותר, לא פחות 2. השתמש במספרים המדויקים מהתוצאות 3. פסקאות קצרות (2-3 משפטים כל אחת) 4. אם יש סיווג - רשום כל קטגוריה עם המספר שלה 5. עברית פשוטה וברורה מבנה: - פסקה ראשונה: תשובה ישירה לשאלה עם המספרים העיקריים - פסקאות נוספות: פירוט לפי הצורך (אם יש סיווג - כל קטגוריה בנפרד) - משפט סיכום: המסקנה העיקרית ⚠️ חשוב: תשובה קצרה ומדויקת. לא פילוסופיה, לא הסברים ארוכים - רק הנתונים והמסקנות.""" try: response = client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": improvement_prompt}], temperature=0.8, max_tokens=3000, ) improved_text = response.choices[0].message.content if improved_text and improved_text.strip(): # Re-evaluate improved answer with context improved_score, improved_reasoning = self._evaluate_answer_quality(query, improved_text.strip(), sql_queries, query_results) print(f"Improved answer quality score: {improved_score:.1f}/100 - {improved_reasoning}", flush=True) if improved_score > score: return improved_text.strip() except Exception as e: print(f"Error improving answer: {e}", flush=True) return answer except Exception as e: print(f"OpenAI error in synthesis: {e}", flush=True) # Fallback: generate a detailed answer from query results even if LLM failed # This ensures we always return a meaningful answer, not just a status message successful_results = [r for r in query_results if not r.error and len(r.result) > 0] failed_results = [r for r in query_results if r.error] if len(sql_queries) == 0: return "לא נוצרו שאילתות SQL. ייתכן שהשאלה לא ברורה או שיש בעיה עם ה-API. נסה לשאול שאלה אחרת או בדוק את ה-API keys ב-Repository secrets." if successful_results: fallback_text = f"סיכום מפורט של הממצאים:\n\n" fallback_text += f"בוצעו {len(sql_queries)} שאילתות, מתוכן {len(successful_results)} הצליחו והחזירו תוצאות.\n" if failed_results: fallback_text += f"⚠️ {len(failed_results)} שאילתות נכשלו.\n" fallback_text += "\n" # Analyze and summarize each result for i, qr in enumerate(successful_results, 1): fallback_text += f"ממצאים משאילתה {i}:\n" fallback_text += f"שאילתה: {qr.query}\n" fallback_text += f"מספר רשומות: {len(qr.result)}\n\n" # Try to provide meaningful analysis if len(qr.result) > 0: fallback_text += "תוצאות:\n" # Show summary statistics if possible numeric_cols = qr.result.select_dtypes(include=['number']).columns if len(numeric_cols) > 0: fallback_text += "סטטיסטיקות:\n" for col in numeric_cols[:3]: # Limit to first 3 numeric columns fallback_text += f"- {col}: ממוצע {qr.result[col].mean():.2f}, סכום {qr.result[col].sum():.0f}\n" fallback_text += "\n" # Show sample data fallback_text += "דוגמאות מהנתונים:\n" fallback_text += qr.result.head(5).to_string(index=False) fallback_text += "\n\n" if failed_results: fallback_text += "\nשגיאות בשאילתות:\n" for i, qr in enumerate(failed_results, 1): fallback_text += f"שאילתה {i}: {qr.error}\n" fallback_text += "\n" fallback_text += "הערה: תשובה זו נוצרה אוטומטית מהתוצאות. לניתוח מפורט יותר, נסה לשאול שאלה ספציפית יותר." return fallback_text else: # If no successful results, provide detailed error information error_details = "" if failed_results: error_details = "\n\nשגיאות בשאילתות:\n" for i, qr in enumerate(failed_results, 1): error_details += f"שאילתה {i}: {qr.query}\n" error_details += f"שגיאה: {qr.error}\n\n" return f"בוצעו {len(sql_queries)} שאילתות, אך לא התקבלו תוצאות מהנתונים.{error_details}\nייתכן שהנתונים לא מכילים מידע התואם לשאלה שנשאלה. נסה לשאול שאלה אחרת או לבדוק את הנתונים הזמינים." def _generate_visualizations(self, query_results: List[SQLQueryResult]) -> Optional[List[Dict[str, Any]]]: """ Generate visualization specifications for query results. This function analyzes the structure of query results and automatically determines the best visualization type (bar, line, scatter, histogram). The specifications are returned as dictionaries that the frontend can use with Chart.js to render the visualizations. Args: query_results: List of SQL query results to visualize Returns: List of visualization specification dictionaries, or None if no visualizations can be generated. Each dict contains: - type: Chart type (bar, line, scatter, histogram) - title: Display title - x, y: Column names for axes - data: The actual data to visualize Visualization selection logic: - 2 columns: bar chart (categorical + numeric) or line chart (time series) - 1 column: histogram (if numeric) - 3+ columns: bar chart (first categorical + first numeric) """ visualizations = [] for i, qr in enumerate(query_results, 1): # Skip queries that failed or returned no results if qr.error or len(qr.result) == 0: continue # Determine visualization type based on result structure result = qr.result # If result has 2 columns, might be a bar chart or line chart if len(result.columns) == 2: col1, col2 = result.columns # If first column is categorical and second is numeric if result[col2].dtype in ['int64', 'float64']: # Check if it's a time series (col1 looks like date/time) if 'date' in col1.lower() or 'time' in col1.lower() or 'תאריך' in col1.lower(): visualizations.append({ "type": "line", "title": f"תוצאה של שאילתה {i}", "x": col1, "y": col2, "x_label": col1, "y_label": col2, "data": result.to_dict('records') }) else: # Bar chart for categorical data visualizations.append({ "type": "bar", "title": f"תוצאה של שאילתה {i}", "x": col1, "y": col2, "x_label": col1, "y_label": col2, "data": result.to_dict('records') }) # If both are numeric, might be a scatter plot elif result[col1].dtype in ['int64', 'float64'] and result[col2].dtype in ['int64', 'float64']: visualizations.append({ "type": "scatter", "title": f"תוצאה של שאילתה {i}", "x": col1, "y": col2, "x_label": col1, "y_label": col2, "data": result.to_dict('records') }) # If result has 1 column with numeric values, might be a distribution elif len(result.columns) == 1: col = result.columns[0] if result[col].dtype in ['int64', 'float64']: visualizations.append({ "type": "histogram", "title": f"תוצאה של שאילתה {i}", "x": col, "x_label": col, "data": result[col].tolist() }) # If result has 3+ columns, try to find the best visualization elif len(result.columns) >= 3: # Look for numeric columns numeric_cols = [c for c in result.columns if result[c].dtype in ['int64', 'float64']] categorical_cols = [c for c in result.columns if result[c].dtype == 'object'] # If we have one categorical and one numeric, use bar chart if len(categorical_cols) >= 1 and len(numeric_cols) >= 1: cat_col = categorical_cols[0] num_col = numeric_cols[0] visualizations.append({ "type": "bar", "title": f"תוצאה של שאילתה {i}", "x": cat_col, "y": num_col, "x_label": cat_col, "y_label": num_col, "data": result.to_dict('records') }) return visualizations if visualizations else None