Spaces:
Sleeping
Sleeping
| # app.py | |
| # Universal AI Data Analyst — FINAL FIXED VERSION (Nov 2025) | |
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| import traceback | |
| import re | |
| from contextlib import redirect_stdout | |
| from datetime import datetime | |
| from typing import Any, Dict, List | |
| import gradio as gr | |
| import pandas as pd | |
| import regex as re2 | |
| from langchain_cohere import ChatCohere # noqa: F401 | |
| from settings import ( | |
| GENERAL_CONVERSATION_PROMPT, | |
| COHERE_MODEL_PRIMARY, | |
| COHERE_TIMEOUT_S, # noqa: F401 | |
| USE_OPEN_FALLBACKS # noqa: F401 | |
| ) | |
| # Optional HIPAA settings with safe defaults | |
# Resolve every optional HIPAA setting on its own. Importing all five names in
# one `from settings import ...` statement meant that a single missing name
# raised ImportError and silently reset the others (including PHI_MODE) to
# their defaults.
try:
    import settings as _optional_settings
except ImportError:
    # No settings module at all: every value below falls back to its default.
    _optional_settings = None

PHI_MODE = getattr(_optional_settings, "PHI_MODE", False)
PERSIST_HISTORY = getattr(_optional_settings, "PERSIST_HISTORY", True)
HISTORY_TTL_DAYS = getattr(_optional_settings, "HISTORY_TTL_DAYS", 365)
REDACT_BEFORE_LLM = getattr(_optional_settings, "REDACT_BEFORE_LLM", False)
ALLOW_EXTERNAL_PHI = getattr(_optional_settings, "ALLOW_EXTERNAL_PHI", True)
| from audit_log import log_event | |
| from privacy import safety_filter, refusal_reply | |
| from llm_router import cohere_chat, _co_client, cohere_embed | |
| # ──────── PERMANENT FIX: Safe .item() for floats & pandas scalars ──────── | |
def safe_item(x):
    """Return ``x.item()`` when *x* has an ``item`` attribute, otherwise *x*.

    Intended for turning numpy/pandas scalar wrappers into plain Python
    values before JSON serialisation. Plain Python values (which have no
    ``item`` attribute) are returned unchanged.

    Args:
        x: Any object.

    Returns:
        The result of ``x.item()``, or *x* itself when the attribute is
        missing or the call fails.
    """
    try:
        return x.item() if hasattr(x, "item") else x
    except Exception:
        # Best-effort: if .item() cannot produce a scalar, hand back the
        # original object. Only Exception is caught so that
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return x
| # ───────────────────────────────────────────────────────────────────── | |
def load_markdown_text(filepath: str) -> str:
    """Read a UTF-8 text file and return its contents.

    Args:
        filepath: Path of the document to read.

    Returns:
        The file contents, or a Markdown-formatted error message naming the
        file when it does not exist.
    """
    try:
        handle = open(filepath, "r", encoding="utf-8")
    except FileNotFoundError:
        missing_name = os.path.basename(filepath)
        return f"**Error:** Document `{missing_name}` not found."
    with handle:
        return handle.read()
| def _sanitize_text(s: str) -> str: | |
| if not isinstance(s, str): | |
| return s | |
| return re2.sub(r"[\p{C}--[\n\t]]+", "", s) | |
# Ordered (compiled pattern, replacement token) pairs used by redact_phi().
# Order matters because each substitution runs on the output of the previous one.
PHI_PATTERNS = [
    (re.compile(source), token)
    for source, token in (
        # 3-2-4 digit groups separated by hyphens.
        (r"\b\d{3}-\d{2}-\d{4}\b", "[REDACTED_SSN]"),
        # Any standalone run of exactly nine digits.
        (r"\b\d{9}\b", "[REDACTED_MRN]"),
        # 3-3-4 digit groups with optional '-', '.' or whitespace separators.
        (r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b", "[REDACTED_PHONE]"),
        # local-part@domain.tld
        (r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[REDACTED_EMAIL]"),
        # YYYY-MM-DD with a 19xx/20xx year.
        (r"\b(19|20)\d{2}-\d{2}-\d{2}\b", "[REDACTED_DOB]"),
        # NN/NN/YYYY with a 19xx/20xx year.
        (r"\b\d{2}/\d{2}/(19|20)\d{2}\b", "[REDACTED_DOB]"),
        # Five digits, optionally followed by -NNNN.
        (r"\b\d{5}(-\d{4})?\b", "[REDACTED_ZIP]"),
    )
]
def redact_phi(text: str) -> str:
    """Replace every PHI_PATTERNS match in *text* with its placeholder token.

    Args:
        text: Text to redact. Non-string values are returned unchanged.

    Returns:
        The text after applying each pattern in PHI_PATTERNS, in list order.
    """
    if isinstance(text, str):
        for pattern, placeholder in PHI_PATTERNS:
            text = pattern.sub(placeholder, text)
    return text
def safe_log(event_name: str, meta: dict | None = None):
    """Forward an event to ``log_event`` without ever raising.

    A copy of *meta* is logged with its ``"raw"`` key removed; the caller's
    dict is left untouched. Any failure while logging is deliberately ignored.

    Args:
        event_name: Name passed to ``log_event`` as the first argument.
        meta: Optional metadata dict; ``None`` or empty is treated as ``{}``.
    """
    try:
        payload = meta.copy() if meta else {}
        payload.pop("raw", None)
        log_event(event_name, None, payload)
    except Exception:
        # Best-effort logging: a logging failure must not break the request.
        pass
| # ──────── Rest of your unchanged logic (kept 100% identical) ──────── | |
def _create_python_script(user_scenario: str, schema_context: str) -> str:
    """Ask the LLM for a Python analysis script and return its source code.

    The prompt embeds the expert guidelines, the data schema and the user's
    scenario, and ends with an opening ```python fence. The first fenced
    ```python block found in the reply is returned.

    Args:
        user_scenario: The user's request, inserted verbatim into the prompt.
        schema_context: Textual description of the dataframes in ``dfs``.

    Returns:
        The extracted script (stripped), or a one-line fallback script that
        prints a JSON error object when no fenced block is found.
    """
    EXPERT_ANALYTICAL_GUIDELINES = """
--- EXPERT ANALYTICAL GUIDELINES ---
When writing your script, you MUST follow these expert business rules:
1. **Linking Datasets Rule:** If you need to connect facilities to health zones when the 'zone' column is not in the facility list,
you must first identify the high-priority zone from the beds data, then find the major city (by facility count) in the facility list,
and *then* assess that city's capacity. Do not try to filter the facility list by a 'zone' column if it does not exist in the schema.
2. **Prioritization Rule:** To prioritize locations, you MUST combine the most recent population data with specific high-risk health indicators
to create a multi-factor risk score.
3. **Capacity Calculation Rule:** For capacity over a 3-month window, assume **60 working days**.
4. **Cost Calculation Rule:** Sum 'Startup cost' and 'Ongoing cost' per person before multiplying.
"""
    prompt_for_coder = f"""\
You are an expert Python data scientist. Your job is to write a script to extract the data needed to answer the user's request.
You have dataframes in a list `dfs`.
{EXPERT_ANALYTICAL_GUIDELINES}
--- DATA SCHEMA ---
{schema_context}
--- END DATA SCHEMA ---
CRITICAL RULES:
1. **DO NOT READ FILES:** You MUST NOT include `pd.read_csv`. The data is ALREADY loaded in the `dfs` variable. You MUST use this variable. Failure to do so will cause a fatal error.
2. **JSON OUTPUT ONLY:** Your script's ONLY output must be a single JSON object printed to stdout containing the raw data findings.
3. **BE PRECISE:** Use the exact, case-sensitive column names from the schema and robustly clean strings (`re.sub()`) before converting to numbers.
4. **JSON SERIALIZATION:** Before adding data to your final dictionary for JSON conversion, you MUST convert any pandas-specific types (like `int64`) to standard Python types using `safe_item()` for single values or `.tolist()` for lists.
--- USER'S SCENARIO ---
{user_scenario}
--- PYTHON SCRIPT ---
Now, write the complete Python script that performs the analysis and prints a single, serializable JSON object.
```python
"""
    # The reply may be empty/None; normalise so the regex search cannot raise.
    generated_text = cohere_chat(prompt_for_coder) or ""
    # NOTE(review): the original pattern was truncated in this copy of the
    # file; this reconstruction captures the body of the first ```python
    # fenced block (DOTALL so the body may span lines).
    match = re2.search(r"```python\s*(.*?)```", generated_text, flags=re2.DOTALL)
    if match:
        return match.group(1).strip()
    return "print(json.dumps({'error': 'Failed to generate a valid Python script.'}))"
def _generate_long_report(prompt: str) -> str:
    """Send *prompt* to the primary Cohere model and return the reply text.

    Args:
        prompt: Full prompt passed as the chat ``message``.

    Returns:
        ``response.text`` from the chat call (requested with
        ``max_tokens=4096``), or an error string when the client is
        unavailable or the call raises. Failures are recorded via ``safe_log``.
    """
    try:
        cohere = _co_client()
        if cohere:
            reply = cohere.chat(
                model=COHERE_MODEL_PRIMARY,
                message=prompt,
                max_tokens=4096,
            )
            return reply.text
        return "Error: Cohere client not initialized."
    except Exception as exc:
        safe_log("cohere_chat_error", {"err": str(exc)})
        return f"Error during final report generation: {exc}"
def _generate_final_report(user_scenario: str, raw_data_json: str) -> str:
    """Build the report-writer prompt and return the generated report.

    Args:
        user_scenario: The user's original request, embedded in the prompt.
        raw_data_json: JSON text of the script's findings, embedded as-is.

    Returns:
        Whatever ``_generate_long_report`` returns for the assembled prompt.
    """
    # One entry per prompt line; joined with "\n" plus a trailing newline.
    prompt_lines = [
        "You are an expert management consultant and data analyst.",
        "A data science script has run to extract key findings. You have the user's original request and the raw JSON data.",
        "Your task is to synthesize these raw findings into a single, comprehensive, and professional report that directly answers all of the user's questions with detailed justifications.",
        "--- USER'S ORIGINAL SCENARIO & DELIVERABLES ---",
        f"{user_scenario}",
        "--- END SCENARIO ---",
        "--- RAW DATA FINDINGS (JSON) ---",
        f"{raw_data_json}",
        "--- END RAW DATA ---",
        "Now, write the final, polished report. The report MUST:",
        '1. Follow the "Expected Output Format" requested by the user.',
        "2. Use tables, bullet points, and DETAILED narrative justifications for each recommendation.",
        "3. Synthesize the raw data into actionable insights. Do not just copy the raw numbers; interpret them.",
        "4. Ensure you fully address ALL evaluation questions, especially the final recommendations.",
    ]
    writer_prompt = "\n".join(prompt_lines) + "\n"
    return _generate_long_report(writer_prompt)
| def _append_msg(h: List[Dict[str, str]], r: str, c: str) -> List[Dict[str, str]]: | |
| return (h or []) + [{"role": r, "content": c}] | |
def ping_cohere() -> str:
    """Check Cohere connectivity and return a human-readable status line.

    Returns:
        A message saying the client is not initialised, that an embedding call
        for two test strings returned a truthy or falsy result, or that the
        ping raised (with the exception text).
    """
    try:
        if not _co_client():
            return "Cohere client not initialized."
        if cohere_embed(["hello", "world"]):
            return f"Cohere OK (model={COHERE_MODEL_PRIMARY})"
        return "Cohere reachable."
    except Exception as exc:
        return f"Cohere ping failed: {exc}"
def _load_csv_frames(file_paths):
    """Read each ``.csv`` path into a DataFrame and build its schema preview.

    Returns:
        ``(dataframes, schema_parts)`` where ``schema_parts[k]`` describes
        ``dataframes[k]`` (file name plus ``df.head()`` as a Markdown table).
    """
    dataframes, schema_parts = [], []
    for path in file_paths:
        # Case-insensitive so uploads named e.g. "DATA.CSV" are accepted too.
        if not str(path).lower().endswith(".csv"):
            continue
        try:
            df = pd.read_csv(path)
        except UnicodeDecodeError:
            # Retry with a single-byte encoding for files that are not UTF-8.
            df = pd.read_csv(path, encoding="latin1")
        # Label with the position inside `dataframes`, not the upload index:
        # the generated script addresses `dfs[k]`, and skipped non-CSV files
        # would otherwise shift the advertised indices.
        position = len(dataframes)
        dataframes.append(df)
        # NOTE(review): the preview rows are embedded verbatim in the LLM
        # prompt and are not passed through redact_phi() - confirm this is
        # acceptable when PHI_MODE is on.
        schema_parts.append(
            f"DataFrame `dfs[{position}]` (`{os.path.basename(path)}`):\n{df.head().to_markdown()}\n"
        )
    return dataframes, schema_parts


def _run_analysis_script(analysis_script, dataframes):
    """Execute the generated script and return its findings as a JSON string.

    Stdout is captured while the script runs; the captured text is parsed as
    JSON, falling back to the outermost ``{...}`` span, then to ``{}``.

    Raises:
        Exception: Anything raised by the script or by JSON parsing.
    """
    # SECURITY: this exec()s LLM-generated code in-process with no sandbox.
    # The namespace limits the names handed to the script, not what the script
    # can import or do. Flagged for review; behaviour intentionally unchanged.
    execution_namespace = {
        "dfs": dataframes,
        "pd": pd,
        "re": re,
        "json": json,
        "safe_item": safe_item,
    }
    output_buffer = io.StringIO()
    with redirect_stdout(output_buffer):
        exec(analysis_script, execution_namespace)
    raw_data_output = output_buffer.getvalue()
    try:
        raw_data = json.loads(raw_data_output)
    except json.JSONDecodeError:
        # The script may have printed extra text around the JSON object.
        json_match = re.search(r'\{.*\}', raw_data_output, re.DOTALL)
        raw_data = json.loads(json_match.group(0)) if json_match else {}
    # json.loads only yields plain Python types, so no further pandas/numpy
    # scalar conversion is needed before re-serialising.
    return json.dumps(raw_data)


def handle(user_msg: str, files: list, yield_update) -> str:
    """Answer one user request, either as a CSV analysis or as plain chat.

    With uploaded CSV files: generate an analysis script via the LLM, execute
    it against the loaded dataframes, and turn its JSON output into a report.
    Without files: send the message to the general conversation prompt.

    Args:
        user_msg: Raw user message; first passed through ``safety_filter``.
        files: Uploaded files, given as paths or as objects with a ``name``
            attribute holding the path. May be ``None`` or empty.
        yield_update: Callable invoked with a status string before each
            pipeline stage.

    Returns:
        The sanitised report or chat reply, a refusal message when the input
        is blocked, or an error message (generic when PHI_MODE is on).
    """
    try:
        safe_in, blocked_in, reason_in = safety_filter(user_msg, mode="input")
        if blocked_in:
            return refusal_reply(reason_in)

        redacted_in = redact_phi(safe_in) if (PHI_MODE and REDACT_BEFORE_LLM) else safe_in
        # Every LLM call below uses the same gate to pick redacted vs. raw text.
        llm_input = redacted_in if (PHI_MODE and not ALLOW_EXTERNAL_PHI) else safe_in

        file_paths = [getattr(f, "name", None) or f for f in (files or [])]
        if not file_paths:
            # Pure chat mode
            prompt = f"{GENERAL_CONVERSATION_PROMPT}\n\nUser: {llm_input}\nAssistant:"
            return _sanitize_text(cohere_chat(prompt) or "How can I help further?")

        dataframes, schema_parts = _load_csv_frames(file_paths)
        if not dataframes:
            return "Please upload at least one CSV file."
        schema_context = "\n".join(schema_parts)

        yield_update("```\nGenerating aligned analysis script...\n```")
        analysis_script = _create_python_script(llm_input, schema_context)

        yield_update("```\nExecuting script to extract raw data...\n```")
        try:
            raw_data_json = _run_analysis_script(analysis_script, dataframes)
        except Exception as e:
            # Do not echo the script or error text back when PHI_MODE is on.
            if PHI_MODE:
                return "A critical error occurred."
            return f"Script execution failed: {e}\n\nGenerated script:\n```python\n{analysis_script}\n```"

        yield_update("```\nSynthesizing final comprehensive report...\n```")
        final_report = _generate_final_report(llm_input, raw_data_json)
        return _sanitize_text(final_report)
    except Exception as e:
        safe_log("app_error", {"err": str(e)})
        return "A critical error occurred. Please contact your administrator." if PHI_MODE else f"Error: {e}"
# Legal documents rendered in the "Privacy & Terms" accordion. They are read
# once at import time from paths relative to the current working directory.
PRIVACY_POLICY_TEXT = load_markdown_text("privacy_policy.md")
TERMS_OF_SERVICE_TEXT = load_markdown_text("terms_of_service.md")
# -------- FINAL WORKING CSS (Nov 2025 - Gradio 4+) --------
# Stylesheet passed to gr.Blocks(css=...). It targets the `header`, `main`,
# `left` and `right` elem_classes and the `chatbot_container` elem_id used by
# the layout below.
# NOTE(review): `.svelte-1cea1s5` looks like a build-generated class name and
# may change between Gradio releases - confirm against the deployed version.
SLEEK_CSS = """
/* Full-bleed layout */
:root, body, #root, .gradio-container { height: 100%; margin:0; padding:0; }
.gradio-container { padding: 0 !important; }
/* Header */
.header {
padding: 20px 28px;
background: linear-gradient(135deg, #0e1726, #1d2a44 60%, #243a5e);
color: #fff;
display: flex; align-items: center; justify-content: space-between; gap: 16px;
}
.header h1 { margin:0; font-size:22px; font-weight:600; letter-spacing:0.3px; }
.header .badge { font-size:12px; background:#ffffff22; padding:6px 10px; border-radius:999px; }
/* Main grid */
.main {
display: grid;
grid-template-columns: 420px 1fr;
gap: 16px;
padding: 16px;
height: calc(100vh - 72px);
box-sizing: border-box;
}
.left, .right {
background: #0b1020;
color: #e9edf3;
border-radius: 16px;
border: 1px solid #1c2642;
}
.left { padding: 16px; display: flex; flex-direction: column; gap: 12px; }
.right { padding: 0; display: flex; flex-direction: column; }
/* Make chatbot fill entire right panel β WORKS IN 2025 */
#chatbot_container {
flex: 1 !important;
min-height: 0;
display: flex !important;
flex-direction: column !important;
}
#chatbot_container .svelte-1cea1s5 {
flex: 1 !important;
min-height: 0 !important;
display: flex !important;
flex-direction: column !important;
}
#chatbot_container .messages {
flex: 1 !important;
overflow-y: auto !important;
overflow-x: hidden !important;
padding: 28px !important;
min-height: 0 !important;
}
#chatbot_container .gr-chatbot,
#chatbot_container .svelte-1cea1s5,
#chatbot_container .messages { max-height: none !important; }
/* Scrollbars */
#chatbot_container .messages::-webkit-scrollbar {
width: 8px;
}
#chatbot_container .messages::-webkit-scrollbar-track { background: transparent; }
#chatbot_container .messages::-webkit-scrollbar-thumb {
background: rgba(100,120,160,0.4);
border-radius: 4px;
}
#chatbot_container .messages::-webkit-scrollbar-thumb:hover { background: rgba(100,120,160,0.7); }
/* Code blocks */
#chatbot_container pre {
background: #0f1629 !important;
border: 1px solid #2a3755 !important;
border-radius: 8px !important;
}
"""
# HTML snippet injected via gr.HTML() at the end of the layout.
# NOTE(review): in this copy the value is the literal placeholder "..."; the
# real voice/speech-to-text markup is not visible here.
VOICE_STT_HTML = """...""" # (your existing voice script - unchanged)
# Gradio layout: a header row, then a two-column main row (inputs on the left,
# output tabs on the right).
# NOTE(review): this copy of the file lost all indentation; the nesting below
# is reconstructed from the context-manager structure - confirm against the
# deployed app, in particular the placement of gr.HTML(VOICE_STT_HTML).
with gr.Blocks(theme=gr.themes.Soft(), css=SLEEK_CSS, fill_width=True) as demo:
    # Per-session state; not referenced again in the visible part of the file.
    assessment_history = gr.State([])
    with gr.Row(elem_classes=["header"]):
        gr.Markdown("<h1>Clarity Ops Augmented Decision Support</h1>")
        # Badge text reflects PHI_MODE / PERSIST_HISTORY at import time.
        # NOTE(review): "Β·" looks like a mis-encoded middle dot ("·") - confirm.
        pill = "PHI Mode ON Β· history off" if (PHI_MODE and not PERSIST_HISTORY) else "PHI Mode ON" if PHI_MODE else "PHI Mode OFF"
        gr.Markdown(f"<span class='badge'>{pill}</span>")
    with gr.Row(elem_classes=["main"]):
        # Left panel: file upload, prompt box and action buttons.
        with gr.Column(elem_classes=["left"]):
            gr.Markdown("<div class='panel-title'>New Assessment</div>")
            gr.Markdown("<div class='helper'>Upload CSVs for analysis, or enter a prompt. Voice works in modern browsers.</div>")
            files_input = gr.Files(label="Upload Data Files (.csv)", file_count="multiple", type="filepath", file_types=[".csv"])
            prompt_input = gr.Textbox(label="Prompt", placeholder="Paste your scenario or question here...", lines=12, elem_id="prompt_box", autofocus=True)
            with gr.Row(elem_classes=["actions"]):
                # These three buttons are neither bound to names nor given
                # handlers here; their wiring is not visible in this section.
                gr.Button("Run Analysis", variant="primary")
                gr.Button("Clear")
                gr.Button("Voice")
            gr.Markdown("<div class='voice-hint'>Click Voice to start/stop dictation into the prompt box.</div>")
            # The status string from ping_cohere is written to a Markdown
            # component created inline as the click output.
            gr.Button("Ping Cohere") .click(ping_cohere, outputs=gr.Markdown())
            gr.Markdown("<div class='hr'></div>")
            if PHI_MODE:
                gr.Markdown("PHI Mode: History persistence is disabled by default. Avoid unnecessary identifiers.")
            with gr.Accordion("Privacy & Terms", open=False):
                gr.Markdown(PRIVACY_POLICY_TEXT)
                gr.Markdown("<div class='hr'></div>")
                gr.Markdown(TERMS_OF_SERVICE_TEXT)
        # Right panel: current output and past assessments.
        with gr.Column(elem_classes=["right"]):
            with gr.Tabs(elem_classes=["tabs"]):
                with gr.TabItem("Current Assessment", id=0):
                    # elem_id matches the #chatbot_container rules in SLEEK_CSS.
                    with gr.Column(elem_id="chatbot_container"):
                        chat_history_output = gr.Chatbot(
                            label="Analysis Output",
                            type="messages",
                            container=False,
                            autoscroll=True,
                            elem_id="chatbot_root",
                            height=None # Let CSS control height
                        )
                with gr.TabItem("Assessment History", id=1):
                    gr.Markdown("### Review Past Assessments")
                    history_dropdown = gr.Dropdown(label="Select an assessment", choices=[])
                    history_display = gr.Markdown()
    gr.HTML(VOICE_STT_HTML)
    # (Your event wiring stays exactly the same - unchanged)
    # ... (rest of your code unchanged)
if __name__ == "__main__":
    # Warn, but still start, when the Cohere API key is absent or empty.
    api_key = os.getenv("COHERE_API_KEY")
    if not api_key:
        print("COHERE_API_KEY not set")
    # Listen on all interfaces; the port comes from $PORT (default 7860).
    listen_port = int(os.getenv("PORT", "7860"))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)