"""EDGAR API Client Module"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry  # requests.packages.urllib3 is a deprecated re-export
try:
from sec_edgar_api.EdgarClient import EdgarClient
except ImportError:
EdgarClient = None
import json
import time
from functools import wraps
import threading
class EdgarDataClient:
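    """Client for SEC EDGAR with thread-safe rate limiting and layered caching.

    Wraps sec_edgar_api.EdgarClient (when available) plus raw HTTP calls to
    sec.gov, adding retries, in-memory ticker/CIK/name lookup indexes, and
    search- and period-level result caches.
    """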
def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
"""Initialize EDGAR client"""
self.user_agent = user_agent
self.last_request_time = 0
self.min_request_interval = 0.11 # SEC allows 10 requests/second, use 0.11s to be safe
self.request_timeout = 45 # Increased from 30 to 45 seconds for better reliability
self.max_retries = 3 # Maximum retry attempts
self._lock = threading.Lock() # Thread-safe rate limiting
# Configure requests session with connection pooling and retry logic
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS"]
)
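        # Note: these transport-level retries stack with the manual loop in
        # _make_request_with_retry (3 attempts x up to 3 urllib3 retries), and
        # because 429 is in status_forcelist a rate-limit response is usually
        # retried here before the manual handler ever sees it.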
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=20,
pool_block=False
)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
self.session.headers.update({"User-Agent": user_agent})
# Cache for frequently accessed data
self._company_cache = {} # Cache company info to avoid repeated calls
self._cache_ttl = 600 # Increased from 300 to 600 seconds (10 minutes) for better performance
self._tickers_cache_ttl = 7200 # Increased from 3600 to 7200 seconds (2 hours)
self._cache_timestamps = {}
# Fast lookup indexes for company tickers
self._ticker_index = {} # ticker -> company data
self._cik_index = {} # cik -> company data
self._name_lower_index = {} # lowercase name -> company data
self._name_prefix_index = {} # name prefix (3 chars) -> list of company data
self._ticker_prefix_index = {} # ticker prefix (2 chars) -> list of company data
self._alias_index = {} # common aliases -> company data
self._index_loaded = False
# Search result cache (for performance)
self._search_cache = {} # search_key -> result
self._search_cache_max_size = 1000 # Limit cache size
# Layer 3: Period data cache (avoid re-parsing XBRL for same period)
self._period_cache = {} # period_key -> financial data
self._period_cache_timestamps = {} # period_key -> timestamp
self._period_cache_ttl = 1800 # 30 minutes cache (financial data changes rarely)
self._period_cache_max_size = 1000 # Limit cache size
# Common company aliases for intelligent search
self._company_aliases = {
'google': ['GOOGL', 'GOOG'],
'facebook': ['META'],
'meta': ['META'],
'apple': ['AAPL'],
'microsoft': ['MSFT'],
'amazon': ['AMZN'],
'tesla': ['TSLA'],
'nvidia': ['NVDA'],
'netflix': ['NFLX'],
'alphabet': ['GOOGL', 'GOOG'],
'twitter': ['TWTR'], # Historical
'intel': ['INTC'],
'amd': ['AMD'],
'oracle': ['ORCL'],
'salesforce': ['CRM'],
'adobe': ['ADBE'],
'cisco': ['CSCO'],
'ibm': ['IBM'],
'walmart': ['WMT'],
'disney': ['DIS'],
'nike': ['NKE'],
'mcdonalds': ['MCD'],
'coca cola': ['KO'],
'pepsi': ['PEP'],
'starbucks': ['SBUX'],
'boeing': ['BA'],
'ge': ['GE'],
'general electric': ['GE'],
'ford': ['F'],
'gm': ['GM'],
'general motors': ['GM'],
'jpmorgan': ['JPM'],
'goldman': ['GS'],
'goldman sachs': ['GS'],
'morgan stanley': ['MS'],
'bank of america': ['BAC'],
'wells fargo': ['WFC'],
'visa': ['V'],
'mastercard': ['MA'],
'berkshire': ['BRK.B', 'BRK.A'],
'exxon': ['XOM'],
'chevron': ['CVX'],
'pfizer': ['PFE'],
'johnson': ['JNJ'],
'merck': ['MRK'],
}
if EdgarClient:
self.edgar = EdgarClient(user_agent=user_agent)
else:
self.edgar = None
def _rate_limit(self):
"""Thread-safe rate limiting to comply with SEC API limits (10 requests/second)"""
with self._lock:
current_time = time.time()
time_since_last_request = current_time - self.last_request_time
if time_since_last_request < self.min_request_interval:
sleep_time = self.min_request_interval - time_since_last_request
time.sleep(sleep_time)
self.last_request_time = time.time()
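    # Worked example: with min_request_interval = 0.11s, three back-to-back
    # calls to _rate_limit() return at roughly t=0, t=0.11 and t=0.22, i.e.
    # ~9 requests/second, safely under the SEC's 10/s limit.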
def _is_cache_valid(self, cache_key):
"""Check if cache entry is still valid"""
if cache_key not in self._cache_timestamps:
return False
age = time.time() - self._cache_timestamps[cache_key]
# Use longer TTL for company tickers list
ttl = self._tickers_cache_ttl if cache_key == "company_tickers_json" else self._cache_ttl
return age < ttl
def _get_cached(self, cache_key):
"""Get cached data if valid"""
if self._is_cache_valid(cache_key):
return self._company_cache.get(cache_key)
return None
def _set_cache(self, cache_key, data):
"""Set cache data with timestamp"""
self._company_cache[cache_key] = data
self._cache_timestamps[cache_key] = time.time()
def _make_request_with_retry(self, url, headers=None, use_session=True):
"""Make HTTP request with retry logic and timeout"""
if headers is None:
headers = {"User-Agent": self.user_agent}
for attempt in range(self.max_retries):
try:
self._rate_limit()
if use_session:
response = self.session.get(url, headers=headers, timeout=self.request_timeout)
else:
response = requests.get(url, headers=headers, timeout=self.request_timeout)
response.raise_for_status()
return response
except requests.exceptions.Timeout:
print(f"Request timeout (attempt {attempt + 1}/{self.max_retries}): {url}")
if attempt == self.max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoff
            except requests.exceptions.HTTPError as e:
                status = e.response.status_code if e.response is not None else None
                if status == 429:  # Too Many Requests
                    if attempt == self.max_retries - 1:
                        raise
                    wait_time = 2 ** attempt
                    print(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1}/{self.max_retries})")
                    time.sleep(wait_time)
                else:
                    raise
except Exception as e:
print(f"Request error (attempt {attempt + 1}/{self.max_retries}): {e}")
if attempt == self.max_retries - 1:
raise
time.sleep(2 ** attempt)
return None
def _load_company_tickers(self, force_refresh=False):
"""Load and index company tickers data"""
cache_key = "company_tickers_json"
# Check if already loaded and cache is valid
if self._index_loaded and not force_refresh and self._is_cache_valid(cache_key):
return self._get_cached(cache_key)
# Check cache first
companies = self._get_cached(cache_key) if not force_refresh else None
if not companies:
try:
# Download company tickers
url = "https://www.sec.gov/files/company_tickers.json"
print(f"Downloading company tickers from SEC...")
response = self._make_request_with_retry(url)
if not response:
print("Failed to download company tickers")
return None
companies = response.json()
                # Cache under the 2-hour tickers TTL (_tickers_cache_ttl)
self._set_cache(cache_key, companies)
print(f"Loaded {len(companies)} companies")
except Exception as e:
print(f"Error loading company tickers: {e}")
return None
else:
print(f"Using cached company tickers ({len(companies)} companies)")
# Build fast lookup indexes
self._ticker_index = {}
self._cik_index = {}
self._name_lower_index = {}
self._name_prefix_index = {}
self._ticker_prefix_index = {}
self._alias_index = {}
for _, company in companies.items():
cik = str(company["cik_str"]).zfill(10)
ticker = company["ticker"]
name = company["title"]
company_data = {
"cik": cik,
"name": name,
"ticker": ticker
}
# Index by ticker (lowercase for case-insensitive)
ticker_lower = ticker.lower()
self._ticker_index[ticker_lower] = company_data
# Index by CIK
self._cik_index[cik] = company_data
# Index by exact name (lowercase)
name_lower = name.lower()
self._name_lower_index[name_lower] = company_data
# Build prefix indexes for faster partial matching
# Name prefix index (use 3-character prefixes)
if len(name_lower) >= 3:
for i in range(len(name_lower) - 2):
prefix = name_lower[i:i+3]
if prefix not in self._name_prefix_index:
self._name_prefix_index[prefix] = []
self._name_prefix_index[prefix].append(company_data)
# Ticker prefix index (use 2-character prefixes for tickers)
if len(ticker_lower) >= 2:
prefix = ticker_lower[:2]
if prefix not in self._ticker_prefix_index:
self._ticker_prefix_index[prefix] = []
self._ticker_prefix_index[prefix].append(company_data)
# Build alias index for intelligent search
for alias, tickers in self._company_aliases.items():
for ticker in tickers:
ticker_lower = ticker.lower()
if ticker_lower in self._ticker_index:
self._alias_index[alias.lower()] = self._ticker_index[ticker_lower]
break # Use first matching ticker
self._index_loaded = True
print(f"Built indexes: {len(self._ticker_index)} tickers, {len(self._cik_index)} CIKs")
print(f"Built prefix indexes: {len(self._name_prefix_index)} name prefixes, {len(self._ticker_prefix_index)} ticker prefixes")
print(f"Built alias index: {len(self._alias_index)} common aliases")
return companies
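    # Index shape, using Apple as an example: the record
    #   {"cik": "0000320193", "name": "Apple Inc.", "ticker": "AAPL"}
    # lands in _ticker_index["aapl"], _cik_index["0000320193"],
    # _name_lower_index["apple inc."], the "aa" bucket of
    # _ticker_prefix_index, and every 3-char window of the name
    # ("app", "ppl", "ple", " in", "inc", ...) in _name_prefix_index.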
def get_company_by_cik(self, cik):
"""Fast lookup of company info by CIK (from cached tickers)"""
# Ensure data is loaded
self._load_company_tickers()
# Normalize CIK
cik_normalized = str(cik).zfill(10)
# Fast index lookup
return self._cik_index.get(cik_normalized)
def get_company_by_ticker(self, ticker):
"""Fast lookup of company info by ticker"""
# Ensure data is loaded
self._load_company_tickers()
# Fast index lookup (case-insensitive)
return self._ticker_index.get(ticker.lower())
def search_company_by_name(self, company_name):
"""Search company CIK by company name with caching and optimized search"""
try:
# Load company tickers and build indexes
companies = self._load_company_tickers()
if not companies:
return None
# Prepare search input
search_name = company_name.lower().strip()
# Check search cache first
cache_key = f"search_{search_name}"
if cache_key in self._search_cache:
return self._search_cache[cache_key].copy() if self._search_cache[cache_key] else None
result = None
# Optimize: Use fast index lookups first
# Priority 1: Exact ticker match (fastest - O(1) hash lookup)
if search_name in self._ticker_index:
result = self._ticker_index[search_name].copy()
# Priority 2: Common alias match (intelligent search - O(1))
elif search_name in self._alias_index:
result = self._alias_index[search_name].copy()
print(f"Alias match: '{company_name}' → {result.get('ticker')} ({result.get('name')})")
# Priority 3: Exact name match (fast - O(1) hash lookup)
elif search_name in self._name_lower_index:
result = self._name_lower_index[search_name].copy()
            # Priority 4: Exact CIK match (fast - O(1) hash lookup).
            # Only numeric input of 8+ digits is treated as a CIK; shorter
            # numbers are too ambiguous to zero-pad safely.
            elif search_name.isdigit() and len(search_name) >= 8:
cik_normalized = search_name.zfill(10)
if cik_normalized in self._cik_index:
result = self._cik_index[cik_normalized].copy()
# Priority 5: Prefix-based partial matches (optimized with prefix indexes)
if not result:
result = self._search_with_prefix_index(search_name)
# Cache the result (even if None)
self._cache_search_result(cache_key, result)
return result.copy() if result else None
except Exception as e:
print(f"Error searching company: {e}")
return None
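    # Usage sketch (hits the live SEC API, so it assumes network access):
    #   client = EdgarDataClient()
    #   client.search_company_by_name("AAPL")        # Priority 1: ticker hit
    #   client.search_company_by_name("google")      # Priority 2: alias -> GOOGL
    #   client.search_company_by_name("0000320193")  # Priority 4: CIK hit
    #   client.search_company_by_name("320193")      # <8 digits: not treated as CIK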
def _search_with_prefix_index(self, search_name):
"""Optimized partial match search using prefix indexes"""
candidates = set()
# Strategy 1: Try ticker prefix match if search term looks like ticker
if len(search_name) <= 5:
# Use ticker prefix index
if len(search_name) >= 2:
prefix = search_name[:2]
if prefix in self._ticker_prefix_index:
for company_data in self._ticker_prefix_index[prefix]:
ticker_lower = company_data["ticker"].lower()
if search_name in ticker_lower:
# Exact prefix match in ticker - highest priority
if ticker_lower.startswith(search_name):
return company_data
candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
# Strategy 2: Use name prefix index for name-based search
if len(search_name) >= 3:
# Try first 3 characters as prefix
prefix = search_name[:3]
if prefix in self._name_prefix_index:
for company_data in self._name_prefix_index[prefix]:
name_lower = company_data["name"].lower()
# Check if search term is in the name
if search_name in name_lower:
# Exact prefix match - highest priority
if name_lower.startswith(search_name):
return company_data
candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
        # Strategy 3: defensive fallback. Because the name index is built from
        # every 3-char window of each name (not just the leading characters),
        # the single bucket probed above already covers "search term anywhere
        # in the name". Re-scan buckets matching the query's first trigram,
        # with a hard cap so this can never become a full-table scan.
        if not candidates and len(search_name) >= 3:
            scanned = 0
            target = search_name[:3]
            for prefix_key, company_list in self._name_prefix_index.items():
                if target not in prefix_key:
                    continue
                for company_data in company_list:
                    name_lower = company_data["name"].lower()
                    ticker_lower = company_data["ticker"].lower()
                    if search_name in name_lower or search_name in ticker_lower:
                        candidates.add((company_data["cik"], company_data["name"], company_data["ticker"]))
                    scanned += 1
                    if scanned > 1000:  # Limit scan to avoid performance issues
                        break
                if scanned > 1000:
                    break
        # Return one candidate if any matched. A set has no iteration order,
        # so pick deterministically (shortest name first) rather than "first".
        if candidates:
            cik, name, ticker = min(candidates, key=lambda c: (len(c[1]), c[1]))
            return {"cik": cik, "name": name, "ticker": ticker}
return None
def _cache_search_result(self, cache_key, result):
"""Cache search result with size limit"""
        # FIFO eviction: dict keys keep insertion order, so the first half of
        # the keys are the oldest entries. Drop them when the cache is full.
        if len(self._search_cache) >= self._search_cache_max_size:
            keys_to_remove = list(self._search_cache.keys())[:self._search_cache_max_size // 2]
for key in keys_to_remove:
del self._search_cache[key]
self._search_cache[cache_key] = result
def _get_period_cache(self, cache_key):
"""Get cached period data if valid (Layer 3)"""
if cache_key not in self._period_cache_timestamps:
return None
age = time.time() - self._period_cache_timestamps[cache_key]
if age < self._period_cache_ttl:
return self._period_cache.get(cache_key)
else:
# Expired, remove from cache
self._period_cache.pop(cache_key, None)
self._period_cache_timestamps.pop(cache_key, None)
return None
def _set_period_cache(self, cache_key, result):
"""Cache period data with size limit (Layer 3)"""
        # FIFO eviction when full: drop the oldest half (insertion order)
        if len(self._period_cache) >= self._period_cache_max_size:
            keys_to_remove = list(self._period_cache.keys())[:self._period_cache_max_size // 2]
for key in keys_to_remove:
self._period_cache.pop(key, None)
self._period_cache_timestamps.pop(key, None)
self._period_cache[cache_key] = result
self._period_cache_timestamps[cache_key] = time.time()
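    # Eviction arithmetic: with _period_cache_max_size = 1000, inserting into a
    # full cache first drops the 500 oldest keys, so the cache size oscillates
    # between ~500 and 1000 entries instead of growing without bound.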
def get_company_info(self, cik):
"""
Get basic company information with caching
Args:
cik (str): Company CIK code
Returns:
dict: Dictionary containing company information
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return None
# Check cache first
cache_key = f"info_{cik}"
cached = self._get_cached(cache_key)
if cached:
return cached
try:
            # SIGALRM-based timeout: available only on Unix and only from the
            # main thread; fall back to a plain (untimed) call otherwise.
            import signal

            def timeout_handler(signum, frame):
                raise TimeoutError("SEC API call timeout")

            try:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(45)
                submissions = self.edgar.get_submissions(cik=cik)
                signal.alarm(0)  # Cancel alarm
            except (AttributeError, ValueError):
                # AttributeError: no SIGALRM on Windows;
                # ValueError: signal.signal() called outside the main thread
                submissions = self.edgar.get_submissions(cik=cik)
result = {
"cik": cik,
"name": submissions.get("name", ""),
"tickers": submissions.get("tickers", []),
"sic": submissions.get("sic", ""),
"sic_description": submissions.get("sicDescription", "")
}
# Cache the result
self._set_cache(cache_key, result)
return result
except TimeoutError:
print(f"Timeout getting company info for CIK: {cik}")
return None
except Exception as e:
print(f"Error getting company info: {e}")
return None
def get_company_filings(self, cik, form_types=None):
"""
Get all company filing documents with caching
Args:
cik (str): Company CIK code
form_types (list): List of form types, e.g., ['10-K', '10-Q'], None for all types
Returns:
list: List of filing documents
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return []
# Check cache first (cache all filings, filter later)
cache_key = f"filings_{cik}"
cached = self._get_cached(cache_key)
if not cached:
try:
                # SIGALRM timeout (Unix main thread only; see get_company_info)
                import signal

                def timeout_handler(signum, frame):
                    raise TimeoutError("SEC API call timeout")

                try:
                    signal.signal(signal.SIGALRM, timeout_handler)
                    signal.alarm(45)
                    submissions = self.edgar.get_submissions(cik=cik)
                    signal.alarm(0)
                except (AttributeError, ValueError):
                    # No SIGALRM (Windows) or non-main thread: call directly
                    submissions = self.edgar.get_submissions(cik=cik)
# Extract filing information
filings = []
recent = submissions.get("filings", {}).get("recent", {})
# Get data from each field
form_types_list = recent.get("form", [])
filing_dates = recent.get("filingDate", [])
accession_numbers = recent.get("accessionNumber", [])
primary_documents = recent.get("primaryDocument", [])
# Iterate through all filings
for i in range(len(form_types_list)):
filing_date = filing_dates[i] if i < len(filing_dates) else ""
accession_number = accession_numbers[i] if i < len(accession_numbers) else ""
primary_document = primary_documents[i] if i < len(primary_documents) else ""
filing = {
"form_type": form_types_list[i],
"filing_date": filing_date,
"accession_number": accession_number,
"primary_document": primary_document
}
filings.append(filing)
# Cache all filings
self._set_cache(cache_key, filings)
cached = filings
except TimeoutError:
print(f"Timeout getting company filings for CIK: {cik}")
return []
except Exception as e:
print(f"Error getting company filings: {e}")
return []
# Filter by form type if specified
if form_types:
return [f for f in cached if f.get("form_type") in form_types]
return cached
def get_company_facts(self, cik):
"""
Get all company financial facts data with caching and timeout
Args:
cik (str): Company CIK code
Returns:
dict: Company financial facts data
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return {}
# Check cache first
cache_key = f"facts_{cik}"
cached = self._get_cached(cache_key)
if cached:
return cached
try:
            # SIGALRM timeout (Unix main thread only; see get_company_info)
            import signal

            def timeout_handler(signum, frame):
                raise TimeoutError("SEC API call timeout")

            try:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(60)  # 60 seconds: the facts payload is large
                facts = self.edgar.get_company_facts(cik=cik)
                signal.alarm(0)
            except (AttributeError, ValueError):
                # No SIGALRM (Windows) or non-main thread: call directly
                facts = self.edgar.get_company_facts(cik=cik)
# Cache the result
self._set_cache(cache_key, facts)
return facts
except TimeoutError:
print(f"Timeout getting company facts for CIK: {cik}")
return {}
except Exception as e:
print(f"Error getting company facts: {e}")
return {}
def get_financial_data_for_period(self, cik, period):
"""
Get financial data for a specific period (supports annual and quarterly)
Args:
cik (str): Company CIK code
period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')
Returns:
dict: Financial data dictionary
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return {}
# Check period cache first (Layer 3)
cache_key = f"period_{cik}_{period}"
cached = self._get_period_cache(cache_key)
if cached is not None:
print(f"[Cache Hit] get_financial_data_for_period({cik}, {period})")
return cached.copy() # Return copy to avoid mutation
try:
# Get company financial facts
facts = self.get_company_facts(cik)
if not facts:
return {}
# Extract us-gaap and ifrs-full financial data (20-F may use IFRS)
us_gaap = facts.get("facts", {}).get("us-gaap", {})
ifrs_full = facts.get("facts", {}).get("ifrs-full", {})
# Define financial metrics and their XBRL tags
# Include multiple possible tags to improve match rate (including US-GAAP and IFRS tags)
financial_metrics = {
"total_revenue": ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "RevenueFromContractWithCustomerIncludingAssessedTax", "SalesRevenueNet", "RevenueFromContractWithCustomer", "Revenue"],
"net_income": ["NetIncomeLoss", "ProfitLoss", "NetIncome", "ProfitLossAttributableToOwnersOfParent"],
"earnings_per_share": ["EarningsPerShareBasic", "EarningsPerShare", "BasicEarningsPerShare", "BasicEarningsLossPerShare"],
"operating_expenses": ["OperatingExpenses", "OperatingCostsAndExpenses", "OperatingExpensesExcludingDepreciationAndAmortization", "CostsAndExpenses", "GeneralAndAdministrativeExpense", "CostOfRevenue", "ResearchAndDevelopmentExpense", "SellingAndMarketingExpense"],
"operating_cash_flow": ["NetCashProvidedByUsedInOperatingActivities", "NetCashProvidedUsedInOperatingActivities", "NetCashFlowsFromUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"],
}
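            # Shape of one companyfacts entry consumed by the matching loop
            # below (field names are from the SEC API; values illustrative):
            #   {"start": "2021-09-26", "end": "2022-09-24", "val": 394328000000,
            #    "accn": "0000320193-22-000108", "fy": 2022, "fp": "FY",
            #    "form": "10-K", "filed": "2022-10-28", "frame": "CY2022"}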
# Determine target form types to search
if 'Q' in period:
# Quarterly data, mainly search 10-Q (20-F usually doesn't have quarterly reports)
target_forms = ["10-Q"]
target_forms_annual = ["10-K", "20-F"] # for fallback
year = int(period.split('Q')[0])
quarter = period.split('Q')[1]
else:
# Annual data, search 10-K and 20-F annual forms
target_forms = ["10-K", "20-F"]
target_forms_annual = target_forms
year = int(period)
quarter = None
# Store result with consolidated meta and sources (added for de-duplication)
result = {
"period": period,
"meta": {
"year": year,
"quarter": quarter,
"is_20f_filer": False, # will set below
"primary_source": {} # Common source info for all metrics in this period
},
"sources": {} # Per-metric source info (only if differs from primary)
}
# Detect if company uses 20-F (foreign filer)
is_20f_filer = False
all_filings = self.get_company_filings(cik)
if all_filings:
form_types_used = set(f.get('form_type', '') for f in all_filings[:20])
if '20-F' in form_types_used and '10-K' not in form_types_used:
is_20f_filer = True
# Reflect in meta
result["meta"]["is_20f_filer"] = is_20f_filer
# Get company filings to find accession number and primary document
filings = self.get_company_filings(cik, form_types=target_forms)
filings_map = {} # Map: form_year -> {accession_number, primary_document, filing_date, form_type}
# Build filing map for quick lookup
for filing in filings:
form_type = filing.get("form_type", "")
filing_date = filing.get("filing_date", "")
accession_number = filing.get("accession_number", "")
primary_document = filing.get("primary_document", "")
if filing_date and accession_number:
# Extract year from filing_date (format: YYYY-MM-DD)
file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0
# Store filing if it matches the period year
# For 20-F, also check year-1 (fiscal year may differ from filing year)
if file_year == year or (is_20f_filer and form_type == '20-F' and file_year in [year - 1, year + 1]):
key = f"{form_type}_{file_year}"
if key not in filings_map:
filings_map[key] = {
"accession_number": accession_number,
"primary_document": primary_document,
"form_type": form_type,
"filing_date": filing_date,
"file_year": file_year
}
# Iterate through each financial metric
for metric_key, metric_tags in financial_metrics.items():
# Support multiple possible tags
for metric_tag in metric_tags:
# Search both US-GAAP and IFRS tags
# For 20-F filers, prioritize IFRS
metric_data = None
data_source = None
if is_20f_filer:
# Check IFRS first for 20-F filers
if metric_tag in ifrs_full:
metric_data = ifrs_full[metric_tag]
data_source = "ifrs-full"
elif metric_tag in us_gaap:
metric_data = us_gaap[metric_tag]
data_source = "us-gaap"
else:
# Check US-GAAP first for 10-K filers
if metric_tag in us_gaap:
metric_data = us_gaap[metric_tag]
data_source = "us-gaap"
elif metric_tag in ifrs_full:
metric_data = ifrs_full[metric_tag]
data_source = "ifrs-full"
if metric_data:
units = metric_data.get("units", {})
# Find USD unit data (supports USD and USD/shares)
usd_data = None
if "USD" in units:
usd_data = units["USD"]
elif "USD/shares" in units and metric_key == "earnings_per_share":
# EPS uses USD/shares unit
usd_data = units["USD/shares"]
if usd_data:
# Try exact match first, then loose match
matched_entry = None
# Search for data in the specified period
for entry in usd_data:
form = entry.get("form", "")
fy = entry.get("fy", 0)
fp = entry.get("fp", "")
end_date = entry.get("end", "")
if not end_date or len(end_date) < 4:
continue
entry_year = int(end_date[:4])
# Check if form type matches
if form in target_forms:
if quarter:
# Quarterly data match
if entry_year == year and fp == f"Q{quarter}":
# If already matched, compare end date, choose the latest
if matched_entry:
if entry.get("end", "") > matched_entry.get("end", ""):
matched_entry = entry
else:
matched_entry = entry
else:
# Annual data match - prioritize fiscal year (fy) field
# Strategy 1: Exact match by fiscal year
if fy == year and (fp == "FY" or fp == "" or not fp):
# If already matched, compare end date, choose the latest
if matched_entry:
if entry.get("end", "") > matched_entry.get("end", ""):
matched_entry = entry
else:
matched_entry = entry
# Strategy 2: Match by end date year (when fy not available or doesn't match)
elif not matched_entry and entry_year == year and (fp == "FY" or fp == "" or not fp):
matched_entry = entry
# Strategy 3: Allow fy to differ by 1 year (fiscal year vs calendar year mismatch)
elif not matched_entry and fy > 0 and abs(fy - year) <= 1 and (fp == "FY" or fp == "" or not fp):
matched_entry = entry
# Strategy 4: Enhanced matching for 20-F - check frame field and end date
elif not matched_entry and form == "20-F":
frame = entry.get("frame", "")
# Match if CY{year} in frame OR end date contains year OR fiscal year within range
if (f"CY{year}" in frame or
(str(year) in end_date and len(end_date) >= 4 and end_date[:4] == str(year)) or
(fy > 0 and abs(fy - year) <= 1)):
# Additional check: prefer entries with FY period
if fp == "FY" or fp == "" or not fp:
matched_entry = entry
# If quarterly data not found, try finding from annual report (fallback strategy)
if not matched_entry and quarter and target_forms_annual:
for entry in usd_data:
form = entry.get("form", "")
end_date = entry.get("end", "")
fp = entry.get("fp", "")
if form in target_forms_annual and end_date:
# Check if end date is within this quarter range
if str(year) in end_date and f"Q{quarter}" in fp:
matched_entry = entry
break
# Apply matched data
if matched_entry:
result[metric_key] = matched_entry.get("val", 0)
                                # Get form and filing metadata from the matched fact
                                # (the accession number is resolved via filings_map below)
                                form_type = matched_entry.get("form", "")
                                filed_date = matched_entry.get("filed", "")
                                filed_year = int(filed_date[:4]) if filed_date and len(filed_date) >= 4 else year
# Try to get accession_number and primary_document from filings
# For 20-F, try multiple year keys since filing year may differ
filing_info = None
possible_keys = [f"{form_type}_{year}"]
if form_type == "20-F":
possible_keys.extend([f"20-F_{filed_year}", f"20-F_{year-1}", f"20-F_{year+1}"])
for filing_key in possible_keys:
if filing_key in filings_map:
filing_info = filings_map[filing_key]
break
if filing_info:
# Use filing info from get_company_filings
accession_number = filing_info["accession_number"].replace('-', '')
primary_document = filing_info["primary_document"]
# Generate complete source URL
if primary_document:
url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
else:
url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
else:
# Fallback to company browse page if filing not found
url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
# Backward compatible: only set once to avoid later overwrites
if "source_url" not in result:
result["source_url"] = url
result["source_form"] = form_type
result["data_source"] = data_source
# Set primary source info (common for all metrics in this period)
result["meta"]["primary_source"] = {
"url": url,
"form": form_type,
"data_source": data_source,
"filed": matched_entry.get("filed", ""),
"accn": matched_entry.get("accn", ""),
"fy": matched_entry.get("fy", 0),
"fp": matched_entry.get("fp", ""),
"frame": matched_entry.get("frame", ""),
"start": matched_entry.get("start", ""),
"end": matched_entry.get("end", "")
}
else:
# Only add per-metric source if it differs from primary
primary_src = result["meta"]["primary_source"]
if (url != primary_src.get("url") or
form_type != primary_src.get("form") or
data_source != primary_src.get("data_source")):
result["sources"][metric_key] = {
"url": url,
"form": form_type,
"data_source": data_source,
"filed": matched_entry.get("filed", "")
}
# Simplified details: only metric-specific info (tag and val)
# All common fields (form, fy, fp, accn, filed, frame, data_source, start, end)
# are now in meta.primary_source
result[f"{metric_key}_details"] = {
"tag": metric_tag,
"val": matched_entry.get("val", 0)
}
# If data is found, break out of tag loop
if metric_key in result:
break
# Cache the result (Layer 3)
self._set_period_cache(cache_key, result)
return result
except Exception as e:
print(f"Error getting financial data for period {period}: {e}")
return {}
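

if __name__ == "__main__":
    # Minimal smoke test (hypothetical usage): requires network access to
    # sec.gov and, per SEC policy, a truthful User-Agent string.
    client = EdgarDataClient()
    company = client.search_company_by_name("apple")
    print(company)
    if company:
        data = client.get_financial_data_for_period(company["cik"], "2023")
        # Print just the headline metrics, skipping per-metric detail blocks
        print({k: v for k, v in data.items() if not k.endswith("_details")})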