"""Streamlit dashboard for the modular OSINT pipeline.

The main page drives the five pipeline stages (ingestion, preprocessing,
ML analysis, correlation, export) one button at a time and keeps each
stage's output in ``st.session_state``.  The sidebar lists every ``.py``
script found below ``Modules/`` and lets the user edit its parameters, run
its ``main()`` and download matching result files.
"""

import ast
import importlib.util
import inspect
import json
import os
import tempfile
import traceback
from pathlib import Path

import streamlit as st

# --- EXISTING PIPELINE IMPORTS (keep these as in original app) ---
from modules.ingestion.ingest_data import run as ingest_run
from modules.preprocessing.preprocess_data import run as preprocess_run
from modules.ml_analysis.ml_analysis import run as ml_run
from modules.correlation.correlate_ioc import run as correlate_run
from modules.export.export_results import run as export_run

st.set_page_config(page_title="Modular OSINT Pipeline", layout="wide")
st.title("🚀 Modular OSINT Pipeline Dashboard")


# --- PIPELINE WORKFLOW ---
def write_temp(data: dict) -> str:
    """Serialise *data* as JSON into a new temporary ``.json`` file.

    The file is created with ``delete=False``, so it outlives this call and
    the caller is responsible for removing it.

    Returns:
        The path of the file that was written.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False, encoding="utf-8"
    ) as handle:
        json.dump(data, handle)
    return handle.name


def run_stage(stage_run, payload):
    """Run one pipeline stage on *payload* and return its output as plain JSON data.

    Args:
        stage_run: Stage entry point; called with the path of a JSON file and
            expected to return an object exposing ``.json()``.
        payload: JSON-serialisable input for the stage.

    Returns:
        The stage output, decoded with ``json.loads``.
    """
    path = write_temp(payload)
    try:
        return json.loads(stage_run(path).json())
    finally:
        # Best-effort cleanup so repeated clicks do not pile up temp files.
        # NOTE(review): assumes stages do not need the input file after
        # returning - confirm against the stage implementations.
        try:
            os.remove(path)
        except OSError:
            pass


def first_available(keys):
    """Return the session-state value of the first key in *keys* that is set, else None."""
    for key in keys:
        if key in st.session_state:
            return st.session_state[key]
    return None


def pipeline_stage(label, state_key, stage_run, upstream_keys, heading):
    """Render the run button and the stored output of one pipeline stage.

    Args:
        label: Text of the button that triggers the stage.
        state_key: ``st.session_state`` key under which the output is stored.
        stage_run: Stage entry point, forwarded to :func:`run_stage`.
        upstream_keys: Session-state keys to take the input from, in order
            of preference.
        heading: Markdown heading shown above the stored output.
    """
    if st.button(label):
        payload = first_available(upstream_keys)
        if payload is None:
            # Without this guard a literal ``null`` would be fed to the stage.
            st.warning("No upstream output available - run an earlier stage first.")
        else:
            st.session_state[state_key] = run_stage(stage_run, payload)
    if state_key in st.session_state:
        st.markdown(heading)
        st.json(st.session_state[state_key])


uploaded = st.file_uploader("Upload initial OSINTModuleInput JSON", type=["json"])
if uploaded:
    try:
        st.session_state["input"] = json.load(uploaded)
    except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
        st.error(f"Invalid JSON: {exc}")

if "input" in st.session_state:
    st.markdown("### 🔍 Initial Input")
    st.json(st.session_state["input"])

    col1, col2 = st.columns(2)

    with col1:
        pipeline_stage(
            "Run Ingestion", "ingest", ingest_run,
            ("input",), "#### Ingestion Output",
        )
        pipeline_stage(
            "Run Preprocessing", "preprocess", preprocess_run,
            ("ingest", "input"), "#### Preprocessing Output",
        )

    with col2:
        pipeline_stage(
            "Run ML Analysis", "ml", ml_run,
            ("preprocess", "ingest"), "#### ML Analysis Output",
        )
        pipeline_stage(
            "Run Correlation", "correlate", correlate_run,
            ("ml", "preprocess"), "#### Correlation Output",
        )

    pipeline_stage(
        "Run Export", "export", export_run,
        ("correlate", "ml"), "#### Export Output",
    )
    if "export" in st.session_state:
        st.download_button(
            label="Download Exported Results",
            data=json.dumps(st.session_state["export"], indent=2),
            file_name="osint_export.json",
            mime="application/json",
        )

# --- MULTI-DIRECTORY MODULE LAUNCHER SECTION ---
st.sidebar.header("Standalone & Subdirectory Modules")

MODULES_DIR = Path("Modules")
MODULE_REGISTRY = MODULES_DIR / "module_registry.json"


def discover_py_modules(directory):
    """Recursively list .py scripts (excluding __init__.py) below *directory*.

    Returns:
        A sorted list of ``Path`` objects relative to *directory*; empty when
        the directory does not exist.
    """
    py_modules = []
    for root, _dirs, files in os.walk(directory):
        for file_name in files:
            if file_name.endswith(".py") and file_name != "__init__.py":
                py_modules.append(Path(root).relative_to(directory) / file_name)
    # os.walk order is filesystem dependent; sort for a stable sidebar.
    return sorted(py_modules)


def normalize_registry_key(path: Path) -> str:
    """Return the registry key for *path*: dotted, without extension, lower-case."""
    return ".".join(path.with_suffix("").parts).lower()


def load_registry() -> dict:
    """Read the module registry file and return it with lower-cased keys.

    Returns an empty dict when the file is missing, unreadable, malformed or
    does not contain a JSON object; read problems are reported in the sidebar.
    """
    if not MODULE_REGISTRY.exists():
        return {}
    try:
        with open(MODULE_REGISTRY, encoding="utf-8") as regfile:
            registry = json.load(regfile)
    except (OSError, ValueError) as exc:
        st.sidebar.warning(f"Could not read {MODULE_REGISTRY}: {exc}")
        return {}
    if not isinstance(registry, dict):
        return {}
    # Lookups use lower-cased keys, so normalise the stored keys the same way.
    return {str(key).lower(): entry for key, entry in registry.items()}


def read_module_docstring(full_path: Path) -> str:
    """Return the module docstring of *full_path* collapsed onto one line.

    Returns ``""`` when the file cannot be read or parsed, or has no docstring.
    """
    try:
        docstring = ast.get_docstring(ast.parse(full_path.read_text(encoding="utf-8")))
    except (OSError, SyntaxError, ValueError):
        return ""
    return " ".join(docstring.split()) if docstring else ""


def load_module_description(module_path, registry=None):
    """Try to get description from registry or fallback to docstring.

    Args:
        module_path: Path of the module relative to ``MODULES_DIR``.
        registry: Already loaded registry (see :func:`load_registry`); read
            from disk when omitted.

    Returns:
        The description text, or ``""`` when none is available.
    """
    if registry is None:
        registry = load_registry()
    entry = registry.get(normalize_registry_key(module_path))
    if isinstance(entry, dict):
        return entry.get("description", "")
    return read_module_docstring(MODULES_DIR / module_path)


def get_module_params(module_path):
    """Load osintmodule.json file if present for the module (same name, same directory).

    Returns:
        ``(params, path)`` where *path* is the parameter file as a string, or
        ``(None, None)`` when the file is missing or cannot be parsed.
    """
    param_path = (MODULES_DIR / module_path).with_suffix(".osintmodule.json")
    if not param_path.exists():
        return None, None
    try:
        with open(param_path, "r", encoding="utf-8") as param_file:
            return json.load(param_file), str(param_path)
    except ValueError as exc:
        st.error(f"Invalid parameter file {param_path}: {exc}")
        return None, None


def run_module(module_path, params=None):
    """
    Dynamically import and run the main() function for any discovered .py module,
    passing params if the signature supports it.

    Any exception raised while importing or running the module is shown in
    the UI instead of aborting the dashboard.
    """
    full_path = MODULES_DIR / module_path
    if not full_path.exists():
        st.error(f"Module file not found: {module_path}")
        return
    # Only the extension is dropped; ".py" elsewhere in the path is kept.
    mod_name = "mod_" + "_".join(Path(module_path).with_suffix("").parts)
    try:
        spec = importlib.util.spec_from_file_location(mod_name, str(full_path))
        if spec is None or spec.loader is None:
            st.error(f"Cannot load module: {module_path}")
            return
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        if not hasattr(mod, "main"):
            st.warning(f"{module_path} does not have a main() function.")
            return
        if params and inspect.signature(mod.main).parameters:
            mod.main(params)
        else:
            mod.main()
    except Exception as exc:  # UI boundary: report instead of crashing the app
        st.error(f"Error running {module_path}: {exc}")
        # st.exception expects the exception object, not a formatted string.
        st.exception(exc)


def save_params_json(params, param_path):
    """Overwrite the parameter file at *param_path* with *params* as indented JSON."""
    with open(param_path, "w", encoding="utf-8") as param_file:
        json.dump(params, param_file, indent=2)


registry = load_registry()  # read once instead of once per module
for rel_path in discover_py_modules(MODULES_DIR):
    mod_label = str(rel_path)
    mod_name = rel_path.stem
    desc = load_module_description(rel_path, registry)
    with st.sidebar.expander(mod_label, expanded=False):
        if desc:
            st.info(desc)

        params, param_path = get_module_params(rel_path)
        param_input = None
        if params is not None:
            st.markdown("**Edit module parameters:**")
            param_str = st.text_area(
                "Parameters (JSON)",
                value=json.dumps(params, indent=2),
                key=f"params_{mod_label}",
                height=200,
            )
            try:
                param_input = json.loads(param_str)
                st.success("Valid JSON")
            except ValueError as exc:
                st.error(f"Invalid JSON: {exc}")
                param_input = None

        if st.button(f"Run {mod_label}", key=f"run_{mod_label}"):
            st.write(f"## Running: {mod_label}")
            if desc:
                st.info(desc)
            # Persist the edited parameters before running with them.
            if param_input is not None and param_path:
                save_params_json(param_input, param_path)
                params = param_input
            run_module(rel_path, params)

            # Offer result files from the module's own Results folder and the
            # shared one; skip the second when both are the same directory.
            results_folders = []
            for candidate in (
                MODULES_DIR / rel_path.parent / "Results",
                MODULES_DIR / "Data" / "Results",
            ):
                if candidate not in results_folders:
                    results_folders.append(candidate)
            for results_dir in results_folders:
                if not results_dir.exists():
                    continue
                for result_file in sorted(results_dir.glob(f"{mod_name}*.*")):
                    if not result_file.is_file():
                        continue
                    st.download_button(
                        label=f"Download result: {result_file.name}",
                        data=result_file.read_bytes(),
                        file_name=result_file.name,
                        # Explicit key: equal file names in both folders would
                        # otherwise produce duplicate widget IDs.
                        key=f"download_{mod_label}_{result_file}",
                    )