AquaSense / chatbot_backend.py
Rivalcoder
Add Fiels
36e3763
import os
import json
import logging
import re
from typing import List, Dict, Any, Optional
from supabase import create_client, Client
from groq import Groq
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GroqRAGChatbot:
def __init__(self):
"""Initialize optimized RAG Chatbot with correct models and Supabase"""
self.groq_client = Groq(api_key=os.getenv('GROQ_API_KEY'))
self.models = {
'intent_analyzer': 'llama-3.1-8b-instant',
'query_builder': 'llama-3.3-70b-versatile',
'response_generator': 'llama-3.3-70b-versatile'
}
self.supabase_url = os.getenv('SUPABASE_URL')
self.supabase_key = os.getenv('SUPABASE_KEY')
self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
self.schema_info = {
'table_name': 'groundwater_data',
'key_columns': {
'district': 'District name (VARCHAR) - ALWAYS REQUIRED - lowercase',
'state': 'State name (VARCHAR) - ALWAYS REQUIRED - lowercase',
'annual_gw_draft_total': 'Total groundwater draft in hectare meters (DECIMAL)',
'annual_replenishable_gw_resource': 'Replenishable groundwater resource (DECIMAL)',
'stage_of_development': 'Development stage percentage (DECIMAL)',
'net_gw_availability': 'Net groundwater availability (DECIMAL)',
'annual_draft_irrigation': 'Irrigation draft (DECIMAL)',
'st_area_shape': 'Underground water coverage area in square meters (DOUBLE PRECISION)',
'st_length_shape': 'Underground water perimeter in meters (DOUBLE PRECISION)',
'geometry': 'Geographic boundaries for underground water mapping (TEXT)'
}
}
def get_db_connection(self):
try:
result = self.supabase.table(self.schema_info['table_name']).select('*').limit(1).execute()
return True
except Exception as e:
logger.error(f"Supabase connection error: {e}")
return False
def analyze_user_intent(self, user_query: str) -> Dict[str, Any]:
try:
prompt = f"""Analyze this user query and respond with JSON only:
Query: "{user_query}"
Available columns: {', '.join(self.schema_info['key_columns'].keys())}
IMPORTANT: For underground water analysis, always consider st_area_shape (coverage area) and st_length_shape (perimeter).
Response format:
{{
"intent_type": "comparison|ranking|statistics|filter|geographic",
"entities": ["district names mentioned"],
"target_columns": ["relevant column names"],
"needs_visualization": true|false,
"requires_geography": true|false,
"underground_focus": true|false
}}"""
response = self.groq_client.chat.completions.create(
messages=[
{"role": "system", "content": "You are a query analyzer. Respond only with valid JSON. Always include district name and underground water metrics."},
{"role": "user", "content": prompt}
],
model=self.models['intent_analyzer'],
temperature=0.1,
max_tokens=200
)
return json.loads(response.choices[0].message.content)
except Exception as e:
logger.error(f"Intent analysis error: {e}")
return {
"intent_type": "ranking",
"entities": [],
"target_columns": ["annual_gw_draft_total"],
"needs_visualization": True,
"requires_geography": False,
"underground_focus": True
}
def build_supabase_query(self, user_query: str, intent_analysis: Dict[str, Any]) -> Any:
"""
Build a Supabase query using the client's query methods instead of generating raw SQL
"""
try:
intent_type = intent_analysis.get('intent_type', 'ranking')
entities = intent_analysis.get('entities', [])
target_columns = intent_analysis.get('target_columns', ['annual_gw_draft_total'])
# Infer top N from free text
inferred_limit: Optional[int] = None
try:
m = re.search(r"\btop\s*(\d+)\b", (user_query or '').lower())
if m:
inferred_limit = int(m.group(1))
except Exception:
inferred_limit = None
# Always include these columns
mandatory_columns = ['district', 'state', 'st_area_shape', 'st_length_shape', 'geometry']
selected_columns = list(set(mandatory_columns + target_columns))
# Start building the query
query = self.supabase.table(self.schema_info['table_name']).select(','.join(selected_columns))
# Apply filters based on intent
if entities:
# Apply OR across district/state for each entity with wildcards
blacklist = {
'district', 'districts', 'state', 'states',
'district names mentioned', 'district names', 'unknown', 'india', 'indian'
}
safe_entities = []
for raw in entities:
try:
token = str(raw).strip().lower()
except Exception:
continue
if not token:
continue
# ignore placeholders or generic tokens containing admin unit words
if token in blacklist or ('district' in token) or ('state' in token):
continue
# ignore extremely short tokens
if len(token) < 3:
continue
safe_entities.append(token)
if safe_entities:
or_clauses = []
for e in safe_entities:
# Use PostgREST ilike syntax with *wildcards*
pattern = f"*{e}*"
or_clauses.append(f"district.ilike.{pattern}")
or_clauses.append(f"state.ilike.{pattern}")
# Combine into a single OR string
or_str = ','.join(or_clauses)
try:
query = query.or_(or_str)
except Exception:
# Fallback: chain first entity as ilike filter
try:
query = query.ilike('district', pattern)
except Exception:
pass
else:
# No safe entities; do not constrain by entity at all
pass
elif intent_type == "filter":
# For filtering queries, we might need to add specific conditions
# This is a simple implementation - you might want to enhance it
if "high" in user_query.lower() or "greater" in user_query.lower():
query = query.gt('stage_of_development', 80)
elif "low" in user_query.lower() or "less" in user_query.lower():
query = query.lt('stage_of_development', 40)
# Choose metric preference from query keywords
ql = (user_query or '').lower()
metric_preference = None
# Map "water level high" to underground coverage area if available
if any(k in ql for k in ["water level", "waterlevel", "underground", "groundwater", "coverage"]):
metric_preference = 'st_area_shape'
# Apply ordering based on intent/metric
if intent_type == "ranking":
column = metric_preference or (target_columns[0] if target_columns else 'annual_gw_draft_total')
order = "desc" if any(word in ql for word in ['highest', 'top', 'most', 'maximum', 'high']) else "asc"
query = query.order(column, desc=(order == "desc"))
elif intent_type == "geographic":
query = query.order('st_area_shape', desc=True)
# Apply limit (return more rows to power Knowledge/Insights)
# If specific entities mentioned, default to a tighter limit unless user said otherwise
if entities:
default_limit = 50
else:
default_limit = 50 if intent_type in ["ranking", "geographic"] else 200
final_limit = inferred_limit if (isinstance(inferred_limit, int) and inferred_limit > 0) else default_limit
# Clamp reasonable bounds (1..500)
final_limit = max(1, min(500, final_limit))
query = query.limit(final_limit)
return query
except Exception as e:
logger.error(f"Supabase query building error: {e}")
# Fallback to a simple query
return self.supabase.table(self.schema_info['table_name']).select('*').limit(10)
def execute_supabase_query(self, query) -> Optional[List[Dict[str, Any]]]:
"""
Execute the Supabase query and return results
"""
try:
result = query.execute()
# Supabase-py v2 returns a PostgrestResponse with .data
rows = getattr(result, 'data', None)
if rows is None:
rows = []
# Normalize string fields to lowercase except geometry
for row in rows:
for k, v in list(row.items()):
if isinstance(v, str) and k != 'geometry':
row[k] = v.lower()
# print(rows)
logger.info(f"Supabase query returned {len(rows)} results")
return rows
except Exception as e:
logger.error(f"Supabase query execution error: {e}")
return []
def get_quick_stats(self) -> Dict[str, Any]:
try:
total_result = self.supabase.table(self.schema_info['table_name']).select("district", count="exact").limit(1).execute()
total_districts = total_result.count if hasattr(total_result, 'count') else len(total_result.data) if total_result.data else 0
all_data = self.supabase.table(self.schema_info['table_name']).select("stage_of_development").execute()
if all_data.data:
developments = []
for row in all_data.data:
val = row.get('stage_of_development')
try:
if val is not None:
num_val = float(val) if isinstance(val, str) else val
if isinstance(num_val, (int, float)):
# Sanitize: ignore invalid/negative and extreme outliers
if 0 <= num_val <= 500:
developments.append(num_val)
except (ValueError, TypeError):
continue
if developments:
avg_development = sum(developments) / len(developments)
# Clamp to sensible range
avg_development = max(0.0, min(200.0, avg_development))
over_exploited = len([d for d in developments if d is not None and d > 100])
critical = len([d for d in developments if d is not None and 80 <= d <= 100])
else:
avg_development = 0
over_exploited = 0
critical = 0
else:
avg_development = 0
over_exploited = 0
critical = 0
return {
"total_districts": total_districts,
"avg_development": round(float(avg_development), 1),
"over_exploited": over_exploited,
"critical": critical
}
except Exception as e:
logger.error(f"Stats query error: {e}")
return {
"total_districts": 0,
"avg_development": 0,
"over_exploited": 0,
"critical": 0
}
def generate_response(self, user_query: str, query_results: List[Dict[str, Any]]) -> str:
try:
if not query_results:
return "No data found matching your query. Please try rephrasing your question or check if the district names are correct."
results_summary = []
for result in query_results[:5]:
result_items = []
for k, v in result.items():
if v is not None and k != 'geometry':
if k == 'st_area_shape':
try:
area_val = float(v)
result_items.append(f"Underground Coverage Area: {area_val:,.0f} sq.m")
except (ValueError, TypeError):
result_items.append(f"Underground Coverage Area: {v}")
elif k == 'st_length_shape':
try:
length_val = float(v)
result_items.append(f"Underground Perimeter: {length_val:,.0f} m")
except (ValueError, TypeError):
result_items.append(f"Underground Perimeter: {v}")
else:
result_items.append(f"{k}: {v}")
results_summary.append(", ".join(result_items))
results_text = "\n".join(results_summary)
prompt = f"""Analyze Indian groundwater data results with focus on underground water availability.
User Question: {user_query}
Results ({len(query_results)} total):
{results_text}
IMPORTANT CONTEXT:
- st_area_shape represents underground water coverage area (larger = more underground water extent)
- st_length_shape represents underground water perimeter (longer = more complex underground water boundaries)
- These metrics help assess underground water availability and distribution
Provide analysis with:
1. Direct answer to the user's question
2. District names with specific numbers
3. Underground water coverage insights using st_area_shape and st_length_shape
4. Practical implications for water management
5. Which districts have better underground water availability based on area/perimeter metrics
Keep response informative and highlight underground water aspects."""
response = self.groq_client.chat.completions.create(
messages=[
{"role": "system", "content": "You are an Indian groundwater expert specializing in underground water analysis. Provide insights using area and perimeter metrics for underground water availability."},
{"role": "user", "content": prompt}
],
model=self.models['response_generator'],
temperature=0.3,
max_tokens=500
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"Response generation error: {e}")
if query_results:
districts = [r.get('district', 'Unknown') for r in query_results[:3]]
return f"Found underground water data for {len(query_results)} districts including {', '.join(districts)}. Check the map visualization for underground water coverage areas and detailed results below."
return "Unable to generate detailed analysis, but query executed successfully."
def generate_summary_and_followups(self, user_query: str, query_results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate a concise summary and 3 follow-up questions to deepen analysis."""
try:
# Build compact, token-light context
top_rows = []
for r in (query_results or [])[:5]:
summary_row = {
k: v for k, v in r.items()
if k in {
'district', 'state', 'annual_gw_draft_total', 'stage_of_development',
'net_gw_availability', 'st_area_shape', 'st_length_shape'
} and v is not None
}
top_rows.append(summary_row)
prompt = (
"You are an assistant that outputs strict JSON. Given a user query and a small set "
"of Indian groundwater results (with underground coverage metrics), produce: "
"1) a one-paragraph summary (<= 80 words), 2) three concise follow-up questions.\n\n"
f"User Query: {user_query}\n\n"
f"Results Sample: {json.dumps(top_rows) }\n\n"
"Respond ONLY as JSON with keys 'summary' and 'follow_ups' (array of 3 strings)."
)
response = self.groq_client.chat.completions.create(
messages=[
{"role": "system", "content": "Output valid JSON only."},
{"role": "user", "content": prompt}
],
model=self.models['intent_analyzer'],
temperature=0.2,
max_tokens=200
)
data = json.loads(response.choices[0].message.content)
summary = data.get('summary') or ""
follow_ups = data.get('follow_ups') or []
# Ensure exactly up to 3
follow_ups = [str(q) for q in follow_ups][:3]
return {"summary": summary, "follow_ups": follow_ups}
except Exception as e:
logger.warning(f"Summary/follow-ups generation failed: {e}")
# Sensible fallback
fallback = [
"Do you want to compare two or more districts?",
"Should I filter by over-exploited or critical status?",
"Would you like a geographic view of underground coverage?"
]
return {"summary": "", "follow_ups": fallback}
def build_visualization_spec(self, user_query: str, intent_analysis: Dict[str, Any], query_results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Derive a lightweight visualization spec without altering existing logic."""
try:
if not query_results:
return {"enabled": False}
intent_type = intent_analysis.get("intent_type", "ranking")
target_columns = intent_analysis.get("target_columns", ["annual_gw_draft_total"]) or ["annual_gw_draft_total"]
primary = target_columns[0]
# Prefer known numeric metrics
numeric_preferences = [
"annual_gw_draft_total",
"stage_of_development",
"net_gw_availability",
"annual_replenishable_gw_resource",
"annual_draft_irrigation",
"st_area_shape",
"st_length_shape"
]
metric = next((c for c in [primary] + numeric_preferences if any(c in r for r in query_results)), primary)
# Fallback metric if not present
if not any(metric in r for r in query_results):
metric = "st_area_shape" if any("st_area_shape" in r for r in query_results) else primary
spec: Dict[str, Any] = {
"enabled": True,
"chart_type": "bar",
"x": "district" if any("district" in r for r in query_results) else None,
"y": metric,
"title": "",
"top_n": 10,
}
if intent_type == "comparison":
spec["title"] = f"Comparison of {metric.replace('_',' ').title()}"
spec["chart_type"] = "bar"
elif intent_type == "ranking":
spec["title"] = f"Ranking by {metric.replace('_',' ').title()}"
spec["chart_type"] = "bar"
elif intent_type == "statistics":
spec["title"] = f"Distribution of {metric.replace('_',' ').title()}"
spec["chart_type"] = "histogram"
spec["x"] = metric
spec["y"] = None
elif intent_type == "geographic":
spec["title"] = "Underground Coverage by District"
spec["chart_type"] = "bar"
spec["y"] = "st_area_shape" if any("st_area_shape" in r for r in query_results) else metric
return spec
except Exception:
return {"enabled": False}
def compute_insights(self, query_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Compute actionable insights from a result set without additional API calls.
Returns a list of {title, detail} objects suitable for display.
"""
try:
if not query_results:
return []
# Build a DataFrame-like view without importing pandas here
rows = []
for r in query_results:
try:
rows.append({
'district': r.get('district'),
'stage': float(r.get('stage_of_development')) if r.get('stage_of_development') not in (None, "") else None,
'draft_total': float(r.get('annual_gw_draft_total')) if r.get('annual_gw_draft_total') not in (None, "") else None,
'availability': float(r.get('net_gw_availability')) if r.get('net_gw_availability') not in (None, "") else None,
'replenishable': float(r.get('annual_replenishable_gw_resource')) if r.get('annual_replenishable_gw_resource') not in (None, "") else None,
'draft_irrigation': float(r.get('annual_draft_irrigation')) if r.get('annual_draft_irrigation') not in (None, "") else None,
'area': float(r.get('st_area_shape')) if r.get('st_area_shape') not in (None, "") else None,
'perimeter': float(r.get('st_length_shape')) if r.get('st_length_shape') not in (None, "") else None,
})
except Exception:
continue
if not rows:
return []
insights: List[Dict[str, Any]] = []
# Over-exploited and critical counts
over_ex = [r for r in rows if r['stage'] is not None and r['stage'] > 100]
critical = [r for r in rows if r['stage'] is not None and 80 <= r['stage'] <= 100]
if over_ex:
top_over = sorted(over_ex, key=lambda x: x['stage'], reverse=True)[:3]
names = ", ".join([str(r.get('district', 'unknown')).title() for r in top_over])
insights.append({
"title": "Over‑exploited hotspots",
"detail": f"{len(over_ex)} districts >100% development. Top: {names}."
})
if critical:
insights.append({
"title": "Critical watchlist",
"detail": f"{len(critical)} districts between 80–100% development; prioritize monitoring."
})
# Highest draft and availability gaps
with_draft = [r for r in rows if r['draft_total'] is not None]
if with_draft:
top_draft = sorted(with_draft, key=lambda x: x['draft_total'], reverse=True)[:3]
names = ", ".join([str(r.get('district', 'unknown')).title() for r in top_draft])
insights.append({
"title": "Top pressure points",
"detail": f"Highest total draft in: {names}. Target demand management here first."
})
# Supply-demand gap if both available
gap_rows = [r for r in rows if r['availability'] is not None and r['draft_total'] is not None]
if gap_rows:
gaps = sorted(gap_rows, key=lambda x: (x['draft_total'] - x['availability']), reverse=True)
worst = gaps[0]
if worst:
insights.append({
"title": "Availability gap",
"detail": f"Largest draft minus availability gap in {str(worst.get('district', 'unknown')).title()}."
})
# Recharge potential: big underground coverage areas
with_area = [r for r in rows if r['area'] is not None]
if with_area:
top_area = sorted(with_area, key=lambda x: x['area'], reverse=True)[:3]
names = ", ".join([str(r.get('district', 'unknown')).title() for r in top_area])
insights.append({
"title": "Recharge potential",
"detail": f"Large underground coverage in: {names}. Consider MAR sites."
})
# Complex boundaries: high perimeter relative to area (shape complexity)
complex_rows = [r for r in rows if r['perimeter'] and r['area'] and r['area'] > 0]
if complex_rows:
# Complexity ~ perimeter / sqrt(area)
ranked = sorted(complex_rows, key=lambda x: x['perimeter'] / max(1.0, x['area'] ** 0.5), reverse=True)[:3]
names = ", ".join([str(r.get('district', 'unknown')).title() for r in ranked])
insights.append({
"title": "Boundary complexity",
"detail": f"Complex underground boundaries in: {names}. Densify observation wells."
})
# Ensure at least 5 insights by adding generic, data-backed items
if len(insights) < 5 and with_draft:
avg_draft = sum([r['draft_total'] for r in with_draft if r['draft_total'] is not None]) / max(1, len(with_draft))
insights.append({
"title": "Average draft benchmark",
"detail": f"Avg annual draft across results is ~{avg_draft:,.0f} HM."
})
if len(insights) < 5 and with_area:
median_area = sorted([r['area'] for r in with_area if r['area'] is not None])
if median_area:
mid = median_area[len(median_area)//2]
insights.append({
"title": "Coverage benchmark",
"detail": f"Median underground coverage area is ~{mid:,.0f} sq.m."
})
return insights[:8]
except Exception:
return []
def chat(self, user_query: str) -> Dict[str, Any]:
logger.info(f"Processing query: {user_query}")
try:
intent_analysis = self.analyze_user_intent(user_query)
logger.info(f"Intent analysis: {intent_analysis}")
# Build and execute the Supabase query directly
query = self.build_supabase_query(user_query, intent_analysis)
query_results = self.execute_supabase_query(query)
if not query_results:
return {
"response": "Unable to retrieve data. This could be due to incorrect district names or database connectivity issues. Please try rephrasing your query.",
"intent_analysis": intent_analysis,
"results": [],
"results_count": 0,
"success": False
}
response = self.generate_response(user_query, query_results)
viz_spec = self.build_visualization_spec(user_query, intent_analysis, query_results)
aux = self.generate_summary_and_followups(user_query, query_results)
insights = self.compute_insights(query_results)
return {
"response": response,
"intent_analysis": intent_analysis,
"results": query_results,
"results_count": len(query_results),
"success": True,
"visualization": viz_spec,
"summary": aux.get("summary", ""),
"follow_ups": aux.get("follow_ups", []),
"insights": insights
}
except Exception as e:
logger.error(f"Chat processing error: {e}")
return {
"response": f"An error occurred while processing your query: {str(e)}",
"intent_analysis": {"error": str(e)},
"results": [],
"results_count": 0,
"success": False
}