JC321 commited on
Commit
c583058
·
verified ·
1 Parent(s): a91f7b3

Upload 3 files

Browse files
Files changed (3) hide show
  1. Dockerfile +3 -3
  2. edgar_client.py +53 -3
  3. mcp_server_sse.py +59 -8
Dockerfile CHANGED
@@ -24,8 +24,8 @@ ENV PYTHONUNBUFFERED=1
24
  ENV PYTHONDONTWRITEBYTECODE=1
25
 
26
  # Health check for container monitoring
27
- HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
28
  CMD curl -f http://localhost:7860/health || exit 1
29
 
30
- # Run MCP Server with SSE transport
31
- CMD ["uvicorn", "mcp_server_sse:app", "--host", "0.0.0.0", "--port", "7860", "--timeout-keep-alive", "75", "--limit-concurrency", "200", "--log-level", "info"]
 
24
  ENV PYTHONDONTWRITEBYTECODE=1
25
 
26
  # Health check for container monitoring
27
+ HEALTHCHECK --interval=60s --timeout=15s --start-period=60s --retries=5 \
28
  CMD curl -f http://localhost:7860/health || exit 1
29
 
30
+ # Run MCP Server with SSE transport - optimized settings for CPU UPGRADE
31
+ CMD ["uvicorn", "mcp_server_sse:app", "--host", "0.0.0.0", "--port", "7860", "--timeout-keep-alive", "120", "--limit-concurrency", "50", "--backlog", "100", "--log-level", "info", "--workers", "1"]
edgar_client.py CHANGED
@@ -7,26 +7,76 @@ except ImportError:
7
  EdgarClient = None
8
  import json
9
  import time
 
10
 
11
 
12
  class EdgarDataClient:
13
  def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
14
  """Initialize EDGAR client"""
15
  self.user_agent = user_agent
 
 
 
 
 
16
  if EdgarClient:
17
  self.edgar = EdgarClient(user_agent=user_agent)
18
  else:
19
  self.edgar = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
  def search_company_by_name(self, company_name):
22
  """Search company CIK by company name"""
23
  try:
24
  # Use SEC company ticker database
25
  url = "https://www.sec.gov/files/company_tickers.json"
26
- headers = {"User-Agent": self.user_agent}
27
 
28
- response = requests.get(url, headers=headers)
29
- response.raise_for_status()
 
30
 
31
  companies = response.json()
32
 
 
7
  EdgarClient = None
8
  import json
9
  import time
10
+ from functools import wraps
11
 
12
 
13
  class EdgarDataClient:
14
  def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
15
  """Initialize EDGAR client"""
16
  self.user_agent = user_agent
17
+ self.last_request_time = 0
18
+ self.min_request_interval = 0.11 # SEC allows 10 requests/second, use 0.11s to be safe
19
+ self.request_timeout = 30 # 30 seconds timeout for HTTP requests
20
+ self.max_retries = 3 # Maximum retry attempts
21
+
22
  if EdgarClient:
23
  self.edgar = EdgarClient(user_agent=user_agent)
24
  else:
25
  self.edgar = None
26
+
27
+ def _rate_limit(self):
28
+ """Rate limiting to comply with SEC API limits (10 requests/second)"""
29
+ current_time = time.time()
30
+ time_since_last_request = current_time - self.last_request_time
31
+
32
+ if time_since_last_request < self.min_request_interval:
33
+ sleep_time = self.min_request_interval - time_since_last_request
34
+ time.sleep(sleep_time)
35
+
36
+ self.last_request_time = time.time()
37
+
38
+ def _make_request_with_retry(self, url, headers=None):
39
+ """Make HTTP request with retry logic and timeout"""
40
+ if headers is None:
41
+ headers = {"User-Agent": self.user_agent}
42
+
43
+ for attempt in range(self.max_retries):
44
+ try:
45
+ self._rate_limit()
46
+ response = requests.get(url, headers=headers, timeout=self.request_timeout)
47
+ response.raise_for_status()
48
+ return response
49
+ except requests.exceptions.Timeout:
50
+ print(f"Request timeout (attempt {attempt + 1}/{self.max_retries}): {url}")
51
+ if attempt == self.max_retries - 1:
52
+ raise
53
+ time.sleep(2 ** attempt) # Exponential backoff
54
+ except requests.exceptions.HTTPError as e:
55
+ if e.response.status_code == 429: # Too Many Requests
56
+ wait_time = 2 ** attempt
57
+ print(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1}/{self.max_retries})")
58
+ time.sleep(wait_time)
59
+ if attempt == self.max_retries - 1:
60
+ raise
61
+ else:
62
+ raise
63
+ except Exception as e:
64
+ print(f"Request error (attempt {attempt + 1}/{self.max_retries}): {e}")
65
+ if attempt == self.max_retries - 1:
66
+ raise
67
+ time.sleep(2 ** attempt)
68
+
69
+ return None
70
 
71
  def search_company_by_name(self, company_name):
72
  """Search company CIK by company name"""
73
  try:
74
  # Use SEC company ticker database
75
  url = "https://www.sec.gov/files/company_tickers.json"
 
76
 
77
+ response = self._make_request_with_retry(url)
78
+ if not response:
79
+ return None
80
 
81
  companies = response.json()
82
 
mcp_server_sse.py CHANGED
@@ -15,6 +15,8 @@ import asyncio
15
  from datetime import datetime
16
  from edgar_client import EdgarDataClient
17
  from financial_analyzer import FinancialAnalyzer
 
 
18
 
19
  # Initialize FastAPI app
20
  app = FastAPI(
@@ -23,6 +25,11 @@ app = FastAPI(
23
  version="2.0.0"
24
  )
25
 
 
 
 
 
 
26
  # Configure CORS for remote access
27
  app.add_middleware(
28
  CORSMiddleware,
@@ -32,6 +39,21 @@ app.add_middleware(
32
  allow_headers=["*"],
33
  )
34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  # Initialize EDGAR clients
36
  edgar_client = EdgarDataClient(
37
  user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"
@@ -178,7 +200,15 @@ MCP_TOOLS = [
178
 
179
 
180
  def execute_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
181
- """Execute MCP tool and return clean JSON result"""
 
 
 
 
 
 
 
 
182
  try:
183
  if tool_name == "search_company":
184
  result = edgar_client.search_company_by_name(arguments["company_name"])
@@ -408,14 +438,15 @@ async def sse_endpoint(request: Request):
408
  }
409
  yield f"data: {json.dumps(init_message)}\n\n"
410
 
411
- # Keep connection alive
412
  try:
413
  while True:
414
- await asyncio.sleep(30)
415
  # Send ping to keep connection alive
416
  ping_message = {
417
  "jsonrpc": "2.0",
418
- "method": "ping"
 
419
  }
420
  yield f"data: {json.dumps(ping_message)}\n\n"
421
  except asyncio.CancelledError:
@@ -425,9 +456,10 @@ async def sse_endpoint(request: Request):
425
  event_stream(),
426
  media_type="text/event-stream",
427
  headers={
428
- "Cache-Control": "no-cache",
429
  "Connection": "keep-alive",
430
- "X-Accel-Buffering": "no"
 
431
  }
432
  )
433
 
@@ -582,17 +614,36 @@ async def list_tools():
582
 
583
  @app.get("/health")
584
  async def health_check():
585
- """Health check endpoint for monitoring"""
 
586
  return {
587
  "status": "healthy",
588
  "server": "sec-financial-data",
589
  "version": "2.0.0",
590
  "protocol": "MCP",
591
  "transport": "SSE",
592
- "tools_count": len(MCP_TOOLS)
 
 
 
 
 
593
  }
594
 
595
 
 
 
 
 
 
 
 
 
 
 
 
 
 
596
  @app.api_route("/api/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
597
  async def redirect_old_api(path: str):
598
  """Handle old REST API endpoints with helpful message"""
 
15
  from datetime import datetime
16
  from edgar_client import EdgarDataClient
17
  from financial_analyzer import FinancialAnalyzer
18
+ import time
19
+ import sys
20
 
21
  # Initialize FastAPI app
22
  app = FastAPI(
 
25
  version="2.0.0"
26
  )
27
 
28
+ # Server startup time for monitoring
29
+ server_start_time = time.time()
30
+ request_count = 0
31
+ error_count = 0
32
+
33
  # Configure CORS for remote access
34
  app.add_middleware(
35
  CORSMiddleware,
 
39
  allow_headers=["*"],
40
  )
41
 
42
+ # Request tracking middleware
43
+ @app.middleware("http")
44
+ async def track_requests(request: Request, call_next):
45
+ global request_count, error_count
46
+ request_count += 1
47
+
48
+ try:
49
+ response = await call_next(request)
50
+ if response.status_code >= 400:
51
+ error_count += 1
52
+ return response
53
+ except Exception as e:
54
+ error_count += 1
55
+ raise
56
+
57
  # Initialize EDGAR clients
58
  edgar_client = EdgarDataClient(
59
  user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"
 
200
 
201
 
202
  def execute_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
203
+ """Execute MCP tool and return clean JSON result with timeout protection"""
204
+ import signal
205
+
206
+ class TimeoutError(Exception):
207
+ pass
208
+
209
+ def timeout_handler(signum, frame):
210
+ raise TimeoutError("Tool execution timeout")
211
+
212
  try:
213
  if tool_name == "search_company":
214
  result = edgar_client.search_company_by_name(arguments["company_name"])
 
438
  }
439
  yield f"data: {json.dumps(init_message)}\n\n"
440
 
441
+ # Keep connection alive with shorter ping interval for better stability
442
  try:
443
  while True:
444
+ await asyncio.sleep(15) # Reduced from 30s to 15s for better keepalive
445
  # Send ping to keep connection alive
446
  ping_message = {
447
  "jsonrpc": "2.0",
448
+ "method": "ping",
449
+ "timestamp": datetime.now().isoformat()
450
  }
451
  yield f"data: {json.dumps(ping_message)}\n\n"
452
  except asyncio.CancelledError:
 
456
  event_stream(),
457
  media_type="text/event-stream",
458
  headers={
459
+ "Cache-Control": "no-cache, no-transform",
460
  "Connection": "keep-alive",
461
+ "X-Accel-Buffering": "no",
462
+ "Content-Type": "text/event-stream"
463
  }
464
  )
465
 
 
614
 
615
  @app.get("/health")
616
  async def health_check():
617
+ """Enhanced health check endpoint with diagnostics"""
618
+ uptime_seconds = time.time() - server_start_time
619
  return {
620
  "status": "healthy",
621
  "server": "sec-financial-data",
622
  "version": "2.0.0",
623
  "protocol": "MCP",
624
  "transport": "SSE",
625
+ "tools_count": len(MCP_TOOLS),
626
+ "uptime_seconds": round(uptime_seconds, 2),
627
+ "python_version": sys.version,
628
+ "request_count": request_count,
629
+ "error_count": error_count,
630
+ "timestamp": datetime.now().isoformat()
631
  }
632
 
633
 
634
+ @app.get("/ready")
635
+ async def readiness_check():
636
+ """Readiness check for load balancers"""
637
+ try:
638
+ # Quick validation that clients are initialized
639
+ if edgar_client and financial_analyzer:
640
+ return {"status": "ready", "timestamp": datetime.now().isoformat()}
641
+ else:
642
+ raise HTTPException(status_code=503, detail="Services not initialized")
643
+ except Exception as e:
644
+ raise HTTPException(status_code=503, detail=f"Not ready: {str(e)}")
645
+
646
+
647
  @app.api_route("/api/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
648
  async def redirect_old_api(path: str):
649
  """Handle old REST API endpoints with helpful message"""