Really-amin commited on
Commit
ec50bd1
·
verified ·
1 Parent(s): 6b51475

Upload 2 files

Browse files
Files changed (2) hide show
  1. app.py +6 -48
  2. hf_client.py +273 -0
app.py CHANGED
@@ -10,14 +10,12 @@ from contextlib import asynccontextmanager
10
 
11
  from fastapi import FastAPI, Request
12
  from fastapi.middleware.cors import CORSMiddleware
13
- from fastapi.responses import JSONResponse, HTMLResponse, FileResponse
14
- from fastapi.staticfiles import StaticFiles
15
 
16
  # Import API modules
17
  from api.endpoints import router as api_router
18
  from api.websocket import router as websocket_router, manager as ws_manager
19
  from api.pool_endpoints import router as pool_router
20
- from api.data_endpoints import router as data_router
21
 
22
  # Import new WebSocket service routers
23
  from api.ws_unified_router import router as ws_unified_router, start_all_websocket_streams
@@ -170,15 +168,6 @@ async def lifespan(app: FastAPI):
170
  task_scheduler.start()
171
  logger.info("Task scheduler started successfully")
172
 
173
- # 7. Start WebSocket data broadcaster
174
- logger.info("Starting WebSocket data broadcaster...")
175
- try:
176
- from api.ws_data_broadcaster import broadcaster
177
- asyncio.create_task(broadcaster.start_broadcasting())
178
- logger.info("WebSocket data broadcaster started")
179
- except Exception as e:
180
- logger.warning(f"Could not start WebSocket data broadcaster: {e}")
181
-
182
  # Log startup summary
183
  logger.info("=" * 80)
184
  logger.info("Crypto API Monitoring System started successfully")
@@ -196,26 +185,17 @@ async def lifespan(app: FastAPI):
196
  logger.info("Shutting down Crypto API Monitoring System...")
197
  logger.info("=" * 80)
198
 
199
- # 1. Stop WebSocket data broadcaster
200
- logger.info("Stopping WebSocket data broadcaster...")
201
- try:
202
- from api.ws_data_broadcaster import broadcaster
203
- await broadcaster.stop_broadcasting()
204
- logger.info("WebSocket data broadcaster stopped")
205
- except Exception as e:
206
- logger.warning(f"Error stopping WebSocket data broadcaster: {e}")
207
-
208
- # 2. Stop task scheduler
209
  logger.info("Stopping task scheduler...")
210
  task_scheduler.stop()
211
  logger.info("Task scheduler stopped")
212
 
213
- # 3. Stop WebSocket background tasks
214
  logger.info("Stopping WebSocket background tasks...")
215
  await ws_manager.stop_background_tasks()
216
  logger.info("WebSocket background tasks stopped")
217
 
218
- # 4. Close all WebSocket connections
219
  logger.info("Closing WebSocket connections...")
220
  await ws_manager.close_all_connections()
221
  logger.info("WebSocket connections closed")
@@ -329,12 +309,6 @@ app.include_router(
329
  tags=["Pool Management"]
330
  )
331
 
332
- # Include Data endpoints router (cryptocurrency data)
333
- app.include_router(
334
- data_router,
335
- tags=["Crypto Data"]
336
- )
337
-
338
  # Include HF router (if available)
339
  if HF_ROUTER_AVAILABLE:
340
  try:
@@ -374,26 +348,10 @@ logger.info("All WebSocket service routers included successfully")
374
  # Root Endpoints
375
  # ============================================================================
376
 
377
- @app.get("/", response_class=HTMLResponse, tags=["Root"])
378
  async def root():
379
  """
380
- Serve the Vidya HTML UI dashboard
381
-
382
- Returns:
383
- HTML dashboard interface
384
- """
385
- try:
386
- with open("index.html", "r", encoding="utf-8") as f:
387
- return HTMLResponse(content=f.read())
388
- except Exception as e:
389
- logger.error(f"Error serving index.html: {e}")
390
- return HTMLResponse(content=f"<h1>Error loading dashboard</h1><p>{str(e)}</p>", status_code=500)
391
-
392
-
393
- @app.get("/api-info", tags=["Root"])
394
- async def api_info():
395
- """
396
- API information and available endpoints
397
 
398
  Returns:
399
  API information and endpoint listing
 
10
 
11
  from fastapi import FastAPI, Request
12
  from fastapi.middleware.cors import CORSMiddleware
13
+ from fastapi.responses import JSONResponse
 
14
 
15
  # Import API modules
16
  from api.endpoints import router as api_router
17
  from api.websocket import router as websocket_router, manager as ws_manager
18
  from api.pool_endpoints import router as pool_router
 
19
 
20
  # Import new WebSocket service routers
21
  from api.ws_unified_router import router as ws_unified_router, start_all_websocket_streams
 
168
  task_scheduler.start()
169
  logger.info("Task scheduler started successfully")
170
 
 
 
 
 
 
 
 
 
 
171
  # Log startup summary
172
  logger.info("=" * 80)
173
  logger.info("Crypto API Monitoring System started successfully")
 
185
  logger.info("Shutting down Crypto API Monitoring System...")
186
  logger.info("=" * 80)
187
 
188
+ # 1. Stop task scheduler
 
 
 
 
 
 
 
 
 
189
  logger.info("Stopping task scheduler...")
190
  task_scheduler.stop()
191
  logger.info("Task scheduler stopped")
192
 
193
+ # 2. Stop WebSocket background tasks
194
  logger.info("Stopping WebSocket background tasks...")
195
  await ws_manager.stop_background_tasks()
196
  logger.info("WebSocket background tasks stopped")
197
 
198
+ # 3. Close all WebSocket connections
199
  logger.info("Closing WebSocket connections...")
200
  await ws_manager.close_all_connections()
201
  logger.info("WebSocket connections closed")
 
309
  tags=["Pool Management"]
310
  )
311
 
 
 
 
 
 
 
312
  # Include HF router (if available)
313
  if HF_ROUTER_AVAILABLE:
314
  try:
 
348
  # Root Endpoints
349
  # ============================================================================
350
 
351
+ @app.get("/", tags=["Root"])
352
  async def root():
353
  """
354
+ Root endpoint with API information and available endpoints
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
355
 
356
  Returns:
357
  API information and endpoint listing
hf_client.py ADDED
@@ -0,0 +1,273 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ from typing import List, Dict, Any, Optional
4
+ import os
5
+ import time
6
+ from functools import lru_cache
7
+ from collections import deque
8
+ from datetime import datetime
9
+
10
+ ENABLE_SENTIMENT = os.getenv("ENABLE_SENTIMENT", "true").lower() in ("1", "true", "yes")
11
+ SOCIAL_MODEL = os.getenv("SENTIMENT_SOCIAL_MODEL", "ElKulako/cryptobert")
12
+ NEWS_MODEL = os.getenv("SENTIMENT_NEWS_MODEL", "kk08/CryptoBERT")
13
+
14
+
15
+ @lru_cache(maxsize=4)
16
+ def _pl(model_name: str):
17
+ if not ENABLE_SENTIMENT:
18
+ return None
19
+ from transformers import pipeline
20
+ return pipeline("sentiment-analysis", model=model_name)
21
+
22
+
23
+ def _label_to_score(lbl: str) -> float:
24
+ l = (lbl or "").lower()
25
+ if "positive" in l or "bullish" in l:
26
+ return 1.0
27
+ if "negative" in l or "bearish" in l:
28
+ return -1.0
29
+ return 0.0
30
+
31
+
32
+ def analyze_social_sentiment(text: str) -> Dict[str, Any]:
33
+ """Analyze social media text sentiment using CryptoBERT."""
34
+ if not ENABLE_SENTIMENT or not text or not text.strip():
35
+ return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}
36
+
37
+ pipe = _pl(SOCIAL_MODEL)
38
+ if pipe is None:
39
+ return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}
40
+
41
+ try:
42
+ result = pipe(text[:512])[0]
43
+ label = result.get("label", "NEUTRAL")
44
+ confidence = result.get("score", 0.0)
45
+ score = _label_to_score(label)
46
+
47
+ return {
48
+ "sentiment": label.lower(),
49
+ "score": score,
50
+ "confidence": confidence
51
+ }
52
+ except Exception as e:
53
+ return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0, "error": str(e)}
54
+
55
+
56
+ def analyze_news_sentiment(text: str) -> Dict[str, Any]:
57
+ """Analyze news text sentiment using CryptoBERT."""
58
+ if not ENABLE_SENTIMENT or not text or not text.strip():
59
+ return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}
60
+
61
+ pipe = _pl(NEWS_MODEL)
62
+ if pipe is None:
63
+ return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0}
64
+
65
+ try:
66
+ result = pipe(text[:512])[0]
67
+ label = result.get("label", "NEUTRAL")
68
+ confidence = result.get("score", 0.0)
69
+ score = _label_to_score(label)
70
+
71
+ return {
72
+ "sentiment": label.lower(),
73
+ "score": score,
74
+ "confidence": confidence
75
+ }
76
+ except Exception as e:
77
+ return {"sentiment": "neutral", "score": 0.0, "confidence": 0.0, "error": str(e)}
78
+
79
+
80
+ def batch_analyze_sentiment(
81
+ texts: List[str],
82
+ model_type: str = "social"
83
+ ) -> List[Dict[str, Any]]:
84
+ """Analyze multiple texts in batch."""
85
+ if not ENABLE_SENTIMENT or not texts:
86
+ return [{"sentiment": "neutral", "score": 0.0, "confidence": 0.0} for _ in texts]
87
+
88
+ model_name = SOCIAL_MODEL if model_type == "social" else NEWS_MODEL
89
+ pipe = _pl(model_name)
90
+
91
+ if pipe is None:
92
+ return [{"sentiment": "neutral", "score": 0.0, "confidence": 0.0} for _ in texts]
93
+
94
+ results = []
95
+ for text in texts:
96
+ if not text or not text.strip():
97
+ results.append({"sentiment": "neutral", "score": 0.0, "confidence": 0.0})
98
+ continue
99
+
100
+ try:
101
+ result = pipe(text[:512])[0]
102
+ label = result.get("label", "NEUTRAL")
103
+ confidence = result.get("score", 0.0)
104
+ score = _label_to_score(label)
105
+
106
+ results.append({
107
+ "sentiment": label.lower(),
108
+ "score": score,
109
+ "confidence": confidence
110
+ })
111
+ except Exception as e:
112
+ results.append({
113
+ "sentiment": "neutral",
114
+ "score": 0.0,
115
+ "confidence": 0.0,
116
+ "error": str(e)
117
+ })
118
+
119
+ return results
120
+
121
+
122
+ class HFClient:
123
+ """HuggingFace Client for sentiment analysis with usage tracking."""
124
+
125
+ def __init__(self, max_history: int = 100):
126
+ """Initialize HFClient with usage tracking.
127
+
128
+ Args:
129
+ max_history: Maximum number of recent results to keep in history
130
+ """
131
+ self.max_history = max_history
132
+ self._history = deque(maxlen=max_history)
133
+ self._stats = {
134
+ "total_requests": 0,
135
+ "successful_requests": 0,
136
+ "failed_requests": 0,
137
+ "total_latency_ms": 0.0,
138
+ "model_usage": {}
139
+ }
140
+
141
+ def analyze_sentiment(
142
+ self,
143
+ text: str,
144
+ model_type: str = "social",
145
+ metadata: Optional[Dict[str, Any]] = None
146
+ ) -> Dict[str, Any]:
147
+ """Analyze sentiment with tracking.
148
+
149
+ Args:
150
+ text: Text to analyze
151
+ model_type: Type of model to use ("social" or "news")
152
+ metadata: Optional metadata to attach to result
153
+
154
+ Returns:
155
+ Sentiment analysis result with metadata
156
+ """
157
+ start_time = time.time()
158
+ self._stats["total_requests"] += 1
159
+
160
+ # Track model usage
161
+ model_name = SOCIAL_MODEL if model_type == "social" else NEWS_MODEL
162
+ if model_name not in self._stats["model_usage"]:
163
+ self._stats["model_usage"][model_name] = 0
164
+ self._stats["model_usage"][model_name] += 1
165
+
166
+ try:
167
+ # Perform analysis
168
+ if model_type == "social":
169
+ result = analyze_social_sentiment(text)
170
+ else:
171
+ result = analyze_news_sentiment(text)
172
+
173
+ # Calculate latency
174
+ latency_ms = (time.time() - start_time) * 1000
175
+ self._stats["total_latency_ms"] += latency_ms
176
+
177
+ # Track success
178
+ if "error" not in result:
179
+ self._stats["successful_requests"] += 1
180
+ else:
181
+ self._stats["failed_requests"] += 1
182
+
183
+ # Add metadata
184
+ enriched_result = {
185
+ **result,
186
+ "timestamp": datetime.utcnow().isoformat(),
187
+ "model_type": model_type,
188
+ "model_name": model_name,
189
+ "latency_ms": round(latency_ms, 2),
190
+ "metadata": metadata or {}
191
+ }
192
+
193
+ # Add to history
194
+ self._history.append(enriched_result)
195
+
196
+ return enriched_result
197
+
198
+ except Exception as e:
199
+ self._stats["failed_requests"] += 1
200
+ latency_ms = (time.time() - start_time) * 1000
201
+ self._stats["total_latency_ms"] += latency_ms
202
+
203
+ error_result = {
204
+ "sentiment": "neutral",
205
+ "score": 0.0,
206
+ "confidence": 0.0,
207
+ "error": str(e),
208
+ "timestamp": datetime.utcnow().isoformat(),
209
+ "model_type": model_type,
210
+ "model_name": model_name,
211
+ "latency_ms": round(latency_ms, 2),
212
+ "metadata": metadata or {}
213
+ }
214
+
215
+ self._history.append(error_result)
216
+ return error_result
217
+
218
+ def get_usage_stats(self) -> Dict[str, Any]:
219
+ """Get usage statistics.
220
+
221
+ Returns:
222
+ Dictionary containing usage statistics
223
+ """
224
+ total_requests = self._stats["total_requests"]
225
+ avg_latency = (
226
+ self._stats["total_latency_ms"] / total_requests
227
+ if total_requests > 0
228
+ else 0.0
229
+ )
230
+ success_rate = (
231
+ self._stats["successful_requests"] / total_requests * 100
232
+ if total_requests > 0
233
+ else 0.0
234
+ )
235
+
236
+ return {
237
+ "total_requests": total_requests,
238
+ "successful_requests": self._stats["successful_requests"],
239
+ "failed_requests": self._stats["failed_requests"],
240
+ "success_rate_percent": round(success_rate, 2),
241
+ "avg_latency_ms": round(avg_latency, 2),
242
+ "model_usage": self._stats["model_usage"],
243
+ "history_size": len(self._history),
244
+ "max_history": self.max_history
245
+ }
246
+
247
+ def get_recent_results(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
248
+ """Get recent analysis results.
249
+
250
+ Args:
251
+ limit: Maximum number of results to return (None for all)
252
+
253
+ Returns:
254
+ List of recent results
255
+ """
256
+ if limit is None or limit >= len(self._history):
257
+ return list(self._history)
258
+ return list(self._history)[-limit:]
259
+
260
+ def clear_history(self) -> None:
261
+ """Clear the results history."""
262
+ self._history.clear()
263
+
264
+ def reset_stats(self) -> None:
265
+ """Reset all usage statistics."""
266
+ self._stats = {
267
+ "total_requests": 0,
268
+ "successful_requests": 0,
269
+ "failed_requests": 0,
270
+ "total_latency_ms": 0.0,
271
+ "model_usage": {}
272
+ }
273
+ self.clear_history()