# Omnilog — Streamlit log-analyzer page (upload/normalize/search/summarize + live syslog tail).
import sys, os

# Locate the project root (one level above this script's directory) and its
# utils/ folder so the page works no matter what the current directory is.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UTILS_DIR = os.path.join(BASE_DIR, "utils")

# BUG FIX: the imports below reference the `utils` *package*
# (`from utils.file_utils import ...`), which Python resolves relative to
# BASE_DIR — putting only UTILS_DIR on sys.path would enable
# `import file_utils`, not `import utils.file_utils`. Add both so either
# import style keeps working.
for _path in (BASE_DIR, UTILS_DIR):
    if _path not in sys.path:
        sys.path.insert(0, _path)

import streamlit as st
import re, hashlib, time  # `os` already imported above; duplicate removed

from utils.file_utils import normalize_log_line, keyword_search
from utils.summarizer import summarize_text
st.title("📜 Omnilog")
st.write("Log analyzer with normalization, search, AI summaries, and real-time tailing")

# Seed per-session state exactly once; Streamlit reruns the script on every
# interaction, so existing values must be left untouched.
for _key, _default in (("uploaded_files", []), ("errors", [])):
    if _key not in st.session_state:
        st.session_state[_key] = _default

mode = st.radio("Mode", ["Upload Log File", "Live Syslog Tail"])
# ─── Upload Mode ──────────────────────────────────────────────
if mode == "Upload Log File":
    uploaded = st.file_uploader("Upload a log file", type=["log", "txt"])
    if uploaded:
        try:
            content = uploaded.read().decode("utf-8", errors="ignore")

            # Track upload.
            # BUG FIX: Streamlit re-runs the entire script on every widget
            # interaction, so an unconditional append() recorded the same
            # file name once per rerun. Record each name only once.
            if uploaded.name not in st.session_state.uploaded_files:
                st.session_state.uploaded_files.append(uploaded.name)

            # Normalize every line via the project helper; the joined text is
            # needed twice below (search + summary), so build it once.
            normalized = [normalize_log_line(line) for line in content.splitlines()]
            normalized_text = "\n".join(normalized)

            # Display normalized preview (first 50 lines only).
            st.subheader("🧹 Normalized Logs")
            st.code("\n".join(normalized[:50]))

            # Stats
            st.subheader("📊 Stats")
            st.metric("Total Lines", len(normalized))
            st.metric("Unique Entries", len(set(normalized)))
            st.write("SHA1 Digest:", hashlib.sha1(content.encode()).hexdigest())

            # Keyword search over the normalized text.
            query = st.text_input("🔍 Search logs")
            matches = []
            if query:
                matches = keyword_search(normalized_text, query)
                st.success(f"Found {len(matches)} matches")
                st.code("\n".join(matches[:50]))

            # AI summarization of the full normalized log.
            st.subheader("🧠 AI Summary")
            summary = summarize_text(normalized_text)
            st.write(summary)

            # Save for Chatbot — presumably consumed by another page via
            # session_state (TODO confirm against the chatbot page).
            st.session_state.omnilog_output = {
                "normalized_preview": "\n".join(normalized[:50]),
                "matches": matches[:50],
                "summary": summary,
            }
        except Exception as e:
            # Broad catch is deliberate at this top-level UI boundary: the
            # error is shown to the user and recorded, never swallowed.
            st.error(f"⚠️ Error processing log: {e}")
            st.session_state.errors.append(str(e))
| # ─── Live Tailing Mode ──────────────────────────────────────── | |
| elif mode == "Live Syslog Tail": | |
| logfile = "/var/log/syslog" | |
| if not os.path.exists(logfile): | |
| st.error("⚠️ Syslog not found. This mode only works locally.") | |
| else: | |
| st.info(f"Streaming last 50 lines from {logfile} (auto-refreshing)") | |
| # Read last 50 lines | |
| def tail_file(path, n=50): | |
| with open(path, "r", errors="ignore") as f: | |
| lines = f.readlines() | |
| return lines[-n:] | |
| # Placeholder container | |
| placeholder = st.empty() | |
| # Live loop | |
| for _ in range(200): # ~200 refreshes (stop after) | |
| lines = tail_file(logfile, 50) | |
| normalized = [normalize_log_line(l) for l in lines] | |
| # Highlight critical warnings | |
| highlighted = [] | |
| for line in normalized: | |
| if re.search(r"(CRITICAL|FAILED|WARNING)", line, re.IGNORECASE): | |
| highlighted.append("⚠️ " + line) | |
| st.session_state.errors.append(line) | |
| else: | |
| highlighted.append(line) | |
| # Update live view | |
| with placeholder.container(): | |
| st.code("\n".join(highlighted)) | |
| time.sleep(2) # refresh every 2s | |