# Finapp / nebius_backend.py
# Provenance: Hugging Face Space "Finapp", commit 9d0dbdd ("Update nebius_backend.py") by smainye.
import requests
import pandas as pd
import json
import os
import logging
from datetime import datetime
from typing import Dict, Any, List
import time
import PyPDF2
import pdfplumber
from io import BytesIO
# Set up module-wide logging: configure the root logger once at import time
# and use a named logger so records can be filtered per module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class NebiusFinanceProcessor:
    """Finance processor using Nebius AI Studio API with DeepSeek-V3 model for transaction analysis."""

    def __init__(self, api_key: str = None):
        """Initialize the Nebius AI Studio client.

        Args:
            api_key: Nebius AI Studio API key. Falls back to the
                NEBIUS_API_KEY environment variable when omitted.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        self.api_key = api_key or os.environ.get("NEBIUS_API_KEY")
        if not self.api_key:
            raise ValueError("NEBIUS_API_KEY environment variable must be set")
        self.base_url = "https://api.studio.nebius.ai/v1/chat/completions"
        self.model_name = "deepseek-ai/DeepSeek-V3-0324"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        logger.info("Nebius AI Studio client initialized successfully with DeepSeek-V3")

    def _make_api_request(self, messages: List[Dict], max_tokens: int = 2000, temperature: float = 0.1) -> str:
        """Make a chat-completion request to Nebius AI Studio.

        Args:
            messages: Chat messages in OpenAI format ({"role": ..., "content": ...}).
            max_tokens: Completion token budget.
            temperature: Sampling temperature.

        Returns:
            The content string of the first completion choice.

        Raises:
            Exception: On a non-200 response, network failure, or an
                unexpected response payload shape.
        """
        try:
            payload = {
                "model": self.model_name,
                "messages": messages,
                "max_tokens": max_tokens,
                "temperature": temperature,
                "stream": False
            }
            response = requests.post(
                self.base_url,
                headers=self.headers,
                json=payload,
                timeout=30
            )
            if response.status_code == 200:
                result = response.json()
                return result["choices"][0]["message"]["content"]
            else:
                logger.error(f"API request failed with status {response.status_code}: {response.text}")
                raise Exception(f"API request failed: {response.status_code}")
        except Exception as e:
            logger.error(f"Error making API request: {str(e)}")
            raise

    @staticmethod
    def _extract_json_content(response_text: str) -> str:
        """Strip an optional ```json ... ``` fence from a model response.

        Returns the inner JSON text if a fence is present, otherwise the
        whole response stripped of surrounding whitespace.
        """
        if "```json" in response_text:
            json_start = response_text.find("```json") + 7
            json_end = response_text.find("```", json_start)
            return response_text[json_start:json_end].strip()
        return response_text.strip()

    def categorize_transaction_batch(self, transactions: List[Dict]) -> List[Dict]:
        """Categorize multiple transactions using DeepSeek-V3 API.

        NOTE: the input dicts are annotated in place with "category" (and
        "ai_reasoning" when the model supplies one) and then returned.
        Falls back to rule-based categorization on any API or parse failure,
        so this method does not raise under normal operation.
        """
        try:
            # Render the batch as a numbered list the model can reference by index.
            transactions_text = "\n".join([
                f"{i+1}. Date: {tx['date']}, Description: {tx['description']}, Amount: {tx['amount']}"
                for i, tx in enumerate(transactions)
            ])
            prompt = f"""You are a financial AI assistant specialized in categorizing financial transactions.
Please categorize each of the following transactions into appropriate categories.
Available categories:
- Salary Income
- Investment Income
- Refunds & Returns
- Other Income
- Groceries & Food
- Transportation
- Utilities
- Housing
- Healthcare & Insurance
- Dining & Restaurants
- Entertainment
- Shopping
- Fitness & Sports
- Savings & Investments
- Bank Fees
- Taxes
- Other Expenses
- Miscellaneous
- Uncategorized
Transactions to categorize:
{transactions_text}
Please respond with a JSON array where each object contains:
- "index": the transaction number (1-based)
- "category": the assigned category
- "reasoning": brief explanation for the categorization
Format your response as valid JSON only, no additional text."""
            messages = [{"role": "user", "content": prompt}]
            response_text = self._make_api_request(messages, max_tokens=4000, temperature=0.1)
            # Parse the response (the model may or may not fence its JSON).
            try:
                categorizations = json.loads(self._extract_json_content(response_text))
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse DeepSeek response as JSON: {e}")
                # Fallback to rule-based categorization
                return self._fallback_categorization(transactions)
            # Apply categorizations to transactions; ignore malformed entries
            # and out-of-range indices rather than failing the whole batch.
            for cat in categorizations:
                if isinstance(cat, dict) and "index" in cat and "category" in cat:
                    idx = cat["index"] - 1  # Convert to 0-based index
                    if 0 <= idx < len(transactions):
                        transactions[idx]["category"] = cat["category"]
                        transactions[idx]["ai_reasoning"] = cat.get("reasoning", "")
            # Fill in any transactions the model skipped with the rule-based fallback.
            for tx in transactions:
                if "category" not in tx:
                    tx["category"] = self._fallback_single_categorization(tx)
            logger.info(f"Successfully categorized {len(transactions)} transactions with DeepSeek-V3")
            return transactions
        except Exception as e:
            logger.error(f"Error in DeepSeek categorization: {str(e)}")
            return self._fallback_categorization(transactions)

    def _fallback_categorization(self, transactions: List[Dict]) -> List[Dict]:
        """Apply rule-based categorization to every transaction (in place)."""
        for tx in transactions:
            tx["category"] = self._fallback_single_categorization(tx)
        return transactions

    def _fallback_single_categorization(self, transaction: Dict) -> str:
        """Rule-based categorization for a single transaction.

        Positive amounts are treated as income, non-positive (including 0)
        as expenses; the first keyword group that matches the description wins.
        """
        description = transaction.get("description", "").lower()
        amount = float(transaction.get("amount", 0))
        if amount > 0:
            # Income categorization
            if any(word in description for word in ['salary', 'wage', 'payroll', 'income']):
                return 'Salary Income'
            elif any(word in description for word in ['refund', 'return', 'cashback']):
                return 'Refunds & Returns'
            elif any(word in description for word in ['interest', 'dividend', 'investment']):
                return 'Investment Income'
            else:
                return 'Other Income'
        else:
            # Expense categorization
            if any(word in description for word in ['grocery', 'supermarket', 'food store', 'market']):
                return 'Groceries & Food'
            elif any(word in description for word in ['gas', 'fuel', 'petrol', 'station', 'uber', 'taxi', 'bus']):
                return 'Transportation'
            elif any(word in description for word in ['utility', 'electric', 'water', 'internet', 'phone']):
                return 'Utilities'
            elif any(word in description for word in ['rent', 'mortgage', 'housing']):
                return 'Housing'
            elif any(word in description for word in ['insurance', 'medical', 'health', 'doctor', 'pharmacy']):
                return 'Healthcare & Insurance'
            elif any(word in description for word in ['restaurant', 'dining', 'cafe', 'fast food']):
                return 'Dining & Restaurants'
            elif any(word in description for word in ['entertainment', 'movie', 'streaming', 'netflix', 'spotify']):
                return 'Entertainment'
            elif any(word in description for word in ['shopping', 'store', 'retail', 'amazon']):
                return 'Shopping'
            elif any(word in description for word in ['gym', 'fitness', 'sport']):
                return 'Fitness & Sports'
            elif any(word in description for word in ['savings', 'investment', 'transfer to']):
                return 'Savings & Investments'
            elif any(word in description for word in ['fee', 'charge', 'bank fee', 'atm']):
                return 'Bank Fees'
            elif any(word in description for word in ['tax', 'irs', 'government']):
                return 'Taxes'
            else:
                return 'Other Expenses'

    def process_transactions(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process a transactions DataFrame with DeepSeek-V3 categorization.

        Requires 'date', 'description' and 'amount' columns. Returns a new
        DataFrame with category, transaction_type, amount_abs and date-derived
        columns added, sorted newest-first when dates are parseable. On error,
        returns a one-row DataFrame with an 'error' column.
        """
        try:
            logger.info(f"Processing {len(df)} transactions with Nebius AI Studio (DeepSeek-V3)")
            # Validate required columns
            required_columns = ['date', 'description', 'amount']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")
            # Convert DataFrame to list of dictionaries for processing
            transactions = df.to_dict('records')
            # Process in batches to avoid token limits
            batch_size = 15  # Process 15 transactions at a time for DeepSeek
            processed_transactions = []
            for i in range(0, len(transactions), batch_size):
                batch = transactions[i:i + batch_size]
                processed_batch = self.categorize_transaction_batch(batch)
                processed_transactions.extend(processed_batch)
                # Add small delay between batches to avoid rate limiting
                if i + batch_size < len(transactions):
                    time.sleep(0.5)
            # Convert back to DataFrame
            result_df = pd.DataFrame(processed_transactions)
            # Add additional insights
            result_df['transaction_type'] = result_df['amount'].apply(lambda x: 'Credit' if x > 0 else 'Debit')
            result_df['amount_abs'] = result_df['amount'].abs()
            # Convert date column to datetime for better analysis
            try:
                result_df['date'] = pd.to_datetime(result_df['date'], errors='coerce')
                result_df['month'] = result_df['date'].dt.month
                result_df['year'] = result_df['date'].dt.year
                result_df['day_of_week'] = result_df['date'].dt.day_name()
            except Exception as e:
                logger.warning(f"Could not parse dates: {str(e)}")
            # Sort by date (newest first); if the column is unsortable
            # (mixed types), keep the original order instead of failing.
            try:
                result_df = result_df.sort_values('date', ascending=False)
            except Exception:
                pass
            logger.info("Successfully processed and categorized transactions with DeepSeek-V3")
            return result_df
        except Exception as e:
            logger.error(f"Error processing transactions: {str(e)}")
            return pd.DataFrame({"error": [str(e)]})

    def generate_financial_insights(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate financial insights using DeepSeek-V3 analysis.

        Returns the basic metrics dict augmented with an "ai_insights" key.
        If the model call or JSON parse fails, degrades gracefully to the
        basic metrics (with a canned insights block when only parsing failed).
        """
        try:
            # First, generate basic metrics
            basic_metrics = self._calculate_basic_metrics(df)
            # Prepare summary for DeepSeek analysis
            summary_text = self._prepare_summary_for_analysis(df, basic_metrics)
            prompt = f"""You are a financial advisor AI. Analyze the following financial data and provide insights.
Financial Summary:
{summary_text}
Please provide a comprehensive analysis including:
1. Financial health assessment (score 1-10)
2. Spending pattern insights
3. Recommendations for improvement
4. Key observations about financial behavior
5. Potential areas of concern
6. Positive financial patterns
Format your response as a JSON object with the following structure:
{{
"financial_health_score": <score from 1-10>,
"health_assessment": "<brief assessment>",
"spending_insights": ["<insight1>", "<insight2>", ...],
"recommendations": ["<recommendation1>", "<recommendation2>", ...],
"key_observations": ["<observation1>", "<observation2>", ...],
"areas_of_concern": ["<concern1>", "<concern2>", ...],
"positive_patterns": ["<pattern1>", "<pattern2>", ...]
}}
Provide only the JSON response, no additional text."""
            messages = [{"role": "user", "content": prompt}]
            response_text = self._make_api_request(messages, max_tokens=2000, temperature=0.3)
            # Parse DeepSeek's insights (fenced or bare JSON).
            try:
                ai_insights = json.loads(self._extract_json_content(response_text))
            except json.JSONDecodeError:
                logger.warning("Could not parse DeepSeek insights as JSON, using fallback")
                ai_insights = {
                    "financial_health_score": 7,
                    "health_assessment": "Analysis completed with basic metrics",
                    "spending_insights": ["Detailed analysis requires manual review"],
                    "recommendations": ["Continue monitoring your financial patterns"],
                    "key_observations": ["Financial data processed successfully"],
                    "areas_of_concern": [],
                    "positive_patterns": []
                }
            # Combine basic metrics with AI insights
            complete_report = {**basic_metrics, "ai_insights": ai_insights}
            logger.info("Generated comprehensive financial report with DeepSeek-V3 insights")
            return complete_report
        except Exception as e:
            logger.error(f"Error generating financial insights: {str(e)}")
            # Return basic metrics without AI insights
            return self._calculate_basic_metrics(df)

    def _calculate_basic_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Calculate basic financial metrics.

        Operates on a copy so the caller's DataFrame is never mutated
        (the original converted 'date' in place and added a 'month_year'
        column as a side effect). Returns {"error": ...} on failure.
        """
        try:
            # Work on a copy: date conversion and the month_year helper column
            # must not leak back into the caller's frame.
            df = df.copy()
            # Convert date column for time-based analysis
            try:
                df['date'] = pd.to_datetime(df['date'], errors='coerce')
            except Exception:
                logger.warning("Could not parse dates, some time-based analysis will be limited")
            # Basic financial metrics
            total_income = df[df['amount'] > 0]['amount'].sum()
            total_expenses = abs(df[df['amount'] < 0]['amount'].sum())
            net_balance = total_income - total_expenses
            # Transaction counts
            income_transactions = len(df[df['amount'] > 0])
            expense_transactions = len(df[df['amount'] < 0])
            total_transactions = len(df)
            # Average transaction amounts (guard empties: mean() of an empty
            # Series is NaN, which would otherwise leak into the report).
            avg_income = df[df['amount'] > 0]['amount'].mean() if income_transactions > 0 else 0
            avg_expense = abs(df[df['amount'] < 0]['amount'].mean()) if expense_transactions > 0 else 0
            avg_transaction = df['amount'].mean() if total_transactions > 0 else 0
            # Largest transactions
            largest_income = df[df['amount'] > 0]['amount'].max() if income_transactions > 0 else 0
            largest_expense = abs(df[df['amount'] < 0]['amount'].min()) if expense_transactions > 0 else 0
            # Category analysis
            category_summary = {}
            category_counts = {}
            if 'category' in df.columns:
                category_summary = df.groupby('category')['amount'].sum().to_dict()
                category_counts = df.groupby('category').size().to_dict()
            # Monthly trends
            monthly_summary = {}
            monthly_net = {}
            if 'date' in df.columns:
                try:
                    df['month_year'] = df['date'].dt.strftime('%Y-%m')
                    monthly_summary = df.groupby('month_year')['amount'].sum().to_dict()
                    # Net balance per month (expenses are negative). Use
                    # Series.add with fill_value=0: plain '+' aligns on the
                    # month index and yields NaN for any month that has only
                    # income or only expenses.
                    monthly_income = df[df['amount'] > 0].groupby('month_year')['amount'].sum()
                    monthly_expenses = df[df['amount'] < 0].groupby('month_year')['amount'].sum()
                    monthly_net = monthly_income.add(monthly_expenses, fill_value=0).to_dict()
                except Exception as e:
                    logger.warning(f"Could not generate monthly analysis: {str(e)}")
            # Financial health indicators
            savings_rate = (net_balance / total_income * 100) if total_income > 0 else 0
            expense_to_income_ratio = (total_expenses / total_income) if total_income > 0 else 0
            return {
                # Basic metrics
                "total_income": round(total_income, 2),
                "total_expenses": round(total_expenses, 2),
                "net_balance": round(net_balance, 2),
                "expense_to_income_ratio": round(expense_to_income_ratio, 2),
                # Transaction analysis
                "transaction_count": total_transactions,
                "income_transactions": income_transactions,
                "expense_transactions": expense_transactions,
                "average_transaction": round(avg_transaction, 2),
                "average_income": round(avg_income, 2),
                "average_expense": round(avg_expense, 2),
                # Extremes
                "largest_income": round(largest_income, 2),
                "largest_expense": round(largest_expense, 2),
                # Financial health
                "savings_rate_percent": round(savings_rate, 2),
                # Category breakdown
                "category_breakdown": {k: round(v, 2) for k, v in category_summary.items()},
                "category_transaction_counts": category_counts,
                # Time-based analysis
                "monthly_trends": {k: round(v, 2) for k, v in monthly_summary.items()},
                "monthly_net_balance": {k: round(v, 2) for k, v in monthly_net.items()},
                # Metadata
                "analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "data_period": {
                    "start_date": df['date'].min().strftime("%Y-%m-%d") if 'date' in df.columns and not df['date'].isna().all() else "Unknown",
                    "end_date": df['date'].max().strftime("%Y-%m-%d") if 'date' in df.columns and not df['date'].isna().all() else "Unknown"
                }
            }
        except Exception as e:
            logger.error(f"Error calculating basic metrics: {str(e)}")
            return {"error": str(e)}

    def _prepare_summary_for_analysis(self, df: pd.DataFrame, basic_metrics: Dict) -> str:
        """Render a plain-text summary of the metrics for the model prompt.

        Shows headline numbers, the top-10 categories by absolute amount,
        and a note on how many months of data exist. Never raises; returns a
        placeholder string on failure.
        """
        try:
            summary_parts = [
                f"Total Income: ${basic_metrics.get('total_income', 0):,.2f}",
                f"Total Expenses: ${basic_metrics.get('total_expenses', 0):,.2f}",
                f"Net Balance: ${basic_metrics.get('net_balance', 0):,.2f}",
                f"Savings Rate: {basic_metrics.get('savings_rate_percent', 0):.1f}%",
                f"Total Transactions: {basic_metrics.get('transaction_count', 0)}",
                f"Average Income per Transaction: ${basic_metrics.get('average_income', 0):,.2f}",
                f"Average Expense per Transaction: ${basic_metrics.get('average_expense', 0):,.2f}",
            ]
            # Add category breakdown (top 10 by magnitude)
            category_breakdown = basic_metrics.get('category_breakdown', {})
            if category_breakdown:
                summary_parts.append("\nCategory Breakdown:")
                for category, amount in sorted(category_breakdown.items(), key=lambda x: abs(x[1]), reverse=True)[:10]:
                    summary_parts.append(f" {category}: ${amount:,.2f}")
            # Add monthly trends if available
            monthly_trends = basic_metrics.get('monthly_trends', {})
            if monthly_trends:
                summary_parts.append(f"\nMonthly Trends: {len(monthly_trends)} months of data")
            return "\n".join(summary_parts)
        except Exception as e:
            logger.error(f"Error preparing summary: {str(e)}")
            return "Financial data summary unavailable"
# Shared processor instance, built lazily on first use.
nebius_processor = None

def get_nebius_processor():
    """Return the shared NebiusFinanceProcessor, creating it on first call."""
    global nebius_processor
    # `or` short-circuits: reuse the existing instance, otherwise build one.
    nebius_processor = nebius_processor or NebiusFinanceProcessor()
    return nebius_processor
# NOTE(review): this definition is shadowed by the redefinition of
# process_transactions_nebius further down in this file (the later one also
# handles PDF input and re-raises instead of returning an error DataFrame),
# so this version is dead code after import — consider removing it.
def process_transactions_nebius(input_data) -> pd.DataFrame:
    """Process transactions using Nebius AI Studio API with DeepSeek-V3.
    Accepts either a file path (str) or list of transactions."""
    try:
        if isinstance(input_data, str):
            # Read CSV file
            df = pd.read_csv(input_data)
        elif isinstance(input_data, list):
            # Convert list of transactions to DataFrame
            df = pd.DataFrame(input_data)
        else:
            raise ValueError("Input must be either file path (str) or list of transactions")
        logger.info(f"Processing data with {len(df)} rows and columns: {list(df.columns)}")
        # Get processor and process transactions
        proc = get_nebius_processor()
        result_df = proc.process_transactions(df)
        return result_df
    except Exception as e:
        # Swallows the failure and reports it as a one-row error DataFrame.
        logger.error(f"Error processing transactions: {str(e)}")
        return pd.DataFrame({"error": [str(e)]})
def generate_financial_report_nebius(input_data) -> Dict[str, Any]:
    """Generate a financial report via Nebius AI Studio (DeepSeek-V3).

    Accepts either a CSV file path (str) or a list of transaction dicts.
    Returns the insights dict, or {"error": ...} if anything fails.
    """
    try:
        if isinstance(input_data, str):
            # Path to a CSV file on disk.
            frame = pd.read_csv(input_data)
        elif isinstance(input_data, list):
            # Already-materialized list of transaction records.
            frame = pd.DataFrame(input_data)
        else:
            raise ValueError("Input must be either file path (str) or list of transactions")
        logger.info(f"Generating report for {len(frame)} rows")
        # Delegate to the shared processor for the actual analysis.
        return get_nebius_processor().generate_financial_insights(frame)
    except Exception as e:
        logger.error(f"Error generating financial report: {str(e)}")
        return {"error": str(e)}
def process_transactions_nebius(transactions_input):
    """Process transactions via Nebius AI Studio (DeepSeek-V3).

    Accepts a CSV/PDF file path (str), an open PDF file object, or a list of
    transaction dicts. Errors are logged and re-raised to the caller.
    """
    try:
        # Classify the input once up front with explicit flags.
        pdf_path = isinstance(transactions_input, str) and transactions_input.endswith('.pdf')
        pdf_file = (
            hasattr(transactions_input, 'read')
            and hasattr(transactions_input, 'name')
            and transactions_input.name.endswith('.pdf')
        )
        if pdf_path:
            # PDF on disk: open in binary mode and hand off to the extractor.
            with open(transactions_input, 'rb') as handle:
                return _process_pdf_transactions(handle)
        if pdf_file:
            # Already-open PDF file object.
            return _process_pdf_transactions(transactions_input)
        if isinstance(transactions_input, str):
            frame = pd.read_csv(transactions_input)
        elif isinstance(transactions_input, list):
            frame = pd.DataFrame(transactions_input)
        else:
            raise ValueError("Input must be file path, file object, or list of transactions")
        # Tabular input: run the standard categorization pipeline.
        return get_nebius_processor().process_transactions(frame)
    except Exception as e:
        logger.error(f"Error processing transactions: {str(e)}")
        raise
def _process_pdf_transactions(file_obj):
    """Extract transactions from a PDF statement.

    Args:
        file_obj: Open binary file object (or anything pdfplumber.open accepts).

    Returns:
        List of dicts with 'date', 'description' and 'amount' keys, one per
        line that parses as "<date> <description...> <amount>".

    Raises:
        Exception: Re-raised after logging if the PDF cannot be read.
    """
    try:
        # Read PDF and concatenate text from every page that has any.
        with pdfplumber.open(file_obj) as pdf:
            text = "\n".join([page.extract_text() for page in pdf.pages if page.extract_text()])
        # Convert text to transactions (simple implementation - customize as needed)
        transactions = []
        for line in text.split('\n'):
            try:
                # Simple pattern matching - adjust based on your PDF format:
                # assumes "<date> <description...> <amount>" tokens per line.
                parts = line.strip().split()
                if len(parts) >= 3:
                    date = parts[0]
                    description = ' '.join(parts[1:-1])
                    amount = parts[-1].replace('$', '').replace(',', '')
                    transactions.append({
                        'date': date,
                        'description': description[:200],  # cap runaway descriptions
                        'amount': float(amount)
                    })
            except ValueError:
                # Trailing token is not numeric: not a transaction row. The
                # original bare `except:` also swallowed KeyboardInterrupt /
                # SystemExit; only float() can realistically fail here since
                # indexing is guarded by the len(parts) >= 3 check.
                continue
        return transactions
    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise
def batch_process_transactions_nebius(file_paths: List[str]) -> Dict[str, Any]:
    """Process several transaction CSV files with Nebius AI Studio.

    Returns a dict keyed by file path; each value carries either the
    processed transactions plus report ("status": "success") or the error
    message ("status": "failed"). One bad file never aborts the batch.
    """
    logger.info(f"Batch processing {len(file_paths)} files with Nebius AI Studio")
    results = {}
    proc = get_nebius_processor()
    for file_path in file_paths:
        try:
            # Categorize the file's transactions, then build its report.
            categorized = proc.process_transactions(pd.read_csv(file_path))
            insights = proc.generate_financial_insights(categorized)
            records = [] if categorized.empty else categorized.to_dict('records')
            results[file_path] = {
                "transactions": records,
                "report": insights,
                "status": "success"
            }
        except Exception as e:
            logger.error(f"Error processing {file_path}: {str(e)}")
            results[file_path] = {
                "error": str(e),
                "status": "failed"
            }
    logger.info(f"Batch processing completed. {len(results)} files processed")
    return results
if __name__ == "__main__":
    # For local testing: no work is done at import time beyond definitions;
    # running the module directly just confirms the backend loaded.
    logger.info("Nebius AI Studio backend ready for use")
    print("Make sure to set NEBIUS_API_KEY environment variable")