dragg2 committed on
Commit
131251a
·
verified ·
1 Parent(s): 17362a1

Upload 4 files

Browse files
Files changed (2) hide show
  1. Dockerfile +9 -11
  2. app.py +109 -72
Dockerfile CHANGED
@@ -1,19 +1,17 @@
1
-
2
- FROM aiogram/telegram-bot-api:latest
3
-
4
- RUN apk add --no-cache python3 py3-pip && \
5
- python3 -m venv /opt/venv && \
6
- /opt/venv/bin/pip install --no-cache-dir --upgrade pip && \
7
- /opt/venv/bin/pip install --no-cache-dir fastapi uvicorn httpx
8
-
9
  ENV PATH="/opt/venv/bin:$PATH"
10
  WORKDIR /app
11
-
12
  COPY app.py /app/app.py
13
  COPY start.sh /app/start.sh
14
  RUN sed -i 's/\r$//' /app/start.sh && chmod +x /app/start.sh
15
 
16
  EXPOSE 7860
17
- # 关键:基础镜像自带 ENTRYPOINT=/docker-entrypoint.sh,会忽略/吞掉 CMD
18
- # 这里必须显式覆盖 ENTRYPOINT,才能执行我们自己的启动脚本(先起 telegram-bot-api 内部端口,再起对外 7860 代理)
19
  ENTRYPOINT ["/app/start.sh"]
 
1
# Base image: aiogram's build of the Telegram Bot API server.
FROM aiogram/telegram-bot-api:latest

# Install Python and create a virtualenv holding the proxy's dependencies.
RUN apk add --no-cache python3 py3-pip && \
    python3 -m venv /opt/venv && \
    /opt/venv/bin/pip install --no-cache-dir --upgrade pip && \
    /opt/venv/bin/pip install --no-cache-dir fastapi uvicorn httpx

# Put the virtualenv first on PATH so its interpreter and scripts are used.
ENV PATH="/opt/venv/bin:$PATH"
WORKDIR /app

COPY app.py /app/app.py
COPY start.sh /app/start.sh
# Strip CR line endings from the start script, then make it executable.
RUN sed -i 's/\r$//' /app/start.sh && chmod +x /app/start.sh

# Externally visible port of the proxy.
EXPOSE 7860

# The base image defines its own ENTRYPOINT (/docker-entrypoint.sh), which
# would ignore a CMD; override it explicitly so our start script runs
# (it starts telegram-bot-api on the internal port, then the proxy on 7860).
ENTRYPOINT ["/app/start.sh"]
app.py CHANGED
@@ -1,27 +1,58 @@
1
  import os
2
  import json
3
  import asyncio
 
4
 
5
  from fastapi import FastAPI, Request
6
  from fastapi.responses import Response, StreamingResponse, FileResponse
7
  import httpx
8
 
9
- # 内部真实 Bot API Server(telegram-bot-api)监听端口
10
- # 注意:外网端口必须是 7860(HF 会检查),但 Bot API 可以只跑在容器内部端口(默认 8081)
11
  UPSTREAM = f"http://127.0.0.1:{os.environ.get('TELEGRAM_UPSTREAM_PORT', '8081')}"
12
- app = FastAPI()
13
 
14
  WORK_DIR = os.environ.get("TELEGRAM_WORK_DIR", "/tmp/telegram-bot-api-data")
15
  DOWNLOAD_WAIT_SECONDS = float(os.environ.get("TELEGRAM_DOWNLOAD_WAIT_SECONDS", "8"))
16
  DOWNLOAD_POLL_INTERVAL_MS = int(os.environ.get("TELEGRAM_DOWNLOAD_POLL_INTERVAL_MS", "200"))
17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  def _normalize_bot_api_file_path(raw_fp: str | None, token_enc: str | None) -> str:
19
  if not raw_fp:
20
  return ""
21
 
22
  fp = str(raw_fp).replace("\\", "/").lstrip("/")
23
 
24
- # 有些 Local Bot API Server 会把 token 段也带进 file_path,先把它裁掉
25
  if token_enc:
26
  marker = f"/{token_enc}/"
27
  idx = fp.find(marker)
@@ -50,11 +81,28 @@ def _normalize_bot_api_file_path(raw_fp: str | None, token_enc: str | None) -> s
50
  return "/".join(parts[i:])
51
  return fp
52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  @app.get("/")
54
  async def root():
55
- return {"ok": True, "hint": "use /tg/ prefix, e.g. /tg/bot<TOKEN>/getMe"}
56
 
57
- @app.api_route("/tg/{path:path}", methods=["GET","POST","PUT","PATCH","DELETE","HEAD","OPTIONS"])
58
  async def proxy(path: str, request: Request):
59
  # 兜底:如果下游请求把 file_path 带成了“工作目录前缀”,这里直接改写成相对路径再转发给 telegram-bot-api
60
  path_for_upstream = path.lstrip("/")
@@ -69,9 +117,6 @@ async def proxy(path: str, request: Request):
69
  if fixed:
70
  path_for_upstream = f"file/bot{token_enc}/{fixed}"
71
 
72
- # 官方/通用做法(更稳):--local 场景 getFile 可能返回“本地路径”,所以 /file 下载最好不要再依赖 bot-api 自己的 /file 端点
73
- # 直接从容器的 WORK_DIR 里读文件返回给浏览器。
74
- # 这样就算 bot-api 的 /file 行为有差异,也不会影响 CloudPaste 预览下载。
75
  if path_for_upstream.startswith("file/"):
76
  rest = path_for_upstream[len("file/") :]
77
  token_enc = None
@@ -81,60 +126,56 @@ async def proxy(path: str, request: Request):
81
  rel = rest.split("/", 1)[1] if "/" in rest else ""
82
  rel_fixed = _normalize_bot_api_file_path(rel, token_enc)
83
 
84
- candidates = []
85
- if token_enc and rel_fixed:
86
- candidates.append(os.path.join(WORK_DIR, token_enc, rel_fixed))
87
- if rel_fixed:
88
- candidates.append(os.path.join(WORK_DIR, rel_fixed))
89
 
90
- for p in candidates:
91
- try:
92
- if p and os.path.isfile(p):
93
- return FileResponse(p)
94
- except Exception:
95
- pass
96
-
97
- # 如果本地没有这个文件(HF 不持久化常见),允许 CloudPaste 通过 query 传入 file_id 来触发回源下载:
98
- # - 不新增新的路由接口,仍然是 /tg/file/bot<TOKEN>/<file_path>
99
- # - 只是在 query 上多带一个 file_id(例如 ?file_id=xxx)
100
  file_id = request.query_params.get("file_id") or request.query_params.get("fid")
101
  if token_enc and file_id:
102
- async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client:
103
- r = await client.get(f"{UPSTREAM}/bot{token_enc}/getFile", params={"file_id": file_id})
104
- payload = None
105
- try:
106
- payload = r.json()
107
- except Exception:
108
- payload = None
 
109
 
110
- if r.status_code == 200 and isinstance(payload, dict) and payload.get("ok") is True:
111
- fp = None
112
- try:
113
- fp = (payload.get("result") or {}).get("file_path")
114
- except Exception:
115
- fp = None
116
-
117
- rel2 = _normalize_bot_api_file_path(fp if isinstance(fp, str) else None, token_enc)
118
- if rel2:
119
- candidates2 = [
120
- os.path.join(WORK_DIR, token_enc, rel2),
121
- os.path.join(WORK_DIR, rel2),
122
- ]
123
-
124
- waited = 0.0
125
- interval = max(0.05, DOWNLOAD_POLL_INTERVAL_MS / 1000.0)
126
- max_wait = max(0.0, DOWNLOAD_WAIT_SECONDS)
127
- while waited <= max_wait:
128
- for p2 in candidates2:
129
- try:
130
- if p2 and os.path.isfile(p2):
131
- return FileResponse(p2)
132
- except Exception:
133
- pass
134
- if waited >= max_wait:
135
- break
136
- await asyncio.sleep(interval)
137
- waited += interval
 
 
 
 
138
 
139
  url = f"{UPSTREAM}/{path_for_upstream}"
140
 
@@ -147,9 +188,7 @@ async def proxy(path: str, request: Request):
147
  async for chunk in request.stream():
148
  yield chunk
149
 
150
- # 关键修复:
151
- # - Bot API 的 /bot... 响应是小 JSON(sendDocument/getFile/getMe),直接“读全量再返回”,避免流式上下文提前关闭导致空响应。
152
- # - Bot API 的 /file/... 响应是大文件,才需要流式透传。
153
  is_file_download = path_for_upstream.startswith("file/")
154
  is_get_file = "/getFile" in ("/" + path_for_upstream)
155
 
@@ -163,8 +202,6 @@ async def proxy(path: str, request: Request):
163
  "last-modified",
164
  }
165
 
166
- # 重要:不能用 “async with AsyncClient()” 包住 StreamingResponse。
167
- # 否则函数 return 后 client 会被关闭,导致 /file/... 的下载流被提前掐断(CloudPaste 就会报“下载分片失败”)。
168
  client = httpx.AsyncClient(timeout=None, follow_redirects=True)
169
  if not is_file_download:
170
  try:
@@ -176,14 +213,9 @@ async def proxy(path: str, request: Request):
176
  content=iter_request_body(),
177
  )
178
  resp_headers = {k: v for k, v in r.headers.items() if k.lower() in passthrough_allow}
179
- # 关键:非流式响应不要透传 Content-Length(我们可能会改写 body,长度会变)
180
  resp_headers.pop("content-length", None)
181
  resp_headers.pop("Content-Length", None)
182
 
183
- # 关键兼容:某些 Local Bot API Server 会在 getFile 的 result.file_path 里返回本机工作目录路径
184
- # 例如:var/lib/telegram-bot-api/<token>/documents/xxx
185
- # 但 CloudPaste 会把这个 file_path 拼到 /file/bot<TOKEN>/<file_path> 上,导致 404。
186
- # 这里把 file_path 规范化回 documents/xxx 这种“相对路径”,让后续下载正常。
187
  content_type = (r.headers.get("content-type") or "").lower()
188
  if is_get_file and r.status_code == 200 and "application/json" in content_type:
189
  try:
@@ -201,9 +233,7 @@ async def proxy(path: str, request: Request):
201
  result["file_path"] = fixed
202
  payload["result"] = result
203
  body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
204
- # 确保 JSON 的 content-type 不被冲掉
205
  resp_headers.setdefault("content-type", "application/json")
206
- # body 被我们改写过,Content-Length 必须删掉让框架自己算
207
  resp_headers.pop("content-length", None)
208
  resp_headers.pop("Content-Length", None)
209
  return Response(content=body, status_code=r.status_code, headers=resp_headers)
@@ -214,7 +244,6 @@ async def proxy(path: str, request: Request):
214
  finally:
215
  await client.aclose()
216
 
217
- # /file/...:流式回传,必须保证上游连接在迭代期间保持打开
218
  req = client.build_request(
219
  request.method,
220
  url,
@@ -236,3 +265,11 @@ async def proxy(path: str, request: Request):
236
  await client.aclose()
237
 
238
  return StreamingResponse(iter_response(), status_code=r.status_code, headers=resp_headers)
 
 
 
 
 
 
 
 
 
1
  import os
2
  import json
3
  import asyncio
4
+ import time
5
 
6
  from fastapi import FastAPI, Request
7
  from fastapi.responses import Response, StreamingResponse, FileResponse
8
  import httpx
9
 
10
# Base URL of the Bot API server; only its listening port is configurable.
UPSTREAM = "http://127.0.0.1:" + os.environ.get("TELEGRAM_UPSTREAM_PORT", "8081")

# The proxy application, created with the docs, ReDoc and OpenAPI URLs unset.
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)

# Directory searched for files before falling back to the upstream server.
WORK_DIR = os.environ.get("TELEGRAM_WORK_DIR", "/tmp/telegram-bot-api-data")

# How long (seconds) and how often (milliseconds) to poll for a file to
# appear on disk after a getFile call has been issued.
DOWNLOAD_WAIT_SECONDS = float(os.environ.get("TELEGRAM_DOWNLOAD_WAIT_SECONDS", "8"))
DOWNLOAD_POLL_INTERVAL_MS = int(os.environ.get("TELEGRAM_DOWNLOAD_POLL_INTERVAL_MS", "200"))
17
 
18
+ _INFLIGHT_LOCKS: dict[str, asyncio.Lock] = {}
19
+ _INFLIGHT_LAST_SEEN: dict[str, float] = {}
20
+
21
+ def _inflight_key(token_enc: str | None, file_id: str | None) -> str:
22
+ return f"{token_enc or ''}:{file_id or ''}"
23
+
24
+ def _get_inflight_lock(key: str) -> asyncio.Lock:
25
+ lock = _INFLIGHT_LOCKS.get(key)
26
+ if lock is None:
27
+ lock = asyncio.Lock()
28
+ _INFLIGHT_LOCKS[key] = lock
29
+ _INFLIGHT_LAST_SEEN[key] = time.time()
30
+ return lock
31
+
32
+ def _cleanup_inflight_locks(max_entries: int = 1024, ttl_seconds: float = 300.0) -> None:
33
+ if len(_INFLIGHT_LOCKS) <= max_entries:
34
+ return
35
+ now = time.time()
36
+ for key, lock in list(_INFLIGHT_LOCKS.items()):
37
+ last = _INFLIGHT_LAST_SEEN.get(key, 0.0)
38
+ if (now - last) > ttl_seconds and (not lock.locked()):
39
+ _INFLIGHT_LOCKS.pop(key, None)
40
+ _INFLIGHT_LAST_SEEN.pop(key, None)
41
+
42
+ def _normalize_proxy_prefix(raw: str | None) -> str:
43
+ s = str(raw or "").strip()
44
+ if not s or s == "/":
45
+ return ""
46
+ if not s.startswith("/"):
47
+ s = "/" + s
48
+ return s.rstrip("/")
49
+
50
  def _normalize_bot_api_file_path(raw_fp: str | None, token_enc: str | None) -> str:
51
  if not raw_fp:
52
  return ""
53
 
54
  fp = str(raw_fp).replace("\\", "/").lstrip("/")
55
 
 
56
  if token_enc:
57
  marker = f"/{token_enc}/"
58
  idx = fp.find(marker)
 
81
  return "/".join(parts[i:])
82
  return fp
83
 
84
+ def _build_local_candidates(work_dir: str, token_enc: str | None, rel_path: str | None) -> list[str]:
85
+ candidates = []
86
+ rp = str(rel_path or "").strip().lstrip("/")
87
+ if token_enc and rp:
88
+ candidates.append(os.path.join(work_dir, token_enc, rp))
89
+ if rp:
90
+ candidates.append(os.path.join(work_dir, rp))
91
+ return candidates
92
+
93
def _try_file_response(candidates: list[str]):
    """Return a ``FileResponse`` for the first candidate that is an existing file.

    Empty entries are skipped.  Any exception raised while probing or wrapping
    a candidate is swallowed and the next candidate is tried (best effort).
    Returns ``None`` when no candidate can be served.
    """
    for path in candidates:
        try:
            if not path or not os.path.isfile(path):
                continue
            return FileResponse(path)
        except Exception:
            continue
    return None
101
+
@app.get("/")
async def root():
    """Handle ``GET /`` by returning the constant JSON body ``{"ok": true}``."""
    status = {"ok": True}
    return status
105
 
 
106
  async def proxy(path: str, request: Request):
107
  # 兜底:如果下游请求把 file_path 带成了“工作目录前缀”,这里直接改写成相对路径再转发给 telegram-bot-api
108
  path_for_upstream = path.lstrip("/")
 
117
  if fixed:
118
  path_for_upstream = f"file/bot{token_enc}/{fixed}"
119
 
 
 
 
120
  if path_for_upstream.startswith("file/"):
121
  rest = path_for_upstream[len("file/") :]
122
  token_enc = None
 
126
  rel = rest.split("/", 1)[1] if "/" in rest else ""
127
  rel_fixed = _normalize_bot_api_file_path(rel, token_enc)
128
 
129
+ candidates = _build_local_candidates(WORK_DIR, token_enc, rel_fixed)
130
+ resp = _try_file_response(candidates)
131
+ if resp is not None:
132
+ return resp
 
133
 
134
+ # 如果本地没有这个文件(HF 不持久化),允许 CloudPaste 通过 query 传入 file_id 来触发回源下载:
135
+ # query 上多带一个 file_id(例如 ?file_id=xxx)
 
 
 
 
 
 
 
 
136
  file_id = request.query_params.get("file_id") or request.query_params.get("fid")
137
  if token_enc and file_id:
138
+ # 防抖:同一个 (token,file_id) 的回源下载只触发一次,避免并发 Range 请求重复打 getFile/重复等待落盘
139
+ key = _inflight_key(token_enc, str(file_id))
140
+ lock = _get_inflight_lock(key)
141
+ try:
142
+ async with lock:
143
+ resp_again = _try_file_response(_build_local_candidates(WORK_DIR, token_enc, rel_fixed))
144
+ if resp_again is not None:
145
+ return resp_again
146
 
147
+ async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client:
148
+ r = await client.get(f"{UPSTREAM}/bot{token_enc}/getFile", params={"file_id": file_id})
149
+ payload = None
150
+ try:
151
+ payload = r.json()
152
+ except Exception:
153
+ payload = None
154
+
155
+ if r.status_code == 200 and isinstance(payload, dict) and payload.get("ok") is True:
156
+ fp = None
157
+ try:
158
+ fp = (payload.get("result") or {}).get("file_path")
159
+ except Exception:
160
+ fp = None
161
+
162
+ rel2 = _normalize_bot_api_file_path(fp if isinstance(fp, str) else None, token_enc)
163
+ if rel2:
164
+ candidates2 = _build_local_candidates(WORK_DIR, token_enc, rel2)
165
+
166
+ waited = 0.0
167
+ interval = max(0.05, DOWNLOAD_POLL_INTERVAL_MS / 1000.0)
168
+ max_wait = max(0.0, DOWNLOAD_WAIT_SECONDS)
169
+ while waited <= max_wait:
170
+ resp2 = _try_file_response(candidates2)
171
+ if resp2 is not None:
172
+ return resp2
173
+ if waited >= max_wait:
174
+ break
175
+ await asyncio.sleep(interval)
176
+ waited += interval
177
+ finally:
178
+ _cleanup_inflight_locks()
179
 
180
  url = f"{UPSTREAM}/{path_for_upstream}"
181
 
 
188
  async for chunk in request.stream():
189
  yield chunk
190
 
191
+
 
 
192
  is_file_download = path_for_upstream.startswith("file/")
193
  is_get_file = "/getFile" in ("/" + path_for_upstream)
194
 
 
202
  "last-modified",
203
  }
204
 
 
 
205
  client = httpx.AsyncClient(timeout=None, follow_redirects=True)
206
  if not is_file_download:
207
  try:
 
213
  content=iter_request_body(),
214
  )
215
  resp_headers = {k: v for k, v in r.headers.items() if k.lower() in passthrough_allow}
 
216
  resp_headers.pop("content-length", None)
217
  resp_headers.pop("Content-Length", None)
218
 
 
 
 
 
219
  content_type = (r.headers.get("content-type") or "").lower()
220
  if is_get_file and r.status_code == 200 and "application/json" in content_type:
221
  try:
 
233
  result["file_path"] = fixed
234
  payload["result"] = result
235
  body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
 
236
  resp_headers.setdefault("content-type", "application/json")
 
237
  resp_headers.pop("content-length", None)
238
  resp_headers.pop("Content-Length", None)
239
  return Response(content=body, status_code=r.status_code, headers=resp_headers)
 
244
  finally:
245
  await client.aclose()
246
 
 
247
  req = client.build_request(
248
  request.method,
249
  url,
 
265
  await client.aclose()
266
 
267
  return StreamingResponse(iter_response(), status_code=r.status_code, headers=resp_headers)
268
+
269
# Register the catch-all proxy handler under the configured prefix
# (TELEGRAM_PROXY_PREFIX, default "/tg").
_PROXY_PREFIX = _normalize_proxy_prefix(os.environ.get("TELEGRAM_PROXY_PREFIX", "/tg"))
# An empty prefix naturally collapses to "/{path:path}", i.e. the proxy is
# mounted at the root.
_PROXY_ROUTE = _PROXY_PREFIX + "/{path:path}"
app.add_api_route(
    _PROXY_ROUTE,
    proxy,
    methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
)