|
|
| import requests |
| import pandas as pd |
| import json |
| import os |
| import logging |
| from datetime import datetime |
| from typing import Dict, Any, List |
| import time |
| import PyPDF2 |
| import pdfplumber |
| from io import BytesIO |
|
|
| |
# Module-wide logging: INFO level so batch/processing progress is visible by default.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
class NebiusFinanceProcessor:
    """Finance processor using Nebius AI Studio API with DeepSeek-V3 model for transaction analysis."""

    def __init__(self, api_key: str = None):
        """Create a client for the Nebius AI Studio chat-completions endpoint.

        Args:
            api_key: Explicit API key; when omitted, read from the
                NEBIUS_API_KEY environment variable.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        self.api_key = api_key or os.environ.get("NEBIUS_API_KEY")
        if not self.api_key:
            raise ValueError("NEBIUS_API_KEY environment variable must be set")

        self.base_url = "https://api.studio.nebius.ai/v1/chat/completions"
        self.model_name = "deepseek-ai/DeepSeek-V3-0324"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        logger.info("Nebius AI Studio client initialized successfully with DeepSeek-V3")

    def _make_api_request(self, messages: List[Dict], max_tokens: int = 2000, temperature: float = 0.1) -> str:
        """Make an API request to Nebius AI Studio and return the reply text.

        Args:
            messages: Chat messages in OpenAI-compatible format.
            max_tokens: Completion token budget.
            temperature: Sampling temperature (low = near-deterministic).

        Returns:
            The assistant message content of the first choice.

        Raises:
            Exception: On any non-200 response or transport failure.
        """
        try:
            payload = {
                "model": self.model_name,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": temperature,
                "stream": False
            }

            response = requests.post(
                self.base_url,
                headers=self.headers,
                json=payload,
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                return result["choices"][0]["message"]["content"]
            else:
                logger.error(f"API request failed with status {response.status_code}: {response.text}")
                raise Exception(f"API request failed: {response.status_code}")

        except Exception as e:
            logger.error(f"Error making API request: {str(e)}")
            raise

    @staticmethod
    def _extract_json(response_text: str) -> str:
        """Return the JSON payload of a model reply, stripping an optional
        ```json ... ``` code fence.

        Shared by categorization and insight parsing (the same fence-stripping
        code was previously duplicated in both call sites).
        """
        if "```json" in response_text:
            json_start = response_text.find("```json") + 7
            json_end = response_text.find("```", json_start)
            return response_text[json_start:json_end].strip()
        return response_text.strip()

    def categorize_transaction_batch(self, transactions: List[Dict]) -> List[Dict]:
        """Categorize multiple transactions using DeepSeek-V3 API.

        Mutates and returns ``transactions``: each dict gains a ``category``
        key (and ``ai_reasoning`` where the model supplied one). Falls back
        to rule-based categorization when the API call or JSON parsing fails.
        """
        try:
            # One numbered line per transaction so the model can reference
            # each one by 1-based index in its JSON reply.
            transactions_text = "\n".join([
                f"{i+1}. Date: {tx['date']}, Description: {tx['description']}, Amount: {tx['amount']}"
                for i, tx in enumerate(transactions)
            ])

            prompt = f"""You are a financial AI assistant specialized in categorizing financial transactions.
Please categorize each of the following transactions into appropriate categories.

Available categories:
- Salary Income
- Investment Income
- Refunds & Returns
- Other Income
- Groceries & Food
- Transportation
- Utilities
- Housing
- Healthcare & Insurance
- Dining & Restaurants
- Entertainment
- Shopping
- Fitness & Sports
- Savings & Investments
- Bank Fees
- Taxes
- Other Expenses
- Miscellaneous
- Uncategorized

Transactions to categorize:
{transactions_text}

Please respond with a JSON array where each object contains:
- "index": the transaction number (1-based)
- "category": the assigned category
- "reasoning": brief explanation for the categorization

Format your response as valid JSON only, no additional text."""

            messages = [{"role": "user", "content": prompt}]
            response_text = self._make_api_request(messages, max_tokens=4000, temperature=0.1)

            try:
                categorizations = json.loads(self._extract_json(response_text))
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse DeepSeek response as JSON: {e}")
                return self._fallback_categorization(transactions)

            # Apply model output; ignore malformed entries and out-of-range indices.
            for cat in categorizations:
                if isinstance(cat, dict) and "index" in cat and "category" in cat:
                    idx = cat["index"] - 1
                    if 0 <= idx < len(transactions):
                        transactions[idx]["category"] = cat["category"]
                        transactions[idx]["ai_reasoning"] = cat.get("reasoning", "")

            # Rule-based safety net for anything the model skipped.
            for tx in transactions:
                if "category" not in tx:
                    tx["category"] = self._fallback_single_categorization(tx)

            logger.info(f"Successfully categorized {len(transactions)} transactions with DeepSeek-V3")
            return transactions

        except Exception as e:
            logger.error(f"Error in DeepSeek categorization: {str(e)}")
            return self._fallback_categorization(transactions)

    def _fallback_categorization(self, transactions: List[Dict]) -> List[Dict]:
        """Fallback rule-based categorization applied to a whole batch in place."""
        for tx in transactions:
            tx["category"] = self._fallback_single_categorization(tx)
        return transactions

    def _fallback_single_categorization(self, transaction: Dict) -> str:
        """Rule-based categorization for a single transaction.

        Positive amounts are classified as income; everything else
        (including zero) as an expense. Matching is by keyword substring
        on the lower-cased description; first match wins, so order matters.
        """
        description = transaction.get("description", "").lower()
        amount = float(transaction.get("amount", 0))

        if amount > 0:
            # Income buckets
            if any(word in description for word in ['salary', 'wage', 'payroll', 'income']):
                return 'Salary Income'
            elif any(word in description for word in ['refund', 'return', 'cashback']):
                return 'Refunds & Returns'
            elif any(word in description for word in ['interest', 'dividend', 'investment']):
                return 'Investment Income'
            else:
                return 'Other Income'
        else:
            # Expense buckets
            if any(word in description for word in ['grocery', 'supermarket', 'food store', 'market']):
                return 'Groceries & Food'
            elif any(word in description for word in ['gas', 'fuel', 'petrol', 'station', 'uber', 'taxi', 'bus']):
                return 'Transportation'
            elif any(word in description for word in ['utility', 'electric', 'water', 'internet', 'phone']):
                return 'Utilities'
            elif any(word in description for word in ['rent', 'mortgage', 'housing']):
                return 'Housing'
            elif any(word in description for word in ['insurance', 'medical', 'health', 'doctor', 'pharmacy']):
                return 'Healthcare & Insurance'
            elif any(word in description for word in ['restaurant', 'dining', 'cafe', 'fast food']):
                return 'Dining & Restaurants'
            elif any(word in description for word in ['entertainment', 'movie', 'streaming', 'netflix', 'spotify']):
                return 'Entertainment'
            elif any(word in description for word in ['shopping', 'store', 'retail', 'amazon']):
                return 'Shopping'
            elif any(word in description for word in ['gym', 'fitness', 'sport']):
                return 'Fitness & Sports'
            elif any(word in description for word in ['savings', 'investment', 'transfer to']):
                return 'Savings & Investments'
            elif any(word in description for word in ['fee', 'charge', 'bank fee', 'atm']):
                return 'Bank Fees'
            elif any(word in description for word in ['tax', 'irs', 'government']):
                return 'Taxes'
            else:
                return 'Other Expenses'

    def process_transactions(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process a transactions DataFrame with DeepSeek-V3 categorization.

        Requires ``date``, ``description`` and ``amount`` columns; returns a
        new DataFrame with ``category``, ``transaction_type``, ``amount_abs``
        and date-derived columns added. On failure returns a one-column
        ``error`` DataFrame rather than raising.
        """
        try:
            logger.info(f"Processing {len(df)} transactions with Nebius AI Studio (DeepSeek-V3)")

            required_columns = ['date', 'description', 'amount']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")

            transactions = df.to_dict('records')

            # Batch to keep each prompt small; throttle between batches to
            # stay within API rate limits.
            batch_size = 15
            processed_transactions = []

            for i in range(0, len(transactions), batch_size):
                batch = transactions[i:i + batch_size]
                processed_batch = self.categorize_transaction_batch(batch)
                processed_transactions.extend(processed_batch)

                if i + batch_size < len(transactions):
                    time.sleep(0.5)

            result_df = pd.DataFrame(processed_transactions)

            # Derived columns for downstream reporting (zero counts as Debit).
            result_df['transaction_type'] = result_df['amount'].apply(lambda x: 'Credit' if x > 0 else 'Debit')
            result_df['amount_abs'] = result_df['amount'].abs()

            try:
                result_df['date'] = pd.to_datetime(result_df['date'], errors='coerce')
                result_df['month'] = result_df['date'].dt.month
                result_df['year'] = result_df['date'].dt.year
                result_df['day_of_week'] = result_df['date'].dt.day_name()
            except Exception as e:
                logger.warning(f"Could not parse dates: {str(e)}")

            # Newest first; best-effort only. Narrowed from a bare except.
            try:
                result_df = result_df.sort_values('date', ascending=False)
            except Exception:
                pass

            logger.info("Successfully processed and categorized transactions with DeepSeek-V3")
            return result_df

        except Exception as e:
            logger.error(f"Error processing transactions: {str(e)}")
            return pd.DataFrame({"error": [str(e)]})

    def generate_financial_insights(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate financial insights using DeepSeek-V3 analysis.

        Returns the basic numeric metrics merged with an ``ai_insights``
        object; degrades to metrics-only (or a canned insights object) when
        the API call or JSON parsing fails.
        """
        try:
            basic_metrics = self._calculate_basic_metrics(df)

            summary_text = self._prepare_summary_for_analysis(df, basic_metrics)

            prompt = f"""You are a financial advisor AI. Analyze the following financial data and provide insights.

Financial Summary:
{summary_text}

Please provide a comprehensive analysis including:
1. Financial health assessment (score 1-10)
2. Spending pattern insights
3. Recommendations for improvement
4. Key observations about financial behavior
5. Potential areas of concern
6. Positive financial patterns

Format your response as a JSON object with the following structure:
{{
"financial_health_score": <score from 1-10>,
"health_assessment": "<brief assessment>",
"spending_insights": ["<insight1>", "<insight2>", ...],
"recommendations": ["<recommendation1>", "<recommendation2>", ...],
"key_observations": ["<observation1>", "<observation2>", ...],
"areas_of_concern": ["<concern1>", "<concern2>", ...],
"positive_patterns": ["<pattern1>", "<pattern2>", ...]
}}

Provide only the JSON response, no additional text."""

            messages = [{"role": "user", "content": prompt}]
            response_text = self._make_api_request(messages, max_tokens=2000, temperature=0.3)

            try:
                ai_insights = json.loads(self._extract_json(response_text))
            except json.JSONDecodeError:
                logger.warning("Could not parse DeepSeek insights as JSON, using fallback")
                ai_insights = {
                    "financial_health_score": 7,
                    "health_assessment": "Analysis completed with basic metrics",
                    "spending_insights": ["Detailed analysis requires manual review"],
                    "recommendations": ["Continue monitoring your financial patterns"],
                    "key_observations": ["Financial data processed successfully"],
                    "areas_of_concern": [],
                    "positive_patterns": []
                }

            complete_report = {**basic_metrics, "ai_insights": ai_insights}

            logger.info("Generated comprehensive financial report with DeepSeek-V3 insights")
            return complete_report

        except Exception as e:
            logger.error(f"Error generating financial insights: {str(e)}")
            return self._calculate_basic_metrics(df)

    def _calculate_basic_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Calculate basic financial metrics.

        Sign convention: positive amounts are income, negative are expenses.
        Operates on a copy so the caller's DataFrame is never mutated
        (the previous implementation coerced ``date`` and added a
        ``month_year`` column in place).
        """
        try:
            df = df.copy()

            try:
                df['date'] = pd.to_datetime(df['date'], errors='coerce')
            except Exception:
                logger.warning("Could not parse dates, some time-based analysis will be limited")

            total_income = df[df['amount'] > 0]['amount'].sum()
            total_expenses = abs(df[df['amount'] < 0]['amount'].sum())
            net_balance = total_income - total_expenses

            income_transactions = len(df[df['amount'] > 0])
            expense_transactions = len(df[df['amount'] < 0])
            total_transactions = len(df)

            # Guard all means against empty selections (mean() is NaN otherwise);
            # the overall average was previously unguarded.
            avg_income = df[df['amount'] > 0]['amount'].mean() if income_transactions > 0 else 0
            avg_expense = abs(df[df['amount'] < 0]['amount'].mean()) if expense_transactions > 0 else 0
            avg_transaction = df['amount'].mean() if total_transactions > 0 else 0

            largest_income = df[df['amount'] > 0]['amount'].max() if income_transactions > 0 else 0
            largest_expense = abs(df[df['amount'] < 0]['amount'].min()) if expense_transactions > 0 else 0

            category_summary = {}
            category_counts = {}
            if 'category' in df.columns:
                category_summary = df.groupby('category')['amount'].sum().to_dict()
                category_counts = df.groupby('category').size().to_dict()

            monthly_summary = {}
            monthly_net = {}
            if 'date' in df.columns:
                try:
                    df['month_year'] = df['date'].dt.strftime('%Y-%m')
                    monthly_summary = df.groupby('month_year')['amount'].sum().to_dict()

                    monthly_income = df[df['amount'] > 0].groupby('month_year')['amount'].sum()
                    monthly_expenses = df[df['amount'] < 0].groupby('month_year')['amount'].sum()
                    # add(fill_value=0): plain '+' produces NaN for any month
                    # that has only income or only expenses.
                    monthly_net = monthly_income.add(monthly_expenses, fill_value=0).to_dict()

                except Exception as e:
                    logger.warning(f"Could not generate monthly analysis: {str(e)}")

            savings_rate = (net_balance / total_income * 100) if total_income > 0 else 0
            expense_to_income_ratio = (total_expenses / total_income) if total_income > 0 else 0

            return {
                # Headline totals
                "total_income": round(total_income, 2),
                "total_expenses": round(total_expenses, 2),
                "net_balance": round(net_balance, 2),
                "expense_to_income_ratio": round(expense_to_income_ratio, 2),

                # Counts and averages
                "transaction_count": total_transactions,
                "income_transactions": income_transactions,
                "expense_transactions": expense_transactions,
                "average_transaction": round(avg_transaction, 2),
                "average_income": round(avg_income, 2),
                "average_expense": round(avg_expense, 2),

                # Extremes
                "largest_income": round(largest_income, 2),
                "largest_expense": round(largest_expense, 2),

                # Ratios
                "savings_rate_percent": round(savings_rate, 2),

                # Per-category rollups
                "category_breakdown": {k: round(v, 2) for k, v in category_summary.items()},
                "category_transaction_counts": category_counts,

                # Per-month rollups
                "monthly_trends": {k: round(v, 2) for k, v in monthly_summary.items()},
                "monthly_net_balance": {k: round(v, 2) for k, v in monthly_net.items()},

                # Metadata
                "analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "data_period": {
                    "start_date": df['date'].min().strftime("%Y-%m-%d") if 'date' in df.columns and not df['date'].isna().all() else "Unknown",
                    "end_date": df['date'].max().strftime("%Y-%m-%d") if 'date' in df.columns and not df['date'].isna().all() else "Unknown"
                }
            }

        except Exception as e:
            logger.error(f"Error calculating basic metrics: {str(e)}")
            return {"error": str(e)}

    def _prepare_summary_for_analysis(self, df: pd.DataFrame, basic_metrics: Dict) -> str:
        """Prepare a text summary of ``basic_metrics`` for DeepSeek analysis.

        Only the top 10 categories (by absolute amount) are included to keep
        the prompt compact.
        """
        try:
            summary_parts = [
                f"Total Income: ${basic_metrics.get('total_income', 0):,.2f}",
                f"Total Expenses: ${basic_metrics.get('total_expenses', 0):,.2f}",
                f"Net Balance: ${basic_metrics.get('net_balance', 0):,.2f}",
                f"Savings Rate: {basic_metrics.get('savings_rate_percent', 0):.1f}%",
                f"Total Transactions: {basic_metrics.get('transaction_count', 0)}",
                f"Average Income per Transaction: ${basic_metrics.get('average_income', 0):,.2f}",
                f"Average Expense per Transaction: ${basic_metrics.get('average_expense', 0):,.2f}",
            ]

            category_breakdown = basic_metrics.get('category_breakdown', {})
            if category_breakdown:
                summary_parts.append("\nCategory Breakdown:")
                for category, amount in sorted(category_breakdown.items(), key=lambda x: abs(x[1]), reverse=True)[:10]:
                    summary_parts.append(f"  {category}: ${amount:,.2f}")

            monthly_trends = basic_metrics.get('monthly_trends', {})
            if monthly_trends:
                summary_parts.append(f"\nMonthly Trends: {len(monthly_trends)} months of data")

            return "\n".join(summary_parts)

        except Exception as e:
            logger.error(f"Error preparing summary: {str(e)}")
            return "Financial data summary unavailable"
|
|
| |
# Lazily-created, process-wide processor instance.
nebius_processor = None


def get_nebius_processor():
    """Return the shared NebiusFinanceProcessor, creating it on first use."""
    global nebius_processor
    if nebius_processor is not None:
        return nebius_processor
    nebius_processor = NebiusFinanceProcessor()
    return nebius_processor
|
|
|
|
|
|
# NOTE(review): this function is shadowed by a later redefinition of
# `process_transactions_nebius` in this same module, so this variant is
# effectively dead code at import time. Confirm which variant is intended
# and remove the other (this one returns an error DataFrame on failure;
# the later one supports PDF input and re-raises instead).
def process_transactions_nebius(input_data) -> pd.DataFrame:
    """Process transactions using Nebius AI Studio API with DeepSeek-V3.
    Accepts either a file path (str) or list of transactions."""
    try:
        if isinstance(input_data, str):
            # String input is treated as a CSV file path.
            df = pd.read_csv(input_data)
        elif isinstance(input_data, list):
            # In-memory list of transaction dicts.
            df = pd.DataFrame(input_data)
        else:
            raise ValueError("Input must be either file path (str) or list of transactions")

        logger.info(f"Processing data with {len(df)} rows and columns: {list(df.columns)}")

        # Delegate categorization to the shared processor instance.
        proc = get_nebius_processor()
        result_df = proc.process_transactions(df)

        return result_df

    except Exception as e:
        # Error contract of this variant: never raise, return an error frame.
        logger.error(f"Error processing transactions: {str(e)}")
        return pd.DataFrame({"error": [str(e)]})
|
|
def generate_financial_report_nebius(input_data) -> Dict[str, Any]:
    """Generate financial report using Nebius AI Studio API with DeepSeek-V3.
    Accepts either a file path (str) or list of transactions."""
    try:
        if isinstance(input_data, list):
            # In-memory list of transaction dicts.
            frame = pd.DataFrame(input_data)
        elif isinstance(input_data, str):
            # String input is treated as a CSV file path.
            frame = pd.read_csv(input_data)
        else:
            raise ValueError("Input must be either file path (str) or list of transactions")

        logger.info(f"Generating report for {len(frame)} rows")

        # Hand off to the shared processor for metric + AI-insight generation.
        return get_nebius_processor().generate_financial_insights(frame)

    except Exception as e:
        logger.error(f"Error generating financial report: {str(e)}")
        return {"error": str(e)}
| |
def process_transactions_nebius(transactions_input):
    """Process transactions using Nebius AI Studio API with DeepSeek-V3.
    Accepts either a file path (str), file object, or list of transactions.

    Args:
        transactions_input: A ``.pdf``/``.csv`` file path, an open file
            object (PDF detected via its ``name`` attribute, anything else
            parsed as CSV), or a list of transaction dicts.

    Returns:
        For CSV/list input: a categorized ``pd.DataFrame``.
        For PDF input: a list of parsed transaction dicts.

    Raises:
        ValueError: For unsupported input types; other errors are logged
            and re-raised.
    """
    try:
        if isinstance(transactions_input, str) and transactions_input.endswith('.pdf'):
            # PDF path: extract raw transactions from the document.
            with open(transactions_input, 'rb') as f:
                return _process_pdf_transactions(f)
        elif hasattr(transactions_input, 'read'):
            if getattr(transactions_input, 'name', '').endswith('.pdf'):
                # PDF file object.
                return _process_pdf_transactions(transactions_input)
            # Fix: the docstring always promised file-object support, but
            # non-PDF file objects previously fell through to ValueError.
            # Any other file-like object is parsed as CSV.
            df = pd.read_csv(transactions_input)
        elif isinstance(transactions_input, str):
            # CSV file path.
            df = pd.read_csv(transactions_input)
        elif isinstance(transactions_input, list):
            # In-memory list of transaction dicts.
            df = pd.DataFrame(transactions_input)
        else:
            raise ValueError("Input must be file path, file object, or list of transactions")

        proc = get_nebius_processor()
        return proc.process_transactions(df)

    except Exception as e:
        logger.error(f"Error processing transactions: {str(e)}")
        raise
|
|
def _process_pdf_transactions(file_obj):
    """Extract transaction dicts from a PDF statement.

    Heuristic line parser: a transaction line is assumed to be
    ``<date> <description...> <amount>`` with at least three whitespace-
    separated tokens; lines whose last token is not a number are skipped.

    Args:
        file_obj: A binary file object containing the PDF.

    Returns:
        A list of ``{'date', 'description', 'amount'}`` dicts.

    Raises:
        Exception: Re-raised after logging if the PDF cannot be read.
    """
    try:
        with pdfplumber.open(file_obj) as pdf:
            # Fix: extract_text() was previously called twice per page
            # (once to test, once to join) — extract each page's text once.
            page_texts = (page.extract_text() for page in pdf.pages)
            text = "\n".join(t for t in page_texts if t)

        transactions = []
        for line in text.split('\n'):
            parts = line.strip().split()
            if len(parts) < 3:
                continue

            date = parts[0]
            description = ' '.join(parts[1:-1])
            raw_amount = parts[-1].replace('$', '').replace(',', '')
            try:
                amount = float(raw_amount)
            except ValueError:
                # Last token isn't a money value -> not a transaction line.
                # (Narrowed from a bare except that hid all errors.)
                continue

            transactions.append({
                'date': date,
                'description': description[:200],  # cap runaway descriptions
                'amount': amount
            })

        return transactions

    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise
|
|
def batch_process_transactions_nebius(file_paths: List[str]) -> Dict[str, Any]:
    """Process multiple transaction files in batch using Nebius AI Studio.

    Each CSV file is categorized and analyzed independently; a failure in
    one file is recorded per path and does not stop the batch.
    """
    logger.info(f"Batch processing {len(file_paths)} files with Nebius AI Studio")
    results = {}

    proc = get_nebius_processor()

    for file_path in file_paths:
        try:
            frame = proc.process_transactions(pd.read_csv(file_path))
            insights = proc.generate_financial_insights(frame)
            records = [] if frame.empty else frame.to_dict('records')

            results[file_path] = {
                "transactions": records,
                "report": insights,
                "status": "success"
            }
        except Exception as e:
            logger.error(f"Error processing {file_path}: {str(e)}")
            results[file_path] = {
                "error": str(e),
                "status": "failed"
            }

    logger.info(f"Batch processing completed. {len(results)} files processed")
    return results
|
|
| if __name__ == "__main__": |
| |
| logger.info("Nebius AI Studio backend ready for use") |
| print("Make sure to set NEBIUS_API_KEY environment variable") |