import os

# Streamlit needs writable config/home directories; in sandboxed or
# read-only deployments (e.g. containers) point them at /tmp BEFORE
# importing streamlit, which reads these on import.
os.environ["XDG_CONFIG_HOME"] = "/tmp"
os.environ["STREAMLIT_RUNTIME_CONFIG_DIR"] = "/tmp"
os.environ["STREAMLIT_HOME"] = "/tmp"

import json

import pandas as pd
import streamlit as st
from openai import OpenAI
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import LabelEncoder

# OpenAI client; expects OPENAI_API_KEY in the environment.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

PROMPT_INSTRUCTIONS_TEXT = """
You are a forensic auditor AI with deep domain expertise and a sharp eye for irregularities.
Your job is to identify **anomalies** in a single column of financial data.

Analyze the values provided and return only values that are:
- **Numerical outliers**: extremely high/low or oddly rounded numbers
- **Format inconsistencies**: strange symbols, irregular formatting, or data corruption
- **Rare or suspicious values**: strings or categories that do not appear to fit the overall pattern

ONLY analyze the values from the provided column, without relying on any external context.

Return ONLY the following JSON object and nothing else:

{
  "anomalies": [
    {
      "index": 0,
      "value": "VALUE_HERE",
      "anomaly_type": "Type of anomaly here",
      "explanation": "Why this value is considered anomalous",
      "confidence": 0.9
    }
  ]
}
"""


def query_openai(prompt: str) -> dict:
    """Send *prompt* to GPT-4 Turbo and parse the JSON object in its reply.

    Returns the parsed dict on success, or ``{"error": ...}`` when the API
    call fails or no well-formed JSON object can be located in the response.
    """
    try:
        response = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[
                {"role": "system", "content": "You analyze a list of values and return JSON anomalies only."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
            max_tokens=2048,
        )
        raw_output = response.choices[0].message.content
        print("🔵 RAW OUTPUT:\n", raw_output)

        # The model may wrap the JSON in prose/markdown fences; extract the
        # outermost {...} span before parsing. Require json_end > json_start
        # so a stray "}" before the first "{" can't produce a bogus slice.
        json_start = raw_output.find("{")
        json_end = raw_output.rfind("}")
        if json_start != -1 and json_end != -1 and json_end > json_start:
            json_str = raw_output[json_start:json_end + 1]
            return json.loads(json_str)
        return {"error": "Could not locate JSON structure in LLM response."}
    except json.JSONDecodeError as e:
        return {"error": f"Failed to parse JSON: {str(e)}"}
    except Exception as e:
        # Network/auth/API failures surface as an error message in the UI.
        return {"error": str(e)}


def apply_isolation_forest(df):
    """Run IsolationForest over the whole frame and flag anomalous rows.

    Object/category columns are label-encoded and rows containing NaN are
    dropped before fitting. Returns a copy of the surviving original rows
    with two extra columns — ``IForest_Score`` (decision-function score)
    and ``Anomaly`` ("Yes"/"No") — or ``None`` on failure (error shown in
    the Streamlit UI).
    """
    df_encoded = df.copy()
    for col in df_encoded.select_dtypes(include=["object", "category"]).columns:
        df_encoded[col] = LabelEncoder().fit_transform(df_encoded[col].astype(str))

    try:
        df_encoded = df_encoded.dropna()
        if df_encoded.empty:
            # fit_predict raises an opaque sklearn error on an empty matrix;
            # fail gracefully with an actionable message instead.
            st.error("Isolation Forest failed: no complete rows left after dropping NaNs.")
            return None
        model = IsolationForest(contamination=0.05, random_state=42)
        preds = model.fit_predict(df_encoded)
        scores = model.decision_function(df_encoded)
        # Map results back onto the ORIGINAL (un-encoded) rows that survived
        # the dropna so the user sees their real values.
        result_df = df.loc[df_encoded.index].copy()
        result_df["IForest_Score"] = scores
        result_df["Anomaly"] = ["Yes" if p == -1 else "No" for p in preds]
        return result_df
    except Exception as e:
        st.error(f"Isolation Forest failed: {e}")
        return None


# ---------------- Streamlit UI ----------------
st.set_page_config(page_title="LLM-Assisted Anomaly Detector", layout="wide")
st.title("LLM-Assisted Anomaly Detector")

st.markdown("""
This tool combines machine learning and large language models to detect anomalies in datasets.
We first apply isolation forest to the full dataset to flag data-level outliers.
Then, you can select one column to perform a second pass of analysis using OpenAI's GPT-4,
which focuses on semantic and contextual anomalies within that column only (e.g. Payment_Method column).
""")

# Initialize session state for df so the frame persists across reruns.
if "df" not in st.session_state:
    st.session_state.df = None

# Load sample data
if st.button("Load sample dataset"):
    try:
        st.session_state.df = pd.read_csv("src/df_crypto.csv")
        st.success("Sample dataset loaded from `src/df_crypto.csv`.")
    except Exception as e:
        st.error(f"Could not load sample dataset: {e}")

# File upload (only offered until a dataset is loaded)
if st.session_state.df is None:
    uploaded_file = st.file_uploader("Or upload your own CSV file", type=["csv"])
    if uploaded_file:
        try:
            st.session_state.df = pd.read_csv(uploaded_file)
            st.success("Custom dataset uploaded.")
        except Exception as e:
            st.error(f"Could not read uploaded CSV. Error: {e}")

# Use persisted df
df = st.session_state.df

if df is not None:
    st.subheader("Full Dataset")
    st.dataframe(df, use_container_width=True)

    # ---------------- Isolation Forest ----------------
    st.markdown("### Anomaly Detection with Isolation Forest (whole dataset)")
    iforest_df = apply_isolation_forest(df)
    if iforest_df is not None:
        st.success("Isolation Forest analysis completed.")
        st.dataframe(iforest_df[iforest_df["Anomaly"] == "Yes"], use_container_width=True)

    # ---------------- LLM Section ----------------
    st.markdown("### LLM-Based Anomaly Detection (specific column)")
    selected_column = st.selectbox("Select a column to analyze with LLM:", df.columns)

    if st.button("Run LLM Anomaly Detection on selected column"):
        with st.spinner("Analyzing column with LLM..."):
            values = df[selected_column].dropna().tolist()
            values = values[:500]  # Token safety
            value_list_with_index = [
                {"index": idx, "value": str(val)} for idx, val in enumerate(values)
            ]
            prompt = PROMPT_INSTRUCTIONS_TEXT + "\n\nVALUES:\n" + json.dumps(value_list_with_index, indent=2)
            result = query_openai(prompt)

            if "anomalies" in result:
                st.success(f"LLM found {len(result['anomalies'])} anomalies in `{selected_column}`.")
                st.dataframe(pd.json_normalize(result["anomalies"]), use_container_width=True)
            else:
                st.warning("No anomalies found or invalid response from LLM.")

            st.subheader("Raw Model Output")
            st.json(result)
else:
    st.info("Please upload a CSV or click the sample button to begin.")