Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| import traceback | |
| from contextlib import redirect_stdout | |
| from datetime import datetime | |
| from typing import Any, Dict, List | |
| import gradio as gr | |
| import pandas as pd | |
| import regex as re2 | |
| import re | |
| from langchain_cohere import ChatCohere # noqa: F401 | |
| from settings import ( | |
| GENERAL_CONVERSATION_PROMPT, | |
| COHERE_MODEL_PRIMARY, | |
| COHERE_TIMEOUT_S, # noqa: F401 | |
| USE_OPEN_FALLBACKS # noqa: F401 | |
| ) | |
# Try to import optional HIPAA flags; fall back to safe defaults if not defined.
try:
    from settings import PHI_MODE, PERSIST_HISTORY, HISTORY_TTL_DAYS, REDACT_BEFORE_LLM, ALLOW_EXTERNAL_PHI
except Exception:
    # settings does not define the HIPAA flags — use these defaults instead.
    # NOTE(review): PERSIST_HISTORY and ALLOW_EXTERNAL_PHI default to True,
    # which is the *least* restrictive choice; confirm this is intended for a
    # deployment that may handle PHI.
    PHI_MODE = False  # treat deployment as non-PHI mode
    PERSIST_HISTORY = True  # keep chat history
    HISTORY_TTL_DAYS = 365  # history retention window, in days
    REDACT_BEFORE_LLM = False  # do not redact input before sending to the LLM
    ALLOW_EXTERNAL_PHI = True  # allow PHI to reach external services
| from audit_log import log_event | |
| from privacy import safety_filter, refusal_reply | |
| from llm_router import cohere_chat, _co_client, cohere_embed | |
| # ---------------------- Helpers ---------------------- | |
def load_markdown_text(filepath: str) -> str:
    """Read a UTF-8 text/markdown file; return an inline error notice if missing."""
    try:
        with open(filepath, "r", encoding="utf-8") as handle:
            return handle.read()
    except FileNotFoundError:
        # Surface the failure in the UI instead of raising.
        return f"**Error:** Document `{os.path.basename(filepath)}` not found."
def _sanitize_text(s: str) -> str:
    """Strip Unicode control characters from *s*, preserving newlines and tabs.

    Non-string inputs are returned unchanged.
    """
    if not isinstance(s, str):
        return s
    # \p{C} is the Unicode "Other" category (control/format/unassigned); the
    # character-class subtraction --[\n\t] keeps newline and tab intact.
    # NOTE(review): set subtraction is a `regex`-module V1 feature — confirm
    # the pattern behaves as intended under the module's default version flag.
    return re2.sub(r"[\p{C}--[\n\t]]+", "", s)
# Conservative PHI redaction rules, applied in order: (compiled pattern,
# replacement token). Order matters — e.g. SSNs are replaced before the
# looser phone/ZIP patterns can touch the same digits.
PHI_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[REDACTED_SSN]"),
    (re.compile(r"\b\d{9}\b"), "[REDACTED_MRN]"),
    (re.compile(r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b"), "[REDACTED_PHONE]"),
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "[REDACTED_EMAIL]"),
    (re.compile(r"\b(19|20)\d{2}-\d{2}-\d{2}\b"), "[REDACTED_DOB]"),
    (re.compile(r"\b\d{2}/\d{2}/(19|20)\d{2}\b"), "[REDACTED_DOB]"),
    (re.compile(r"\b\d{5}(-\d{4})?\b"), "[REDACTED_ZIP]"),
]


def redact_phi(text: str) -> str:
    """Replace likely PHI (SSN, MRN, phone, email, DOB, ZIP) with placeholder tokens.

    Non-string inputs pass through unchanged.
    """
    if not isinstance(text, str):
        return text
    redacted = text
    for pattern, replacement in PHI_PATTERNS:
        redacted = pattern.sub(replacement, redacted)
    return redacted
def safe_log(event_name: str, meta: dict | None = None):
    """Best-effort audit logging wrapper: never raises, drops the bulky 'raw' key."""
    try:
        payload = dict(meta or {})
        payload.pop("raw", None)  # never persist raw content in the audit log
        log_event(event_name, None, payload)
    except Exception:
        # Logging must never break the request path; swallow deliberately.
        pass
| # ---------------------- Audit Trail ---------------------- | |
| import hashlib | |
| from datetime import datetime as dt | |
| def _hash_content(content: str) -> str: | |
| """Generate a short hash for content identification without storing full content.""" | |
| return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16] | |
| def _safe_truncate(text: str, max_length: int = 500) -> str: | |
| """Safely truncate text for logging without exposing sensitive data.""" | |
| if not text or len(text) <= max_length: | |
| return text | |
| return text[:max_length] + f"... [truncated, {len(text)} total chars]" | |
def log_analysis_start(user_prompt: str, filenames: List[str], schema_summary: List[Dict[str, Any]]) -> str:
    """
    Log the start of an analysis session.
    Captures data lineage: what files were uploaded and their schemas.
    Returns a session_id for correlating subsequent log entries.
    """
    # Session id = timestamp prefix + short prompt digest — unique enough to
    # correlate all log entries produced by one analysis run.
    session_id = dt.now().strftime("%Y%m%d_%H%M%S_") + _hash_content(user_prompt)[:8]
    # Record schema metadata only — never raw cell values.
    lineage_fields = ("filename", "rows", "columns", "column_names", "dtypes")
    schema_log = [
        {field: schema.get(field) for field in lineage_fields}
        for schema in schema_summary
    ]
    safe_log("analysis_session_start", {
        "session_id": session_id,
        "prompt_hash": _hash_content(user_prompt),
        "prompt_length": len(user_prompt),
        "file_count": len(filenames),
        "filenames": filenames,
        "schemas": schema_log,
        "timestamp": dt.now().isoformat(),
    })
    return session_id
def log_code_generation(session_id: str, generated_code: str) -> None:
    """
    Log the generated analysis code.
    Captures code execution logs for traceability.
    Every finding can be traced back to specific lines of generated Python code.
    """
    # Substring heuristics: tag which pandas-style operations the script
    # appears to use, so the audit trail is skimmable without the code itself.
    detection_rules = (
        ("groupby", ("groupby",)),
        ("merge/join", ("merge", "join")),
        ("pivot", ("pivot",)),
        ("aggregate", ("agg", "aggregate")),
        ("sort", ("sort",)),
        ("filter", ("filter", ".loc[", ".query(")),
        ("statistics", ("mean(", "sum(", "count(")),
    )
    code_operations = [
        label
        for label, needles in detection_rules
        if any(needle in generated_code for needle in needles)
    ]
    safe_log("code_generation", {
        "session_id": session_id,
        # Hash + sizes only: the code itself is not persisted in the log.
        "code_hash": _hash_content(generated_code),
        "code_length": len(generated_code),
        "code_lines": generated_code.count('\n') + 1,
        "operations_detected": code_operations,
        "timestamp": dt.now().isoformat(),
    })
def log_code_execution(session_id: str, success: bool, output_size: int, error: str | None = None) -> None:
    """
    Log the result of code execution.
    Captures execution status and output metadata.

    Args:
        session_id: Correlation id returned by log_analysis_start.
        success: Whether the sandboxed script ran without raising.
        output_size: Size of the captured stdout, in bytes.
        error: Optional error text; truncated before logging so large or
            sensitive payloads never reach the audit log.
    """
    safe_log("code_execution", {
        "session_id": session_id,
        "success": success,
        "output_size_bytes": output_size,
        "error": _safe_truncate(error) if error else None,
        "timestamp": dt.now().isoformat(),
    })
def log_analysis_complete(
    session_id: str,
    validated_output_keys: List[str],
    report_length: int,
    total_duration_ms: float | None = None
) -> None:
    """
    Log successful completion of analysis.
    Captures analytical provenance: what was produced and output structure.

    Args:
        session_id: Correlation id returned by log_analysis_start.
        validated_output_keys: Top-level keys of the validated JSON findings.
        report_length: Length of the final report text, in characters.
        total_duration_ms: Optional wall-clock duration for the whole session.
    """
    safe_log("analysis_session_complete", {
        "session_id": session_id,
        # Keys only — the findings themselves are never written to the log.
        "output_keys": validated_output_keys,
        "output_key_count": len(validated_output_keys),
        "report_length": report_length,
        "duration_ms": total_duration_ms,
        "timestamp": dt.now().isoformat(),
    })
def log_analysis_error(session_id: str, error_type: str, error_message: str) -> None:
    """
    Log analysis failure.
    Captures error information for debugging without exposing sensitive data.

    Args:
        session_id: Correlation id returned by log_analysis_start.
        error_type: Short machine-readable category (e.g. "sandbox_violation").
        error_message: Human-readable detail; truncated before logging.
    """
    safe_log("analysis_session_error", {
        "session_id": session_id,
        "error_type": error_type,
        # Truncate to keep log entries bounded and avoid leaking data.
        "error_message": _safe_truncate(error_message),
        "timestamp": dt.now().isoformat(),
    })
| # ---------------------- JSON Validation ---------------------- | |
class JSONValidationError(Exception):
    """Raised when script output fails JSON validation."""
    pass


def validate_json_output(raw_output: str) -> Dict[str, Any]:
    """
    Validates and parses JSON output from the analysis script.
    Creates the hard boundary between calculation and communication.

    Args:
        raw_output: Everything the sandboxed script printed to stdout.
    Returns:
        The parsed findings as a non-empty dict.
    Raises:
        JSONValidationError: on empty output, invalid JSON, a non-dict
            payload, an explicit {"error": ...} report, or an empty object.
    """
    cleaned_output = raw_output.strip()
    if not cleaned_output:
        raise JSONValidationError(
            "Analysis script produced no output. The script must print a JSON object to stdout."
        )
    # Scripts sometimes print debug text or several objects; scan for balanced
    # top-level {...} spans and keep the last complete one.
    json_candidates = []
    brace_count = 0
    current_start = None
    for i, char in enumerate(cleaned_output):
        if char == '{':
            if brace_count == 0:
                current_start = i
            brace_count += 1
        elif char == '}':
            if brace_count == 0:
                # Stray closing brace before any object opened (e.g. debug
                # noise): skip it. Previously this drove the counter negative
                # and silently broke extraction for the rest of the output.
                continue
            brace_count -= 1
            if brace_count == 0 and current_start is not None:
                json_candidates.append(cleaned_output[current_start:i + 1])
                current_start = None
    json_to_parse = json_candidates[-1] if json_candidates else cleaned_output
    try:
        parsed = json.loads(json_to_parse)
    except json.JSONDecodeError as e:
        error_context = cleaned_output[:500] + ("..." if len(cleaned_output) > 500 else "")
        raise JSONValidationError(
            f"Analysis script produced invalid JSON. Parse error: {e.msg} at position {e.pos}.\n\n"
            f"Raw output (first 500 chars):\n```\n{error_context}\n```"
        ) from e
    if not isinstance(parsed, dict):
        raise JSONValidationError(
            f"Analysis output must be a JSON object (dictionary), not {type(parsed).__name__}. "
            f"Ensure your script prints a dictionary with json.dumps()."
        )
    if "error" in parsed:
        error_msg = parsed.get("error", "Unknown error")
        raise JSONValidationError(f"Analysis script reported an error: {error_msg}")
    if not parsed:
        raise JSONValidationError(
            "Analysis script produced an empty JSON object. "
            "Ensure your script populates the output dictionary with findings."
        )
    safe_log("json_validation_success", {"keys": list(parsed.keys()), "key_count": len(parsed)})
    return parsed
def format_validated_json_for_report(validated_data: Dict[str, Any]) -> str:
    """Formats validated JSON data for the report generator."""
    try:
        # default=str stringifies anything json can't serialize natively.
        return json.dumps(validated_data, indent=2, default=str, ensure_ascii=False)
    except (TypeError, ValueError) as exc:
        safe_log("json_format_warning", {"error": str(exc)})
        # Last-resort fallback: stringify the whole payload.
        return json.dumps({"raw_data": str(validated_data)}, indent=2)
| # ---------------------- Schema Validation ---------------------- | |
class SchemaValidationError(Exception):
    """Raised when input data fails schema validation."""
    pass


def validate_dataframe_schema(df: pd.DataFrame, filename: str) -> Dict[str, Any]:
    """
    Validates a DataFrame's schema before analysis.
    Implements the ClarityOps requirement:
    "Schema validation examines column names, data types, and value ranges
    before analysis begins. The system rejects malformed inputs."
    Args:
        df: The DataFrame to validate
        filename: Original filename for error messages
    Returns:
        Dict containing schema metadata for logging
    Raises:
        SchemaValidationError: If the DataFrame fails validation
    """
    errors = []
    warnings = []
    # Check 1: DataFrame is not empty
    # (fix: error messages now include the actual filename instead of a
    # hard-coded placeholder, so users can tell which upload failed)
    if df.empty:
        raise SchemaValidationError(f"File '{filename}' contains no data (empty DataFrame).")
    # Check 2: Has at least one column
    if len(df.columns) == 0:
        raise SchemaValidationError(f"File '{filename}' has no columns.")
    # Check 3: Column names are valid (not empty, no duplicates)
    col_names = list(df.columns)
    # Check for empty column names
    empty_cols = [i for i, c in enumerate(col_names) if str(c).strip() == "" or pd.isna(c)]
    if empty_cols:
        errors.append(f"Empty column names at positions: {empty_cols}")
    # Check for duplicate column names
    seen = {}
    duplicates = []
    for col in col_names:
        col_str = str(col)
        if col_str in seen:
            duplicates.append(col_str)
        seen[col_str] = True
    if duplicates:
        errors.append(f"Duplicate column names: {list(set(duplicates))}")
    # Check 4: Data types are recognizable (skip if duplicates found, since
    # df[col] on a duplicated label returns a DataFrame, not a Series)
    has_duplicates = len(duplicates) > 0
    if not has_duplicates:
        for col in df.columns:
            dtype = df[col].dtype
            if dtype == object:
                # Check if object column has mixed types that could cause issues
                sample = df[col].dropna().head(100)
                if len(sample) > 0:
                    types_in_col = set(type(x).__name__ for x in sample)
                    if len(types_in_col) > 2:  # Allow str + one other type
                        warnings.append(f"Column '{col}' has mixed types: {types_in_col}")
    # Check 5: Reasonable row count (warn if very large)
    if len(df) > 1_000_000:
        warnings.append(f"Large dataset ({len(df):,} rows) may impact performance.")
    # Check 6: Check for completely null columns (skip if duplicates found)
    if not has_duplicates:
        null_cols = [col for col in df.columns if df[col].isna().all()]
        if null_cols:
            warnings.append(f"Columns with all null values: {null_cols}")
    # Check 7: Validate numeric columns have reasonable ranges (skip if duplicates found)
    if not has_duplicates:
        import numpy as np  # lazy import: only needed on this path
        for col in df.select_dtypes(include=['number']).columns:
            col_data = df[col].dropna()
            if len(col_data) > 0:
                if np.isinf(col_data).any():
                    errors.append(f"Column '{col}' contains infinite values.")
    # If there are critical errors, reject the input
    if errors:
        error_msg = f"Schema validation failed for '{filename}':\n" + "\n".join(f" - {e}" for e in errors)
        raise SchemaValidationError(error_msg)
    # Build schema metadata
    schema_info = {
        "filename": filename,
        "rows": len(df),
        "columns": len(df.columns),
        "column_names": col_names,
        "dtypes": {str(col): str(df[col].dtype) for col in df.columns},
        "null_counts": {str(col): int(df[col].isna().sum()) for col in df.columns},
        "warnings": warnings,
    }
    # Log warnings but don't fail
    if warnings:
        safe_log("schema_validation_warnings", {"filename": filename, "warnings": warnings})
    safe_log("schema_validation_passed", {"filename": filename, "rows": len(df), "columns": len(df.columns)})
    return schema_info
def validate_all_dataframes(dataframes: List[pd.DataFrame], filenames: List[str]) -> List[Dict[str, Any]]:
    """
    Validates all uploaded DataFrames.
    Args:
        dataframes: List of DataFrames to validate
        filenames: Corresponding filenames
    Returns:
        List of schema metadata dicts
    Raises:
        SchemaValidationError: If any DataFrame fails validation
    """
    schema_infos: List[Dict[str, Any]] = []
    failures: List[str] = []
    # Validate every file before failing, so the user sees all problems at once.
    for frame, fname in zip(dataframes, filenames):
        try:
            schema_infos.append(validate_dataframe_schema(frame, fname))
        except SchemaValidationError as exc:
            failures.append(str(exc))
    if failures:
        raise SchemaValidationError("\n\n".join(failures))
    return schema_infos
| # ---------------------- Sandbox Execution ---------------------- | |
| class SandboxViolationError(Exception): | |
| """Raised when generated code attempts forbidden operations.""" | |
| pass | |
| # Restricted import function that only allows safe modules | |
| _ALLOWED_MODULES = frozenset({ | |
| "json", "math", "statistics", "collections", "itertools", "functools", | |
| "operator", "string", "re", "datetime", "decimal", "fractions", | |
| "random", "copy", "types", "typing", "dataclasses", "enum", | |
| "numpy", "pandas", "scipy.stats", | |
| }) | |
| _BLOCKED_MODULES = frozenset({ | |
| "os", "sys", "subprocess", "shutil", "pathlib", "glob", | |
| "socket", "http", "urllib", "requests", "ftplib", "smtplib", | |
| "pickle", "shelve", "marshal", "importlib", "builtins", | |
| "ctypes", "multiprocessing", "threading", "asyncio", | |
| "eval", "exec", "compile", "open", "file", "input", | |
| "code", "codeop", "pty", "tty", "termios", "resource", | |
| "signal", "mmap", "sysconfig", "platform", | |
| }) | |
| def _safe_import(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0): | |
| """Restricted import that only allows whitelisted modules.""" | |
| import builtins as _builtins | |
| base_module = name.split('.')[0] | |
| if base_module in _BLOCKED_MODULES or name in _BLOCKED_MODULES: | |
| raise SandboxViolationError(f"Import of '{name}' is not allowed in sandbox environment.") | |
| if base_module not in _ALLOWED_MODULES and name not in _ALLOWED_MODULES: | |
| raise SandboxViolationError(f"Import of '{name}' is not allowed. Allowed modules: {', '.join(sorted(_ALLOWED_MODULES))}") | |
| return _builtins.__import__(name, globals_dict, locals_dict, fromlist, level) | |
def _create_sandbox_builtins() -> Dict[str, Any]:
    """
    Creates a restricted builtins dict that prevents dangerous operations.
    Allows safe operations needed for data analysis.
    """
    import builtins

    # Builtin names copied through verbatim — no file, network, code-object
    # or interpreter-control builtins appear in this list.
    passthrough = [
        # Types and constructors
        "bool", "int", "float", "str", "list", "dict", "tuple", "set",
        "frozenset", "bytes", "bytearray", "complex", "slice", "type", "object",
        # Iteration and sequences
        "range", "enumerate", "zip", "map", "filter", "reversed", "sorted",
        "iter", "next", "len",
        # Math and comparison
        "abs", "min", "max", "sum", "pow", "round", "divmod",
        # Logic and identity
        "all", "any", "isinstance", "issubclass", "id", "hash",
        # String and representation
        "repr", "ascii", "chr", "ord", "format", "print",
        # Attribute access
        "getattr", "setattr", "hasattr", "delattr",
        # Other safe operations
        "callable", "dir", "vars", "locals",
        # Exceptions (needed for error handling in scripts)
        "Exception", "ValueError", "TypeError", "KeyError", "IndexError",
        "AttributeError", "ZeroDivisionError", "StopIteration", "RuntimeError",
        # Constants
        "Ellipsis", "NotImplemented",
    ]
    safe_builtins = {name: getattr(builtins, name) for name in passthrough}
    # Names that need overriding or cannot be fetched with getattr.
    safe_builtins.update({
        "globals": lambda: {},  # return empty dict to hide the real globals
        "None": None,
        "True": True,
        "False": False,
        "__import__": _safe_import,  # whitelist-enforcing import hook
        "__name__": "__sandbox__",
        "__doc__": None,
    })
    return safe_builtins
def _create_sandbox_namespace(dataframes: List[Any]) -> Dict[str, Any]:
    """
    Creates a sandboxed execution namespace with only safe operations.
    This implements the ClarityOps security model:
    - Memory-only execution (no file I/O)
    - No network access
    - No system calls
    - Only data analysis libraries available
    """
    import numpy as np

    namespace: Dict[str, Any] = {"__builtins__": _create_sandbox_builtins()}
    # Pre-loaded safe modules plus the uploaded data.
    namespace.update(
        dfs=dataframes,
        pd=pd,
        np=np,
        re=re,
        json=json,
        # Common pandas/numpy items for convenience
        DataFrame=pd.DataFrame,
        Series=pd.Series,
        NaN=np.nan,
        nan=np.nan,
    )
    return namespace
# Textual pre-checks compiled once at import time (previously rebuilt on
# every call). Defense-in-depth only — the restricted namespace in
# _create_sandbox_namespace is the real barrier.
_FORBIDDEN_PATTERNS = [
    (re.compile(r'\bopen\s*\('), "File operations (open) are not allowed"),
    (re.compile(r'\bos\s*\.'), "OS module access is not allowed"),
    (re.compile(r'\bsys\s*\.'), "Sys module access is not allowed"),
    (re.compile(r'\bsubprocess'), "Subprocess module is not allowed"),
    (re.compile(r'\bsocket\s*\.'), "Network operations are not allowed"),
    (re.compile(r'\burllib'), "Network operations are not allowed"),
    (re.compile(r'\brequests\s*\.'), "Network operations are not allowed"),
    (re.compile(r'\bhttp\s*\.'), "Network operations are not allowed"),
    (re.compile(r'\beval\s*\('), "eval() is not allowed"),
    (re.compile(r'\bexec\s*\('), "exec() is not allowed"),
    (re.compile(r'\bcompile\s*\('), "compile() is not allowed"),
    (re.compile(r'\b__import__\s*\('), "Direct __import__ calls are not allowed"),
    (re.compile(r'\bimportlib'), "importlib is not allowed"),
    (re.compile(r'\bpickle'), "pickle module is not allowed"),
    (re.compile(r'\bshutil'), "shutil module is not allowed"),
    (re.compile(r'\bglobals\s*\(\s*\)'), "globals() access is restricted"),
    (re.compile(r'\.to_csv\s*\('), "Writing files (to_csv) is not allowed"),
    (re.compile(r'\.to_excel\s*\('), "Writing files (to_excel) is not allowed"),
    (re.compile(r'\.to_parquet\s*\('), "Writing files (to_parquet) is not allowed"),
    (re.compile(r'\.to_sql\s*\('), "Database operations (to_sql) are not allowed"),
    (re.compile(r'pd\.read_'), "Reading files is not allowed - use the provided dfs variable"),
]


def execute_in_sandbox(script: str, dataframes: List[Any]) -> str:
    """
    Executes the analysis script in a sandboxed environment.
    Returns the captured stdout output.
    Raises:
        SandboxViolationError: If script attempts forbidden operations
        RuntimeError: For other execution errors (the original exception is
            attached as __cause__ for debugging)
    """
    # Pre-execution safety checks on the script text.
    for pattern, message in _FORBIDDEN_PATTERNS:
        if pattern.search(script):
            raise SandboxViolationError(f"Security violation: {message}")
    # Create sandboxed namespace (restricted builtins, whitelisted imports).
    namespace = _create_sandbox_namespace(dataframes)
    # Capture stdout: the script's JSON findings are printed, not returned.
    output_buffer = io.StringIO()
    try:
        with redirect_stdout(output_buffer):
            # exec is intentional here: the namespace restricts builtins and
            # imports, and the script text was screened above.
            exec(script, namespace, namespace)
        return output_buffer.getvalue()
    except SandboxViolationError:
        raise
    except Exception as e:
        # Re-raise with context but don't expose internal details; chain the
        # original so tracebacks stay debuggable.
        raise RuntimeError(f"Script execution error: {type(e).__name__}: {e}") from e
| # ---------------------- Analysis Script Generation ---------------------- | |
def _create_python_script(user_scenario: str, schema_context: str) -> str:
    """Ask the LLM to write a pandas analysis script for the user's scenario.

    Builds a prompt from fixed analytical guidelines plus the uploaded data's
    schema, calls `cohere_chat`, and extracts the first fenced ```python block
    from the reply. Falls back to a script that prints a JSON error object if
    no code block is found.
    """
    EXPERT_ANALYTICAL_GUIDELINES = """
--- EXPERT ANALYTICAL GUIDELINES ---
When writing your script, you MUST follow these expert analytical principles:
**DATA INTEGRATION & LINKING:**
1. When linking datasets, identify the correct join keys by examining column names and values. Never assume column names match across datasets.
2. If a required column doesn't exist in a dataset, derive it from related data or clearly note its absence in the output.
3. Use the most recent/relevant time period data when multiple periods exist (e.g., prefer 2021 over 2013 census data if both available).
**AGGREGATION & GROUPING:**
4. When asked about "specialties," "categories," or "types," group by the broadest categorical column first (e.g., 'Specialty' not 'Procedure').
5. When asked about specific items, use the most granular level (e.g., specific procedures, individual facilities).
6. Always verify the appropriate level of aggregation matches the user's question.
**PRIORITIZATION & RANKING:**
7. To prioritize locations/facilities, create a composite risk score combining: (a) population/volume, (b) relevant health indicators, and (c) recency of data.
8. When ranking, consider both absolute values AND relative performance against benchmarks (provincial/national averages).
9. Include sample sizes/record counts alongside rankings to indicate statistical reliability.
**CALCULATIONS & ESTIMATES:**
10. For time-based capacity calculations, use standard assumptions: 60 working days per 3-month period, 5 days/week, unless data specifies otherwise.
11. For cost calculations, always separate and sum component costs (startup + ongoing + variable) before multiplying by volume.
12. When extracting numeric values from text fields, use robust parsing: strip currency symbols, handle ranges (take midpoint), convert percentages.
**UNITS & VALIDATION:**
13. Preserve and label units correctly: percentages (%), currency (CAD/USD), time (days/weeks), clinical measures (mmHg for BP, % for A1c, kg/m² for BMI).
14. Validate calculated values against reasonable ranges (e.g., A1c typically 4-14%, BP typically 60-200 mmHg).
15. Flag outliers or unexpected values in the output for human review.
**OUTPUT COMPLETENESS:**
16. For each evaluation question, ensure the JSON output contains all data needed to answer it fully.
17. Include both raw values AND calculated metrics (averages, percentages, rankings).
18. When comparing to benchmarks, include both the benchmark value and the comparison result.
"""
    prompt_for_coder = f"""\
You are an expert Python data scientist. Your job is to write a script to extract the data needed to answer the user's request.
You have dataframes in a list `dfs`.
{EXPERT_ANALYTICAL_GUIDELINES}
--- DATA SCHEMA ---
{schema_context}
--- END DATA SCHEMA ---
CRITICAL RULES:
1. **DO NOT READ FILES:** You MUST NOT include `pd.read_csv`. The data is ALREADY loaded in the `dfs` variable. You MUST use this variable. Failure to do so will cause a fatal error.
2. **JSON OUTPUT ONLY:** Your script's ONLY output must be a single JSON object printed to stdout containing the raw data findings.
3. **BE PRECISE:** Use the exact, case-sensitive column names from the schema and robustly clean strings (`re.sub()`) before converting to numbers.
4. **JSON SERIALIZATION:** Before adding data to your final dictionary for JSON conversion, you MUST convert any pandas-specific types (like `int64`) to standard Python types using `.item()` for single values or `.tolist()` for lists.
5. **SINGLE JSON OUTPUT:** Print exactly ONE JSON object at the end of your script. Do not print debug statements or multiple JSON objects.
6. **VALID JSON STRUCTURE:** The output MUST be a dictionary/object, not an array or primitive value.
7. **SAFE DATA JOINING:** When joining/merging dataframes or looking up values across dataframes, ALWAYS check if matches exist before accessing with `.iloc[0]`. Use `.merge()` with `how='left'` or check `len(filtered_df) > 0` before accessing rows. Never assume keys will match exactly between dataframes.
--- USER'S SCENARIO ---
{user_scenario}
--- PYTHON SCRIPT ---
Now, write the complete Python script that performs the analysis and prints a single, serializable JSON object.
```python
"""
    generated_text = cohere_chat(prompt_for_coder)
    # Extract the fenced code block from the model reply; DOTALL lets the
    # body span multiple lines.
    match = re2.search(r"```python\n(.*?)```", generated_text, re2.DOTALL)
    if match:
        return match.group(1).strip()
    # Fallback keeps the downstream JSON-validation contract intact.
    return "print(json.dumps({'error': 'Failed to generate a valid Python script.'}))"
def _generate_long_report(prompt: str) -> str:
    """Send *prompt* straight to the Cohere client with a large token budget.

    Bypasses the shared router so the final report can use max_tokens=4096.
    Returns an error string (never raises) on any failure.
    """
    try:
        cohere = _co_client()
        if not cohere:
            return "Error: Cohere client not initialized."
        reply = cohere.chat(
            model=COHERE_MODEL_PRIMARY,
            message=prompt,
            max_tokens=4096,
        )
        return reply.text
    except Exception as e:
        safe_log("cohere_chat_error", {"err": str(e)})
        return f"Error during final report generation: {e}"
def _generate_final_report(user_scenario: str, validated_json_str: str) -> str:
    """Turn validated JSON findings into the final narrative report.

    Combines the user's original scenario with the validated data and a fixed
    set of interpretation guidelines, then delegates to _generate_long_report
    for the actual LLM call. The LLM only ever sees validated findings — it
    never recomputes numbers itself.
    """
    prompt_for_writer = f"""\
You are an expert healthcare management consultant and data analyst.
A data science script has run to extract key findings. You have the user's original request and the validated JSON data.
Your task is to synthesize these validated findings into a single, comprehensive, and professional report that directly answers all of the user's questions with detailed justifications.
--- USER'S ORIGINAL SCENARIO & DELIVERABLES ---
{user_scenario}
--- END SCENARIO ---
--- VALIDATED DATA FINDINGS (JSON) ---
{validated_json_str}
--- END VALIDATED DATA ---
--- ANALYTICAL INTERPRETATION GUIDELINES ---
When writing your report, follow these principles:
**ACCURACY & UNITS:**
- Report numerical values with appropriate precision (1-2 decimal places for percentages, whole numbers for counts).
- Always include correct units: % for percentages, days for wait times, $ for costs, mmHg for blood pressure, % for A1c, kg/m² for BMI.
- Verify that values make clinical/operational sense before reporting (e.g., A1c should be 4-14%, not measured in mmHg).
**CONTEXT & BENCHMARKS:**
- Compare findings against relevant benchmarks (provincial averages, national standards, historical baselines).
- Explain what "good" vs "poor" performance means in context.
- Quantify differences (e.g., "50 days above average" not just "higher than average").
**CAUSATION & INTERPRETATION:**
- Distinguish correlation from causation; avoid overstating causal claims.
- Consider confounding factors (case complexity, patient demographics, resource constraints).
- Acknowledge data limitations and uncertainty.
**RECOMMENDATIONS:**
- Make recommendations specific, actionable, and tied directly to the data findings.
- Prioritize recommendations by impact and feasibility.
- Include implementation considerations (resources needed, timeline, risks).
- Suggest metrics for monitoring success.
**COMPLETENESS:**
- Address EVERY evaluation question explicitly.
- If data is insufficient to answer a question fully, state what's missing and provide the best available answer.
- Cross-reference related findings to provide a coherent narrative.
Now, write the final, polished report. The report MUST:
1. Follow the "Expected Output Format" requested by the user.
2. Use tables, bullet points, and DETAILED narrative justifications for each recommendation.
3. Synthesize the validated data into actionable insights. Do not just copy the raw numbers; interpret them.
4. Ensure you fully address ALL evaluation questions, especially the final recommendations.
5. Verify all units and values are clinically/operationally plausible before including them.
"""
    return _generate_long_report(prompt_for_writer)
| def _append_msg(h: List[Dict[str, str]], r: str, c: str) -> List[Dict[str, str]]: | |
| return (h or []) + [{"role": r, "content": c}] | |
def ping_cohere() -> str:
    """Health check: confirm the Cohere client is configured and reachable.

    Uses a tiny embed call as the probe; always returns a status string,
    never raises.
    """
    try:
        client = _co_client()
        if not client:
            return "Cohere client not initialized."
        if cohere_embed(["hello", "world"]):
            return f"Cohere OK ✅ (model={COHERE_MODEL_PRIMARY})"
        return "Cohere reachable."
    except Exception as e:
        return f"Cohere ping failed: {e}"
def handle(user_msg: str, files: list, yield_update) -> str:
    """Route a request through the CSV analysis pipeline or general chat.

    Parameters:
        user_msg: Raw user prompt; passed through the safety filter and,
            when PHI mode requires it, through PHI redaction before any LLM.
        files: Uploaded file objects (or plain paths); only ``.csv`` entries
            are analyzed, everything else is silently ignored.
        yield_update: Callback invoked with short Markdown progress snippets.

    Returns:
        A Markdown report (with a code-traceability appendix), a chat reply,
        or a refusal/error message.
    """
    try:
        safe_in, blocked_in, reason_in = safety_filter(user_msg, mode="input")
        if blocked_in:
            return refusal_reply(reason_in)
        redacted_in = safe_in
        if PHI_MODE and REDACT_BEFORE_LLM:
            redacted_in = redact_phi(safe_in)
        file_paths: List[str] = [getattr(f, "name", None) or f for f in (files or [])]
        if file_paths:
            dataframes, schema_parts, filenames = [], [], []
            for p in file_paths:
                # Skip entries without a usable path and non-CSV uploads
                # (case-insensitive extension check).
                if not isinstance(p, str) or not p.lower().endswith(".csv"):
                    continue
                try:
                    df = pd.read_csv(p)
                except UnicodeDecodeError:
                    # Fall back to a permissive single-byte encoding.
                    df = pd.read_csv(p, encoding="latin1")
                # BUGFIX: label by position within `dataframes`, not within
                # `file_paths` — the two diverge whenever a non-CSV entry is
                # skipped, and the sandbox exposes frames as `dfs[<position>]`.
                idx = len(dataframes)
                dataframes.append(df)
                filenames.append(os.path.basename(p))
                schema_parts.append(
                    f"DataFrame `dfs[{idx}]` (`{os.path.basename(p)}`):\n{df.head().to_markdown()}\n"
                )
            if not dataframes:
                return "Please upload at least one CSV file."
            # Schema Validation - examines column names, data types, and value ranges
            yield_update("```\n🔎 Validating input schema...\n```")
            try:
                schema_infos = validate_all_dataframes(dataframes, filenames)
            except SchemaValidationError as e:
                safe_log("schema_validation_failed", {"error": str(e)})
                return f"**Schema Validation Failed**\n\n{e}\n\nPlease fix the data issues and re-upload."
            # Start audit trail session (local import keeps module top level lean).
            import time as _time
            _start_time = _time.time()
            session_id = log_analysis_start(safe_in, filenames, schema_infos)
            schema_context = "\n".join(schema_parts)
            # When external PHI is disallowed, only the redacted prompt may
            # leave the process toward the LLM.
            prompt_for_code = redacted_in if (PHI_MODE and not ALLOW_EXTERNAL_PHI) else safe_in
            yield_update("```\n🧠 Generating aligned analysis script...\n```")
            analysis_script = _create_python_script(prompt_for_code, schema_context)
            # Log generated code for the audit trail.
            log_code_generation(session_id, analysis_script)
            yield_update("```\n⚙️ Executing script in sandbox...\n```")
            try:
                raw_data_output = execute_in_sandbox(analysis_script, dataframes)
                log_code_execution(session_id, success=True, output_size=len(raw_data_output))
            except SandboxViolationError as e:
                # Forbidden operation detected by the sandbox: report and include
                # the offending script so the user can rephrase.
                log_code_execution(session_id, success=False, output_size=0, error=str(e))
                log_analysis_error(session_id, "sandbox_violation", str(e))
                safe_log("sandbox_violation", {"error": str(e)})
                return (
                    f"**Security Violation Detected**\n\n{e}\n\n"
                    f"The generated script attempted a forbidden operation. "
                    f"Please rephrase your request.\n\n"
                    f"Generated Script:\n```python\n{analysis_script}\n```"
                )
            except Exception as e:
                log_code_execution(session_id, success=False, output_size=0, error=str(e))
                log_analysis_error(session_id, "execution_error", str(e))
                return (
                    f"An error occurred executing the script: {e}\n\nGenerated Script:\n"
                    f"```python\n{analysis_script}\n```"
                )
            # JSON Validation - creates hard boundary between calculation and communication
            yield_update("```\n🔍 Validating JSON output...\n```")
            try:
                validated_data = validate_json_output(raw_data_output)
                validated_json_str = format_validated_json_for_report(validated_data)
                safe_log("json_validation_passed", {"output_keys": list(validated_data.keys())})
            except JSONValidationError as e:
                log_analysis_error(session_id, "json_validation_error", str(e))
                safe_log("json_validation_failed", {"error": str(e)})
                return (
                    f"**JSON Validation Failed**\n\n{e}\n\n"
                    f"Generated Script:\n```python\n{analysis_script}\n```"
                )
            yield_update("```\n✍️ Synthesizing final comprehensive report...\n```")
            writer_input = redacted_in if (PHI_MODE and not ALLOW_EXTERNAL_PHI) else safe_in
            final_report = _generate_final_report(writer_input, validated_json_str)
            # Log successful completion with wall-clock duration.
            _end_time = _time.time()
            _duration_ms = (_end_time - _start_time) * 1000
            log_analysis_complete(
                session_id,
                validated_output_keys=list(validated_data.keys()),
                report_length=len(final_report),
                total_duration_ms=_duration_ms
            )
            # Append code traceability section:
            # "Every finding traces back to specific lines of generated Python code"
            traceability_section = (
                f"\n\n---\n\n"
                f"<details>\n"
                f"<summary>📜 <strong>View Analysis Code</strong> (click to expand)</summary>\n\n"
                f"The findings in this report were generated by the following Python code, "
                f"executed in a sandboxed environment:\n\n"
                f"```python\n{analysis_script}\n```\n\n"
                f"**Session ID:** `{session_id}`\n\n"
                f"</details>"
            )
            return _sanitize_text(final_report) + traceability_section
        else:
            # No files uploaded: fall back to general conversation.
            chat_input = redacted_in if (PHI_MODE and not ALLOW_EXTERNAL_PHI) else safe_in
            prompt = f"{GENERAL_CONVERSATION_PROMPT}\n\nUser: {chat_input}\nAssistant:"
            return _sanitize_text(cohere_chat(prompt) or "How can I help further?")
    except Exception as e:
        safe_log("app_error", {"err": str(e)})
        # In PHI mode, never echo exception details that might contain PHI.
        return "A critical error occurred. Please contact your administrator." if PHI_MODE else f"A critical error occurred: {e}"
# Static legal documents rendered in the "Privacy & Terms" accordion; a
# Markdown error string is shown if either file is missing (see load_markdown_text).
PRIVACY_POLICY_TEXT = load_markdown_text("privacy_policy.md")
TERMS_OF_SERVICE_TEXT = load_markdown_text("terms_of_service.md")
| # ---------------------- UI Assets ---------------------- | |
# Custom CSS injected into the Gradio app: full-height dark two-panel grid
# layout, header banner, and chat-area sizing. The class names and element ids
# below are referenced via `elem_classes=...` / `elem_id=...` in the UI block.
SLEEK_CSS = """
:root, body, #root, .gradio-container { height: 100%; }
.gradio-container { padding: 0 !important; }
.block { padding: 0 !important; }
.header {
padding: 20px 28px;
background: linear-gradient(135deg, #0e1726, #1d2a44 60%, #243a5e);
color: #fff;
display: flex; align-items: center; justify-content: space-between;
gap: 16px;
}
.header h1 { margin: 0; font-size: 22px; letter-spacing: 0.3px; font-weight: 600; }
.header .badge { font-size: 12px; opacity: 0.9; background:#ffffff22; padding:6px 10px; border-radius: 999px; }
.main {
display: grid;
grid-template-columns: 420px 1fr;
gap: 16px;
padding: 16px;
height: calc(100vh - 72px);
box-sizing: border-box;
}
.left, .right {
background: #0b1020;
color: #e9edf3;
border-radius: 16px;
border: 1px solid #1c2642;
}
.left { padding: 16px; display: flex; flex-direction: column; gap: 12px; }
.right { padding: 0; display: flex; flex-direction: column; }
.panel-title { font-size: 14px; font-weight: 600; color: #aeb8cc; margin-bottom: 6px; }
.helper { font-size: 12px; color: #97a3bb; margin-bottom: 8px; }
.actions {
display: flex; gap: 8px; align-items: center; justify-content: stretch;
}
.actions .gr-button { flex: 1; }
.right .tabs { height: 100%; display: flex; flex-direction: column; }
.right .tabitem { flex: 1; display: flex; flex-direction: column; overflow: hidden; }
#chatbot_container { flex: 1; min-height: 600px; max-height: calc(100vh - 150px); overflow-y: auto; }
#chatbot_container > * { min-height: 600px; }
.hr { height: 1px; background: #16203b; margin: 10px 0; }
.voice-hint { font-size: 12px; color:#9fb0cc; margin-top: 4px; }
"""
# Browser-side speech-to-text helper injected via gr.HTML. `rs_toggle_stt`
# toggles the Web Speech API recognizer and streams interim transcripts into
# the textarea whose container id is passed in (see the Voice button wiring).
# NOTE(review): SpeechRecognition support is effectively Chrome/Edge only.
VOICE_STT_HTML = """
<script>
let __rs_rec = null;
function rs_toggle_stt(elemId){
const SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
if (!SpeechRecognition){
alert("This browser does not support Speech Recognition. Try Chrome or Edge.");
return;
}
if (__rs_rec){ __rs_rec.stop(); __rs_rec = null; return; }
__rs_rec = new SpeechRecognition();
__rs_rec.lang = "en-US";
__rs_rec.interimResults = true;
__rs_rec.continuous = true;
const box = document.querySelector(`#${elemId} textarea`);
if (!box){ alert("Prompt box not found."); return; }
let base = box.value || "";
__rs_rec.onresult = (ev) => {
let t = "";
for (let i = ev.resultIndex; i < ev.results.length; i++){
t += ev.results[i].transcript;
}
box.value = (base + " " + t).trim();
box.dispatchEvent(new Event("input", { bubbles: true }));
};
__rs_rec.onend = () => { __rs_rec = null; };
__rs_rec.start();
}
</script>
"""
| # ---------------------- Gradio UI ---------------------- | |
with gr.Blocks(theme=gr.themes.Soft(), css=SLEEK_CSS, fill_width=True) as demo:
    # Per-session list of saved assessment dicts (see run_analysis_wrapper).
    assessment_history = gr.State([])
    # Header banner with a PHI-mode status pill.
    with gr.Row(elem_classes=["header"]):
        gr.Markdown("<h1>Clarity Ops Augmented Decision Support</h1>")
        pill = "PHI Mode ON · history off" if (PHI_MODE and not PERSIST_HISTORY) else \
            "PHI Mode ON" if PHI_MODE else "PHI Mode OFF"
        gr.Markdown(f"<span class='badge'>{pill}</span>")
    # Two-column layout: inputs on the left, results/history tabs on the right.
    with gr.Row(elem_classes=["main"]):
        with gr.Column(elem_classes=["left"]):
            gr.Markdown("<div class='panel-title'>New Assessment</div>")
            gr.Markdown("<div class='helper'>Upload CSVs for analysis, or enter a prompt. Voice works in modern browsers.</div>")
            files_input = gr.Files(
                label="Upload Data Files (.csv)",
                file_count="multiple",
                type="filepath",
                file_types=[".csv"],
            )
            prompt_input = gr.Textbox(
                label="Prompt",
                placeholder="Paste your scenario or question here...",
                lines=12,
                elem_id="prompt_box",  # targeted by the JS dictation helper
                autofocus=True,
            )
            with gr.Row(elem_classes=["actions"]):
                send_btn = gr.Button("▶️ Run Analysis", variant="primary")
                clear_btn = gr.Button("🧹 Clear")
                voice_btn = gr.Button("🎙️ Voice")
            gr.Markdown("<div class='voice-hint'>Click Voice to start/stop dictation into the prompt box.</div>")
            ping_btn = gr.Button("🔌 Ping Cohere")
            ping_out = gr.Markdown()
            gr.Markdown("<div class='hr'></div>")
            # PHI-mode reminder rendered only when the flag is set at startup.
            if PHI_MODE:
                gr.Markdown(
                    "⚠️ **PHI Mode:** History persistence is disabled by default. Avoid unnecessary identifiers."
                )
            with gr.Accordion("Privacy & Terms", open=False):
                gr.Markdown(PRIVACY_POLICY_TEXT)
                gr.Markdown("<div class='hr'></div>")
                gr.Markdown(TERMS_OF_SERVICE_TEXT)
        with gr.Column(elem_classes=["right"]):
            with gr.Tabs(elem_classes=["tabs"]):
                with gr.TabItem("Current Assessment", id=0, elem_classes=["tabitem"]):
                    with gr.Column(elem_id="chatbot_container"):
                        chat_history_output = gr.Chatbot(label="Analysis Output", type="messages", container=False, autoscroll=True, height=700)
                with gr.TabItem("Assessment History", id=1, elem_classes=["tabitem"]):
                    gr.Markdown("### Review Past Assessments")
                    history_dropdown = gr.Dropdown(label="Select an assessment to review", choices=[])
                    history_display = gr.Markdown(label="Selected Assessment Details")
    # Inject the browser-side speech-to-text helper defined in VOICE_STT_HTML.
    gr.HTML(VOICE_STT_HTML)
def run_analysis_wrapper(prompt, files, chat_history_list, history_state_list):
    """Drive one analysis round-trip for the Send button.

    Yields ``(chat messages, history state, dropdown update)`` tuples so the
    Gradio UI refreshes between stages: first a static "working" placeholder,
    then the final response with the (possibly) updated saved-history list.
    """
    if not prompt:
        gr.Warning("Please enter a prompt.")
        yield chat_history_list, history_state_list, gr.update()
        return
    chat_with_user_msg = _append_msg(chat_history_list, "user", prompt)

    def dummy_update(message: str):
        # Progress callbacks from `handle` are discarded; the UI shows a
        # single static placeholder message instead.
        pass

    thinking_message = _append_msg(
        chat_with_user_msg,
        "assistant",
        "```\n🧠 Generating and executing analysis... Please wait.\n```",
    )
    yield thinking_message, history_state_list, gr.update()
    ai_response_text = handle(prompt, files, dummy_update)
    final_chat = _append_msg(chat_with_user_msg, "assistant", ai_response_text)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Consistent with `handle`: accept both file objects and plain path strings.
    file_names: List[str] = [
        os.path.basename(getattr(f, "name", None) or f) for f in (files or [])
    ]
    new_entry = {
        "id": timestamp,
        "prompt": prompt,
        "files": file_names,
        "response": ai_response_text,
        "chat_history": final_chat,
    }
    # Persist only when history is enabled; in PHI mode a zero/negative TTL
    # disables persistence. (Simplified from the redundant
    # `not PHI_MODE or (PHI_MODE and HISTORY_TTL_DAYS > 0)`.)
    if PERSIST_HISTORY and (not PHI_MODE or HISTORY_TTL_DAYS > 0):
        updated_history: List[Dict[str, Any]] = (history_state_list or []) + [new_entry]
    else:
        updated_history = history_state_list or []
    # BUGFIX: only append an ellipsis when the prompt was actually truncated.
    history_labels = [
        f"{item['id']} - {item['prompt'][:40]}..."
        if len(item["prompt"]) > 40
        else f"{item['id']} - {item['prompt']}"
        for item in updated_history
    ]
    yield final_chat, updated_history, gr.update(choices=history_labels)
def view_history(selection: str, history_state_list: List[Dict[str, Any]]) -> str:
    """Render a previously saved assessment as a Markdown summary.

    `selection` is a dropdown label of the form ``"<id> - <prompt prefix>..."``;
    the id portion is parsed out and matched against the saved entries.
    """
    if not selection or not history_state_list:
        return ""
    try:
        wanted_id = selection.split(" - ", 1)[0]
    except Exception:
        wanted_id = selection
    matched = None
    for entry in history_state_list:
        if entry.get("id") == wanted_id:
            matched = entry
            break
    if matched is None:
        return "Could not find the selected assessment."
    uploaded = matched.get("files", [])
    files_md = "\n- ".join(uploaded) if uploaded else "*(no files uploaded)*"
    transcript_lines = [
        f"**{msg.get('role', '').capitalize()}:** {msg.get('content', '')}"
        for msg in matched.get("chat_history", [])
    ]
    transcript_md = "\n\n".join(transcript_lines)
    return f"""### Assessment from: {matched['id']}
**Files Used:**
- {files_md}
---
**Original Prompt:**
> {matched['prompt']}
---
**AI Generated Response:**
{matched['response']}
---
**Chat Transcript:**
{transcript_md}
"""
# Run the analysis pipeline; run_analysis_wrapper is a generator, so the chat
# panel updates between the placeholder and the final report.
send_btn.click(
    run_analysis_wrapper,
    inputs=[prompt_input, files_input, chat_history_output, assessment_history],
    outputs=[chat_history_output, assessment_history, history_dropdown],
)
# Re-render the selected past assessment when the dropdown changes.
history_dropdown.change(
    view_history,
    inputs=[history_dropdown, assessment_history],
    outputs=[history_display],
)
# Reset input widgets and the visible chat; saved history state is kept.
clear_btn.click(
    lambda: (None, None, []),
    outputs=[prompt_input, files_input, chat_history_output],
)
# Connectivity check for the Cohere backend.
ping_btn.click(ping_cohere, outputs=[ping_out])
# Pure client-side handler: toggles browser speech recognition into the
# prompt textbox (rs_toggle_stt is defined in VOICE_STT_HTML).
voice_btn.click(None, [], [], js="rs_toggle_stt('prompt_box')")
if __name__ == "__main__":
    # Warn early if the Cohere key is missing; the app still starts so the
    # UI (and this warning) remain visible on the hosting platform.
    if not os.getenv("COHERE_API_KEY"):
        print("🔴 COHERE_API_KEY environment variable not set. Application may not function correctly.")
    # Bind to all interfaces; port defaults to 7860 (Hugging Face Spaces convention).
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))