Spaces:
Sleeping
Sleeping
| """EDGAR API Client Module""" | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from requests.packages.urllib3.util.retry import Retry | |
| try: | |
| from sec_edgar_api.EdgarClient import EdgarClient | |
| except ImportError: | |
| EdgarClient = None | |
| import json | |
| import time | |
| from functools import wraps | |
| import threading | |
class EdgarDataClient:
    """Client for the SEC EDGAR APIs with rate limiting, pooled HTTP retries and
    several layers of in-memory caching (company tickers, search results and
    per-period financial data)."""

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """Initialize EDGAR client"""
        self.user_agent = user_agent
        self.last_request_time = 0
        self.min_request_interval = 0.11  # SEC allows 10 requests/second, use 0.11s to be safe
        self.request_timeout = 45  # Increased from 30 to 45 seconds for better reliability
        self.max_retries = 3  # Maximum retry attempts
        self._lock = threading.Lock()  # Thread-safe rate limiting
        # Configure requests session with connection pooling and retry logic
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=20,
            pool_block=False
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.session.headers.update({"User-Agent": user_agent})
        # Cache for frequently accessed data
        self._company_cache = {}  # Cache company info to avoid repeated calls
        self._cache_ttl = 600  # 600 seconds (10 minutes) for general cache entries
        self._tickers_cache_ttl = 7200  # 7200 seconds (2 hours) for the tickers list
        self._cache_timestamps = {}
        # Fast lookup indexes for company tickers
        self._ticker_index = {}  # ticker -> company data
        self._cik_index = {}  # cik -> company data
        self._name_lower_index = {}  # lowercase name -> company data
        self._name_prefix_index = {}  # name prefix (3 chars) -> list of company data
        self._ticker_prefix_index = {}  # ticker prefix (2 chars) -> list of company data
        self._alias_index = {}  # common aliases -> company data
        self._index_loaded = False
        # Search result cache (for performance)
        self._search_cache = {}  # search_key -> result
        self._search_cache_max_size = 1000  # Limit cache size
        # Layer 3: Period data cache (avoid re-parsing XBRL for same period)
        self._period_cache = {}  # period_key -> financial data
        self._period_cache_timestamps = {}  # period_key -> timestamp
        self._period_cache_ttl = 1800  # 30 minutes cache (financial data changes rarely)
        self._period_cache_max_size = 1000  # Limit cache size
        # Common company aliases for intelligent search
        self._company_aliases = {
            'google': ['GOOGL', 'GOOG'],
            'facebook': ['META'],
            'meta': ['META'],
            'apple': ['AAPL'],
            'microsoft': ['MSFT'],
            'amazon': ['AMZN'],
            'tesla': ['TSLA'],
            'nvidia': ['NVDA'],
            'netflix': ['NFLX'],
            'alphabet': ['GOOGL', 'GOOG'],
            'twitter': ['TWTR'],  # Historical
            'intel': ['INTC'],
            'amd': ['AMD'],
            'oracle': ['ORCL'],
            'salesforce': ['CRM'],
            'adobe': ['ADBE'],
            'cisco': ['CSCO'],
            'ibm': ['IBM'],
            'walmart': ['WMT'],
            'disney': ['DIS'],
            'nike': ['NKE'],
            'mcdonalds': ['MCD'],
            'coca cola': ['KO'],
            'pepsi': ['PEP'],
            'starbucks': ['SBUX'],
            'boeing': ['BA'],
            'ge': ['GE'],
            'general electric': ['GE'],
            'ford': ['F'],
            'gm': ['GM'],
            'general motors': ['GM'],
            'jpmorgan': ['JPM'],
            'goldman': ['GS'],
            'goldman sachs': ['GS'],
            'morgan stanley': ['MS'],
            'bank of america': ['BAC'],
            'wells fargo': ['WFC'],
            'visa': ['V'],
            'mastercard': ['MA'],
            'berkshire': ['BRK.B', 'BRK.A'],
            'exxon': ['XOM'],
            'chevron': ['CVX'],
            'pfizer': ['PFE'],
            'johnson': ['JNJ'],
            'merck': ['MRK'],
        }
        # sec_edgar_api is optional; self.edgar stays None when it is missing
        if EdgarClient:
            self.edgar = EdgarClient(user_agent=user_agent)
        else:
            self.edgar = None
| def _rate_limit(self): | |
| """Thread-safe rate limiting to comply with SEC API limits (10 requests/second)""" | |
| with self._lock: | |
| current_time = time.time() | |
| time_since_last_request = current_time - self.last_request_time | |
| if time_since_last_request < self.min_request_interval: | |
| sleep_time = self.min_request_interval - time_since_last_request | |
| time.sleep(sleep_time) | |
| self.last_request_time = time.time() | |
| def _is_cache_valid(self, cache_key): | |
| """Check if cache entry is still valid""" | |
| if cache_key not in self._cache_timestamps: | |
| return False | |
| age = time.time() - self._cache_timestamps[cache_key] | |
| # Use longer TTL for company tickers list | |
| ttl = self._tickers_cache_ttl if cache_key == "company_tickers_json" else self._cache_ttl | |
| return age < ttl | |
| def _get_cached(self, cache_key): | |
| """Get cached data if valid""" | |
| if self._is_cache_valid(cache_key): | |
| return self._company_cache.get(cache_key) | |
| return None | |
| def _set_cache(self, cache_key, data): | |
| """Set cache data with timestamp""" | |
| self._company_cache[cache_key] = data | |
| self._cache_timestamps[cache_key] = time.time() | |
| def _make_request_with_retry(self, url, headers=None, use_session=True): | |
| """Make HTTP request with retry logic and timeout""" | |
| if headers is None: | |
| headers = {"User-Agent": self.user_agent} | |
| for attempt in range(self.max_retries): | |
| try: | |
| self._rate_limit() | |
| if use_session: | |
| response = self.session.get(url, headers=headers, timeout=self.request_timeout) | |
| else: | |
| response = requests.get(url, headers=headers, timeout=self.request_timeout) | |
| response.raise_for_status() | |
| return response | |
| except requests.exceptions.Timeout: | |
| print(f"Request timeout (attempt {attempt + 1}/{self.max_retries}): {url}") | |
| if attempt == self.max_retries - 1: | |
| raise | |
| time.sleep(2 ** attempt) # Exponential backoff | |
| except requests.exceptions.HTTPError as e: | |
| if e.response.status_code == 429: # Too Many Requests | |
| wait_time = 2 ** attempt | |
| print(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1}/{self.max_retries})") | |
| time.sleep(wait_time) | |
| if attempt == self.max_retries - 1: | |
| raise | |
| else: | |
| raise | |
| except Exception as e: | |
| print(f"Request error (attempt {attempt + 1}/{self.max_retries}): {e}") | |
| if attempt == self.max_retries - 1: | |
| raise | |
| time.sleep(2 ** attempt) | |
| return None | |
    def _load_company_tickers(self, force_refresh=False):
        """Load the SEC company-tickers list and build the in-memory lookup indexes.

        Args:
            force_refresh (bool): When True, bypass the cache and re-download.

        Returns:
            dict|None: The raw company-tickers mapping from the SEC JSON
            (index -> {cik_str, ticker, title}), or None on download failure.
        """
        cache_key = "company_tickers_json"
        # Check if already loaded and cache is valid
        if self._index_loaded and not force_refresh and self._is_cache_valid(cache_key):
            return self._get_cached(cache_key)
        # Check cache first
        companies = self._get_cached(cache_key) if not force_refresh else None
        if not companies:
            try:
                # Download company tickers
                url = "https://www.sec.gov/files/company_tickers.json"
                print(f"Downloading company tickers from SEC...")
                response = self._make_request_with_retry(url)
                if not response:
                    print("Failed to download company tickers")
                    return None
                companies = response.json()
                # Cache the download (kept for self._tickers_cache_ttl seconds)
                self._set_cache(cache_key, companies)
                print(f"Loaded {len(companies)} companies")
            except Exception as e:
                print(f"Error loading company tickers: {e}")
                return None
        else:
            print(f"Using cached company tickers ({len(companies)} companies)")
        # Build fast lookup indexes (rebuilt from scratch on every (re)load)
        self._ticker_index = {}
        self._cik_index = {}
        self._name_lower_index = {}
        self._name_prefix_index = {}
        self._ticker_prefix_index = {}
        self._alias_index = {}
        for _, company in companies.items():
            cik = str(company["cik_str"]).zfill(10)
            ticker = company["ticker"]
            name = company["title"]
            company_data = {
                "cik": cik,
                "name": name,
                "ticker": ticker
            }
            # Index by ticker (lowercase for case-insensitive)
            ticker_lower = ticker.lower()
            self._ticker_index[ticker_lower] = company_data
            # Index by CIK
            self._cik_index[cik] = company_data
            # Index by exact name (lowercase)
            name_lower = name.lower()
            self._name_lower_index[name_lower] = company_data
            # Build prefix indexes for faster partial matching
            # Name prefix index: every 3-character substring of the name, so a
            # search term appearing anywhere in the name can find the entry
            if len(name_lower) >= 3:
                for i in range(len(name_lower) - 2):
                    prefix = name_lower[i:i+3]
                    if prefix not in self._name_prefix_index:
                        self._name_prefix_index[prefix] = []
                    self._name_prefix_index[prefix].append(company_data)
            # Ticker prefix index (use 2-character prefixes for tickers)
            if len(ticker_lower) >= 2:
                prefix = ticker_lower[:2]
                if prefix not in self._ticker_prefix_index:
                    self._ticker_prefix_index[prefix] = []
                self._ticker_prefix_index[prefix].append(company_data)
        # Build alias index for intelligent search
        for alias, tickers in self._company_aliases.items():
            for ticker in tickers:
                ticker_lower = ticker.lower()
                if ticker_lower in self._ticker_index:
                    self._alias_index[alias.lower()] = self._ticker_index[ticker_lower]
                    break  # Use first matching ticker
        self._index_loaded = True
        print(f"Built indexes: {len(self._ticker_index)} tickers, {len(self._cik_index)} CIKs")
        print(f"Built prefix indexes: {len(self._name_prefix_index)} name prefixes, {len(self._ticker_prefix_index)} ticker prefixes")
        print(f"Built alias index: {len(self._alias_index)} common aliases")
        return companies
| def get_company_by_cik(self, cik): | |
| """Fast lookup of company info by CIK (from cached tickers)""" | |
| # Ensure data is loaded | |
| self._load_company_tickers() | |
| # Normalize CIK | |
| cik_normalized = str(cik).zfill(10) | |
| # Fast index lookup | |
| return self._cik_index.get(cik_normalized) | |
| def get_company_by_ticker(self, ticker): | |
| """Fast lookup of company info by ticker""" | |
| # Ensure data is loaded | |
| self._load_company_tickers() | |
| # Fast index lookup (case-insensitive) | |
| return self._ticker_index.get(ticker.lower()) | |
| def search_company_by_name(self, company_name): | |
| """Search company CIK by company name with caching and optimized search""" | |
| try: | |
| # Load company tickers and build indexes | |
| companies = self._load_company_tickers() | |
| if not companies: | |
| return None | |
| # Prepare search input | |
| search_name = company_name.lower().strip() | |
| # Check search cache first | |
| cache_key = f"search_{search_name}" | |
| if cache_key in self._search_cache: | |
| return self._search_cache[cache_key].copy() if self._search_cache[cache_key] else None | |
| result = None | |
| # Optimize: Use fast index lookups first | |
| # Priority 1: Exact ticker match (fastest - O(1) hash lookup) | |
| if search_name in self._ticker_index: | |
| result = self._ticker_index[search_name].copy() | |
| # Priority 2: Common alias match (intelligent search - O(1)) | |
| elif search_name in self._alias_index: | |
| result = self._alias_index[search_name].copy() | |
| print(f"Alias match: '{company_name}' → {result.get('ticker')} ({result.get('name')})") | |
| # Priority 3: Exact name match (fast - O(1) hash lookup) | |
| elif search_name in self._name_lower_index: | |
| result = self._name_lower_index[search_name].copy() | |
| # Priority 4: Exact CIK match (fast - O(1) hash lookup) | |
| # Handle CIK input (8-10 digits) | |
| elif search_name.isdigit() and len(search_name) >= 8: | |
| cik_normalized = search_name.zfill(10) | |
| if cik_normalized in self._cik_index: | |
| result = self._cik_index[cik_normalized].copy() | |
| # Priority 5: Prefix-based partial matches (optimized with prefix indexes) | |
| if not result: | |
| result = self._search_with_prefix_index(search_name) | |
| # Cache the result (even if None) | |
| self._cache_search_result(cache_key, result) | |
| return result.copy() if result else None | |
| except Exception as e: | |
| print(f"Error searching company: {e}") | |
| return None | |
    def _search_with_prefix_index(self, search_name):
        """Optimized partial-match search using the prefix indexes.

        Args:
            search_name (str): Lower-cased, stripped search term.

        Returns:
            dict|None: {"cik", "name", "ticker"} of the best match, or None.

        NOTE(review): when no prefix match wins outright, an arbitrary element of
        the candidate *set* is returned (set iteration order is unspecified), so
        results for ambiguous queries may not be stable between runs.
        """
        candidates = set()
        # Strategy 1: Try ticker prefix match if search term looks like ticker
        if len(search_name) <= 5:
            # Use ticker prefix index
            if len(search_name) >= 2:
                prefix = search_name[:2]
                if prefix in self._ticker_prefix_index:
                    for company_data in self._ticker_prefix_index[prefix]:
                        ticker_lower = company_data["ticker"].lower()
                        if search_name in ticker_lower:
                            # Exact prefix match in ticker - highest priority
                            if ticker_lower.startswith(search_name):
                                return company_data
                            candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
        # Strategy 2: Use name prefix index for name-based search
        if len(search_name) >= 3:
            # Try first 3 characters as prefix
            prefix = search_name[:3]
            if prefix in self._name_prefix_index:
                for company_data in self._name_prefix_index[prefix]:
                    name_lower = company_data["name"].lower()
                    # Check if search term is in the name
                    if search_name in name_lower:
                        # Exact prefix match - highest priority
                        if name_lower.startswith(search_name):
                            return company_data
                        candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
        # Strategy 3: If prefix index didn't help (search term in middle of name),
        # do limited iteration on a subset of companies
        if not candidates and len(search_name) >= 3:
            # Only scan companies whose names contain the first 3 chars anywhere
            scan_limit = 0
            for prefix_key, company_list in self._name_prefix_index.items():
                if search_name[:3] in prefix_key:
                    for company_data in company_list:
                        name_lower = company_data["name"].lower()
                        ticker_lower = company_data["ticker"].lower()
                        if search_name in name_lower or search_name in ticker_lower:
                            candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
                        scan_limit += 1
                        if scan_limit > 1000:  # Limit scan to avoid performance issues
                            break
                    if scan_limit > 1000:
                        break
        # Return first candidate if found (arbitrary pick from the set)
        if candidates:
            cik, name, ticker = next(iter(candidates))
            return {"cik": cik, "name": name, "ticker": ticker}
        return None
| def _cache_search_result(self, cache_key, result): | |
| """Cache search result with size limit""" | |
| # Implement LRU-like behavior: if cache is full, clear oldest half | |
| if len(self._search_cache) >= self._search_cache_max_size: | |
| # Simple strategy: clear half of the cache | |
| keys_to_remove = list(self._search_cache.keys())[:self._search_cache_max_size // 2] | |
| for key in keys_to_remove: | |
| del self._search_cache[key] | |
| self._search_cache[cache_key] = result | |
| def _get_period_cache(self, cache_key): | |
| """Get cached period data if valid (Layer 3)""" | |
| if cache_key not in self._period_cache_timestamps: | |
| return None | |
| age = time.time() - self._period_cache_timestamps[cache_key] | |
| if age < self._period_cache_ttl: | |
| return self._period_cache.get(cache_key) | |
| else: | |
| # Expired, remove from cache | |
| self._period_cache.pop(cache_key, None) | |
| self._period_cache_timestamps.pop(cache_key, None) | |
| return None | |
| def _set_period_cache(self, cache_key, result): | |
| """Cache period data with size limit (Layer 3)""" | |
| # LRU-like eviction if cache is full | |
| if len(self._period_cache) >= self._period_cache_max_size: | |
| # Remove oldest half | |
| keys_to_remove = list(self._period_cache.keys())[:self._period_cache_max_size // 2] | |
| for key in keys_to_remove: | |
| self._period_cache.pop(key, None) | |
| self._period_cache_timestamps.pop(key, None) | |
| self._period_cache[cache_key] = result | |
| self._period_cache_timestamps[cache_key] = time.time() | |
| def get_company_info(self, cik): | |
| """ | |
| Get basic company information with caching | |
| Args: | |
| cik (str): Company CIK code | |
| Returns: | |
| dict: Dictionary containing company information | |
| """ | |
| if not self.edgar: | |
| print("sec_edgar_api library not installed") | |
| return None | |
| # Check cache first | |
| cache_key = f"info_{cik}" | |
| cached = self._get_cached(cache_key) | |
| if cached: | |
| return cached | |
| try: | |
| # Add timeout wrapper for sec-edgar-api calls | |
| import signal | |
| def timeout_handler(signum, frame): | |
| raise TimeoutError("SEC API call timeout") | |
| # Set alarm for 45 seconds (only works on Unix-like systems) | |
| try: | |
| signal.signal(signal.SIGALRM, timeout_handler) | |
| signal.alarm(45) # Increased timeout | |
| submissions = self.edgar.get_submissions(cik=cik) | |
| signal.alarm(0) # Cancel alarm | |
| except AttributeError: | |
| # Windows doesn't support SIGALRM, use direct call | |
| submissions = self.edgar.get_submissions(cik=cik) | |
| result = { | |
| "cik": cik, | |
| "name": submissions.get("name", ""), | |
| "tickers": submissions.get("tickers", []), | |
| "sic": submissions.get("sic", ""), | |
| "sic_description": submissions.get("sicDescription", "") | |
| } | |
| # Cache the result | |
| self._set_cache(cache_key, result) | |
| return result | |
| except TimeoutError: | |
| print(f"Timeout getting company info for CIK: {cik}") | |
| return None | |
| except Exception as e: | |
| print(f"Error getting company info: {e}") | |
| return None | |
| def get_company_filings(self, cik, form_types=None): | |
| """ | |
| Get all company filing documents with caching | |
| Args: | |
| cik (str): Company CIK code | |
| form_types (list): List of form types, e.g., ['10-K', '10-Q'], None for all types | |
| Returns: | |
| list: List of filing documents | |
| """ | |
| if not self.edgar: | |
| print("sec_edgar_api library not installed") | |
| return [] | |
| # Check cache first (cache all filings, filter later) | |
| cache_key = f"filings_{cik}" | |
| cached = self._get_cached(cache_key) | |
| if not cached: | |
| try: | |
| # Add timeout wrapper | |
| import signal | |
| def timeout_handler(signum, frame): | |
| raise TimeoutError("SEC API call timeout") | |
| try: | |
| signal.signal(signal.SIGALRM, timeout_handler) | |
| signal.alarm(45) # Increased timeout | |
| submissions = self.edgar.get_submissions(cik=cik) | |
| signal.alarm(0) | |
| except AttributeError: | |
| # Windows fallback | |
| submissions = self.edgar.get_submissions(cik=cik) | |
| # Extract filing information | |
| filings = [] | |
| recent = submissions.get("filings", {}).get("recent", {}) | |
| # Get data from each field | |
| form_types_list = recent.get("form", []) | |
| filing_dates = recent.get("filingDate", []) | |
| accession_numbers = recent.get("accessionNumber", []) | |
| primary_documents = recent.get("primaryDocument", []) | |
| # Iterate through all filings | |
| for i in range(len(form_types_list)): | |
| filing_date = filing_dates[i] if i < len(filing_dates) else "" | |
| accession_number = accession_numbers[i] if i < len(accession_numbers) else "" | |
| primary_document = primary_documents[i] if i < len(primary_documents) else "" | |
| filing = { | |
| "form_type": form_types_list[i], | |
| "filing_date": filing_date, | |
| "accession_number": accession_number, | |
| "primary_document": primary_document | |
| } | |
| filings.append(filing) | |
| # Cache all filings | |
| self._set_cache(cache_key, filings) | |
| cached = filings | |
| except TimeoutError: | |
| print(f"Timeout getting company filings for CIK: {cik}") | |
| return [] | |
| except Exception as e: | |
| print(f"Error getting company filings: {e}") | |
| return [] | |
| # Filter by form type if specified | |
| if form_types: | |
| return [f for f in cached if f.get("form_type") in form_types] | |
| return cached | |
| def get_company_facts(self, cik): | |
| """ | |
| Get all company financial facts data with caching and timeout | |
| Args: | |
| cik (str): Company CIK code | |
| Returns: | |
| dict: Company financial facts data | |
| """ | |
| if not self.edgar: | |
| print("sec_edgar_api library not installed") | |
| return {} | |
| # Check cache first | |
| cache_key = f"facts_{cik}" | |
| cached = self._get_cached(cache_key) | |
| if cached: | |
| return cached | |
| try: | |
| # Add timeout wrapper | |
| import signal | |
| def timeout_handler(signum, frame): | |
| raise TimeoutError("SEC API call timeout") | |
| try: | |
| signal.signal(signal.SIGALRM, timeout_handler) | |
| signal.alarm(60) # 60 seconds for facts (larger dataset) | |
| facts = self.edgar.get_company_facts(cik=cik) | |
| signal.alarm(0) | |
| except AttributeError: | |
| # Windows fallback | |
| facts = self.edgar.get_company_facts(cik=cik) | |
| # Cache the result | |
| self._set_cache(cache_key, facts) | |
| return facts | |
| except TimeoutError: | |
| print(f"Timeout getting company facts for CIK: {cik}") | |
| return {} | |
| except Exception as e: | |
| print(f"Error getting company facts: {e}") | |
| return {} | |
    def get_financial_data_for_period(self, cik, period):
        """
        Get financial data for a specific period (supports annual and quarterly)

        Args:
            cik (str): Company CIK code
            period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')

        Returns:
            dict: Financial data dictionary with one key per matched metric, a
            "meta" block (year/quarter/filer type/primary source), per-metric
            "sources" overrides, and "{metric}_details" entries; {} on failure.
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}
        # Check period cache first (Layer 3)
        cache_key = f"period_{cik}_{period}"
        cached = self._get_period_cache(cache_key)
        if cached is not None:
            print(f"[Cache Hit] get_financial_data_for_period({cik}, {period})")
            return cached.copy()  # Return copy to avoid mutation
        try:
            # Get company financial facts
            facts = self.get_company_facts(cik)
            if not facts:
                return {}
            # Extract us-gaap and ifrs-full financial data (20-F may use IFRS)
            us_gaap = facts.get("facts", {}).get("us-gaap", {})
            ifrs_full = facts.get("facts", {}).get("ifrs-full", {})
            # Define financial metrics and their XBRL tags
            # Include multiple possible tags to improve match rate (including US-GAAP and IFRS tags)
            financial_metrics = {
                "total_revenue": ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "RevenueFromContractWithCustomerIncludingAssessedTax", "SalesRevenueNet", "RevenueFromContractWithCustomer", "Revenue"],
                "net_income": ["NetIncomeLoss", "ProfitLoss", "NetIncome", "ProfitLossAttributableToOwnersOfParent"],
                "earnings_per_share": ["EarningsPerShareBasic", "EarningsPerShare", "BasicEarningsPerShare", "BasicEarningsLossPerShare"],
                "operating_expenses": ["OperatingExpenses", "OperatingCostsAndExpenses", "OperatingExpensesExcludingDepreciationAndAmortization", "CostsAndExpenses", "GeneralAndAdministrativeExpense", "CostOfRevenue", "ResearchAndDevelopmentExpense", "SellingAndMarketingExpense"],
                "operating_cash_flow": ["NetCashProvidedByUsedInOperatingActivities", "NetCashProvidedUsedInOperatingActivities", "NetCashFlowsFromUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"],
            }
            # Determine target form types to search
            if 'Q' in period:
                # Quarterly data, mainly search 10-Q (20-F usually doesn't have quarterly reports)
                target_forms = ["10-Q"]
                target_forms_annual = ["10-K", "20-F"]  # for fallback
                year = int(period.split('Q')[0])
                quarter = period.split('Q')[1]
            else:
                # Annual data, search 10-K and 20-F annual forms
                target_forms = ["10-K", "20-F"]
                target_forms_annual = target_forms
                year = int(period)
                quarter = None
            # Store result with consolidated meta and sources (added for de-duplication)
            result = {
                "period": period,
                "meta": {
                    "year": year,
                    "quarter": quarter,
                    "is_20f_filer": False,  # will set below
                    "primary_source": {}  # Common source info for all metrics in this period
                },
                "sources": {}  # Per-metric source info (only if differs from primary)
            }
            # Detect if company uses 20-F (foreign filer): look at the 20 most
            # recent filings and require that no 10-K appears among them
            is_20f_filer = False
            all_filings = self.get_company_filings(cik)
            if all_filings:
                form_types_used = set(f.get('form_type', '') for f in all_filings[:20])
                if '20-F' in form_types_used and '10-K' not in form_types_used:
                    is_20f_filer = True
            # Reflect in meta
            result["meta"]["is_20f_filer"] = is_20f_filer
            # Get company filings to find accession number and primary document
            filings = self.get_company_filings(cik, form_types=target_forms)
            filings_map = {}  # Map: form_year -> {accession_number, primary_document, filing_date, form_type}
            # Build filing map for quick lookup
            for filing in filings:
                form_type = filing.get("form_type", "")
                filing_date = filing.get("filing_date", "")
                accession_number = filing.get("accession_number", "")
                primary_document = filing.get("primary_document", "")
                if filing_date and accession_number:
                    # Extract year from filing_date (format: YYYY-MM-DD)
                    file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0
                    # Store filing if it matches the period year
                    # For 20-F, also check year-1 (fiscal year may differ from filing year)
                    if file_year == year or (is_20f_filer and form_type == '20-F' and file_year in [year - 1, year + 1]):
                        key = f"{form_type}_{file_year}"
                        if key not in filings_map:
                            filings_map[key] = {
                                "accession_number": accession_number,
                                "primary_document": primary_document,
                                "form_type": form_type,
                                "filing_date": filing_date,
                                "file_year": file_year
                            }
            # Iterate through each financial metric
            for metric_key, metric_tags in financial_metrics.items():
                # Support multiple possible tags
                for metric_tag in metric_tags:
                    # Search both US-GAAP and IFRS tags
                    # For 20-F filers, prioritize IFRS
                    metric_data = None
                    data_source = None
                    if is_20f_filer:
                        # Check IFRS first for 20-F filers
                        if metric_tag in ifrs_full:
                            metric_data = ifrs_full[metric_tag]
                            data_source = "ifrs-full"
                        elif metric_tag in us_gaap:
                            metric_data = us_gaap[metric_tag]
                            data_source = "us-gaap"
                    else:
                        # Check US-GAAP first for 10-K filers
                        if metric_tag in us_gaap:
                            metric_data = us_gaap[metric_tag]
                            data_source = "us-gaap"
                        elif metric_tag in ifrs_full:
                            metric_data = ifrs_full[metric_tag]
                            data_source = "ifrs-full"
                    if metric_data:
                        units = metric_data.get("units", {})
                        # Find USD unit data (supports USD and USD/shares)
                        usd_data = None
                        if "USD" in units:
                            usd_data = units["USD"]
                        elif "USD/shares" in units and metric_key == "earnings_per_share":
                            # EPS uses USD/shares unit
                            usd_data = units["USD/shares"]
                        if usd_data:
                            # Try exact match first, then loose match
                            matched_entry = None
                            # Search for data in the specified period
                            for entry in usd_data:
                                form = entry.get("form", "")
                                fy = entry.get("fy", 0)
                                fp = entry.get("fp", "")
                                end_date = entry.get("end", "")
                                if not end_date or len(end_date) < 4:
                                    continue
                                entry_year = int(end_date[:4])
                                # Check if form type matches
                                if form in target_forms:
                                    if quarter:
                                        # Quarterly data match
                                        if entry_year == year and fp == f"Q{quarter}":
                                            # If already matched, compare end date, choose the latest
                                            if matched_entry:
                                                if entry.get("end", "") > matched_entry.get("end", ""):
                                                    matched_entry = entry
                                            else:
                                                matched_entry = entry
                                    else:
                                        # Annual data match - prioritize fiscal year (fy) field
                                        # Strategy 1: Exact match by fiscal year
                                        if fy == year and (fp == "FY" or fp == "" or not fp):
                                            # If already matched, compare end date, choose the latest
                                            if matched_entry:
                                                if entry.get("end", "") > matched_entry.get("end", ""):
                                                    matched_entry = entry
                                            else:
                                                matched_entry = entry
                                        # Strategy 2: Match by end date year (when fy not available or doesn't match)
                                        elif not matched_entry and entry_year == year and (fp == "FY" or fp == "" or not fp):
                                            matched_entry = entry
                                        # Strategy 3: Allow fy to differ by 1 year (fiscal year vs calendar year mismatch)
                                        elif not matched_entry and fy > 0 and abs(fy - year) <= 1 and (fp == "FY" or fp == "" or not fp):
                                            matched_entry = entry
                                        # Strategy 4: Enhanced matching for 20-F - check frame field and end date
                                        elif not matched_entry and form == "20-F":
                                            frame = entry.get("frame", "")
                                            # Match if CY{year} in frame OR end date contains year OR fiscal year within range
                                            if (f"CY{year}" in frame or
                                                (str(year) in end_date and len(end_date) >= 4 and end_date[:4] == str(year)) or
                                                (fy > 0 and abs(fy - year) <= 1)):
                                                # Additional check: prefer entries with FY period
                                                if fp == "FY" or fp == "" or not fp:
                                                    matched_entry = entry
                            # If quarterly data not found, try finding from annual report (fallback strategy)
                            if not matched_entry and quarter and target_forms_annual:
                                for entry in usd_data:
                                    form = entry.get("form", "")
                                    end_date = entry.get("end", "")
                                    fp = entry.get("fp", "")
                                    if form in target_forms_annual and end_date:
                                        # Check if end date is within this quarter range
                                        if str(year) in end_date and f"Q{quarter}" in fp:
                                            matched_entry = entry
                                            break
                            # Apply matched data
                            if matched_entry:
                                result[metric_key] = matched_entry.get("val", 0)
                                # Get form and accession info
                                form_type = matched_entry.get("form", "")
                                # NOTE(review): accn_from_facts is computed but never
                                # used below — kept for parity with the original.
                                accn_from_facts = matched_entry.get('accn', '').replace('-', '')
                                filed_date = matched_entry.get('filed', '')
                                filed_year = int(filed_date[:4]) if filed_date and len(filed_date) >= 4 else year
                                # Try to get accession_number and primary_document from filings
                                # For 20-F, try multiple year keys since filing year may differ
                                filing_info = None
                                possible_keys = [f"{form_type}_{year}"]
                                if form_type == "20-F":
                                    possible_keys.extend([f"20-F_{filed_year}", f"20-F_{year-1}", f"20-F_{year+1}"])
                                for filing_key in possible_keys:
                                    if filing_key in filings_map:
                                        filing_info = filings_map[filing_key]
                                        break
                                if filing_info:
                                    # Use filing info from get_company_filings
                                    accession_number = filing_info["accession_number"].replace('-', '')
                                    primary_document = filing_info["primary_document"]
                                    # Generate complete source URL
                                    if primary_document:
                                        url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
                                    else:
                                        url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
                                else:
                                    # Fallback to company browse page if filing not found
                                    url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
                                # Backward compatible: only set once to avoid later overwrites
                                if "source_url" not in result:
                                    result["source_url"] = url
                                    result["source_form"] = form_type
                                    result["data_source"] = data_source
                                    # Set primary source info (common for all metrics in this period)
                                    result["meta"]["primary_source"] = {
                                        "url": url,
                                        "form": form_type,
                                        "data_source": data_source,
                                        "filed": matched_entry.get("filed", ""),
                                        "accn": matched_entry.get("accn", ""),
                                        "fy": matched_entry.get("fy", 0),
                                        "fp": matched_entry.get("fp", ""),
                                        "frame": matched_entry.get("frame", ""),
                                        "start": matched_entry.get("start", ""),
                                        "end": matched_entry.get("end", "")
                                    }
                                else:
                                    # Only add per-metric source if it differs from primary
                                    primary_src = result["meta"]["primary_source"]
                                    if (url != primary_src.get("url") or
                                        form_type != primary_src.get("form") or
                                        data_source != primary_src.get("data_source")):
                                        result["sources"][metric_key] = {
                                            "url": url,
                                            "form": form_type,
                                            "data_source": data_source,
                                            "filed": matched_entry.get("filed", "")
                                        }
                                # Simplified details: only metric-specific info (tag and val)
                                # All common fields (form, fy, fp, accn, filed, frame, data_source, start, end)
                                # are now in meta.primary_source
                                result[f"{metric_key}_details"] = {
                                    "tag": metric_tag,
                                    "val": matched_entry.get("val", 0)
                                }
                    # If data is found, break out of tag loop
                    if metric_key in result:
                        break
            # Cache the result (Layer 3)
            self._set_period_cache(cache_key, result)
            return result
        except Exception as e:
            print(f"Error getting financial data for period {period}: {e}")
            return {}