Spaces:
Runtime error
Runtime error
| """Financial Data Analysis Module""" | |
| from edgar_client import EdgarDataClient | |
| from datetime import datetime | |
| from functools import lru_cache | |
| import json | |
class FinancialAnalyzer:
    """Look up companies and collect financial metrics via ``EdgarDataClient``.

    The analyzer keeps two per-instance caches: one for company search
    results and one for the output of ``extract_financial_metrics``.
    """

    # Annual-report form types used to discover which years have filings.
    _ANNUAL_FORMS = ("10-K", "20-F")
    # Taxonomies inspected (in order) inside the Company Facts payload.
    _FACT_TAXONOMIES = ("us-gaap", "ifrs-full")
    # Revenue tags inspected (in order) to derive filing-year -> fiscal-year.
    _REVENUE_TAGS = (
        "Revenues",
        "RevenueFromContractWithCustomerExcludingAssessedTax",
        "Revenue",
        "RevenueFromContractWithCustomer",
    )
    # Fields guaranteed to be present (possibly as None) after formatting.
    _KEY_FIELDS = (
        'total_revenue',
        'net_income',
        'earnings_per_share',
        'operating_expenses',
        'operating_cash_flow',
        'source_url',
        'source_form',
    )

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """
        Initialize financial analyzer

        Args:
            user_agent (str): User agent string for identifying request source
        """
        self.edgar_client = EdgarDataClient(user_agent)
        # Instance-level caches to avoid repeating lookups and computations.
        self._search_cache = {}
        self._extract_metrics_cache = {}  # results of extract_financial_metrics

    def search_company(self, company_input):
        """
        Search company information (by name, ticker or CIK)

        Input consisting only of digits and at least 8 characters long is
        treated as a CIK and resolved with ``get_company_info``; anything
        else is resolved with ``search_company_by_name``. Successful results
        are cached per instance; error results are not.

        Args:
            company_input (str): Company name, ticker or CIK. Surrounding
                whitespace is ignored; non-string values are converted
                with ``str()``.

        Returns:
            dict: Company information, or ``{"error": ...}`` when nothing
                matches (including empty/None input).
        """
        query = "" if company_input is None else str(company_input).strip()
        if not query:
            return {"error": "No matching company found"}

        cached = self._search_cache.get(query)
        if cached is not None:
            return cached

        # Long all-digit input is assumed to be a CIK.
        if query.isdigit() and len(query) >= 8:
            company_info = self.edgar_client.get_company_info(query)
            if not company_info:
                return {"error": "Company not found for specified CIK"}
            self._search_cache[query] = company_info
            return company_info

        company = self.edgar_client.search_company_by_name(query)
        if not company:
            return {"error": "No matching company found"}

        # The name/ticker lookup already supplies cik, name and ticker, so
        # the result is built directly instead of making an additional
        # get_company_info call.
        result = {
            "cik": company['cik'],
            "name": company['name'],
            "tickers": [company['ticker']] if company.get('ticker') else [],
            "_source": "company_tickers_cache"  # Debug info
        }
        self._search_cache[query] = result
        return result

    def get_company_filings_list(self, cik, form_types=('10-K', '10-Q')):
        """
        Get company filings list

        Args:
            cik (str): Company CIK
            form_types (iterable of str or str): Form types to include.
                Converted to a tuple before being handed to the client so
                the argument is hashable, matching the tuple arguments used
                by the other client calls in this class.

        Returns:
            list: Filings list as returned by the client
        """
        if form_types is None:
            form_types = ('10-K', '10-Q')
        elif isinstance(form_types, str):
            # A bare string would otherwise be split into characters.
            form_types = (form_types,)
        return self.edgar_client.get_company_filings(cik, tuple(form_types))

    def _get_annual_filings(self, cik):
        """Return the company's 10-K filings followed by its 20-F filings."""
        filings = []
        for form in self._ANNUAL_FORMS:
            # One-element tuples keep the argument hashable for the client.
            filings.extend(self.edgar_client.get_company_filings(cik, (form,)) or [])
        return filings

    @staticmethod
    def _group_filings_by_year(filings):
        """Group filings by the year parsed from their ``filing_date`` prefix."""
        by_year = {}
        for filing in filings:
            filing_date = filing.get('filing_date', '')
            if not filing_date or len(filing_date) < 4:
                continue
            try:
                file_year = int(filing_date[:4])
            except ValueError:
                continue
            by_year.setdefault(file_year, []).append(filing)
        return by_year

    def _map_filing_to_fiscal_year(self, facts):
        """Build a filing-year -> fiscal-year map from annual revenue facts.

        For each taxonomy, only the first revenue tag present is inspected.
        The first fiscal year seen for a filing year wins.
        """
        mapping = {}
        all_facts = facts.get("facts", {})
        for taxonomy in self._FACT_TAXONOMIES:
            if taxonomy not in all_facts:
                continue
            source_data = all_facts[taxonomy]
            for tag in self._REVENUE_TAGS:
                if tag not in source_data:
                    continue
                entries = source_data[tag].get("units", {}).get("USD") or []
                for entry in entries:
                    if entry.get("form", "") not in self._ANNUAL_FORMS:
                        continue
                    fy = entry.get("fy", 0)
                    # "fy" may be present but null; comparing None with 0
                    # would raise TypeError, so skip non-numeric values.
                    if not isinstance(fy, (int, float)) or fy <= 0:
                        continue
                    fp = entry.get("fp", "")
                    if fp and fp != "FY":
                        continue
                    filed = entry.get("filed", "")  # expected YYYY-MM-DD
                    if not filed or len(filed) < 10:
                        continue
                    try:
                        file_year = int(filed[:4])
                    except ValueError:
                        continue
                    mapping.setdefault(file_year, fy)
                break  # Found a revenue tag, no need to check more
        return mapping

    @staticmethod
    def _build_periods(target_years, filing_to_fiscal_year):
        """List periods per year in order: annual, then Q4, Q3, Q2, Q1."""
        periods = []
        for file_year in target_years:
            # Fall back to the filing year when no fiscal year is known.
            fiscal_year = filing_to_fiscal_year.get(file_year, file_year)
            periods.append({
                'period': str(fiscal_year),
                'type': 'annual',
                'fiscal_year': fiscal_year,
                'filing_year': file_year
            })
            for quarter in range(4, 0, -1):
                periods.append({
                    'period': f"{fiscal_year}Q{quarter}",
                    'type': 'quarterly',
                    'fiscal_year': fiscal_year,
                    'filing_year': file_year
                })
        return periods

    def extract_financial_metrics(self, cik, years=3):
        """
        Extract financial metrics for specified number of years

        The most recent ``years`` filing years (from 10-K / 20-F filings)
        are mapped to fiscal years using Company Facts, and data is
        requested for each fiscal year and its four quarters. Non-empty
        results are cached per ``(cik, years)`` on this instance.

        Args:
            cik (str): Company CIK
            years (int): Number of years to extract, default is 3 years.
                Zero or negative values yield an empty list.

        Returns:
            list: List of financial data dicts. Annual entries have their
                ``period`` rewritten to ``FY<year>``; every entry carries a
                ``_sequence`` index reflecting the requested order.
        """
        if years is not None and years <= 0:
            # A negative slice bound would silently drop the *oldest* years
            # instead of selecting none.
            return []

        cache_key = f"{cik}_{years}"
        cached = self._extract_metrics_cache.get(cache_key)
        if cached is not None:
            return cached

        # Step 1: fetch company facts once.
        facts = self.edgar_client.get_company_facts(cik)
        if not facts:
            return []

        # Steps 2-3: fetch annual filings once and group them by filing year.
        filings_by_year = self._group_filings_by_year(self._get_annual_filings(cik))
        if not filings_by_year:
            return []

        # Step 4: keep the most recent N filing years.
        target_years = sorted(filings_by_year, reverse=True)[:years]

        # Steps 5-6: translate filing years to fiscal years, list periods.
        fiscal_year_map = self._map_filing_to_fiscal_year(facts)
        periods = self._build_periods(target_years, fiscal_year_map)

        # Step 7: fetch data for each period.
        financial_data = []
        for idx, period_info in enumerate(periods):
            data = self.edgar_client.get_financial_data_for_period(cik, period_info['period'])
            if not data or "period" not in data:
                continue
            # Copy before annotating so a dict the client may have cached
            # is never modified.
            data = dict(data)
            if period_info['type'] == 'annual':
                data["period"] = f"FY{period_info['fiscal_year']}"
            data["_sequence"] = idx  # preserves FY -> Q4 -> ... -> Q1 order
            financial_data.append(data)

        if financial_data:
            self._extract_metrics_cache[cache_key] = financial_data
        return financial_data

    def get_latest_financial_data(self, cik):
        """
        Get latest financial data

        Finds the most recent filing year among the company's 10-K and 20-F
        filings and requests data for that year.

        Args:
            cik (str): Company CIK

        Returns:
            dict: Latest financial data as returned by the client, or ``{}``
                when no dated annual filing exists.
        """
        filings_by_year = self._group_filings_by_year(self._get_annual_filings(cik))
        if not filings_by_year:
            return {}
        latest_filing_year = max(filings_by_year)
        # NOTE(review): the filing year is used as the period here without
        # the fiscal-year mapping that extract_financial_metrics applies;
        # confirm this is what get_financial_data_for_period expects.
        return self.edgar_client.get_financial_data_for_period(cik, str(latest_filing_year))

    def format_financial_data(self, financial_data):
        """
        Format financial data for display

        Args:
            financial_data (dict or list): Financial data

        Returns:
            dict or list: Formatted financial data. Lists are ordered by
                each entry's ``_sequence`` (entries without one sort last).
        """
        if isinstance(financial_data, list):
            ordered = sorted(financial_data, key=lambda item: item.get("_sequence", 999))
            return [self._format_single_financial_data(item) for item in ordered]
        return self._format_single_financial_data(financial_data)

    def _format_single_financial_data(self, data):
        """
        Format single financial data entry

        Args:
            data (dict): Financial data; ``None`` is treated as empty.

        Returns:
            dict: A copy of ``data`` in which every key field exists
                (``None`` when missing) and a numeric EPS is rounded to two
                decimals. Other values are left as-is (no unit conversion).
        """
        formatted = dict(data) if data else {}
        for key in self._KEY_FIELDS:
            formatted.setdefault(key, None)

        eps = formatted['earnings_per_share']
        if isinstance(eps, (int, float)):
            formatted['earnings_per_share'] = round(eps, 2)
        return formatted