Spaces:
Sleeping
Sleeping
File size: 6,016 Bytes
c089c34 d4810cc c089c34 447505f 9dc8fd7 83202f8 9dd1f1b b0330bc d35cf83 1cd2b0c 26bc92b 83202f8 d4810cc 83202f8 d4810cc 83202f8 eca4002 83202f8 26bc92b d4810cc 26bc92b d4810cc b0330bc d4810cc b0330bc d4810cc d35cf83 26bc92b eca4002 d35cf83 83202f8 d35cf83 b0330bc d35cf83 ebbcc75 b0330bc 031cc89 7a69235 031cc89 d35cf83 f907e1a ebbcc75 d35cf83 f907e1a d35cf83 031cc89 f907e1a 031cc89 d35cf83 f907e1a d35cf83 c4559e8 f907e1a c4559e8 d35cf83 c4559e8 d35cf83 ebbcc75 d35cf83 9dd1f1b d35cf83 ebbcc75 d35cf83 b0330bc d35cf83 031cc89 d35cf83 c4559e8 d35cf83 c4559e8 d35cf83 c4559e8 83202f8 031cc89 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import os
# Redirect Streamlit's config/home directories to /tmp BEFORE importing
# streamlit, so the app can start in read-only containers (e.g. a hosted
# Space) where $HOME is not writable.
os.environ["XDG_CONFIG_HOME"] = "/tmp"
os.environ["STREAMLIT_RUNTIME_CONFIG_DIR"] = "/tmp"
os.environ["STREAMLIT_HOME"] = "/tmp"
import streamlit as st
import pandas as pd
import json
from openai import OpenAI
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import LabelEncoder
# Module-level OpenAI client. Reads OPENAI_API_KEY from the environment;
# a missing key fails at request time inside query_openai, not here.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Instruction preamble prepended to every per-column LLM request (see the
# "Run LLM Anomaly Detection" handler below). The model is told to reply
# with ONLY the JSON object whose schema is embedded in the text; that JSON
# is then located and parsed by query_openai.
PROMPT_INSTRUCTIONS_TEXT = """
You are a forensic auditor AI with deep domain expertise and a sharp eye for irregularities. Your job is to identify **anomalies** in a single column of financial data.
Analyze the values provided and return only values that are:
- **Numerical outliers**: extremely high/low or oddly rounded numbers
- **Format inconsistencies**: strange symbols, irregular formatting, or data corruption
- **Rare or suspicious values**: strings or categories that do not appear to fit the overall pattern
ONLY analyze the values from the provided column, without relying on any external context.
Return ONLY the following JSON object and nothing else:
{
"anomalies": [
{
"index": 0,
"value": "VALUE_HERE",
"anomaly_type": "Type of anomaly here",
"explanation": "Why this value is considered anomalous",
"confidence": 0.9
}
]
}
"""
def query_openai(prompt: str) -> dict:
    """Send *prompt* to the chat model and parse the JSON object in its reply.

    Args:
        prompt: Full user-message text (instructions + serialized values).

    Returns:
        The parsed JSON dict on success, or a dict with a single ``"error"``
        key describing the failure (API error, missing or unparseable JSON
        in the model output). Never raises.
    """
    try:
        response = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[
                {"role": "system", "content": "You analyze a list of values and return JSON anomalies only."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.2,  # low temperature: keep output format-faithful
            max_tokens=2048
        )
        raw_output = response.choices[0].message.content
        print("🔵 RAW OUTPUT:\n", raw_output)  # debug trace of the unparsed reply
        # The model sometimes wraps the JSON in prose or code fences, so
        # extract the outermost {...} span before parsing.
        json_start = raw_output.find("{")
        json_end = raw_output.rfind("}")
        # Require a well-ordered pair: rfind can locate a '}' that precedes
        # the first '{' in degenerate replies; the old `json_end != -1` check
        # then sliced garbage and surfaced a misleading JSONDecodeError.
        if json_start != -1 and json_end > json_start:
            return json.loads(raw_output[json_start:json_end + 1])
        return {"error": "Could not locate JSON structure in LLM response."}
    except json.JSONDecodeError as e:
        return {"error": f"Failed to parse JSON: {str(e)}"}
    except Exception as e:
        return {"error": str(e)}
def apply_isolation_forest(df):
    """Flag row-level outliers in *df* using an Isolation Forest.

    Object/category columns are label-encoded so the model can consume them.
    Rows containing any NaN are dropped *before* encoding: the previous
    order encoded categorical NaNs into a spurious literal ``'nan'``
    category (via ``astype(str)``) while still dropping rows with numeric
    NaNs — treating the two column kinds inconsistently.

    Args:
        df: Input DataFrame (any mix of numeric and categorical columns).

    Returns:
        A copy of the surviving rows of *df* with two extra columns —
        ``IForest_Score`` (decision_function value; lower = more anomalous)
        and ``Anomaly`` ("Yes"/"No") — or None on failure, after showing a
        Streamlit error.
    """
    # Drop incomplete rows first so NaN handling is uniform across dtypes.
    df_clean = df.dropna()
    df_encoded = df_clean.copy()
    for col in df_encoded.select_dtypes(include=["object", "category"]).columns:
        # LabelEncoder wants hashable scalars; cast to str to be safe.
        df_encoded[col] = LabelEncoder().fit_transform(df_encoded[col].astype(str))
    try:
        # contamination=0.05: assume ~5% of rows are anomalous;
        # fixed random_state keeps results reproducible across reruns.
        model = IsolationForest(contamination=0.05, random_state=42)
        preds = model.fit_predict(df_encoded)
        scores = model.decision_function(df_encoded)
        result_df = df_clean.copy()
        result_df["IForest_Score"] = scores
        result_df["Anomaly"] = ["Yes" if p == -1 else "No" for p in preds]
        return result_df
    except Exception as e:
        st.error(f"Isolation Forest failed: {e}")
        return None
# ---------------- Streamlit UI ----------------
st.set_page_config(page_title="LLM-Assisted Anomaly Detector", layout="wide")
st.title("LLM-Assisted Anomaly Detector")
st.markdown("""
This tool combines machine learning and large language models to detect anomalies in datasets. We first apply isolation forest to the full dataset to flag data-level outliers. Then, you can select one column to perform a second pass of analysis using OpenAI's GPT-4, which focuses on semantic and contextual anomalies within that column only (e.g. Payment_Method column).
""")
# Initialize session state for df — session_state persists the loaded
# dataframe across Streamlit's script reruns (every widget interaction
# re-executes this whole file from the top).
if "df" not in st.session_state:
    st.session_state.df = None
# Load sample data shipped with the app.
if st.button("Load sample dataset"):
    try:
        st.session_state.df = pd.read_csv("src/df_crypto.csv")
        st.success("Sample dataset loaded from `src/df_crypto.csv`.")
    except Exception as e:
        st.error(f"Could not load sample dataset: {e}")
# File upload — only offered while no dataframe has been loaded yet.
if st.session_state.df is None:
    uploaded_file = st.file_uploader("Or upload your own CSV file", type=["csv"])
    if uploaded_file:
        try:
            st.session_state.df = pd.read_csv(uploaded_file)
            st.success("Custom dataset uploaded.")
        except Exception as e:
            st.error(f"Could not read uploaded CSV. Error: {e}")
# Use persisted df for the rest of the page.
df = st.session_state.df
if df is not None:
    st.subheader("Full Dataset")
    st.dataframe(df, use_container_width=True)
    # ---------------- Isolation Forest ----------------
    # First pass: whole-dataset, ML-based outlier flagging.
    st.markdown("### Anomaly Detection with Isolation Forest (whole dataset)")
    iforest_df = apply_isolation_forest(df)
    if iforest_df is not None:
        st.success("Isolation Forest analysis completed.")
        # Show only the rows the model flagged as anomalous.
        st.dataframe(iforest_df[iforest_df["Anomaly"] == "Yes"], use_container_width=True)
    # ---------------- LLM Section ----------------
    # Second pass: single-column semantic analysis via the LLM.
    st.markdown("### LLM-Based Anomaly Detection (specific column)")
    selected_column = st.selectbox("Select a column to analyze with LLM:", df.columns)
    if st.button("Run LLM Anomaly Detection on selected column"):
        with st.spinner("Analyzing column with LLM..."):
            values = df[selected_column].dropna().tolist()
            values = values[:500]  # Token safety: cap the payload sent to the model
            # Pair each value with its index so the model's "index" field in
            # the returned JSON can be traced back to a concrete row.
            value_list_with_index = [
                {"index": idx, "value": str(val)} for idx, val in enumerate(values)
            ]
            prompt = PROMPT_INSTRUCTIONS_TEXT + "\n\nVALUES:\n" + json.dumps(value_list_with_index, indent=2)
            result = query_openai(prompt)
        if "anomalies" in result:
            st.success(f"LLM found {len(result['anomalies'])} anomalies in `{selected_column}`.")
            st.dataframe(pd.json_normalize(result["anomalies"]), use_container_width=True)
        else:
            # query_openai returns {"error": ...} on any failure; surface it raw.
            st.warning("No anomalies found or invalid response from LLM.")
        st.subheader("Raw Model Output")
        st.json(result)
else:
    st.info("Please upload a CSV or click the sample button to begin.")