# Spaces:
# Sleeping
# Sleeping
"""Financial Data Analysis Module"""
import json
import time
from datetime import datetime

from edgar_client import EdgarDataClient
class FinancialAnalyzer:
    """Company search and financial-metric extraction on top of an EDGAR client.

    Every remote lookup is delegated to ``self.edgar_client``. This class adds
    input routing for company search, period planning for multi-year metric
    extraction, a small per-instance result cache, and display formatting.
    """

    # Revenue concepts probed, in this order, when mapping filing years to
    # fiscal years from Company Facts.
    _REVENUE_TAGS = (
        "Revenues",
        "RevenueFromContractWithCustomerExcludingAssessedTax",
        "Revenue",
        "RevenueFromContractWithCustomer",
    )

    # Form types treated as annual reports.
    _ANNUAL_FORMS = ("10-K", "20-F")

    # Fields guaranteed to be present (possibly as None) after formatting.
    _FORMAT_KEY_FIELDS = (
        'total_revenue',
        'net_income',
        'earnings_per_share',
        'operating_expenses',
        'operating_cash_flow',
        'source_url',
        'source_form',
    )

    _QUICK_SEARCH_NOTE = "Basic info from ticker search. Use get_company_info for full details."

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """
        Initialize financial analyzer

        Args:
            user_agent (str): User agent string for identifying request source
        """
        self.edgar_client = EdgarDataClient(user_agent)

        # Layer 2: Method-level cache (avoid duplicate API calls)
        self._method_cache = {}  # method_key -> result
        self._method_cache_timestamps = {}  # method_key -> time.time() at store
        self._method_cache_ttl = 600  # seconds (10 minutes)
        self._method_cache_max_size = 500  # entry count that triggers eviction

    # ------------------------------------------------------------------
    # Method-level cache
    # ------------------------------------------------------------------
    def _get_method_cache(self, cache_key):
        """
        Get cached method result if it is still within the TTL

        Args:
            cache_key (str): Key the result was stored under

        Returns:
            The cached value, or None when the key is unknown or expired
            (an expired entry is removed as a side effect)
        """
        stored_at = self._method_cache_timestamps.get(cache_key)
        if stored_at is None:
            return None

        if time.time() - stored_at < self._method_cache_ttl:
            return self._method_cache.get(cache_key)

        # Expired, remove from cache
        self._method_cache.pop(cache_key, None)
        self._method_cache_timestamps.pop(cache_key, None)
        return None

    def _set_method_cache(self, cache_key, result):
        """
        Cache method result, evicting the oldest entries when the cache is full

        Args:
            cache_key (str): Key to store the result under
            result: Value to cache
        """
        if cache_key in self._method_cache:
            # Refreshing an existing key never grows the cache, so no eviction
            # is needed. Drop it first so the re-insert below moves it to the
            # newest end of the dict's insertion order.
            self._method_cache.pop(cache_key, None)
            self._method_cache_timestamps.pop(cache_key, None)
        elif len(self._method_cache) >= self._method_cache_max_size:
            # Remove the oldest half (dicts iterate in insertion order).
            # max(1, ...) keeps very small size limits from evicting nothing.
            evict_count = max(1, self._method_cache_max_size // 2)
            for stale_key in list(self._method_cache)[:evict_count]:
                self._method_cache.pop(stale_key, None)
                self._method_cache_timestamps.pop(stale_key, None)

        self._method_cache[cache_key] = result
        self._method_cache_timestamps[cache_key] = time.time()

    # ------------------------------------------------------------------
    # Company search
    # ------------------------------------------------------------------
    @staticmethod
    def _ensure_ticker(company_info):
        """Add a 'ticker' key (first of 'tickers', else None) if it is missing."""
        if "ticker" not in company_info:
            tickers = company_info.get("tickers") or []
            company_info["ticker"] = tickers[0] if tickers else None
        return company_info

    @classmethod
    def _basic_company_result(cls, basic_info, source, quick=False):
        """Build the search result dict from a basic {cik, name, ticker} record.

        With ``quick=True`` the detail fields that a basic lookup cannot
        provide are included as None, together with an explanatory '_note'.
        """
        ticker = basic_info.get('ticker')
        result = {
            "cik": basic_info['cik'],
            "name": basic_info['name'],
            "tickers": [ticker] if ticker else [],
            "ticker": ticker,
        }
        if quick:
            # Not available in basic search
            result["ein"] = None
            result["fiscal_year_end"] = None
            result["sic_description"] = None
        result["_source"] = source
        if quick:
            result["_note"] = cls._QUICK_SEARCH_NOTE
        return result

    def search_company(self, company_input):
        """
        Search company information (by name, ticker, or CIK) - Optimized version

        Routing:
            1. All digits with length >= 8: treated as a CIK (zero-padded to 10).
            2. At most 5 characters and uppercase: treated as a ticker and
               answered from basic lookups only.
            3. Anything else: name search followed by a detailed lookup.

        Args:
            company_input (str): Company name, ticker, or CIK

        Returns:
            dict: Company information, or {"error": ...} when nothing matches
                (including empty or None input)
        """
        company_input = "" if company_input is None else str(company_input).strip()
        if not company_input:
            return {"error": "No matching company found"}

        # Strategy 1: numeric input that looks like a CIK
        if company_input.isdigit() and len(company_input) >= 8:
            cik = company_input.zfill(10)  # Normalize CIK to 10 digits
            # Basic lookup first; it is only used as a fallback below
            basic_info = self.edgar_client.get_company_by_cik(cik)
            company_info = self.edgar_client.get_company_info(cik)
            if company_info:
                # Ensure 'ticker' exists alongside 'tickers' for compatibility,
                # regardless of whether the basic lookup found the CIK
                return self._ensure_ticker(company_info)
            if basic_info:
                # Fallback to basic info if detailed fetch fails
                return self._basic_company_result(basic_info, "basic_cik_lookup")
            return {"error": "Company not found for specified CIK"}

        # Strategy 2: short uppercase input is probably a ticker
        is_likely_ticker = len(company_input) <= 5 and company_input.isupper()
        if is_likely_ticker:
            basic_info = self.edgar_client.get_company_by_ticker(company_input)
            if basic_info:
                return self._basic_company_result(
                    basic_info, "quick_ticker_search", quick=True
                )

        # Strategy 3: General search by name/ticker
        # This returns basic info: {cik, name, ticker}
        basic_info = self.edgar_client.search_company_by_name(company_input)
        if not basic_info:
            return {"error": "No matching company found"}

        # Strategy 4: ticker-like searches answer quickly with basic info
        if is_likely_ticker:
            return self._basic_company_result(basic_info, "quick_search", quick=True)

        # For name searches, fetch detailed info (worth the extra API call)
        company_info = self.edgar_client.get_company_info(basic_info['cik'])
        if company_info:
            return self._ensure_ticker(company_info)

        # Fallback to basic info if detailed fetch fails
        return self._basic_company_result(basic_info, "basic_search_fallback")

    # ------------------------------------------------------------------
    # Filings
    # ------------------------------------------------------------------
    def get_company_filings_list(self, cik, form_types=None):
        """
        Get company filings list

        Args:
            cik (str): Company CIK
            form_types (list): List of form types (default: ['10-K', '10-Q'])

        Returns:
            list: Filings list, exactly as returned by the EDGAR client
        """
        if form_types is None:
            form_types = ['10-K', '10-Q']
        return self.edgar_client.get_company_filings(cik, form_types)

    @staticmethod
    def _filing_year(filing):
        """Return the year parsed from filing['filing_date'], or None if unusable."""
        date_text = str(filing.get('filing_date') or '')
        if len(date_text) < 4:
            return None
        try:
            return int(date_text[:4])
        except ValueError:
            return None

    @classmethod
    def _map_filing_to_fiscal_years(cls, facts):
        """Map filing year -> fiscal year using annual revenue facts.

        Reads ``facts["facts"][<taxonomy>][<revenue tag>]["units"]["USD"]`` and
        uses each entry's 'form', 'fy', 'fp' and 'filed' fields. Per taxonomy,
        only the first revenue tag that is present is consulted. The first
        mapping seen for a filing year wins.
        """
        mapping = {}
        if not facts:
            return mapping

        all_facts = facts.get("facts", {})
        for data_source in ("us-gaap", "ifrs-full"):
            source_data = all_facts.get(data_source)
            if not source_data:
                continue

            for tag in cls._REVENUE_TAGS:
                if tag not in source_data:
                    continue

                usd_entries = source_data[tag].get("units", {}).get("USD") or []
                for entry in usd_entries:
                    if entry.get("form", "") not in cls._ANNUAL_FORMS:
                        continue
                    period_flag = entry.get("fp", "")
                    if period_flag and period_flag != "FY":
                        continue
                    # 'fy' may be missing or None; both are treated as unusable
                    try:
                        fiscal_year = int(entry.get("fy") or 0)
                    except (TypeError, ValueError):
                        continue
                    if fiscal_year <= 0:
                        continue
                    filed = entry.get("filed", "")  # Filing date, YYYY-MM-DD
                    if not filed or len(filed) < 10:
                        continue
                    try:
                        file_year = int(filed[:4])
                    except ValueError:
                        continue
                    mapping.setdefault(file_year, fiscal_year)

                break  # Found revenue tag, no need to check more

        return mapping

    # ------------------------------------------------------------------
    # Financial metrics
    # ------------------------------------------------------------------
    def extract_financial_metrics(self, cik, years=3):
        """
        Extract financial metrics for specified number of years

        The most recent ``years`` annual filing years (10-K and 20-F) are
        selected and translated to fiscal years via Company Facts where
        possible. Data is then requested for each fiscal year, followed by its
        quarters Q4..Q1. Quarters are skipped when the company has 20-F
        filings and no 10-K filings. Non-empty results are cached.

        Args:
            cik (str): Company CIK
            years (int): Number of years to extract, default is 3 years

        Returns:
            list: List of financial data dicts; each carries a '_sequence'
                index and annual entries have 'period' set to 'FY<year>'
        """
        # Check method cache first (Layer 2)
        cache_key = f"extract_metrics_{cik}_{years}"
        cached = self._get_method_cache(cache_key)
        if cached is not None:
            print(f"[Cache Hit] extract_financial_metrics({cik}, {years})")
            return cached

        # Step 1: Get company filings to determine what was actually filed
        # ("or []" guards against a client that returns None for no filings)
        filings_10k = self.edgar_client.get_company_filings(cik, ['10-K']) or []
        filings_20f = self.edgar_client.get_company_filings(cik, ['20-F']) or []
        all_annual_filings = filings_10k + filings_20f
        if not all_annual_filings:
            return []

        # Detect if company is a 20-F filer (foreign company)
        is_20f_filer = bool(filings_20f) and not filings_10k

        # Step 2: Collect the distinct filing years of the annual reports
        filing_years = {
            year
            for year in map(self._filing_year, all_annual_filings)
            if year is not None
        }
        if not filing_years:
            return []

        # Step 3: Most recent N filing years (a negative N selects nothing)
        target_years = sorted(filing_years, reverse=True)[:max(years, 0)]

        # Step 4: Map filing years to fiscal years using Company Facts
        facts = self.edgar_client.get_company_facts(cik)
        filing_to_fiscal_year = self._map_filing_to_fiscal_years(facts)

        # Step 5: Generate period list for target years
        # For each year: FY -> Q4 -> Q3 -> Q2 -> Q1 (descending order)
        # For 20-F filers: only FY (no quarterly data)
        periods = []
        for file_year in target_years:
            # Fall back to the filing year when no fiscal-year mapping exists
            fiscal_year = filing_to_fiscal_year.get(file_year, file_year)
            periods.append({
                'period': str(fiscal_year),
                'type': 'annual',
                'fiscal_year': fiscal_year,
                'filing_year': file_year,
            })
            if is_20f_filer:
                continue
            for quarter in range(4, 0, -1):
                periods.append({
                    'period': f"{fiscal_year}Q{quarter}",
                    'type': 'quarterly',
                    'fiscal_year': fiscal_year,
                    'filing_year': file_year,
                })

        # Step 6: Get financial data for each period
        financial_data = []
        for idx, period_info in enumerate(periods):
            data = self.edgar_client.get_financial_data_for_period(
                cik, period_info['period']
            )
            if not data or "period" not in data:
                continue
            # Add fiscal year prefix for annual data
            if period_info['type'] == 'annual':
                data["period"] = f"FY{period_info['fiscal_year']}"
            # Add sequence number to maintain order
            data["_sequence"] = idx
            financial_data.append(data)

        # Cache the result (Layer 2)
        self._set_method_cache(cache_key, financial_data)
        return financial_data

    def get_latest_financial_data(self, cik):
        """
        Get latest financial data

        Args:
            cik (str): Company CIK

        Returns:
            dict: Financial data for the most recent annual filing year, or {}
                when no 10-K/20-F filing with a usable date exists
        """
        # Check method cache first (Layer 2)
        cache_key = f"latest_data_{cik}"
        cached = self._get_method_cache(cache_key)
        if cached is not None:
            print(f"[Cache Hit] get_latest_financial_data({cik})")
            return cached

        # Get annual filings (supports 10-K and 20-F)
        filings_10k = self.edgar_client.get_company_filings(cik, ['10-K']) or []
        filings_20f = self.edgar_client.get_company_filings(cik, ['20-F']) or []

        filing_years = [
            year
            for year in map(self._filing_year, filings_10k + filings_20f)
            if year is not None
        ]
        if not filing_years:
            return {}
        latest_filing_year = max(filing_years)

        # NOTE(review): the period requested here is the *filing* year, while
        # extract_financial_metrics first maps it to a fiscal year - confirm
        # which one the EDGAR client expects.
        result = self.edgar_client.get_financial_data_for_period(
            cik, str(latest_filing_year)
        )

        # Cache the result (Layer 2)
        self._set_method_cache(cache_key, result)
        return result

    # ------------------------------------------------------------------
    # Formatting
    # ------------------------------------------------------------------
    def format_financial_data(self, financial_data):
        """
        Format financial data for display

        Args:
            financial_data (dict or list): Financial data

        Returns:
            dict or list: Formatted financial data; lists are ordered by
                '_sequence' (entries without one sort as 999)
        """
        if isinstance(financial_data, list):
            # Sort by _sequence to maintain correct order (FY -> Q4 -> Q3 -> Q2 -> Q1)
            ordered = sorted(financial_data, key=lambda item: item.get("_sequence", 999))
            return [self._format_single_financial_data(item) for item in ordered]
        return self._format_single_financial_data(financial_data)

    def _format_single_financial_data(self, data):
        """
        Format single financial data entry

        Args:
            data (dict): Financial data; None is treated as an empty entry

        Returns:
            dict: Shallow copy of the entry with every key field present
                (None when missing) and a numeric EPS rounded to two decimals
        """
        formatted = dict(data) if data else {}

        # Ensure all key fields exist, even if None
        for key in self._FORMAT_KEY_FIELDS:
            formatted.setdefault(key, None)

        # No unit conversion is performed; values are kept as reported.
        # Format EPS, keep two decimal places
        eps = formatted['earnings_per_share']
        if isinstance(eps, (int, float)):
            formatted['earnings_per_share'] = round(eps, 2)

        return formatted