yukee1992 committed on
Commit
c32dfa7
·
verified ·
1 Parent(s): 784c435

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +42 -69
app.py CHANGED
@@ -3,21 +3,18 @@ import uuid
3
  import httpx
4
  import torch
5
  import logging
6
- import socket
7
  from typing import Dict, Optional
8
  from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
9
  from fastapi.responses import JSONResponse
10
  from transformers import AutoTokenizer, AutoModelForCausalLM
11
  import uvicorn
12
  from contextlib import asynccontextmanager
13
- from pydantic import BaseModel
14
- from tenacity import retry, stop_after_attempt, wait_exponential
15
 
16
  # Configuration
17
  MODEL_ID = "google/gemma-1.1-2b-it"
18
  HF_TOKEN = os.getenv("HF_TOKEN", "")
19
- TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
20
- TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
21
  MAX_TOKENS = 400
22
  DEVICE = "cpu"
23
  PORT = int(os.getenv("PORT", 7860))
@@ -32,10 +29,6 @@ logger = logging.getLogger(__name__)
32
  # Job storage
33
  jobs: Dict[str, dict] = {}
34
 
35
- class JobRequest(BaseModel):
36
- topic: str
37
- callback_url: Optional[str] = None
38
-
39
  class ScriptGenerator:
40
  def __init__(self):
41
  self.tokenizer = None
@@ -58,18 +51,14 @@ class ScriptGenerator:
58
  ).to(DEVICE)
59
  self.loaded = True
60
  logger.info("Model loaded successfully")
61
- self.send_telegram_notification("🤖 Model loaded and ready!")
62
  except Exception as e:
63
- error_msg = f"Model loading failed: {str(e)}"
64
- logger.error(error_msg)
65
- self.send_telegram_notification(f"❌ {error_msg}")
66
  raise
67
 
68
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
69
- async def send_telegram_notification(self, message: str):
70
- """Send notification to Telegram with retry logic"""
71
  if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
72
- logger.warning("Telegram not configured")
73
  return
74
 
75
  try:
@@ -79,20 +68,11 @@ class ScriptGenerator:
79
  "text": message,
80
  "parse_mode": "Markdown"
81
  }
82
-
83
- transport = httpx.AsyncHTTPTransport(retries=3)
84
- timeout = httpx.Timeout(15.0, connect=10.0)
85
-
86
- async with httpx.AsyncClient(transport=transport, timeout=timeout) as client:
87
- response = await client.post(url, json=payload)
88
- response.raise_for_status()
89
- logger.info(f"Telegram notification sent")
90
-
91
  except Exception as e:
92
- logger.error(f"Telegram notification failed: {str(e)}")
93
- raise
94
 
95
- # Lifespan handler
96
  @asynccontextmanager
97
  async def lifespan(app: FastAPI):
98
  generator = ScriptGenerator()
@@ -102,14 +82,6 @@ async def lifespan(app: FastAPI):
102
  app = FastAPI(lifespan=lifespan)
103
  generator = ScriptGenerator()
104
 
105
- def check_dns():
106
- """Check DNS resolution"""
107
- try:
108
- socket.gethostbyname('api.telegram.org')
109
- return True
110
- except socket.gaierror:
111
- return False
112
-
113
  def generate_script(topic: str) -> str:
114
  """Generate script with error handling"""
115
  try:
@@ -145,69 +117,72 @@ def generate_script(topic: str) -> str:
145
  async def process_job(job_id: str, topic: str, callback_url: str = None):
146
  """Background task to process job"""
147
  try:
148
- await generator.send_telegram_notification(
149
- f"⏳ Job Started\nID: `{job_id}`\nTopic: _{topic}_"
150
- )
151
 
152
  script = generate_script(topic)
153
- jobs[job_id] = {"status": "complete", "result": script}
 
 
 
 
 
154
 
155
  if callback_url:
156
  try:
157
- async with httpx.AsyncClient() as client:
158
- response = await client.post(
159
  callback_url,
160
- json={"job_id": job_id, "status": "complete", "result": script},
161
- timeout=30.0
 
 
 
162
  )
163
- if response.status_code == 200:
164
- await generator.send_telegram_notification(
165
- f"✅ Job Complete\nID: `{job_id}`"
166
- )
167
  except Exception as e:
168
  logger.error(f"Callback failed: {str(e)}")
 
169
 
170
  except Exception as e:
171
  error_msg = f"Job failed: {str(e)}"
172
- jobs[job_id] = {"status": "failed", "error": error_msg}
173
- await generator.send_telegram_notification(
174
- f"❌ Job Failed\nID: `{job_id}`\nError: _{error_msg}_"
175
- )
 
176
 
177
  @app.post("/api/submit")
178
  async def submit_job(request: Request, background_tasks: BackgroundTasks):
179
- """Submit new job"""
180
  try:
181
  data = await request.json()
182
  job_id = str(uuid.uuid4())
183
 
184
  if not data.get("topic"):
185
- raise HTTPException(400, detail="Topic is required")
186
 
 
 
187
  jobs[job_id] = {
188
  "status": "processing",
189
- "callback_url": data.get("callback_url")
 
190
  }
191
 
192
  background_tasks.add_task(
193
  process_job,
194
  job_id,
195
  data["topic"],
196
- data.get("callback_url")
197
  )
198
 
199
- return {"job_id": job_id, "status": "queued"}
 
 
 
200
 
201
  except Exception as e:
202
  logger.error(f"Submission error: {str(e)}")
203
- raise HTTPException(400, detail=str(e))
204
-
205
- @app.get("/api/status/{job_id}")
206
- async def get_status(job_id: str):
207
- """Check job status"""
208
- if job_id not in jobs:
209
- raise HTTPException(404, detail="Job not found")
210
- return jobs[job_id]
211
 
212
  @app.get("/health")
213
  async def health_check():
@@ -215,7 +190,6 @@ async def health_check():
215
  return {
216
  "status": "healthy",
217
  "model_loaded": generator.loaded,
218
- "dns_working": check_dns(),
219
  "telegram_configured": bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID)
220
  }
221
 
@@ -224,6 +198,5 @@ if __name__ == "__main__":
224
  app,
225
  host="0.0.0.0",
226
  port=PORT,
227
- log_level="info",
228
- timeout_keep_alive=300
229
  )
 
3
  import httpx
4
  import torch
5
  import logging
 
6
  from typing import Dict, Optional
7
  from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
8
  from fastapi.responses import JSONResponse
9
  from transformers import AutoTokenizer, AutoModelForCausalLM
10
  import uvicorn
11
  from contextlib import asynccontextmanager
 
 
12
 
13
  # Configuration
14
  MODEL_ID = "google/gemma-1.1-2b-it"
15
  HF_TOKEN = os.getenv("HF_TOKEN", "")
16
+ TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
17
+ TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
18
  MAX_TOKENS = 400
19
  DEVICE = "cpu"
20
  PORT = int(os.getenv("PORT", 7860))
 
29
  # Job storage
30
  jobs: Dict[str, dict] = {}
31
 
 
 
 
 
32
  class ScriptGenerator:
33
  def __init__(self):
34
  self.tokenizer = None
 
51
  ).to(DEVICE)
52
  self.loaded = True
53
  logger.info("Model loaded successfully")
54
+ self._send_telegram_sync("🤖 Model loaded and ready!")
55
  except Exception as e:
56
+ logger.error(f"Model loading failed: {str(e)}")
 
 
57
  raise
58
 
59
+ def _send_telegram_sync(self, message: str):
60
+ """Synchronous Telegram notification with retry"""
 
61
  if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
 
62
  return
63
 
64
  try:
 
68
  "text": message,
69
  "parse_mode": "Markdown"
70
  }
71
+ response = httpx.post(url, json=payload, timeout=10.0)
72
+ response.raise_for_status()
 
 
 
 
 
 
 
73
  except Exception as e:
74
+ logger.warning(f"Telegram notification failed: {str(e)}")
 
75
 
 
76
  @asynccontextmanager
77
  async def lifespan(app: FastAPI):
78
  generator = ScriptGenerator()
 
82
  app = FastAPI(lifespan=lifespan)
83
  generator = ScriptGenerator()
84
 
 
 
 
 
 
 
 
 
85
  def generate_script(topic: str) -> str:
86
  """Generate script with error handling"""
87
  try:
 
117
  async def process_job(job_id: str, topic: str, callback_url: str = None):
118
  """Background task to process job"""
119
  try:
120
+ generator._send_telegram_sync(f"⏳ Starting job: {job_id}\nTopic: {topic[:50]}...")
 
 
121
 
122
  script = generate_script(topic)
123
+ jobs[job_id] = {
124
+ "status": "complete",
125
+ "result": script
126
+ }
127
+
128
+ generator._send_telegram_sync(f"✅ Completed job: {job_id}")
129
 
130
  if callback_url:
131
  try:
132
+ async with httpx.AsyncClient(timeout=30.0) as client:
133
+ await client.post(
134
  callback_url,
135
+ json={
136
+ "job_id": job_id,
137
+ "status": "complete",
138
+ "result": script
139
+ }
140
  )
 
 
 
 
141
  except Exception as e:
142
  logger.error(f"Callback failed: {str(e)}")
143
+ generator._send_telegram_sync(f"⚠️ Callback failed for {job_id}")
144
 
145
  except Exception as e:
146
  error_msg = f"Job failed: {str(e)}"
147
+ jobs[job_id] = {
148
+ "status": "failed",
149
+ "error": error_msg
150
+ }
151
+ generator._send_telegram_sync(f"❌ Failed job: {job_id}\nError: {error_msg[:100]}")
152
 
153
  @app.post("/api/submit")
154
  async def submit_job(request: Request, background_tasks: BackgroundTasks):
155
+ """Endpoint to submit new job"""
156
  try:
157
  data = await request.json()
158
  job_id = str(uuid.uuid4())
159
 
160
  if not data.get("topic"):
161
+ raise HTTPException(status_code=400, detail="Topic is required")
162
 
163
+ callback_url = data.get("callback_url")
164
+
165
  jobs[job_id] = {
166
  "status": "processing",
167
+ "result": None,
168
+ "callback_url": callback_url
169
  }
170
 
171
  background_tasks.add_task(
172
  process_job,
173
  job_id,
174
  data["topic"],
175
+ callback_url
176
  )
177
 
178
+ return JSONResponse({
179
+ "job_id": job_id,
180
+ "status": "queued"
181
+ })
182
 
183
  except Exception as e:
184
  logger.error(f"Submission error: {str(e)}")
185
+ raise HTTPException(status_code=400, detail=str(e))
 
 
 
 
 
 
 
186
 
187
  @app.get("/health")
188
  async def health_check():
 
190
  return {
191
  "status": "healthy",
192
  "model_loaded": generator.loaded,
 
193
  "telegram_configured": bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID)
194
  }
195
 
 
198
  app,
199
  host="0.0.0.0",
200
  port=PORT,
201
+ log_level="info"
 
202
  )