import os import sys import duckdb import json import subprocess from pathlib import Path from typing import Dict, List, Tuple import streamlit as st try: from dotenv import load_dotenv, find_dotenv load_dotenv(find_dotenv()) except Exception: pass # ---------- Basic page setup ---------- st.set_page_config(page_title="Excel → Dataset", page_icon="📊", layout="wide") PRIMARY_DIR = Path(__file__).parent.resolve() UPLOAD_DIR = PRIMARY_DIR / "uploads" DB_DIR = PRIMARY_DIR / "dbs" SCRIPT_PATH = PRIMARY_DIR / "source_to_duckdb.py" # must be colocated UPLOAD_DIR.mkdir(parents=True, exist_ok=True) DB_DIR.mkdir(parents=True, exist_ok=True) st.markdown( """ """, unsafe_allow_html=True, ) st.title("Data Analysis Agent") # --------- Session state helpers --------- if "processing" not in st.session_state: st.session_state.processing = False if "processed_key" not in st.session_state: st.session_state.processed_key = None if "last_overview_md" not in st.session_state: st.session_state.last_overview_md = None if "last_preview_items" not in st.session_state: st.session_state.last_preview_items = [] # list of dicts: {'table_ref': str, 'label': str} def _file_key(uploaded) -> str: # unique-ish key per upload (name + size) try: size = len(uploaded.getbuffer()) except Exception: size = 0 return f"{uploaded.name}:{size}" # ---------- DuckDB helpers ---------- def list_user_tables(con: duckdb.DuckDBPyConnection) -> List[str]: """ Robust discovery: 1) information_schema.tables for BASE TABLE/VIEW in any schema, excluding __* 2) duckdb_tables() as a secondary path 3) __excel_tables mapping + existence check as last resort """ # 1) information_schema (most portable) try: q = ( "SELECT table_schema, table_name " "FROM information_schema.tables " "WHERE table_type IN ('BASE TABLE','VIEW') " "AND table_name NOT LIKE '__%%' " "ORDER BY table_schema, table_name" ) rows = con.execute(q).fetchall() names = [] for schema, name in rows: if (schema or '').lower() == 'main': names.append(name) else: names.append(f'{schema}."{name}"') if names: return names except Exception: try: rows = con.execute( "SELECT sheet_name, table_name, inferred_title, original_title_text, block_index, start_row " "FROM __excel_tables ORDER BY block_index, start_row" ).fetchall() for sheet_name, table_name, inferred_title, original_title_text, block_index, start_row in rows: if table_name not in want_names: continue title = inferred_title or original_title_text or 'untitled' mapping[table_name] = {'sheet_name': sheet_name, 'title': title} except Exception: pass # 2) duckdb_tables() try: q2 = ( "SELECT schema_name, table_name " "FROM duckdb_tables() " "WHERE table_type = 'BASE TABLE' " "AND table_name NOT LIKE '__%%' " "ORDER BY schema_name, table_name" ) rows = con.execute(q2).fetchall() names = [] for schema, name in rows: if (schema or '').lower() == 'main': names.append(name) else: names.append(f'{schema}."{name}"') if names: return names except Exception: try: rows = con.execute( "SELECT sheet_name, table_name, inferred_title, original_title_text, block_index, start_row " "FROM __excel_tables ORDER BY block_index, start_row" ).fetchall() for sheet_name, table_name, inferred_title, original_title_text, block_index, start_row in rows: if table_name not in want_names: continue title = inferred_title or original_title_text or 'untitled' mapping[table_name] = {'sheet_name': sheet_name, 'title': title} except Exception: pass # 3) Fallback to metadata table try: meta = con.execute("SELECT DISTINCT table_name FROM __file_tables").fetchall() names = [] for (t,) in meta: try: con.execute(f'SELECT 1 FROM "{t}" LIMIT 1').fetchone() names.append(t) except Exception: continue return names except Exception: # Fallback to legacy excel metadata table if unified not present try: meta = con.execute("SELECT DISTINCT table_name FROM __excel_tables").fetchall() names = [] for (t,) in meta: try: con.execute(f'SELECT 1 FROM "{t}" LIMIT 1').fetchone() names.append(t) except Exception: continue return names except Exception: return [] def get_columns(con: duckdb.DuckDBPyConnection, table: str) -> List[Tuple[str,str]]: # Normalize table name for information_schema lookup if table.lower().startswith("main."): tname = table.split('.', 1)[1].strip('"') schema_filter = 'main' elif '.' in table: schema_filter, tname_raw = table.split('.', 1) tname = tname_raw.strip('"') else: schema_filter = 'main' tname = table.strip('"') q = ( "SELECT column_name, data_type " "FROM information_schema.columns " "WHERE table_schema=? AND table_name=? " "ORDER BY ordinal_position" ) return con.execute(q, [schema_filter, tname]).fetchall() def detect_year_column(con, table: str, col: str) -> bool: try: sql = ( f'SELECT AVG(CASE WHEN TRY_CAST("{col}" AS INTEGER) BETWEEN 1900 AND 2100 ' f'THEN 1.0 ELSE 0.0 END) FROM {table}' ) v = con.execute(sql).fetchone()[0] return (v or 0) > 0.7 except Exception: return False def role_of_column(con, table: str, col: str, dtype: str) -> str: d = (dtype or '').upper() if any(tok in d for tok in ["DATE", "TIMESTAMP"]): return "date" if any(tok in d for tok in ["INT", "BIGINT", "DOUBLE", "DECIMAL", "FLOAT", "HUGEINT", "REAL"]): if detect_year_column(con, table, col): return "year" return "numeric" if any(tok in d for tok in ["CHAR", "STRING", "TEXT", "VARCHAR"]): try: sql = f'SELECT COUNT(*), COUNT(DISTINCT "{col}") FROM {table}' n, nd = con.execute(sql).fetchone() if n and nd is not None: ratio = (nd / n) if n else 0 if ratio > 0.95: return "id_like" if 0.01 <= ratio <= 0.35: return "category" if ratio < 0.01: return "binary_flag" except Exception: pass return "text" return "other" def quick_table_profile(con: duckdb.DuckDBPyConnection, table: str) -> Dict: rows = con.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0] cols = get_columns(con, table) roles = {"category": [], "numeric": [], "date": [], "year": [], "id_like": [], "text": [], "binary": [], "other": []} for c, d in cols: r = role_of_column(con, table, c, d) if r == "binary_flag": roles["binary"].append(c) else: roles.setdefault(r, []).append(c) return { "rows": int(rows or 0), "n_cols": len(cols), "n_cat": len(roles["category"]), "n_num": len(roles["numeric"]), "n_time": len(roles["year"]) + len(roles["date"]), } def table_mapping(con: duckdb.DuckDBPyConnection, user_tables: List[str]) -> Dict[str, Dict]: """ Map db_table (normalized) -> {sheet_name, title} using __excel_tables if present. """ normalize = lambda t: t.split('.', 1)[1].strip('"') if '.' in t else t.strip('"') want_names = {normalize(t) for t in user_tables} mapping: Dict[str, Dict] = {} try: rows = con.execute( "SELECT sheet_name, table_name, inferred_title, original_title_text, block_index, start_row " "FROM __file_tables ORDER BY block_index, start_row" ).fetchall() for sheet_name, table_name, inferred_title, original_title_text, block_index, start_row in rows: if table_name not in want_names: continue title = inferred_title or original_title_text or 'untitled' mapping[table_name] = {'sheet_name': sheet_name, 'title': title} except Exception: try: rows = con.execute( "SELECT sheet_name, table_name, inferred_title, original_title_text, block_index, start_row " "FROM __excel_tables ORDER BY block_index, start_row" ).fetchall() for sheet_name, table_name, inferred_title, original_title_text, block_index, start_row in rows: if table_name not in want_names: continue title = inferred_title or original_title_text or 'untitled' mapping[table_name] = {'sheet_name': sheet_name, 'title': title} except Exception: pass return mapping def excel_schema_samples(con: duckdb.DuckDBPyConnection, mapping: Dict[str, Dict], max_cols: int = 8) -> Dict[str, List[str]]: """ Return up to max_cols original column names per table_name (normalized) for LLM hints. """ samples: Dict[str, List[str]] = {} try: rows = con.execute("SELECT sheet_name, table_name, column_ordinal, original_name FROM __file_schema ORDER BY sheet_name, table_name, column_ordinal").fetchall() for sheet_name, table_name, ordn, orig in rows: if table_name not in mapping: continue lst = samples.setdefault(table_name, []) if orig and len(lst) < max_cols: lst.append(str(orig)) except Exception: try: rows = con.execute( "SELECT sheet_name, table_name, inferred_title, original_title_text, block_index, start_row " "FROM __excel_tables ORDER BY block_index, start_row" ).fetchall() for sheet_name, table_name, inferred_title, original_title_text, block_index, start_row in rows: if table_name not in want_names: continue title = inferred_title or original_title_text or 'untitled' mapping[table_name] = {'sheet_name': sheet_name, 'title': title} except Exception: pass return samples # ---------- OpenAI ---------- def ai_overview_from_context(context: Dict) -> str: api_key = os.environ.get("OPENAI_API_KEY") or st.secrets.get("OPENAI_API_KEY", None) if not api_key: raise RuntimeError("OPENAI_API_KEY is not set. Please add it to .env or Streamlit secrets.") try: from openai import OpenAI client = OpenAI(api_key=api_key) model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini") except Exception as e: raise RuntimeError("OpenAI client not available. Install 'openai' >= 1.0 and try again.") from e prompt = f''' Start directly (no greeting). Write a concise, conversational overview (max two short paragraphs) of the dataset created from the uploaded Excel. Requirements: - Do NOT mention database engines, schemas, or technical column/table names. - For each segment, reference it as: Sheet "" — Table "" (use "untitled" if missing). - Use any provided original Excel header hints ONLY to infer friendlier human concepts; do not quote them verbatim. - After the overview, list 6–8 simple questions a user could ask in natural language. - Output Markdown with headings: "Overview" and "Try These Questions". Context (JSON): {json.dumps(context, ensure_ascii=False, indent=2)} ''' resp = client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], temperature=0.4, ) return resp.choices[0].message.content.strip() # ---------- Orchestration ---------- def run_ingestion_pipeline(file_path: Path, db_path: Path, log_placeholder): # Combined log function log_lines: List[str] = [] def _append(line: str): log_lines.append(line) log_placeholder.markdown( f"<div class='logbox'><code>{'</code><br/><code>'.join(map(str, log_lines[-400:]))}</code></div>", unsafe_allow_html=True, ) # 1) Save (already saved by caller, but we log here for a single place) _append("[app] Saving file…") _append("[app] Saved.") if not SCRIPT_PATH.exists(): _append("[app] ERROR: ingestion component not found next to the app.") raise FileNotFoundError("Required ingestion component not found.") # 2) Ingest _append("[app] Ingesting…") env = os.environ.copy() env["PYTHONIOENCODING"] = "utf-8" cmd = [sys.executable, str(SCRIPT_PATH), "--file", str(file_path), "--duckdb", str(db_path)] try: proc = subprocess.Popen( cmd, cwd=str(PRIMARY_DIR), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, env=env ) except Exception as e: _append(f"[app] ERROR: failed to start ingestion: {e}") raise for line in iter(proc.stdout.readline, ""): _append(line.rstrip("\n")) proc.wait() if proc.returncode != 0: _append("[app] ERROR: ingestion reported a non-zero exit code.") raise RuntimeError("Ingestion failed. See logs.") _append("[app] Ingestion complete.") # 3) Open dataset _append("[app] Opening dataset…") con = duckdb.connect(str(db_path)) _append("[app] Dataset open.") return con, _append def analyze_and_summarize(con: duckdb.DuckDBPyConnection): user_tables = list_user_tables(con) preview_items = [] # list of {'table_ref': t, 'label': label} for UI if not user_tables: # Try to provide metadata if no tables are found try: meta_df = con.execute("SELECT * FROM __excel_tables").fetchdf() st.warning("No user tables were discovered. Showing ingestion metadata for reference.") st.dataframe(meta_df, use_container_width=True, hide_index=True) except Exception: st.error("No user tables were discovered and no metadata table is available.") return "", [] # Build mapping + schema hints mapping = table_mapping(con, user_tables) # normalized table_name -> {sheet_name, title} schema_hints = excel_schema_samples(con, mapping, max_cols=8) # Build compact profiling context for LLM; avoid raw db table names per_table = [] for idx, t in enumerate(user_tables, start=1): prof = quick_table_profile(con, t) norm = t.split('.', 1)[1].strip('"') if '.' in t else t.strip('"') m = mapping.get(norm, {}) sheet = m.get('sheet_name') title = m.get('title') per_table.append({ "idx": idx, "sheet_name": sheet, "title": title or "untitled", "rows": prof["rows"], "n_cols": prof["n_cols"], "category_fields": prof["n_cat"], "numeric_measures": prof["n_num"], "time_fields": prof["n_time"], "example_original_headers": schema_hints.get(norm, []) }) label = f'Sheet "{sheet}" — Table "{title or "untitled"}"' if sheet else f'Table "{title or "untitled"}"' preview_items.append({'table_ref': t, 'label': label}) context = { "segments": per_table } # Generate overview (LLM only) overview_md = ai_overview_from_context(context) return overview_md, preview_items # ---------- UI flow ---------- file = st.file_uploader("Upload an Excel or CSV file", type=["xlsx", "csv"]) if file is None and not st.session_state.last_overview_md: st.info("Upload a .xlsx or .csv file to begin.") # Only show logs AFTER there is an upload or some result to show logs_placeholder = None if file is not None or st.session_state.processing or st.session_state.last_overview_md: logs_exp = st.expander("Processing logs", expanded=False) logs_placeholder = logs_exp.empty() if file is not None: key = _file_key(file) stem = Path(file.name).stem saved_file = UPLOAD_DIR / file.name db_path = DB_DIR / f"{stem}.duckdb" # --- CLEAR state immediately on new upload --- if st.session_state.get("processed_key") != key: st.session_state["last_overview_md"] = None st.session_state["last_preview_items"] = [] st.session_state["chat_history"] = [] st.session_state["schema_text"] = None st.session_state["db_path"] = None # Optional: clear any previous logs shown in UI on rerun # (no explicit log buffer stored; the log expander will refresh) # Auto-start ingestion exactly once per unique upload if (st.session_state.processed_key != key) and (not st.session_state.processing): st.session_state.processing = True if logs_placeholder is None: logs_exp = st.expander("Ingestion logs", expanded=False) logs_placeholder = logs_exp.empty() # Save uploaded file with open(saved_file, "wb") as f: f.write(file.getbuffer()) try: con, app_log = run_ingestion_pipeline(saved_file, db_path, logs_placeholder) # Analyze + overview app_log("[app] Analyzing data…") overview_md, preview_items = analyze_and_summarize(con) app_log("[app] Overview complete.") con.close() st.session_state.last_overview_md = overview_md st.session_state.last_preview_items = preview_items st.session_state.processed_key = key st.session_state.processing = False except Exception as e: st.session_state.processing = False st.error(f"Ingestion failed. See logs for details. Error: {e}") # Display results if available (and avoid re-triggering ingestion) if st.session_state.last_overview_md: #st.subheader("Overview") st.markdown(st.session_state.last_overview_md) with st.expander("Quick preview (verification only)", expanded=False): try: # Reconnect to current dataset path (if present) if file is not None: stem = Path(file.name).stem db_path = DB_DIR / f"{stem}.duckdb" con = duckdb.connect(str(db_path)) for item in st.session_state.last_preview_items: t = item['table_ref'] label = item['label'] df = con.execute(f"SELECT * FROM {t} LIMIT 50").df() st.caption(f"Preview — {label}") st.dataframe(df, use_container_width=True, hide_index=True) con.close() except Exception as e: st.warning(f"Could not preview tables: {e}") # ===================== # Chat with your dataset # (Appends after overview & preview; leaves earlier logic untouched) # ===================== if st.session_state.get("last_overview_md"): st.divider() st.subheader("Chat with your dataset") # Lazy imports so nothing changes before preview completes def _lazy_imports(): from duckdb_react_agent import get_schema_summary, make_llm, answer_question # noqa: F401 return get_schema_summary, make_llm, answer_question # Initialize chat memory st.session_state.setdefault("chat_history", []) # [{role, content, sql?, plot_path?}] # --- 1) Take input first so the user's question appears immediately --- user_q = st.chat_input("Ask a question about the dataset…") if user_q: st.session_state.chat_history.append({"role": "user", "content": user_q}) # --- 2) Render history in strict User → Assistant order --- for msg in st.session_state.chat_history: with st.chat_message("user" if msg["role"] == "user" else "assistant"): # Always: text first st.markdown(msg["content"]) # Then: optional plot (below the answer) plot_path_hist = msg.get("plot_path") if plot_path_hist: if not os.path.isabs(plot_path_hist): plot_path_hist = str((PRIMARY_DIR / plot_path_hist).resolve()) if os.path.exists(plot_path_hist): st.image(plot_path_hist, caption="Chart", width=520) # Finally: optional SQL expander if msg.get("sql"): with st.expander("View generated SQL", expanded=False): st.markdown(f"<div class='sqlbox'>{msg['sql']}</div>", unsafe_allow_html=True) # --- 3) If a new question arrived, stream the assistant answer now --- if user_q: with st.chat_message("assistant"): # Placeholders in sequence: text → plot → SQL stream_placeholder = st.empty() plot_placeholder = st.empty() sql_placeholder = st.empty() # Show pending immediately stream_placeholder.markdown("_Answer pending…_") partial_chunks = [] def on_token(t: str): partial_chunks.append(t) stream_placeholder.markdown("".join(partial_chunks)) # Resolve DB path if 'db_path' in locals(): _db_path = db_path # from the preview scope if defined else: if 'file' in locals() and file is not None: _stem = Path(file.name).stem _db_path = DB_DIR / f"{_stem}.duckdb" else: _candidates = sorted(DB_DIR.glob("*.duckdb"), key=lambda p: p.stat().st_mtime, reverse=True) _db_path = _candidates[0] if _candidates else None if not _db_path or not Path(_db_path).exists(): stream_placeholder.error("No dataset found. Please re-upload the file in this session.") else: # Call agent lazily get_schema_summary, make_llm, answer_question = _lazy_imports() try: try: con2 = duckdb.connect(str(_db_path), read_only=True) except Exception: con2 = duckdb.connect(str(_db_path)) schema_text = get_schema_summary(con2, allowed_schemas=["main"]) llm = make_llm(model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"), temperature=0.0) import inspect as _inspect _sig = None try: _sig = _inspect.signature(answer_question) except Exception: _sig = None def _call_answer(): try: if _sig and "history" in _sig.parameters and "token_callback" in _sig.parameters and "stream" in _sig.parameters: return answer_question(con2, llm, schema_text, user_q, stream=True, token_callback=on_token, history=st.session_state.chat_history) elif _sig and "token_callback" in _sig.parameters and "stream" in _sig.parameters: return answer_question(con2, llm, schema_text, user_q, stream=True, token_callback=on_token) else: return answer_question(con2, llm, schema_text, user_q) except TypeError: try: return answer_question(con2, llm, schema_text, user_q, stream=True, token_callback=on_token) except TypeError: return answer_question(con2, llm, schema_text, user_q) result = _call_answer() con2.close() # Finalize text answer_text = result.get("answer") or "".join(partial_chunks) or "*No answer produced.*" stream_placeholder.markdown(answer_text) # Show plot next (slightly larger, below the text) plot_path = result.get("plot_path") if plot_path: if not os.path.isabs(plot_path): plot_path = str((PRIMARY_DIR / plot_path).resolve()) if os.path.exists(plot_path): plot_placeholder.image(plot_path, caption="Chart", width=560) # Finally show SQL gen_sql = (result.get("sql") or "").strip() if gen_sql: with st.expander("View generated SQL", expanded=False): st.markdown(f"<div class='sqlbox'>{gen_sql}</div>", unsafe_allow_html=True) # Persist assistant message st.session_state.chat_history.append({ "role": "assistant", "content": answer_text, "sql": gen_sql, "plot_path": result.get("plot_path") }) finally: try: con2.close() except Exception: pass