# LLM-Assisted Anomaly Detector — Streamlit app (deployed as a Hugging Face Space)
| import os | |
| os.environ["XDG_CONFIG_HOME"] = "/tmp" | |
| os.environ["STREAMLIT_RUNTIME_CONFIG_DIR"] = "/tmp" | |
| os.environ["STREAMLIT_HOME"] = "/tmp" | |
| import streamlit as st | |
| import pandas as pd | |
| import json | |
| from openai import OpenAI | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.preprocessing import LabelEncoder | |
| client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
| PROMPT_INSTRUCTIONS_TEXT = """ | |
| You are a forensic auditor AI with deep domain expertise and a sharp eye for irregularities. Your job is to identify **anomalies** in a single column of financial data. | |
| Analyze the values provided and return only values that are: | |
| - **Numerical outliers**: extremely high/low or oddly rounded numbers | |
| - **Format inconsistencies**: strange symbols, irregular formatting, or data corruption | |
| - **Rare or suspicious values**: strings or categories that do not appear to fit the overall pattern | |
| ONLY analyze the values from the provided column, without relying on any external context. | |
| Return ONLY the following JSON object and nothing else: | |
| { | |
| "anomalies": [ | |
| { | |
| "index": 0, | |
| "value": "VALUE_HERE", | |
| "anomaly_type": "Type of anomaly here", | |
| "explanation": "Why this value is considered anomalous", | |
| "confidence": 0.9 | |
| } | |
| ] | |
| } | |
| """ | |
| def query_openai(prompt: str) -> dict: | |
| try: | |
| response = client.chat.completions.create( | |
| model="gpt-4-turbo", | |
| messages=[ | |
| {"role": "system", "content": "You analyze a list of values and return JSON anomalies only."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0.2, | |
| max_tokens=2048 | |
| ) | |
| raw_output = response.choices[0].message.content | |
| print("🔵 RAW OUTPUT:\n", raw_output) | |
| json_start = raw_output.find("{") | |
| json_end = raw_output.rfind("}") | |
| if json_start != -1 and json_end != -1: | |
| json_str = raw_output[json_start:json_end + 1] | |
| return json.loads(json_str) | |
| return {"error": "Could not locate JSON structure in LLM response."} | |
| except json.JSONDecodeError as e: | |
| return {"error": f"Failed to parse JSON: {str(e)}"} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def apply_isolation_forest(df): | |
| df_encoded = df.copy() | |
| for col in df_encoded.select_dtypes(include=["object", "category"]).columns: | |
| df_encoded[col] = LabelEncoder().fit_transform(df_encoded[col].astype(str)) | |
| try: | |
| model = IsolationForest(contamination=0.05, random_state=42) | |
| df_encoded = df_encoded.dropna() | |
| preds = model.fit_predict(df_encoded) | |
| scores = model.decision_function(df_encoded) | |
| result_df = df.loc[df_encoded.index].copy() | |
| result_df["IForest_Score"] = scores | |
| result_df["Anomaly"] = ["Yes" if p == -1 else "No" for p in preds] | |
| return result_df | |
| except Exception as e: | |
| st.error(f"Isolation Forest failed: {e}") | |
| return None | |
| # ---------------- Streamlit UI ---------------- | |
| st.set_page_config(page_title="LLM-Assisted Anomaly Detector", layout="wide") | |
| st.title("LLM-Assisted Anomaly Detector") | |
| st.markdown(""" | |
| This tool combines machine learning and large language models to detect anomalies in datasets. We first apply isolation forest to the full dataset to flag data-level outliers. Then, you can select one column to perform a second pass of analysis using OpenAI's GPT-4, which focuses on semantic and contextual anomalies within that column only (e.g. Payment_Method column). | |
| """) | |
| # Initialize session state for df | |
| if "df" not in st.session_state: | |
| st.session_state.df = None | |
| # Load sample data | |
| if st.button("Load sample dataset"): | |
| try: | |
| st.session_state.df = pd.read_csv("src/df_crypto.csv") | |
| st.success("Sample dataset loaded from `src/df_crypto.csv`.") | |
| except Exception as e: | |
| st.error(f"Could not load sample dataset: {e}") | |
| # File upload | |
| if st.session_state.df is None: | |
| uploaded_file = st.file_uploader("Or upload your own CSV file", type=["csv"]) | |
| if uploaded_file: | |
| try: | |
| st.session_state.df = pd.read_csv(uploaded_file) | |
| st.success("Custom dataset uploaded.") | |
| except Exception as e: | |
| st.error(f"Could not read uploaded CSV. Error: {e}") | |
| # Use persisted df | |
| df = st.session_state.df | |
| if df is not None: | |
| st.subheader("Full Dataset") | |
| st.dataframe(df, use_container_width=True) | |
| # ---------------- Isolation Forest ---------------- | |
| st.markdown("### Anomaly Detection with Isolation Forest (whole dataset)") | |
| iforest_df = apply_isolation_forest(df) | |
| if iforest_df is not None: | |
| st.success("Isolation Forest analysis completed.") | |
| st.dataframe(iforest_df[iforest_df["Anomaly"] == "Yes"], use_container_width=True) | |
| # ---------------- LLM Section ---------------- | |
| st.markdown("### LLM-Based Anomaly Detection (specific column)") | |
| selected_column = st.selectbox("Select a column to analyze with LLM:", df.columns) | |
| if st.button("Run LLM Anomaly Detection on selected column"): | |
| with st.spinner("Analyzing column with LLM..."): | |
| values = df[selected_column].dropna().tolist() | |
| values = values[:500] # Token safety | |
| value_list_with_index = [ | |
| {"index": idx, "value": str(val)} for idx, val in enumerate(values) | |
| ] | |
| prompt = PROMPT_INSTRUCTIONS_TEXT + "\n\nVALUES:\n" + json.dumps(value_list_with_index, indent=2) | |
| result = query_openai(prompt) | |
| if "anomalies" in result: | |
| st.success(f"LLM found {len(result['anomalies'])} anomalies in `{selected_column}`.") | |
| st.dataframe(pd.json_normalize(result["anomalies"]), use_container_width=True) | |
| else: | |
| st.warning("No anomalies found or invalid response from LLM.") | |
| st.subheader("Raw Model Output") | |
| st.json(result) | |
| else: | |
| st.info("Please upload a CSV or click the sample button to begin.") |