"""MOD-OSINT Streamlit GUI wizard.

Wired to ``engine.pipeline_orchestrator.run_pipeline()``.

Stages:
    A — Upload / Input selection
    B — Settings
    C — Run pipeline
    D — Browse / Export results

Import safety:
    This module avoids importing Streamlit at module load time so CI/tests
    can import it without ScriptRunContext warnings.
"""

from __future__ import annotations

import json
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path

import pandas as pd

_DEMO_DIR = Path("samples/demo_ingest")


def _load_yaml_defaults(path: Path) -> dict:
    """Return the mapping stored in the YAML file at *path*, or ``{}``.

    This is deliberately best-effort: a missing PyYAML install, an unreadable
    file, invalid YAML, or a document whose top level is not a mapping all
    yield an empty dict so the GUI can still start with built-in defaults.
    """
    try:
        import yaml  # optional; provided by requirements-hf.txt

        loaded = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:  # best-effort: config defaults are optional
        return {}
    # safe_load may legitimately return a list/scalar/None; callers need a dict.
    return loaded if isinstance(loaded, dict) else {}


def _safe_filename(raw_name, fallback: str) -> str:
    """Reduce *raw_name* to its final path component, or *fallback* if empty."""
    candidate = Path(str(raw_name).replace("\\", "/")).name
    return candidate if candidate not in {"", ".", ".."} else fallback


def _write_uploads(uploads) -> Path:
    """Save uploaded files into a fresh temp dir and return the dir path.

    Each item in *uploads* must expose ``.name`` and ``.getbuffer()``.
    The client-supplied name is stripped to its last path component so an
    entry such as ``../x`` cannot be written outside the upload directory.
    """
    tmp = Path(tempfile.mkdtemp(prefix="modosint_"))
    updir = tmp / "uploads"
    updir.mkdir(parents=True, exist_ok=True)
    for index, file_obj in enumerate(uploads):
        target = updir / _safe_filename(file_obj.name, f"upload_{index}")
        target.write_bytes(file_obj.getbuffer())
    return updir


def _uploads_signature(uploads) -> tuple:
    """Build a hashable fingerprint identifying the current set of uploads."""
    return tuple(
        (
            getattr(item, "file_id", None),
            item.name,
            getattr(item, "size", None),
        )
        for item in uploads
    )


def _resolve_input(session_state) -> Path | None:
    """Determine input from session state (uploads > local path > demo).

    Reads the ``_uploads``, ``_local_path`` and ``_use_demo`` keys.  When
    uploads are present they are staged on disk once and the directory is
    remembered under ``_uploads_dir`` / ``_uploads_sig``, so that repeated
    calls with the same uploads reuse it instead of creating a new temp
    directory every time.

    Returns:
        The resolved input path, or ``None`` when nothing usable is selected.
    """
    uploads = session_state.get("_uploads")
    if uploads:
        signature = _uploads_signature(uploads)
        cached_dir = session_state.get("_uploads_dir")
        if (
            cached_dir
            and session_state.get("_uploads_sig") == signature
            and Path(cached_dir).is_dir()
        ):
            return Path(cached_dir)
        updir = _write_uploads(uploads)
        session_state["_uploads_sig"] = signature
        session_state["_uploads_dir"] = str(updir)
        return updir

    # The stored value may be None; treat that the same as an empty string.
    local_path = (session_state.get("_local_path") or "").strip()
    if local_path:
        path_obj = Path(local_path).expanduser()
        if path_obj.exists():
            return path_obj

    if session_state.get("_use_demo") and _DEMO_DIR.exists():
        return _DEMO_DIR
    return None


def _quote_identifier(name: str) -> str:
    """Quote *name* as an SQL identifier, doubling embedded double quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def _list_tables(conn) -> list[str]:
    """Return the names of all tables in the SQLite database, sorted."""
    return pd.read_sql(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;",
        conn,
    )["name"].tolist()


def _preview_table(conn, table: str, limit: int = 200) -> pd.DataFrame:
    """Return up to *limit* rows of *table* as a DataFrame."""
    query = f"SELECT * FROM {_quote_identifier(table)} LIMIT {int(limit)};"
    return pd.read_sql(query, conn)


def _render_upload_tab(st) -> None:
    """Stage A: render the upload / local path / demo dataset selectors."""
    st.subheader("A) Upload or select input")
    uploads = st.file_uploader(
        "Upload files (CSV, JSON, TXT, HTML, LOG)",
        accept_multiple_files=True,
        key="_uploads",
    )
    if uploads:
        st.success(f"Queued {len(uploads)} file(s): {[u.name for u in uploads]}")

    st.divider()
    local_path = st.text_input(
        "Or enter a local directory / file path",
        value="",
        key="_local_path",
        placeholder="/path/to/data/",
    )

    st.divider()
    st.checkbox(
        f"Use built-in demo dataset (`{_DEMO_DIR}`)",
        value=not bool(uploads) and not bool(local_path),
        key="_use_demo",
        disabled=not _DEMO_DIR.exists(),
        help="Runs the pipeline against samples/demo_ingest/ for quick smoke testing.",
    )
    if _DEMO_DIR.exists():
        demo_files = sorted(_DEMO_DIR.iterdir())
        st.caption(f"Demo files: {[f.name for f in demo_files if f.is_file()]}")
    else:
        st.caption("`samples/demo_ingest/` not found in working directory.")


def _render_settings_tab(st) -> None:
    """Stage B: render pipeline settings and store the effective config."""
    st.subheader("B) Pipeline settings")
    cfg_path = Path("pipeline_config.yaml")
    defaults = _load_yaml_defaults(cfg_path) if cfg_path.exists() else {}

    col_left, col_right = st.columns(2)
    with col_left:
        offline_mode = st.toggle(
            "offline_mode",
            value=True,
            help="Disable all outbound network calls.",
        )
        enable_ml = st.toggle(
            "enable_ml_analysis",
            value=False,
            help="Enable ML/NLP stage (requires torch; off by default).",
        )
    with col_right:
        correlation_mode = st.selectbox(
            "correlation_mode",
            ["basic", "in-memory"],
            index=0,
            help="basic = simple entity matching; in-memory = graph in RAM.",
        )

    effective_config: dict = dict(defaults)
    # A YAML file with `runtime:` left empty (or set to a non-mapping) must
    # not break the settings page; start from an empty runtime section then.
    runtime = effective_config.get("runtime")
    runtime = dict(runtime) if isinstance(runtime, dict) else {}
    runtime.update(
        {
            "offline_mode": offline_mode,
            "enable_ml_analysis": enable_ml,
            "correlation_mode": correlation_mode,
        }
    )
    effective_config["runtime"] = runtime

    st.session_state["effective_config"] = effective_config
    st.markdown("**Effective config (passed to engine):**")
    st.json(effective_config)


def _render_run_tab(st, run_pipeline) -> None:
    """Stage C: resolve the input and run the pipeline on demand."""
    st.subheader("C) Run pipeline")
    # NOTE(review): "runs//" looks like a placeholder (e.g. a run-id segment)
    # was lost from this caption; confirm the intended wording.
    st.caption("Outputs are written to `runs//`.")

    input_path = _resolve_input(st.session_state)
    if input_path:
        st.info(f"Input resolved -> `{input_path}`")
    else:
        st.warning("No input selected. Go to Upload tab or enable demo dataset.")

    run_btn = st.button(
        "🚀 Run pipeline now",
        type="primary",
        disabled=input_path is None,
    )
    if not (run_btn and input_path):
        return

    progress = st.progress(0, text="Starting...")
    log_area = st.empty()
    log_lines: list[str] = []

    def _log(message: str) -> None:
        # Show only the tail so the log panel stays a bounded size.
        log_lines.append(message)
        log_area.code("\n".join(log_lines[-40:]), language="bash")

    _log(f"Input: {input_path}")
    _log("Calling engine.pipeline_orchestrator.run_pipeline()...")
    progress.progress(10, text="Normalizing files...")

    # Broad catch is intentional: this is the UI boundary and any engine
    # failure should be reported on the page rather than crash the app.
    try:
        ctx = run_pipeline(
            input_path=input_path,
            config=st.session_state["effective_config"],
        )
        st.session_state["last_run_id"] = ctx.run_id
        st.session_state["last_run_dir"] = str(ctx.run_dir)
        progress.progress(90, text="Generating report...")
        _log(f"Run ID: {ctx.run_id}")
        _log(f"Run dir: {ctx.run_dir}")
        if ctx.stage_results:
            for stage_name, stage_out in ctx.stage_results.items():
                _log(f" [{stage_out.status.value.upper():8s}] {stage_name}")
        progress.progress(100, text="Done")
        st.success(f"Pipeline complete - run `{ctx.run_id}`")
        st.code(str(ctx.run_dir))
        st.info("Switch to Browse tab to explore outputs.")
    except Exception as exc:
        progress.empty()
        st.error(f"Pipeline failed: {exc}")
        _log(f"ERROR: {exc}")


def _render_browse_tab(st, components) -> None:
    """Stage D: browse and download the artefacts of the last run."""
    st.subheader("D) Browse results")
    run_dir_str = st.session_state.get("last_run_dir")
    if not run_dir_str:
        st.info("Run the pipeline first (Stage C).")
        return

    run_dir = Path(run_dir_str)
    report_html = run_dir / "report" / "index.html"
    db_path = run_dir / "db.sqlite"
    exports_dir = run_dir / "exports"
    manifest_path = run_dir / "manifest.json"

    # Collect export files once so the metric and the listing always agree.
    export_files = (
        sorted(path for path in exports_dir.rglob("*") if path.is_file())
        if exports_dir.exists()
        else []
    )

    col1, col2, col3, col4 = st.columns(4)
    col1.metric("Run ID", st.session_state.get("last_run_id", "-"))
    col2.metric("Report", "yes" if report_html.exists() else "no")
    col3.metric("DB", "yes" if db_path.exists() else "no")
    col4.metric("Exports", str(len(export_files)))

    if manifest_path.exists():
        with st.expander("Run manifest"):
            try:
                st.json(json.loads(manifest_path.read_text(encoding="utf-8")))
            except (OSError, ValueError) as exc:
                # A damaged manifest should not take down the whole page.
                st.warning(f"Manifest could not be read: {exc}")

    st.divider()
    st.markdown("### HTML Report")
    if report_html.exists():
        st.markdown(f"`{report_html}`")
        try:
            components.html(
                report_html.read_text(encoding="utf-8", errors="replace"),
                height=700,
                scrolling=True,
            )
        except Exception as exc:
            st.warning(f"Inline render failed ({exc}). Open the path above in a browser.")
        with open(report_html, "rb") as file_handle:
            st.download_button(
                "Download report/index.html",
                data=file_handle,
                file_name="index.html",
                mime="text/html",
            )
    else:
        st.info("No report/index.html yet.")

    st.divider()
    st.markdown("### Exports")
    if exports_dir.exists():
        if export_files:
            for export_file in export_files:
                rel = export_file.relative_to(run_dir).as_posix()
                col_path, col_download = st.columns([3, 1])
                col_path.write(f"`{rel}`")
                with open(export_file, "rb") as file_handle:
                    col_download.download_button(
                        "Download",
                        data=file_handle,
                        file_name=export_file.name,
                        key=f"dl_{rel}",
                    )
        else:
            st.info("Exports directory is empty.")
    else:
        st.info("No exports/ directory found.")

    jsonl_path = run_dir / "normalized.jsonl"
    if jsonl_path.exists():
        with open(jsonl_path, "rb") as file_handle:
            st.download_button(
                "Download normalized.jsonl",
                data=file_handle,
                file_name="normalized.jsonl",
                mime="application/x-ndjson",
            )

    st.divider()
    st.markdown("### SQLite DB Preview")
    if not db_path.exists():
        st.info("No db.sqlite found.")
        return

    with open(db_path, "rb") as file_handle:
        st.download_button(
            "Download db.sqlite",
            data=file_handle,
            file_name="db.sqlite",
            mime="application/x-sqlite3",
        )

    try:
        # closing() guarantees the connection is released even if a query fails.
        with closing(sqlite3.connect(db_path)) as conn:
            tables = _list_tables(conn)
            if tables:
                st.write("Tables:", tables)
                selected_table = st.selectbox(
                    "Preview table", tables, key="db_table_sel"
                )
                dataframe = _preview_table(conn, selected_table)
                st.dataframe(dataframe, use_container_width=True)
            else:
                st.info("DB exists but contains no tables yet.")
    except Exception as exc:
        st.warning(f"DB preview failed: {exc}")


def main() -> None:
    """Entrypoint for `streamlit run gui/streamlit_app.py`.

    Streamlit and the project modules are imported here, not at module
    level, so that importing this file has no Streamlit side effects.
    """
    import streamlit as st
    import streamlit.components.v1 as components

    from engine.pipeline_orchestrator import run_pipeline
    from gui.terminal_panel import render_terminal

    st.set_page_config(
        page_title="MOD-OSINT",
        page_icon="🧠",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    st.title("🧠 MOD-OSINT")
    st.caption("GUI wizard -> `engine.pipeline_orchestrator.run_pipeline()`")

    if "effective_config" not in st.session_state:
        st.session_state["effective_config"] = {}
    if "last_run_id" not in st.session_state:
        st.session_state["last_run_id"] = None
    if "last_run_dir" not in st.session_state:
        st.session_state["last_run_dir"] = None

    with st.sidebar:
        render_terminal({"effective_config": st.session_state["effective_config"]})

    tab_upload, tab_settings, tab_run, tab_browse = st.tabs(
        ["📂 Upload", "⚙️ Settings", "▶️ Run", "📊 Browse"]
    )
    with tab_upload:
        _render_upload_tab(st)
    with tab_settings:
        _render_settings_tab(st)
    with tab_run:
        _render_run_tab(st, run_pipeline)
    with tab_browse:
        _render_browse_tab(st, components)


if __name__ == "__main__":
    main()