import os
import json
import pandas as pd
import numpy as np
import gradio as gr
from datetime import datetime
import time

def get_openai_client():
    """Initialize the OpenAI client safely, working around the httpx 'proxies' incompatibility."""
    try:
        from openai import OpenAI
        import httpx

        # Get the API key
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            return None, "OpenAI API key not found in environment variables"

        # Older openai client versions pass a 'proxies' keyword that newer httpx
        # releases no longer accept. Temporarily patch httpx.Client.__init__ to
        # drop that argument while the OpenAI client is constructed.
        original_client_init = httpx.Client.__init__

        def patched_init(self, *args, **kwargs):
            kwargs.pop('proxies', None)
            return original_client_init(self, *args, **kwargs)

        httpx.Client.__init__ = patched_init
        try:
            client = OpenAI(api_key=api_key)
        finally:
            # Always restore the original init method
            httpx.Client.__init__ = original_client_init

        return client, None
    except Exception as e:
        return None, f"Error initializing OpenAI client: {str(e)}"


# Initialize the OpenAI client
client, client_error = get_openai_client()

def analyze_journal_entry(entry_data):
    """Analyze a single journal entry using OpenAI."""
    if client is None:
        return {
            "error": client_error or "OpenAI client not initialized",
            "risk_score": 0,
            "issues_detected": [],
            "explanation": "Error analyzing entry",
            "recommendations": []
        }

    # Convert entry data to a formatted string for analysis
    entry_str = json.dumps(entry_data, indent=2)

    prompt = f"""
As an accounting auditor, analyze this journal entry for potential issues:

{entry_str}

Look specifically for:
1. Manual overrides of automated controls
2. Missing approvals or authorizations
3. Unusual timing or amounts that may indicate fraud
4. Mismatched debit and credit totals
5. Transactions with unusual accounts or descriptions
6. Entries made outside normal business hours

Format your response as JSON with these fields:
- risk_score (0-100)
- issues_detected (array of strings)
- explanation (detailed explanation of findings)
- recommendations (array of strings)
"""

    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            response_format={"type": "json_object"}
        )
        analysis = json.loads(response.choices[0].message.content)
        return analysis
    except Exception as e:
        return {
            "error": str(e),
            "risk_score": 0,
            "issues_detected": [],
            "explanation": "Error analyzing entry",
            "recommendations": []
        }

def validate_journal_entries(file):
    """Validate the format of an uploaded journal entries file."""
    try:
        if file is None:
            return None, "No file uploaded. Please upload a CSV or Excel file."
        if file.name.endswith('.csv'):
            df = pd.read_csv(file.name)
        elif file.name.endswith(('.xls', '.xlsx')):
            df = pd.read_excel(file.name)
        else:
            return None, "Unsupported file format. Please upload a CSV or Excel file."

        # Check for required columns
        required_columns = ['entry_id', 'date', 'account', 'description', 'debit', 'credit', 'approver']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return None, f"Missing required columns: {', '.join(missing_columns)}"

        # Convert the date column to datetime
        df['date'] = pd.to_datetime(df['date'], errors='coerce')

        # Fill missing values
        df = df.fillna({'approver': 'None', 'description': 'No description'})

        # Ensure numeric columns are numeric
        df['debit'] = pd.to_numeric(df['debit'], errors='coerce').fillna(0)
        df['credit'] = pd.to_numeric(df['credit'], errors='coerce').fillna(0)

        return df, "File validated successfully"
    except Exception as e:
        return None, f"Error validating file: {str(e)}"

def analyze_file(file, max_entries=None):
    """Analyze journal entries from an uploaded file."""
    # Check that the OpenAI client is initialized
    if client is None:
        return f"Error: {client_error or 'OpenAI client not initialized. Please check your API key.'}", None, None

    # Validate and load the file
    df, validation_message = validate_journal_entries(file)
    if df is None:
        return validation_message, None, None

    # Limit the number of entries if requested
    if max_entries and max_entries > 0:
        df = df.head(int(max_entries))

    # Prepare results
    results = []
    high_risk_entries = []
    summary_stats = {
        "total_entries": len(df),
        "high_risk_count": 0,
        "medium_risk_count": 0,
        "low_risk_count": 0,
        "issues_by_type": {},
        "processing_time": 0
    }

    start_time = time.time()

    # Process each journal entry
    for _, row in df.iterrows():
        entry_data = {
            "entry_id": str(row['entry_id']),
            "date": row['date'].strftime('%Y-%m-%d %H:%M:%S') if pd.notna(row['date']) and isinstance(row['date'], datetime) else str(row['date']),
            "account": str(row['account']),
            "description": str(row['description']),
            "debit": float(row['debit']),
            "credit": float(row['credit']),
            "approver": str(row['approver']),
            # Include any additional columns that exist
            **{col: str(row[col]) for col in df.columns
               if col not in ['entry_id', 'date', 'account', 'description', 'debit', 'credit', 'approver']}
        }

        # Analyze the entry
        analysis = analyze_journal_entry(entry_data)

        # Merge the entry data into the analysis result
        result = {**entry_data, **analysis}
        results.append(result)

        # Update summary statistics
        risk_score = result.get('risk_score', 0)
        if risk_score >= 70:
            summary_stats["high_risk_count"] += 1
            high_risk_entries.append(result)
        elif risk_score >= 30:
            summary_stats["medium_risk_count"] += 1
        else:
            summary_stats["low_risk_count"] += 1

        # Count issues by type
        for issue in result.get('issues_detected', []):
            summary_stats["issues_by_type"][issue] = summary_stats["issues_by_type"].get(issue, 0) + 1

    summary_stats["processing_time"] = round(time.time() - start_time, 2)

    # Create a formatted report
    report_markdown = generate_report(results, summary_stats)

    # Create a dataframe of high-risk entries for display
    high_risk_df = None
    if high_risk_entries:
        high_risk_df = pd.DataFrame([{
            "Entry ID": entry["entry_id"],
            "Date": entry["date"],
            "Account": entry["account"],
            "Amount": max(entry["debit"], entry["credit"]),
            "Risk Score": entry["risk_score"],
            "Issues": ", ".join(entry.get("issues_detected", []))
        } for entry in high_risk_entries])

    # Convert summary stats to a text summary
    summary_text = create_summary_text(summary_stats)

    return summary_text, high_risk_df, report_markdown

def create_summary_text(stats):
    """Convert summary statistics to readable text."""
    summary = f"Analyzed {stats['total_entries']} journal entries in {stats['processing_time']} seconds. "

    # Risk level breakdown
    summary += f"Found {stats['high_risk_count']} high-risk entries, {stats['medium_risk_count']} medium-risk entries, "
    summary += f"and {stats['low_risk_count']} low-risk entries. "

    # Issues breakdown
    if stats['issues_by_type']:
        summary += "The most common issues detected were: "
        sorted_issues = sorted(stats['issues_by_type'].items(), key=lambda x: x[1], reverse=True)
        issue_texts = []
        for issue, count in sorted_issues:
            percentage = round((count / stats['total_entries']) * 100)
            issue_texts.append(f"{issue} ({count} entries, {percentage}%)")
        if len(issue_texts) > 1:
            summary += ", ".join(issue_texts[:-1]) + f", and {issue_texts[-1]}."
        else:
            summary += f"{issue_texts[0]}."
    else:
        summary += "No specific issues were detected."

    return summary

def generate_report(results, summary_stats):
    """Generate a detailed Markdown report from analysis results."""
    # Sort entries by risk score (highest first)
    sorted_entries = sorted(results, key=lambda x: x.get('risk_score', 0), reverse=True)

    total = summary_stats['total_entries']
    report = f"""# Journal Entry Audit Report

## Summary
- Total Entries Analyzed: {total}
- High Risk Entries: {summary_stats['high_risk_count']} ({round(summary_stats['high_risk_count'] / total * 100, 1) if total > 0 else 0}%)
- Medium Risk Entries: {summary_stats['medium_risk_count']} ({round(summary_stats['medium_risk_count'] / total * 100, 1) if total > 0 else 0}%)
- Low Risk Entries: {summary_stats['low_risk_count']} ({round(summary_stats['low_risk_count'] / total * 100, 1) if total > 0 else 0}%)
- Processing Time: {summary_stats['processing_time']} seconds

## Issues By Type
"""

    # Add issues by type to the report
    sorted_issues = sorted(summary_stats["issues_by_type"].items(), key=lambda x: x[1], reverse=True)
    for issue, count in sorted_issues:
        report += f"- {issue}: {count} entries\n"

    # Add details of high-risk entries
    report += "\n## High Risk Entries (Details)\n\n"
    for entry in sorted_entries:
        if entry.get('risk_score', 0) >= 70:
            report += f"""### Entry ID: {entry['entry_id']} (Risk Score: {entry['risk_score']})
- **Date**: {entry['date']}
- **Account**: {entry['account']}
- **Description**: {entry['description']}
- **Amount**: Debit: {entry['debit']}, Credit: {entry['credit']}
- **Approver**: {entry['approver']}

**Issues Detected**:
"""
            for issue in entry.get('issues_detected', []):
                report += f"- {issue}\n"

            report += f"\n**Explanation**: {entry.get('explanation', 'No explanation provided')}\n\n"
            report += "**Recommendations**:\n"
            for rec in entry.get('recommendations', []):
                report += f"- {rec}\n"

            report += "\n---\n\n"

    return report

def interface():
    """Create the Gradio interface."""
    with gr.Blocks(title="AI-Powered Journal Entry Auditor") as app:
        gr.Markdown("# AI-Powered Journal Entry Auditor")
        gr.Markdown("Upload your journal entries file (CSV or Excel) to detect potential issues using AI analysis.")

        with gr.Row():
            with gr.Column():
                file_input = gr.File(label="Upload Journal Entries File (CSV or Excel)")
                max_entries = gr.Slider(label="Max Entries to Analyze (0 for all)", minimum=0, maximum=1000, value=100, step=10)
                analyze_button = gr.Button("Analyze Journal Entries")
            with gr.Column():
                status = gr.Textbox(label="Analysis Summary", lines=4)

        with gr.Tabs():
            with gr.TabItem("High Risk Entries"):
                high_risk_table = gr.Dataframe(label="High Risk Entries")
            with gr.TabItem("Detailed Report"):
                report = gr.Markdown(label="Detailed Report")

        analyze_button.click(
            analyze_file,
            inputs=[file_input, max_entries],
            outputs=[status, high_risk_table, report]
        )

        gr.Markdown("""
## How to Use
1. Upload a CSV or Excel file containing journal entries
2. Optionally limit the number of entries to analyze
3. Click "Analyze Journal Entries"
4. View the analysis summary, high-risk entries, and detailed report

## Required File Format
Your file must include these columns:
- entry_id: Unique identifier for each journal entry
- date: Date and time of the entry
- account: Account name or number
- description: Description of the transaction
- debit: Debit amount
- credit: Credit amount
- approver: Person who approved the entry (if any)

Additional columns will be included in the analysis.
""")

    return app

if __name__ == "__main__":
    app = interface()
    app.launch(share=True)
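
To try the app, you need an input file matching the "Required File Format" described above. The snippet below is a minimal sketch that writes such a file with pandas; the file name sample_journal_entries.csv and the entry values are made up for illustration and are not part of the app.

# Minimal sketch: build a sample input file for the auditor.
# The file name and the values below are illustrative only.
import pandas as pd

sample = pd.DataFrame([
    {"entry_id": "JE-001", "date": "2024-01-15 09:30:00", "account": "Cash",
     "description": "Customer payment", "debit": 1000.0, "credit": 0.0, "approver": "J. Smith"},
    {"entry_id": "JE-002", "date": "2024-01-15 23:45:00", "account": "Revenue",
     "description": "Manual adjustment", "debit": 0.0, "credit": 1000.0, "approver": "None"},
])
sample.to_csv("sample_journal_entries.csv", index=False)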