"""
Data processing utilities for the Finance Expert model.
"""

import datetime
import hashlib
import json
import logging
import math
import os
import re
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

import jsonlines
import numpy as np
import pandas as pd
from datasets import Dataset
from dateutil.parser import parse as date_parse
from tqdm import tqdm


class FinanceDataProcessor:
    """Normalize, validate and summarize financial records.

    The processor turns raw dataset examples (FinQA, TAT-QA, DocVQA,
    FinancialPhraseBank, SECFilings, FRED) into plain dictionaries, derives
    headline metrics, consistency checks and ratios from dictionary-shaped
    financial data, and can persist the results as JSONL under ``output_dir``.
    """

    # Characters whose presence marks a string as a candidate monetary amount.
    _CURRENCY_SYMBOLS = ("$", "€", "£", "¥")

    # Removes currency symbols and thousands separators before numeric parsing.
    _CURRENCY_STRIP_RE = re.compile(r'[\$€£¥,]')

    # Tolerances for the consistency checks. The absolute tolerance is the
    # historical threshold; the relative one only absorbs floating-point
    # rounding on very large figures.
    _ABS_TOLERANCE = 1e-6
    _REL_TOLERANCE = 1e-12

    # Metric name -> keys that may hold it, in lookup priority order.
    _METRIC_ALIASES = {
        "revenue": ("revenue", "income", "sales"),
        "expenses": ("expenses", "costs"),
        "profit": ("profit", "net_income"),
        "assets": ("assets", "total_assets"),
        "liabilities": ("liabilities", "total_liabilities"),
        "equity": ("equity", "shareholders_equity"),
    }

    # Dataset name -> name of the method that converts one example of it.
    _DATASET_HANDLERS = {
        "FinQA": "_process_finqa",
        "TAT-QA": "_process_tat_qa",
        "DocVQA": "_process_docvqa",
        "FinancialPhraseBank": "_process_phrasebank",
        "SECFilings": "_process_sec_filings",
        "FRED": "_process_fred",
    }

    def __init__(self, output_dir: str = "processed_data"):
        """Create the output directory (including parents) and the logger.

        Args:
            output_dir: Directory that ``save_to_jsonl`` writes into.
        """
        self.output_dir = Path(output_dir)
        # parents=True so nested paths such as "out/finance/v1" also work.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Return the module logger, configured for INFO output to a stream.

        The logger is shared by every instance, so the stream handler is only
        attached when the logger has none yet; otherwise each new processor
        would duplicate every log line.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            logger.addHandler(handler)
        return logger

    def process_financial_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize ``data`` and derive metrics, validation flags and ratios.

        Args:
            data: Mapping of field names to raw values.

        Returns:
            A dict with the keys ``processed_data``, ``metrics``,
            ``validation`` and ``ratios``. If any step raises, the failure is
            logged as a warning and ``{"error": <message>}`` is returned
            instead (this also covers non-mapping input).
        """
        try:
            processed = self._normalize_data(data)
            metrics = self._extract_financial_metrics(processed)
            validation = self._validate_financial_data(processed)
            ratios = self._calculate_financial_ratios(processed)
            return {
                "processed_data": processed,
                "metrics": metrics,
                "validation": validation,
                "ratios": ratios
            }
        except Exception as e:
            # Deliberate best-effort boundary: callers get an error record
            # rather than an exception.
            self.logger.warning(f"Error processing financial data: {str(e)}")
            return {"error": str(e)}

    def _normalize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``data`` with string values converted to typed values.

        Strings are converted by ``_normalize_string``; nested dicts are
        normalized recursively; inside lists only dict items are normalized and
        every other item is copied unchanged. All other values pass through.
        """
        normalized: Dict[str, Any] = {}
        for key, value in data.items():
            if isinstance(value, str):
                normalized[key] = self._normalize_string(value)
            elif isinstance(value, dict):
                normalized[key] = self._normalize_data(value)
            elif isinstance(value, list):
                normalized[key] = [
                    self._normalize_data(item) if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                normalized[key] = value
        return normalized

    def _normalize_string(self, value: str) -> Any:
        """Convert one string to a float/int where possible, else strip it.

        A string containing a currency symbol becomes a float only when the
        remainder really is a number; free text that merely mentions an amount
        (e.g. a passage) is kept as stripped text instead of collapsing to 0.0.
        """
        if any(symbol in value for symbol in self._CURRENCY_SYMBOLS):
            amount = self._parse_currency(value)
            return value.strip() if amount is None else amount
        # isdecimal() rather than isdigit(): the latter accepts characters
        # such as superscript digits that int() rejects.
        if value.isdecimal():
            return int(value)
        if self._is_float(value):
            return float(value)
        return value.strip()

    def _parse_currency(self, value: str) -> Optional[float]:
        """Parse a currency string, returning ``None`` when it is not numeric.

        Currency symbols and thousands separators are removed. A value wrapped
        in parentheses, the accounting notation for negatives, yields a
        negative number.
        """
        try:
            text = self._CURRENCY_STRIP_RE.sub('', value).strip()
            negative = text.startswith('(') and text.endswith(')')
            amount = float(text.replace('(', '').replace(')', ''))
        except (TypeError, ValueError):
            # TypeError: non-string input; ValueError: not a number.
            return None
        return -abs(amount) if negative else amount

    def _normalize_currency(self, value: str) -> float:
        """Convert a currency string to a float, or 0.0 if it cannot be parsed."""
        amount = self._parse_currency(value)
        return 0.0 if amount is None else amount

    def _is_float(self, value: str) -> bool:
        """Return True when ``float(value)`` succeeds."""
        try:
            float(value)
        except (TypeError, ValueError):
            return False
        return True

    def _extract_financial_metrics(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return revenue, expenses, profit, assets, liabilities and equity.

        Each metric is looked up under its aliases (see ``_METRIC_ALIASES``)
        and is 0.0 when none of them is present.
        """
        return {
            name: self._get_metric(data, *aliases)
            for name, aliases in self._METRIC_ALIASES.items()
        }

    def _get_metric(self, data: Dict[str, Any], *keys: str) -> float:
        """Return the value of the first of ``keys`` present in ``data``.

        The value is stringified and parsed as a currency amount, so an
        unparseable value yields 0.0, as does the absence of every key.
        """
        for key in keys:
            if key in data:
                return self._normalize_currency(str(data[key]))
        return 0.0

    def _amounts_match(self, reported: float, expected: float) -> bool:
        """Compare two amounts within the class tolerances."""
        return math.isclose(
            reported,
            expected,
            rel_tol=self._REL_TOLERANCE,
            abs_tol=self._ABS_TOLERANCE,
        )

    def _validate_financial_data(self, data: Dict[str, Any]) -> Dict[str, bool]:
        """Run the balance sheet, income statement and cash flow checks."""
        return {
            "balance_sheet_consistency": self._check_balance_sheet(data),
            "income_statement_consistency": self._check_income_statement(data),
            "cash_flow_consistency": self._check_cash_flow(data)
        }

    def _check_balance_sheet(self, data: Dict[str, Any]) -> bool:
        """Check that assets equal liabilities plus equity.

        Missing figures count as 0.0, so data without any of these keys is
        reported as consistent.
        """
        assets = self._get_metric(data, *self._METRIC_ALIASES["assets"])
        liabilities = self._get_metric(data, *self._METRIC_ALIASES["liabilities"])
        equity = self._get_metric(data, *self._METRIC_ALIASES["equity"])
        return self._amounts_match(assets, liabilities + equity)

    def _check_income_statement(self, data: Dict[str, Any]) -> bool:
        """Check that profit equals revenue minus expenses (missing values are 0.0)."""
        revenue = self._get_metric(data, *self._METRIC_ALIASES["revenue"])
        expenses = self._get_metric(data, *self._METRIC_ALIASES["expenses"])
        profit = self._get_metric(data, *self._METRIC_ALIASES["profit"])
        return self._amounts_match(profit, revenue - expenses)

    def _check_cash_flow(self, data: Dict[str, Any]) -> bool:
        """Check that the net change in cash equals the sum of the three cash flows."""
        operating = self._get_metric(data, "operating_cash_flow")
        investing = self._get_metric(data, "investing_cash_flow")
        financing = self._get_metric(data, "financing_cash_flow")
        net_change = self._get_metric(data, "net_change_in_cash")
        return self._amounts_match(net_change, operating + investing + financing)

    def _calculate_financial_ratios(self, data: Dict[str, Any]) -> Dict[str, float]:
        """Compute ratios from the extracted metrics.

        A zero denominator yields ``inf`` for ``current_ratio`` and
        ``debt_to_equity`` and ``0.0`` for the profitability ratios, so no
        division can fail.
        """
        metrics = self._extract_financial_metrics(data)
        assets = metrics["assets"]
        liabilities = metrics["liabilities"]
        equity = metrics["equity"]
        profit = metrics["profit"]
        revenue = metrics["revenue"]

        return {
            # NOTE: computed from the total assets / total liabilities
            # figures, the only ones this class extracts.
            "current_ratio": assets / liabilities if liabilities != 0 else float('inf'),
            "debt_to_equity": liabilities / equity if equity != 0 else float('inf'),
            "profit_margin": profit / revenue if revenue != 0 else 0.0,
            "return_on_equity": profit / equity if equity != 0 else 0.0,
            "return_on_assets": profit / assets if assets != 0 else 0.0
        }

    def process_dataset(self, dataset: "Dataset", dataset_name: str) -> List[Dict[str, Any]]:
        """Process every example of ``dataset`` with the handler for ``dataset_name``.

        Examples that raise are logged and skipped, so the returned list may
        be shorter than the dataset. The number of errors is logged at the end.
        """
        processed = []
        error_count = 0

        self.logger.info(f"Processing {dataset_name} dataset with {len(dataset)} samples")

        for idx, example in enumerate(tqdm(dataset, desc=f"Processing {dataset_name}")):
            try:
                processed.append(self._process_example(example, dataset_name))
            except Exception as e:
                error_count += 1
                self.logger.error(f"Error processing example {idx} in {dataset_name}: {str(e)}")

        self.logger.info(f"Processed {len(processed)} examples")
        self.logger.info(f"Encountered {error_count} errors")

        return processed

    def _process_example(self, example: Dict[str, Any], dataset_name: str) -> Dict[str, Any]:
        """Dispatch ``example`` to the handler registered for ``dataset_name``.

        Raises:
            ValueError: If ``dataset_name`` is not a supported dataset.
        """
        handler_name = self._DATASET_HANDLERS.get(dataset_name)
        if handler_name is None:
            raise ValueError(f"Unknown dataset: {dataset_name}")
        return getattr(self, handler_name)(example)

    def _process_finqa(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a FinQA example and analyze its table."""
        return {
            "question": example["question"].strip(),
            "table": example["table"],
            "answer": example["answer"],
            "program": example["program"],
            # A table that is not a mapping ends up as an {"error": ...} record.
            "data_analysis": self.process_financial_data(example["table"])
        }

    def _process_tat_qa(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a TAT-QA example and analyze its passage."""
        return {
            "passage": example["passage"].strip(),
            "question": example["question"].strip(),
            "answer": example["answer"],
            "scale": example["scale"],
            "type": example["type"],
            "data_analysis": self.process_financial_data({"passage": example["passage"]})
        }

    def _process_docvqa(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a DocVQA example and analyze its answer."""
        return {
            "question": example["question"].strip(),
            "image": example["image"],
            "answer": example["answer"],
            "type": example["type"],
            "data_analysis": self.process_financial_data({"answer": example["answer"]})
        }

    def _process_phrasebank(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a FinancialPhraseBank example and attach a sentiment analysis."""
        return {
            "sentence": example["sentence"].strip(),
            "label": example["label"],
            # NOTE(review): _analyze_sentiment is not defined in the visible
            # part of this class; confirm it exists elsewhere.
            "sentiment_analysis": self._analyze_sentiment(example["sentence"])
        }

    def _process_sec_filings(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an SEC filing example and analyze its content."""
        return {
            "company": example["company"].strip(),
            "filing_type": example["filing_type"],
            "content": example["content"],
            "date": example["date"],
            "financial_analysis": self.process_financial_data({"content": example["content"]})
        }

    def _process_fred(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a FRED example and attach an economic analysis."""
        return {
            "series_id": example["series_id"],
            "date": example["date"],
            "value": example["value"],
            # NOTE(review): _analyze_economic_data is not defined in the
            # visible part of this class; confirm it exists elsewhere.
            "economic_analysis": self._analyze_economic_data(example)
        }

    def save_to_jsonl(self, data: List[Dict[str, Any]], filename: str) -> Path:
        """Write ``data`` as JSON Lines to ``output_dir / filename``.

        Returns:
            The path of the written file.
        """
        filepath = self.output_dir / filename
        # Covers filenames with sub-directories and a directory removed since init.
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with jsonlines.open(filepath, mode='w') as writer:
            writer.write_all(data)
        self.logger.info(f"Saved data to {filepath}")
        return filepath

    def print_sample(self, data: List[Dict[str, Any]], count: int = 3) -> None:
        """Log the first ``count`` examples of ``data`` as indented JSON."""
        self.logger.info("\nSample data:")
        for i, example in enumerate(data[:count]):
            self.logger.info(f"\nSample {i+1}:")
            # default=str: this is debug output only, so values JSON cannot
            # encode are shown via str() instead of aborting.
            self.logger.info(json.dumps(example, indent=2, default=str))

    def print_memory_usage(self) -> None:
        """Log the resident memory of the current process in MB.

        ``psutil`` is imported here because the visible module header does not
        import it; when it is not installed a warning is logged instead.
        """
        try:
            import psutil
        except ImportError:
            self.logger.warning("psutil is not installed; memory usage is unavailable")
            return
        memory_info = psutil.Process().memory_info()
        self.logger.info(f"Current memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")