# Spaces: Sleeping
import json
import math
import re
from typing import Any, Dict, List, Optional

import gradio as gr
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer
)
class FinancialDataset(Dataset):
    """Map-style dataset that tokenizes one text per access and pairs it with a label.

    Args:
        texts: Iterable of input texts (each item is passed through ``str``).
        labels: Iterable of integer labels, positionally aligned with ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, truncation=True,
            padding='max_length', max_length=..., return_tensors='pt')`` that
            returns a mapping with ``input_ids`` and ``attention_mask``.
        max_length: Length every example is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        # Materialise as lists so that integer positions always work. A pandas
        # Series with a non-default index would otherwise be looked up by
        # label in __getitem__ and raise KeyError.
        self.texts = list(texts)
        self.labels = list(labels)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for item ``idx``."""
        encoded = self.tokenizer(
            str(self.texts[idx]),
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        return {
            # squeeze(0) removes only the batch axis the tokenizer call adds;
            # a bare squeeze() would also collapse a length-1 sequence axis.
            'input_ids': encoded['input_ids'].squeeze(0),
            'attention_mask': encoded['attention_mask'].squeeze(0),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }
class FinancialAnalyzer:
    """Turn Markdown financial statements into metrics, sentiment and commentary.

    The analyzer parses Markdown tables from an income statement and a balance
    sheet, derives 2025 metrics and ratios, scores a short ratio summary with
    FinBERT and asks TinyLlama for a narrative analysis. A rule-based summary
    replaces the generated text when generation fails or is too short.
    """

    LLAMA_MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
    FINBERT_MODEL_NAME = "ProsusAI/finbert"

    # Prompt budget chosen so that prompt + generated tokens stay within
    # 2048 tokens. NOTE(review): assumes the model's context window is 2048
    # tokens - confirm against the checkpoint's config.
    _MAX_PROMPT_TOKENS = 1024
    _MAX_NEW_TOKENS = 1024
    _ANALYSIS_PRIMER = "Let me analyze these financial metrics in detail."

    # Matches Markdown table delimiter rows such as "| --- | ---: |" or "|:---|".
    _SEPARATOR_ROW = re.compile(
        r"^\s*\|?\s*:?-{3,}:?\s*(?:\|\s*:?-{3,}:?\s*)*\|?\s*$"
    )

    def __init__(self) -> None:
        """Load both models and reset the metrics used by the fallback analysis."""
        print("Initializing Analyzer...")
        self.last_metrics: Dict[str, Any] = {}
        self.initialize_models()
        print("Initialization complete!")

    def initialize_models(self) -> None:
        """Initialize both TinyLlama and FinBERT models.

        Raises:
            Exception: Whatever the model/tokenizer loaders raise; the error is
                printed and re-raised.
        """
        try:
            # Initialize TinyLlama
            self.llama_tokenizer = AutoTokenizer.from_pretrained(self.LLAMA_MODEL_NAME)
            self.llama_model = AutoModelForCausalLM.from_pretrained(self.LLAMA_MODEL_NAME)
            self.llama_model.eval()
            # Initialize FinBERT
            self.finbert_tokenizer = AutoTokenizer.from_pretrained(self.FINBERT_MODEL_NAME)
            self.finbert_model = AutoModelForSequenceClassification.from_pretrained(self.FINBERT_MODEL_NAME)
            self.finbert_model.eval()
            print("Models loaded successfully!")
        except Exception as e:
            print(f"Error initializing models: {str(e)}")
            raise

    def clean_number(self, value: Any) -> float:
        """Convert a table cell to ``float``; unparseable input becomes ``0.0``.

        Strings have ``$`` and ``,`` removed, and accounting-style
        parentheses such as ``(200)`` are read as negative numbers. Empty
        values, ``None`` and non-finite results (``nan``/``inf``) yield ``0.0``
        so the value can always be serialized as JSON.
        """
        try:
            if isinstance(value, str):
                text = value.replace('$', '').replace(',', '').strip()
                if '(' in text and ')' in text:
                    text = '-' + text.replace('(', '').replace(')', '').strip()
                value = text
            number = float(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0.0
        return number if math.isfinite(number) else 0.0

    def is_valid_markdown(self, file_path: Any) -> bool:
        """Return True if the file is readable and has a heading or table line.

        A missing path (``None``), an unreadable file or undecodable content
        all count as invalid rather than raising.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return any(line.startswith('#') or '|' in line for line in f)
        except (OSError, TypeError, ValueError):
            # ValueError also covers UnicodeDecodeError
            return False

    def parse_financial_data(self, content: str) -> Dict[str, Dict[str, Dict[str, float]]]:
        """Parse Markdown content into ``{section: {item: {column: value}}}``.

        Each ``#`` heading starts a new section. The first table row after a
        heading is used as the header row and later rows as data. Delimiter
        rows are skipped. Returns ``{}`` if parsing fails.
        """
        try:
            data: Dict[str, Dict[str, Dict[str, float]]] = {}
            section = ""
            headers: Optional[List[str]] = None
            rows: List[List[str]] = []
            for raw_line in content.splitlines():
                line = raw_line.strip()
                if line.startswith('#'):
                    if rows and headers:
                        data[section] = self.process_table(headers, rows)
                    section = line.strip('# ')
                    rows = []
                    headers = None
                elif '|' in line:
                    # '-|-' keeps the original compact check; the regex adds
                    # delimiter rows written with spaces or alignment colons.
                    if '-|-' in line or self._SEPARATOR_ROW.match(line):
                        continue
                    # Drop at most one outer pipe per side so rows written
                    # without outer pipes keep their first and last cells.
                    cells = line[1:] if line.startswith('|') else line
                    if cells.endswith('|'):
                        cells = cells[:-1]
                    row = [cell.strip() for cell in cells.split('|')]
                    if not any(row):
                        continue
                    if not headers:
                        headers = row
                    else:
                        rows.append(row)
            if rows and headers:
                data[section] = self.process_table(headers, rows)
            return data
        except Exception as e:
            print(f"Error parsing financial data: {str(e)}")
            return {}

    def process_table(self, headers: List[str], rows: List[List[str]]) -> Dict[str, Dict[str, float]]:
        """Convert table rows into ``{item_name: {header: number}}``.

        The first cell of each row is the item name (surrounding ``*`` removed).
        Rows whose cell count differs from the header count are skipped.
        Returns ``{}`` if processing fails.
        """
        try:
            processed_data: Dict[str, Dict[str, float]] = {}
            for row in rows:
                if not row or len(row) != len(headers):
                    continue
                item_name = row[0].strip('*').strip()
                processed_data[item_name] = {
                    header: self.clean_number(cell)
                    for header, cell in zip(headers[1:], row[1:])
                }
            return processed_data
        except Exception as e:
            print(f"Error processing table: {str(e)}")
            return {}

    def get_nested_value(self, data: Any, section: str, key: str, year: Any) -> Any:
        """Return ``data[section][key][str(year)]``, or 0 when any level is missing."""
        try:
            return data.get(section, {}).get(key, {}).get(str(year), 0)
        except (AttributeError, TypeError):
            return 0

    @staticmethod
    def _safe_ratio(numerator: Any, denominator: Any) -> Any:
        """Return ``numerator / denominator``, or 0 when the denominator is 0."""
        return numerator / denominator if denominator != 0 else 0

    def extract_metrics(self, income_data: Dict[str, Any], balance_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract 2025 metrics and compute ratios from the parsed statements.

        The result always contains ``Revenue``, ``Profitability``,
        ``Balance_Sheet``, ``Cash_Flow`` and ``Ratios``. Any ratio whose
        denominator is zero is reported as 0, as are growth figures that
        cannot be computed. Returns ``{}`` if extraction fails.
        """
        try:
            def income(section, item, year="2025"):
                return self.get_nested_value(income_data, section, item, year)

            def balance(section, item):
                return self.get_nested_value(balance_data, section, item, "2025")

            totals = "Key Totals"
            detail = "Balance Sheet Data 2021-2025"
            metrics: Dict[str, Any] = {
                "Revenue": {
                    year: income("Revenue", "Total Net Revenue", year)
                    for year in ("2025", "2024", "2021")
                },
                "Profitability": {
                    "Gross_Profit_2025": income("Cost and Gross Profit", "Gross Profit"),
                    "EBIT_2025": income("Profit Summary", "EBIT"),
                    "Net_Earnings_2025": income("Profit Summary", "Net Earnings"),
                    "Operating_Expenses_2025": income("Operating Expenses", "Total Operating Expenses")
                },
                "Balance_Sheet": {
                    "Total_Assets_2025": balance(totals, "Total_Assets"),
                    "Current_Assets_2025": balance(totals, "Total_Current_Assets"),
                    "Total_Liabilities_2025": balance(totals, "Total_Liabilities"),
                    "Current_Liabilities_2025": balance(totals, "Total_Current_Liabilities"),
                    "Equity_2025": balance(totals, "Total_Shareholders_Equity"),
                    "Inventory_2025": balance(detail, "Inventory"),
                    "Accounts_Receivable_2025": balance(detail, "Accounts_Receivable"),
                    "Long_Term_Debt_2025": balance(detail, "Long_Term_Debt")
                },
                "Cash_Flow": {
                    "Depreciation_2025": income("Operating Expenses", "Depreciation & Amortization"),
                    "Interest_Expense_2025": income("Profit Summary", "Interest Expense")
                }
            }

            revenue = metrics["Revenue"]
            profit = metrics["Profitability"]
            sheet = metrics["Balance_Sheet"]
            interest = metrics["Cash_Flow"]["Interest_Expense_2025"]
            revenue_2025 = revenue["2025"]
            ratio = self._safe_ratio

            revenue_growth = 0
            if revenue_2025 != 0 and revenue["2024"] != 0:
                revenue_growth = ((revenue_2025 / revenue["2024"]) - 1) * 100

            # A fractional power of a negative number is a complex value in
            # Python, which can be neither formatted nor JSON-encoded, so the
            # CAGR is only computed for a positive revenue multiple.
            revenue_cagr = 0
            if revenue["2021"] != 0:
                multiple = revenue_2025 / revenue["2021"]
                if multiple > 0:
                    revenue_cagr = (multiple ** (1 / 4) - 1) * 100

            # Always present so downstream consumers can rely on the key.
            metrics["Ratios"] = {
                "Gross_Margin": ratio(profit["Gross_Profit_2025"], revenue_2025) * 100,
                "Operating_Margin": ratio(profit["EBIT_2025"], revenue_2025) * 100,
                "Net_Margin": ratio(profit["Net_Earnings_2025"], revenue_2025) * 100,
                "Current_Ratio": ratio(sheet["Current_Assets_2025"], sheet["Current_Liabilities_2025"]),
                "Quick_Ratio": ratio(
                    sheet["Current_Assets_2025"] - sheet["Inventory_2025"],
                    sheet["Current_Liabilities_2025"],
                ),
                "Asset_Turnover": ratio(revenue_2025, sheet["Total_Assets_2025"]),
                "Receivables_Turnover": ratio(revenue_2025, sheet["Accounts_Receivable_2025"]),
                "Debt_to_Equity": ratio(sheet["Total_Liabilities_2025"], sheet["Equity_2025"]),
                "Interest_Coverage": ratio(profit["EBIT_2025"], interest),
                "Revenue_Growth": revenue_growth,
                "5Year_Revenue_CAGR": revenue_cagr
            }
            return metrics
        except Exception as e:
            print(f"Error extracting metrics: {str(e)}")
            return {}

    @staticmethod
    def convert_to_serializable(obj: Any) -> Any:
        """Recursively convert numpy scalars/arrays to native Python types.

        Dicts keep their keys, lists and tuples become lists, and any other
        object is returned unchanged.
        """
        convert = FinancialAnalyzer.convert_to_serializable
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, dict):
            return {key: convert(value) for key, value in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [convert(item) for item in obj]
        return obj

    def get_sentiment_analysis(self, metrics: Dict[str, Any]) -> Dict[str, float]:
        """Score a short ratio summary with FinBERT.

        Returns:
            ``{label: probability}`` with lower-cased labels taken from the
            model's own ``id2label`` mapping, or ``{}`` if scoring fails.
        """
        try:
            ratios = metrics['Ratios']
            financial_text = "\n".join([
                f"Revenue growth: {ratios.get('Revenue_Growth', 0):.2f}%",
                f"Profit margin: {ratios.get('Net_Margin', 0):.2f}%",
                f"Debt to equity: {ratios.get('Debt_to_Equity', 0):.2f}",
                f"Interest coverage: {ratios.get('Interest_Coverage', 0):.2f}",
                f"Current ratio: {ratios.get('Current_Ratio', 0):.2f}",
            ])
            inputs = self.finbert_tokenizer(financial_text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                outputs = self.finbert_model(**inputs)
            scores = torch.nn.functional.softmax(outputs.logits, dim=-1)[0].tolist()
            # The checkpoint's id2label is the source of truth for class
            # order; a hard-coded label list can silently mislabel scores.
            id2label = getattr(self.finbert_model.config, "id2label", None) or {}
            sentiment_dict: Dict[str, float] = {}
            for index, score in enumerate(scores):
                label = id2label.get(index, id2label.get(str(index), f"label_{index}"))
                sentiment_dict[str(label).lower()] = float(score)
            return sentiment_dict
        except Exception as e:
            print(f"Error in sentiment analysis: {str(e)}")
            return {}

    def generate_prompt(self, metrics: Dict[str, Any], sentiment_dict: Dict[str, float]) -> str:
        """Create the analysis prompt from metrics and sentiment scores.

        Returns an empty string when a required metric is missing.
        """
        try:
            ratios = metrics['Ratios']
            profit = metrics['Profitability']
            sheet = metrics['Balance_Sheet']
            lines = [
                "[INST] As a financial analyst, provide a comprehensive analysis of this company's performance.",
                "Financial Metrics (2025):",
                "------------------------",
                "1. Revenue & Growth:",
                f"- Revenue: ${metrics['Revenue']['2025']:,.1f}M",
                f"- Growth Rate: {ratios.get('Revenue_Growth', 0):,.1f}%",
                f"- 5-Year CAGR: {ratios.get('5Year_Revenue_CAGR', 0):,.1f}%",
                "2. Profitability:",
                f"- Gross Profit: ${profit['Gross_Profit_2025']:,.1f}M",
                f"- EBIT: ${profit['EBIT_2025']:,.1f}M",
                f"- Net Earnings: ${profit['Net_Earnings_2025']:,.1f}M",
                "- Margins:",
                f"* Gross: {ratios.get('Gross_Margin', 0):,.1f}%",
                f"* Operating: {ratios.get('Operating_Margin', 0):,.1f}%",
                f"* Net: {ratios.get('Net_Margin', 0):,.1f}%",
                "3. Financial Position:",
                f"- Assets: ${sheet['Total_Assets_2025']:,.1f}M",
                f"- Liabilities: ${sheet['Total_Liabilities_2025']:,.1f}M",
                f"- Equity: ${sheet['Equity_2025']:,.1f}M",
                "4. Key Ratios:",
                f"- Liquidity: Current Ratio {ratios.get('Current_Ratio', 0):,.2f}x",
                f"- Efficiency: Asset Turnover {ratios.get('Asset_Turnover', 0):,.2f}x",
                f"- Solvency: Debt/Equity {ratios.get('Debt_to_Equity', 0):,.2f}x",
                f"- Coverage: Interest Coverage {ratios.get('Interest_Coverage', 0):,.2f}x",
                "Market Sentiment Indicators:",
                "---------------------------",
                f"- Positive: {sentiment_dict.get('positive', 0):,.2f}",
                f"- Neutral: {sentiment_dict.get('neutral', 0):,.2f}",
                f"- Negative: {sentiment_dict.get('negative', 0):,.2f}",
                "Provide:",
                "1. Overall financial health assessment",
                "2. Key strengths and concerns",
                "3. Operational efficiency analysis",
                "4. Recommendations for improvement",
                "[/INST]",
            ]
            return "\n".join(lines)
        except Exception as e:
            print(f"Error generating prompt: {str(e)}")
            return ""

    def generate_analysis(self, prompt: str) -> str:
        """Generate the narrative analysis with TinyLlama.

        Falls back to ``generate_fallback_analysis(self.last_metrics)`` when
        the prompt is empty, generation raises, or the generated text has fewer
        than 100 words.
        """
        try:
            if not prompt:
                # Nothing meaningful to condition on; skip the expensive call.
                return self.generate_fallback_analysis(self.last_metrics)

            primer = self._ANALYSIS_PRIMER
            formatted_prompt = f"<human>: {prompt}\n<assistant>: {primer}"
            inputs = self.llama_tokenizer(
                formatted_prompt,
                return_tensors="pt",
                truncation=True,
                max_length=self._MAX_PROMPT_TOKENS
            )
            # length_penalty and early_stopping are beam-search options and
            # have no effect with sampling, so they are not passed.
            with torch.no_grad():
                outputs = self.llama_model.generate(
                    input_ids=inputs["input_ids"],
                    attention_mask=inputs.get("attention_mask"),
                    max_new_tokens=self._MAX_NEW_TOKENS,
                    min_new_tokens=200,
                    temperature=0.8,
                    top_p=0.92,
                    do_sample=True,
                    repetition_penalty=1.2,
                    num_return_sequences=1,
                    pad_token_id=self.llama_tokenizer.eos_token_id,
                    eos_token_id=self.llama_tokenizer.eos_token_id
                )
            # Decode only the newly generated tokens so neither the echoed
            # prompt nor special tokens leak into the analysis text.
            prompt_length = inputs["input_ids"].shape[-1]
            completion = self.llama_tokenizer.decode(
                outputs[0][prompt_length:], skip_special_tokens=True
            )
            analysis = f"{primer} {completion.strip()}".strip()
            # If the model started another turn, keep the last assistant part
            if "<assistant>:" in analysis:
                analysis = analysis.split("<assistant>:")[-1].strip()
            analysis = analysis.replace("<human>:", "").replace("<assistant>:", "").strip()
            if len(analysis.split()) < 100:
                analysis = self.generate_fallback_analysis(self.last_metrics)
            return analysis
        except Exception as e:
            print(f"Detailed error in generate_analysis: {str(e)}")
            return self.generate_fallback_analysis(self.last_metrics)

    def generate_fallback_analysis(self, metrics: Dict[str, Any]) -> str:
        """Generate a rule-based summary from the ratios when the model fails.

        Returns an error message string if ``metrics`` has no ``Ratios`` entry.
        """
        try:
            ratios = metrics['Ratios']
            revenue_growth = ratios.get('Revenue_Growth', 0)
            net_margin = ratios.get('Net_Margin', 0)
            current_ratio = ratios.get('Current_Ratio', 0)
            debt_to_equity = ratios.get('Debt_to_Equity', 0)

            growth_label = 'strong' if revenue_growth > 5 else 'moderate' if revenue_growth > 0 else 'weak'
            margin_label = 'strong' if net_margin > 10 else 'moderate' if net_margin > 5 else 'concerning'
            liquidity_label = 'very strong' if current_ratio > 2 else 'adequate' if current_ratio > 1 else 'concerning'
            leverage_label = 'conservative' if debt_to_equity < 0.5 else 'moderate' if debt_to_equity < 1 else 'aggressive'

            debt_advice = 'Consider debt reduction' if debt_to_equity > 0.5 else 'Maintain current debt levels'
            margin_advice = 'Focus on improving profit margins' if net_margin < 5 else 'Maintain profit efficiency'
            growth_advice = 'Implement growth strategies' if revenue_growth < 2 else 'Sustain growth momentum'

            lines = [
                "",
                "Financial Analysis Summary:",
                "1. Revenue and Growth:",
                f"The company shows a revenue growth of {revenue_growth:.1f}%, indicating {growth_label} growth performance.",
                "2. Profitability:",
                f"With a net margin of {net_margin:.1f}%, the company demonstrates {margin_label} profitability levels.",
                "3. Liquidity Position:",
                f"The current ratio of {current_ratio:.2f}x suggests {liquidity_label} liquidity position.",
                "4. Financial Leverage:",
                f"With a debt-to-equity ratio of {debt_to_equity:.2f}, the company maintains {leverage_label} leverage.",
                "Key Recommendations:",
                f"1. {debt_advice}",
                f"2. {margin_advice}",
                f"3. {growth_advice}",
                "This analysis is based on key financial metrics and standard industry benchmarks.",
                "",
            ]
            return "\n".join(lines)
        except Exception as e:
            return f"Error generating fallback analysis: {str(e)}"

    def fine_tune_models(self, train_texts, train_labels, epochs=3):
        """Fine-tune the FinBERT classifier on labelled texts.

        ``FinancialDataset`` yields one integer class label per text, which is
        the input a sequence-classification model takes, so training targets
        FinBERT rather than the causal language model. No evaluation is
        scheduled because no evaluation dataset is supplied. Errors are printed
        rather than raised.

        Args:
            train_texts: Iterable of training texts.
            train_labels: Iterable of integer class ids aligned with the texts.
                NOTE(review): ids must match the model's label mapping -
                confirm against ``finbert_model.config.id2label``.
            epochs: Number of training epochs.
        """
        try:
            # Prepare dataset
            train_dataset = FinancialDataset(train_texts, train_labels, self.finbert_tokenizer)
            # Training arguments
            training_args = TrainingArguments(
                output_dir="./financial_model_tuned",
                num_train_epochs=epochs,
                per_device_train_batch_size=4,
                logging_dir="./logs",
                logging_steps=10,
                save_steps=50,
                learning_rate=2e-5,
                weight_decay=0.01,
                warmup_steps=500,
            )
            # Initialize trainer
            trainer = Trainer(
                model=self.finbert_model,
                args=training_args,
                train_dataset=train_dataset,
            )
            # Fine-tune the model
            trainer.train()
            # Save the fine-tuned model
            self.finbert_model.save_pretrained("./financial_model_tuned")
            self.finbert_tokenizer.save_pretrained("./financial_model_tuned")
            print("Fine-tuning completed successfully!")
        except Exception as e:
            print(f"Error in fine-tuning: {str(e)}")
        finally:
            # Inference elsewhere in this class expects evaluation mode
            model = getattr(self, "finbert_model", None)
            if model is not None:
                model.eval()

    def analyze_financials(self, balance_sheet_file, income_stmt_file):
        """Run the full analysis and return it as a JSON string.

        Args:
            balance_sheet_file: Path to the balance sheet Markdown file.
            income_stmt_file: Path to the income statement Markdown file.

        Returns:
            A JSON document with metrics, sentiment and generated insights, or
            a plain error message string when validation or analysis fails.
        """
        try:
            # Validate input files
            if not (self.is_valid_markdown(balance_sheet_file) and self.is_valid_markdown(income_stmt_file)):
                return "Error: One or both files are invalid or not in Markdown format."
            # Read files
            with open(balance_sheet_file, 'r', encoding='utf-8') as f:
                balance_sheet = f.read()
            with open(income_stmt_file, 'r', encoding='utf-8') as f:
                income_stmt = f.read()
            # Process financial data
            income_data = self.parse_financial_data(income_stmt)
            balance_data = self.parse_financial_data(balance_sheet)
            metrics = self.extract_metrics(income_data, balance_data)
            if not metrics:
                return "Error: Could not extract financial metrics from the provided statements."
            # Kept for the rule-based fallback in generate_analysis
            self.last_metrics = metrics
            # Get sentiment analysis
            sentiment_dict = self.get_sentiment_analysis(metrics)
            # Generate and get analysis
            prompt = self.generate_prompt(metrics, sentiment_dict)
            analysis = self.generate_analysis(prompt)
            # Prepare final results with JSON-safe native types
            results = {
                "Financial Analysis": {
                    "Key Metrics": self.convert_to_serializable(metrics),
                    "Market Sentiment": self.convert_to_serializable(sentiment_dict),
                    "AI Insights": analysis,
                    "Analysis Period": "2021-2025",
                    "Note": "All monetary values in millions ($M)"
                }
            }
            return json.dumps(results, indent=2)
        except Exception as e:
            return f"Error in analysis: {str(e)}\n\nDetails: {type(e).__name__}"
def create_interface():
    """Build the Gradio interface around a freshly created FinancialAnalyzer.

    Two file inputs (balance sheet, then income statement) are passed as file
    paths to ``analyze_financials``; its result is shown in a text box.
    """
    analyzer = FinancialAnalyzer()
    upload_fields = [
        gr.File(label="Balance Sheet (Markdown)", type="filepath"),
        gr.File(label="Income Statement (Markdown)", type="filepath"),
    ]
    result_box = gr.Textbox(label="Analysis Results", lines=25)
    blurb = (
        "Upload financial statements in Markdown format for AI-powered analysis.\n"
        "The analysis combines LLM-based insights with sentiment analysis."
    )
    return gr.Interface(
        fn=analyzer.analyze_financials,
        inputs=upload_fields,
        outputs=result_box,
        title="AI Financial Statement Analyzer",
        description=blurb,
    )
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    create_interface().launch()