# Source: HuggingFace Space by prernajeet01 — "Update app.py" (commit f7297cd, verified)
import os
import pandas as pd
import numpy as np
import json
import datetime
import gradio as gr
from openai import OpenAI
import uuid
import random
import time
import plotly.express as px
import plotly.graph_objects as go
from typing import List, Dict, Any, Tuple
import threading
import queue
import re
from collections import defaultdict
# Initialize OpenAI client
def get_openai_client():
    """Build an OpenAI client from the ``OPENAI_API_KEY`` environment variable.

    Returns:
        OpenAI: a configured API client.

    Raises:
        ValueError: if the environment variable is missing or empty.
    """
    key = os.environ.get("OPENAI_API_KEY")
    if key:
        return OpenAI(api_key=key)
    raise ValueError("OpenAI API key not found. Please set the OPENAI_API_KEY environment variable.")
# Advanced data generation with more realistic patterns and anomalies
def generate_sample_data(num_entries=100, include_patterns=True):
    """Generate a synthetic journal-entry line-item dataset for demos/testing.

    Produces ``num_entries`` journal entries (transactions), each with 2-6
    lines split between debits and credits. Roughly 15% of entries are marked
    internally as "suspicious" and, when ``include_patterns`` is True, carry
    extra fraud-style patterns: round amounts, unbalanced totals, after-hours
    or weekend timestamps, missing approvals, a recurring suspicious account,
    repeat suspicious users and approval-threshold splitting.

    Args:
        num_entries: number of transactions (journal entries) to generate.
        include_patterns: whether to inject the extra suspicious patterns.

    Returns:
        pandas.DataFrame with one row per journal-entry line.
    """
    departments = ["Finance", "Sales", "Marketing", "Operations", "IT", "HR", "Legal", "R&D", "Executive", "Facilities"]
    accounts = [
        {"code": "1000", "name": "Cash", "type": "Asset"},
        {"code": "1100", "name": "Accounts Receivable", "type": "Asset"},
        {"code": "1200", "name": "Inventory", "type": "Asset"},
        {"code": "1300", "name": "Prepaid Expenses", "type": "Asset"},
        {"code": "1500", "name": "Fixed Assets", "type": "Asset"},
        {"code": "1600", "name": "Accumulated Depreciation", "type": "Asset"},
        {"code": "2000", "name": "Accounts Payable", "type": "Liability"},
        {"code": "2100", "name": "Accrued Liabilities", "type": "Liability"},
        {"code": "2200", "name": "Short-term Loans", "type": "Liability"},
        {"code": "2800", "name": "Long-term Debt", "type": "Liability"},
        {"code": "3000", "name": "Revenue", "type": "Revenue"},
        {"code": "3100", "name": "Service Revenue", "type": "Revenue"},
        {"code": "3200", "name": "Interest Income", "type": "Revenue"},
        {"code": "4000", "name": "Cost of Goods Sold", "type": "Expense"},
        {"code": "5000", "name": "Salaries", "type": "Expense"},
        {"code": "5100", "name": "Employee Benefits", "type": "Expense"},
        {"code": "5200", "name": "Commissions", "type": "Expense"},
        {"code": "6000", "name": "Utilities", "type": "Expense"},
        {"code": "6100", "name": "Rent", "type": "Expense"},
        {"code": "6200", "name": "Office Supplies", "type": "Expense"},
        {"code": "7000", "name": "Advertising", "type": "Expense"},
        {"code": "7100", "name": "Travel Expenses", "type": "Expense"},
        {"code": "8000", "name": "Depreciation Expense", "type": "Expense"},
        {"code": "9000", "name": "Miscellaneous", "type": "Expense"}
    ]
    # Users grouped by privilege level; admins are chosen least often below.
    users = {
        "regular": ["john.doe", "jane.smith", "mike.johnson", "sara.williams", "tom.brown", "lisa.jones", "david.miller"],
        "power": ["alex.controller", "maria.accountant", "robert.finance", "susan.treasurer"],
        "admins": ["admin.finance", "system.auto"]
    }
    # The trailing Nones make "no approver" a possible random outcome.
    approvers = ["alex.manager", "lisa.supervisor", "robert.director", "emma.controller", "james.cfo"] + [None] * 3
    transaction_ids = [f"JE-{datetime.datetime.now().strftime('%Y%m')}-{str(i+1).zfill(4)}" for i in range(num_entries)]
    today = datetime.datetime.now()
    # Last days of the two most recent completed months.
    month_ends = [
        today.replace(day=1) - datetime.timedelta(days=1),
        (today.replace(day=1) - datetime.timedelta(days=1)).replace(day=1) - datetime.timedelta(days=1)
    ]
    # BUG FIX: quarter ends are the LAST day of Mar/Jun/Sep/Dec. The previous
    # code subtracted one day from the 1st of the quarter month itself, which
    # actually produced the ends of Feb/May/Aug/Nov.
    quarter_ends = []
    for quarter_month in (3, 6, 9, 12):
        if quarter_month == 12:
            quarter_end = today.replace(month=12, day=31)
        else:
            quarter_end = today.replace(month=quarter_month + 1, day=1) - datetime.timedelta(days=1)
        if quarter_end < today:
            quarter_ends.append(quarter_end)
    # Weight dates: ~25% cluster at month end, a share at quarter end, the
    # rest uniform over the trailing 90 days.
    dates = []
    for _ in range(num_entries):
        if random.random() < 0.25:
            base_date = random.choice(month_ends)
            date_offset = datetime.timedelta(days=random.randint(-1, 1))
        elif random.random() < 0.3:
            if quarter_ends:
                base_date = random.choice(quarter_ends)
                date_offset = datetime.timedelta(days=random.randint(-2, 2))
            else:
                base_date = today - datetime.timedelta(days=random.randint(0, 90))
                date_offset = datetime.timedelta(days=0)
        else:
            base_date = today - datetime.timedelta(days=random.randint(0, 90))
            date_offset = datetime.timedelta(days=0)
        dates.append((base_date + date_offset).strftime('%Y-%m-%d'))
    entries = []
    # A few users repeatedly create suspicious entries (feeds user-risk analysis).
    suspicious_users = set(random.sample(users["regular"] + users["power"], 2))
    suspicious_users.add(random.choice(users["admins"]))
    recurring_suspicious_account = random.choice(accounts)
    round_amount_threshold = random.choice([1000, 5000, 10000])
    for i in range(num_entries):
        is_suspicious = random.random() < 0.15  # base 15% suspicious rate
        if include_patterns:
            # Specific transaction ids (and a 5% random slice) are attributed
            # to the repeat-offender users and forced suspicious.
            if transaction_ids[i][-3:] in ["050", "100", "150"] or random.random() < 0.05:
                created_by = random.choice(list(suspicious_users))
                is_suspicious = True
            else:
                user_type = "admins" if random.random() < 0.1 else ("power" if random.random() < 0.3 else "regular")
                created_by = random.choice(users[user_type])
        else:
            user_type = "admins" if random.random() < 0.1 else ("power" if random.random() < 0.3 else "regular")
            created_by = random.choice(users[user_type])
        manual_override = is_suspicious or random.random() < 0.15
        missing_approval = is_suspicious or (manual_override and random.random() < 0.3)
        num_lines = random.randint(2, 6)
        # Suspicious entries sometimes use conspicuous round amounts.
        if is_suspicious and include_patterns and random.random() < 0.4:
            total_amount = float(round_amount_threshold * random.randint(1, 5))
        else:
            total_amount = round(random.uniform(1000, 50000), 2)
        # 30% of suspicious entries are deliberately unbalanced.
        if is_suspicious and random.random() < 0.3:
            imbalance = round(random.uniform(0.01, 100), 2)
            if random.random() < 0.5:
                debits = total_amount + imbalance
                credits = total_amount
            else:
                debits = total_amount
                credits = total_amount + imbalance
        else:
            debits = total_amount
            credits = total_amount
        # BUG FIX: split totals so the transaction gets exactly `num_lines`
        # lines with at least one debit AND one credit line. The old code
        # generated num_lines+1 amounts and the line loop then dropped the
        # final balancing credit — a 2-line entry ended up as two debits with
        # no credit at all, so entries never actually balanced.
        num_debit_lines = max(1, num_lines // 2)
        num_credit_lines = num_lines - num_debit_lines  # >= 1 since num_lines >= 2
        debit_amounts = [round(debits * random.uniform(0.1, 0.5), 2) for _ in range(num_debit_lines - 1)]
        debit_amounts.append(round(debits - sum(debit_amounts), 2))
        credit_amounts = [round(credits * random.uniform(0.1, 0.5), 2) for _ in range(num_credit_lines - 1)]
        credit_amounts.append(round(credits - sum(credit_amounts), 2))
        date = dates[i]
        after_hours = False
        is_weekend = datetime.datetime.strptime(date, '%Y-%m-%d').weekday() >= 5
        # Timestamp hour: suspicious entries skew late-night/early-morning.
        if is_suspicious and include_patterns and random.random() < 0.6:
            if random.random() < 0.5:
                entry_time = random.randint(19, 23)  # late night
            else:
                entry_time = random.randint(0, 6)  # early morning
            after_hours = True
        elif is_weekend:
            entry_time = random.randint(8, 19)
            after_hours = entry_time < 9 or entry_time > 17
        else:
            business_hours_weight = 0.9  # 90% during business hours
            if random.random() < business_hours_weight:
                entry_time = random.randint(9, 17)
            else:
                entry_time = random.randint(7, 8) if random.random() < 0.5 else random.randint(18, 20)
                after_hours = True
        timestamp = f"{date} {entry_time:02d}:{random.randint(0, 59):02d}:{random.randint(0, 59):02d}"
        # Suspicious pattern: the same account reused across many entries.
        if is_suspicious and include_patterns and random.random() < 0.4:
            suspicious_account = recurring_suspicious_account
        else:
            suspicious_account = None
        if missing_approval:
            approver = None
        elif manual_override and random.random() < 0.7:
            approver = random.choice(approvers[:2])  # higher-level approvers
        else:
            approver = random.choice(approvers)
        # Description: suspicious entries lean toward vague "adjustment" wording.
        if is_suspicious and include_patterns and random.random() < 0.4:
            description_base = random.choice([
                "Adjustment entry",
                "Correction",
                "Reclassification",
                "Period-end adjustment",
                "Balance correction"
            ])
        else:
            description_base = random.choice([
                "Payment processing",
                "Invoice payment",
                "Expense recording",
                "Revenue recognition",
                "Asset acquisition",
                "Standard journal entry",
                "Monthly accrual",
                "Prepaid amortization"
            ])
        description = f"{description_base} - {random.choice(departments)}"
        if manual_override and random.random() < 0.6:
            description += " (manual override)"
        # Suspicious pattern: splitting amounts to stay under approval thresholds.
        threshold_splitting = is_suspicious and include_patterns and random.random() < 0.2
        for j in range(num_lines):
            if j < len(debit_amounts):
                amount = debit_amounts[j]
                entry_type = "Debit"
            else:
                amount = credit_amounts[j - len(debit_amounts)]
                entry_type = "Credit"
            # Account selection: prefer the recurring suspicious account, or
            # expense accounts when threshold-splitting debits.
            if suspicious_account and random.random() < 0.7:
                account = suspicious_account
            elif threshold_splitting and entry_type == "Debit":
                filtered_accounts = [acc for acc in accounts if acc["type"] == "Expense"]
                account = random.choice(filtered_accounts)
            else:
                account = random.choice(accounts)
            if manual_override:
                notes_options = [
                    "Adjustment required due to reconciliation",
                    f"Manual correction approved by {random.choice(approvers[:2]) if approver else 'pending'}",
                    "System error correction",
                    "Special handling required",
                    ""
                ]
                notes = random.choice(notes_options)
            else:
                notes = ""
            # Document references for realism (70% of lines).
            doc_ref = ""
            if random.random() < 0.7:
                ref_type = random.choice(["INV", "PO", "CONT", "REQ", "BILL"])
                ref_num = random.randint(10000, 99999)
                doc_ref = f"{ref_type}-{ref_num}"
            # Control violation: creator approving their own entry.
            same_user_approval = bool(approver and approver == created_by and is_suspicious)
            entry_line = {
                "transaction_id": transaction_ids[i],
                "line_id": j + 1,
                "timestamp": timestamp,
                "account_code": account["code"],
                "account_name": account["name"],
                "account_type": account["type"],
                "department": random.choice(departments),
                "amount": amount,
                "type": entry_type,
                "description": description,
                "created_by": created_by,
                "approved_by": approver,
                "manual_override": manual_override,
                "notes": notes,
                "document_reference": doc_ref,
                "after_hours": after_hours,
                "weekend": is_weekend,
                "same_user_approval": same_user_approval
            }
            entries.append(entry_line)
    return pd.DataFrame(entries)
# Dependency injection pattern for better testability and flexibility
class JournalAnalyzer:
    """Audits a single journal entry via an OpenAI chat model.

    Uses dependency injection: the OpenAI client is supplied at construction
    (or later via :meth:`set_client`) so the class can be exercised with a
    stub client in tests.
    """

    def __init__(self, openai_client=None, model="gpt-4o", temperature=0.2):
        self.client = openai_client
        self.model = model
        self.temperature = temperature

    def set_client(self, client):
        """Inject (or replace) the OpenAI client."""
        self.client = client

    def analyze_journal_entry(self, entry_data, metadata=None):
        """Analyze one journal entry and return the model's risk assessment.

        Args:
            entry_data: dict with 'transaction_id', 'created_by',
                'approved_by', 'manual_override' and 'lines' (list of line
                dicts); a top-level 'timestamp' is optional.
            metadata: optional dict with 'debit_total', 'credit_total',
                'balance'.

        Returns:
            dict parsed from the model's JSON response, augmented with the
            transaction id and a 'metadata' section. On API/parse failure a
            medium-risk placeholder describing the error is returned instead.

        Raises:
            ValueError: if no client has been injected.
        """
        if not self.client:
            raise ValueError("OpenAI client not initialized. Call set_client() first.")
        if metadata is None:
            metadata = {}
        # BUG FIX: callers (ParallelAnalyzer.analyze_batch and
        # analyze_entry_detailed) build entry_data WITHOUT a top-level
        # 'timestamp' key, so the old entry_data['timestamp'] raised KeyError
        # before the try block below. Fall back to the first line's timestamp.
        timestamp = entry_data.get('timestamp')
        if not timestamp and entry_data.get('lines'):
            timestamp = entry_data['lines'][0].get('timestamp', '')
        prompt = f"""
Analyze the following journal entry for potential issues, anomalies, or fraud indicators:
Transaction ID: {entry_data['transaction_id']}
Timestamp: {timestamp}
Created by: {entry_data['created_by']}
Approved by: {entry_data['approved_by'] if entry_data['approved_by'] else 'NOT APPROVED'}
Manual Override: {'Yes' if entry_data['manual_override'] else 'No'}
Journal Entry Lines:
"""
        # One section per line item.
        for i, line in enumerate(entry_data['lines']):
            prompt += f"""
Line {i+1}:
- Account: {line['account_code']} - {line['account_name']} ({line['account_type']})
- Department: {line['department']}
- Amount: ${line['amount']:.2f} ({line['type']})
- Description: {line['description']}
- Document Reference: {line.get('document_reference', 'N/A')}
"""
        # Collect non-empty free-text notes from the lines.
        notes = [line.get('notes', '') for line in entry_data['lines'] if line.get('notes')]
        if notes:
            prompt += "\nNotes:\n" + "\n".join([f"- {note}" for note in notes if note])
        # Derived flags and totals for the model's context.
        prompt += f"""
Additional Metadata:
- Entry Created: {'After hours' if any(line.get('after_hours', False) for line in entry_data['lines']) else 'During business hours'}
- Entry Day: {'Weekend' if any(line.get('weekend', False) for line in entry_data['lines']) else 'Weekday'}
- Debit Total: ${metadata.get('debit_total', 0):.2f}
- Credit Total: ${metadata.get('credit_total', 0):.2f}
- Balance: ${metadata.get('balance', 0):.2f}
- Same user approval: {'Yes' if any(line.get('same_user_approval', False) for line in entry_data['lines']) else 'No'}
"""
        # NOTE(review): this block is NOT an f-string, so the doubled braces
        # render literally in the prompt — likely leftover f-string escaping.
        # Left unchanged to preserve the exact prompt the model receives.
        prompt += """
Please identify any of the following issues:
1. Missing approvals (entries that should have approvals based on amount or account type)
2. Manual overrides that seem suspicious or lack proper documentation
3. Unusual timing (after hours, weekends, month/quarter end)
4. Unusual amounts or patterns (round amounts, threshold-avoiding amounts)
5. Unbalanced entries or suspicious balancing
6. Same person creating and approving entries
7. Suspicious account usage patterns
8. Potential fraud indicators based on accounting best practices
For each identified issue, provide:
- Issue type
- Detailed description of why it's problematic
- Risk level (Low, Medium, High)
- Risk score from 1-10 where 10 is highest risk
- Recommendation for investigation or mitigation
Provide your response in the following JSON format:
{{"issues": [
{{"type": "issue_type", "description": "detailed description", "risk_level": "Low/Medium/High", "risk_score": score, "recommendation": "recommended action"}}
],
"overall_risk_score": overall_score,
"overall_risk_level": "Low/Medium/High",
"primary_concerns": "brief summary of main issues",
"explanation": "brief explanation of the overall risk assessment"}}
"""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert forensic accountant and auditor specializing in detecting journal entry fraud and anomalies. You have deep knowledge of accounting controls, patterns of fraud, and financial statement manipulation."},
                    {"role": "user", "content": prompt}
                ],
                temperature=self.temperature,
                response_format={"type": "json_object"}
            )
            # The model is constrained to JSON; parse and tag with identifiers.
            analysis_result = json.loads(response.choices[0].message.content)
            analysis_result["transaction_id"] = entry_data["transaction_id"]
            analysis_result["metadata"] = {
                "debit_total": metadata.get('debit_total', 0),
                "credit_total": metadata.get('credit_total', 0),
                "balance": metadata.get('balance', 0),
                "line_count": len(entry_data['lines']),
                # Guard against an empty lines list (old code indexed blindly).
                "timestamp": entry_data['lines'][0].get('timestamp', '') if entry_data['lines'] else '',
                "created_by": entry_data['created_by'],
                "approved_by": entry_data['approved_by']
            }
            return analysis_result
        except Exception as e:
            # Degrade gracefully: return a medium-risk placeholder so batch
            # processing can continue and flag the entry for manual review.
            return {
                "transaction_id": entry_data["transaction_id"],
                "issues": [{"type": "error", "description": f"Failed to analyze entry: {str(e)}", "risk_level": "Medium", "risk_score": 5, "recommendation": "Review manually due to analysis error"}],
                "overall_risk_score": 5,
                "overall_risk_level": "Medium",
                "primary_concerns": "Analysis error",
                "explanation": f"An error occurred during analysis: {str(e)}",
                "metadata": {
                    "debit_total": metadata.get('debit_total', 0),
                    "credit_total": metadata.get('credit_total', 0),
                    "balance": metadata.get('balance', 0),
                    "error": str(e)
                }
            }
# Parallel processing for improved performance
class ParallelAnalyzer:
    """Fans journal-entry analysis out over a small pool of worker threads."""

    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.results = []
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
        self.stop_event = threading.Event()

    def worker(self, analyzer):
        """Worker loop: consume (txn_id, entry, metadata) tasks until a None sentinel."""
        while not self.stop_event.is_set():
            try:
                task = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            if task is None:
                # BUG FIX: the shutdown sentinel must also be marked done;
                # otherwise the queue's unfinished-task counter never returns
                # to zero and a later task_queue.join() would hang forever.
                self.task_queue.task_done()
                break
            transaction_id, entry_data, metadata = task
            try:
                self.result_queue.put(analyzer.analyze_journal_entry(entry_data, metadata))
            except Exception as e:
                # analyze_journal_entry normally handles its own errors; this
                # guards against anything that escapes (e.g. malformed entry).
                self.result_queue.put({"transaction_id": transaction_id, "error": str(e)})
            finally:
                # BUG FIX: exactly one task_done per task (the old code could
                # not double-count, but also left sentinels unaccounted for).
                self.task_queue.task_done()

    def analyze_batch(self, data, analyzer, progress_callback=None):
        """Analyze every transaction in `data` (line-level DataFrame).

        Args:
            data: DataFrame of journal-entry lines (one transaction spans
                multiple rows sharing a transaction_id).
            analyzer: object exposing analyze_journal_entry(entry, metadata).
            progress_callback: optional callable receiving a float in (0, 1].

        Returns:
            list of per-transaction result dicts (order not guaranteed).
        """
        # BUG FIX: start each batch from fresh queues. Reusing the old queues
        # left stale sentinels / unfinished-task counts behind, which made any
        # second call to analyze_batch() hang in task_queue.join().
        self.stop_event = threading.Event()
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.results = []
        self.workers = []
        # Group line rows into whole journal entries keyed by transaction_id.
        transactions = {}
        for _, row in data.iterrows():
            transaction_id = row["transaction_id"]
            if transaction_id not in transactions:
                transactions[transaction_id] = {
                    "lines": [],
                    "created_by": row["created_by"],
                    "approved_by": row["approved_by"],
                    "manual_override": row["manual_override"],
                    "transaction_id": transaction_id
                }
            transactions[transaction_id]["lines"].append(row.to_dict())
        # Pre-compute debit/credit totals and the (rounded) balance.
        for transaction_id, entry in transactions.items():
            lines = entry["lines"]
            debits = sum(line["amount"] for line in lines if line["type"] == "Debit")
            credits = sum(line["amount"] for line in lines if line["type"] == "Credit")
            entry["metadata"] = {
                "debit_total": debits,
                "credit_total": credits,
                "balance": round(debits - credits, 2)
            }
        # Spin up at most max_workers threads (never more than tasks).
        for _ in range(min(self.max_workers, len(transactions))):
            worker_thread = threading.Thread(target=self.worker, args=(analyzer,))
            worker_thread.daemon = True
            worker_thread.start()
            self.workers.append(worker_thread)
        # Submit one task per transaction, reporting coarse progress.
        total = len(transactions)
        for i, (transaction_id, entry) in enumerate(transactions.items()):
            metadata = entry.pop("metadata", {})
            self.task_queue.put((transaction_id, entry, metadata))
            if progress_callback is not None:
                progress_callback((i + 1) / total)
        # Block until every submitted task has been processed.
        self.task_queue.join()
        # Shut workers down: raise the stop flag and feed one sentinel each.
        self.stop_event.set()
        for _ in range(len(self.workers)):
            self.task_queue.put(None)
        for worker in self.workers:
            worker.join(timeout=1)
        self.workers = []
        # Drain all results produced by the workers.
        while not self.result_queue.empty():
            self.results.append(self.result_queue.get())
        return self.results
# Enhanced summary report builder with data visualization
def build_enhanced_summary_report(analysis_results):
    """Build a markdown audit summary plus two plotly figures from batch results.

    Args:
        analysis_results: list of per-transaction analysis dicts as produced
            by JournalAnalyzer.analyze_journal_entry.

    Returns:
        (report_markdown, risk_distribution_figure, issue_type_figure);
        the figures may be None when there is nothing to plot.
    """
    # Bail out early on an empty batch or a single error placeholder.
    if not analysis_results or (len(analysis_results) == 1 and "error" in analysis_results[0]):
        return "Analysis failed. Please check the logs for more information.", None, None
    total_entries = len(analysis_results)
    total_issues = sum(len(result.get("issues", [])) for result in analysis_results)
    # Bucket entries into risk bands by overall_risk_score.
    risk_categories = {
        "Critical (9-10)": 0,
        "High (7-8)": 0,
        "Medium (4-6)": 0,
        "Low (1-3)": 0,
        "None (0)": 0
    }
    for result in analysis_results:
        score = result.get("overall_risk_score", 0)
        if score >= 9:
            risk_categories["Critical (9-10)"] += 1
        elif score >= 7:
            risk_categories["High (7-8)"] += 1
        elif score >= 4:
            risk_categories["Medium (4-6)"] += 1
        elif score >= 1:
            risk_categories["Low (1-3)"] += 1
        else:
            risk_categories["None (0)"] += 1
    # Tally issue occurrences and collect per-type risk scores.
    issue_categories = defaultdict(int)
    issue_severity = defaultdict(list)
    for result in analysis_results:
        for issue in result.get("issues", []):
            issue_type = issue.get("type", "other")
            issue_categories[issue_type] += 1
            issue_severity[issue_type].append(issue.get("risk_score", 0))
    # Mean risk score per issue type (0 when no scores collected).
    avg_severity = {
        issue_type: sum(scores)/len(scores) if scores else 0
        for issue_type, scores in issue_severity.items()
    }
    # Count entries per time-of-day band, weekday, and month-end proximity.
    time_patterns = defaultdict(int)
    for result in analysis_results:
        if "metadata" in result and "timestamp" in result["metadata"]:
            try:
                timestamp = result["metadata"]["timestamp"]
                dt = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
                hour = dt.hour
                if 0 <= hour < 6:
                    time_patterns["Early Morning (12am-6am)"] += 1
                elif 6 <= hour < 9:
                    time_patterns["Morning (6am-9am)"] += 1
                elif 9 <= hour < 12:
                    time_patterns["Mid-Morning (9am-12pm)"] += 1
                elif 12 <= hour < 15:
                    time_patterns["Afternoon (12pm-3pm)"] += 1
                elif 15 <= hour < 18:
                    time_patterns["Late Afternoon (3pm-6pm)"] += 1
                elif 18 <= hour < 21:
                    time_patterns["Evening (6pm-9pm)"] += 1
                else:
                    time_patterns["Night (9pm-12am)"] += 1
                day_of_week = dt.strftime('%A')
                time_patterns[day_of_week] += 1
                # Last day of dt's month via the "jump past month end" trick.
                last_day_of_month = (dt.replace(day=1) + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
                if dt.day == last_day_of_month.day or dt.day == last_day_of_month.day - 1:
                    time_patterns["Month End"] += 1
            except Exception:
                # Malformed/missing timestamps are simply skipped.
                pass
    # Per-user average risk; flag users with repeated high-risk entries.
    user_risk = defaultdict(list)
    for result in analysis_results:
        if "metadata" in result and "created_by" in result["metadata"]:
            user = result["metadata"]["created_by"]
            score = result.get("overall_risk_score", 0)
            user_risk[user].append(score)
    avg_user_risk = {
        user: sum(scores)/len(scores)
        for user, scores in user_risk.items()
    }
    # "High risk" = average score >= 5 across at least two entries.
    high_risk_users = {
        user: avg_score
        for user, avg_score in avg_user_risk.items()
        if avg_score >= 5 and len(user_risk[user]) >= 2
    }
    # Assemble the markdown report section by section.
    report = f"""
# Journal Entry Audit Summary
## Overview
- **Total Entries Analyzed:** {total_entries}
- **Total Issues Detected:** {total_issues}
- **Average Issues Per Entry:** {total_issues/total_entries:.2f}
## Risk Distribution
"""
    for category, count in risk_categories.items():
        if count > 0:
            report += f"- **{category}:** {count} entries ({count/total_entries*100:.1f}%)\n"
    report += "\n## Issue Categories\n"
    for issue_type, count in sorted(issue_categories.items(), key=lambda x: x[1], reverse=True):
        avg_risk = avg_severity.get(issue_type, 0)
        report += f"- **{issue_type.replace('_', ' ').title()}:** {count} instances (Avg. Risk: {avg_risk:.1f})\n"
    report += "\n## Timing Patterns\n"
    # Only surface the interesting aggregates (after hours / weekend / month end).
    after_hours = sum([
        time_patterns.get("Early Morning (12am-6am)", 0),
        time_patterns.get("Evening (6pm-9pm)", 0),
        time_patterns.get("Night (9pm-12am)", 0)
    ])
    weekend = sum([
        time_patterns.get("Saturday", 0),
        time_patterns.get("Sunday", 0)
    ])
    report += f"- **After Hours Entries:** {after_hours} entries ({after_hours/total_entries*100:.1f}%)\n"
    report += f"- **Weekend Entries:** {weekend} entries ({weekend/total_entries*100:.1f}%)\n"
    report += f"- **Month End Entries:** {time_patterns.get('Month End', 0)} entries ({time_patterns.get('Month End', 0)/total_entries*100:.1f}%)\n"
    report += "\n## User Risk Patterns\n"
    if high_risk_users:
        for user, risk_score in sorted(high_risk_users.items(), key=lambda x: x[1], reverse=True):
            num_entries = len(user_risk[user])
            report += f"- **{user}:** Average Risk Score {risk_score:.1f} ({num_entries} entries)\n"
    else:
        report += "- No users with consistently high-risk patterns detected\n"
    # Top five entries by overall risk score, with up to three issues each.
    report += "\n## Highest Risk Entries\n"
    high_risk_entries = sorted(analysis_results, key=lambda x: x.get("overall_risk_score", 0), reverse=True)[:5]
    for i, entry in enumerate(high_risk_entries):
        txn_id = entry.get("transaction_id", "Unknown")
        score = entry.get("overall_risk_score", 0)
        concerns = entry.get("primary_concerns", "No details available")
        report += f"### {i+1}. Transaction {txn_id} (Risk Score: {score})\n"
        report += f"- **Primary Concerns:** {concerns}\n"
        report += f"- **Key Issues:**\n"
        for issue in entry.get("issues", [])[:3]:  # Show top 3 issues per entry
            report += f"  - {issue.get('type', 'Unknown issue')} ({issue.get('risk_level', 'Unknown')}): {issue.get('description', 'No details')}\n"
    # Bar chart of the risk-band distribution (color = severity of band).
    risk_dist_fig = go.Figure(
        data=[
            go.Bar(
                x=list(risk_categories.keys()),
                y=list(risk_categories.values()),
                marker_color=['darkred', 'red', 'orange', 'yellow', 'green']
            )
        ]
    )
    risk_dist_fig.update_layout(
        title="Risk Distribution of Journal Entries",
        xaxis_title="Risk Category",
        yaxis_title="Number of Entries",
        template="plotly_white"
    )
    # Bar chart of issue types, colored by their average severity.
    if issue_categories:
        issue_types = list(issue_categories.keys())
        issue_counts = list(issue_categories.values())
        issue_severity_vals = [avg_severity.get(t, 0) for t in issue_types]
        issue_fig = go.Figure(
            data=[
                go.Bar(
                    x=issue_types,
                    y=issue_counts,
                    marker=dict(
                        color=issue_severity_vals,
                        colorscale='Reds',
                        colorbar=dict(title="Avg. Severity")
                    )
                )
            ]
        )
        issue_fig.update_layout(
            title="Issue Types by Frequency and Severity",
            xaxis_title="Issue Type",
            yaxis_title="Count",
            template="plotly_white"
        )
    else:
        issue_fig = None
    return report, risk_dist_fig, issue_fig
# Analyze a single journal entry and provide detailed explanation
def analyze_entry_detailed(entry_data, analyzer):
    """Analyze one transaction's lines and return (display_dict, raw_analysis).

    Args:
        entry_data: line-level DataFrame filtered to a single transaction.
        analyzer: object exposing analyze_journal_entry(entry, metadata).

    Returns:
        tuple of a UI-friendly summary dict and the raw analysis dict.
    """
    lines = entry_data.to_dict('records')
    txn_id = entry_data["transaction_id"].iloc[0]
    creator = entry_data["created_by"].iloc[0]
    approver = entry_data["approved_by"].iloc[0]
    override_flag = entry_data["manual_override"].iloc[0]
    # Debit/credit totals and their rounded difference.
    debit_total = sum(row["amount"] for row in lines if row["type"] == "Debit")
    credit_total = sum(row["amount"] for row in lines if row["type"] == "Credit")
    net_balance = round(debit_total - credit_total, 2)
    # Shape the entry the way the analyzer expects it.
    entry = {
        "transaction_id": txn_id,
        "created_by": creator,
        "approved_by": approver,
        "manual_override": override_flag,
        "lines": lines
    }
    metadata = {
        "debit_total": debit_total,
        "credit_total": credit_total,
        "balance": net_balance
    }
    analysis = analyzer.analyze_journal_entry(entry, metadata)
    # Human-readable summary for the UI pane.
    display_data = {
        "transaction_id": txn_id,
        "timestamp": entry_data["timestamp"].iloc[0],
        "created_by": creator,
        "approved_by": approver if not pd.isna(approver) else "Not Approved",
        "manual_override": "Yes" if override_flag else "No",
        "line_count": len(lines),
        "debit_total": f"${debit_total:.2f}",
        "credit_total": f"${credit_total:.2f}",
        "balance": f"${net_balance:.2f}",
        "risk_level": analysis.get("overall_risk_level", "Unknown"),
        "risk_score": analysis.get("overall_risk_score", 0),
        "issues": analysis.get("issues", []),
        "explanation": analysis.get("explanation", "No explanation provided"),
        "primary_concerns": analysis.get("primary_concerns", "No concerns identified")
    }
    return display_data, analysis
# Create a more detailed visualization for a single entry
def create_entry_visualization(entry_data):
    """Create bar and Sankey figures for a single journal entry.

    Args:
        entry_data: DataFrame of the entry's lines (one transaction).

    Returns:
        (bar_figure, sankey_figure) plotly figures.
    """
    df = entry_data.copy()
    # Label each line by account name and side for the x-axis.
    # NOTE: the previous version also computed an 'amount_signed' column and a
    # running-balance list that were never used by either figure; that dead
    # code has been removed.
    df['line_label'] = df.apply(lambda x: f"{x['account_name']} ({x['type']})", axis=1)
    # Grouped bar chart: debits vs credits per line.
    fig = go.Figure()
    debits = df[df['type'] == 'Debit']
    credits = df[df['type'] == 'Credit']
    fig.add_trace(go.Bar(
        x=debits['line_label'],
        y=debits['amount'],
        name='Debits',
        marker_color='rgba(219, 64, 82, 0.7)'
    ))
    fig.add_trace(go.Bar(
        x=credits['line_label'],
        y=credits['amount'],
        name='Credits',
        marker_color='rgba(50, 171, 96, 0.7)'
    ))
    fig.update_layout(
        title=f"Journal Entry {df['transaction_id'].iloc[0]} - Debits and Credits",
        xaxis_title="Account",
        yaxis_title="Amount ($)",
        barmode='group',
        template="plotly_white"
    )
    # Sankey: every line flows into a single synthetic "Balance" node.
    flow_fig = go.Figure(data=[go.Sankey(
        node=dict(
            pad=15,
            thickness=20,
            line=dict(color="black", width=0.5),
            label=list(df['account_name']) + ["Balance"],
            color=["rgba(219, 64, 82, 0.8)" if t == "Debit" else "rgba(50, 171, 96, 0.8)"
                   for t in df['type']] + ["rgba(131, 90, 241, 0.8)"]
        ),
        link=dict(
            source=[i for i in range(len(df))],
            target=[len(df) for _ in range(len(df))],
            value=df['amount'],
            color=["rgba(219, 64, 82, 0.5)" if t == "Debit" else "rgba(50, 171, 96, 0.5)"
                   for t in df['type']]
        )
    )])
    flow_fig.update_layout(
        title_text=f"Transaction {df['transaction_id'].iloc[0]} - Flow of Funds",
        font_size=12
    )
    return fig, flow_fig
# Gradio interface with improved layout
def create_interface():
    """Build and return the Gradio Blocks application.

    Falls back to a client-less "demo mode" analyzer when the OpenAI API key
    is unavailable (analysis calls will then raise until a client is set).
    """
    try:
        client = get_openai_client()
        analyzer = JournalAnalyzer(client)
        parallel_analyzer = ParallelAnalyzer(max_workers=4)
    except Exception as e:
        print(f"Error initializing OpenAI client: {e}")
        # Fallback to demo mode: no client, single worker.
        analyzer = JournalAnalyzer()
        parallel_analyzer = ParallelAnalyzer(max_workers=1)
    with gr.Blocks(title="Enhanced Journal Entry Auditor", theme=gr.themes.Default()) as app:
        gr.Markdown("# Enhanced Journal Entry Auditor")
        gr.Markdown("### Analyze journal entries for potential fraud, anomalies, and control weaknesses")
        # BUG FIX: a single shared State holds the loaded DataFrame. The old
        # code instantiated a NEW gr.State() in every event's inputs/outputs,
        # so each was a distinct component and the data written by
        # run_analysis was never visible to the entry-explorer callbacks.
        data_state = gr.State()
        with gr.Tab("Data Upload & Batch Analysis"):
            with gr.Row():
                with gr.Column(scale=2):
                    file_input = gr.File(label="Upload CSV (Optional)")
                    num_entries = gr.Slider(10, 500, 100, step=10, label="Number of sample entries (if not uploading a file)")
                    include_patterns = gr.Checkbox(True, label="Include suspicious patterns in generated data")
                    run_batch = gr.Button("Run Batch Analysis", variant="primary")
                with gr.Column(scale=1):
                    with gr.Accordion("Analysis Settings", open=True):
                        model_choice = gr.Dropdown(
                            choices=["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"],
                            value="gpt-4o",
                            label="Model"
                        )
                        temperature = gr.Slider(0, 1, 0.2, step=0.1, label="Temperature")
                        max_workers = gr.Slider(1, 8, 4, step=1, label="Parallel Workers")
                    # NOTE(review): gr.Progress() is not a layout component;
                    # kept for parity with the original, but progress is
                    # actually reported via the handlers' progress arguments.
                    progress_bar = gr.Progress()
            with gr.Row():
                analysis_table = gr.DataFrame(label="Analysis Results")
            with gr.Row():
                with gr.Column(scale=1):
                    summary_markdown = gr.Markdown(label="Summary Report")
                with gr.Column(scale=1):
                    with gr.Tab("Risk Distribution"):
                        risk_plot = gr.Plot(label="Risk Distribution")
                    with gr.Tab("Issue Types"):
                        issue_plot = gr.Plot(label="Issue Types")
        with gr.Tab("Entry Explorer"):
            with gr.Row():
                entry_selector = gr.Dropdown([], label="Select Transaction")
                refresh_entries = gr.Button("Refresh Entries")
            with gr.Row():
                with gr.Column(scale=1):
                    entry_details = gr.JSON(label="Entry Details")
                with gr.Column(scale=1):
                    with gr.Tab("Entry Visualization"):
                        entry_plot = gr.Plot(label="Entry Visualization")
                    with gr.Tab("Flow of Funds"):
                        flow_plot = gr.Plot(label="Flow of Funds")
            with gr.Row():
                entry_analysis = gr.JSON(label="Analysis Results")

        def load_data(file, num_entries, include_patterns, progress=gr.Progress()):
            """Load entries from an uploaded CSV, or generate sample data."""
            if file is not None:
                try:
                    df = pd.read_csv(file.name)
                    return df, f"Loaded {len(df)} entries from file"
                except Exception as e:
                    return None, f"Error loading file: {str(e)}"
            progress(0, desc="Generating sample data...")
            df = generate_sample_data(num_entries, include_patterns)
            return df, f"Generated {len(df)} sample entries"

        def run_analysis(file, num_entries, include_patterns, model, temp, workers, progress=gr.Progress()):
            """Load data, run the parallel batch analysis, and build the report."""
            progress(0, desc="Loading data...")
            df, msg = load_data(file, num_entries, include_patterns, progress)
            if df is None:
                return None, [], "Error: Failed to load data", None, None
            # Apply the UI-selected analyzer settings.
            analyzer.model = model
            analyzer.temperature = temp
            parallel_analyzer.max_workers = workers
            progress(0.1, desc="Analyzing journal entries...")
            results = parallel_analyzer.analyze_batch(
                df, analyzer,
                lambda p: progress(0.1 + p * 0.8, desc="Analyzing journal entries..."))
            progress(0.9, desc="Generating report...")
            report, risk_fig, issue_fig = build_enhanced_summary_report(results)
            # Flatten results into a table for display.
            result_df = []
            for r in results:
                result_df.append({
                    "Transaction": r.get("transaction_id", "Unknown"),
                    "Risk Level": r.get("overall_risk_level", "Unknown"),
                    "Risk Score": r.get("overall_risk_score", 0),
                    "Issues": len(r.get("issues", [])),
                    "Primary Concerns": r.get("primary_concerns", "None")
                })
            return df, pd.DataFrame(result_df), report, risk_fig, issue_fig

        def update_entry_selector(df):
            """Refresh the transaction dropdown from the stored DataFrame."""
            if df is None:
                return gr.update(choices=[]), {}
            transactions = df["transaction_id"].unique().tolist()
            # BUG FIX: use gr.update so the dropdown's CHOICES are replaced;
            # returning a bare list would only try to set its value.
            return gr.update(choices=transactions), {}

        def load_entry(transaction_id, df):
            """Show details and visualizations for the selected transaction."""
            if df is None or transaction_id is None:
                return {}, None, None
            entry_data = df[df["transaction_id"] == transaction_id]
            if len(entry_data) == 0:
                return {}, None, None
            entry_vis, flow_vis = create_entry_visualization(entry_data)
            details = {
                "transaction_id": transaction_id,
                "timestamp": entry_data["timestamp"].iloc[0],
                "created_by": entry_data["created_by"].iloc[0],
                "approved_by": entry_data["approved_by"].iloc[0] if not pd.isna(entry_data["approved_by"].iloc[0]) else "Not Approved",
                "manual_override": "Yes" if entry_data["manual_override"].iloc[0] else "No",
                "lines": []
            }
            for _, row in entry_data.iterrows():
                details["lines"].append({
                    "line_id": row["line_id"],
                    "account": f"{row['account_code']} - {row['account_name']}",
                    "department": row["department"],
                    "amount": f"${row['amount']:.2f}",
                    "type": row["type"],
                    "description": row["description"],
                    "document_reference": row.get("document_reference", "")
                })
            return details, entry_vis, flow_vis

        def analyze_selected_entry(transaction_id, df, model, temp):
            """Run the LLM analysis for just the selected transaction."""
            if df is None or transaction_id is None:
                return {}
            entry_data = df[df["transaction_id"] == transaction_id]
            if len(entry_data) == 0:
                return {}
            analyzer.model = model
            analyzer.temperature = temp
            _, analysis = analyze_entry_detailed(entry_data, analyzer)
            return analysis

        # Event wiring — all handlers share the single data_state component.
        run_batch.click(
            run_analysis,
            inputs=[file_input, num_entries, include_patterns, model_choice, temperature, max_workers],
            outputs=[data_state, analysis_table, summary_markdown, risk_plot, issue_plot]
        ).then(
            update_entry_selector,
            inputs=[data_state],
            outputs=[entry_selector, entry_details]
        )
        refresh_entries.click(
            update_entry_selector,
            inputs=[data_state],
            outputs=[entry_selector, entry_details]
        )
        entry_selector.change(
            load_entry,
            inputs=[entry_selector, data_state],
            outputs=[entry_details, entry_plot, flow_plot]
        ).then(
            analyze_selected_entry,
            inputs=[entry_selector, data_state, model_choice, temperature],
            outputs=[entry_analysis]
        )
    return app
# Main function
# Entry point: build the Gradio UI and serve it when run as a script.
if __name__ == "__main__":
    app = create_interface()
    app.launch()