"""EDGAR API Client Module"""

import copy
import json
import signal
import threading
import time
from functools import wraps

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

try:
    from sec_edgar_api.EdgarClient import EdgarClient
except ImportError:
    EdgarClient = None


class EdgarDataClient:
    """Client for SEC EDGAR company lookup, filings and XBRL financial facts.

    Company search runs against in-memory indexes built from the SEC
    ``company_tickers.json`` file.  Submissions and company facts are fetched
    through ``sec_edgar_api.EdgarClient`` when that library is installed;
    without it the ``get_company_*`` / ``get_financial_data_for_period``
    methods print a message and return an empty value.
    """

    # Metric name -> candidate XBRL tags, tried in order (US-GAAP and IFRS
    # spellings are mixed so that 10-K and 20-F filers can both match).
    _FINANCIAL_METRICS = {
        "total_revenue": [
            "Revenues",
            "RevenueFromContractWithCustomerExcludingAssessedTax",
            "RevenueFromContractWithCustomerIncludingAssessedTax",
            "SalesRevenueNet",
            "RevenueFromContractWithCustomer",
            "Revenue",
        ],
        "net_income": [
            "NetIncomeLoss",
            "ProfitLoss",
            "NetIncome",
            "ProfitLossAttributableToOwnersOfParent",
        ],
        "earnings_per_share": [
            "EarningsPerShareBasic",
            "EarningsPerShare",
            "BasicEarningsPerShare",
            "BasicEarningsLossPerShare",
        ],
        "operating_expenses": [
            "OperatingExpenses",
            "OperatingCostsAndExpenses",
            "OperatingExpensesExcludingDepreciationAndAmortization",
            "CostsAndExpenses",
            "GeneralAndAdministrativeExpense",
            "CostOfRevenue",
            "ResearchAndDevelopmentExpense",
            "SellingAndMarketingExpense",
        ],
        "operating_cash_flow": [
            "NetCashProvidedByUsedInOperatingActivities",
            "NetCashProvidedUsedInOperatingActivities",
            "NetCashFlowsFromUsedInOperatingActivities",
            "CashFlowsFromUsedInOperatingActivities",
        ],
    }

    # Strategy 3 of the partial-match search stops once more than this many
    # companies have been scanned.
    _FALLBACK_SCAN_LIMIT = 1000

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """Initialize EDGAR client

        Args:
            user_agent (str): Value sent as the ``User-Agent`` header on every
                request and passed to ``EdgarClient``.
        """
        self.user_agent = user_agent
        self.last_request_time = 0
        self.min_request_interval = 0.11  # SEC allows 10 requests/second, use 0.11s to be safe
        self.request_timeout = 45  # Seconds, applied to every HTTP request
        self.max_retries = 3  # Maximum retry attempts
        self._lock = threading.Lock()  # Thread-safe rate limiting

        # Configure requests session with connection pooling and retry logic
        self.session = requests.Session()
        retry_strategy = Retry(
            total=self.max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=20,
            pool_block=False,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.session.headers.update({"User-Agent": user_agent})

        # Cache for frequently accessed data
        self._company_cache = {}  # cache_key -> data
        self._cache_ttl = 600  # 10 minutes
        self._tickers_cache_ttl = 7200  # 2 hours, used for the tickers list only
        self._cache_timestamps = {}  # cache_key -> time.time() of insertion

        # Fast lookup indexes for company tickers
        self._ticker_index = {}  # lowercase ticker -> company data
        self._cik_index = {}  # zero-padded cik -> company data
        self._name_lower_index = {}  # lowercase name -> company data
        self._name_prefix_index = {}  # every 3-char window of the name -> list of company data
        self._ticker_prefix_index = {}  # first 2 chars of ticker -> list of company data
        self._alias_index = {}  # common aliases -> company data
        self._index_loaded = False

        # Search result cache (for performance)
        self._search_cache = {}  # search_key -> result
        self._search_cache_max_size = 1000  # Limit cache size

        # Layer 3: Period data cache (avoid re-parsing XBRL for same period)
        self._period_cache = {}  # period_key -> financial data
        self._period_cache_timestamps = {}  # period_key -> timestamp
        self._period_cache_ttl = 1800  # 30 minutes
        self._period_cache_max_size = 1000  # Limit cache size

        # Common company aliases for intelligent search
        self._company_aliases = {
            'google': ['GOOGL', 'GOOG'],
            'facebook': ['META'],
            'meta': ['META'],
            'apple': ['AAPL'],
            'microsoft': ['MSFT'],
            'amazon': ['AMZN'],
            'tesla': ['TSLA'],
            'nvidia': ['NVDA'],
            'netflix': ['NFLX'],
            'alphabet': ['GOOGL', 'GOOG'],
            'twitter': ['TWTR'],  # Historical
            'intel': ['INTC'],
            'amd': ['AMD'],
            'oracle': ['ORCL'],
            'salesforce': ['CRM'],
            'adobe': ['ADBE'],
            'cisco': ['CSCO'],
            'ibm': ['IBM'],
            'walmart': ['WMT'],
            'disney': ['DIS'],
            'nike': ['NKE'],
            'mcdonalds': ['MCD'],
            'coca cola': ['KO'],
            'pepsi': ['PEP'],
            'starbucks': ['SBUX'],
            'boeing': ['BA'],
            'ge': ['GE'],
            'general electric': ['GE'],
            'ford': ['F'],
            'gm': ['GM'],
            'general motors': ['GM'],
            'jpmorgan': ['JPM'],
            'goldman': ['GS'],
            'goldman sachs': ['GS'],
            'morgan stanley': ['MS'],
            'bank of america': ['BAC'],
            'wells fargo': ['WFC'],
            'visa': ['V'],
            'mastercard': ['MA'],
            'berkshire': ['BRK.B', 'BRK.A'],
            'exxon': ['XOM'],
            'chevron': ['CVX'],
            'pfizer': ['PFE'],
            'johnson': ['JNJ'],
            'merck': ['MRK'],
        }

        if EdgarClient:
            self.edgar = EdgarClient(user_agent=user_agent)
        else:
            self.edgar = None

    def _rate_limit(self):
        """Thread-safe rate limiting to comply with SEC API limits (10 requests/second)"""
        # The sleep happens while the lock is held so that concurrent callers
        # are spaced out one after another instead of all waking together.
        with self._lock:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_request_interval:
                time.sleep(self.min_request_interval - elapsed)
            self.last_request_time = time.time()

    def _is_cache_valid(self, cache_key):
        """Check if cache entry is still valid"""
        if cache_key not in self._cache_timestamps:
            return False
        age = time.time() - self._cache_timestamps[cache_key]
        # Use longer TTL for company tickers list
        ttl = self._tickers_cache_ttl if cache_key == "company_tickers_json" else self._cache_ttl
        return age < ttl

    def _get_cached(self, cache_key):
        """Get cached data if valid, otherwise None"""
        if self._is_cache_valid(cache_key):
            return self._company_cache.get(cache_key)
        return None

    def _set_cache(self, cache_key, data):
        """Set cache data with timestamp"""
        self._company_cache[cache_key] = data
        self._cache_timestamps[cache_key] = time.time()

    def _call_with_timeout(self, func, timeout_seconds, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, aborting it with SIGALRM where possible.

        The alarm can only be armed when ``signal.SIGALRM`` exists (it does
        not on Windows) and the caller runs in the main thread
        (``signal.signal`` raises ``ValueError`` elsewhere).  In every other
        situation the function is simply called without a time limit.

        Args:
            func (callable): Function to call.
            timeout_seconds (int): Whole seconds before the alarm fires.

        Returns:
            Whatever ``func`` returns.

        Raises:
            TimeoutError: If the alarm fires before ``func`` returns.
        """
        def timeout_handler(signum, frame):
            raise TimeoutError("SEC API call timeout")

        previous_handler = None
        armed = False
        if (hasattr(signal, "SIGALRM")
                and threading.current_thread() is threading.main_thread()):
            try:
                previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
                armed = True
            except ValueError:
                # Handler could not be installed here; fall back to a plain call.
                armed = False

        if not armed:
            return func(*args, **kwargs)

        signal.alarm(timeout_seconds)
        try:
            return func(*args, **kwargs)
        finally:
            # Always disarm, even when func raised, so a stale alarm cannot
            # fire later in unrelated code; then put the old handler back.
            signal.alarm(0)
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

    def _make_request_with_retry(self, url, headers=None, use_session=True):
        """Make HTTP GET request with retry logic and timeout

        Args:
            url (str): URL to fetch.
            headers (dict): Request headers; defaults to the client User-Agent.
            use_session (bool): Use the pooled session (True) or a one-off
                ``requests.get`` (False).

        Returns:
            requests.Response: The successful response, or None when
            ``max_retries`` is 0 and no attempt is made.

        Raises:
            requests.exceptions.HTTPError: Immediately for non-429 statuses,
                or after the last attempt for 429.
            Exception: Any other error raised by the last attempt.
        """
        if headers is None:
            headers = {"User-Agent": self.user_agent}

        for attempt in range(self.max_retries):
            is_last_attempt = attempt == self.max_retries - 1
            try:
                self._rate_limit()
                if use_session:
                    response = self.session.get(url, headers=headers, timeout=self.request_timeout)
                else:
                    response = requests.get(url, headers=headers, timeout=self.request_timeout)
                response.raise_for_status()
                return response
            except requests.exceptions.Timeout:
                print(f"Request timeout (attempt {attempt + 1}/{self.max_retries}): {url}")
                if is_last_attempt:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff
            except requests.exceptions.HTTPError as e:
                status_code = e.response.status_code if e.response is not None else None
                if status_code != 429:
                    raise
                if is_last_attempt:
                    # Nothing left to retry, so do not waste a back-off sleep.
                    raise
                wait_time = 2 ** attempt
                print(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1}/{self.max_retries})")
                time.sleep(wait_time)
            except Exception as e:
                print(f"Request error (attempt {attempt + 1}/{self.max_retries}): {e}")
                if is_last_attempt:
                    raise
                time.sleep(2 ** attempt)
        return None

    def _load_company_tickers(self, force_refresh=False):
        """Load and index company tickers data

        Args:
            force_refresh (bool): Ignore the cache and download again.

        Returns:
            dict: The raw tickers mapping, or None if it could not be loaded.
        """
        cache_key = "company_tickers_json"

        # Check if already loaded and cache is valid
        if self._index_loaded and not force_refresh and self._is_cache_valid(cache_key):
            return self._get_cached(cache_key)

        # Check cache first
        companies = self._get_cached(cache_key) if not force_refresh else None

        if not companies:
            try:
                # Download company tickers
                url = "https://www.sec.gov/files/company_tickers.json"
                print("Downloading company tickers from SEC...")
                response = self._make_request_with_retry(url)
                if response is None:
                    print("Failed to download company tickers")
                    return None
                companies = response.json()
                # Kept for self._tickers_cache_ttl seconds
                self._set_cache(cache_key, companies)
                print(f"Loaded {len(companies)} companies")
            except Exception as e:
                print(f"Error loading company tickers: {e}")
                return None
        else:
            print(f"Using cached company tickers ({len(companies)} companies)")

        # Build the indexes in locals and publish them together at the end so
        # a lookup never observes a half-built index.
        ticker_index = {}
        cik_index = {}
        name_lower_index = {}
        name_prefix_index = {}
        ticker_prefix_index = {}
        alias_index = {}

        for company in companies.values():
            cik = str(company["cik_str"]).zfill(10)
            ticker = company["ticker"]
            name = company["title"]
            company_data = {"cik": cik, "name": name, "ticker": ticker}

            # Index by ticker (lowercase for case-insensitive)
            ticker_lower = ticker.lower()
            ticker_index[ticker_lower] = company_data

            # Index by CIK
            cik_index[cik] = company_data

            # Index by exact name (lowercase)
            name_lower = name.lower()
            name_lower_index[name_lower] = company_data

            # Every 3-character window of the name is indexed, so a lookup on
            # the first 3 characters of a search term also finds names that
            # contain the term in the middle.
            for start in range(len(name_lower) - 2):
                window = name_lower[start:start + 3]
                name_prefix_index.setdefault(window, []).append(company_data)

            # Ticker prefix index (first 2 characters only)
            if len(ticker_lower) >= 2:
                ticker_prefix_index.setdefault(ticker_lower[:2], []).append(company_data)

        # Build alias index for intelligent search
        for alias, tickers in self._company_aliases.items():
            for ticker in tickers:
                ticker_lower = ticker.lower()
                if ticker_lower in ticker_index:
                    alias_index[alias.lower()] = ticker_index[ticker_lower]
                    break  # Use first matching ticker

        self._ticker_index = ticker_index
        self._cik_index = cik_index
        self._name_lower_index = name_lower_index
        self._name_prefix_index = name_prefix_index
        self._ticker_prefix_index = ticker_prefix_index
        self._alias_index = alias_index
        # Earlier search results were computed against the previous indexes.
        self._search_cache.clear()
        self._index_loaded = True

        print(f"Built indexes: {len(self._ticker_index)} tickers, {len(self._cik_index)} CIKs")
        print(f"Built prefix indexes: {len(self._name_prefix_index)} name prefixes, {len(self._ticker_prefix_index)} ticker prefixes")
        print(f"Built alias index: {len(self._alias_index)} common aliases")
        return companies

    def get_company_by_cik(self, cik):
        """Fast lookup of company info by CIK (from cached tickers)

        Args:
            cik (str | int): CIK, with or without leading zeros.

        Returns:
            dict: ``{"cik", "name", "ticker"}`` or None if unknown.
        """
        # Ensure data is loaded
        self._load_company_tickers()
        # Normalize CIK to the 10-digit zero-padded form used by the index
        cik_normalized = str(cik).strip().zfill(10)
        return self._cik_index.get(cik_normalized)

    def get_company_by_ticker(self, ticker):
        """Fast lookup of company info by ticker (case-insensitive)

        Args:
            ticker (str): Ticker symbol.

        Returns:
            dict: ``{"cik", "name", "ticker"}`` or None if unknown or empty.
        """
        # Ensure data is loaded
        self._load_company_tickers()
        if not ticker:
            return None
        return self._ticker_index.get(str(ticker).strip().lower())

    def search_company_by_name(self, company_name):
        """Search company by ticker, alias, name or CIK with caching

        Args:
            company_name (str): Free-text search term.

        Returns:
            dict: A copy of ``{"cik", "name", "ticker"}`` for the best match,
            or None when nothing matches or an error occurs.
        """
        try:
            # Load company tickers and build indexes
            companies = self._load_company_tickers()
            if not companies:
                return None

            # Prepare search input
            search_name = company_name.lower().strip()

            # Check search cache first (misses are cached as None)
            cache_key = f"search_{search_name}"
            if cache_key in self._search_cache:
                cached = self._search_cache[cache_key]
                return cached.copy() if cached else None

            result = None

            # Priority 1: Exact ticker match
            if search_name in self._ticker_index:
                result = self._ticker_index[search_name].copy()
            # Priority 2: Common alias match
            elif search_name in self._alias_index:
                result = self._alias_index[search_name].copy()
                print(f"Alias match: '{company_name}' → {result.get('ticker')} ({result.get('name')})")
            # Priority 3: Exact name match
            elif search_name in self._name_lower_index:
                result = self._name_lower_index[search_name].copy()
            # Priority 4: Exact CIK match (input of 8 or more digits)
            elif search_name.isdigit() and len(search_name) >= 8:
                cik_normalized = search_name.zfill(10)
                if cik_normalized in self._cik_index:
                    result = self._cik_index[cik_normalized].copy()

            # Priority 5: Prefix-based partial matches
            if not result:
                result = self._search_with_prefix_index(search_name)

            # Cache the result (even if None)
            self._cache_search_result(cache_key, result)
            return result.copy() if result else None
        except Exception as e:
            print(f"Error searching company: {e}")
            return None

    def _search_with_prefix_index(self, search_name):
        """Partial match search using the prefix indexes

        A ticker or name that *starts with* the search term is returned
        immediately.  Otherwise the first company found that merely contains
        the term is returned, so the result is the same on every run.

        Args:
            search_name (str): Lowercased, stripped search term.

        Returns:
            dict: ``{"cik", "name", "ticker"}`` or None.
        """
        first_candidate = None

        # Strategy 1: ticker prefix index, for short terms that look like tickers
        if 2 <= len(search_name) <= 5:
            for company_data in self._ticker_prefix_index.get(search_name[:2], []):
                ticker_lower = company_data["ticker"].lower()
                if search_name in ticker_lower:
                    if ticker_lower.startswith(search_name):
                        return company_data
                    if first_candidate is None:
                        first_candidate = company_data

        # Strategy 2: name index keyed by the first 3 characters of the term
        name_bucket = []
        if len(search_name) >= 3:
            name_bucket = self._name_prefix_index.get(search_name[:3], [])
            for company_data in name_bucket:
                name_lower = company_data["name"].lower()
                if search_name in name_lower:
                    if name_lower.startswith(search_name):
                        return company_data
                    if first_candidate is None:
                        first_candidate = company_data

        # Strategy 3: nothing found yet - rescan a bounded part of the same
        # bucket, this time also accepting a match inside the ticker.
        if first_candidate is None and len(search_name) >= 3:
            for company_data in name_bucket[:self._FALLBACK_SCAN_LIMIT + 1]:
                name_lower = company_data["name"].lower()
                ticker_lower = company_data["ticker"].lower()
                if search_name in name_lower or search_name in ticker_lower:
                    first_candidate = company_data
                    break

        if first_candidate is None:
            return None
        return {
            "cik": first_candidate["cik"],
            "name": first_candidate["name"],
            "ticker": first_candidate["ticker"],
        }

    def _cache_search_result(self, cache_key, result):
        """Cache search result with size limit"""
        # When full, drop the half of the entries that were inserted first.
        if len(self._search_cache) >= self._search_cache_max_size:
            keys_to_remove = list(self._search_cache.keys())[:self._search_cache_max_size // 2]
            for key in keys_to_remove:
                del self._search_cache[key]
        self._search_cache[cache_key] = result

    def _get_period_cache(self, cache_key):
        """Get cached period data if valid (Layer 3); expired entries are removed"""
        if cache_key not in self._period_cache_timestamps:
            return None
        age = time.time() - self._period_cache_timestamps[cache_key]
        if age < self._period_cache_ttl:
            return self._period_cache.get(cache_key)
        # Expired, remove from cache
        self._period_cache.pop(cache_key, None)
        self._period_cache_timestamps.pop(cache_key, None)
        return None

    def _set_period_cache(self, cache_key, result):
        """Cache period data with size limit (Layer 3)"""
        # When full, drop the half of the entries that were inserted first.
        if len(self._period_cache) >= self._period_cache_max_size:
            keys_to_remove = list(self._period_cache.keys())[:self._period_cache_max_size // 2]
            for key in keys_to_remove:
                self._period_cache.pop(key, None)
                self._period_cache_timestamps.pop(key, None)
        self._period_cache[cache_key] = result
        self._period_cache_timestamps[cache_key] = time.time()

    def get_company_info(self, cik):
        """
        Get basic company information with caching

        Args:
            cik (str): Company CIK code

        Returns:
            dict: Dictionary with keys cik, name, tickers, sic, sic_description;
            None if the library is missing, the call times out or fails
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return None

        # Check cache first
        cache_key = f"info_{cik}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            return cached

        try:
            submissions = self._call_with_timeout(self.edgar.get_submissions, 45, cik=cik)
            result = {
                "cik": cik,
                "name": submissions.get("name", ""),
                "tickers": submissions.get("tickers", []),
                "sic": submissions.get("sic", ""),
                "sic_description": submissions.get("sicDescription", ""),
            }
            self._set_cache(cache_key, result)
            return result
        except TimeoutError:
            print(f"Timeout getting company info for CIK: {cik}")
            return None
        except Exception as e:
            print(f"Error getting company info: {e}")
            return None

    def get_company_filings(self, cik, form_types=None):
        """
        Get all company filing documents with caching

        Args:
            cik (str): Company CIK code
            form_types (list): List of form types, e.g., ['10-K', '10-Q'], None for all types

        Returns:
            list: List of dicts with keys form_type, filing_date,
            accession_number, primary_document; [] on timeout or error
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return []

        # Check cache first (cache all filings, filter later)
        cache_key = f"filings_{cik}"
        cached = self._get_cached(cache_key)

        # "is None" so that a company with no filings is not re-fetched on every call
        if cached is None:
            try:
                submissions = self._call_with_timeout(self.edgar.get_submissions, 45, cik=cik)
                recent = submissions.get("filings", {}).get("recent", {})

                # The "recent" block holds one parallel list per field
                form_types_list = recent.get("form", [])
                filing_dates = recent.get("filingDate", [])
                accession_numbers = recent.get("accessionNumber", [])
                primary_documents = recent.get("primaryDocument", [])

                def value_at(values, index):
                    # Tolerate parallel lists that are shorter than the form list
                    return values[index] if index < len(values) else ""

                filings = [
                    {
                        "form_type": form_type,
                        "filing_date": value_at(filing_dates, index),
                        "accession_number": value_at(accession_numbers, index),
                        "primary_document": value_at(primary_documents, index),
                    }
                    for index, form_type in enumerate(form_types_list)
                ]

                # Cache all filings
                self._set_cache(cache_key, filings)
                cached = filings
            except TimeoutError:
                print(f"Timeout getting company filings for CIK: {cik}")
                return []
            except Exception as e:
                print(f"Error getting company filings: {e}")
                return []

        # Filter by form type if specified
        if form_types:
            return [f for f in cached if f.get("form_type") in form_types]
        return cached

    def get_company_facts(self, cik):
        """
        Get all company financial facts data with caching and timeout

        Args:
            cik (str): Company CIK code

        Returns:
            dict: Company financial facts data; {} on timeout or error
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}

        # Check cache first
        cache_key = f"facts_{cik}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            return cached

        try:
            # 60 seconds for facts (larger dataset)
            facts = self._call_with_timeout(self.edgar.get_company_facts, 60, cik=cik)
            self._set_cache(cache_key, facts)
            return facts
        except TimeoutError:
            print(f"Timeout getting company facts for CIK: {cik}")
            return {}
        except Exception as e:
            print(f"Error getting company facts: {e}")
            return {}

    @staticmethod
    def _lookup_metric_tag(metric_tag, us_gaap, ifrs_full, prefer_ifrs):
        """Return ``(tag_data, taxonomy_name)`` for a tag, or ``(None, None)``.

        The preferred taxonomy is consulted first; the other one is the fallback.
        """
        ordered = (
            (("ifrs-full", ifrs_full), ("us-gaap", us_gaap))
            if prefer_ifrs
            else (("us-gaap", us_gaap), ("ifrs-full", ifrs_full))
        )
        for taxonomy_name, taxonomy in ordered:
            if metric_tag in taxonomy:
                return taxonomy[metric_tag], taxonomy_name
        return None, None

    @staticmethod
    def _build_filings_map(filings, year, is_20f_filer):
        """Map ``"<form>_<filing year>"`` to the first matching filing.

        Only filings dated in ``year`` are kept; for 20-F filers, 20-F filings
        dated one year either side are kept as well.
        """
        filings_map = {}
        for filing in filings:
            form_type = filing.get("form_type", "")
            filing_date = filing.get("filing_date", "")
            accession_number = filing.get("accession_number", "")
            if not (filing_date and accession_number):
                continue

            # Extract year from filing_date (format: YYYY-MM-DD)
            file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0
            is_adjacent_20f = (
                is_20f_filer and form_type == '20-F' and file_year in [year - 1, year + 1]
            )
            if file_year != year and not is_adjacent_20f:
                continue

            key = f"{form_type}_{file_year}"
            if key not in filings_map:
                filings_map[key] = {
                    "accession_number": accession_number,
                    "primary_document": filing.get("primary_document", ""),
                    "form_type": form_type,
                    "filing_date": filing_date,
                    "file_year": file_year,
                }
        return filings_map

    @staticmethod
    def _match_period_entry(entries, target_forms, year, quarter):
        """Pick the fact entry that best represents the requested period.

        Quarterly: the entry whose end date falls in ``year`` and whose ``fp``
        is ``Q<quarter>``; the latest end date wins.

        Annual (only entries whose ``fp`` is "FY" or empty are considered):
        an entry whose ``fy`` equals ``year`` is an exact match and the latest
        end date among exact matches wins.  If there is no exact match, the
        first entry that satisfies one of the looser rules is used: end date
        in ``year``; ``fy`` within one year of ``year``; or, for 20-F, a
        ``frame`` containing ``CY<year>``.

        Returns:
            dict: The chosen entry, or None.
        """
        matched = None
        matched_is_exact = False

        for entry in entries:
            # "or" rather than a .get default: the JSON may carry explicit nulls
            form = entry.get("form") or ""
            fy = entry.get("fy") or 0
            fp = entry.get("fp") or ""
            end_date = entry.get("end") or ""

            if len(end_date) < 4:
                continue
            entry_year = int(end_date[:4])

            if form not in target_forms:
                continue

            if quarter:
                if entry_year == year and fp == f"Q{quarter}":
                    if matched is None or end_date > (matched.get("end") or ""):
                        matched = entry
                continue

            # Every annual rule requires a full-year (or unspecified) period
            if fp and fp != "FY":
                continue

            if fy == year:
                # An exact match always replaces a looser one; among exact
                # matches the latest end date wins.
                if (matched is None or not matched_is_exact
                        or end_date > (matched.get("end") or "")):
                    matched = entry
                    matched_is_exact = True
            elif matched is None:
                if entry_year == year:
                    matched = entry
                elif fy > 0 and abs(fy - year) <= 1:
                    matched = entry
                elif form == "20-F" and f"CY{year}" in (entry.get("frame") or ""):
                    # The end-date and fy alternatives were already ruled out
                    # by the two branches above, so only the frame check remains.
                    matched = entry

        return matched

    @staticmethod
    def _match_quarter_in_annual(entries, annual_forms, year, quarter):
        """Fallback for quarterly periods: first annual-form entry for that quarter.

        Returns:
            dict: The first entry from ``annual_forms`` whose end date contains
            ``year`` and whose ``fp`` contains ``Q<quarter>``, or None.
        """
        quarter_label = f"Q{quarter}"
        year_text = str(year)
        for entry in entries:
            form = entry.get("form") or ""
            end_date = entry.get("end") or ""
            fp = entry.get("fp") or ""
            if form in annual_forms and end_date and year_text in end_date and quarter_label in fp:
                return entry
        return None

    @staticmethod
    def _resolve_filing(matched_entry, form_type, year, filings_map, filings_by_accession):
        """Find the filing record that reported ``matched_entry``.

        The accession number carried by the fact identifies the filing
        directly, so it is tried first.  The year-keyed map is the fallback
        for facts without a known accession number.

        Returns:
            dict: A filing record with ``accession_number`` and
            ``primary_document``, or None.
        """
        accession = (matched_entry.get("accn") or "").replace("-", "")
        if accession and accession in filings_by_accession:
            return filings_by_accession[accession]

        filed_date = matched_entry.get("filed") or ""
        filed_year = int(filed_date[:4]) if len(filed_date) >= 4 else year

        possible_keys = [f"{form_type}_{year}"]
        if form_type == "20-F":
            # For 20-F, try multiple year keys since filing year may differ
            possible_keys.extend([f"20-F_{filed_year}", f"20-F_{year-1}", f"20-F_{year+1}"])
        for filing_key in possible_keys:
            if filing_key in filings_map:
                return filings_map[filing_key]
        return None

    @staticmethod
    def _build_source_url(cik, form_type, filing_info):
        """Return the document URL for a filing, or the company browse page."""
        if filing_info:
            primary_document = filing_info.get("primary_document")
            if primary_document:
                accession_number = filing_info["accession_number"].replace('-', '')
                return f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
        # Fallback to company browse page if filing or document is unknown
        return f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"

    def get_financial_data_for_period(self, cik, period):
        """
        Get financial data for a specific period (supports annual and quarterly)

        Args:
            cik (str): Company CIK code
            period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')

        Returns:
            dict: Financial data dictionary.  Always contains "period", "meta"
            and "sources"; for every metric that was found it also contains
            ``<metric>`` and ``<metric>_details``, plus "source_url",
            "source_form" and "data_source" taken from the first metric found.
            {} if the library is missing, no facts are available or an error
            occurs.  The returned dict is independent of the cached copy.
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}

        # Check period cache first (Layer 3)
        cache_key = f"period_{cik}_{period}"
        cached = self._get_period_cache(cache_key)
        if cached is not None:
            print(f"[Cache Hit] get_financial_data_for_period({cik}, {period})")
            # Deep copy: the result holds nested dicts ("meta", "sources")
            return copy.deepcopy(cached)

        try:
            # Get company financial facts
            facts = self.get_company_facts(cik)
            if not facts:
                return {}

            # Extract us-gaap and ifrs-full financial data (20-F may use IFRS)
            us_gaap = facts.get("facts", {}).get("us-gaap", {})
            ifrs_full = facts.get("facts", {}).get("ifrs-full", {})

            # Determine target form types to search
            period_text = str(period).strip().upper()
            if 'Q' in period_text:
                # Quarterly data, mainly search 10-Q; annual forms are the fallback
                target_forms = ["10-Q"]
                target_forms_annual = ["10-K", "20-F"]
                period_parts = period_text.split('Q')
                year = int(period_parts[0])
                quarter = period_parts[1]
            else:
                # Annual data, search 10-K and 20-F annual forms
                target_forms = ["10-K", "20-F"]
                target_forms_annual = target_forms
                year = int(period_text)
                quarter = None

            result = {
                "period": period,
                "meta": {
                    "year": year,
                    "quarter": quarter,
                    "is_20f_filer": False,  # set below
                    "primary_source": {},  # Common source info for all metrics in this period
                },
                "sources": {},  # Per-metric source info (only if differs from primary)
            }

            # Detect if company uses 20-F (foreign filer): among the first 20
            # filings returned there is a 20-F and no 10-K
            is_20f_filer = False
            all_filings = self.get_company_filings(cik)
            if all_filings:
                form_types_used = set(f.get('form_type', '') for f in all_filings[:20])
                if '20-F' in form_types_used and '10-K' not in form_types_used:
                    is_20f_filer = True
            result["meta"]["is_20f_filer"] = is_20f_filer

            # Filing lookups used to build source URLs
            filings = self.get_company_filings(cik, form_types=target_forms)
            filings_map = self._build_filings_map(filings, year, is_20f_filer)
            filings_by_accession = {}
            for filing in all_filings or []:
                accession = (filing.get("accession_number") or "").replace("-", "")
                if accession:
                    filings_by_accession.setdefault(accession, filing)

            for metric_key, metric_tags in self._FINANCIAL_METRICS.items():
                # Try each candidate tag until one yields a value
                for metric_tag in metric_tags:
                    metric_data, data_source = self._lookup_metric_tag(
                        metric_tag, us_gaap, ifrs_full, prefer_ifrs=is_20f_filer
                    )
                    if not metric_data:
                        continue

                    # Find USD unit data (supports USD and USD/shares)
                    units = metric_data.get("units", {})
                    if "USD" in units:
                        usd_data = units["USD"]
                    elif "USD/shares" in units and metric_key == "earnings_per_share":
                        usd_data = units["USD/shares"]  # EPS uses USD/shares unit
                    else:
                        usd_data = None
                    if not usd_data:
                        continue

                    matched_entry = self._match_period_entry(usd_data, target_forms, year, quarter)
                    if matched_entry is None and quarter and target_forms_annual:
                        # Quarterly data not found in 10-Q, try the annual forms
                        matched_entry = self._match_quarter_in_annual(
                            usd_data, target_forms_annual, year, quarter
                        )
                    if matched_entry is None:
                        continue

                    result[metric_key] = matched_entry.get("val", 0)

                    form_type = matched_entry.get("form") or ""
                    filing_info = self._resolve_filing(
                        matched_entry, form_type, year, filings_map, filings_by_accession
                    )
                    url = self._build_source_url(cik, form_type, filing_info)

                    # Backward compatible: only set once to avoid later overwrites
                    if "source_url" not in result:
                        result["source_url"] = url
                        result["source_form"] = form_type
                        result["data_source"] = data_source
                        # Set primary source info (common for all metrics in this period)
                        result["meta"]["primary_source"] = {
                            "url": url,
                            "form": form_type,
                            "data_source": data_source,
                            "filed": matched_entry.get("filed", ""),
                            "accn": matched_entry.get("accn", ""),
                            "fy": matched_entry.get("fy", 0),
                            "fp": matched_entry.get("fp", ""),
                            "frame": matched_entry.get("frame", ""),
                            "start": matched_entry.get("start", ""),
                            "end": matched_entry.get("end", ""),
                        }
                    else:
                        # Only add per-metric source if it differs from primary
                        primary_src = result["meta"]["primary_source"]
                        if (url != primary_src.get("url")
                                or form_type != primary_src.get("form")
                                or data_source != primary_src.get("data_source")):
                            result["sources"][metric_key] = {
                                "url": url,
                                "form": form_type,
                                "data_source": data_source,
                                "filed": matched_entry.get("filed", ""),
                            }

                    # Details carry only metric-specific info; the common
                    # fields live in meta.primary_source
                    result[f"{metric_key}_details"] = {
                        "tag": metric_tag,
                        "val": matched_entry.get("val", 0),
                    }

                    # Data found for this metric, stop trying further tags
                    break

            # Cache a private copy (Layer 3) so the caller cannot alter it
            self._set_period_cache(cache_key, copy.deepcopy(result))
            return result
        except Exception as e:
            print(f"Error getting financial data for period {period}: {e}")
            return {}