EasyReportsMCPServer / financial_analyzer.py
JC321's picture
Upload 2 files
f6bd766 verified
"""Financial Data Analysis Module"""
from edgar_client import EdgarDataClient
from datetime import datetime
import json
class FinancialAnalyzer:
    """Company lookup and financial-metric extraction on top of ``EdgarDataClient``.

    All remote data access is delegated to ``self.edgar_client``. On top of
    that, results of the two expensive methods
    (``extract_financial_metrics`` and ``get_latest_financial_data``) are kept
    in a small per-instance cache with a time-to-live and a size bound.
    """

    # Fields every formatted record is guaranteed to contain (value may be None).
    _KEY_FIELDS = (
        'total_revenue', 'net_income', 'earnings_per_share',
        'operating_expenses', 'operating_cash_flow', 'source_url', 'source_form',
    )
    # Company Facts taxonomies and revenue concepts inspected when mapping a
    # filing year to the fiscal year reported in that filing.
    _FACT_TAXONOMIES = ("us-gaap", "ifrs-full")
    _REVENUE_TAGS = (
        "Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax",
        "Revenue", "RevenueFromContractWithCustomer",
    )
    _ANNUAL_FORMS = ("10-K", "20-F")

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """
        Initialize financial analyzer
        Args:
            user_agent (str): User agent string for identifying request source;
                passed unchanged to ``EdgarDataClient``
        """
        self.edgar_client = EdgarDataClient(user_agent)
        # Layer 2: Method-level cache (avoid duplicate API calls)
        self._method_cache = {}  # method_key -> result
        self._method_cache_timestamps = {}  # method_key -> time.time() at store
        self._method_cache_ttl = 600  # seconds (10 minutes)
        self._method_cache_max_size = 500  # entry count that triggers eviction

    # ------------------------------------------------------------------
    # Method-level cache
    # ------------------------------------------------------------------
    def _get_method_cache(self, cache_key):
        """
        Get cached method result if it is still within the TTL
        Args:
            cache_key (str): Key the result was stored under
        Returns:
            The cached value, or None when the key is unknown or expired.
            A stored value of None is therefore indistinguishable from a miss.
        """
        import time

        stored_at = self._method_cache_timestamps.get(cache_key)
        if stored_at is None:
            return None
        if time.time() - stored_at < self._method_cache_ttl:
            return self._method_cache.get(cache_key)
        # Expired: drop both halves of the entry so the dicts stay in sync.
        self._method_cache.pop(cache_key, None)
        self._method_cache_timestamps.pop(cache_key, None)
        return None

    def _set_method_cache(self, cache_key, result):
        """
        Cache a method result, evicting the oldest entries when full
        Args:
            cache_key (str): Key to store the result under
            result: Value to cache
        """
        import time

        # Overwriting an existing key does not grow the cache, so only a new
        # key can trigger eviction.
        is_new_key = cache_key not in self._method_cache
        if is_new_key and len(self._method_cache) >= self._method_cache_max_size:
            # Evict the oldest half by store time (not by dict insertion
            # order, which ignores refreshes). Always evict at least one entry
            # so a max size of 1 cannot grow without bound.
            evict_count = max(1, self._method_cache_max_size // 2)
            oldest_first = sorted(
                self._method_cache,
                key=lambda key: self._method_cache_timestamps.get(key, 0.0),
            )
            for stale_key in oldest_first[:evict_count]:
                self._method_cache.pop(stale_key, None)
                self._method_cache_timestamps.pop(stale_key, None)

        self._method_cache[cache_key] = result
        self._method_cache_timestamps[cache_key] = time.time()

    # ------------------------------------------------------------------
    # Company search
    # ------------------------------------------------------------------
    @staticmethod
    def _ensure_ticker(company_info):
        """Add a singular 'ticker' key (first of 'tickers', else None) in place."""
        if "ticker" not in company_info:
            tickers = company_info.get("tickers") or []
            company_info["ticker"] = tickers[0] if tickers else None
        return company_info

    @staticmethod
    def _basic_company_record(basic_info, source, quick=False):
        """Build a result dict from a basic {cik, name, ticker} lookup record.

        With ``quick=True`` the detail fields that a basic lookup cannot
        supply are included as None, together with an explanatory '_note'.
        """
        ticker = basic_info.get('ticker')
        record = {
            "cik": basic_info['cik'],
            "name": basic_info['name'],
            "tickers": [ticker] if ticker else [],
            "ticker": ticker,
        }
        if quick:
            record["ein"] = None  # Not available in basic search
            record["fiscal_year_end"] = None  # Not available in basic search
            record["sic_description"] = None  # Not available in basic search
        record["_source"] = source
        if quick:
            record["_note"] = "Basic info from ticker search. Use get_company_info for full details."
        return record

    def _search_by_cik(self, cik):
        """Resolve a 10-digit CIK: detailed info first, basic record as fallback."""
        # Call order matches the original flow: basic lookup, then detailed.
        basic_info = self.edgar_client.get_company_by_cik(cik)
        company_info = self.edgar_client.get_company_info(cik)
        if company_info:
            # Ensure 'ticker' exists alongside 'tickers' for compatibility,
            # whether or not the basic lookup knew this CIK.
            return self._ensure_ticker(company_info)
        if basic_info:
            # Detailed fetch failed; fall back to what the basic lookup had.
            return self._basic_company_record(basic_info, "basic_cik_lookup")
        return {"error": "Company not found for specified CIK"}

    def search_company(self, company_input):
        """
        Search company information (by name, ticker, or CIK)
        Args:
            company_input (str): Company name, ticker, or CIK
        Returns:
            dict: Company information, or {"error": ...} when nothing matches.
                Missing, non-string, or blank input yields the
                "No matching company found" error.
        """
        if not isinstance(company_input, str):
            return {"error": "No matching company found"}
        company_input = company_input.strip()
        if not company_input:
            return {"error": "No matching company found"}

        # Strategy 1: all-digit input of 8+ characters is treated as a CIK
        # and zero-padded to 10 digits.
        if company_input.isdigit() and len(company_input) >= 8:
            return self._search_by_cik(company_input.zfill(10))

        # Strategy 2: short all-uppercase input is treated as a ticker.
        is_likely_ticker = len(company_input) <= 5 and company_input.isupper()
        if is_likely_ticker:
            basic_info = self.edgar_client.get_company_by_ticker(company_input)
            if basic_info:
                return self._basic_company_record(
                    basic_info, "quick_ticker_search", quick=True)

        # Strategy 3: general search by name/ticker (basic {cik, name, ticker}).
        basic_info = self.edgar_client.search_company_by_name(company_input)
        if not basic_info:
            return {"error": "No matching company found"}

        # Strategy 4: ticker-like input gets the quick basic answer; name
        # searches pay for one more call to fetch the detailed record.
        if is_likely_ticker:
            return self._basic_company_record(basic_info, "quick_search", quick=True)

        company_info = self.edgar_client.get_company_info(basic_info['cik'])
        if company_info:
            return self._ensure_ticker(company_info)
        return self._basic_company_record(basic_info, "basic_search_fallback")

    # ------------------------------------------------------------------
    # Filings
    # ------------------------------------------------------------------
    def get_company_filings_list(self, cik, form_types=None):
        """
        Get company filings list
        Args:
            cik (str): Company CIK
            form_types (list): List of form types (default: ['10-K', '10-Q'])
        Returns:
            list: Whatever ``edgar_client.get_company_filings`` returns
        """
        if form_types is None:
            form_types = ['10-K', '10-Q']
        return self.edgar_client.get_company_filings(cik, form_types)

    @staticmethod
    def _filing_year(filing):
        """Return the 4-digit year prefix of filing['filing_date'] as int, else None."""
        filing_date = filing.get('filing_date') or ''
        if not isinstance(filing_date, str) or len(filing_date) < 4:
            return None
        try:
            return int(filing_date[:4])
        except ValueError:
            return None

    @classmethod
    def _map_filing_to_fiscal_years(cls, facts):
        """Map filing year -> fiscal year using annual revenue facts.

        Reads ``facts["facts"][taxonomy][tag]["units"]["USD"]`` entries and,
        for annual forms with a positive integer ``fy``, records
        ``int(filed[:4]) -> fy``. The first mapping seen for a filing year
        wins. Every revenue tag is scanned so that years reported under a
        different tag than the first one found are still mapped.
        """
        mapping = {}
        taxonomies = (facts or {}).get("facts") or {}
        for data_source in cls._FACT_TAXONOMIES:
            source_data = taxonomies.get(data_source) or {}
            for tag in cls._REVENUE_TAGS:
                concept = source_data.get(tag) or {}
                usd_entries = (concept.get("units") or {}).get("USD") or []
                for entry in usd_entries:
                    fiscal_year = entry.get("fy")
                    filed = entry.get("filed") or ""  # Filing date
                    period_code = entry.get("fp") or ""
                    if entry.get("form") not in cls._ANNUAL_FORMS:
                        continue
                    # 'fy' may be null in the payload; comparing None to an
                    # int would raise, so skip anything not a positive int.
                    if not isinstance(fiscal_year, int) or fiscal_year <= 0:
                        continue
                    if period_code and period_code != "FY":
                        continue
                    if len(filed) < 10:  # Format: YYYY-MM-DD
                        continue
                    try:
                        filed_year = int(filed[:4])
                    except ValueError:
                        continue
                    mapping.setdefault(filed_year, fiscal_year)
        return mapping

    # ------------------------------------------------------------------
    # Financial data
    # ------------------------------------------------------------------
    def extract_financial_metrics(self, cik, years=3):
        """
        Extract financial metrics for specified number of years
        Args:
            cik (str): Company CIK
            years (int): Number of most recent filing years to cover,
                default is 3; values below 1 yield an empty list
        Returns:
            list: Financial data dicts ordered, per year, as
                FY, Q4, Q3, Q2, Q1 (annual only when the company has 20-F
                filings and no 10-K filings). Each dict carries a
                '_sequence' index recording that order.
        """
        # Check method cache first (Layer 2)
        cache_key = f"extract_metrics_{cik}_{years}"
        cached = self._get_method_cache(cache_key)
        if cached is not None:
            print(f"[Cache Hit] extract_financial_metrics({cik}, {years})")
            return cached

        # Step 1: Get annual filings to determine what was actually filed
        filings_10k = self.edgar_client.get_company_filings(cik, ['10-K']) or []
        filings_20f = self.edgar_client.get_company_filings(cik, ['20-F']) or []
        all_annual_filings = filings_10k + filings_20f
        if not all_annual_filings:
            return []

        # A company with only 20-F filings gets annual periods only.
        is_20f_filer = bool(filings_20f) and not filings_10k

        # Step 2: Collect the distinct years in which annual reports were filed
        filing_years = {
            year
            for year in map(self._filing_year, all_annual_filings)
            if year is not None
        }
        if not filing_years:
            return []

        # Step 3: Most recent N filing years, newest first
        target_years = sorted(filing_years, reverse=True)[:max(years, 0)]

        # Step 4: Map filing years to fiscal years via Company Facts
        facts = self.edgar_client.get_company_facts(cik)
        filing_to_fiscal_year = self._map_filing_to_fiscal_years(facts)

        # Step 5: Generate period list: FY -> Q4 -> Q3 -> Q2 -> Q1 per year
        periods = []
        for file_year in target_years:
            # Fall back to the filing year when no mapping is available
            fiscal_year = filing_to_fiscal_year.get(file_year, file_year)
            periods.append({
                'period': str(fiscal_year),
                'type': 'annual',
                'fiscal_year': fiscal_year,
                'filing_year': file_year,
            })
            if not is_20f_filer:
                for quarter in range(4, 0, -1):
                    periods.append({
                        'period': f"{fiscal_year}Q{quarter}",
                        'type': 'quarterly',
                        'fiscal_year': fiscal_year,
                        'filing_year': file_year,
                    })

        # Step 6: Get financial data for each period
        financial_data = []
        for idx, period_info in enumerate(periods):
            data = self.edgar_client.get_financial_data_for_period(
                cik, period_info['period'])
            if data and "period" in data:
                # Add fiscal year prefix for annual data
                if period_info['type'] == 'annual':
                    data["period"] = f"FY{period_info['fiscal_year']}"
                # Sequence number lets format_financial_data restore order
                data["_sequence"] = idx
                financial_data.append(data)

        # Cache the result (Layer 2)
        self._set_method_cache(cache_key, financial_data)
        return financial_data

    def get_latest_financial_data(self, cik):
        """
        Get latest financial data
        Args:
            cik (str): Company CIK
        Returns:
            dict: Data for the most recent annual (10-K / 20-F) filing year as
                returned by ``edgar_client.get_financial_data_for_period``,
                or {} when no dated annual filing exists
        """
        # Check method cache first (Layer 2)
        cache_key = f"latest_data_{cik}"
        cached = self._get_method_cache(cache_key)
        if cached is not None:
            print(f"[Cache Hit] get_latest_financial_data({cik})")
            return cached

        # Get annual filings (supports 10-K and 20-F)
        filings_10k = self.edgar_client.get_company_filings(cik, ['10-K']) or []
        filings_20f = self.edgar_client.get_company_filings(cik, ['20-F']) or []
        filings = filings_10k + filings_20f
        if not filings:
            return {}

        filing_years = [
            year for year in map(self._filing_year, filings) if year is not None
        ]
        if not filing_years:
            return {}
        latest_filing_year = max(filing_years)

        # NOTE(review): this queries by *filing* year, whereas
        # extract_financial_metrics queries by the mapped *fiscal* year.
        # Confirm which one get_financial_data_for_period expects.
        result = self.edgar_client.get_financial_data_for_period(
            cik, str(latest_filing_year))

        # Cache the result (Layer 2)
        self._set_method_cache(cache_key, result)
        return result

    # ------------------------------------------------------------------
    # Formatting
    # ------------------------------------------------------------------
    def format_financial_data(self, financial_data):
        """
        Format financial data for display
        Args:
            financial_data (dict or list): Financial data
        Returns:
            dict or list: Formatted financial data; lists are returned sorted
                by '_sequence' (entries without it sort as 999)
        """
        if isinstance(financial_data, list):
            # Sort by _sequence to maintain order (FY -> Q4 -> Q3 -> Q2 -> Q1)
            ordered = sorted(financial_data, key=lambda row: row.get("_sequence", 999))
            return [self._format_single_financial_data(row) for row in ordered]
        return self._format_single_financial_data(financial_data)

    def _format_single_financial_data(self, data):
        """
        Format single financial data entry
        Args:
            data (dict): Financial data; None or empty is treated as an
                empty record
        Returns:
            dict: Shallow copy of ``data`` with every key field present
                (None when missing) and a numeric EPS rounded to 2 decimals.
                Other values are passed through without unit conversion.
        """
        formatted = data.copy() if data else {}
        # Ensure all key fields exist, even if None
        for field in self._KEY_FIELDS:
            formatted.setdefault(field, None)
        # Format EPS, keep two decimal places
        eps = formatted['earnings_per_share']
        if isinstance(eps, (int, float)):
            formatted['earnings_per_share'] = round(eps, 2)
        return formatted