import hashlib
import os
import subprocess
import sys

import streamlit as st

# ─── Ensure utils/ is importable ──────────────────────────────
# BASE_DIR is the parent of this file's directory.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UTILS_DIR = os.path.join(BASE_DIR, "utils")

# UTILS_DIR exposes the modules inside utils/ directly;
# BASE_DIR is what lets the `utils.` package imports below resolve.
if UTILS_DIR not in sys.path:
    sys.path.insert(0, UTILS_DIR)
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

from utils.docgen import generate_doc
from utils.summarizer import summarize_text

st.title("🛠️ Local Script Orchestration & Execution")
st.write("Scan, queue, and execute Omniscient scripts with results ingested back into the system.")

# Init state
if "local_scripts" not in st.session_state:
    st.session_state.local_scripts = []
if "script_runs" not in st.session_state:
    st.session_state.script_runs = []

SCAN_DIRS = [
    "/opt/omniscient/bin",
    "/opt/omniscient/scripts",
    "/opt/omniscient/ai"
]

def scan_scripts():
    """Walk SCAN_DIRS and return metadata for every .sh and .py file found."""
    scripts = []
    for d in SCAN_DIRS:
        if not os.path.exists(d):
            continue
        for root, _, files in os.walk(d):
            for f in files:
                if not f.endswith((".sh", ".py")):
                    continue
                path = os.path.join(root, f)
                try:
                    # Hash the raw bytes so the digest matches the file on disk,
                    # then decode leniently for doc generation and summarization.
                    with open(path, "rb") as fh:
                        raw = fh.read()
                    content = raw.decode("utf-8", errors="ignore")
                    sha1 = hashlib.sha1(raw).hexdigest()
                    doc = generate_doc(f, path, content)
                    summary = summarize_text(content)

                    scripts.append({
                        "name": f,
                        "path": path,
                        "sha1": sha1,
                        "doc": doc,
                        "summary": summary
                    })
                except Exception as e:
                    st.error(f"⚠️ Error reading {f}: {e}")
    return scripts

def run_script(script_path: str):
    """Run a .sh or .py script with a 60-second timeout and capture its output."""
    if script_path.endswith(".sh"):
        interpreter = "bash"
    elif script_path.endswith(".py"):
        interpreter = "python3"
    else:
        # Include the script path so the run history can identify the entry.
        return {"script": script_path, "error": "Unsupported script type"}

    try:
        result = subprocess.run(
            [interpreter, script_path], capture_output=True, text=True, timeout=60
        )
    except Exception as e:
        # Covers timeouts (subprocess.TimeoutExpired) and a missing interpreter.
        return {"script": script_path, "error": str(e)}

    return {
        "script": script_path,
        "stdout": result.stdout,
        "stderr": result.stderr,
        "returncode": result.returncode
    }

# ─── Scan Scripts ─────────────────────────────────────────────
if st.button("🔍 Scan Local Scripts"):
    scripts = scan_scripts()
    st.session_state.local_scripts = scripts
    st.success(f"✅ Found {len(scripts)} scripts.")

# ─── Script Selection ─────────────────────────────────────────
if st.session_state.local_scripts:
    script_names = [s["name"] for s in st.session_state.local_scripts]
    selected = st.multiselect("Select scripts to execute", script_names)

    if st.button("▶️ Run Selected Scripts"):
        for s in st.session_state.local_scripts:
            if s["name"] in selected:
                run_result = run_script(s["path"])
                st.session_state.script_runs.append(run_result)
                if "error" in run_result:
                    st.error(f"❌ {s['name']} failed: {run_result['error']}")
                elif run_result["returncode"] != 0:
                    st.error(f"❌ {s['name']} exited with {run_result['returncode']}")
                    st.code(run_result["stderr"])
                else:
                    st.success(f"✅ {s['name']} ran successfully")
                    st.code(run_result["stdout"][:500])  # preview output

# ─── Display Past Runs ────────────────────────────────────────
if st.session_state.script_runs:
    st.subheader("📜 Script Run History")
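    for r in st.session_state.script_runs[-5:]:  # show last 5
        if "error" in r:
            # Failed runs carry no return code or output, only an error message.
            st.markdown(f"**Script:** {r.get('script')}  \n**Error:** {r['error']}")
            continue
        st.markdown(f"**Script:** {r.get('script')}  \n**Return Code:** {r.get('returncode')}")
        if r.get("stdout"):
            st.code(r["stdout"][:300])
        if r.get("stderr"):
            st.code(r["stderr"][:300])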