"""Financial Data Analysis Module""" from edgar_client import EdgarDataClient from datetime import datetime import json class FinancialAnalyzer: def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"): """ Initialize financial analyzer Args: user_agent (str): User agent string for identifying request source """ self.edgar_client = EdgarDataClient(user_agent) # Layer 2: Method-level cache (avoid duplicate API calls) self._method_cache = {} # method_key -> result self._method_cache_timestamps = {} # method_key -> timestamp self._method_cache_ttl = 600 # 10 minutes cache self._method_cache_max_size = 500 # Limit cache size def _get_method_cache(self, cache_key): """Get cached method result if valid""" if cache_key not in self._method_cache_timestamps: return None import time age = time.time() - self._method_cache_timestamps[cache_key] if age < self._method_cache_ttl: return self._method_cache.get(cache_key) else: # Expired, remove from cache self._method_cache.pop(cache_key, None) self._method_cache_timestamps.pop(cache_key, None) return None def _set_method_cache(self, cache_key, result): """Cache method result with size limit""" # LRU-like eviction if cache is full if len(self._method_cache) >= self._method_cache_max_size: # Remove oldest half keys_to_remove = list(self._method_cache.keys())[:self._method_cache_max_size // 2] for key in keys_to_remove: self._method_cache.pop(key, None) self._method_cache_timestamps.pop(key, None) import time self._method_cache[cache_key] = result self._method_cache_timestamps[cache_key] = time.time() def search_company(self, company_input): """ Search company information (by name, ticker, or CIK) - Optimized version Args: company_input (str): Company name, ticker, or CIK Returns: dict: Company information """ # Strip whitespace company_input = company_input.strip() # Strategy 1: If input is numeric and looks like CIK (8-10 digits), use fast CIK lookup if company_input.isdigit() and len(company_input) >= 8: # Normalize CIK to 10 digits cik = company_input.zfill(10) # Try fast lookup first (from cached tickers) basic_info = self.edgar_client.get_company_by_cik(cik) if basic_info: # Fast path succeeded, now get detailed info company_info = self.edgar_client.get_company_info(cik) if company_info: # Ensure 'ticker' exists alongside 'tickers' for compatibility if "ticker" not in company_info: tks = company_info.get("tickers") or [] company_info["ticker"] = tks[0] if tks else None return company_info else: # Fallback to basic info if detailed fetch fails return { "cik": basic_info['cik'], "name": basic_info['name'], "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [], "ticker": basic_info.get('ticker'), "_source": "basic_cik_lookup" } else: # CIK not found in cache, try full API call company_info = self.edgar_client.get_company_info(cik) if company_info: return company_info else: return {"error": "Company not found for specified CIK"} # Strategy 2: Check if it looks like a ticker (short uppercase) input_length = len(company_input) is_likely_ticker = input_length <= 5 and company_input.isupper() if is_likely_ticker: # Try fast ticker lookup first basic_info = self.edgar_client.get_company_by_ticker(company_input) if basic_info: # Fast ticker lookup succeeded - return enriched basic info return { "cik": basic_info['cik'], "name": basic_info['name'], "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [], "ticker": basic_info.get('ticker'), "ein": None, # Not available in basic search "fiscal_year_end": None, # Not available in basic search "sic_description": None, # Not available in basic search "_source": "quick_ticker_search", "_note": "Basic info from ticker search. Use get_company_info for full details." } # Strategy 3: General search by name/ticker # This returns basic info: {cik, name, ticker} basic_info = self.edgar_client.search_company_by_name(company_input) if not basic_info: return {"error": "No matching company found"} # Strategy 4: Decide whether to fetch detailed info # For ticker-like searches, return basic info quickly if is_likely_ticker: # Quick response with basic info return { "cik": basic_info['cik'], "name": basic_info['name'], "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [], "ticker": basic_info.get('ticker'), "ein": None, "fiscal_year_end": None, "sic_description": None, "_source": "quick_search", "_note": "Basic info from ticker search. Use get_company_info for full details." } # For name searches, fetch detailed info (worth the extra API call) company_info = self.edgar_client.get_company_info(basic_info['cik']) if company_info: # Ensure 'ticker' exists alongside 'tickers' for compatibility if "ticker" not in company_info: tks = company_info.get("tickers") or [] company_info["ticker"] = tks[0] if tks else None return company_info else: # Fallback to basic info if detailed fetch fails return { "cik": basic_info['cik'], "name": basic_info['name'], "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [], "ticker": basic_info.get('ticker'), "_source": "basic_search_fallback" } def get_company_filings_list(self, cik, form_types=None): """ Get company filings list Args: cik (str): Company CIK form_types (list): List of form types (default: ['10-K', '10-Q']) Returns: list: Filings list """ if form_types is None: form_types = ['10-K', '10-Q'] filings = self.edgar_client.get_company_filings(cik, form_types) return filings def extract_financial_metrics(self, cik, years=3): """ Extract financial metrics for specified number of years Args: cik (str): Company CIK years (int): Number of years to extract, default is 3 years Returns: list: List of financial data """ # Check method cache first (Layer 2) cache_key = f"extract_metrics_{cik}_{years}" cached = self._get_method_cache(cache_key) if cached is not None: print(f"[Cache Hit] extract_financial_metrics({cik}, {years})") return cached financial_data = [] # Step 1: Get company filings to determine what was actually filed filings_10k = self.edgar_client.get_company_filings(cik, ['10-K']) filings_20f = self.edgar_client.get_company_filings(cik, ['20-F']) all_annual_filings = filings_10k + filings_20f if not all_annual_filings: return [] # Detect if company is a 20-F filer (foreign company) is_20f_filer = len(filings_20f) > 0 and len(filings_10k) == 0 has_quarterly = False # 20-F filers typically don't have quarterly reports # Step 2: Extract filing years from annual reports # Use filing_date to determine the years we should query filing_year_map = {} # Map: filing_year -> list of filings for filing in all_annual_filings: filing_date = filing.get('filing_date', '') if filing_date and len(filing_date) >= 4: try: file_year = int(filing_date[:4]) if file_year not in filing_year_map: filing_year_map[file_year] = [] filing_year_map[file_year].append(filing) except ValueError: continue if not filing_year_map: return [] # Step 3: Sort years in descending order and take the most recent N years sorted_years = sorted(filing_year_map.keys(), reverse=True) target_years = sorted_years[:years] # Step 4: For each target year, we need to find the fiscal year from Company Facts # Get company facts to map filing years to fiscal years facts = self.edgar_client.get_company_facts(cik) filing_to_fiscal_year = {} # Map: filing_year -> fiscal_year if facts: # Try to map filing years to fiscal years using Company Facts for data_source in ["us-gaap", "ifrs-full"]: if data_source in facts.get("facts", {}): source_data = facts["facts"][data_source] # Look for Revenue tag to get fiscal year mapping revenue_tags = ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "Revenue", "RevenueFromContractWithCustomer"] for tag in revenue_tags: if tag in source_data: units = source_data[tag].get("units", {}) if "USD" in units: for entry in units["USD"]: form = entry.get("form", "") fy = entry.get("fy", 0) filed = entry.get("filed", "") # Filing date fp = entry.get("fp", "") # Map filing year to fiscal year if form in ["10-K", "20-F"] and fy > 0 and filed and (fp == "FY" or not fp): if len(filed) >= 10: # Format: YYYY-MM-DD try: file_year = int(filed[:4]) # Store the mapping: filing_year -> fiscal_year if file_year not in filing_to_fiscal_year: filing_to_fiscal_year[file_year] = fy except ValueError: continue break # Found revenue tag, no need to check more # Step 5: Generate period list for target years # For each year: FY -> Q4 -> Q3 -> Q2 -> Q1 (descending order) # For 20-F filers: only FY (no quarterly data) periods = [] for file_year in target_years: # Try to get fiscal year from mapping, otherwise use filing year fiscal_year = filing_to_fiscal_year.get(file_year, file_year) # First add annual data for this fiscal year periods.append({ 'period': str(fiscal_year), 'type': 'annual', 'fiscal_year': fiscal_year, 'filing_year': file_year }) # Only add quarterly data for 10-K filers (not for 20-F filers) if not is_20f_filer: # Then add quarterly data in descending order: Q4, Q3, Q2, Q1 for quarter in range(4, 0, -1): periods.append({ 'period': f"{fiscal_year}Q{quarter}", 'type': 'quarterly', 'fiscal_year': fiscal_year, 'filing_year': file_year }) # Step 6: Get financial data for each period for idx, period_info in enumerate(periods): period = period_info['period'] fiscal_year = period_info['fiscal_year'] data = self.edgar_client.get_financial_data_for_period(cik, period) if data and "period" in data: # Add fiscal year prefix for annual data if period_info['type'] == 'annual': data["period"] = f"FY{fiscal_year}" # Add sequence number to maintain order data["_sequence"] = idx financial_data.append(data) # Cache the result (Layer 2) self._set_method_cache(cache_key, financial_data) return financial_data def get_latest_financial_data(self, cik): """ Get latest financial data Args: cik (str): Company CIK Returns: dict: Latest financial data """ # Check method cache first (Layer 2) cache_key = f"latest_data_{cik}" cached = self._get_method_cache(cache_key) if cached is not None: print(f"[Cache Hit] get_latest_financial_data({cik})") return cached # Get latest filing year (supports 10-K and 20-F) filings_10k = self.edgar_client.get_company_filings(cik, ['10-K']) filings_20f = self.edgar_client.get_company_filings(cik, ['20-F']) filings = filings_10k + filings_20f if not filings: return {} # Get latest filing year latest_filing_year = None for filing in filings: if 'filing_date' in filing and filing['filing_date']: try: filing_year = int(filing['filing_date'][:4]) if latest_filing_year is None or filing_year > latest_filing_year: latest_filing_year = filing_year except ValueError: continue if latest_filing_year is None: return {} # Get financial data for latest year result = self.edgar_client.get_financial_data_for_period(cik, str(latest_filing_year)) # Cache the result (Layer 2) self._set_method_cache(cache_key, result) return result def format_financial_data(self, financial_data): """ Format financial data for display Args: financial_data (dict or list): Financial data Returns: dict or list: Formatted financial data """ if isinstance(financial_data, list): # Sort by _sequence to maintain correct order (FY -> Q4 -> Q3 -> Q2 -> Q1) sorted_data = sorted(financial_data, key=lambda x: x.get("_sequence", 999)) formatted_data = [] for data in sorted_data: formatted_data.append(self._format_single_financial_data(data)) return formatted_data else: return self._format_single_financial_data(financial_data) def _format_single_financial_data(self, data): """ Format single financial data entry Args: data (dict): Financial data Returns: dict: Formatted financial data """ formatted = data.copy() # Ensure all key fields exist, even if None key_fields = ['total_revenue', 'net_income', 'earnings_per_share', 'operating_expenses', 'operating_cash_flow', 'source_url', 'source_form'] for key in key_fields: if key not in formatted: formatted[key] = None # No longer perform unit conversion, keep original values # Format EPS, keep two decimal places if 'earnings_per_share' in formatted and isinstance(formatted['earnings_per_share'], (int, float)): formatted['earnings_per_share'] = round(formatted['earnings_per_share'], 2) return formatted