| | """ |
| | Fraud Detection Engine - Hugging Face Spaces Version |
| | |
| | This version is configured for deployment on Hugging Face Spaces. |
| | Data files should be placed in the 'data/' directory within the Space. |
| | """ |
| | import gradio as gr |
| | import json |
| | import os |
| | import re |
| | import pandas as pd |
| | from datetime import datetime |
| | from typing import Optional, Dict, List, Any |
| |
|
| | |
| | |
| | |
| |
|
| | |
# Root directory for every data file; taken from the DATA_PATH environment
# variable and falling back to the relative "data" directory.
BASE_DATA_PATH = os.environ.get("DATA_PATH", "data")
# Locations of the individual input files, all resolved under BASE_DATA_PATH.
FINANCIAL_DATA_CSV = os.path.join(BASE_DATA_PATH, "Financial Data.csv")
BENEISH_MSCORE_CSV = os.path.join(BASE_DATA_PATH, "Beneish M-score - Sheet1.csv")
ZSCORE_DATA_CSV = os.path.join(BASE_DATA_PATH, "Z-score data.csv")
COMPANY_TICKERS_JSON = os.path.join(BASE_DATA_PATH, "company_tickers.json")
FILINGS_DIR = os.path.join(BASE_DATA_PATH, "filings")

# A Beneish M-Score strictly greater than this value is flagged as a
# manipulation risk.
MSCORE_MANIPULATION_THRESHOLD = -1.78
# Altman Z-Score bands: above SAFE is "Safe Zone", above GREY (up to SAFE) is
# "Grey Zone", anything else is "Distress Zone".
ZSCORE_SAFE_THRESHOLD = 2.99
ZSCORE_GREY_THRESHOLD = 1.81
| |
|
| | |
| | |
| | |
| |
|
class DataLoader:
    """Lazy, class-level loader for the data files used by the Space.

    Each file is read at most once per process and cached on the class.
    A missing file yields an empty ``DataFrame`` (or an empty dict for the
    ticker map) instead of raising, so callers only have to test ``.empty``
    or truthiness.
    """

    # Caches; ``None`` means "not loaded yet".
    _financial_df: Optional[pd.DataFrame] = None
    _mscore_df: Optional[pd.DataFrame] = None
    _zscore_df: Optional[pd.DataFrame] = None
    _company_tickers: Optional[Dict] = None

    @staticmethod
    def _as_float(value: Any) -> float:
        """Convert a cell value to ``float``, mapping missing values to 0.0.

        ``None``, pandas missing markers (``NaN``/``pd.NA``) and other falsy
        values (``0``, ``""``) all become ``0.0``.  NaN has to be tested
        explicitly because it is truthy, so ``value or 0`` lets it through.

        Raises:
            ValueError, TypeError: if the value cannot be converted.
        """
        if value is None or pd.isna(value) or not value:
            return 0.0
        return float(value)

    @staticmethod
    def _read_csv_with_cik(path: str, cik_column: str) -> pd.DataFrame:
        """Read *path* and add a nullable-integer ``cik_normalized`` column.

        Args:
            path: CSV file to read.
            cik_column: Name of the column holding the raw CIK values.

        Returns:
            An empty ``DataFrame`` when the file does not exist.  Otherwise
            the file contents plus ``cik_normalized``; when *cik_column* is
            absent the new column is all-NA, so lookups simply find nothing
            instead of failing with ``KeyError``.
        """
        if not os.path.exists(path):
            return pd.DataFrame()
        df = pd.read_csv(path)
        if cik_column in df.columns:
            df['cik_normalized'] = pd.to_numeric(
                df[cik_column], errors='coerce'
            ).astype('Int64')
        else:
            df['cik_normalized'] = pd.Series(pd.NA, index=df.index, dtype='Int64')
        return df

    @classmethod
    def _load_financial_csv(cls) -> pd.DataFrame:
        """Return the cached financial data, reading the CSV on first use."""
        if cls._financial_df is None:
            cls._financial_df = cls._read_csv_with_cik(FINANCIAL_DATA_CSV, 'cik')
        return cls._financial_df

    @classmethod
    def _load_mscore_csv(cls) -> pd.DataFrame:
        """Return the cached Beneish M-Score table, reading it on first use."""
        if cls._mscore_df is None:
            cls._mscore_df = cls._read_csv_with_cik(BENEISH_MSCORE_CSV, 'CIK Numbers')
        return cls._mscore_df

    @classmethod
    def _load_zscore_csv(cls) -> pd.DataFrame:
        """Return the cached Z-Score input table, reading it on first use."""
        if cls._zscore_df is None:
            cls._zscore_df = cls._read_csv_with_cik(ZSCORE_DATA_CSV, 'cik')
        return cls._zscore_df

    @classmethod
    def load_company_tickers(cls) -> Dict[int, Dict[str, str]]:
        """Return a ``{cik: {"ticker": ..., "title": ...}}`` map (cached).

        Entries without a truthy ``cik_str`` are skipped.  A missing file
        yields an empty dict.
        """
        if cls._company_tickers is None:
            tickers: Dict[int, Dict[str, str]] = {}
            if os.path.exists(COMPANY_TICKERS_JSON):
                with open(COMPANY_TICKERS_JSON, 'r', encoding='utf-8') as f:
                    raw = json.load(f)
                # Only the values are needed; the JSON keys are ignored.
                for company in raw.values():
                    cik = company.get('cik_str')
                    if cik:
                        tickers[int(cik)] = {
                            "ticker": company.get('ticker', ''),
                            "title": company.get('title', ''),
                        }
            cls._company_tickers = tickers
        return cls._company_tickers

    @classmethod
    def get_company_info(cls, cik: int) -> Dict[str, str]:
        """Return ticker/title for *cik*, or "Unknown" placeholders."""
        tickers = cls.load_company_tickers()
        return tickers.get(cik, {"ticker": "Unknown", "title": "Unknown"})

    @classmethod
    def get_available_companies(cls) -> List[tuple]:
        """Get list of companies with available data.

        Returns:
            ``(label, cik)`` tuples sorted by label, one per CIK present in
            the M-Score table.  When no usable M-Score data exists, a single
            demo entry with CIK ``0`` is returned.
        """
        demo_choice = [("DEMO - Sample Company", 0)]
        tickers = cls.load_company_tickers()
        mscore_df = cls._load_mscore_csv()

        if mscore_df.empty:
            return demo_choice

        available_ciks = set(mscore_df['cik_normalized'].dropna().astype(int).tolist())

        choices = []
        # Iterate in CIK order so equal labels come out in a stable order.
        for cik in sorted(available_ciks):
            info = tickers.get(cik, {})
            # CIKs missing from the ticker map are still listed, labelled
            # "UNK - Unknown"; only empty or literal "Unknown" tickers are
            # dropped.
            ticker = info.get('ticker', 'UNK')
            name = info.get('title', 'Unknown')
            if ticker and ticker != 'Unknown':
                choices.append((f"{ticker} - {name[:40]}", cik))

        return sorted(choices, key=lambda x: x[0]) if choices else demo_choice

    @classmethod
    def get_precomputed_mscore(cls, cik: int) -> Optional[float]:
        """Return the stored M-Score of the first row matching *cik*.

        Returns ``None`` when there is no data, no matching row, or the
        stored value is missing or not numeric.
        """
        df = cls._load_mscore_csv()
        if df.empty:
            return None
        matches = df[df['cik_normalized'] == cik]
        if matches.empty:
            return None
        m_score = matches.iloc[0].get('m_score')
        if not pd.notna(m_score):
            return None
        try:
            return float(m_score)
        except (ValueError, TypeError):
            return None

    @classmethod
    def get_zscore_inputs(cls, cik: int) -> Optional[Dict[str, float]]:
        """Build the five Altman ratios (``x1``..``x5``) for *cik*.

        The LAST row matching *cik* is used.  Missing numeric cells count as
        0, except total assets (``at``): when that is missing or zero no
        ratio can be formed and ``None`` is returned.

        Returns:
            ``{"x1": ..., "x2": ..., "x3": ..., "x4": ..., "x5": ...}`` or
            ``None`` when data is unavailable or not numeric.
        """
        df = cls._load_zscore_csv()
        if df.empty:
            return None
        matches = df[df['cik_normalized'] == cik]
        if matches.empty:
            return None
        row = matches.iloc[-1]

        try:
            at = cls._as_float(row.get('at'))
            if at == 0:
                return None

            act = cls._as_float(row.get('act'))
            lct = cls._as_float(row.get('lct'))
            re_val = cls._as_float(row.get('re'))
            ebit = cls._as_float(row.get('ebit'))
            revt = cls._as_float(row.get('revt'))
            csho = cls._as_float(row.get('csho'))
            prcc_f = cls._as_float(row.get('prcc_f'))
        except (ValueError, TypeError):
            return None

        mve = csho * prcc_f
        # NOTE(review): x4's denominator is `lct`, replaced by half of `at`
        # when `lct` is not positive. This looks like a proxy for total
        # liabilities; confirm against the intended model.
        tl = lct if lct > 0 else at * 0.5

        return {
            "x1": (act - lct) / at,
            "x2": re_val / at,
            "x3": ebit / at,
            "x4": mve / tl if tl > 0 else 0,
            "x5": revt / at,
        }
| |
|
| |
|
| | |
| | |
| | |
| |
|
class FinancialAgent:
    """Computes the Beneish M-Score and the Altman Z-Score from ratio dicts."""

    def __init__(self):
        self.name = "Financial Analysis Agent"

    def calculate_beneish_m_score(self, data: dict) -> dict:
        """Evaluate the eight-variable M-Score formula on *data*.

        Ratios missing from *data* count as 0.  Returns ``m_score`` (rounded
        to 4 places), ``risk_flag`` (score above the manipulation threshold)
        and a ``details`` sentence, or ``{"error": message}`` on failure.
        """
        weighted_ratios = (
            ('dsri', 0.92),
            ('gmi', 0.52),
            ('aqi', 0.71),
            ('sgi', 0.20),
            ('depi', 0.11),
            ('sgai', -0.17),
            ('tata', 4.67),
            ('lvgi', -0.32),
        )
        try:
            # Accumulate left to right, starting from the intercept, so the
            # floating-point result matches the written-out formula.
            score = -4.84
            for ratio_name, weight in weighted_ratios:
                score = score + weight * data.get(ratio_name, 0)

            flagged = score > MSCORE_MANIPULATION_THRESHOLD
            if flagged:
                verdict = "High probability of manipulation"
            else:
                verdict = "Low probability of manipulation"
            return {
                "m_score": round(score, 4),
                "risk_flag": flagged,
                "details": verdict,
            }
        except Exception as exc:
            return {"error": str(exc)}

    def calculate_altman_z_score(self, data: dict) -> dict:
        """Evaluate the five-factor Z-Score on *data* and classify it.

        Factors missing from *data* count as 0.  Returns ``z_score`` (rounded
        to 4 places) and a ``status`` of "Safe Zone", "Grey Zone" or
        "Distress Zone", or ``{"error": message}`` on failure.
        """
        weighted_factors = (
            ('x1', 1.2),
            ('x2', 1.4),
            ('x3', 3.3),
            ('x4', 0.6),
            ('x5', 1.0),
        )
        try:
            terms = [weight * data.get(factor, 0) for factor, weight in weighted_factors]
            # Sum left to right without an extra starting value.
            total = terms[0]
            for term in terms[1:]:
                total = total + term

            if total > ZSCORE_SAFE_THRESHOLD:
                zone = "Safe Zone"
            elif total > ZSCORE_GREY_THRESHOLD:
                zone = "Grey Zone"
            else:
                zone = "Distress Zone"
            return {"z_score": round(total, 4), "status": zone}
        except Exception as exc:
            return {"error": str(exc)}
| |
|
| |
|
class RiskAgent:
    """Combines financial and text signals into one additive risk score."""

    def __init__(self):
        self.name = "Risk Assessment Agent"

    def calculate_final_risk(self, financial_results: dict, text_results: dict) -> dict:
        """Score the combined findings and map the total to a risk level.

        Points: 40 when ``financial_results["risk_flag"]`` is truthy; 30 for
        an Altman status of "Distress Zone" or 15 for "Grey Zone"; 30 when
        the obfuscation score exceeds 0.7 or 10 when it exceeds 0.4 (the
        latter adds no key factor).

        Returns:
            ``total_risk_score``, ``risk_level`` (CRITICAL above 70, HIGH
            above 40, MODERATE above 20, otherwise LOW) and ``key_factors``.
        """
        zone_penalties = (
            ("Distress Zone", 30, "Altman Z-Score indicates financial distress"),
            ("Grey Zone", 15, "Altman Z-Score in Grey Zone"),
        )
        level_floors = ((70, "CRITICAL"), (40, "HIGH"), (20, "MODERATE"))

        points = 0
        factors = []

        if financial_results.get("risk_flag"):
            points += 40
            factors.append("Beneish M-Score indicates manipulation risk")

        zone = financial_results.get("altman_z", {}).get("status")
        for zone_name, penalty, explanation in zone_penalties:
            if zone == zone_name:
                points += penalty
                factors.append(explanation)
                break

        obfuscation = text_results.get("obfuscation_score", 0)
        if obfuscation > 0.7:
            points += 30
            factors.append(f"High managerial obfuscation (Score: {obfuscation:.2f})")
        elif obfuscation > 0.4:
            points += 10

        # First band whose floor is strictly exceeded wins.
        level = next((label for floor, label in level_floors if points > floor), "LOW")

        return {
            "total_risk_score": points,
            "risk_level": level,
            "key_factors": factors,
        }
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Module-level agent instances, created once at import time and reused by
# analyze_company() for every request.
fin_agent = FinancialAgent()
risk_agent = RiskAgent()
| |
|
| |
|
def _format_score(value):
    """Return *value* for display, or "N/A" when the score is missing."""
    return "N/A" if value is None else value


def analyze_company(cik_selection):
    """Run fraud detection analysis on selected company.

    Args:
        cik_selection: The dropdown value, i.e. a CIK (``0`` is the demo
            entry), or ``None``/empty when nothing is selected.

    Returns:
        A 4-tuple of Markdown strings: company header, financial analysis,
        text analysis and overall risk assessment.  When nothing is selected
        or the selection is not a number, the first element carries a
        message and the others are empty.
    """
    # 0 is a valid selection (the demo entry), so a plain truthiness test
    # would wrongly treat it as "nothing selected".
    if not cik_selection and cik_selection != 0:
        return "Please select a company", "", "", ""

    try:
        cik = int(cik_selection)
    except (TypeError, ValueError):
        return "Invalid company selection", "", "", ""

    if cik == 0:
        return demo_analysis()

    company_info = DataLoader.get_company_info(cik)

    # Beneish M-Score: taken from the precomputed table rather than derived.
    m_score_val = DataLoader.get_precomputed_mscore(cik)

    if m_score_val is not None:
        m_flagged = m_score_val > MSCORE_MANIPULATION_THRESHOLD
        m_score_result = {
            "m_score": round(m_score_val, 4),
            "risk_flag": m_flagged,
            "details": "High probability of manipulation" if m_flagged else "Low probability of manipulation"
        }
    else:
        m_score_result = {"m_score": None, "risk_flag": False, "details": "Data not available"}

    # Altman Z-Score: computed from the stored balance-sheet inputs.
    zscore_inputs = DataLoader.get_zscore_inputs(cik)
    if zscore_inputs:
        z_score_result = fin_agent.calculate_altman_z_score(zscore_inputs)
    else:
        z_score_result = {"z_score": None, "status": "Unknown"}

    financial_results = {
        "beneish_m": m_score_result,
        "altman_z": z_score_result,
        # The risk agent reports this flag as "Beneish M-Score indicates
        # manipulation risk" and scores the Altman zone separately, so the
        # flag must reflect the M-Score only. Folding "Distress Zone" in
        # here would count distress twice and give a false reason.
        "risk_flag": bool(m_score_result.get("risk_flag", False)),
    }

    # Placeholder text signal: no filing is analysed in this version.
    text_results = {
        "obfuscation_score": 0.3,
        "note": "Text analysis requires 10-K filing upload"
    }

    final_report = risk_agent.calculate_final_risk(financial_results, text_results)

    company_header = f"""## {company_info['ticker']} - {company_info['title']}
**CIK:** {cik}
"""

    # The score keys exist even when the value is None, so a .get() default
    # alone would print "None"; map missing scores to "N/A" explicitly.
    m_val = _format_score(m_score_result.get('m_score'))
    m_flag = "HIGH RISK" if m_score_result.get('risk_flag') else "Normal"
    z_val = _format_score(z_score_result.get('z_score'))
    z_status = z_score_result.get('status', 'Unknown')

    financial_output = f"""### Beneish M-Score
**Score:** {m_val}
**Status:** {m_flag}
**Interpretation:** {m_score_result.get('details', 'N/A')}

> M-Score > -1.78 indicates high probability of earnings manipulation

### Altman Z-Score
**Score:** {z_val}
**Status:** {z_status}

> Safe Zone (>2.99) | Grey Zone (1.81-2.99) | Distress Zone (<1.81)
"""

    text_output = """### MD&A Analysis
**Status:** Not available in demo

> Upload 10-K filings for full text analysis
"""

    risk_level = final_report['risk_level']
    risk_score = final_report['total_risk_score']

    risk_output = f"""## FRAUD RISK ASSESSMENT

### Risk Level: {risk_level}
### Total Score: {risk_score}/100

**Key Risk Factors:**
"""
    factors = final_report.get('key_factors', [])
    if factors:
        risk_output += "\n".join(f"- {factor}" for factor in factors)
    else:
        risk_output += "- No significant risk factors identified"

    return company_header, financial_output, text_output, risk_output


def demo_analysis():
    """Return demo analysis when no real data is available.

    Returns:
        The same 4-tuple shape as ``analyze_company``, filled with fixed
        sample text.
    """
    company_header = """## DEMO - Sample Analysis
**Note:** This is a demonstration with sample data.
Upload your data files to analyze real companies.
"""

    financial_output = """### Beneish M-Score (Demo)
**Score:** -2.45
**Status:** Normal
**Interpretation:** Low probability of manipulation

### Altman Z-Score (Demo)
**Score:** 3.25
**Status:** Safe Zone
"""

    text_output = """### MD&A Analysis (Demo)
**Obfuscation Score:** 0.35
**Flagged Phrases:** 3 found
- "challenging market conditions"
- "strategic realignment"
- "factors beyond our control"
"""

    risk_output = """## FRAUD RISK ASSESSMENT (Demo)

### Risk Level: LOW
### Total Score: 15/100

**Key Risk Factors:**
- No significant risk factors in this demo
"""

    return company_header, financial_output, text_output, risk_output
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Build the Gradio UI at import time. `demo` is the Blocks app that the
# entry-point guard at the bottom of the file launches.
with gr.Blocks(title="Fraud Detection Engine") as demo:
    # Page header.
    gr.Markdown("""
# Fraud Detection Engine

Analyzes SEC 10-K filings to identify risks of financial statement manipulation using:
- **Beneish M-Score** - Earnings manipulation detection
- **Altman Z-Score** - Financial distress assessment
- **MD&A Text Analysis** - Managerial obfuscation detection
""")

    # Input row: company selector and trigger button.
    with gr.Row():
        company_dropdown = gr.Dropdown(
            # Evaluated once, while the UI is being built; the choices are
            # (label, cik) tuples from DataLoader.get_available_companies().
            choices=DataLoader.get_available_companies(),
            label="Select Company",
            info="Companies with available financial data"
        )
        analyze_btn = gr.Button("Analyze", variant="primary")

    # Output areas, filled in the same order as analyze_company's 4-tuple.
    company_info = gr.Markdown()

    with gr.Row():
        with gr.Column():
            financial_output = gr.Markdown(label="Financial Analysis")
        with gr.Column():
            text_output = gr.Markdown(label="Text Analysis")

    risk_output = gr.Markdown()

    # Clicking the button runs analyze_company on the selected value.
    analyze_btn.click(
        fn=analyze_company,
        inputs=[company_dropdown],
        outputs=[company_info, financial_output, text_output, risk_output]
    )

    # Page footer: methodology and expected data files.
    gr.Markdown("""
---
**Methodology:** Based on Beneish (1999) M-Score and Altman (1968) Z-Score models.

**Data Requirements:** Place CSV files in `data/` directory:
- `Financial Data.csv`
- `Beneish M-score - Sheet1.csv`
- `Z-score data.csv`
- `company_tickers.json`
""")
| |
|
| |
|
# Start the Gradio server only when the file is executed as a script;
# listens on all interfaces (0.0.0.0), port 7860.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)
| |
|