Spaces:
Runtime error
Runtime error
| """EDGAR API Client Module with Performance Optimization""" | |
| import requests | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| try: | |
| from sec_edgar_api.EdgarClient import EdgarClient | |
| except ImportError: | |
| EdgarClient = None | |
| import json | |
| import time | |
| import threading | |
| from functools import lru_cache | |
| from datetime import datetime, timedelta | |
class EdgarDataClient:
    """Client for SEC EDGAR company lookup, filings and XBRL financial facts.

    Company-name search reads ``company_tickers.json`` through a pooled
    ``requests`` session and keeps the parsed table in a class-level cache
    shared by all instances.  Submissions and company facts are fetched through
    the optional ``sec_edgar_api`` client; when that library is missing the
    corresponding methods print a message and return an empty value.

    Public methods never raise: failures are printed and reported as
    ``None`` / ``[]`` / ``{}``.
    """

    # Class-level cache for company_tickers.json (shared across instances)
    _company_tickers_cache = None
    _company_tickers_cache_time = None
    _company_tickers_cache_ttl = 3600  # 1 hour TTL
    _cache_lock = threading.Lock()

    # Class-level rate limiter (SEC requires max 10 requests per second)
    _last_request_time = 0
    _rate_limit_lock = threading.Lock()
    _min_request_interval = 0.11  # 110ms between requests (9 req/sec, safe margin)

    # Result key -> candidate XBRL tags (US-GAAP and IFRS), tried in order.
    _FINANCIAL_METRICS = {
        "total_revenue": [
            "Revenues",
            "RevenueFromContractWithCustomerExcludingAssessedTax",
            "RevenueFromContractWithCustomerIncludingAssessedTax",
            "SalesRevenueNet",
            "RevenueFromContractWithCustomer",
            "Revenue",
        ],
        "net_income": [
            "NetIncomeLoss",
            "ProfitLoss",
            "NetIncome",
            "ProfitLossAttributableToOwnersOfParent",
        ],
        "earnings_per_share": [
            "EarningsPerShareBasic",
            "EarningsPerShare",
            "BasicEarningsPerShare",
            "BasicEarningsLossPerShare",
        ],
        "operating_expenses": [
            "OperatingExpenses",
            "OperatingCostsAndExpenses",
            "OperatingExpensesExcludingDepreciationAndAmortization",
            "CostsAndExpenses",
            "GeneralAndAdministrativeExpense",
            "CostOfRevenue",
            "ResearchAndDevelopmentExpense",
            "SellingAndMarketingExpense",
        ],
        "operating_cash_flow": [
            "NetCashProvidedByUsedInOperatingActivities",
            "NetCashProvidedUsedInOperatingActivities",
            "NetCashFlowsFromUsedInOperatingActivities",
            "CashFlowsFromUsedInOperatingActivities",
        ],
    }

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """Initialize EDGAR client with connection pooling and timeout.

        Args:
            user_agent (str): Value sent as the ``User-Agent`` header and passed
                to the ``sec_edgar_api`` client.
        """
        self.user_agent = user_agent

        # Pooled session with automatic retries for idempotent requests.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],
        )
        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=retry_strategy,
            pool_block=False,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Timeout (seconds) for session requests and for the patched
        # sec_edgar_api calls.
        self.timeout = 30

        if EdgarClient:
            self.edgar = EdgarClient(user_agent=user_agent)
            # sec_edgar_api is wrapped so its calls cannot block forever.
            self._patch_edgar_client_timeout()
        else:
            self.edgar = None

    def _patch_edgar_client_timeout(self):
        """Monkey patch ``self.edgar`` so its fetch methods honour ``self.timeout``."""
        if not self.edgar:
            return
        self.edgar.get_submissions = self._with_timeout(self.edgar.get_submissions)
        self.edgar.get_company_facts = self._with_timeout(self.edgar.get_company_facts)

    def _with_timeout(self, func):
        """Wrap ``func(cik)`` in a thread-based timeout.

        The call runs in a daemon thread that is joined for at most
        ``self.timeout`` seconds (read at call time).  On timeout a
        ``TimeoutError`` is raised; the worker thread is not cancelled and may
        keep running in the background.  Exceptions raised by ``func`` are
        re-raised in the calling thread.
        """

        def call_with_timeout(cik):
            outcome = {}

            def worker():
                try:
                    outcome["value"] = func(cik)
                except Exception as exc:
                    outcome["error"] = exc

            thread = threading.Thread(target=worker, daemon=True)
            thread.start()
            thread.join(timeout=self.timeout)
            if thread.is_alive():
                raise TimeoutError(f"SEC API request timeout ({self.timeout}s)")
            if "error" in outcome:
                raise outcome["error"]
            return outcome.get("value")

        return call_with_timeout

    def _rate_limit(self):
        """Block until at least ``_min_request_interval`` has passed since the last request.

        The timestamp is shared by all instances and guarded by a class-level
        lock.  A monotonic clock is used and the sleep is capped at one
        interval, so a wall-clock adjustment (or a foreign value stored in
        ``_last_request_time``) can never cause a long stall while the lock is
        held.
        """
        with self._rate_limit_lock:
            interval = self._min_request_interval
            elapsed = time.monotonic() - EdgarDataClient._last_request_time
            remaining = interval - elapsed
            if remaining > 0:
                time.sleep(min(remaining, interval))
            EdgarDataClient._last_request_time = time.monotonic()

    def _load_company_tickers(self):
        """Return the company ticker table, refreshing the shared cache when stale.

        The cache lock is held during the download so concurrent callers wait
        for a single fetch instead of each issuing their own request.
        """
        with self._cache_lock:
            now = time.time()
            cached = EdgarDataClient._company_tickers_cache
            cached_at = EdgarDataClient._company_tickers_cache_time
            if (
                cached is not None
                and cached_at is not None
                and now - cached_at < self._company_tickers_cache_ttl
            ):
                return cached

            # Cache miss or expired, fetch new data
            self._rate_limit()
            response = self.session.get(
                "https://www.sec.gov/files/company_tickers.json",
                headers={"User-Agent": self.user_agent},
                timeout=self.timeout,
            )
            response.raise_for_status()
            companies = response.json()
            EdgarDataClient._company_tickers_cache = companies
            EdgarDataClient._company_tickers_cache_time = now
            return companies

    @staticmethod
    def _company_record(company):
        """Convert one ticker-table row into the public result dictionary."""
        return {
            "cik": str(company["cik_str"]).zfill(10),
            "name": company["title"],
            "ticker": company["ticker"],
        }

    def search_company_by_name(self, company_name):
        """Search company CIK by company name or ticker.

        Match priority: exact ticker, then exact company title, then the first
        row whose title or ticker contains the query.  Comparison is
        case-insensitive and ignores surrounding whitespace.

        Args:
            company_name (str): Company name or ticker symbol.

        Returns:
            dict | None: ``{"cik", "name", "ticker"}`` (CIK zero-padded to 10
            digits), or ``None`` when the query is blank, nothing matches, or
            an error occurs.
        """
        try:
            query = company_name.strip()
            if not query:
                # A blank query is a substring of every title; without this
                # guard the first company in the table would be returned.
                return None

            companies = self._load_company_tickers()
            query_upper = query.upper()
            query_lower = query.lower()
            title_match = None
            partial_match = None

            for company in companies.values():
                ticker = company["ticker"]
                if ticker.upper() == query_upper:
                    # Exact ticker match wins over everything else.
                    return self._company_record(company)
                if title_match is not None:
                    # Only a ticker match can still beat an exact title match.
                    continue
                title_lower = company["title"].lower()
                if title_lower == query_lower:
                    title_match = company
                elif partial_match is None and (
                    query_lower in title_lower or query_lower in ticker.lower()
                ):
                    partial_match = company

            best = title_match if title_match is not None else partial_match
            return self._company_record(best) if best is not None else None
        except TimeoutError as e:
            print(f"Timeout searching company: {e}")
            return None
        except Exception as e:
            print(f"Error searching company: {e}")
            return None

    def get_company_info(self, cik):
        """
        Get basic company information

        Args:
            cik (str): Company CIK code

        Returns:
            dict: ``cik``, ``name``, ``tickers``, ``sic`` and ``sic_description``,
            or ``None`` when the library is missing or the request fails.
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return None
        try:
            self._rate_limit()
            # Timeout protection comes from the patched client.
            submissions = self.edgar.get_submissions(cik=cik)
            return {
                "cik": cik,
                "name": submissions.get("name", ""),
                "tickers": submissions.get("tickers", []),
                "sic": submissions.get("sic", ""),
                "sic_description": submissions.get("sicDescription", ""),
            }
        except TimeoutError as e:
            print(f"Timeout getting company info for CIK {cik}: {e}")
            return None
        except Exception as e:
            print(f"Error getting company info: {e}")
            return None

    def get_company_filings(self, cik, form_types=None):
        """
        Get the company's recent filing documents

        Args:
            cik (str): Company CIK code
            form_types (str | list | tuple): Form type or collection of form
                types, e.g., ('10-K', '10-Q'); None or empty for all types

        Returns:
            list: One dict per filing with ``form_type``, ``filing_date``,
            ``accession_number`` and ``primary_document``; ``[]`` on failure.
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return []

        # A bare string would otherwise turn the filter into a substring test.
        if isinstance(form_types, str):
            wanted = (form_types,)
        elif form_types:
            wanted = tuple(form_types)
        else:
            wanted = ()

        try:
            self._rate_limit()
            submissions = self.edgar.get_submissions(cik=cik)
            recent = submissions.get("filings", {}).get("recent", {})

            # The payload stores each field as a parallel list.
            forms = recent.get("form", [])
            dates = recent.get("filingDate", [])
            accessions = recent.get("accessionNumber", [])
            documents = recent.get("primaryDocument", [])

            filings = []
            for index, form_type in enumerate(forms):
                if wanted and form_type not in wanted:
                    continue
                filings.append({
                    "form_type": form_type,
                    # Shorter parallel lists are padded with empty strings.
                    "filing_date": dates[index] if index < len(dates) else "",
                    "accession_number": accessions[index] if index < len(accessions) else "",
                    "primary_document": documents[index] if index < len(documents) else "",
                })
            return filings
        except TimeoutError as e:
            print(f"Timeout getting company filings for CIK {cik}: {e}")
            return []
        except Exception as e:
            print(f"Error getting company filings: {e}")
            return []

    def get_company_facts(self, cik):
        """
        Get all company financial facts data

        Args:
            cik (str): Company CIK code

        Returns:
            dict: Company financial facts data, ``{}`` on failure
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}
        try:
            self._rate_limit()
            # Timeout protection comes from the patched client.
            return self.edgar.get_company_facts(cik=cik)
        except TimeoutError as e:
            print(f"Timeout getting company facts for CIK {cik}: {e}")
            return {}
        except Exception as e:
            print(f"Error getting company facts: {e}")
            return {}

    @staticmethod
    def _index_filings(filings, year):
        """Index filings by accession number and by ``"{form}_{year}"`` of filing date.

        Returns:
            tuple: ``(by_accession, by_form_year)``.  Accession keys have dashes
            removed.  ``by_form_year`` keeps only the first filing of each form
            type whose filing date falls in ``year``.
        """
        by_accession = {}
        by_form_year = {}
        for filing in filings:
            accession_number = filing.get("accession_number", "")
            if not accession_number:
                continue
            info = {
                "accession_number": accession_number,
                "primary_document": filing.get("primary_document", ""),
                "form_type": filing.get("form_type", ""),
                "filing_date": filing.get("filing_date", ""),
            }
            by_accession.setdefault(accession_number.replace("-", ""), info)
            # Filing date format: YYYY-MM-DD
            if info["filing_date"][:4] == str(year):
                by_form_year.setdefault(f"{info['form_type']}_{year}", info)
        return by_accession, by_form_year

    @staticmethod
    def _select_fact_entry(entries, target_forms, year, quarter):
        """Pick the fact entry that best represents the requested period.

        Quarterly: entries whose end date is in ``year`` and whose ``fp`` is
        ``Q{quarter}``; the latest end date wins.

        Annual: an exact fiscal-year match (``fy == year`` with an annual
        ``fp``) always beats a loose match, and among exact matches the latest
        end date wins.  Loose matches, used only when nothing has matched yet,
        are tried in this order: end date in ``year``; ``fy`` within one year
        of ``year``; 20-F entries whose ``frame`` or end date mentions ``year``.
        """
        best = None
        best_is_exact = False
        for entry in entries:
            end_date = entry.get("end") or ""
            if len(end_date) < 4 or not end_date[:4].isdigit():
                continue
            form = entry.get("form", "")
            if form not in target_forms:
                continue
            entry_year = int(end_date[:4])
            # JSON nulls would break the numeric / substring tests below.
            fy = entry.get("fy") or 0
            fp = entry.get("fp") or ""

            if quarter:
                if entry_year == year and fp == f"Q{quarter}":
                    if best is None or end_date > (best.get("end") or ""):
                        best = entry
                continue

            is_annual = fp in ("FY", "")
            if fy == year and is_annual:
                # Strategy 1: exact fiscal year; replaces any loose match.
                if not best_is_exact or end_date > (best.get("end") or ""):
                    best = entry
                    best_is_exact = True
            elif best is None:
                if entry_year == year and is_annual:
                    # Strategy 2: end date falls in the requested year.
                    best = entry
                elif fy > 0 and abs(fy - year) <= 1 and is_annual:
                    # Strategy 3: fiscal vs calendar year off by one.
                    best = entry
                elif form == "20-F" and "frame" in entry:
                    # Strategy 4: frame field for 20-F filers.
                    frame = entry.get("frame") or ""
                    if f"CY{year}" in frame or str(year) in end_date:
                        best = entry
        return best

    @staticmethod
    def _select_quarter_fallback(entries, annual_forms, year, quarter):
        """Return the first annual-form entry tagged with the requested quarter, if any."""
        for entry in entries:
            end_date = entry.get("end") or ""
            fp = entry.get("fp") or ""
            if (
                entry.get("form", "") in annual_forms
                and end_date
                and str(year) in end_date
                and f"Q{quarter}" in fp
            ):
                return entry
        return None

    @staticmethod
    def _build_source_url(cik, form_type, filing_info):
        """Return the filing document URL, or the company browse page as a fallback."""
        if filing_info and filing_info["primary_document"]:
            accession_number = filing_info["accession_number"].replace("-", "")
            primary_document = filing_info["primary_document"]
            return f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
        return f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"

    def get_financial_data_for_period(self, cik, period):
        """
        Get financial data for a specific period (supports annual and quarterly)

        Args:
            cik (str): Company CIK code
            period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or
                '2025Q3'); a lowercase ``q`` and integer years are accepted.

        Returns:
            dict: ``{"period": period}`` plus, for every metric found, the
            value under its key and a ``"<metric>_details"`` dict.
            ``source_url``, ``source_form`` and ``data_source`` describe the
            last metric that was found.  ``{}`` when the library is missing, no
            facts are available, or an error occurs.
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}
        try:
            facts = self.get_company_facts(cik)
            if not facts:
                return {}

            # us-gaap and ifrs-full taxonomies (20-F may use IFRS)
            us_gaap = facts.get("facts", {}).get("us-gaap", {})
            ifrs_full = facts.get("facts", {}).get("ifrs-full", {})

            result = {"period": period}

            period_text = str(period).strip().upper()
            if "Q" in period_text:
                # Quarterly data comes from 10-Q; annual forms are a fallback.
                year_text, quarter = period_text.split("Q", 1)
                year = int(year_text)
                target_forms = ("10-Q",)
                target_forms_annual = ("10-K", "20-F")
            else:
                year = int(period_text)
                quarter = None
                target_forms = ("10-K", "20-F")
                target_forms_annual = target_forms

            filings = self.get_company_filings(cik, form_types=target_forms)
            by_accession, by_form_year = self._index_filings(filings, year)

            for metric_key, metric_tags in self._FINANCIAL_METRICS.items():
                for metric_tag in metric_tags:
                    if metric_tag in us_gaap:
                        metric_data = us_gaap[metric_tag]
                        data_source = "us-gaap"
                    elif metric_tag in ifrs_full:
                        metric_data = ifrs_full[metric_tag]
                        data_source = "ifrs-full"
                    else:
                        continue
                    if not metric_data:
                        continue

                    units = metric_data.get("units", {})
                    if "USD" in units:
                        usd_data = units["USD"]
                    elif "USD/shares" in units and metric_key == "earnings_per_share":
                        # EPS uses USD/shares unit
                        usd_data = units["USD/shares"]
                    else:
                        usd_data = None
                    if not usd_data:
                        continue

                    matched_entry = self._select_fact_entry(usd_data, target_forms, year, quarter)
                    if not matched_entry and quarter and target_forms_annual:
                        matched_entry = self._select_quarter_fallback(
                            usd_data, target_forms_annual, year, quarter
                        )
                    if not matched_entry:
                        continue

                    form_type = matched_entry.get("form", "")
                    # Prefer the exact filing the fact came from; a filing's
                    # date often falls in the year after the fiscal year, so
                    # the form/year key is only a fallback.
                    accession_key = (matched_entry.get("accn") or "").replace("-", "")
                    filing_info = by_accession.get(accession_key) or by_form_year.get(
                        f"{form_type}_{year}"
                    )

                    result[metric_key] = matched_entry.get("val", 0)
                    result["source_url"] = self._build_source_url(cik, form_type, filing_info)
                    result["source_form"] = form_type
                    result["data_source"] = data_source
                    result[f"{metric_key}_details"] = {
                        "tag": metric_tag,
                        "form": matched_entry.get("form", ""),
                        "fy": matched_entry.get("fy", 0),
                        "fp": matched_entry.get("fp", ""),
                        "val": matched_entry.get("val", 0),
                        "start": matched_entry.get("start", ""),
                        "end": matched_entry.get("end", ""),
                        "accn": matched_entry.get("accn", ""),
                        "filed": matched_entry.get("filed", ""),
                        "frame": matched_entry.get("frame", ""),
                        "data_source": data_source,
                    }
                    # First tag that yields data wins for this metric.
                    break

            return result
        except Exception as e:
            print(f"Error getting financial data for period {period}: {e}")
            return {}