# mod-osint/gui/streamlit_app.py
# Uploaded by moddux
# deploy: HF sanitized GUI snapshot
# Commit: b75c637
"""
MOD-OSINT Streamlit GUI Wizard
Wired to engine.pipeline_orchestrator.run_pipeline()
Stages:
A — Upload / Input selection
B — Settings
C — Run pipeline
D — Browse / Export results
Import safety:
This module avoids importing Streamlit at module load time so CI/tests can
import it without ScriptRunContext warnings.
"""
from __future__ import annotations
import sqlite3
import tempfile
from pathlib import Path
import pandas as pd
# Built-in demo dataset directory, given relative to the process working directory.
_DEMO_DIR = Path("samples/demo_ingest")
def _load_yaml_defaults(path: Path) -> dict:
try:
import yaml # optional; provided by requirements-hf.txt
return yaml.safe_load(path.read_text()) or {}
except Exception:
return {}
def _write_uploads(uploads) -> Path:
"""Save uploaded files into a temp dir and return the dir path."""
tmp = Path(tempfile.mkdtemp(prefix="modosint_"))
updir = tmp / "uploads"
updir.mkdir(parents=True, exist_ok=True)
for file_obj in uploads:
(updir / file_obj.name).write_bytes(file_obj.getbuffer())
return updir
def _resolve_input(session_state) -> Path | None:
"""Determine input from session state (uploads > local path > demo)."""
uploads = session_state.get("_uploads")
if uploads:
return _write_uploads(uploads)
local_path = session_state.get("_local_path", "").strip()
if local_path:
path_obj = Path(local_path).expanduser()
if path_obj.exists():
return path_obj
if session_state.get("_use_demo") and _DEMO_DIR.exists():
return _DEMO_DIR
return None
def _render_upload_tab(st) -> None:
    """Stage A: render the upload widget, local path field and demo checkbox.

    Args:
        st: The imported ``streamlit`` module (passed in so this module never
            imports Streamlit at load time).
    """
    st.subheader("A) Upload or select input")
    uploads = st.file_uploader(
        "Upload files (CSV, JSON, TXT, HTML, LOG)",
        accept_multiple_files=True,
        key="_uploads",
    )
    if uploads:
        st.success(f"Queued {len(uploads)} file(s): {[u.name for u in uploads]}")
    st.divider()
    local_path = st.text_input(
        "Or enter a local directory / file path",
        value="",
        key="_local_path",
        placeholder="/path/to/data/",
    )
    st.divider()
    demo_available = _DEMO_DIR.exists()
    st.checkbox(
        f"Use built-in demo dataset (`{_DEMO_DIR}`)",
        value=not bool(uploads) and not bool(local_path),
        key="_use_demo",
        disabled=not demo_available,
        help="Runs the pipeline against samples/demo_ingest/ for quick smoke testing.",
    )
    if demo_available:
        demo_names = [entry.name for entry in sorted(_DEMO_DIR.iterdir()) if entry.is_file()]
        st.caption(f"Demo files: {demo_names}")
    else:
        st.caption("`samples/demo_ingest/` not found in working directory.")


def _render_settings_tab(st) -> None:
    """Stage B: render the settings widgets and store the effective config.

    The result is written to ``st.session_state["effective_config"]``: the
    YAML defaults from ``pipeline_config.yaml`` (if present) with the
    ``runtime`` section overridden by the widget values.

    Args:
        st: The imported ``streamlit`` module.
    """
    st.subheader("B) Pipeline settings")
    cfg_path = Path("pipeline_config.yaml")
    defaults = _load_yaml_defaults(cfg_path) if cfg_path.exists() else {}
    if not isinstance(defaults, dict):
        # A YAML document whose top level is not a mapping cannot be merged.
        defaults = {}
    col_left, col_right = st.columns(2)
    with col_left:
        offline_mode = st.toggle(
            "offline_mode",
            value=True,
            help="Disable all outbound network calls.",
        )
        enable_ml = st.toggle(
            "enable_ml_analysis",
            value=False,
            help="Enable ML/NLP stage (requires torch; off by default).",
        )
    with col_right:
        correlation_mode = st.selectbox(
            "correlation_mode",
            ["basic", "in-memory"],
            index=0,
            help="basic = simple entity matching; in-memory = graph in RAM.",
        )
    effective_config: dict = dict(defaults)
    # Copy the runtime section before updating it, and tolerate a config file
    # where ``runtime`` is empty (None) or otherwise not a mapping.
    runtime_defaults = effective_config.get("runtime")
    runtime = dict(runtime_defaults) if isinstance(runtime_defaults, dict) else {}
    runtime.update(
        {
            "offline_mode": offline_mode,
            "enable_ml_analysis": enable_ml,
            "correlation_mode": correlation_mode,
        }
    )
    effective_config["runtime"] = runtime
    st.session_state["effective_config"] = effective_config
    st.markdown("**Effective config (passed to engine):**")
    st.json(effective_config)


def _render_run_tab(st, run_pipeline) -> None:
    """Stage C: resolve the input and run the pipeline when the button is pressed.

    On success the run id and run directory are stored in
    ``st.session_state["last_run_id"]`` / ``["last_run_dir"]``; on failure the
    error is shown and appended to the on-page log.

    Args:
        st: The imported ``streamlit`` module.
        run_pipeline: The engine entry point, called as
            ``run_pipeline(input_path=..., config=...)``.
    """
    st.subheader("C) Run pipeline")
    st.caption("Outputs are written to `runs/<run_id>/`.")
    input_path = _resolve_input(st.session_state)
    if input_path is not None:
        st.info(f"Input resolved -> `{input_path}`")
    else:
        st.warning("No input selected. Go to Upload tab or enable demo dataset.")
    run_btn = st.button("🚀 Run pipeline now", type="primary", disabled=input_path is None)
    if not run_btn or input_path is None:
        return

    progress = st.progress(0, text="Starting...")
    log_area = st.empty()
    log_lines: list[str] = []

    def _log(message: str) -> None:
        # Re-render only the tail so the log box stays a bounded size.
        log_lines.append(message)
        log_area.code("\n".join(log_lines[-40:]), language="bash")

    _log(f"Input: {input_path}")
    _log("Calling engine.pipeline_orchestrator.run_pipeline()...")
    progress.progress(10, text="Normalizing files...")
    try:
        ctx = run_pipeline(
            input_path=input_path,
            config=st.session_state["effective_config"],
        )
        st.session_state["last_run_id"] = ctx.run_id
        st.session_state["last_run_dir"] = str(ctx.run_dir)
        progress.progress(90, text="Generating report...")
        _log(f"Run ID: {ctx.run_id}")
        _log(f"Run dir: {ctx.run_dir}")
        if ctx.stage_results:
            for stage_name, stage_out in ctx.stage_results.items():
                _log(f" [{stage_out.status.value.upper():8s}] {stage_name}")
        progress.progress(100, text="Done")
        st.success(f"Pipeline complete - run `{ctx.run_id}`")
        st.code(str(ctx.run_dir))
        st.info("Switch to Browse tab to explore outputs.")
    except Exception as exc:
        # Top-level UI boundary: surface any engine failure instead of crashing the page.
        progress.empty()
        st.error(f"Pipeline failed: {exc}")
        _log(f"ERROR: {exc}")


def _render_browse_tab(st, components) -> None:
    """Stage D: show the report, exports and a SQLite preview of the last run.

    Args:
        st: The imported ``streamlit`` module.
        components: The imported ``streamlit.components.v1`` module, used to
            embed the HTML report inline.
    """
    import json
    from contextlib import closing

    st.subheader("D) Browse results")
    run_dir_str = st.session_state.get("last_run_dir")
    if not run_dir_str:
        st.info("Run the pipeline first (Stage C).")
        return

    run_dir = Path(run_dir_str)
    report_html = run_dir / "report" / "index.html"
    db_path = run_dir / "db.sqlite"
    exports_dir = run_dir / "exports"
    manifest_path = run_dir / "manifest.json"

    # Collect export files once; the metric and the listing below share this
    # list so the displayed count matches the number of download rows.
    if exports_dir.exists():
        export_files = sorted(path for path in exports_dir.rglob("*") if path.is_file())
    else:
        export_files = []

    col1, col2, col3, col4 = st.columns(4)
    col1.metric("Run ID", st.session_state.get("last_run_id") or "-")
    col2.metric("Report", "yes" if report_html.exists() else "no")
    col3.metric("DB", "yes" if db_path.exists() else "no")
    col4.metric("Exports", str(len(export_files)))

    if manifest_path.exists():
        with st.expander("Run manifest"):
            try:
                manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
            except (OSError, ValueError) as exc:
                # ValueError covers both invalid JSON and undecodable bytes.
                st.warning(f"Could not read manifest.json: {exc}")
            else:
                st.json(manifest)

    st.divider()
    st.markdown("### HTML Report")
    if report_html.exists():
        st.markdown(f"`{report_html}`")
        try:
            components.html(report_html.read_text(errors="replace"), height=700, scrolling=True)
        except Exception as exc:
            st.warning(f"Inline render failed ({exc}). Open the path above in a browser.")
        with open(report_html, "rb") as file_handle:
            st.download_button(
                "Download report/index.html",
                data=file_handle,
                file_name="index.html",
                mime="text/html",
            )
    else:
        st.info("No report/index.html yet.")

    st.divider()
    st.markdown("### Exports")
    if not exports_dir.exists():
        st.info("No exports/ directory found.")
    elif not export_files:
        st.info("Exports directory is empty.")
    else:
        for export_file in export_files:
            rel = export_file.relative_to(run_dir).as_posix()
            col_path, col_download = st.columns([3, 1])
            col_path.write(f"`{rel}`")
            with open(export_file, "rb") as file_handle:
                col_download.download_button(
                    "Download",
                    data=file_handle,
                    file_name=export_file.name,
                    key=f"dl_{rel}",  # relative path keeps widget keys unique
                )

    jsonl_path = run_dir / "normalized.jsonl"
    if jsonl_path.exists():
        with open(jsonl_path, "rb") as file_handle:
            st.download_button(
                "Download normalized.jsonl",
                data=file_handle,
                file_name="normalized.jsonl",
                mime="application/x-ndjson",
            )

    st.divider()
    st.markdown("### SQLite DB Preview")
    if not db_path.exists():
        st.info("No db.sqlite found.")
        return
    with open(db_path, "rb") as file_handle:
        st.download_button(
            "Download db.sqlite",
            data=file_handle,
            file_name="db.sqlite",
            mime="application/x-sqlite3",
        )
    try:
        # closing() guarantees the connection is released even if a query fails.
        with closing(sqlite3.connect(db_path)) as conn:
            tables = pd.read_sql(
                "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;",
                conn,
            )["name"].tolist()
            if tables:
                st.write("Tables:", tables)
                selected_table = st.selectbox("Preview table", tables, key="db_table_sel")
                # Identifiers cannot be bound as parameters; quote with double
                # quotes and double any embedded quote so every name is valid.
                quoted_table = '"' + str(selected_table).replace('"', '""') + '"'
                dataframe = pd.read_sql(f"SELECT * FROM {quoted_table} LIMIT 200;", conn)
                st.dataframe(dataframe, use_container_width=True)
            else:
                st.info("DB exists but contains no tables yet.")
    except Exception as exc:
        st.warning(f"DB preview failed: {exc}")


def main() -> None:
    """Entrypoint for `streamlit run gui/streamlit_app.py`.

    Streamlit and the engine are imported here rather than at module level so
    the module can be imported without a running Streamlit script context.
    The page has a sidebar terminal panel and four tabs (Upload, Settings,
    Run, Browse), each rendered by its own private helper.
    """
    import streamlit as st
    import streamlit.components.v1 as components
    from engine.pipeline_orchestrator import run_pipeline
    from gui.terminal_panel import render_terminal

    st.set_page_config(
        page_title="MOD-OSINT",
        page_icon="🧠",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    st.title("🧠 MOD-OSINT")
    st.caption("GUI wizard -> `engine.pipeline_orchestrator.run_pipeline()`")

    # Seed the session keys that the tabs share with each other.
    for state_key, initial_value in (
        ("effective_config", {}),
        ("last_run_id", None),
        ("last_run_dir", None),
    ):
        if state_key not in st.session_state:
            st.session_state[state_key] = initial_value

    with st.sidebar:
        render_terminal({"effective_config": st.session_state["effective_config"]})

    tab_upload, tab_settings, tab_run, tab_browse = st.tabs(
        ["📂 Upload", "⚙️ Settings", "▶️ Run", "📊 Browse"]
    )
    with tab_upload:
        _render_upload_tab(st)
    with tab_settings:
        _render_settings_tab(st)
    with tab_run:
        _render_run_tab(st, run_pipeline)
    with tab_browse:
        _render_browse_tab(st, components)
# Script entry point: launch the wizard only when this file is executed
# directly, never as a side effect of importing the module.
if __name__ == "__main__":
    main()