JC321 committed on
Commit
dd9b9f0
·
verified ·
1 Parent(s): b450591

Upload edgar_client.py

Browse files
Files changed (1) hide show
  1. edgar_client.py +200 -66
edgar_client.py CHANGED
@@ -12,6 +12,8 @@ import time
12
  import threading
13
  from functools import lru_cache
14
  from datetime import datetime, timedelta
 
 
15
 
16
 
17
  class EdgarDataClient:
@@ -26,10 +28,40 @@ class EdgarDataClient:
26
  _rate_limit_lock = threading.Lock()
27
  _min_request_interval = 0.11 # 110ms between requests (9 req/sec, safe margin)
28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
30
  """Initialize EDGAR client with connection pooling and timeout"""
31
  self.user_agent = user_agent
32
 
 
 
 
33
  # Configure requests session with connection pooling
34
  self.session = requests.Session()
35
 
@@ -131,76 +163,178 @@ class EdgarDataClient:
131
  time.sleep(sleep_time)
132
 
133
  EdgarDataClient._last_request_time = time.time()
134
-
135
- def search_company_by_name(self, company_name):
136
- """Search company CIK by company name with caching and optimized ticker matching"""
137
- try:
138
- # Check cache first
139
- with self._cache_lock:
140
- current_time = time.time()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
 
142
- # If cache is valid, use it
143
- if (EdgarDataClient._company_tickers_cache is not None and
144
- EdgarDataClient._company_tickers_cache_time is not None and
145
- current_time - EdgarDataClient._company_tickers_cache_time < self._company_tickers_cache_ttl):
146
- companies = EdgarDataClient._company_tickers_cache
147
- else:
148
- # Cache miss or expired, fetch new data
149
- self._rate_limit()
150
- url = "https://www.sec.gov/files/company_tickers.json"
151
- headers = {"User-Agent": self.user_agent}
152
-
153
- response = self.session.get(url, headers=headers, timeout=self.timeout)
154
- response.raise_for_status()
155
 
156
- companies = response.json()
 
 
157
 
158
- # Update cache
159
- EdgarDataClient._company_tickers_cache = companies
160
- EdgarDataClient._company_tickers_cache_time = current_time
161
-
162
- # ✅ OPTIMIZATION 1: Prioritize exact ticker match (fastest path)
163
- search_name_upper = company_name.upper().strip()
164
- for _, company in companies.items():
165
- if company["ticker"].upper() == search_name_upper:
166
- # Exact ticker match - return immediately
167
- return {
168
- "cik": str(company["cik_str"]).zfill(10),
169
- "name": company["title"],
170
- "ticker": company["ticker"]
171
- }
172
-
173
- # ✅ OPTIMIZATION 2: Search for matching company names
174
- matches = []
175
- exact_matches = []
176
- search_name_lower = company_name.lower()
177
-
178
- for _, company in companies.items():
179
- company_title = company["title"].lower()
180
- ticker_lower = company["ticker"].lower()
181
 
182
- # Exact match
183
- if search_name_lower == company_title:
184
- exact_matches.append({
185
- "cik": str(company["cik_str"]).zfill(10),
186
- "name": company["title"],
187
- "ticker": company["ticker"]
188
- })
189
- # Partial match (name or ticker contains search term)
190
- elif search_name_lower in company_title or search_name_lower in ticker_lower:
191
- matches.append({
192
- "cik": str(company["cik_str"]).zfill(10),
193
- "name": company["title"],
194
- "ticker": company["ticker"]
195
- })
196
-
197
- # Return exact match first, then partial match
198
- if exact_matches:
199
- return exact_matches[0]
200
- elif matches:
201
- return matches[0]
202
- else:
203
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
 
205
  except TimeoutError as e:
206
  print(f"Timeout searching company: {e}")
 
12
  import threading
13
  from functools import lru_cache
14
  from datetime import datetime, timedelta
15
+ import re
16
+ import difflib
17
 
18
 
19
  class EdgarDataClient:
 
28
  _rate_limit_lock = threading.Lock()
29
  _min_request_interval = 0.11 # 110ms between requests (9 req/sec, safe margin)
30
 
31
+ # 新增:公司索引(加速搜索,避免每次遍历全量数据)
32
+ _by_ticker = None # ticker -> company info
33
+ _by_title = None # title (lowercase) -> company info
34
+ _by_title_norm = None # normalized title -> company info
35
+ _all_keys = None # 用于模糊匹配的所有key列表
36
+ _index_built_time = None
37
+ _index_ttl = 3600 # 1 hour
38
+
39
+ # 新增:常见别名映射(提升搜索智能性)
40
+ _alias_map = {
41
+ "google": "alphabet inc",
42
+ "alphabet": "alphabet inc",
43
+ "facebook": "meta platforms, inc.",
44
+ "meta": "meta platforms, inc.",
45
+ "amazon": "amazon.com, inc.",
46
+ "apple": "apple inc.",
47
+ "microsoft": "microsoft corporation",
48
+ "netflix": "netflix, inc.",
49
+ "nvidia": "nvidia corporation",
50
+ "tesla": "tesla, inc.",
51
+ "adobe": "adobe inc.",
52
+ "oracle": "oracle corporation",
53
+ "ibm": "international business machines corporation",
54
+ "paypal": "paypal holdings, inc.",
55
+ "shopify": "shopify inc.",
56
+ }
57
+
58
  def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
59
  """Initialize EDGAR client with connection pooling and timeout"""
60
  self.user_agent = user_agent
61
 
62
+ # 新增:实例级搜索缓存(进一步减少重复搜索开销)
63
+ self._search_cache = {}
64
+
65
  # Configure requests session with connection pooling
66
  self.session = requests.Session()
67
 
 
163
  time.sleep(sleep_time)
164
 
165
  EdgarDataClient._last_request_time = time.time()
166
+
167
+ def _normalize_text(self, s: str) -> str:
168
+ """规范化文本:用于提升匹配准确度"""
169
+ if not s:
170
+ return ""
171
+ s = s.lower().strip()
172
+ s = s.replace("&", " and ")
173
+ s = re.sub(r"[.,()\-_/]", " ", s)
174
+ s = re.sub(r"\s+", " ", s)
175
+ # 移除常见后缀词
176
+ stopwords = {"inc", "inc.", "incorporated", "corp", "corporation", "co", "company", "plc", "ltd", "llc", "the"}
177
+ tokens = [t for t in s.split() if t not in stopwords]
178
+ return " ".join(tokens).strip()
179
+
180
+ def _ensure_company_index(self):
181
+ """确保公司索引已构建(按需构建或过期重建)"""
182
+ with self._cache_lock:
183
+ current_time = time.time()
184
+
185
+ # 若 company_tickers 缓存不存在或已过期,先刷新
186
+ if (EdgarDataClient._company_tickers_cache is None or
187
+ EdgarDataClient._company_tickers_cache_time is None or
188
+ current_time - EdgarDataClient._company_tickers_cache_time >= self._company_tickers_cache_ttl):
189
+ # 拉取并更新 company_tickers 缓存
190
+ self._rate_limit()
191
+ url = "https://www.sec.gov/files/company_tickers.json"
192
+ headers = {"User-Agent": self.user_agent}
193
+ response = self.session.get(url, headers=headers, timeout=self.timeout)
194
+ response.raise_for_status()
195
+ companies = response.json()
196
+ EdgarDataClient._company_tickers_cache = companies
197
+ EdgarDataClient._company_tickers_cache_time = current_time
198
+ else:
199
+ companies = EdgarDataClient._company_tickers_cache
200
+
201
+ # 若索引不存在或已过期,则重建索引
202
+ if (EdgarDataClient._by_ticker is None or
203
+ EdgarDataClient._by_title is None or
204
+ EdgarDataClient._by_title_norm is None or
205
+ EdgarDataClient._all_keys is None or
206
+ EdgarDataClient._index_built_time is None or
207
+ current_time - EdgarDataClient._index_built_time >= EdgarDataClient._index_ttl):
208
 
209
+ by_ticker = {}
210
+ by_title = {}
211
+ by_title_norm = {}
212
+ all_keys = []
213
+
214
+ for _, company in companies.items():
215
+ title = company.get("title", "")
216
+ ticker = company.get("ticker", "")
217
+ cik_str = str(company.get("cik_str", "")).zfill(10)
 
 
 
 
218
 
219
+ title_lower = title.lower()
220
+ ticker_lower = ticker.lower()
221
+ title_norm = self._normalize_text(title)
222
 
223
+ # 构建索引:ticker、title、normalized title
224
+ if ticker_lower:
225
+ by_ticker[ticker_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
226
+ all_keys.append(ticker_lower)
227
+ if title_lower:
228
+ by_title[title_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
229
+ if title_norm:
230
+ by_title_norm[title_norm] = {"cik": cik_str, "name": title, "ticker": ticker}
231
+ all_keys.append(title_norm)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
232
 
233
+ EdgarDataClient._by_ticker = by_ticker
234
+ EdgarDataClient._by_title = by_title
235
+ EdgarDataClient._by_title_norm = by_title_norm
236
+ EdgarDataClient._all_keys = all_keys
237
+ EdgarDataClient._index_built_time = current_time
238
+
239
+ def search_company_by_name(self, company_name):
240
+ """Search company CIK by company name with caching and optimized ticker matching"""
241
+ try:
242
+ # 实例级缓存命中检查(按规范化后的query)
243
+ norm_query = self._normalize_text(company_name)
244
+ cache_hit = self._search_cache.get(norm_query)
245
+ if cache_hit:
246
+ return cache_hit
247
+
248
+ # 确保索引已构建(首次或过期后会重建)
249
+ self._ensure_company_index()
250
+
251
+ # 获取索引引用(已在锁内构建完成)
252
+ by_ticker = EdgarDataClient._by_ticker
253
+ by_title = EdgarDataClient._by_title
254
+ by_title_norm = EdgarDataClient._by_title_norm
255
+ all_keys = EdgarDataClient._all_keys
256
+
257
+ # ✅ OPTIMIZATION 1: Ticker 优先匹配(遵循项目规范)
258
+ raw = company_name.strip().lower()
259
+ raw_compact = re.sub(r"[^a-z0-9]", "", raw)
260
+ is_ticker_like = len(raw_compact) <= 5 and len(raw_compact) >= 1
261
+
262
+ if is_ticker_like and raw_compact in by_ticker:
263
+ result = by_ticker[raw_compact]
264
+ self._search_cache[norm_query] = result
265
+ return result
266
+
267
+ # ✅ OPTIMIZATION 2: 别名映射(如 'google' -> 'alphabet inc')
268
+ alias_target = EdgarDataClient._alias_map.get(norm_query)
269
+ if alias_target:
270
+ alias_norm = self._normalize_text(alias_target)
271
+ # 先尝试规范化标题
272
+ if alias_norm in by_title_norm:
273
+ result = by_title_norm[alias_norm]
274
+ self._search_cache[norm_query] = result
275
+ return result
276
+ # 再尝试原始标题
277
+ alias_lower = alias_target.lower()
278
+ if alias_lower in by_title:
279
+ result = by_title[alias_lower]
280
+ self._search_cache[norm_query] = result
281
+ return result
282
+ # 最后尝试 ticker(有些别名可能实际上是ticker)
283
+ alias_ticker = re.sub(r"[^a-z0-9]", "", alias_lower)
284
+ if alias_ticker in by_ticker:
285
+ result = by_ticker[alias_ticker]
286
+ self._search_cache[norm_query] = result
287
+ return result
288
+
289
+ # ✅ OPTIMIZATION 3: 精确匹配(原始标题)
290
+ title_lower = company_name.lower().strip()
291
+ if title_lower in by_title:
292
+ result = by_title[title_lower]
293
+ self._search_cache[norm_query] = result
294
+ return result
295
+
296
+ # ✅ OPTIMIZATION 4: 精确匹配(规范化标题)
297
+ if norm_query in by_title_norm:
298
+ result = by_title_norm[norm_query]
299
+ self._search_cache[norm_query] = result
300
+ return result
301
+
302
+ # ✅ OPTIMIZATION 5: 精确匹配(ticker,再次尝试原始输入)
303
+ if raw_compact in by_ticker:
304
+ result = by_ticker[raw_compact]
305
+ self._search_cache[norm_query] = result
306
+ return result
307
+
308
+ # ✅ OPTIMIZATION 6: 部分包含匹配
309
+ partial_matches = []
310
+ for key in by_title_norm.keys():
311
+ if norm_query in key:
312
+ partial_matches.append(key)
313
+ if not partial_matches:
314
+ for t in by_ticker.keys():
315
+ if norm_query in t:
316
+ partial_matches.append(t)
317
+ if partial_matches:
318
+ best_key = max(
319
+ partial_matches,
320
+ key=lambda k: difflib.SequenceMatcher(None, norm_query, k).ratio()
321
+ )
322
+ result = by_title_norm.get(best_key) or by_ticker.get(best_key)
323
+ if result:
324
+ self._search_cache[norm_query] = result
325
+ return result
326
+
327
+ # ✅ OPTIMIZATION 7: 模糊匹配(difflib,用于拼写近似的情况)
328
+ close = difflib.get_close_matches(norm_query, all_keys, n=1, cutoff=0.78)
329
+ if close:
330
+ best = close[0]
331
+ result = by_title_norm.get(best) or by_ticker.get(best)
332
+ if result:
333
+ self._search_cache[norm_query] = result
334
+ return result
335
+
336
+ # 未找到
337
+ return None
338
 
339
  except TimeoutError as e:
340
  print(f"Timeout searching company: {e}")