# Source: EasyReportDataMCP / edgar_client.py (uploaded by JC321, commit c824ed2, 26.1 kB)
"""EDGAR API Client Module with Performance Optimization"""
import json
import threading
import time
from datetime import datetime, timedelta
from functools import lru_cache, wraps

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

try:
    from sec_edgar_api.EdgarClient import EdgarClient
except ImportError:
    EdgarClient = None
class EdgarDataClient:
    """SEC EDGAR client with caching, rate limiting and request timeouts.

    The company ticker list is cached at class level (shared by all
    instances) and every outgoing request passes through a class-level rate
    limiter. Submissions and company facts are memoized per (instance, CIK),
    but only when the fetch succeeds, so a transient failure is retried on
    the next call instead of being served from the cache.
    """

    # Class-level cache for company_tickers.json (shared across instances)
    _company_tickers_cache = None
    _company_tickers_cache_time = None
    _company_tickers_cache_ttl = 3600  # 1 hour TTL
    _cache_lock = threading.Lock()

    # Class-level rate limiter (SEC requires max 10 requests per second)
    _last_request_time = 0
    _rate_limit_lock = threading.Lock()
    _min_request_interval = 0.11  # 110ms between requests (9 req/sec, safe margin)

    # XBRL tags tried, in order, for each metric. Several tags are listed per
    # metric to improve the match rate (US-GAAP and IFRS names are mixed).
    _FINANCIAL_METRICS = {
        "total_revenue": [
            "Revenues",
            "RevenueFromContractWithCustomerExcludingAssessedTax",
            "RevenueFromContractWithCustomerIncludingAssessedTax",
            "SalesRevenueNet",
            "RevenueFromContractWithCustomer",
            "Revenue",
        ],
        "net_income": [
            "NetIncomeLoss",
            "ProfitLoss",
            "NetIncome",
            "ProfitLossAttributableToOwnersOfParent",
        ],
        "earnings_per_share": [
            "EarningsPerShareBasic",
            "EarningsPerShare",
            "BasicEarningsPerShare",
            "BasicEarningsLossPerShare",
        ],
        "operating_expenses": [
            "OperatingExpenses",
            "OperatingCostsAndExpenses",
            "OperatingExpensesExcludingDepreciationAndAmortization",
            "CostsAndExpenses",
            "GeneralAndAdministrativeExpense",
            "CostOfRevenue",
            "ResearchAndDevelopmentExpense",
            "SellingAndMarketingExpense",
        ],
        "operating_cash_flow": [
            "NetCashProvidedByUsedInOperatingActivities",
            "NetCashProvidedUsedInOperatingActivities",
            "NetCashFlowsFromUsedInOperatingActivities",
            "CashFlowsFromUsedInOperatingActivities",
        ],
    }

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """Initialize EDGAR client with connection pooling and timeout

        Args:
            user_agent (str): Value sent as the User-Agent header and passed
                to the sec_edgar_api client.
        """
        self.user_agent = user_agent

        # Configure requests session with connection pooling
        self.session = requests.Session()

        # Configure retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=retry_strategy,
            pool_block=False
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Set default timeout
        self.timeout = 30  # 30 seconds timeout

        # Initialize sec_edgar_api client with timeout wrapper
        if EdgarClient:
            self.edgar = EdgarClient(user_agent=user_agent)
            # Monkey patch to add timeout
            self._patch_edgar_client_timeout()
        else:
            self.edgar = None

    def _patch_edgar_client_timeout(self):
        """Monkey patch sec_edgar_api to add timeout support"""
        if not self.edgar:
            return
        self.edgar.get_submissions = self._with_timeout(self.edgar.get_submissions)
        self.edgar.get_company_facts = self._with_timeout(self.edgar.get_company_facts)

    def _with_timeout(self, func):
        """Wrap *func* so each call is abandoned after ``self.timeout`` seconds.

        The call runs in a daemon thread. If it has not finished in time a
        TimeoutError is raised to the caller while the worker is left to
        finish in the background. Exceptions raised by *func* are re-raised
        in the calling thread. All positional and keyword arguments are
        forwarded unchanged.
        """
        @wraps(func)
        def call_with_timeout(*args, **kwargs):
            outcome = {}

            def worker():
                try:
                    outcome["result"] = func(*args, **kwargs)
                except Exception as exc:
                    outcome["error"] = exc

            thread = threading.Thread(target=worker, daemon=True)
            thread.start()
            # self.timeout is read at call time so later changes take effect.
            thread.join(timeout=self.timeout)
            if thread.is_alive():
                raise TimeoutError(f"SEC API request timeout ({self.timeout}s)")
            if "error" in outcome:
                raise outcome["error"]
            return outcome.get("result")

        return call_with_timeout

    def _rate_limit(self):
        """Thread-safe rate limiting to comply with SEC requirements"""
        with self._rate_limit_lock:
            interval = self._min_request_interval
            # A monotonic clock is immune to wall-clock adjustments.
            elapsed = time.monotonic() - EdgarDataClient._last_request_time
            wait = interval - elapsed
            if wait > 0:
                # Never sleep longer than one interval, even if the stored
                # timestamp is somehow ahead of the current clock reading.
                time.sleep(min(wait, interval))
            EdgarDataClient._last_request_time = time.monotonic()

    def _load_company_tickers(self):
        """Return the company ticker mapping, refreshing the shared cache if stale.

        The lock is held during the download so that concurrent callers wait
        for a single fetch instead of each issuing their own request.
        """
        with self._cache_lock:
            cached = EdgarDataClient._company_tickers_cache
            cached_at = EdgarDataClient._company_tickers_cache_time
            if (cached is not None and cached_at is not None
                    and time.time() - cached_at < self._company_tickers_cache_ttl):
                return cached

            # Cache miss or expired, fetch new data
            self._rate_limit()
            url = "https://www.sec.gov/files/company_tickers.json"
            headers = {"User-Agent": self.user_agent}
            response = self.session.get(url, headers=headers, timeout=self.timeout)
            response.raise_for_status()
            companies = response.json()

            # Update cache
            EdgarDataClient._company_tickers_cache = companies
            EdgarDataClient._company_tickers_cache_time = time.time()
            return companies

    @staticmethod
    def _company_record(company):
        """Convert one company_tickers.json entry into the public result dict."""
        return {
            "cik": str(company["cik_str"]).zfill(10),
            "name": company["title"],
            "ticker": company["ticker"]
        }

    def search_company_by_name(self, company_name):
        """Search company CIK by company name or ticker.

        Match priority: exact ticker, then exact company name, then the first
        entry whose name or ticker contains the search term. Comparison is
        case-insensitive and ignores surrounding whitespace.

        Args:
            company_name (str): Company name or ticker symbol

        Returns:
            dict | None: ``{"cik", "name", "ticker"}`` for the best match, or
            None when the query is blank, nothing matches, or an error occurs.
        """
        try:
            query = (company_name or "").strip()
            if not query:
                # A blank term is a substring of every name, so it would
                # otherwise "match" an arbitrary first company.
                return None

            companies = self._load_company_tickers()
            query_upper = query.upper()
            query_lower = query.lower()

            exact_name = None
            partial = None
            for company in companies.values():
                ticker = company["ticker"]
                if ticker.upper() == query_upper:
                    # Exact ticker match - return immediately
                    return self._company_record(company)
                if exact_name is not None:
                    # Only a ticker match can still beat an exact name match.
                    continue
                title_lower = company["title"].lower()
                if title_lower == query_lower:
                    exact_name = company
                elif partial is None and (query_lower in title_lower
                                          or query_lower in ticker.lower()):
                    partial = company

            # Return exact match first, then partial match
            best = exact_name if exact_name is not None else partial
            return self._company_record(best) if best is not None else None
        except TimeoutError as e:
            print(f"Timeout searching company: {e}")
            return None
        except Exception as e:
            print(f"Error searching company: {e}")
            return None

    @lru_cache(maxsize=128)
    def _fetch_submissions(self, cik):
        """Fetch raw submissions for *cik* (cached on success only).

        Exceptions propagate to the caller; lru_cache does not store calls
        that raise, so failed fetches are retried next time.
        """
        self._rate_limit()
        # Has timeout protection via the monkey patch
        return self.edgar.get_submissions(cik=cik)

    @lru_cache(maxsize=128)
    def _fetch_company_facts(self, cik):
        """Fetch raw company facts for *cik* (cached on success only)."""
        self._rate_limit()
        # Has timeout protection via the monkey patch
        return self.edgar.get_company_facts(cik=cik)

    def get_company_info(self, cik):
        """
        Get basic company information (submissions are cached)

        Args:
            cik (str): Company CIK code

        Returns:
            dict: Dictionary containing company information, or None when the
            sec_edgar_api library is missing or the request fails
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return None
        try:
            submissions = self._fetch_submissions(cik)
            return {
                "cik": cik,
                "name": submissions.get("name", ""),
                "tickers": submissions.get("tickers", []),
                "sic": submissions.get("sic", ""),
                "sic_description": submissions.get("sicDescription", "")
            }
        except TimeoutError as e:
            print(f"Timeout getting company info for CIK {cik}: {e}")
            return None
        except Exception as e:
            print(f"Error getting company info: {e}")
            return None

    def get_company_filings(self, cik, form_types=None):
        """
        Get all company filing documents (submissions are cached)

        Args:
            cik (str): Company CIK code
            form_types (tuple | list | str): Form types to keep, e.g.
                ('10-K', '10-Q'); a single string means one form type;
                None (or empty) for all types

        Returns:
            list: List of filing dicts with keys form_type, filing_date,
            accession_number and primary_document; empty on failure
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return []

        # Normalize the filter: a bare string would otherwise be matched by
        # substring, and any iterable is accepted (lists included).
        if isinstance(form_types, str):
            form_types = (form_types,)
        elif form_types:
            form_types = tuple(form_types)

        try:
            submissions = self._fetch_submissions(cik)
            recent = submissions.get("filings", {}).get("recent", {})

            # Parallel arrays, one element per filing
            forms = recent.get("form", [])
            filing_dates = recent.get("filingDate", [])
            accession_numbers = recent.get("accessionNumber", [])
            primary_documents = recent.get("primaryDocument", [])

            def field(values, index):
                # Shorter arrays are padded with "" rather than raising.
                return values[index] if index < len(values) else ""

            filings = []
            for i, form_type in enumerate(forms):
                # Filter by form type if specified
                if form_types and form_type not in form_types:
                    continue
                filings.append({
                    "form_type": form_type,
                    "filing_date": field(filing_dates, i),
                    "accession_number": field(accession_numbers, i),
                    "primary_document": field(primary_documents, i)
                })
            return filings
        except TimeoutError as e:
            print(f"Timeout getting company filings for CIK {cik}: {e}")
            return []
        except Exception as e:
            print(f"Error getting company filings: {e}")
            return []

    def get_company_facts(self, cik):
        """
        Get all company financial facts data (cached on success)

        Args:
            cik (str): Company CIK code

        Returns:
            dict: Company financial facts data; empty dict on failure
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}
        try:
            return self._fetch_company_facts(cik)
        except TimeoutError as e:
            print(f"Timeout getting company facts for CIK {cik}: {e}")
            return {}
        except Exception as e:
            print(f"Error getting company facts: {e}")
            return {}

    @staticmethod
    def _parse_period(period):
        """Split a 'YYYY' or 'YYYYQX' period into ``(year, quarter)``.

        ``quarter`` is the text after 'Q' (e.g. '3') or None for an annual
        period. Input is case-insensitive and may be an int year.
        """
        text = str(period).strip().upper()
        if "Q" in text:
            parts = text.split("Q")
            return int(parts[0]), parts[1]
        return int(text), None

    @staticmethod
    def _index_filings(filings, year):
        """Index filings by accession number and by '<form>_<filing year>'.

        Returns:
            tuple: ``(by_accession, by_year)``. ``by_accession`` is keyed by
            the dash-less accession number of every filing; ``by_year`` only
            holds filings filed in *year*. The first filing seen for a key wins.
        """
        by_accession = {}
        by_year = {}
        for filing in filings:
            accession_number = filing.get("accession_number", "")
            if not accession_number:
                continue
            form_type = filing.get("form_type", "")
            filing_date = filing.get("filing_date", "")
            info = {
                "accession_number": accession_number,
                "primary_document": filing.get("primary_document", ""),
                "form_type": form_type,
                "filing_date": filing_date
            }
            by_accession.setdefault(accession_number.replace('-', ''), info)

            # Extract year from filing_date (format: YYYY-MM-DD)
            year_text = filing_date[:4] if filing_date else ""
            file_year = int(year_text) if len(year_text) == 4 and year_text.isdigit() else 0
            if filing_date and file_year == year:
                by_year.setdefault(f"{form_type}_{file_year}", info)
        return by_accession, by_year

    @staticmethod
    def _match_fact_entry(entries, target_forms, fallback_forms, year, quarter):
        """Pick the fact entry that best represents the requested period.

        Args:
            entries (list): Fact entries of one XBRL tag and unit
            target_forms (tuple): Form types searched first
            fallback_forms (tuple): Annual form types used when a quarterly
                search over ``target_forms`` finds nothing
            year (int): Requested year
            quarter (str | None): Quarter digit for quarterly periods

        Returns:
            dict | None: The selected entry, or None when nothing matches.
            Null ``fy`` / ``fp`` / ``frame`` / ``end`` values are tolerated.
        """
        matched = None
        quarter_label = f"Q{quarter}"

        for entry in entries:
            end_date = entry.get("end") or ""
            if len(end_date) < 4 or not end_date[:4].isdigit():
                continue
            entry_year = int(end_date[:4])

            # Check if form type matches
            form = entry.get("form", "")
            if form not in target_forms:
                continue

            fp = entry.get("fp") or ""
            if quarter:
                # Quarterly data match; on repeats keep the latest end date
                if entry_year == year and fp == quarter_label:
                    if matched is None or end_date > matched.get("end", ""):
                        matched = entry
                continue

            # Annual data match - prioritize fiscal year (fy) field
            fy = entry.get("fy")
            if not isinstance(fy, int):
                fy = 0
            annual_fp = fp in ("", "FY")

            # Strategy 1: Exact match by fiscal year (latest end date wins)
            if fy == year and annual_fp:
                if matched is None or end_date > matched.get("end", ""):
                    matched = entry
            # Strategy 2: Match by end date year (when fy not available or doesn't match)
            elif matched is None and entry_year == year and annual_fp:
                matched = entry
            # Strategy 3: Allow fy to differ by 1 year (fiscal year vs calendar year mismatch)
            elif matched is None and fy > 0 and abs(fy - year) <= 1 and annual_fp:
                matched = entry
            # Strategy 4: Match by frame field for 20-F
            elif matched is None and form == "20-F" and "frame" in entry:
                frame = entry.get("frame") or ""
                if f"CY{year}" in frame or str(year) in end_date:
                    matched = entry

        # If quarterly data not found, try finding from annual report (fallback strategy)
        if matched is None and quarter and fallback_forms:
            for entry in entries:
                end_date = entry.get("end") or ""
                fp = entry.get("fp") or ""
                if entry.get("form", "") in fallback_forms and end_date:
                    if str(year) in end_date and quarter_label in fp:
                        return entry
        return matched

    @staticmethod
    def _build_source_url(cik, form_type, filing_info):
        """Return the filing document URL, or the company browse page as fallback."""
        if filing_info and filing_info["primary_document"]:
            accession_number = filing_info["accession_number"].replace('-', '')
            primary_document = filing_info["primary_document"]
            return f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
        # Fallback to company browse page if filing or document not found
        return f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"

    def get_financial_data_for_period(self, cik, period):
        """
        Get financial data for a specific period (supports annual and quarterly)

        Args:
            cik (str): Company CIK code
            period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')

        Returns:
            dict: Financial data dictionary. Always contains "period" when the
            lookup runs; each metric found adds "<metric>" and
            "<metric>_details", and "source_url", "source_form" and
            "data_source" describe the most recently matched metric. Empty
            dict when facts are unavailable or an error occurs.
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}
        try:
            # Get company financial facts
            facts = self.get_company_facts(cik)
            if not facts:
                return {}

            # Extract us-gaap and ifrs-full financial data (20-F may use IFRS)
            taxonomies = facts.get("facts", {})
            us_gaap = taxonomies.get("us-gaap", {})
            ifrs_full = taxonomies.get("ifrs-full", {})

            # Store result
            result = {"period": period}

            # Determine target form types to search
            year, quarter = self._parse_period(period)
            if quarter is not None:
                # Quarterly data, mainly search 10-Q (20-F usually doesn't have quarterly reports)
                target_forms = ("10-Q",)
                target_forms_annual = ("10-K", "20-F")  # for fallback
            else:
                # Annual data, search 10-K and 20-F annual forms
                target_forms = ("10-K", "20-F")
                target_forms_annual = target_forms

            # Get company filings to find accession number and primary document
            filings = self.get_company_filings(cik, form_types=target_forms)
            filings_by_accession, filings_by_year = self._index_filings(filings, year)

            # Iterate through each financial metric
            for metric_key, metric_tags in self._FINANCIAL_METRICS.items():
                # Support multiple possible tags; the first tag with data wins
                for metric_tag in metric_tags:
                    # Search both US-GAAP and IFRS tags
                    if metric_tag in us_gaap:
                        metric_data = us_gaap[metric_tag]
                        data_source = "us-gaap"
                    elif metric_tag in ifrs_full:
                        metric_data = ifrs_full[metric_tag]
                        data_source = "ifrs-full"
                    else:
                        continue
                    if not metric_data:
                        continue

                    # Find USD unit data (supports USD and USD/shares)
                    units = metric_data.get("units", {})
                    if "USD" in units:
                        usd_data = units["USD"]
                    elif "USD/shares" in units and metric_key == "earnings_per_share":
                        # EPS uses USD/shares unit
                        usd_data = units["USD/shares"]
                    else:
                        continue
                    if not usd_data:
                        continue

                    matched_entry = self._match_fact_entry(
                        usd_data, target_forms, target_forms_annual, year, quarter
                    )
                    if not matched_entry:
                        continue

                    # Apply matched data
                    result[metric_key] = matched_entry.get("val", 0)
                    form_type = matched_entry.get("form", "")

                    # The fact's own accession number identifies the exact
                    # filing it came from. The filing-year key is only a
                    # fallback, because an annual report is usually filed in
                    # the year after the fiscal year it covers.
                    accn_from_facts = (matched_entry.get("accn") or "").replace('-', '')
                    filing_info = filings_by_accession.get(accn_from_facts) if accn_from_facts else None
                    if filing_info is None:
                        filing_info = filings_by_year.get(f"{form_type}_{year}")

                    result["source_url"] = self._build_source_url(cik, form_type, filing_info)
                    result["source_form"] = form_type
                    result["data_source"] = data_source

                    # Add detailed information
                    result[f"{metric_key}_details"] = {
                        "tag": metric_tag,
                        "form": matched_entry.get("form", ""),
                        "fy": matched_entry.get("fy", 0),
                        "fp": matched_entry.get("fp", ""),
                        "val": matched_entry.get("val", 0),
                        "start": matched_entry.get("start", ""),
                        "end": matched_entry.get("end", ""),
                        "accn": matched_entry.get("accn", ""),
                        "filed": matched_entry.get("filed", ""),
                        "frame": matched_entry.get("frame", ""),
                        "data_source": data_source
                    }
                    # Data found for this metric, stop trying further tags
                    break
            return result
        except Exception as e:
            print(f"Error getting financial data for period {period}: {e}")
            return {}