Spaces:
Sleeping
Sleeping
File size: 10,670 Bytes
86ddd31 2fccbc6 19052ba 2fccbc6 19052ba cb8bf64 2fccbc6 40db972 325f883 86ddd31 325f883 2f7a273 dfddecf 86ddd31 2f7a273 dddc062 86ddd31 dddc062 325f883 2f7a273 86ddd31 b500d63 86ddd31 b500d63 86ddd31 b500d63 cb8bf64 2f7a273 86ddd31 5be3717 da71da2 86ddd31 da71da2 86ddd31 19052ba 2f7a273 86ddd31 da71da2 a3c9eb2 dfddecf a3c9eb2 dfddecf 86ddd31 da71da2 86ddd31 19052ba 86ddd31 56f8933 86ddd31 da71da2 86ddd31 da71da2 86ddd31 da71da2 56f8933 da71da2 86ddd31 da71da2 19052ba da71da2 86ddd31 da71da2 2f7a273 dfddecf da71da2 86ddd31 da71da2 dfddecf 86ddd31 da71da2 86ddd31 dfddecf 56f8933 dfddecf 86ddd31 56f8933 da71da2 86ddd31 da71da2 112ad16 19052ba c098a9c da71da2 19052ba dfddecf da71da2 86ddd31 dfddecf 86ddd31 da71da2 86ddd31 b500d63 da71da2 dfddecf 3d0c21e 86ddd31 da71da2 86ddd31 da71da2 dfddecf 86ddd31 dfddecf da71da2 dfddecf da71da2 dfddecf da71da2 86ddd31 da71da2 86ddd31 da71da2 86ddd31 da71da2 dfddecf da71da2 86ddd31 dfddecf da71da2 86ddd31 da71da2 86ddd31 da71da2 dfddecf da71da2 86ddd31 19052ba da71da2 86ddd31 da71da2 86ddd31 da71da2 86ddd31 da71da2 86ddd31 da71da2 86ddd31 fb8cb62 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# app.py
from __future__ import annotations
import os
import io
import traceback
from contextlib import redirect_stdout
from typing import List, Dict, Any
import gradio as gr
import pandas as pd
from datetime import datetime
# --- BACKEND IMPORTS ---
import regex as re2
from langchain_cohere import ChatCohere
# --- LOCAL MODULE IMPORTS ---
from settings import (
HEALTHCARE_SETTINGS, GENERAL_CONVERSATION_PROMPT,
COHERE_MODEL_PRIMARY, COHERE_TIMEOUT_S, USE_OPEN_FALLBACKS
)
from audit_log import log_event
from privacy import safety_filter, refusal_reply
from llm_router import cohere_chat, _co_client, cohere_embed
# --- UTILITY FUNCTIONS ---
def load_markdown_text(filepath: str) -> str:
    """Return the UTF-8 text of a markdown file, or a markdown error notice.

    This runs at import time to pre-load the legal documents, so it must never
    raise: any failure to read the file is turned into a short markdown
    message that can be rendered in place of the document.

    Args:
        filepath: Path of the markdown file to read.

    Returns:
        The file contents, or a ``**Error:** ...`` string when the file is
        missing or cannot be read/decoded.
    """
    doc_name = os.path.basename(filepath)
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return f"**Error:** The document `{doc_name}` was not found."
    except (OSError, UnicodeDecodeError) as exc:
        # Directories, permission problems and non-UTF-8 files would otherwise
        # abort the whole app at import time.
        return f"**Error:** The document `{doc_name}` could not be read ({exc.__class__.__name__})."
def _sanitize_text(s: str) -> str:
if not isinstance(s, str): return s
return re2.sub(r'[\p{C}--[\n\t]]+', '', s)
def _create_python_script(user_scenario: str, schema_context: str) -> str:
    """Ask the LLM to write a Python analysis script for the user's scenario.

    The prompt embeds the dataframe schema and the scenario, then the first
    fenced ```python block of the reply is extracted.

    Args:
        user_scenario: The (already safety-filtered) user request.
        schema_context: Markdown description of the dataframes in ``dfs``.

    Returns:
        The source of the generated script, or a one-line ``print(...)``
        script reporting the failure when no fenced block is found.
    """
    prompt_for_coder = f"""
You are an expert Python data scientist. Your sole job is to write a single, complete, and executable Python script to answer the user's request.
You have access to a list of pandas dataframes loaded into a variable named `dfs`.
--- DATA SCHEMA ---
{schema_context}
--- END SCHEMA ---
CRITICAL RULES FOR YOUR SCRIPT:
1. **HANDLE DATA TYPES:** Before performing any mathematical operations, you MUST explicitly convert string values (e.g., '5.5%') to numeric types (`float` or `int`).
2. **CHECK COLUMN NAMES:** You MUST use the exact, case-sensitive column names provided in the DATA SCHEMA. A `KeyError` will cause a failure.
3. **PRINT FINDINGS:** Use the `print()` function at each step to output your results as a formatted report.
--- USER'S SCENARIO ---
{user_scenario}
--- PYTHON SCRIPT ---
Now, write the complete Python script to be executed.
```python
"""
    # The chat helper is treated elsewhere in this file as possibly returning
    # a falsy value; normalise to "" so the regex search cannot raise TypeError.
    generated_text = cohere_chat(prompt_for_coder) or ""
    match = re2.search(r"```python\n(.*?)```", generated_text, re2.DOTALL)
    if match:
        return match.group(1).strip()
    # Fallback script: executing it simply prints the failure for the user.
    return "print('Error: The AI failed to generate a valid Python script.')"
def _append_msg(history_messages: List[Dict[str, str]], role: str, content: str) -> List[Dict[str, str]]:
return (history_messages or []) + [{"role": role, "content": content}]
def ping_cohere() -> str:
    """Run a small embedding request and report Cohere's status as text.

    Returns:
        A human-readable status line; any exception is reported in the
        returned string instead of being raised.
    """
    try:
        if not _co_client():
            return "Cohere client not initialized."
        embeddings = cohere_embed(["hello", "world"])
        if embeddings:
            return f"Cohere OK ✅ (model={COHERE_MODEL_PRIMARY})"
        return "Cohere reachable."
    except Exception as exc:
        return f"Cohere ping failed: {exc}"
# --- THE CORE ANALYSIS ENGINE ---
def handle(user_msg: str, files: list) -> str:
    """Answer a user request, either by analysing uploaded CSVs or by chatting.

    With files present, every CSV is loaded into ``dfs``, the LLM is asked to
    write a script against that schema, and the script's captured stdout is
    returned. Without files, the message is sent to the chat model directly.

    Args:
        user_msg: Raw user prompt; it is passed through ``safety_filter`` first.
        files: Uploaded files, as path strings or objects exposing ``.name``.
            May be ``None`` or empty.

    Returns:
        The text to show the user. Failures are reported in the returned
        string (and logged via ``log_event``) rather than raised.
    """
    try:
        safe_in, blocked_in, reason_in = safety_filter(user_msg, mode="input")
        if blocked_in:
            return refusal_reply(reason_in)

        file_paths: List[str] = [getattr(f, "name", None) or f for f in (files or [])]
        if file_paths:
            # --- MODE 1: DATA ANALYST (files are present) ---
            dataframes = []
            schema_parts = []
            for p in file_paths:
                # Case-insensitive so `.CSV` uploads are not silently skipped.
                if not p.lower().endswith('.csv'):
                    continue
                try:
                    df = pd.read_csv(p)
                except UnicodeDecodeError:
                    df = pd.read_csv(p, encoding='latin1')
                # Index by position in `dataframes`, not in `file_paths`, so the
                # schema shown to the LLM matches the real `dfs` list even when
                # a non-CSV upload was skipped.
                df_index = len(dataframes)
                dataframes.append(df)
                schema_parts.append(f"DataFrame `dfs[{df_index}]` (from `{os.path.basename(p)}`):\n{df.head().to_markdown()}\n")
            if not dataframes:
                return "Please upload at least one CSV file."

            schema_context = "\n".join(schema_parts)
            analysis_script = _create_python_script(safe_in, schema_context)
            execution_namespace = {"dfs": dataframes, "pd": pd}
            output_buffer = io.StringIO()
            try:
                # SECURITY: this executes model-generated code in-process with
                # no sandbox, and the prompt that drives it is user-controlled.
                # NOTE(review): redirect_stdout swaps the process-wide
                # sys.stdout; confirm requests cannot run concurrently.
                with redirect_stdout(output_buffer):
                    exec(analysis_script, execution_namespace)
                result = output_buffer.getvalue()
                return _sanitize_text(result or "(The script ran but produced no output.)")
            except Exception as e:
                return f"An error occurred executing the script: {e}\n\nGenerated Script:\n```python\n{analysis_script}\n```"
        else:
            # --- MODE 2: CONVERSATIONAL AI (no files are present) ---
            prompt = f"{GENERAL_CONVERSATION_PROMPT}\n\nUser: {safe_in}\nAssistant:"
            return _sanitize_text(cohere_chat(prompt) or "How can I help further?")
    except Exception as e:
        # Top-level boundary: record the traceback, show only the message.
        tb = traceback.format_exc()
        log_event("app_error", None, {"err": str(e), "tb": tb})
        return f"A critical error occurred: {e}"
# --- PRE-LOAD LEGAL DOCUMENTS ---
# Read once at import time so the modals can render them without file I/O.
# The paths are relative, so they resolve against the process working
# directory; a missing file yields an error notice string instead of raising.
PRIVACY_POLICY_TEXT = load_markdown_text("privacy_policy.md")
TERMS_OF_SERVICE_TEXT = load_markdown_text("terms_of_service.md")
# ---------------- THE PROFESSIONAL UI WITH DUAL-MODE SUPPORT ----------------
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation had been lost; confirm component placement against the
# intended layout.
with gr.Blocks(theme="soft", css="style.css") as demo:
    # Per-session list of saved data-analysis runs (dicts with id/prompt/files/response).
    assessment_history = gr.State([])

    # Hidden panels that act as modals for the legal documents.
    with gr.Group(visible=False) as privacy_modal:
        with gr.Blocks():
            gr.Markdown(PRIVACY_POLICY_TEXT)
            close_privacy_btn = gr.Button("Close")
    with gr.Group(visible=False) as terms_modal:
        with gr.Blocks():
            gr.Markdown(TERMS_OF_SERVICE_TEXT)
            close_terms_btn = gr.Button("Close")

    gr.Markdown("# Universal AI Data Analyst")
    with gr.Row(variant="panel"):
        with gr.Column(scale=1):
            gr.Markdown("## New Assessment")
            gr.Markdown("<p style='font-size:0.9rem; color: #6C757D;'>Upload CSV files for data analysis, or just enter a prompt to chat with the AI.</p>")  # UX Improvement
            files_input = gr.Files(label="Upload Data Files (.csv)", file_count="multiple", type="filepath", file_types=[".csv"])
            prompt_input = gr.Textbox(label="Prompt", placeholder="Paste your scenario or question here.", lines=15)
            with gr.Row():
                send_btn = gr.Button("▶️ Send / Run Analysis", variant="primary", scale=2)  # UX Improvement
                clear_btn = gr.Button("🗑️ Clear")
            ping_btn = gr.Button("Ping Cohere")
            ping_out = gr.Markdown()
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("Current Assessment", id=0):
                    chat_history_output = gr.Chatbot(label="Analysis Output", type="messages", height=600)
                with gr.TabItem("Assessment History", id=1):
                    gr.Markdown("## Review Past Assessments")
                    history_dropdown = gr.Dropdown(label="Select an assessment to review", choices=[])
                    history_display = gr.Markdown(label="Selected Assessment Details")
    with gr.Row():
        gr.Markdown("---")
    with gr.Row():
        privacy_link = gr.Button("Privacy Policy", variant="link")
        terms_link = gr.Button("Terms of Service", variant="link")

    def run_analysis_wrapper(prompt, files, chat_history_list, history_state_list):
        """Stream a placeholder, then the final answer, into the chat window.

        Yields ``(chat messages, history state, dropdown update)`` tuples. Runs
        that used files are also appended to the assessment history.
        """
        # Reject empty and whitespace-only prompts before calling the backend.
        if not prompt or not prompt.strip():
            gr.Warning("Please enter a prompt.")
            yield chat_history_list, history_state_list, gr.update()
            return

        chat_with_user_msg = _append_msg(chat_history_list, "user", prompt)
        thinking_message = _append_msg(chat_with_user_msg, "assistant", "```\n🧠 Thinking... Please wait.\n```")
        yield thinking_message, history_state_list, gr.update()

        ai_response_text = handle(prompt, files)
        # Build from the pre-placeholder list so the "Thinking" bubble is replaced.
        final_chat = _append_msg(chat_with_user_msg, "assistant", ai_response_text)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Only save to history if it was a data analysis session
        if files:
            file_names = [os.path.basename(f.name if hasattr(f, 'name') else f) for f in files]
            new_assessment = {"id": timestamp, "prompt": prompt, "files": file_names, "response": ai_response_text}
            updated_history = (history_state_list or []) + [new_assessment]
            history_labels = [f"{item['id']} - {item['prompt'][:40]}..." for item in updated_history]
            yield final_chat, updated_history, gr.update(choices=history_labels)
        else:
            # For simple chat, just update the chat window
            yield final_chat, history_state_list, gr.update()

    def view_history(selection, history_state_list):
        """Render the saved assessment whose dropdown label was selected."""
        if not selection or not history_state_list:
            return ""
        # Labels are "<id> - <prompt preview>..."; the id is the text before the
        # first " - ". Comparing against the whole split list never matched.
        selected_id = selection.split(" - ", 1)[0]
        selected_assessment = next((item for item in history_state_list if item["id"] == selected_id), None)
        if selected_assessment:
            file_list_md = "\n- ".join(selected_assessment['files'])
            return f"""### Assessment from: {selected_assessment['id']}\n**Files Used:**\n- {file_list_md}\n---\n**Original Prompt:**\n> {selected_assessment['prompt']}\n---\n**AI Generated Response:**\n{selected_assessment['response']}"""
        return "Could not find the selected assessment."

    send_btn.click(
        run_analysis_wrapper,
        inputs=[prompt_input, files_input, chat_history_output, assessment_history],
        outputs=[chat_history_output, assessment_history, history_dropdown]
    )
    history_dropdown.change(
        view_history,
        inputs=[history_dropdown, assessment_history],
        outputs=[history_display]
    )
    # Clearing empties the history state, so the dropdown and detail pane are
    # reset too; otherwise they would keep listing entries that no longer exist.
    clear_btn.click(
        lambda: (None, None, [], [], gr.update(choices=[], value=None), ""),
        outputs=[prompt_input, files_input, chat_history_output, assessment_history, history_dropdown, history_display]
    )
    ping_btn.click(ping_cohere, outputs=[ping_out])
    privacy_link.click(lambda: gr.update(visible=True), outputs=[privacy_modal])
    close_privacy_btn.click(lambda: gr.update(visible=False), outputs=[privacy_modal])
    terms_link.click(lambda: gr.update(visible=True), outputs=[terms_modal])
    close_terms_btn.click(lambda: gr.update(visible=False), outputs=[terms_modal])
# Script entry point: warn about a missing API key, then start the web server.
if __name__ == "__main__":
    if not os.getenv("COHERE_API_KEY"):
        print("🔴 COHERE_API_KEY environment variable not set. Application may not function correctly.")
    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))