# NOTE: "Spaces: Sleeping" status banner from the Hugging Face Space page was
# captured by the page extraction; it is not part of the application source.
| import os | |
| import pandas as pd | |
| import numpy as np | |
| import json | |
| import datetime | |
| import gradio as gr | |
| from openai import OpenAI | |
| import uuid | |
| import random | |
| import time | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from typing import List, Dict, Any, Tuple | |
| import threading | |
| import queue | |
| import re | |
| from collections import defaultdict | |
| # Initialize OpenAI client | |
def get_openai_client():
    """Build an OpenAI client from the environment.

    Returns:
        OpenAI: a client authenticated with the OPENAI_API_KEY env variable.

    Raises:
        ValueError: if OPENAI_API_KEY is unset or empty.
    """
    if not (api_key := os.environ.get("OPENAI_API_KEY")):
        raise ValueError("OpenAI API key not found. Please set the OPENAI_API_KEY environment variable.")
    return OpenAI(api_key=api_key)
| # Advanced data generation with more realistic patterns and anomalies | |
# Advanced data generation with more realistic patterns and anomalies
def generate_sample_data(num_entries=100, include_patterns=True):
    """Generate a synthetic journal-entry dataset for audit/fraud-detection demos.

    Args:
        num_entries: Number of journal entries (transactions) to create.
            Each entry expands into 2-6 journal lines.
        include_patterns: When True, deliberately inject suspicious patterns
            (repeat offenders, round amounts, a recurring account, self-approval,
            after-hours timing, threshold splitting) for the analyzer to find.

    Returns:
        pandas.DataFrame with one row per journal line; all lines of a
        transaction share the same transaction_id.
    """
    # Expanded data for more realistic scenarios
    departments = ["Finance", "Sales", "Marketing", "Operations", "IT", "HR", "Legal", "R&D", "Executive", "Facilities"]
    accounts = [
        {"code": "1000", "name": "Cash", "type": "Asset"},
        {"code": "1100", "name": "Accounts Receivable", "type": "Asset"},
        {"code": "1200", "name": "Inventory", "type": "Asset"},
        {"code": "1300", "name": "Prepaid Expenses", "type": "Asset"},
        {"code": "1500", "name": "Fixed Assets", "type": "Asset"},
        {"code": "1600", "name": "Accumulated Depreciation", "type": "Asset"},
        {"code": "2000", "name": "Accounts Payable", "type": "Liability"},
        {"code": "2100", "name": "Accrued Liabilities", "type": "Liability"},
        {"code": "2200", "name": "Short-term Loans", "type": "Liability"},
        {"code": "2800", "name": "Long-term Debt", "type": "Liability"},
        {"code": "3000", "name": "Revenue", "type": "Revenue"},
        {"code": "3100", "name": "Service Revenue", "type": "Revenue"},
        {"code": "3200", "name": "Interest Income", "type": "Revenue"},
        {"code": "4000", "name": "Cost of Goods Sold", "type": "Expense"},
        {"code": "5000", "name": "Salaries", "type": "Expense"},
        {"code": "5100", "name": "Employee Benefits", "type": "Expense"},
        {"code": "5200", "name": "Commissions", "type": "Expense"},
        {"code": "6000", "name": "Utilities", "type": "Expense"},
        {"code": "6100", "name": "Rent", "type": "Expense"},
        {"code": "6200", "name": "Office Supplies", "type": "Expense"},
        {"code": "7000", "name": "Advertising", "type": "Expense"},
        {"code": "7100", "name": "Travel Expenses", "type": "Expense"},
        {"code": "8000", "name": "Depreciation Expense", "type": "Expense"},
        {"code": "9000", "name": "Miscellaneous", "type": "Expense"}
    ]
    # More comprehensive user list, split by privilege level.
    users = {
        "regular": ["john.doe", "jane.smith", "mike.johnson", "sara.williams", "tom.brown", "lisa.jones", "david.miller"],
        "power": ["alex.controller", "maria.accountant", "robert.finance", "susan.treasurer"],
        "admins": ["admin.finance", "system.auto"]
    }
    # None entries mean "no approver was assigned" when picked at random.
    # NOTE(review): downstream code slices approvers[:2] as "higher-level"
    # approvers although manager/supervisor come first — confirm intended order.
    approvers = ["alex.manager", "lisa.supervisor", "robert.director", "emma.controller", "james.cfo"] + [None] * 3
    # Transaction IDs like "JE-202405-0001".
    transaction_ids = [f"JE-{datetime.datetime.now().strftime('%Y%m')}-{str(i+1).zfill(4)}" for i in range(num_entries)]
    # Generate dates within the last 90 days with realistic distributions:
    # entries cluster around month-end and quarter-end closes.
    today = datetime.datetime.now()
    month_ends = [
        today.replace(day=1) - datetime.timedelta(days=1),  # Last day of previous month
        (today.replace(day=1) - datetime.timedelta(days=1)).replace(day=1) - datetime.timedelta(days=1)  # Last day of month before previous
    ]
    # Quarter ends are the last days of Mar/Jun/Sep/Dec. Computed as the first
    # day of the *following* month minus one day (the original code subtracted
    # a day from the quarter month itself, yielding Feb/May/Aug/Nov ends).
    quarter_ends = []
    for quarter_month in (3, 6, 9, 12):
        if quarter_month == 12:
            quarter_end = today.replace(month=12, day=31)
        else:
            quarter_end = today.replace(month=quarter_month + 1, day=1) - datetime.timedelta(days=1)
        if quarter_end < today:
            quarter_ends.append(quarter_end)
    # Weight dates - more entries at month/quarter ends.
    dates = []
    for _ in range(num_entries):
        if random.random() < 0.25:  # 25% of entries at month end
            base_date = random.choice(month_ends)
            date_offset = datetime.timedelta(days=random.randint(-1, 1))  # Around month end
        elif random.random() < 0.3:  # 30% of remaining at quarter end
            if quarter_ends:
                base_date = random.choice(quarter_ends)
                date_offset = datetime.timedelta(days=random.randint(-2, 2))  # Around quarter end
            else:
                base_date = today - datetime.timedelta(days=random.randint(0, 90))
                date_offset = datetime.timedelta(days=0)
        else:
            base_date = today - datetime.timedelta(days=random.randint(0, 90))
            date_offset = datetime.timedelta(days=0)
        dates.append((base_date + date_offset).strftime('%Y-%m-%d'))
    entries = []
    # A small fixed set of users who repeatedly create suspicious entries,
    # so the analyzer can find per-user patterns.
    suspicious_users = set(random.sample(users["regular"] + users["power"], 2))
    suspicious_users.add(random.choice(users["admins"]))
    # Recurring anomaly parameters used when include_patterns is set.
    recurring_suspicious_account = random.choice(accounts)
    round_amount_threshold = random.choice([1000, 5000, 10000])
    standard_descriptions = [
        "Payment processing",
        "Invoice payment",
        "Expense recording",
        "Revenue recognition",
        "Asset acquisition",
        "Standard journal entry",
        "Monthly accrual",
        "Prepaid amortization"
    ]
    for i in range(num_entries):
        # 15% base chance of being a suspicious entry.
        is_suspicious = random.random() < 0.15
        if include_patterns:
            # Certain transaction ids (and a 5% random slice) are attributed to
            # the repeat-offender users.
            if transaction_ids[i][-3:] in ["050", "100", "150"] or random.random() < 0.05:
                created_by = random.choice(list(suspicious_users))
                is_suspicious = True
            else:
                user_type = "admins" if random.random() < 0.1 else ("power" if random.random() < 0.3 else "regular")
                created_by = random.choice(users[user_type])
        else:
            user_type = "admins" if random.random() < 0.1 else ("power" if random.random() < 0.3 else "regular")
            created_by = random.choice(users[user_type])
        # Control-weakness flags that the analyzer should pick up on.
        manual_override = is_suspicious or random.random() < 0.15
        missing_approval = is_suspicious or (manual_override and random.random() < 0.3)
        # Each journal entry has 2-6 lines.
        num_lines = random.randint(2, 6)
        # Suspicious entries sometimes use conspicuously round totals.
        if is_suspicious and include_patterns and random.random() < 0.4:
            total_amount = float(round_amount_threshold * random.randint(1, 5))
        else:
            total_amount = round(random.uniform(1000, 50000), 2)
        # Suspicious entries are sometimes deliberately unbalanced.
        if is_suspicious and random.random() < 0.3:
            imbalance = round(random.uniform(0.01, 100), 2)
            if random.random() < 0.5:
                debits = total_amount + imbalance
                credits = total_amount
            else:
                debits = total_amount
                credits = total_amount + imbalance
        else:
            debits = total_amount
            credits = total_amount
        # Split the totals across exactly num_lines lines: roughly half debit,
        # the rest credit, with the last line on each side absorbing the
        # remainder so each side sums to its total. (The original code built
        # num_lines + 1 amounts and silently dropped the balancing credit.)
        num_debit_lines = max(1, num_lines // 2)
        num_credit_lines = max(1, num_lines - num_debit_lines)
        debit_amounts = [round(debits * random.uniform(0.1, 0.5), 2)
                         for _ in range(num_debit_lines - 1)]
        debit_amounts.append(round(debits - sum(debit_amounts), 2))
        credit_amounts = [round(credits * random.uniform(0.1, 0.5), 2)
                          for _ in range(num_credit_lines - 1)]
        credit_amounts.append(round(credits - sum(credit_amounts), 2))
        # Timestamps with realistic (and sometimes suspicious) timing.
        date = dates[i]
        after_hours = False
        is_weekend = datetime.datetime.strptime(date, '%Y-%m-%d').weekday() >= 5
        if is_suspicious and include_patterns and random.random() < 0.6:
            # Suspicious timing: late night or early morning.
            if random.random() < 0.5:
                entry_time = random.randint(19, 23)
                after_hours = True
            else:
                entry_time = random.randint(0, 6)
                after_hours = True
        elif is_weekend:
            # Weekend entries are distributed throughout the day but fewer.
            entry_time = random.randint(8, 19)
            after_hours = entry_time < 9 or entry_time > 17
        else:
            # Mostly business hours, with a small early/late tail.
            business_hours_weight = 0.9
            if random.random() < business_hours_weight:
                entry_time = random.randint(9, 17)
            else:
                entry_time = random.randint(7, 8) if random.random() < 0.5 else random.randint(18, 20)
                after_hours = True
        timestamp = f"{date} {entry_time:02d}:{random.randint(0, 59):02d}:{random.randint(0, 59):02d}"
        # Suspicious pattern - the same account used over and over.
        if is_suspicious and include_patterns and random.random() < 0.4:
            suspicious_account = recurring_suspicious_account
        else:
            suspicious_account = None
        # Approver assignment. Occasionally let a suspicious creator approve
        # their own entry so the same_user_approval red flag is actually
        # present in the data (it was unreachable before: creator names never
        # appeared in the approver pool).
        if missing_approval:
            approver = None
        elif is_suspicious and include_patterns and random.random() < 0.2:
            approver = created_by
        elif manual_override and random.random() < 0.7:
            # Higher level approvals for manual overrides
            approver = random.choice(approvers[:2])
        else:
            approver = random.choice(approvers)
        # Description: suspicious entries lean toward vague "adjustment"
        # wording 40% of the time; otherwise use a standard description.
        if is_suspicious and include_patterns and random.random() < 0.4:
            description_base = random.choice([
                "Adjustment entry",
                "Correction",
                "Reclassification",
                "Period-end adjustment",
                "Balance correction"
            ])
        else:
            description_base = random.choice(standard_descriptions)
        description = f"{description_base} - {random.choice(departments)}"
        if manual_override:
            if random.random() < 0.6:
                description += " (manual override)"
        # Special suspicious pattern - splitting to stay under approval thresholds
        threshold_splitting = is_suspicious and include_patterns and random.random() < 0.2
        for j in range(num_lines):
            if j < len(debit_amounts):
                amount = debit_amounts[j]
                entry_type = "Debit"
            else:
                amount = credit_amounts[j - len(debit_amounts)]
                entry_type = "Credit"
            # Account selection: favour the recurring suspicious account, or
            # expense accounts when threshold-splitting.
            if suspicious_account and random.random() < 0.7:
                account = suspicious_account
            elif threshold_splitting and entry_type == "Debit":
                filtered_accounts = [acc for acc in accounts if acc["type"] == "Expense"]
                account = random.choice(filtered_accounts)
            else:
                account = random.choice(accounts)
            # Free-text notes only appear on manual overrides.
            if manual_override:
                notes_options = [
                    f"Adjustment required due to reconciliation",
                    f"Manual correction approved by {random.choice(approvers[:2]) if approver else 'pending'}",
                    f"System error correction",
                    f"Special handling required",
                    ""
                ]
                notes = random.choice(notes_options)
            else:
                notes = ""
            # Document references for more realism (70% of lines have one).
            doc_ref = ""
            if random.random() < 0.7:
                ref_type = random.choice(["INV", "PO", "CONT", "REQ", "BILL"])
                ref_num = random.randint(10000, 99999)
                doc_ref = f"{ref_type}-{ref_num}"
            # Self-approval red flag.
            same_user_approval = False
            if approver and approver == created_by and is_suspicious:
                same_user_approval = True
            entry_line = {
                "transaction_id": transaction_ids[i],
                "line_id": j + 1,
                "timestamp": timestamp,
                "account_code": account["code"],
                "account_name": account["name"],
                "account_type": account["type"],
                "department": random.choice(departments),
                "amount": amount,
                "type": entry_type,
                "description": description,
                "created_by": created_by,
                "approved_by": approver,
                "manual_override": manual_override,
                "notes": notes,
                "document_reference": doc_ref,
                "after_hours": after_hours,
                "weekend": is_weekend,
                "same_user_approval": same_user_approval
            }
            entries.append(entry_line)
    df = pd.DataFrame(entries)
    return df
| # Dependency injection pattern for better testability and flexibility | |
# Dependency injection pattern for better testability and flexibility
class JournalAnalyzer:
    """LLM-backed analyzer for journal-entry anomaly and fraud detection.

    Wraps an injected OpenAI chat-completions client and turns a single
    journal entry into a structured JSON risk assessment.
    """

    def __init__(self, openai_client=None, model="gpt-4o", temperature=0.2):
        # Client may be None at construction time (demo mode); it must be set
        # via set_client() before analyze_journal_entry() is usable.
        self.client = openai_client
        self.model = model
        self.temperature = temperature

    def set_client(self, client):
        """Inject or replace the OpenAI client after construction."""
        self.client = client

    def analyze_journal_entry(self, entry_data, metadata=None):
        """Analyze one journal entry and return a risk-assessment dict.

        Args:
            entry_data: dict with keys 'transaction_id', 'timestamp',
                'created_by', 'approved_by', 'manual_override' and 'lines'
                (list of per-line dicts).
            metadata: optional dict with 'debit_total', 'credit_total' and
                'balance'; missing values render as 0 in the prompt.

        Returns:
            dict parsed from the model's JSON response, augmented with the
            transaction id and a 'metadata' section. On any API/parsing
            failure a medium-risk placeholder result is returned instead of
            raising, so batch processing can continue.

        Raises:
            ValueError: if no OpenAI client has been configured.
        """
        if not self.client:
            raise ValueError("OpenAI client not initialized. Call set_client() first.")
        if metadata is None:
            metadata = {}
        # Prompt header: identity, timing and approval status of the entry.
        prompt = f"""
Analyze the following journal entry for potential issues, anomalies, or fraud indicators:
Transaction ID: {entry_data['transaction_id']}
Timestamp: {entry_data['timestamp']}
Created by: {entry_data['created_by']}
Approved by: {entry_data['approved_by'] if entry_data['approved_by'] else 'NOT APPROVED'}
Manual Override: {'Yes' if entry_data['manual_override'] else 'No'}
Journal Entry Lines:
"""
        # One formatted block per journal line.
        for i, line in enumerate(entry_data['lines']):
            prompt += f"""
Line {i+1}:
- Account: {line['account_code']} - {line['account_name']} ({line['account_type']})
- Department: {line['department']}
- Amount: ${line['amount']:.2f} ({line['type']})
- Description: {line['description']}
- Document Reference: {line.get('document_reference', 'N/A')}
"""
        # Collect free-text notes from any line that has them.
        notes = [line.get('notes', '') for line in entry_data['lines'] if line.get('notes')]
        if notes:
            prompt += "\nNotes:\n" + "\n".join([f"- {note}" for note in notes if note])
        # Entry-level metadata; the timing/self-approval flags are ORed across
        # all lines of the entry.
        prompt += f"""
Additional Metadata:
- Entry Created: {'After hours' if any(line.get('after_hours', False) for line in entry_data['lines']) else 'During business hours'}
- Entry Day: {'Weekend' if any(line.get('weekend', False) for line in entry_data['lines']) else 'Weekday'}
- Debit Total: ${metadata.get('debit_total', 0):.2f}
- Credit Total: ${metadata.get('credit_total', 0):.2f}
- Balance: ${metadata.get('balance', 0):.2f}
- Same user approval: {'Yes' if any(line.get('same_user_approval', False) for line in entry_data['lines']) else 'No'}
"""
        # Analysis guidelines and the required JSON response schema.
        # NOTE(review): this segment is a plain (non-f) string, so the doubled
        # braces appear literally in the prompt — confirm that is intended.
        prompt += """
Please identify any of the following issues:
1. Missing approvals (entries that should have approvals based on amount or account type)
2. Manual overrides that seem suspicious or lack proper documentation
3. Unusual timing (after hours, weekends, month/quarter end)
4. Unusual amounts or patterns (round amounts, threshold-avoiding amounts)
5. Unbalanced entries or suspicious balancing
6. Same person creating and approving entries
7. Suspicious account usage patterns
8. Potential fraud indicators based on accounting best practices
For each identified issue, provide:
- Issue type
- Detailed description of why it's problematic
- Risk level (Low, Medium, High)
- Risk score from 1-10 where 10 is highest risk
- Recommendation for investigation or mitigation
Provide your response in the following JSON format:
{{"issues": [
{{"type": "issue_type", "description": "detailed description", "risk_level": "Low/Medium/High", "risk_score": score, "recommendation": "recommended action"}}
],
"overall_risk_score": overall_score,
"overall_risk_level": "Low/Medium/High",
"primary_concerns": "brief summary of main issues",
"explanation": "brief explanation of the overall risk assessment"}}
"""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert forensic accountant and auditor specializing in detecting journal entry fraud and anomalies. You have deep knowledge of accounting controls, patterns of fraud, and financial statement manipulation."},
                    {"role": "user", "content": prompt}
                ],
                temperature=self.temperature,
                # JSON mode: the model is forced to emit parseable JSON.
                response_format={"type": "json_object"}
            )
            analysis_result = json.loads(response.choices[0].message.content)
            analysis_result["transaction_id"] = entry_data["transaction_id"]
            # Attach the computed totals so downstream reporting does not need
            # to re-derive them from the raw lines.
            analysis_result["metadata"] = {
                "debit_total": metadata.get('debit_total', 0),
                "credit_total": metadata.get('credit_total', 0),
                "balance": metadata.get('balance', 0),
                "line_count": len(entry_data['lines']),
                "timestamp": entry_data['lines'][0].get('timestamp', ''),
                "created_by": entry_data['created_by'],
                "approved_by": entry_data['approved_by']
            }
            return analysis_result
        except Exception as e:
            # Fallback: never propagate API or JSON-parsing errors; return a
            # medium-risk placeholder flagged for manual review instead.
            return {
                "transaction_id": entry_data["transaction_id"],
                "issues": [{"type": "error", "description": f"Failed to analyze entry: {str(e)}", "risk_level": "Medium", "risk_score": 5, "recommendation": "Review manually due to analysis error"}],
                "overall_risk_score": 5,
                "overall_risk_level": "Medium",
                "primary_concerns": "Analysis error",
                "explanation": f"An error occurred during analysis: {str(e)}",
                "metadata": {
                    "debit_total": metadata.get('debit_total', 0),
                    "credit_total": metadata.get('credit_total', 0),
                    "balance": metadata.get('balance', 0),
                    "error": str(e)
                }
            }
| # Parallel processing for improved performance | |
# Parallel processing for improved performance
class ParallelAnalyzer:
    """Fan journal-entry analysis out over a small thread pool.

    The per-entry work is an I/O-bound API call, so threads (not processes)
    are appropriate. Instances are reusable: analyze_batch() may be called
    repeatedly on the same object.
    """

    def __init__(self, max_workers=4):
        # Maximum number of concurrent analysis threads per batch.
        self.max_workers = max_workers
        self.results = []
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
        # Emergency-stop flag; normal shutdown uses None sentinel tasks.
        self.stop_event = threading.Event()

    def worker(self, analyzer):
        """Worker loop: consume (txn_id, entry, metadata) tasks until a None
        sentinel is received (or stop_event is set while the queue is idle)."""
        while True:
            try:
                task = self.task_queue.get(timeout=1)
            except queue.Empty:
                # Only give up on an empty queue when a stop was requested.
                if self.stop_event.is_set():
                    break
                continue
            if task is None:
                # Sentinel: mark it done so task_queue.join() can return, then
                # exit. (The original never task_done()'d sentinels and relied
                # on stop_event, which left stale sentinels in the queue; on
                # the next batch fresh workers consumed them and died
                # immediately, deadlocking join().)
                self.task_queue.task_done()
                break
            transaction_id, entry_data, metadata = task
            try:
                result = analyzer.analyze_journal_entry(entry_data, metadata)
                self.result_queue.put(result)
            except Exception as e:
                # Surface the failure as a result so the batch can continue.
                self.result_queue.put({
                    "transaction_id": transaction_id,
                    "error": str(e)
                })
            finally:
                self.task_queue.task_done()

    def analyze_batch(self, data, analyzer, progress_callback=None):
        """Analyze every transaction in a journal-line DataFrame.

        Args:
            data: DataFrame with one row per journal line (must include
                transaction_id, created_by, approved_by, manual_override,
                amount and type columns).
            analyzer: object exposing analyze_journal_entry(entry, metadata).
            progress_callback: optional callable receiving a 0..1 float;
                reflects task submission, not completion.

        Returns:
            list of per-entry analysis dicts (order not guaranteed).
        """
        # Reset per-batch state.
        self.stop_event.clear()
        self.results = []
        # Group journal lines into whole transactions.
        transactions = {}
        for _, row in data.iterrows():
            transaction_id = row["transaction_id"]
            if transaction_id not in transactions:
                transactions[transaction_id] = {
                    "lines": [],
                    "created_by": row["created_by"],
                    "approved_by": row["approved_by"],
                    "manual_override": row["manual_override"],
                    "transaction_id": transaction_id
                }
            transactions[transaction_id]["lines"].append(row.to_dict())
        # Compute debit/credit totals and the (should-be-zero) balance.
        for transaction_id, entry in transactions.items():
            lines = entry["lines"]
            debits = sum(line["amount"] for line in lines if line["type"] == "Debit")
            credits = sum(line["amount"] for line in lines if line["type"] == "Credit")
            entry["metadata"] = {
                "debit_total": debits,
                "credit_total": credits,
                "balance": round(debits - credits, 2)
            }
        # Start worker threads (no more than one per transaction).
        for _ in range(min(self.max_workers, len(transactions))):
            worker_thread = threading.Thread(target=self.worker, args=(analyzer,))
            worker_thread.daemon = True
            worker_thread.start()
            self.workers.append(worker_thread)
        # Submit tasks.
        total = len(transactions)
        for i, (transaction_id, entry) in enumerate(transactions.items()):
            metadata = entry.pop("metadata", {})
            self.task_queue.put((transaction_id, entry, metadata))
            if progress_callback is not None:
                progress_callback((i + 1) / total)
        # Exactly one sentinel per worker: each worker consumes one sentinel,
        # marks it done, and exits — leaving the queue empty for the next batch.
        for _ in self.workers:
            self.task_queue.put(None)
        # Block until every task (including sentinels) is marked done.
        self.task_queue.join()
        for worker in self.workers:
            worker.join(timeout=1)
        self.workers = []
        # Drain the result queue.
        while not self.result_queue.empty():
            self.results.append(self.result_queue.get())
        return self.results
| # Enhanced summary report builder with data visualization | |
# Enhanced summary report builder with data visualization
def build_enhanced_summary_report(analysis_results):
    """Aggregate per-entry analysis results into a markdown report plus charts.

    Args:
        analysis_results: list of analysis dicts as produced by
            JournalAnalyzer.analyze_journal_entry.

    Returns:
        (report_markdown, risk_distribution_figure, issue_type_figure) — the
        third element is None when no issues were found; on total failure the
        first element is an error message and both figures are None.
    """
    # Guard: empty input, or a single error-only result, means analysis failed.
    if not analysis_results or (len(analysis_results) == 1 and "error" in analysis_results[0]):
        return "Analysis failed. Please check the logs for more information.", None, None
    total_entries = len(analysis_results)
    total_issues = sum(len(result.get("issues", [])) for result in analysis_results)
    # Bucket entries by overall risk score (higher buckets checked first).
    risk_categories = {
        "Critical (9-10)": 0,
        "High (7-8)": 0,
        "Medium (4-6)": 0,
        "Low (1-3)": 0,
        "None (0)": 0
    }
    for result in analysis_results:
        score = result.get("overall_risk_score", 0)
        if score >= 9:
            risk_categories["Critical (9-10)"] += 1
        elif score >= 7:
            risk_categories["High (7-8)"] += 1
        elif score >= 4:
            risk_categories["Medium (4-6)"] += 1
        elif score >= 1:
            risk_categories["Low (1-3)"] += 1
        else:
            risk_categories["None (0)"] += 1
    # Count issues per type and record their scores for average-severity calc.
    issue_categories = defaultdict(int)
    issue_severity = defaultdict(list)
    for result in analysis_results:
        for issue in result.get("issues", []):
            issue_type = issue.get("type", "other")
            issue_categories[issue_type] += 1
            issue_severity[issue_type].append(issue.get("risk_score", 0))
    avg_severity = {
        issue_type: sum(scores)/len(scores) if scores else 0
        for issue_type, scores in issue_severity.items()
    }
    # Timing patterns. NOTE: this single dict mixes three categorizations
    # (time-of-day buckets, weekday names, and "Month End"), so one entry can
    # increment several keys.
    time_patterns = defaultdict(int)
    for result in analysis_results:
        if "metadata" in result and "timestamp" in result["metadata"]:
            try:
                timestamp = result["metadata"]["timestamp"]
                dt = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
                # Time-of-day bucket.
                hour = dt.hour
                if 0 <= hour < 6:
                    time_patterns["Early Morning (12am-6am)"] += 1
                elif 6 <= hour < 9:
                    time_patterns["Morning (6am-9am)"] += 1
                elif 9 <= hour < 12:
                    time_patterns["Mid-Morning (9am-12pm)"] += 1
                elif 12 <= hour < 15:
                    time_patterns["Afternoon (12pm-3pm)"] += 1
                elif 15 <= hour < 18:
                    time_patterns["Late Afternoon (3pm-6pm)"] += 1
                elif 18 <= hour < 21:
                    time_patterns["Evening (6pm-9pm)"] += 1
                else:
                    time_patterns["Night (9pm-12am)"] += 1
                # Day-of-week bucket.
                day_of_week = dt.strftime('%A')
                time_patterns[day_of_week] += 1
                # Month-end: first day of next month minus one day gives the
                # last day of dt's month; count the last two days of the month.
                last_day_of_month = (dt.replace(day=1) + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
                if dt.day == last_day_of_month.day or dt.day == last_day_of_month.day - 1:
                    time_patterns["Month End"] += 1
            except Exception:
                # Malformed/missing timestamps are simply skipped.
                pass
    # Per-user risk: collect all scores per creator, then average.
    user_risk = defaultdict(list)
    for result in analysis_results:
        if "metadata" in result and "created_by" in result["metadata"]:
            user = result["metadata"]["created_by"]
            score = result.get("overall_risk_score", 0)
            user_risk[user].append(score)
    avg_user_risk = {
        user: sum(scores)/len(scores)
        for user, scores in user_risk.items()
    }
    # "High risk" users need an average >= 5 over at least two entries.
    high_risk_users = {
        user: avg_score
        for user, avg_score in avg_user_risk.items()
        if avg_score >= 5 and len(user_risk[user]) >= 2
    }
    # Assemble the markdown report. total_entries > 0 is guaranteed by the
    # early-return guard, so the divisions below are safe.
    report = f"""
# Journal Entry Audit Summary
## Overview
- **Total Entries Analyzed:** {total_entries}
- **Total Issues Detected:** {total_issues}
- **Average Issues Per Entry:** {total_issues/total_entries:.2f}
## Risk Distribution
"""
    for category, count in risk_categories.items():
        if count > 0:
            report += f"- **{category}:** {count} entries ({count/total_entries*100:.1f}%)\n"
    # Issue categories, most frequent first.
    report += "\n## Issue Categories\n"
    for issue_type, count in sorted(issue_categories.items(), key=lambda x: x[1], reverse=True):
        avg_risk = avg_severity.get(issue_type, 0)
        report += f"- **{issue_type.replace('_', ' ').title()}:** {count} instances (Avg. Risk: {avg_risk:.1f})\n"
    # Timing patterns (only the interesting aggregates are reported).
    report += "\n## Timing Patterns\n"
    after_hours = sum([
        time_patterns.get("Early Morning (12am-6am)", 0),
        time_patterns.get("Evening (6pm-9pm)", 0),
        time_patterns.get("Night (9pm-12am)", 0)
    ])
    weekend = sum([
        time_patterns.get("Saturday", 0),
        time_patterns.get("Sunday", 0)
    ])
    report += f"- **After Hours Entries:** {after_hours} entries ({after_hours/total_entries*100:.1f}%)\n"
    report += f"- **Weekend Entries:** {weekend} entries ({weekend/total_entries*100:.1f}%)\n"
    report += f"- **Month End Entries:** {time_patterns.get('Month End', 0)} entries ({time_patterns.get('Month End', 0)/total_entries*100:.1f}%)\n"
    # Users with consistently high-risk entries.
    report += "\n## User Risk Patterns\n"
    if high_risk_users:
        for user, risk_score in sorted(high_risk_users.items(), key=lambda x: x[1], reverse=True):
            num_entries = len(user_risk[user])
            report += f"- **{user}:** Average Risk Score {risk_score:.1f} ({num_entries} entries)\n"
    else:
        report += "- No users with consistently high-risk patterns detected\n"
    # Top 5 highest-risk entries, with up to 3 issues each.
    report += "\n## Highest Risk Entries\n"
    high_risk_entries = sorted(analysis_results, key=lambda x: x.get("overall_risk_score", 0), reverse=True)[:5]
    for i, entry in enumerate(high_risk_entries):
        txn_id = entry.get("transaction_id", "Unknown")
        score = entry.get("overall_risk_score", 0)
        concerns = entry.get("primary_concerns", "No details available")
        report += f"### {i+1}. Transaction {txn_id} (Risk Score: {score})\n"
        report += f"- **Primary Concerns:** {concerns}\n"
        report += f"- **Key Issues:**\n"
        for issue in entry.get("issues", [])[:3]:  # Show top 3 issues per entry
            report += f"  - {issue.get('type', 'Unknown issue')} ({issue.get('risk_level', 'Unknown')}): {issue.get('description', 'No details')}\n"
    # Bar chart: entry counts per risk bucket, colored worst-to-best.
    risk_dist_fig = go.Figure(
        data=[
            go.Bar(
                x=list(risk_categories.keys()),
                y=list(risk_categories.values()),
                marker_color=['darkred', 'red', 'orange', 'yellow', 'green']
            )
        ]
    )
    risk_dist_fig.update_layout(
        title="Risk Distribution of Journal Entries",
        xaxis_title="Risk Category",
        yaxis_title="Number of Entries",
        template="plotly_white"
    )
    # Bar chart: issue counts, shaded by average severity.
    if issue_categories:
        issue_types = list(issue_categories.keys())
        issue_counts = list(issue_categories.values())
        issue_severity_vals = [avg_severity.get(t, 0) for t in issue_types]
        issue_fig = go.Figure(
            data=[
                go.Bar(
                    x=issue_types,
                    y=issue_counts,
                    marker=dict(
                        color=issue_severity_vals,
                        colorscale='Reds',
                        colorbar=dict(title="Avg. Severity")
                    )
                )
            ]
        )
        issue_fig.update_layout(
            title="Issue Types by Frequency and Severity",
            xaxis_title="Issue Type",
            yaxis_title="Count",
            template="plotly_white"
        )
    else:
        issue_fig = None
    return report, risk_dist_fig, issue_fig
| # Analyze a single journal entry and provide detailed explanation | |
# Analyze a single journal entry and provide detailed explanation
def analyze_entry_detailed(entry_data, analyzer):
    """Run the analyzer on one journal entry and package results for display.

    Args:
        entry_data: DataFrame containing all lines of one transaction.
        analyzer: object exposing analyze_journal_entry(entry, metadata).

    Returns:
        (display_data, analysis): a display-friendly dict of formatted fields
        and the raw analysis result.
    """
    lines = entry_data.to_dict('records')
    first_row = entry_data.iloc[0]
    txn_id = first_row["transaction_id"]

    # Debit/credit totals and the net balance (zero when the entry balances).
    debit_total = sum(line["amount"] for line in lines if line["type"] == "Debit")
    credit_total = sum(line["amount"] for line in lines if line["type"] == "Credit")
    net_balance = round(debit_total - credit_total, 2)

    # Shape the entry and metadata the way the analyzer expects them.
    entry = {
        "transaction_id": txn_id,
        "created_by": first_row["created_by"],
        "approved_by": first_row["approved_by"],
        "manual_override": first_row["manual_override"],
        "lines": lines,
    }
    metadata = {
        "debit_total": debit_total,
        "credit_total": credit_total,
        "balance": net_balance,
    }
    analysis = analyzer.analyze_journal_entry(entry, metadata)

    # Human-readable rendering of the approval state.
    approver_display = "Not Approved" if pd.isna(first_row["approved_by"]) else first_row["approved_by"]

    display_data = {
        "transaction_id": txn_id,
        "timestamp": first_row["timestamp"],
        "created_by": first_row["created_by"],
        "approved_by": approver_display,
        "manual_override": "Yes" if first_row["manual_override"] else "No",
        "line_count": len(lines),
        "debit_total": f"${debit_total:.2f}",
        "credit_total": f"${credit_total:.2f}",
        "balance": f"${net_balance:.2f}",
        "risk_level": analysis.get("overall_risk_level", "Unknown"),
        "risk_score": analysis.get("overall_risk_score", 0),
        "issues": analysis.get("issues", []),
        "explanation": analysis.get("explanation", "No explanation provided"),
        "primary_concerns": analysis.get("primary_concerns", "No concerns identified"),
    }
    return display_data, analysis
| # Create a more detailed visualization for a single entry | |
# Create a more detailed visualization for a single entry
def create_entry_visualization(entry_data):
    """Create detailed visualizations for a single journal entry.

    Args:
        entry_data: DataFrame containing all lines of one transaction
            (needs transaction_id, account_name, type and amount columns).

    Returns:
        (bar_fig, flow_fig): a grouped bar chart of debit/credit amounts per
        account, and a Sankey diagram routing each line into a "Balance" node.
    """
    df = entry_data.copy()
    # Label each line with its account and entry side for the x-axis.
    # (Removed dead code from the original: an 'amount_signed' column and a
    # running-balance loop that neither figure ever used.)
    df['line_label'] = df.apply(lambda x: f"{x['account_name']} ({x['type']})", axis=1)

    # Grouped bar chart: debits in red, credits in green.
    fig = go.Figure()
    debits = df[df['type'] == 'Debit']
    credits = df[df['type'] == 'Credit']
    fig.add_trace(go.Bar(
        x=debits['line_label'],
        y=debits['amount'],
        name='Debits',
        marker_color='rgba(219, 64, 82, 0.7)'
    ))
    fig.add_trace(go.Bar(
        x=credits['line_label'],
        y=credits['amount'],
        name='Credits',
        marker_color='rgba(50, 171, 96, 0.7)'
    ))
    fig.update_layout(
        title=f"Journal Entry {df['transaction_id'].iloc[0]} - Debits and Credits",
        xaxis_title="Account",
        yaxis_title="Amount ($)",
        barmode='group',
        template="plotly_white"
    )

    # Sankey diagram: every journal line flows into one terminal "Balance"
    # node (index len(df)), with link width proportional to the line amount.
    flow_fig = go.Figure(data=[go.Sankey(
        node=dict(
            pad=15,
            thickness=20,
            line=dict(color="black", width=0.5),
            label=list(df['account_name']) + ["Balance"],
            color=["rgba(219, 64, 82, 0.8)" if t == "Debit" else "rgba(50, 171, 96, 0.8)"
                   for t in df['type']] + ["rgba(131, 90, 241, 0.8)"]
        ),
        link=dict(
            source=[i for i in range(len(df))],
            target=[len(df) for _ in range(len(df))],
            value=df['amount'],
            color=["rgba(219, 64, 82, 0.5)" if t == "Debit" else "rgba(50, 171, 96, 0.5)"
                   for t in df['type']]
        )
    )])
    flow_fig.update_layout(
        title_text=f"Transaction {df['transaction_id'].iloc[0]} - Flow of Funds",
        font_size=12
    )
    return fig, flow_fig
# Gradio interface with improved layout
def create_interface():
    """Build the Gradio application for the journal entry auditor.

    Returns:
        gr.Blocks: the assembled UI; call .launch() on it to serve.
    """
    # OpenAI client setup; fall back to demo mode if no API key is configured.
    try:
        client = get_openai_client()
        analyzer = JournalAnalyzer(client)
        parallel_analyzer = ParallelAnalyzer(max_workers=4)
    except Exception as e:
        print(f"Error initializing OpenAI client: {e}")
        # Fallback to demo mode
        analyzer = JournalAnalyzer()
        parallel_analyzer = ParallelAnalyzer(max_workers=1)
    # Define interface
    with gr.Blocks(title="Enhanced Journal Entry Auditor", theme=gr.themes.Default()) as app:
        gr.Markdown("# Enhanced Journal Entry Auditor")
        gr.Markdown("### Analyze journal entries for potential fraud, anomalies, and control weaknesses")
        # BUG FIX: one shared State component holds the loaded DataFrame.
        # The original wired a *fresh* gr.State() into each event handler, so
        # the data written by run_analysis was never the state read by
        # update_entry_selector / load_entry and the Entry Explorer stayed empty.
        data_state = gr.State(value=None)
        with gr.Tab("Data Upload & Batch Analysis"):
            with gr.Row():
                with gr.Column(scale=2):
                    file_input = gr.File(label="Upload CSV (Optional)")
                    num_entries = gr.Slider(10, 500, 100, step=10, label="Number of sample entries (if not uploading a file)")
                    include_patterns = gr.Checkbox(True, label="Include suspicious patterns in generated data")
                    run_batch = gr.Button("Run Batch Analysis", variant="primary")
                with gr.Column(scale=1):
                    with gr.Accordion("Analysis Settings", open=True):
                        model_choice = gr.Dropdown(
                            choices=["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"],
                            value="gpt-4o",
                            label="Model"
                        )
                        temperature = gr.Slider(0, 1, 0.2, step=0.1, label="Temperature")
                        max_workers = gr.Slider(1, 8, 4, step=1, label="Parallel Workers")
                        # NOTE: the original also instantiated an unused
                        # gr.Progress() here; gr.Progress belongs in handler
                        # signatures, not the layout, so it was removed.
            with gr.Row():
                analysis_table = gr.DataFrame(label="Analysis Results")
            with gr.Row():
                with gr.Column(scale=1):
                    summary_markdown = gr.Markdown(label="Summary Report")
                with gr.Column(scale=1):
                    with gr.Tab("Risk Distribution"):
                        risk_plot = gr.Plot(label="Risk Distribution")
                    with gr.Tab("Issue Types"):
                        issue_plot = gr.Plot(label="Issue Types")
        with gr.Tab("Entry Explorer"):
            with gr.Row():
                entry_selector = gr.Dropdown([], label="Select Transaction")
                refresh_entries = gr.Button("Refresh Entries")
            with gr.Row():
                with gr.Column(scale=1):
                    entry_details = gr.JSON(label="Entry Details")
                with gr.Column(scale=1):
                    with gr.Tab("Entry Visualization"):
                        entry_plot = gr.Plot(label="Entry Visualization")
                    with gr.Tab("Flow of Funds"):
                        flow_plot = gr.Plot(label="Flow of Funds")
            with gr.Row():
                entry_analysis = gr.JSON(label="Analysis Results")

        # ---- Handler functions -------------------------------------------
        def load_data(file, num_entries, include_patterns, progress=gr.Progress()):
            """Load entries from an uploaded CSV, or generate sample data.

            Returns:
                tuple: (DataFrame or None on failure, status message).
            """
            if file is not None:
                try:
                    df = pd.read_csv(file.name)
                    return df, f"Loaded {len(df)} entries from file"
                except Exception as e:
                    return None, f"Error loading file: {str(e)}"
            else:
                progress(0, desc="Generating sample data...")
                df = generate_sample_data(num_entries, include_patterns)
                return df, f"Generated {len(df)} sample entries"

        def run_analysis(file, num_entries, include_patterns, model, temp, workers, progress=gr.Progress()):
            """Run the batch analysis pipeline and build the summary report.

            Returns:
                tuple: (raw DataFrame for the shared state, results table,
                markdown report, risk figure, issue figure).
            """
            progress(0, desc="Loading data...")
            df, msg = load_data(file, num_entries, include_patterns, progress)
            if df is None:
                return None, [], "Error: Failed to load data", None, None
            # Set analyzer parameters
            analyzer.model = model
            analyzer.temperature = temp
            parallel_analyzer.max_workers = workers
            # Run analysis; batch progress is mapped into the 10%-90% range.
            progress(0.1, desc="Analyzing journal entries...")
            results = parallel_analyzer.analyze_batch(
                df, analyzer,
                lambda p: progress(0.1 + p * 0.8, desc="Analyzing journal entries..."))
            # Create summary report
            progress(0.9, desc="Generating report...")
            report, risk_fig, issue_fig = build_enhanced_summary_report(results)
            # Flatten per-entry results into rows for the results table.
            result_rows = [{
                "Transaction": r.get("transaction_id", "Unknown"),
                "Risk Level": r.get("overall_risk_level", "Unknown"),
                "Risk Score": r.get("overall_risk_score", 0),
                "Issues": len(r.get("issues", [])),
                "Primary Concerns": r.get("primary_concerns", "None")
            } for r in results]
            return df, pd.DataFrame(result_rows), report, risk_fig, issue_fig

        def update_entry_selector(df):
            """Refresh the transaction dropdown choices from the loaded data."""
            if df is None:
                return gr.update(choices=[], value=None), {}
            transactions = df["transaction_id"].unique().tolist()
            # BUG FIX: returning a bare list to a Dropdown output sets its
            # *value*; gr.update is required to replace the choices.
            return gr.update(choices=transactions, value=None), {}

        def load_entry(transaction_id, df):
            """Return details and visualizations for the selected transaction."""
            if df is None or transaction_id is None:
                return {}, None, None
            # Filter data for selected transaction
            entry_data = df[df["transaction_id"] == transaction_id]
            if len(entry_data) == 0:
                return {}, None, None
            # Create visualizations
            entry_vis, flow_vis = create_entry_visualization(entry_data)
            # Format entry details (header fields come from the first line).
            details = {
                "transaction_id": transaction_id,
                "timestamp": entry_data["timestamp"].iloc[0],
                "created_by": entry_data["created_by"].iloc[0],
                "approved_by": entry_data["approved_by"].iloc[0] if not pd.isna(entry_data["approved_by"].iloc[0]) else "Not Approved",
                "manual_override": "Yes" if entry_data["manual_override"].iloc[0] else "No",
                "lines": []
            }
            for _, row in entry_data.iterrows():
                details["lines"].append({
                    "line_id": row["line_id"],
                    "account": f"{row['account_code']} - {row['account_name']}",
                    "department": row["department"],
                    "amount": f"${row['amount']:.2f}",
                    "type": row["type"],
                    "description": row["description"],
                    "document_reference": row.get("document_reference", "")
                })
            return details, entry_vis, flow_vis

        def analyze_selected_entry(transaction_id, df, model, temp):
            """Run the detailed analysis for the selected transaction."""
            if df is None or transaction_id is None:
                return {}
            # Filter data for selected transaction
            entry_data = df[df["transaction_id"] == transaction_id]
            if len(entry_data) == 0:
                return {}
            # Update analyzer settings
            analyzer.model = model
            analyzer.temperature = temp
            # Analyze entry
            _, analysis = analyze_entry_detailed(entry_data, analyzer)
            return analysis

        # ---- Event wiring (all data flows through the shared data_state) --
        run_batch.click(
            run_analysis,
            inputs=[file_input, num_entries, include_patterns, model_choice, temperature, max_workers],
            outputs=[data_state, analysis_table, summary_markdown, risk_plot, issue_plot]
        ).then(
            update_entry_selector,
            inputs=[data_state],
            outputs=[entry_selector, entry_details]
        )
        refresh_entries.click(
            update_entry_selector,
            inputs=[data_state],
            outputs=[entry_selector, entry_details]
        )
        entry_selector.change(
            load_entry,
            inputs=[entry_selector, data_state],
            outputs=[entry_details, entry_plot, flow_plot]
        ).then(
            analyze_selected_entry,
            inputs=[entry_selector, data_state, model_choice, temperature],
            outputs=[entry_analysis]
        )
    return app
# Script entry point: build the interface and serve it.
if __name__ == "__main__":
    create_interface().launch()