# Uploaded by garrettJones03: "Upload app.py with huggingface_hub" (commit 791e439, verified).
"""
Fraud Detection Engine - Hugging Face Spaces Version
This version is configured for deployment on Hugging Face Spaces.
Data files should be placed in the 'data/' directory within the Space.
"""
import gradio as gr
import json
import os
import re
import pandas as pd
from datetime import datetime
from typing import Optional, Dict, List, Any
# ============================================================================
# CONFIGURATION
# ============================================================================
# Root folder for all input data. It is a relative path by default (suits HF
# Spaces) and can be redirected with the DATA_PATH environment variable.
BASE_DATA_PATH = os.getenv("DATA_PATH", "data")

# Every input location is derived from the data root.
(
    FINANCIAL_DATA_CSV,
    BENEISH_MSCORE_CSV,
    ZSCORE_DATA_CSV,
    COMPANY_TICKERS_JSON,
    FILINGS_DIR,
) = (
    os.path.join(BASE_DATA_PATH, entry)
    for entry in (
        "Financial Data.csv",
        "Beneish M-score - Sheet1.csv",
        "Z-score data.csv",
        "company_tickers.json",
        "filings",
    )
)

# Altman Z-Score zone boundaries: above SAFE is "Safe Zone", above GREY is
# "Grey Zone", anything else is "Distress Zone".
ZSCORE_GREY_THRESHOLD = 1.81
ZSCORE_SAFE_THRESHOLD = 2.99

# Beneish M-Scores above this value are flagged as likely manipulation.
MSCORE_MANIPULATION_THRESHOLD = -1.78
# ============================================================================
# SIMPLIFIED DATA LOADER
# ============================================================================
class DataLoader:
    """Simplified data loader for HF Spaces deployment.

    Each input file is read lazily on first use and cached on the class, so
    all accessors are classmethods. A missing file produces an empty
    DataFrame (or an empty dict for the tickers JSON) rather than an error,
    and lookups against missing data return ``None``.
    """

    # Class-level caches; ``None`` means "not loaded yet".
    _financial_df: Optional[pd.DataFrame] = None
    _mscore_df: Optional[pd.DataFrame] = None
    _zscore_df: Optional[pd.DataFrame] = None
    _company_tickers: Optional[Dict] = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _read_csv_with_cik(path: str, cik_column: str) -> pd.DataFrame:
        """Read ``path`` and add a nullable-integer ``cik_normalized`` column.

        Returns an empty DataFrame when the file does not exist. When
        ``cik_column`` is absent the frame is returned without
        ``cik_normalized``; the lookup helpers treat that as "no matches"
        instead of raising ``KeyError``.
        """
        if not os.path.exists(path):
            return pd.DataFrame()
        frame = pd.read_csv(path)
        if cik_column in frame.columns:
            # Unparsable CIKs become <NA> instead of aborting the load.
            frame['cik_normalized'] = pd.to_numeric(
                frame[cik_column], errors='coerce'
            ).astype('Int64')
        return frame

    @staticmethod
    def _rows_for_cik(df: pd.DataFrame, cik: int) -> pd.DataFrame:
        """Return the rows of ``df`` whose ``cik_normalized`` equals ``cik``."""
        if df.empty or 'cik_normalized' not in df.columns:
            return df.iloc[0:0]
        # Comparing a nullable column yields <NA> for missing CIKs; those
        # rows must count as "no match".
        mask = (df['cik_normalized'] == cik).fillna(False).astype(bool)
        return df[mask]

    @staticmethod
    def _as_float(value: Any) -> float:
        """Convert a cell to float, mapping missing values (None/NaN) to 0.0.

        NaN is truthy, so the plain ``float(value or 0)`` idiom lets it
        through; it has to be tested for explicitly.
        """
        if value is None or pd.isna(value):
            return 0.0
        return float(value or 0)

    # ------------------------------------------------------------------
    # Lazy loaders
    # ------------------------------------------------------------------
    @classmethod
    def _load_financial_csv(cls) -> pd.DataFrame:
        """Return the cached financial data, loading it on first use."""
        if cls._financial_df is None:
            cls._financial_df = cls._read_csv_with_cik(FINANCIAL_DATA_CSV, 'cik')
        return cls._financial_df

    @classmethod
    def _load_mscore_csv(cls) -> pd.DataFrame:
        """Return the cached Beneish M-score table, loading it on first use."""
        if cls._mscore_df is None:
            cls._mscore_df = cls._read_csv_with_cik(BENEISH_MSCORE_CSV, 'CIK Numbers')
        return cls._mscore_df

    @classmethod
    def _load_zscore_csv(cls) -> pd.DataFrame:
        """Return the cached Z-score input table, loading it on first use."""
        if cls._zscore_df is None:
            cls._zscore_df = cls._read_csv_with_cik(ZSCORE_DATA_CSV, 'cik')
        return cls._zscore_df

    @classmethod
    def load_company_tickers(cls) -> Dict[int, Dict[str, str]]:
        """Return a ``{cik: {"ticker": ..., "title": ...}}`` mapping.

        Entries without a truthy ``cik_str`` are skipped. The cache is only
        assigned once the whole file has been processed, so a failure while
        parsing does not leave a half-filled mapping behind.
        """
        if cls._company_tickers is None:
            tickers: Dict[int, Dict[str, str]] = {}
            if os.path.exists(COMPANY_TICKERS_JSON):
                # Explicit encoding: do not depend on the host's locale.
                with open(COMPANY_TICKERS_JSON, 'r', encoding='utf-8') as f:
                    raw = json.load(f)
                for company in raw.values():
                    cik = company.get('cik_str')
                    if cik:
                        tickers[int(cik)] = {
                            "ticker": company.get('ticker', ''),
                            "title": company.get('title', ''),
                        }
            cls._company_tickers = tickers
        return cls._company_tickers

    # ------------------------------------------------------------------
    # Public accessors
    # ------------------------------------------------------------------
    @classmethod
    def get_company_info(cls, cik: int) -> Dict[str, str]:
        """Return ticker/title for ``cik``, or "Unknown" placeholders."""
        tickers = cls.load_company_tickers()
        return tickers.get(cik, {"ticker": "Unknown", "title": "Unknown"})

    @classmethod
    def get_available_companies(cls) -> List[tuple]:
        """Get list of companies with available data.

        Returns ``(label, cik)`` tuples sorted by label for every CIK in the
        M-score table. Falls back to a single demo entry (value ``0``) when
        the table is empty, has no usable CIK column, or yields no choices.
        """
        demo_choice = [("DEMO - Sample Company", 0)]
        tickers = cls.load_company_tickers()
        mscore_df = cls._load_mscore_csv()
        if mscore_df.empty or 'cik_normalized' not in mscore_df.columns:
            return demo_choice

        available_ciks = set(mscore_df['cik_normalized'].dropna().astype(int).tolist())
        choices = []
        for cik in available_ciks:
            info = tickers.get(cik, {})
            # NOTE(review): CIKs absent from the tickers file get the 'UNK'
            # default and are still listed (as "UNK - Unknown").
            ticker = info.get('ticker', 'UNK')
            name = info.get('title', 'Unknown')
            if ticker and ticker != 'Unknown':
                choices.append((f"{ticker} - {name[:40]}", cik))
        return sorted(choices, key=lambda x: x[0]) if choices else demo_choice

    @classmethod
    def get_precomputed_mscore(cls, cik: int) -> Optional[float]:
        """Return the stored ``m_score`` of the first row for ``cik``.

        Returns ``None`` when there is no row, or when the value is missing
        or not numeric.
        """
        rows = cls._rows_for_cik(cls._load_mscore_csv(), cik)
        if rows.empty:
            return None
        m_score = rows.iloc[0].get('m_score')
        if pd.isna(m_score):
            return None
        try:
            return float(m_score)
        except (TypeError, ValueError):
            return None

    @classmethod
    def get_zscore_inputs(cls, cik: int) -> Optional[Dict[str, float]]:
        """Build the five Altman ratios (``x1``..``x5``) for ``cik``.

        Uses the last row stored for the company (presumably the most recent
        period; confirm against the CSV ordering). Missing cells count as 0.
        Returns ``None`` when there is no row, when total assets (``at``) is
        zero or missing, or when a cell cannot be converted to a number.
        """
        rows = cls._rows_for_cik(cls._load_zscore_csv(), cik)
        if rows.empty:
            return None
        latest = rows.iloc[-1]
        try:
            at = cls._as_float(latest.get('at'))
            if at == 0:
                # Every ratio divides by total assets.
                return None
            act = cls._as_float(latest.get('act'))
            lct = cls._as_float(latest.get('lct'))
            re_val = cls._as_float(latest.get('re'))
            ebit = cls._as_float(latest.get('ebit'))
            revt = cls._as_float(latest.get('revt'))
            csho = cls._as_float(latest.get('csho'))
            prcc_f = cls._as_float(latest.get('prcc_f'))

            mve = csho * prcc_f
            # Total liabilities are not among the columns read here: current
            # liabilities stand in for them, falling back to half of total
            # assets when they are not positive.
            tl = lct if lct > 0 else at * 0.5
            return {
                "x1": (act - lct) / at,
                "x2": re_val / at,
                "x3": ebit / at,
                "x4": mve / tl if tl > 0 else 0,
                "x5": revt / at,
            }
        except (ValueError, TypeError):
            return None
# ============================================================================
# AGENTS
# ============================================================================
class FinancialAgent:
    """Computes the Beneish M-Score and the Altman Z-Score from ratio dicts."""

    def __init__(self):
        self.name = "Financial Analysis Agent"

    def calculate_beneish_m_score(self, data: dict) -> dict:
        """Compute the 8-variable Beneish M-Score.

        Args:
            data: Mapping with the index values ``dsri``, ``gmi``, ``aqi``,
                ``sgi``, ``depi``, ``sgai``, ``tata`` and ``lvgi``. Missing
                keys are treated as 0.

        Returns:
            ``{"m_score", "risk_flag", "details"}``. ``risk_flag`` is True
            when the score exceeds ``MSCORE_MANIPULATION_THRESHOLD``. When
            the score is not a finite number (e.g. a NaN input) the result is
            ``{"m_score": None, "risk_flag": False, "details": "Data not
            available"}``. Any failure during the computation is reported as
            ``{"error": <message>}`` instead of being raised.
        """
        import math  # local import: math is not imported at module level

        try:
            # Coefficients as published by Beneish (1999).
            m_score = (
                -4.84
                + 0.920 * data.get('dsri', 0)
                + 0.528 * data.get('gmi', 0)
                + 0.404 * data.get('aqi', 0)
                + 0.892 * data.get('sgi', 0)
                + 0.115 * data.get('depi', 0)
                - 0.172 * data.get('sgai', 0)
                + 4.679 * data.get('tata', 0)
                - 0.327 * data.get('lvgi', 0)
            )
            if not math.isfinite(m_score):
                # NaN compares False against the threshold and would be
                # reported as "Low probability of manipulation".
                return {"m_score": None, "risk_flag": False, "details": "Data not available"}
            risk_flag = m_score > MSCORE_MANIPULATION_THRESHOLD
            return {
                "m_score": round(m_score, 4),
                "risk_flag": risk_flag,
                "details": "High probability of manipulation" if risk_flag else "Low probability of manipulation"
            }
        except Exception as e:  # best-effort boundary: callers expect a dict
            return {"error": str(e)}

    def calculate_altman_z_score(self, data: dict) -> dict:
        """Compute the Altman Z-Score and classify it into a zone.

        Args:
            data: Mapping with the ratios ``x1``..``x5``. Missing keys are
                treated as 0.

        Returns:
            ``{"z_score", "status"}`` where status is "Safe Zone" (above
            ``ZSCORE_SAFE_THRESHOLD``), "Grey Zone" (above
            ``ZSCORE_GREY_THRESHOLD``) or "Distress Zone". A non-finite score
            yields ``{"z_score": None, "status": "Unknown"}``. Any failure
            during the computation is reported as ``{"error": <message>}``.
        """
        import math  # local import: math is not imported at module level

        try:
            z_score = (
                1.2 * data.get('x1', 0)
                + 1.4 * data.get('x2', 0)
                + 3.3 * data.get('x3', 0)
                + 0.6 * data.get('x4', 0)
                + 1.0 * data.get('x5', 0)
            )
            if not math.isfinite(z_score):
                # NaN fails both comparisons below and would be classified
                # as "Distress Zone".
                return {"z_score": None, "status": "Unknown"}
            if z_score > ZSCORE_SAFE_THRESHOLD:
                status = "Safe Zone"
            elif z_score > ZSCORE_GREY_THRESHOLD:
                status = "Grey Zone"
            else:
                status = "Distress Zone"
            return {"z_score": round(z_score, 4), "status": status}
        except Exception as e:  # best-effort boundary: callers expect a dict
            return {"error": str(e)}
class RiskAgent:
    """Combines financial and text signals into a 0-100 fraud-risk score."""

    def __init__(self):
        self.name = "Risk Assessment Agent"

    def calculate_final_risk(self, financial_results: dict, text_results: dict) -> dict:
        """Score the overall fraud risk.

        Points: Beneish flag +40; Altman "Distress Zone" +30 or "Grey Zone"
        +15; obfuscation score above 0.7 +30 or above 0.4 +10. Levels:
        above 70 CRITICAL, above 40 HIGH, above 20 MODERATE, otherwise LOW.

        Args:
            financial_results: May contain ``beneish_m`` (dict with
                ``risk_flag``), ``altman_z`` (dict with ``status``) and a
                top-level ``risk_flag``.
            text_results: May contain ``obfuscation_score``; a missing or
                ``None`` value counts as 0.

        Returns:
            ``{"total_risk_score", "risk_level", "key_factors"}``.
        """
        risk_score = 0
        reasons = []

        # Prefer the M-Score's own flag. The top-level "risk_flag" may be a
        # combined flag that is also set by Altman distress, which would be
        # mislabelled as manipulation here and then counted again below. It
        # is used only when no "beneish_m" result was supplied.
        beneish = financial_results.get("beneish_m")
        if isinstance(beneish, dict):
            manipulation_flag = bool(beneish.get("risk_flag"))
        else:
            manipulation_flag = bool(financial_results.get("risk_flag"))
        if manipulation_flag:
            risk_score += 40
            reasons.append("Beneish M-Score indicates manipulation risk")

        z_status = (financial_results.get("altman_z") or {}).get("status")
        if z_status == "Distress Zone":
            risk_score += 30
            reasons.append("Altman Z-Score indicates financial distress")
        elif z_status == "Grey Zone":
            risk_score += 15
            reasons.append("Altman Z-Score in Grey Zone")

        obfuscation = text_results.get("obfuscation_score") or 0
        if obfuscation > 0.7:
            risk_score += 30
            reasons.append(f"High managerial obfuscation (Score: {obfuscation:.2f})")
        elif obfuscation > 0.4:
            risk_score += 10
            # Every contribution to the score is listed as a key factor.
            reasons.append(f"Moderate managerial obfuscation (Score: {obfuscation:.2f})")

        if risk_score > 70:
            risk_level = "CRITICAL"
        elif risk_score > 40:
            risk_level = "HIGH"
        elif risk_score > 20:
            risk_level = "MODERATE"
        else:
            risk_level = "LOW"

        return {
            "total_risk_score": risk_score,
            "risk_level": risk_level,
            "key_factors": reasons
        }
# ============================================================================
# ANALYSIS FUNCTION
# ============================================================================
# Module-level agent instances used by analyze_company().
fin_agent, risk_agent = FinancialAgent(), RiskAgent()
def analyze_company(cik_selection):
    """Run fraud detection analysis on selected company.

    Args:
        cik_selection: Value of the company dropdown: a CIK, ``0`` for the
            demo entry, or ``None``/``""`` when nothing is selected.

    Returns:
        Four Markdown strings: company header, financial analysis, text
        analysis and risk assessment. When nothing (or nothing parsable) is
        selected, the first string is a prompt and the others are empty.
    """
    no_selection = ("Please select a company", "", "", "")

    # The demo entry's value is 0, which is falsy, so a plain truthiness
    # test would make demo mode unreachable.
    if cik_selection is None or cik_selection == "":
        return no_selection
    try:
        cik = int(cik_selection)
    except (TypeError, ValueError):
        return no_selection

    # Demo mode
    if cik == 0:
        return demo_analysis()

    company_info = DataLoader.get_company_info(cik)

    # Financial Analysis
    m_score_val = DataLoader.get_precomputed_mscore(cik)
    if m_score_val is not None:
        flagged = m_score_val > MSCORE_MANIPULATION_THRESHOLD
        m_score_result = {
            "m_score": round(m_score_val, 4),
            "risk_flag": flagged,
            "details": "High probability of manipulation" if flagged else "Low probability of manipulation"
        }
    else:
        m_score_result = {"m_score": None, "risk_flag": False, "details": "Data not available"}

    zscore_inputs = DataLoader.get_zscore_inputs(cik)
    if zscore_inputs:
        z_score_result = fin_agent.calculate_altman_z_score(zscore_inputs)
    else:
        z_score_result = {"z_score": None, "status": "Unknown"}

    financial_results = {
        "beneish_m": m_score_result,
        "altman_z": z_score_result,
        "risk_flag": m_score_result.get("risk_flag", False) or (z_score_result.get("status") == "Distress Zone")
    }

    # Simplified text results (no 10-K file analysis in HF version)
    text_results = {
        "obfuscation_score": 0.3,  # Placeholder
        "note": "Text analysis requires 10-K filing upload"
    }

    # Risk Assessment
    final_report = risk_agent.calculate_final_risk(financial_results, text_results)

    # Format outputs
    company_header = (
        f"## {company_info['ticker']} - {company_info['title']}\n"
        f"**CIK:** {cik}\n"
    )

    # The score keys exist with value None when data is unavailable, so a
    # .get() default would never apply; map None to 'N/A' explicitly.
    m_val = m_score_result.get('m_score')
    if m_val is None:
        m_val = 'N/A'
    m_flag = "HIGH RISK" if m_score_result.get('risk_flag') else "Normal"
    z_val = z_score_result.get('z_score')
    if z_val is None:
        z_val = 'N/A'
    z_status = z_score_result.get('status', 'Unknown')

    financial_output = (
        "### Beneish M-Score\n"
        f"**Score:** {m_val}\n"
        f"**Status:** {m_flag}\n"
        f"**Interpretation:** {m_score_result.get('details', 'N/A')}\n"
        "> M-Score > -1.78 indicates high probability of earnings manipulation\n"
        "### Altman Z-Score\n"
        f"**Score:** {z_val}\n"
        f"**Status:** {z_status}\n"
        "> Safe Zone (>2.99) | Grey Zone (1.81-2.99) | Distress Zone (<1.81)\n"
    )

    text_output = (
        "### MD&A Analysis\n"
        "**Status:** Not available in demo\n"
        "> Upload 10-K filings for full text analysis\n"
    )

    risk_level = final_report['risk_level']
    risk_score = final_report['total_risk_score']
    risk_output = (
        "## FRAUD RISK ASSESSMENT\n"
        f"### Risk Level: {risk_level}\n"
        f"### Total Score: {risk_score}/100\n"
        "**Key Risk Factors:**\n"
    )
    factors = final_report.get('key_factors', [])
    if factors:
        risk_output += "\n".join(f"- {factor}" for factor in factors)
    else:
        risk_output += "- No significant risk factors identified"

    return company_header, financial_output, text_output, risk_output
def demo_analysis():
    """Produce the canned four-part report shown for the demo entry.

    Returns the same tuple shape as analyze_company(): company header,
    financial analysis, text analysis and risk assessment, all as fixed
    Markdown strings.
    """
    def _markdown(*lines):
        # Each line, including the last, is newline-terminated.
        return "".join(f"{line}\n" for line in lines)

    header = _markdown(
        "## DEMO - Sample Analysis",
        "**Note:** This is a demonstration with sample data.",
        "Upload your data files to analyze real companies.",
    )
    financials = _markdown(
        "### Beneish M-Score (Demo)",
        "**Score:** -2.45",
        "**Status:** Normal",
        "**Interpretation:** Low probability of manipulation",
        "### Altman Z-Score (Demo)",
        "**Score:** 3.25",
        "**Status:** Safe Zone",
    )
    narrative = _markdown(
        "### MD&A Analysis (Demo)",
        "**Obfuscation Score:** 0.35",
        "**Flagged Phrases:** 3 found",
        '- "challenging market conditions"',
        '- "strategic realignment"',
        '- "factors beyond our control"',
    )
    verdict = _markdown(
        "## FRAUD RISK ASSESSMENT (Demo)",
        "### Risk Level: LOW",
        "### Total Score: 15/100",
        "**Key Risk Factors:**",
        "- No significant risk factors in this demo",
    )
    return header, financials, narrative, verdict
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
# Build the Gradio UI at import time; `demo` is the app object launched below.
# NOTE(review): the source reached review with its leading whitespace
# stripped, so the `with` nesting here is reconstructed (dropdown and button
# share the first row; the two analysis columns share the second) - confirm
# against the deployed layout.
with gr.Blocks(title="Fraud Detection Engine") as demo:
    # Intro banner listing the three analyses.
    gr.Markdown("""
# Fraud Detection Engine
Analyzes SEC 10-K filings to identify risks of financial statement manipulation using:
- **Beneish M-Score** - Earnings manipulation detection
- **Altman Z-Score** - Financial distress assessment
- **MD&A Text Analysis** - Managerial obfuscation detection
""")
    with gr.Row():
        # Choices are computed once, when the module is imported; each choice
        # is a (label, cik) tuple and the handler receives the cik value.
        company_dropdown = gr.Dropdown(
            choices=DataLoader.get_available_companies(),
            label="Select Company",
            info="Companies with available financial data"
        )
        analyze_btn = gr.Button("Analyze", variant="primary")
    # Output areas, filled in the order analyze_company() returns them.
    company_info = gr.Markdown()
    with gr.Row():
        with gr.Column():
            financial_output = gr.Markdown(label="Financial Analysis")
        with gr.Column():
            text_output = gr.Markdown(label="Text Analysis")
    risk_output = gr.Markdown()
    # The four return values of analyze_company map onto `outputs` by position.
    analyze_btn.click(
        fn=analyze_company,
        inputs=[company_dropdown],
        outputs=[company_info, financial_output, text_output, risk_output]
    )
    # Footer: methodology and the data files the app looks for.
    gr.Markdown("""
---
**Methodology:** Based on Beneish (1999) M-Score and Altman (1968) Z-Score models.
**Data Requirements:** Place CSV files in `data/` directory:
- `Financial Data.csv`
- `Beneish M-score - Sheet1.csv`
- `Z-score data.csv`
- `company_tickers.json`
""")
if __name__ == "__main__":
    # Listen on every interface, port 7860, when run as a script.
    launch_kwargs = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_kwargs)