"""
Fraud Detection Engine - Hugging Face Spaces Version

This version is configured for deployment on Hugging Face Spaces.
Data files should be placed in the 'data/' directory within the Space
(override the location with the DATA_PATH environment variable).
"""

import json
import os
import re
from datetime import datetime
from typing import Optional, Dict, List, Any

import pandas as pd

try:
    import gradio as gr
except ImportError:
    # The scoring logic below has no UI dependency; keeping the import
    # optional lets it be imported and unit-tested without gradio installed.
    # Launching the app without gradio fails with a clear message instead.
    gr = None

# ============================================================================
# CONFIGURATION
# ============================================================================

# For HF Spaces, use relative paths
BASE_DATA_PATH = os.environ.get("DATA_PATH", "data")

FINANCIAL_DATA_CSV = os.path.join(BASE_DATA_PATH, "Financial Data.csv")
BENEISH_MSCORE_CSV = os.path.join(BASE_DATA_PATH, "Beneish M-score - Sheet1.csv")
ZSCORE_DATA_CSV = os.path.join(BASE_DATA_PATH, "Z-score data.csv")
COMPANY_TICKERS_JSON = os.path.join(BASE_DATA_PATH, "company_tickers.json")
FILINGS_DIR = os.path.join(BASE_DATA_PATH, "filings")

# M-Score above this value flags likely earnings manipulation.
MSCORE_MANIPULATION_THRESHOLD = -1.78
# Z-Score above SAFE is "Safe Zone"; between GREY and SAFE is "Grey Zone";
# at or below GREY is "Distress Zone".
ZSCORE_SAFE_THRESHOLD = 2.99
ZSCORE_GREY_THRESHOLD = 1.81


def _as_float(value: Any) -> float:
    """Convert a CSV cell to float, treating None/NaN/empty as 0.0.

    pandas represents missing cells as NaN, which is truthy, so the common
    ``float(value or 0)`` idiom silently lets NaN through. Unparseable text
    still raises ValueError/TypeError so the caller can decide what to do.
    """
    if value is None or pd.isna(value):
        return 0.0
    return float(value or 0)


# ============================================================================
# SIMPLIFIED DATA LOADER
# ============================================================================

class DataLoader:
    """Simplified data loader for HF Spaces deployment.

    Every source is read at most once and cached on the class. A missing
    file yields an empty DataFrame / dict rather than an exception, so the
    app can fall back to demo mode.
    """

    _financial_df: Optional[pd.DataFrame] = None
    _mscore_df: Optional[pd.DataFrame] = None
    _zscore_df: Optional[pd.DataFrame] = None
    _company_tickers: Optional[Dict] = None

    @staticmethod
    def _read_csv_with_cik(path: str, cik_column: str) -> pd.DataFrame:
        """Read ``path`` and add a nullable-integer ``cik_normalized`` column.

        Returns an empty DataFrame when the file does not exist. When
        ``cik_column`` is absent the normalized column is all-NA, so lookups
        simply find nothing instead of raising KeyError.
        """
        if not os.path.exists(path):
            return pd.DataFrame()
        df = pd.read_csv(path)
        if cik_column in df.columns:
            source = df[cik_column]
        else:
            source = pd.Series(float("nan"), index=df.index)
        df['cik_normalized'] = pd.to_numeric(source, errors='coerce').astype('Int64')
        return df

    @classmethod
    def _load_financial_csv(cls) -> pd.DataFrame:
        """Return the cached financial data table (CIK column: ``cik``)."""
        if cls._financial_df is None:
            cls._financial_df = cls._read_csv_with_cik(FINANCIAL_DATA_CSV, 'cik')
        return cls._financial_df

    @classmethod
    def _load_mscore_csv(cls) -> pd.DataFrame:
        """Return the cached M-Score table (CIK column: ``CIK Numbers``)."""
        if cls._mscore_df is None:
            cls._mscore_df = cls._read_csv_with_cik(BENEISH_MSCORE_CSV, 'CIK Numbers')
        return cls._mscore_df

    @classmethod
    def _load_zscore_csv(cls) -> pd.DataFrame:
        """Return the cached Z-Score input table (CIK column: ``cik``)."""
        if cls._zscore_df is None:
            cls._zscore_df = cls._read_csv_with_cik(ZSCORE_DATA_CSV, 'cik')
        return cls._zscore_df

    @classmethod
    def load_company_tickers(cls) -> Dict[int, Dict[str, str]]:
        """Return a ``{cik: {"ticker", "title"}}`` mapping from the tickers JSON.

        Entries without a usable ``cik_str`` are skipped. The cache is only
        assigned once parsing has finished, so a failure part-way through
        cannot leave a half-filled mapping behind.
        """
        if cls._company_tickers is None:
            tickers: Dict[int, Dict[str, str]] = {}
            if os.path.exists(COMPANY_TICKERS_JSON):
                with open(COMPANY_TICKERS_JSON, 'r', encoding='utf-8') as f:
                    raw = json.load(f)
                for company in raw.values():
                    cik = company.get('cik_str')
                    if not cik:
                        continue
                    try:
                        key = int(cik)
                    except (TypeError, ValueError):
                        continue
                    tickers[key] = {
                        "ticker": company.get('ticker', ''),
                        "title": company.get('title', ''),
                    }
            cls._company_tickers = tickers
        return cls._company_tickers

    @classmethod
    def get_company_info(cls, cik: int) -> Dict[str, str]:
        """Return ticker/title for ``cik``, or "Unknown" placeholders."""
        tickers = cls.load_company_tickers()
        return tickers.get(cik, {"ticker": "Unknown", "title": "Unknown"})

    @classmethod
    def get_available_companies(cls) -> List[tuple]:
        """Get list of companies with available data.

        Returns ``(label, cik)`` tuples sorted by label, built from the CIKs
        present in the M-Score table. Falls back to a single demo entry
        (value 0) when there is nothing to offer.
        """
        demo_choice = [("DEMO - Sample Company", 0)]
        tickers = cls.load_company_tickers()
        mscore_df = cls._load_mscore_csv()

        if mscore_df.empty:
            # Return sample if no data
            return demo_choice

        available_ciks = set(mscore_df['cik_normalized'].dropna().astype(int).tolist())

        choices = []
        for cik in available_ciks:
            info = tickers.get(cik)
            if info is None:
                # Not in the ticker file: include the CIK so that several
                # unlisted companies remain distinguishable in the dropdown.
                choices.append((f"UNK - Unknown (CIK {cik})", cik))
                continue
            ticker = info.get('ticker', 'UNK')
            name = info.get('title', 'Unknown')
            if ticker and ticker != 'Unknown':
                choices.append((f"{ticker} - {name[:40]}", cik))

        return sorted(choices, key=lambda x: x[0]) if choices else demo_choice

    @classmethod
    def get_precomputed_mscore(cls, cik: int) -> Optional[float]:
        """Return the ``m_score`` of the first row matching ``cik``.

        Returns None when the table is empty, the CIK is absent, or the
        value is missing or not numeric.
        """
        df = cls._load_mscore_csv()
        if df.empty:
            return None

        row = df[df['cik_normalized'] == cik]
        if row.empty:
            return None

        m_score = row.iloc[0].get('m_score')
        if m_score is None or pd.isna(m_score):
            return None
        try:
            return float(m_score)
        except (TypeError, ValueError):
            return None

    @classmethod
    def get_zscore_inputs(cls, cik: int) -> Optional[Dict[str, float]]:
        """Return the Altman ratios ``x1``..``x5`` for ``cik``.

        Uses the LAST matching row (presumably the most recent period -
        TODO confirm the CSV is ordered chronologically). Missing cells are
        treated as 0. Returns None when the CIK is absent, total assets
        (``at``) is missing or zero, or a cell cannot be parsed as a number.
        """
        df = cls._load_zscore_csv()
        if df.empty:
            return None

        rows = df[df['cik_normalized'] == cik]
        if rows.empty:
            return None

        row = rows.iloc[-1]

        try:
            at = _as_float(row.get('at'))
            if at == 0:
                # Every ratio divides by total assets.
                return None

            act = _as_float(row.get('act'))
            lct = _as_float(row.get('lct'))
            re_val = _as_float(row.get('re'))
            ebit = _as_float(row.get('ebit'))
            revt = _as_float(row.get('revt'))
            csho = _as_float(row.get('csho'))
            prcc_f = _as_float(row.get('prcc_f'))
        except (ValueError, TypeError):
            return None

        mve = csho * prcc_f
        # Total liabilities are not in the file: approximate with current
        # liabilities, or half of total assets when those are unavailable.
        tl = lct if lct > 0 else at * 0.5

        return {
            "x1": (act - lct) / at,
            "x2": re_val / at,
            "x3": ebit / at,
            "x4": mve / tl if tl > 0 else 0,
            "x5": revt / at,
        }


# ============================================================================
# AGENTS
# ============================================================================

class FinancialAgent:
    """Computes the Beneish M-Score and the Altman Z-Score."""

    def __init__(self):
        self.name = "Financial Analysis Agent"

    def calculate_beneish_m_score(self, data: dict) -> dict:
        """Compute the eight-variable Beneish (1999) M-Score.

        ``data`` maps the index names ``dsri``, ``gmi``, ``aqi``, ``sgi``,
        ``depi``, ``sgai``, ``tata`` and ``lvgi`` to numbers; a missing key
        counts as 0. Returns ``{"m_score", "risk_flag", "details"}``, or
        ``{"error": message}`` if the inputs are not numeric.
        """
        try:
            # Coefficients as published in Beneish (1999).
            m_score = (
                -4.84
                + 0.920 * data.get('dsri', 0)
                + 0.528 * data.get('gmi', 0)
                + 0.404 * data.get('aqi', 0)
                + 0.892 * data.get('sgi', 0)
                + 0.115 * data.get('depi', 0)
                - 0.172 * data.get('sgai', 0)
                + 4.679 * data.get('tata', 0)
                - 0.327 * data.get('lvgi', 0)
            )

            risk_flag = m_score > MSCORE_MANIPULATION_THRESHOLD

            return {
                "m_score": round(m_score, 4),
                "risk_flag": risk_flag,
                "details": "High probability of manipulation" if risk_flag else "Low probability of manipulation"
            }
        except Exception as e:
            return {"error": str(e)}

    def calculate_altman_z_score(self, data: dict) -> dict:
        """Compute the Altman Z-Score from the ratios ``x1``..``x5``.

        A missing key counts as 0. Returns ``{"z_score", "status"}`` where
        status is "Safe Zone", "Grey Zone" or "Distress Zone"; a NaN result
        is reported as ``{"z_score": None, "status": "Unknown"}``. Returns
        ``{"error": message}`` if the inputs are not numeric.
        """
        try:
            z_score = (
                1.2 * data.get('x1', 0)
                + 1.4 * data.get('x2', 0)
                + 3.3 * data.get('x3', 0)
                + 0.6 * data.get('x4', 0)
                + 1.0 * data.get('x5', 0)
            )

            if z_score != z_score:
                # NaN fails every comparison and would otherwise be
                # misclassified as "Distress Zone".
                return {"z_score": None, "status": "Unknown"}

            if z_score > ZSCORE_SAFE_THRESHOLD:
                status = "Safe Zone"
            elif z_score > ZSCORE_GREY_THRESHOLD:
                status = "Grey Zone"
            else:
                status = "Distress Zone"

            return {"z_score": round(z_score, 4), "status": status}
        except Exception as e:
            return {"error": str(e)}


class RiskAgent:
    """Combines financial and text signals into a 0-100 risk score."""

    def __init__(self):
        self.name = "Risk Assessment Agent"

    def calculate_final_risk(self, financial_results: dict, text_results: dict) -> dict:
        """Score the overall fraud risk.

        Points: M-Score flag 40; Z-Score distress 30 or grey zone 15;
        obfuscation score above 0.7 adds 30, above 0.4 adds 10.
        Levels: >70 CRITICAL, >40 HIGH, >20 MODERATE, otherwise LOW.

        The M-Score flag is read from ``financial_results["beneish_m"]``
        when present, falling back to the top-level ``risk_flag``. The
        top-level flag may already include Z-Score distress, which is
        scored separately below and must not be counted twice.

        Returns ``{"total_risk_score", "risk_level", "key_factors"}``.
        """
        risk_score = 0
        reasons = []

        beneish = financial_results.get("beneish_m")
        if isinstance(beneish, dict) and "risk_flag" in beneish:
            manipulation_flag = bool(beneish["risk_flag"])
        else:
            manipulation_flag = bool(financial_results.get("risk_flag"))

        if manipulation_flag:
            risk_score += 40
            reasons.append("Beneish M-Score indicates manipulation risk")

        z_status = (financial_results.get("altman_z") or {}).get("status")
        if z_status == "Distress Zone":
            risk_score += 30
            reasons.append("Altman Z-Score indicates financial distress")
        elif z_status == "Grey Zone":
            risk_score += 15
            reasons.append("Altman Z-Score in Grey Zone")

        obfuscation = text_results.get("obfuscation_score", 0) or 0
        if obfuscation > 0.7:
            risk_score += 30
            reasons.append(f"High managerial obfuscation (Score: {obfuscation:.2f})")
        elif obfuscation > 0.4:
            risk_score += 10
            # Every contribution to the score gets a matching factor.
            reasons.append(f"Moderate managerial obfuscation (Score: {obfuscation:.2f})")

        if risk_score > 70:
            risk_level = "CRITICAL"
        elif risk_score > 40:
            risk_level = "HIGH"
        elif risk_score > 20:
            risk_level = "MODERATE"
        else:
            risk_level = "LOW"

        return {
            "total_risk_score": risk_score,
            "risk_level": risk_level,
            "key_factors": reasons
        }


# ============================================================================
# ANALYSIS FUNCTION
# ============================================================================

fin_agent = FinancialAgent()
risk_agent = RiskAgent()


def analyze_company(cik_selection):
    """Run fraud detection analysis on selected company.

    ``cik_selection`` is the dropdown value: a CIK, or 0 for the demo entry.
    Returns four Markdown strings: company header, financial analysis,
    text analysis and risk assessment. An empty or non-numeric selection
    returns a prompt in the first slot and empty strings in the others.
    """
    no_selection = ("Please select a company", "", "", "")

    # Test explicitly for "nothing selected": 0 is the demo entry's value
    # and must not be rejected by a truthiness check.
    if cik_selection is None or cik_selection == "":
        return no_selection
    try:
        cik = int(cik_selection)
    except (TypeError, ValueError):
        return no_selection

    # Demo mode
    if cik == 0:
        return demo_analysis()

    company_info = DataLoader.get_company_info(cik)

    # Financial Analysis
    m_score_val = DataLoader.get_precomputed_mscore(cik)

    if m_score_val is not None:
        m_flagged = m_score_val > MSCORE_MANIPULATION_THRESHOLD
        m_score_result = {
            "m_score": round(m_score_val, 4),
            "risk_flag": m_flagged,
            "details": "High probability of manipulation" if m_flagged else "Low probability of manipulation"
        }
    else:
        m_score_result = {"m_score": None, "risk_flag": False, "details": "Data not available"}

    zscore_inputs = DataLoader.get_zscore_inputs(cik)
    if zscore_inputs:
        z_score_result = fin_agent.calculate_altman_z_score(zscore_inputs)
    else:
        z_score_result = {"z_score": None, "status": "Unknown"}

    financial_results = {
        "beneish_m": m_score_result,
        "altman_z": z_score_result,
        "risk_flag": m_score_result.get("risk_flag", False) or (z_score_result.get("status") == "Distress Zone")
    }

    # Simplified text results (no 10-K file analysis in HF version)
    text_results = {
        "obfuscation_score": 0.3,  # Placeholder
        "note": "Text analysis requires 10-K filing upload"
    }

    # Risk Assessment
    final_report = risk_agent.calculate_final_risk(financial_results, text_results)

    # Format outputs
    company_header = "\n".join([
        f"## {company_info['ticker']} - {company_info['title']}",
        "",
        f"**CIK:** {cik}",
        "",
    ])

    # The keys can be present with a None value, so a .get() default is not
    # enough to show "N/A".
    m_val = m_score_result.get('m_score')
    m_val = 'N/A' if m_val is None else m_val
    m_flag = "HIGH RISK" if m_score_result.get('risk_flag') else "Normal"
    z_val = z_score_result.get('z_score')
    z_val = 'N/A' if z_val is None else z_val
    z_status = z_score_result.get('status', 'Unknown')

    financial_output = "\n".join([
        "### Beneish M-Score",
        "",
        f"**Score:** {m_val}",
        "",
        f"**Status:** {m_flag}",
        "",
        f"**Interpretation:** {m_score_result.get('details', 'N/A')}",
        "",
        "> M-Score > -1.78 indicates high probability of earnings manipulation",
        "",
        "### Altman Z-Score",
        "",
        f"**Score:** {z_val}",
        "",
        f"**Status:** {z_status}",
        "",
        "> Safe Zone (>2.99) | Grey Zone (1.81-2.99) | Distress Zone (<1.81)",
        "",
    ])

    text_output = "\n".join([
        "### MD&A Analysis",
        "",
        "**Status:** Not available in demo",
        "",
        "> Upload 10-K filings for full text analysis",
        "",
    ])

    risk_level = final_report['risk_level']
    risk_score = final_report['total_risk_score']

    risk_output = "\n".join([
        "## FRAUD RISK ASSESSMENT",
        "",
        f"### Risk Level: {risk_level}",
        "",
        f"### Total Score: {risk_score}/100",
        "",
        "**Key Risk Factors:**",
        "",
        "",
    ])

    factors = final_report.get('key_factors', [])
    if factors:
        risk_output += "\n".join([f"- {f}" for f in factors])
    else:
        risk_output += "- No significant risk factors identified"

    return company_header, financial_output, text_output, risk_output


def demo_analysis():
    """Return demo analysis when no real data is available.

    The four strings are fixed sample Markdown in the same order as
    ``analyze_company``'s outputs.
    """
    company_header = "\n".join([
        "## DEMO - Sample Analysis",
        "",
        "**Note:** This is a demonstration with sample data. "
        "Upload your data files to analyze real companies.",
        "",
    ])

    financial_output = "\n".join([
        "### Beneish M-Score (Demo)",
        "",
        "**Score:** -2.45",
        "",
        "**Status:** Normal",
        "",
        "**Interpretation:** Low probability of manipulation",
        "",
        "### Altman Z-Score (Demo)",
        "",
        "**Score:** 3.25",
        "",
        "**Status:** Safe Zone",
        "",
    ])

    text_output = "\n".join([
        "### MD&A Analysis (Demo)",
        "",
        "**Obfuscation Score:** 0.35",
        "",
        "**Flagged Phrases:** 3 found",
        "",
        '- "challenging market conditions"',
        '- "strategic realignment"',
        '- "factors beyond our control"',
        "",
    ])

    risk_output = "\n".join([
        "## FRAUD RISK ASSESSMENT (Demo)",
        "",
        "### Risk Level: LOW",
        "",
        "### Total Score: 15/100",
        "",
        "**Key Risk Factors:**",
        "",
        "- No significant risk factors in this demo",
        "",
    ])

    return company_header, financial_output, text_output, risk_output


# ============================================================================
# GRADIO INTERFACE
# ============================================================================

if gr is not None:
    with gr.Blocks(title="Fraud Detection Engine") as demo:
        gr.Markdown("""
        # Fraud Detection Engine

        Analyzes SEC 10-K filings to identify risks of financial statement manipulation using:

        - **Beneish M-Score** - Earnings manipulation detection
        - **Altman Z-Score** - Financial distress assessment
        - **MD&A Text Analysis** - Managerial obfuscation detection
        """)

        with gr.Row():
            company_dropdown = gr.Dropdown(
                choices=DataLoader.get_available_companies(),
                label="Select Company",
                info="Companies with available financial data"
            )
            analyze_btn = gr.Button("Analyze", variant="primary")

        company_info = gr.Markdown()

        with gr.Row():
            with gr.Column():
                financial_output = gr.Markdown(label="Financial Analysis")
            with gr.Column():
                text_output = gr.Markdown(label="Text Analysis")

        risk_output = gr.Markdown()

        analyze_btn.click(
            fn=analyze_company,
            inputs=[company_dropdown],
            outputs=[company_info, financial_output, text_output, risk_output]
        )

        gr.Markdown("""
        ---

        **Methodology:** Based on Beneish (1999) M-Score and Altman (1968) Z-Score models.

        **Data Requirements:** Place CSV files in `data/` directory:

        - `Financial Data.csv`
        - `Beneish M-score - Sheet1.csv`
        - `Z-score data.csv`
        - `company_tickers.json`
        """)
else:
    # gradio is not installed: the analysis functions remain usable, but
    # there is no UI to serve.
    demo = None


if __name__ == "__main__":
    if demo is None:
        raise SystemExit("gradio is required to launch the UI (pip install gradio)")
    demo.launch(server_name="0.0.0.0", server_port=7860)