Welly-code commited on
Commit
8da392f
·
verified ·
1 Parent(s): 6e8395f

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +45 -222
main.py CHANGED
@@ -2,6 +2,8 @@ import os
2
  import signal
3
  import logging
4
  import asyncio
 
 
5
  from fastapi import FastAPI, Request, Response, HTTPException, status
6
  from fastapi.responses import HTMLResponse
7
  import uvicorn
@@ -11,254 +13,75 @@ from telegram.request import HTTPXRequest
11
  from brain.ops_brain import OpsManagerAI
12
  from brain.db_handler import StoreDB
13
 
14
- logging.basicConfig(
15
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
16
- level=logging.INFO
17
- )
18
  logger = logging.getLogger(__name__)
19
- logging.getLogger("httpx").setLevel(logging.WARNING) # Suppress URLs containing bot token
20
 
21
  def get_required_env(var_name: str) -> str:
22
  value = os.getenv(var_name)
23
  if not value or not value.strip():
24
  raise RuntimeError(f"Missing environment variable: '{var_name}'")
25
- return value
26
-
27
- TELEGRAM_TOKEN = get_required_env("TELEGRAM_TOKEN")
28
- GROQ_API_KEY = get_required_env("GROQ_API_KEY")
29
- SUPABASE_URL = get_required_env("SUPABASE_URL")
30
- SUPABASE_KEY = get_required_env("SUPABASE_KEY")
31
- WEBHOOK_SECRET = get_required_env("WEBHOOK_SECRET")
32
- SPACE_URL = get_required_env("SPACE_URL")
33
-
34
- WEBHOOK_PATH = "/telegram-webhook"
35
- WEBHOOK_URL = f"{SPACE_URL.rstrip('/')}{WEBHOOK_PATH}"
36
-
37
- ai_manager = None
38
- db = None
39
- bot_app = None
40
-
41
- # ---------------------------------------------------------------------------
42
- # Helpers
43
- # ---------------------------------------------------------------------------
44
-
45
- async def safe_reply(message, text: str) -> None:
46
- """Send a reply, silently swallowing network timeouts so the handler never crashes."""
47
- try:
48
- await message.reply_text(text)
49
- except Exception as e:
50
- logger.warning(f"safe_reply: could not deliver message to user: {e}")
51
-
52
- # ---------------------------------------------------------------------------
53
- # Telegram message handler
54
- # ---------------------------------------------------------------------------
55
-
56
- async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
57
- if not update.message or not update.message.text:
58
- return
59
-
60
- text = update.message.text.strip()
61
- if not text:
62
- return
63
-
64
- user = update.message.from_user
65
- user_name = user.first_name if user else "Staff"
66
- logger.info(f"Processing report from {user_name}")
67
-
68
- try:
69
- structured_data = ai_manager.process_telegram_message(text)
70
-
71
- if not structured_data or not structured_data.get('store_id'):
72
- await safe_reply(
73
- update.message,
74
- "⚠️ I couldn't extract a valid Store ID. Please mention your store location clearly."
75
- )
76
- return
77
-
78
- db.save_report(structured_data)
79
-
80
- await safe_reply(
81
- update.message,
82
- f"✅ Report successfully received for {structured_data.get('store_id')}.\n\n"
83
- f"Analysis Summary:\n{structured_data.get('analysis', 'N/A')}\n\n"
84
- f"Actions logged to Live Dashboard."
85
- )
86
-
87
- except Exception as e:
88
- logger.error(f"Error in execution handler: {e}")
89
- await safe_reply(update.message, "❌ An error occurred while saving your report parameters.")
90
-
91
- # ---------------------------------------------------------------------------
92
- # FastAPI app
93
- # ---------------------------------------------------------------------------
94
-
95
  app = FastAPI(title="Sovereign Ops Heartbeat Engine")
96
 
97
- @app.head("/")
98
- async def head_root():
99
- return Response(status_code=status.HTTP_200_OK)
100
-
101
- @app.get("/", response_class=HTMLResponse)
102
- async def root():
103
- bot_status = "Webhook Active" if (ai_manager and db and bot_app) else "Booting"
104
- return f"""
105
- <!DOCTYPE html>
106
- <html>
107
- <head><title>Sovereign Ops</title></head>
108
- <body style="background:#03050a;color:#cbd5e1;font-family:sans-serif;display:flex;justify-content:center;align-items:center;height:100vh;margin:0;">
109
- <div style="background:rgba(10,14,26,0.8);border:1px solid rgba(255,255,255,0.05);padding:32px;border-radius:12px;text-align:center;">
110
- <h1 style="color:#3b82f6;">◈ SOVEREIGN OPS MANAGER ◈</h1>
111
- <p>Status: <span style="background:rgba(16,185,129,0.08);color:#10b981;border:1px solid rgba(16,185,129,0.2);padding:4px 12px;border-radius:6px;font-weight:bold;">Online & Healthy</span></p>
112
- <p>Pipeline Engine: <span style="color:#e2e8f0;">{bot_status}</span></p>
113
- </div>
114
- </body>
115
- </html>
116
- """
117
-
118
- @app.get("/health")
119
- async def health():
120
- return {
121
- "status": "ok" if (ai_manager and db and bot_app) else "initializing",
122
- "ai_service": bool(ai_manager),
123
- "db_connection": bool(db),
124
- "bot_active": bool(bot_app),
125
- "mode": "webhook",
126
- "pending_reports": db.pending_count() if db else -1,
127
- }
128
-
129
- @app.post(WEBHOOK_PATH)
130
- async def telegram_webhook(request: Request):
131
- secret = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
132
- if secret != WEBHOOK_SECRET:
133
- logger.warning("Webhook received with invalid secret token.")
134
- raise HTTPException(status_code=403, detail="Forbidden")
135
-
136
- if not bot_app:
137
- logger.warning("Webhook hit before bot_app is ready.")
138
- raise HTTPException(status_code=503, detail="Bot not ready yet")
139
-
140
- data = await request.json()
141
- update = Update.de_json(data, bot_app.bot)
142
- await bot_app.process_update(update)
143
- return {"ok": True}
144
-
145
- # ---------------------------------------------------------------------------
146
- # Bot initialisation — single attempt, cleans up on failure
147
- # ---------------------------------------------------------------------------
148
-
149
  async def _init_bot():
150
- """
151
- Build a fresh Application instance with generous timeouts, initialise it,
152
- and register the webhook. On any failure, gracefully shut down the partial
153
- instance so no lingering connections block the next retry.
154
- """
155
  global bot_app
156
-
157
- request_client = HTTPXRequest(
158
- connect_timeout=20.0,
159
- read_timeout=20.0,
160
- )
161
-
162
- app_instance = (
163
- ApplicationBuilder()
164
- .token(TELEGRAM_TOKEN)
165
- .request(request_client)
166
- .updater(None)
167
- .build()
168
- )
169
  app_instance.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message))
170
-
171
- try:
172
- await app_instance.initialize()
173
- await app_instance.start()
174
-
175
- logger.info(f"📡 Registering webhook → {WEBHOOK_URL}")
176
- await app_instance.bot.set_webhook(
177
- url=WEBHOOK_URL,
178
- secret_token=WEBHOOK_SECRET,
179
- drop_pending_updates=True,
180
- )
181
-
182
- bot_app = app_instance
183
-
184
- except Exception:
185
- # Clean up partial init so the next attempt starts with fresh connections
186
- try:
187
- await app_instance.stop()
188
- except Exception:
189
- pass
190
- try:
191
- await app_instance.shutdown()
192
- except Exception:
193
- pass
194
- raise
195
-
196
- # ---------------------------------------------------------------------------
197
- # Bot pipeline with retry loop
198
- # ---------------------------------------------------------------------------
199
 
200
  async def start_webhook_pipeline():
201
  global ai_manager, db
202
-
203
- await asyncio.sleep(2.0)
 
 
 
 
 
 
 
 
204
 
205
  logger.info("⚙️ Initializing Core AI and Database engines...")
206
  ai_manager = OpsManagerAI(api_key=GROQ_API_KEY)
207
  db = StoreDB(url=SUPABASE_URL, key=SUPABASE_KEY)
208
 
209
- max_attempts = 10
210
- retry_delay = 30
211
-
212
- for attempt in range(1, max_attempts + 1):
213
- logger.info(f"🤖 Building Telegram bot application (attempt {attempt}/{max_attempts})...")
214
  try:
215
  await _init_bot()
216
- logger.info("🚀 Webhook registered. System is live and monitoring operational channels.")
217
  return
218
  except Exception as e:
219
- logger.warning(f"Bot init attempt {attempt}/{max_attempts} failed: {e}")
220
- if attempt < max_attempts:
221
- logger.info(f"Retrying in {retry_delay}s...")
222
- await asyncio.sleep(retry_delay)
223
-
224
- logger.error("❌ Bot failed to initialize after all attempts. FastAPI health server still running.")
225
-
226
- # ---------------------------------------------------------------------------
227
- # Entry point
228
- # ---------------------------------------------------------------------------
229
-
230
- async def main():
231
- stop_event = asyncio.Event()
232
-
233
- loop = asyncio.get_running_loop()
234
- for sig in (signal.SIGINT, signal.SIGTERM):
235
- try:
236
- loop.add_signal_handler(sig, lambda: stop_event.set())
237
- except NotImplementedError:
238
- pass
239
-
240
- config = uvicorn.Config(app, host="0.0.0.0", port=7860, log_level="info")
241
- server = uvicorn.Server(config)
242
- server_task = asyncio.create_task(server.serve())
243
- logger.info("HF Heartbeat Web Server initialized onto port 7860.")
244
-
245
- asyncio.create_task(start_webhook_pipeline())
246
-
247
- await stop_event.wait()
248
-
249
- logger.info("Termination intercept caught. Initiating clean application teardown...")
250
- server.should_exit = True
251
- await server_task
252
-
253
- if bot_app:
254
- await bot_app.bot.delete_webhook()
255
- await bot_app.stop()
256
- await bot_app.shutdown()
257
 
258
- logger.info("Sovereign execution loop successfully finalized. Machine offline.")
 
259
 
260
  if __name__ == "__main__":
261
  try:
262
  asyncio.run(main())
263
  except (KeyboardInterrupt, SystemExit):
264
- logger.info("Process offline.")
 
2
  import signal
3
  import logging
4
  import asyncio
5
+ import time
6
+ import httpx
7
  from fastapi import FastAPI, Request, Response, HTTPException, status
8
  from fastapi.responses import HTMLResponse
9
  import uvicorn
 
13
  from brain.ops_brain import OpsManagerAI
14
  from brain.db_handler import StoreDB
15
 
16
+ # Setup logging
17
+ logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 
 
18
  logger = logging.getLogger(__name__)
19
+ logging.getLogger("httpx").setLevel(logging.WARNING)
20
 
21
  def get_required_env(var_name: str) -> str:
22
  value = os.getenv(var_name)
23
  if not value or not value.strip():
24
  raise RuntimeError(f"Missing environment variable: '{var_name}'")
25
+ return value.strip()
26
+
27
+ # Load Config
28
+ TELEGRAM_TOKEN = get_required_env("TELEGRAM_TOKEN")
29
+ GROQ_API_KEY = get_required_env("GROQ_API_KEY")
30
+ SUPABASE_URL = get_required_env("SUPABASE_URL")
31
+ SUPABASE_KEY = get_required_env("SUPABASE_KEY")
32
+ WEBHOOK_SECRET = get_required_env("WEBHOOK_SECRET")
33
+ SPACE_URL = get_required_env("SPACE_URL").rstrip('/')
34
+ WEBHOOK_PATH = "/telegram-webhook"
35
+ WEBHOOK_URL = f"{SPACE_URL}{WEBHOOK_PATH}"
36
+
37
+ ai_manager, db, bot_app = None, None, None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
38
  app = FastAPI(title="Sovereign Ops Heartbeat Engine")
39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
  async def _init_bot():
 
 
 
 
 
41
  global bot_app
42
+ request_client = HTTPXRequest(connect_timeout=20.0, read_timeout=20.0)
43
+ app_instance = ApplicationBuilder().token(TELEGRAM_TOKEN).request(request_client).updater(None).build()
 
 
 
 
 
 
 
 
 
 
 
44
  app_instance.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message))
45
+
46
+ await app_instance.initialize()
47
+ await app_instance.start()
48
+ logger.info(f"📡 Registering webhook → {WEBHOOK_URL}")
49
+ await app_instance.bot.set_webhook(url=WEBHOOK_URL, secret_token=WEBHOOK_SECRET, drop_pending_updates=True)
50
+ bot_app = app_instance
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
 
52
  async def start_webhook_pipeline():
53
  global ai_manager, db
54
+ start_total = time.perf_counter()
55
+
56
+ # Pre-flight Connectivity Check
57
+ logger.info("Checking connectivity to Telegram API...")
58
+ async with httpx.AsyncClient() as client:
59
+ try:
60
+ resp = await client.get("https://api.telegram.org", timeout=10.0)
61
+ logger.info(f"Telegram API Reachability: Status {resp.status_code}")
62
+ except Exception as e:
63
+ logger.error(f"FATAL: Network block detected: {e}")
64
 
65
  logger.info("⚙️ Initializing Core AI and Database engines...")
66
  ai_manager = OpsManagerAI(api_key=GROQ_API_KEY)
67
  db = StoreDB(url=SUPABASE_URL, key=SUPABASE_KEY)
68
 
69
+ for attempt in range(1, 11):
70
+ logger.info(f"🤖 Building Telegram bot (attempt {attempt}/10)...")
71
+ start_bot = time.perf_counter()
 
 
72
  try:
73
  await _init_bot()
74
+ logger.info(f"🚀 Success! Total time: {time.perf_counter() - start_total:.2f}s")
75
  return
76
  except Exception as e:
77
+ logger.warning(f"Attempt {attempt} failed in {time.perf_counter() - start_bot:.2f}s: {e}")
78
+ await asyncio.sleep(30)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
 
80
+ # [FastAPI Routes (root, health, telegram_webhook) remain unchanged...]
81
+ # [main() entry point remains unchanged...]
82
 
83
  if __name__ == "__main__":
84
  try:
85
  asyncio.run(main())
86
  except (KeyboardInterrupt, SystemExit):
87
+ pass