Spaces:
Runtime error
Runtime error
| """EDGAR API Client Module with Performance Optimization""" | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| import urllib3 | |
| try: | |
| from sec_edgar_api.EdgarClient import EdgarClient | |
| except ImportError: | |
| EdgarClient = None | |
| import json | |
| import time | |
| import threading | |
| from functools import lru_cache | |
| from datetime import datetime, timedelta | |
| import re | |
| import difflib | |
| import ssl | |
class EdgarDataClient:
    """Client for SEC EDGAR data with shared caching, rate limiting and fast company search."""

    # Class-level cache of company_tickers.json, shared by every instance.
    _company_tickers_cache = None
    _company_tickers_cache_time = None
    _company_tickers_cache_ttl = 3600  # seconds (1 hour TTL)
    _cache_lock = threading.Lock()

    # Class-level rate limiter state (SEC allows at most 10 requests/second).
    _last_request_time = 0
    _rate_limit_lock = threading.Lock()
    _min_request_interval = 0.11  # 110ms between requests (9 req/sec, safe margin)

    # Company search indexes, built lazily so searches avoid scanning the
    # full ticker list on every query.
    _by_ticker = None      # lowercase ticker -> company info
    _by_title = None       # lowercase title -> company info
    _by_title_norm = None  # normalized title -> company info
    _all_keys = None       # every index key, used for fuzzy matching
    _index_built_time = None
    _index_ttl = 3600  # seconds (1 hour)

    # Common alias -> official title mapping (makes search smarter).
    _alias_map = {
        "google": "alphabet inc",
        "alphabet": "alphabet inc",
        "facebook": "meta platforms, inc.",
        "meta": "meta platforms, inc.",
        "amazon": "amazon.com, inc.",
        "apple": "apple inc.",
        "microsoft": "microsoft corporation",
        "netflix": "netflix, inc.",
        "nvidia": "nvidia corporation",
        "tesla": "tesla, inc.",
        "adobe": "adobe inc.",
        "oracle": "oracle corporation",
        "ibm": "international business machines corporation",
        "paypal": "paypal holdings, inc.",
        "shopify": "shopify inc.",
    }
| def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"): | |
| """Initialize EDGAR client with connection pooling and timeout""" | |
| self.user_agent = user_agent | |
| # 新增:实例级搜索缓存(进一步减少重复搜索开销) | |
| self._search_cache = {} | |
| # Configure requests session with connection pooling | |
| self.session = requests.Session() | |
| # Disable SSL warnings for compatibility | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| # Configure retry strategy with more aggressive retry for SSL errors | |
| retry_strategy = Retry( | |
| total=5, # Increased from 3 to 5 | |
| backoff_factor=1, | |
| status_forcelist=[429, 500, 502, 503, 504], | |
| allowed_methods=["HEAD", "GET", "OPTIONS"], | |
| raise_on_status=False # Don't raise on retry-able status codes | |
| ) | |
| adapter = HTTPAdapter( | |
| pool_connections=10, | |
| pool_maxsize=20, | |
| max_retries=retry_strategy, | |
| pool_block=False | |
| ) | |
| self.session.mount("http://", adapter) | |
| self.session.mount("https://", adapter) | |
| # Set default timeout (connect timeout, read timeout) | |
| self.timeout = (10, 30) # 10s connect, 30s read | |
| self.thread_timeout = 40 # Total timeout for thread-based operations | |
| # Initialize sec_edgar_api client with timeout wrapper | |
| if EdgarClient: | |
| self.edgar = EdgarClient(user_agent=user_agent) | |
| # Monkey patch to add timeout | |
| self._patch_edgar_client_timeout() | |
| else: | |
| self.edgar = None | |
| def _patch_edgar_client_timeout(self): | |
| """Monkey patch sec_edgar_api to add timeout support""" | |
| if not self.edgar: | |
| return | |
| # Wrap get_submissions and get_company_facts with timeout (thread-based, Gradio compatible) | |
| original_get_submissions = self.edgar.get_submissions | |
| original_get_company_facts = self.edgar.get_company_facts | |
| def get_submissions_with_timeout(cik): | |
| """Thread-based timeout wrapper for get_submissions (Gradio compatible)""" | |
| result = [None] | |
| exception = [None] | |
| def wrapper(): | |
| try: | |
| result[0] = original_get_submissions(cik) | |
| except Exception as e: | |
| exception[0] = e | |
| thread = threading.Thread(target=wrapper, daemon=True) | |
| thread.start() | |
| thread.join(timeout=self.thread_timeout) | |
| if thread.is_alive(): | |
| raise TimeoutError(f"SEC API request timeout ({self.thread_timeout}s)") | |
| if exception[0]: | |
| raise exception[0] | |
| return result[0] | |
| def get_company_facts_with_timeout(cik): | |
| """Thread-based timeout wrapper for get_company_facts (Gradio compatible)""" | |
| result = [None] | |
| exception = [None] | |
| def wrapper(): | |
| try: | |
| result[0] = original_get_company_facts(cik) | |
| except Exception as e: | |
| exception[0] = e | |
| thread = threading.Thread(target=wrapper, daemon=True) | |
| thread.start() | |
| thread.join(timeout=self.thread_timeout) | |
| if thread.is_alive(): | |
| raise TimeoutError(f"SEC API request timeout ({self.thread_timeout}s)") | |
| if exception[0]: | |
| raise exception[0] | |
| return result[0] | |
| self.edgar.get_submissions = get_submissions_with_timeout | |
| self.edgar.get_company_facts = get_company_facts_with_timeout | |
| def _rate_limit(self): | |
| """Thread-safe rate limiting to comply with SEC requirements""" | |
| with self._rate_limit_lock: | |
| current_time = time.time() | |
| time_since_last = current_time - EdgarDataClient._last_request_time | |
| if time_since_last < self._min_request_interval: | |
| sleep_time = self._min_request_interval - time_since_last | |
| time.sleep(sleep_time) | |
| EdgarDataClient._last_request_time = time.time() | |
| def _normalize_text(self, s: str) -> str: | |
| """规范化文本:用于提升匹配准确度""" | |
| if not s: | |
| return "" | |
| s = s.lower().strip() | |
| s = s.replace("&", " and ") | |
| s = re.sub(r"[.,()\-_/]", " ", s) | |
| s = re.sub(r"\s+", " ", s) | |
| # 移除常见后缀词 | |
| stopwords = {"inc", "inc.", "incorporated", "corp", "corporation", "co", "company", "plc", "ltd", "llc", "the"} | |
| tokens = [t for t in s.split() if t not in stopwords] | |
| return " ".join(tokens).strip() | |
    def _ensure_company_index(self):
        """Ensure the company search indexes exist (build on demand, rebuild on expiry).

        Downloads https://www.sec.gov/files/company_tickers.json when the
        class-level cache is missing or older than its TTL, then (re)builds
        three lookup dicts — lowercase ticker, lowercase title, normalized
        title — plus a flat key list used for fuzzy matching. All work runs
        while holding _cache_lock so concurrent callers never observe a
        half-built index.

        Raises:
            requests.HTTPError: if the SEC responds with an error status
                (via raise_for_status).
        """
        with self._cache_lock:
            current_time = time.time()
            # Refresh the company_tickers cache first if it is absent or expired.
            if (EdgarDataClient._company_tickers_cache is None or
                EdgarDataClient._company_tickers_cache_time is None or
                current_time - EdgarDataClient._company_tickers_cache_time >= self._company_tickers_cache_ttl):
                # Fetch and store the full ticker list (rate-limited like every SEC call).
                self._rate_limit()
                url = "https://www.sec.gov/files/company_tickers.json"
                headers = {"User-Agent": self.user_agent}
                response = self.session.get(url, headers=headers, timeout=self.timeout)
                response.raise_for_status()
                companies = response.json()
                EdgarDataClient._company_tickers_cache = companies
                EdgarDataClient._company_tickers_cache_time = current_time
            else:
                companies = EdgarDataClient._company_tickers_cache
            # Rebuild the derived indexes if any of them is missing or expired.
            if (EdgarDataClient._by_ticker is None or
                EdgarDataClient._by_title is None or
                EdgarDataClient._by_title_norm is None or
                EdgarDataClient._all_keys is None or
                EdgarDataClient._index_built_time is None or
                current_time - EdgarDataClient._index_built_time >= EdgarDataClient._index_ttl):
                by_ticker = {}
                by_title = {}
                by_title_norm = {}
                all_keys = []
                # company_tickers.json maps an index -> {cik_str, ticker, title}.
                for _, company in companies.items():
                    title = company.get("title", "")
                    ticker = company.get("ticker", "")
                    # CIKs are zero-padded to 10 digits as EDGAR endpoints expect.
                    cik_str = str(company.get("cik_str", "")).zfill(10)
                    title_lower = title.lower()
                    ticker_lower = ticker.lower()
                    title_norm = self._normalize_text(title)
                    # Index by ticker, raw title, and normalized title.
                    if ticker_lower:
                        by_ticker[ticker_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
                        all_keys.append(ticker_lower)
                    if title_lower:
                        by_title[title_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
                    if title_norm:
                        by_title_norm[title_norm] = {"cik": cik_str, "name": title, "ticker": ticker}
                        all_keys.append(title_norm)
                # Publish the rebuilt indexes (still under the lock).
                EdgarDataClient._by_ticker = by_ticker
                EdgarDataClient._by_title = by_title
                EdgarDataClient._by_title_norm = by_title_norm
                EdgarDataClient._all_keys = all_keys
                EdgarDataClient._index_built_time = current_time
| def search_company_by_name(self, company_name): | |
| """Search company CIK by company name with caching and optimized ticker matching""" | |
| try: | |
| # 实例级缓存命中检查(按规范化后的query) | |
| norm_query = self._normalize_text(company_name) | |
| cache_hit = self._search_cache.get(norm_query) | |
| if cache_hit: | |
| return cache_hit | |
| # 确保索引已构建(首次或过期后会重建) | |
| self._ensure_company_index() | |
| # 获取索引引用(已在锁内构建完成) | |
| by_ticker = EdgarDataClient._by_ticker | |
| by_title = EdgarDataClient._by_title | |
| by_title_norm = EdgarDataClient._by_title_norm | |
| all_keys = EdgarDataClient._all_keys | |
| # ✅ OPTIMIZATION 1: Ticker 优先匹配(遵循项目规范) | |
| raw = company_name.strip().lower() | |
| raw_compact = re.sub(r"[^a-z0-9]", "", raw) | |
| is_ticker_like = len(raw_compact) <= 5 and len(raw_compact) >= 1 | |
| if is_ticker_like and raw_compact in by_ticker: | |
| result = by_ticker[raw_compact] | |
| self._search_cache[norm_query] = result | |
| return result | |
| # ✅ OPTIMIZATION 2: 别名映射(如 'google' -> 'alphabet inc') | |
| alias_target = EdgarDataClient._alias_map.get(norm_query) | |
| if alias_target: | |
| alias_norm = self._normalize_text(alias_target) | |
| # 先尝试规范化标题 | |
| if alias_norm in by_title_norm: | |
| result = by_title_norm[alias_norm] | |
| self._search_cache[norm_query] = result | |
| return result | |
| # 再尝试原始标题 | |
| alias_lower = alias_target.lower() | |
| if alias_lower in by_title: | |
| result = by_title[alias_lower] | |
| self._search_cache[norm_query] = result | |
| return result | |
| # 最后尝试 ticker(有些别名可能实际上是ticker) | |
| alias_ticker = re.sub(r"[^a-z0-9]", "", alias_lower) | |
| if alias_ticker in by_ticker: | |
| result = by_ticker[alias_ticker] | |
| self._search_cache[norm_query] = result | |
| return result | |
| # ✅ OPTIMIZATION 3: 精确匹配(原始标题) | |
| title_lower = company_name.lower().strip() | |
| if title_lower in by_title: | |
| result = by_title[title_lower] | |
| self._search_cache[norm_query] = result | |
| return result | |
| # ✅ OPTIMIZATION 4: 精确匹配(规范化标题) | |
| if norm_query in by_title_norm: | |
| result = by_title_norm[norm_query] | |
| self._search_cache[norm_query] = result | |
| return result | |
| # ✅ OPTIMIZATION 5: 精确匹配(ticker,再次尝试原始输入) | |
| if raw_compact in by_ticker: | |
| result = by_ticker[raw_compact] | |
| self._search_cache[norm_query] = result | |
| return result | |
| # ✅ OPTIMIZATION 6: 部分包含匹配 | |
| partial_matches = [] | |
| for key in by_title_norm.keys(): | |
| if norm_query in key: | |
| partial_matches.append(key) | |
| if not partial_matches: | |
| for t in by_ticker.keys(): | |
| if norm_query in t: | |
| partial_matches.append(t) | |
| if partial_matches: | |
| best_key = max( | |
| partial_matches, | |
| key=lambda k: difflib.SequenceMatcher(None, norm_query, k).ratio() | |
| ) | |
| result = by_title_norm.get(best_key) or by_ticker.get(best_key) | |
| if result: | |
| self._search_cache[norm_query] = result | |
| return result | |
| # ✅ OPTIMIZATION 7: 模糊匹配(difflib,用于拼写近似的情况) | |
| close = difflib.get_close_matches(norm_query, all_keys, n=1, cutoff=0.78) | |
| if close: | |
| best = close[0] | |
| result = by_title_norm.get(best) or by_ticker.get(best) | |
| if result: | |
| self._search_cache[norm_query] = result | |
| return result | |
| # 未找到 | |
| return None | |
| except TimeoutError as e: | |
| print(f"Timeout searching company: {e}") | |
| return None | |
| except Exception as e: | |
| print(f"Error searching company: {e}") | |
| return None | |
| def get_company_info(self, cik): | |
| """ | |
| Get basic company information (cached) | |
| Args: | |
| cik (str): Company CIK code | |
| Returns: | |
| dict: Dictionary containing company information | |
| """ | |
| if not self.edgar: | |
| print("sec_edgar_api library not installed") | |
| return None | |
| try: | |
| self._rate_limit() | |
| # Get company submissions (now has timeout protection) | |
| submissions = self.edgar.get_submissions(cik=cik) | |
| return { | |
| "cik": cik, | |
| "name": submissions.get("name", ""), | |
| "tickers": submissions.get("tickers", []), | |
| "sic": submissions.get("sic", ""), | |
| "sic_description": submissions.get("sicDescription", "") | |
| } | |
| except TimeoutError as e: | |
| print(f"Timeout getting company info for CIK {cik}: {e}") | |
| return None | |
| except Exception as e: | |
| print(f"Error getting company info: {e}") | |
| return None | |
| def get_company_filings(self, cik, form_types=None): | |
| """ | |
| Get all company filing documents (cached) | |
| Args: | |
| cik (str): Company CIK code | |
| form_types (tuple): Tuple of form types, e.g., ('10-K', '10-Q'), None for all types | |
| Returns: | |
| list: List of filing documents | |
| """ | |
| if not self.edgar: | |
| print("sec_edgar_api library not installed") | |
| return [] | |
| # Convert list to tuple for caching (lists are not hashable) | |
| if form_types and isinstance(form_types, list): | |
| form_types = tuple(form_types) | |
| try: | |
| self._rate_limit() | |
| # Get company submissions (now has timeout protection) | |
| submissions = self.edgar.get_submissions(cik=cik) | |
| # Extract filing information | |
| filings = [] | |
| recent = submissions.get("filings", {}).get("recent", {}) | |
| # Get data from each field | |
| form_types_list = recent.get("form", []) | |
| filing_dates = recent.get("filingDate", []) | |
| accession_numbers = recent.get("accessionNumber", []) | |
| primary_documents = recent.get("primaryDocument", []) | |
| # Iterate through all filings | |
| for i in range(len(form_types_list)): | |
| form_type = form_types_list[i] | |
| # Normalize form type (e.g., "10-K/A" -> "10-K", "20-F/A" -> "20-F") | |
| normalized_form_type = form_type.split('/')[0] | |
| # Filter by form type if specified (using normalized type) | |
| if form_types and normalized_form_type not in form_types: | |
| continue | |
| filing_date = filing_dates[i] if i < len(filing_dates) else "" | |
| accession_number = accession_numbers[i] if i < len(accession_numbers) else "" | |
| primary_document = primary_documents[i] if i < len(primary_documents) else "" | |
| filing = { | |
| "form_type": form_type, | |
| "filing_date": filing_date, | |
| "accession_number": accession_number, | |
| "primary_document": primary_document | |
| } | |
| filings.append(filing) | |
| return filings | |
| except TimeoutError as e: | |
| print(f"Timeout getting company filings for CIK {cik}: {e}") | |
| return [] | |
| except Exception as e: | |
| print(f"Error getting company filings: {e}") | |
| return [] | |
| def get_company_facts(self, cik): | |
| """ | |
| Get all company financial facts data (cached) | |
| Args: | |
| cik (str): Company CIK code | |
| Returns: | |
| dict: Company financial facts data | |
| """ | |
| if not self.edgar: | |
| print("sec_edgar_api library not installed") | |
| return {} | |
| try: | |
| self._rate_limit() | |
| # Now has timeout protection via monkey patch | |
| facts = self.edgar.get_company_facts(cik=cik) | |
| return facts | |
| except TimeoutError as e: | |
| print(f"Timeout getting company facts for CIK {cik}: {e}") | |
| return {} | |
| except Exception as e: | |
| print(f"Error getting company facts: {e}") | |
| return {} | |
    def get_financial_data_for_period(self, cik, period):
        """
        Get financial data for a specific period (supports annual and quarterly) - Cached
        Args:
            cik (str): Company CIK code
            period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')
        Returns:
            dict: Financial data dictionary — metric values, per-metric
            "<metric>_details" dicts, source_form/source_url/data_source,
            or {} on failure.
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}
        # Instance-level cache (avoids repeating this fairly heavy matching work)
        cache_key = f"period_{cik}_{period}"
        if hasattr(self, '_period_cache') and cache_key in self._period_cache:
            return self._period_cache[cache_key]
        if not hasattr(self, '_period_cache'):
            self._period_cache = {}
        try:
            # Get company financial facts
            facts = self.get_company_facts(cik)
            if not facts:
                return {}
            # Extract us-gaap and ifrs-full financial data (20-F may use IFRS)
            us_gaap = facts.get("facts", {}).get("us-gaap", {})
            ifrs_full = facts.get("facts", {}).get("ifrs-full", {})
            # Define financial metrics and their XBRL tags
            # Include multiple possible tags to improve match rate (including US-GAAP and IFRS tags)
            financial_metrics = {
                "total_revenue": ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "RevenueFromContractWithCustomerIncludingAssessedTax", "SalesRevenueNet", "RevenueFromContractWithCustomer", "Revenue"],
                "net_income": ["NetIncomeLoss", "ProfitLoss", "NetIncome", "ProfitLossAttributableToOwnersOfParent"],
                "earnings_per_share": ["EarningsPerShareBasic", "EarningsPerShare", "BasicEarningsPerShare", "BasicEarningsLossPerShare"],
                "operating_expenses": ["OperatingExpenses", "OperatingCostsAndExpenses", "OperatingExpensesExcludingDepreciationAndAmortization", "CostsAndExpenses", "GeneralAndAdministrativeExpense", "CostOfRevenue", "ResearchAndDevelopmentExpense", "SellingAndMarketingExpense"],
                "operating_cash_flow": ["NetCashProvidedByUsedInOperatingActivities", "NetCashProvidedUsedInOperatingActivities", "NetCashFlowsFromUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"],
            }
            # Store result
            result = {"period": period}
            # Determine target form types to search
            if 'Q' in period:
                # Quarterly data, mainly search 10-Q (20-F usually doesn't have quarterly reports)
                target_forms = ("10-Q",)  # Use tuple for caching
                target_forms_annual = ("10-K", "20-F")  # for fallback
                year = int(period.split('Q')[0])
                quarter = period.split('Q')[1]
            else:
                # Annual data, search 10-K and 20-F annual forms
                target_forms = ("10-K", "20-F")  # Use tuple for caching
                target_forms_annual = target_forms
                year = int(period)
                quarter = None
            # Get company filings to find accession number and primary document
            filings = self.get_company_filings(cik, form_types=target_forms)
            filings_map = {}  # Map: "<form>_<fileyear>" -> {accession_number, primary_document, ...}
            # Build filing map for quick lookup
            for filing in filings:
                form_type = filing.get("form_type", "")
                filing_date = filing.get("filing_date", "")
                accession_number = filing.get("accession_number", "")
                primary_document = filing.get("primary_document", "")
                if filing_date and accession_number:
                    # Extract year from filing_date (format: YYYY-MM-DD)
                    file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0
                    # Normalize form type (e.g., "10-K/A" -> "10-K") for consistent key matching
                    normalized_form_type = form_type.split('/')[0]
                    # Store filing - allow filing_date year to differ from fiscal year
                    # (20-F/10-K for FY2024 may be filed in 2025)
                    # We'll match by fiscal year in the facts data instead
                    # Use normalized form type in key to ensure amended forms (10-K/A) match
                    key = f"{normalized_form_type}_{file_year}"
                    if key not in filings_map:
                        filings_map[key] = {
                            "accession_number": accession_number,
                            "primary_document": primary_document,
                            "form_type": form_type,  # Keep original form_type for reference
                            "filing_date": filing_date,
                            "file_year": file_year
                        }
            # Iterate through each financial metric
            for metric_key, metric_tags in financial_metrics.items():
                # Support multiple possible tags
                for metric_tag in metric_tags:
                    # Search both US-GAAP and IFRS tags
                    metric_data = None
                    data_source = None
                    if metric_tag in us_gaap:
                        metric_data = us_gaap[metric_tag]
                        data_source = "us-gaap"
                    elif metric_tag in ifrs_full:
                        metric_data = ifrs_full[metric_tag]
                        data_source = "ifrs-full"
                    if metric_data:
                        units = metric_data.get("units", {})
                        # Find USD unit data (supports USD and USD/shares)
                        usd_data = None
                        if "USD" in units:
                            usd_data = units["USD"]
                        elif "USD/shares" in units and metric_key == "earnings_per_share":
                            # EPS uses USD/shares unit
                            usd_data = units["USD/shares"]
                        if usd_data:
                            # Try exact match first, then loose match
                            matched_entry = None
                            # Search for data in the specified period
                            for entry in usd_data:
                                form = entry.get("form", "")
                                fy = entry.get("fy", 0)
                                fp = entry.get("fp", "")
                                end_date = entry.get("end", "")
                                if not end_date or len(end_date) < 4:
                                    continue
                                entry_year = int(end_date[:4])
                                # Check if form type matches
                                if form in target_forms:
                                    if quarter:
                                        # Quarterly data match
                                        if entry_year == year and fp == f"Q{quarter}":
                                            # If already matched, compare end date, choose the latest
                                            if matched_entry:
                                                if entry.get("end", "") > matched_entry.get("end", ""):
                                                    matched_entry = entry
                                            else:
                                                matched_entry = entry
                                    else:
                                        # Annual data match - prioritize fiscal year (fy) field
                                        # Strategy 1: Exact match by fiscal year
                                        if fy == year and (fp == "FY" or fp == "" or not fp):
                                            # If already matched, compare end date, choose the latest
                                            if matched_entry:
                                                if entry.get("end", "") > matched_entry.get("end", ""):
                                                    matched_entry = entry
                                            else:
                                                matched_entry = entry
                                        # Strategy 2: Match by end date year (when fy not available or doesn't match)
                                        elif not matched_entry and entry_year == year and (fp == "FY" or fp == "" or not fp):
                                            matched_entry = entry
                                        # Strategy 3: Allow fy to differ by 1 year (fiscal year vs calendar year mismatch)
                                        elif not matched_entry and fy > 0 and abs(fy - year) <= 1 and (fp == "FY" or fp == "" or not fp):
                                            matched_entry = entry
                                        # Strategy 4: Match by frame field for 20-F
                                        elif not matched_entry and form == "20-F" and "frame" in entry:
                                            frame = entry.get("frame", "")
                                            if f"CY{year}" in frame or str(year) in end_date:
                                                matched_entry = entry
                            # If quarterly data not found, try finding from annual report (fallback strategy)
                            if not matched_entry and quarter and target_forms_annual:
                                for entry in usd_data:
                                    form = entry.get("form", "")
                                    end_date = entry.get("end", "")
                                    fp = entry.get("fp", "")
                                    if form in target_forms_annual and end_date:
                                        # Check if end date is within this quarter range
                                        if str(year) in end_date and f"Q{quarter}" in fp:
                                            matched_entry = entry
                                            break
                            # Apply matched data
                            if matched_entry:
                                result[metric_key] = matched_entry.get("val", 0)
                                # Get form and accession info
                                form_type = matched_entry.get("form", "")
                                filed_date = matched_entry.get('filed', '')  # Filing date from facts
                                # NOTE(review): computed but never read below — looks like
                                # leftover from an accession-number matching strategy; confirm.
                                accn_from_facts = matched_entry.get('accn', '').replace('-', '')
                                # Try to match filing by accession number OR by form and filed year
                                filing_info = None
                                # Strategy 1: Try exact filing_date year match
                                filing_key = f"{form_type}_{year}"
                                filing_info = filings_map.get(filing_key)
                                # Strategy 2: Try filed year from facts (20-F usually filed next year)
                                if not filing_info and filed_date and len(filed_date) >= 4:
                                    filed_year = int(filed_date[:4])
                                    filing_key = f"{form_type}_{filed_year}"
                                    filing_info = filings_map.get(filing_key)
                                # Strategy 3: Try year+1 (for 20-F filed in following year)
                                if not filing_info:
                                    filing_key = f"{form_type}_{year + 1}"
                                    filing_info = filings_map.get(filing_key)
                                if filing_info:
                                    # Use filing info from get_company_filings
                                    accession_number = filing_info["accession_number"].replace('-', '')
                                    primary_document = filing_info["primary_document"]
                                    # Generate complete source URL
                                    if primary_document:
                                        result["source_url"] = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
                                    else:
                                        result["source_url"] = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
                                else:
                                    # Fallback to company browse page if filing not found
                                    result["source_url"] = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
                                result["source_form"] = form_type
                                result["data_source"] = data_source
                                # Add detailed information
                                result[f"{metric_key}_details"] = {
                                    "tag": metric_tag,
                                    "form": matched_entry.get("form", ""),
                                    "fy": matched_entry.get("fy", 0),
                                    "fp": matched_entry.get("fp", ""),
                                    "val": matched_entry.get("val", 0),
                                    "start": matched_entry.get("start", ""),
                                    "end": matched_entry.get("end", ""),
                                    "accn": matched_entry.get("accn", ""),
                                    "filed": matched_entry.get("filed", ""),
                                    "frame": matched_entry.get("frame", ""),
                                    "data_source": data_source
                                }
                    # If data is found, break out of tag loop
                    if metric_key in result:
                        break
            # Cache the result before returning
            if result and "period" in result:
                self._period_cache[cache_key] = result
            return result
        except Exception as e:
            print(f"Error getting financial data for period {period}: {e}")
            return {}