JC321 committed on
Commit
74fcd29
·
verified ·
1 Parent(s): 875213d

Upload 3 files

Browse files
Files changed (3) hide show
  1. edgar_client.py +112 -43
  2. financial_analyzer.py +53 -19
  3. mcp_server_sse.py +9 -0
edgar_client.py CHANGED
@@ -43,9 +43,16 @@ class EdgarDataClient:
43
 
44
  # Cache for frequently accessed data
45
  self._company_cache = {} # Cache company info to avoid repeated calls
46
- self._cache_ttl = 300 # 5 minutes cache TTL
 
47
  self._cache_timestamps = {}
48
 
 
 
 
 
 
 
49
  if EdgarClient:
50
  self.edgar = EdgarClient(user_agent=user_agent)
51
  else:
@@ -68,7 +75,9 @@ class EdgarDataClient:
68
  if cache_key not in self._cache_timestamps:
69
  return False
70
  age = time.time() - self._cache_timestamps[cache_key]
71
- return age < self._cache_ttl
 
 
72
 
73
  def _get_cached(self, cache_key):
74
  """Get cached data if valid"""
@@ -116,64 +125,124 @@ class EdgarDataClient:
116
  time.sleep(2 ** attempt)
117
 
118
  return None
 
 
 
 
119
 
120
- def search_company_by_name(self, company_name):
121
- """Search company CIK by company name with caching and optimized search"""
122
- try:
123
- # Check cache for company_tickers.json
124
- cache_key = "company_tickers_json"
125
- companies = self._get_cached(cache_key)
126
-
127
- if not companies:
128
- # Use SEC company ticker database
 
129
  url = "https://www.sec.gov/files/company_tickers.json"
 
130
 
131
  response = self._make_request_with_retry(url)
132
  if not response:
 
133
  return None
134
 
135
  companies = response.json()
136
- # Cache the entire company list for 5 minutes
137
  self._set_cache(cache_key, companies)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
138
 
139
  # Prepare search input
140
  search_name = company_name.lower().strip()
141
 
142
- # Optimize: Use early return for exact matches
143
- # First pass: Look for exact ticker or exact name match (fastest)
144
- for _, company in companies.items():
145
- company_ticker = company["ticker"].lower()
146
- company_title = company["title"].lower()
147
-
148
- # Exact ticker match (highest priority) - return immediately
149
- if search_name == company_ticker:
150
- return {
151
- "cik": str(company["cik_str"]).zfill(10),
152
- "name": company["title"],
153
- "ticker": company["ticker"]
154
- }
155
-
156
- # Exact name match - return immediately
157
- if search_name == company_title:
158
- return {
159
- "cik": str(company["cik_str"]).zfill(10),
160
- "name": company["title"],
161
- "ticker": company["ticker"]
162
- }
163
 
164
- # Second pass: Look for partial matches (only if no exact match found)
 
165
  matches = []
166
- for _, company in companies.items():
167
- company_title = company["title"].lower()
168
- company_ticker = company["ticker"].lower()
169
 
170
  # Partial match in name or ticker
171
- if search_name in company_title or search_name in company_ticker:
172
- matches.append({
173
- "cik": str(company["cik_str"]).zfill(10),
174
- "name": company["title"],
175
- "ticker": company["ticker"]
176
- })
177
  # Optimize: Stop after finding 10 matches to avoid scanning all 13,000+
178
  if len(matches) >= 10:
179
  break
 
43
 
44
  # Cache for frequently accessed data
45
  self._company_cache = {} # Cache company info to avoid repeated calls
46
+ self._cache_ttl = 300 # 5 minutes cache TTL (for company info)
47
+ self._tickers_cache_ttl = 3600 # 1 hour for company tickers (rarely changes)
48
  self._cache_timestamps = {}
49
 
50
+ # Fast lookup indexes for company tickers
51
+ self._ticker_index = {} # ticker -> company data
52
+ self._cik_index = {} # cik -> company data
53
+ self._name_lower_index = {} # lowercase name -> company data
54
+ self._index_loaded = False
55
+
56
  if EdgarClient:
57
  self.edgar = EdgarClient(user_agent=user_agent)
58
  else:
 
75
  if cache_key not in self._cache_timestamps:
76
  return False
77
  age = time.time() - self._cache_timestamps[cache_key]
78
+ # Use longer TTL for company tickers list
79
+ ttl = self._tickers_cache_ttl if cache_key == "company_tickers_json" else self._cache_ttl
80
+ return age < ttl
81
 
82
  def _get_cached(self, cache_key):
83
  """Get cached data if valid"""
 
125
  time.sleep(2 ** attempt)
126
 
127
  return None
128
+
129
def _load_company_tickers(self, force_refresh=False):
    """Load the SEC company tickers list and rebuild the lookup indexes.

    The payload is taken from the instance cache when a valid entry exists;
    otherwise it is downloaded from the SEC and cached. Three dictionaries
    are rebuilt from it, each mapping to a ``{"cik", "name", "ticker"}``
    dict:

    * ``_ticker_index``     -- keyed by lowercase ticker
    * ``_cik_index``        -- keyed by CIK zero-padded to 10 digits
    * ``_name_lower_index`` -- keyed by lowercase company title

    Args:
        force_refresh: When true, ignore any cached payload and download
            the list again.

    Returns:
        The raw companies mapping, or ``None`` when the download fails or
        the payload cannot be indexed. On failure the previously built
        indexes and the cached payload are left untouched.
    """
    cache_key = "company_tickers_json"

    # Fast path: indexes were already built from a cache entry that is
    # still valid, so there is nothing to rebuild.
    if self._index_loaded and not force_refresh and self._is_cache_valid(cache_key):
        return self._get_cached(cache_key)

    companies = None if force_refresh else self._get_cached(cache_key)
    downloaded = False

    if not companies:
        try:
            url = "https://www.sec.gov/files/company_tickers.json"
            print("Downloading company tickers from SEC...")

            response = self._make_request_with_retry(url)
            if not response:
                print("Failed to download company tickers")
                return None

            companies = response.json()
        except Exception as e:
            print(f"Error loading company tickers: {e}")
            return None
        downloaded = True
    else:
        print(f"Using cached company tickers ({len(companies)} companies)")

    # Build into local dictionaries first: if any entry is malformed, the
    # indexes currently in use stay complete instead of being left empty
    # or half-populated.
    ticker_index = {}
    cik_index = {}
    name_lower_index = {}
    try:
        for company in companies.values():
            cik = str(company["cik_str"]).zfill(10)
            ticker = company["ticker"]
            name = company["title"]

            company_data = {
                "cik": cik,
                "name": name,
                "ticker": ticker
            }

            # Lowercase keys make ticker and name lookups case-insensitive.
            ticker_index[ticker.lower()] = company_data
            cik_index[cik] = company_data
            name_lower_index[name.lower()] = company_data
    except (AttributeError, KeyError, TypeError) as e:
        # Unexpected payload shape: report it like any other load failure.
        print(f"Error loading company tickers: {e}")
        return None

    # Only cache a payload that indexed cleanly.
    if downloaded:
        self._set_cache(cache_key, companies)
        print(f"Loaded {len(companies)} companies")

    self._ticker_index = ticker_index
    self._cik_index = cik_index
    self._name_lower_index = name_lower_index
    self._index_loaded = True
    print(f"Built indexes: {len(self._ticker_index)} tickers, {len(self._cik_index)} CIKs")
    return companies
189
+
190
def get_company_by_cik(self, cik):
    """Look up basic company info by CIK in the cached ticker indexes.

    Args:
        cik: CIK as an int or string; surrounding whitespace is ignored
            and the value is zero-padded to 10 digits before the lookup.

    Returns:
        A new ``{"cik", "name", "ticker"}`` dict, or ``None`` when the CIK
        is not present in the index.
    """
    # Ensure the indexes are populated (no-op when already loaded and valid).
    self._load_company_tickers()

    cik_normalized = str(cik).strip().zfill(10)
    company = self._cik_index.get(cik_normalized)

    # Hand out a copy, as search_company_by_name does, so callers cannot
    # mutate the entry shared by the indexes.
    return company.copy() if company is not None else None
200
+
201
def get_company_by_ticker(self, ticker):
    """Look up basic company info by ticker symbol (case-insensitive).

    Args:
        ticker: Ticker symbol; surrounding whitespace and letter case are
            ignored.

    Returns:
        A new ``{"cik", "name", "ticker"}`` dict, or ``None`` when the
        ticker is empty, ``None``, or not present in the index.
    """
    if not ticker:
        return None

    # Ensure the indexes are populated (no-op when already loaded and valid).
    self._load_company_tickers()

    # Normalize the same way search_company_by_name does.
    company = self._ticker_index.get(ticker.strip().lower())

    # Hand out a copy so callers cannot mutate the entry shared by the indexes.
    return company.copy() if company is not None else None
208
+
209
+ def search_company_by_name(self, company_name):
210
+ """Search company CIK by company name with caching and optimized search"""
211
+ try:
212
+ # Load company tickers and build indexes
213
+ companies = self._load_company_tickers()
214
+
215
+ if not companies:
216
+ return None
217
 
218
  # Prepare search input
219
  search_name = company_name.lower().strip()
220
 
221
+ # Optimize: Use fast index lookups first
222
+ # Priority 1: Exact ticker match (fastest - O(1) hash lookup)
223
+ if search_name in self._ticker_index:
224
+ return self._ticker_index[search_name].copy()
225
+
226
+ # Priority 2: Exact name match (fast - O(1) hash lookup)
227
+ if search_name in self._name_lower_index:
228
+ return self._name_lower_index[search_name].copy()
229
+
230
+ # Priority 3: Exact CIK match (fast - O(1) hash lookup)
231
+ # Handle CIK input (8-10 digits)
232
+ if search_name.isdigit() and len(search_name) >= 8:
233
+ cik_normalized = search_name.zfill(10)
234
+ if cik_normalized in self._cik_index:
235
+ return self._cik_index[cik_normalized].copy()
 
 
 
 
 
 
236
 
237
+ # Priority 4: Partial matches (slower - requires iteration)
238
+ # Only execute if exact matches fail
239
  matches = []
240
+ for ticker_lower, company_data in self._ticker_index.items():
241
+ name_lower = company_data["name"].lower()
 
242
 
243
  # Partial match in name or ticker
244
+ if search_name in name_lower or search_name in ticker_lower:
245
+ matches.append(company_data.copy())
 
 
 
 
246
  # Optimize: Stop after finding 10 matches to avoid scanning all 13,000+
247
  if len(matches) >= 10:
248
  break
financial_analyzer.py CHANGED
@@ -28,40 +28,74 @@ class FinancialAnalyzer:
28
  # Strip whitespace
29
  company_input = company_input.strip()
30
 
31
- # Strategy 1: If input is numeric and looks like CIK (8-10 digits), treat as CIK
32
  if company_input.isdigit() and len(company_input) >= 8:
33
- # Pad to 10 digits if needed
34
  cik = company_input.zfill(10)
35
- company_info = self.edgar_client.get_company_info(cik)
36
- if company_info:
37
- return company_info
 
 
 
 
 
 
 
 
 
 
 
 
 
 
38
  else:
39
- return {"error": "Company not found for specified CIK"}
 
 
 
 
 
 
 
 
 
40
 
41
- # Strategy 2: Search by name/ticker
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  # This returns basic info: {cik, name, ticker}
43
  basic_info = self.edgar_client.search_company_by_name(company_input)
44
 
45
  if not basic_info:
46
  return {"error": "No matching company found"}
47
 
48
- # Strategy 3: Decide whether to fetch detailed info
49
- # If user input is short (likely a ticker), return enriched basic info quickly
50
- # If user input is long (likely a full name), get detailed info
51
-
52
- input_length = len(company_input)
53
- is_likely_ticker = input_length <= 5 and company_input.isupper()
54
-
55
- # For ticker searches, return enriched basic info without additional API call
56
  if is_likely_ticker:
57
- # Quick response with basic info + some enrichment
58
  return {
59
  "cik": basic_info['cik'],
60
  "name": basic_info['name'],
61
  "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [],
62
- "ein": None, # Not available in basic search
63
- "fiscal_year_end": None, # Not available in basic search
64
- "sic_description": None, # Not available in basic search
65
  "_source": "quick_search",
66
  "_note": "Basic info from ticker search. Use get_company_info for full details."
67
  }
 
28
  # Strip whitespace
29
  company_input = company_input.strip()
30
 
31
+ # Strategy 1: If input is numeric and looks like CIK (8-10 digits), use fast CIK lookup
32
  if company_input.isdigit() and len(company_input) >= 8:
33
+ # Normalize CIK to 10 digits
34
  cik = company_input.zfill(10)
35
+
36
+ # Try fast lookup first (from cached tickers)
37
+ basic_info = self.edgar_client.get_company_by_cik(cik)
38
+
39
+ if basic_info:
40
+ # Fast path succeeded, now get detailed info
41
+ company_info = self.edgar_client.get_company_info(cik)
42
+ if company_info:
43
+ return company_info
44
+ else:
45
+ # Fallback to basic info if detailed fetch fails
46
+ return {
47
+ "cik": basic_info['cik'],
48
+ "name": basic_info['name'],
49
+ "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [],
50
+ "_source": "basic_cik_lookup"
51
+ }
52
  else:
53
+ # CIK not found in cache, try full API call
54
+ company_info = self.edgar_client.get_company_info(cik)
55
+ if company_info:
56
+ return company_info
57
+ else:
58
+ return {"error": "Company not found for specified CIK"}
59
+
60
+ # Strategy 2: Check if it looks like a ticker (short uppercase)
61
+ input_length = len(company_input)
62
+ is_likely_ticker = input_length <= 5 and company_input.isupper()
63
 
64
+ if is_likely_ticker:
65
+ # Try fast ticker lookup first
66
+ basic_info = self.edgar_client.get_company_by_ticker(company_input)
67
+
68
+ if basic_info:
69
+ # Fast ticker lookup succeeded - return enriched basic info
70
+ return {
71
+ "cik": basic_info['cik'],
72
+ "name": basic_info['name'],
73
+ "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [],
74
+ "ein": None, # Not available in basic search
75
+ "fiscal_year_end": None, # Not available in basic search
76
+ "sic_description": None, # Not available in basic search
77
+ "_source": "quick_ticker_search",
78
+ "_note": "Basic info from ticker search. Use get_company_info for full details."
79
+ }
80
+
81
+ # Strategy 3: General search by name/ticker
82
  # This returns basic info: {cik, name, ticker}
83
  basic_info = self.edgar_client.search_company_by_name(company_input)
84
 
85
  if not basic_info:
86
  return {"error": "No matching company found"}
87
 
88
+ # Strategy 4: Decide whether to fetch detailed info
89
+ # For ticker-like searches, return basic info quickly
 
 
 
 
 
 
90
  if is_likely_ticker:
91
+ # Quick response with basic info
92
  return {
93
  "cik": basic_info['cik'],
94
  "name": basic_info['name'],
95
  "tickers": [basic_info['ticker']] if basic_info.get('ticker') else [],
96
+ "ein": None,
97
+ "fiscal_year_end": None,
98
+ "sic_description": None,
99
  "_source": "quick_search",
100
  "_note": "Basic info from ticker search. Use get_company_info for full details."
101
  }
mcp_server_sse.py CHANGED
@@ -78,6 +78,15 @@ financial_analyzer = FinancialAnalyzer(
78
  user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"
79
  )
80
 
 
 
 
 
 
 
 
 
 
81
 
82
  # ==================== MCP Protocol Implementation ====================
83
 
 
78
  user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"
79
  )
80
 
81
# Preload company tickers data on startup for better performance.
# NOTE(review): `edgar_client` is not defined in the visible part of this
# file -- confirm it exists at module scope. If it does not, the NameError
# is swallowed by the handler below; FinancialAnalyzer exposes the client
# as `financial_analyzer.edgar_client`.
print("[Startup] Preloading company tickers data...")
try:
    # _load_company_tickers signals failure by returning None rather than
    # raising, so the result must be checked before reporting success.
    if edgar_client._load_company_tickers() is not None:
        print("[Startup] Company tickers preloaded successfully")
    else:
        print("[Startup] Warning: Failed to preload company tickers")
        print("[Startup] Will load on first request")
except Exception as e:
    print(f"[Startup] Warning: Failed to preload company tickers: {e}")
    print("[Startup] Will load on first request")
89
+
90
 
91
  # ==================== MCP Protocol Implementation ====================
92