Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from fastapi import FastAPI, Header, Request, Response, HTTPException | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| import traceback | |
| from typing import Annotated, Optional | |
| from fetchYoutubeSubtitle import fetchSubtitle, fetchSubtitleUrls, fetchSubtitleByInfo | |
# Shared secret compared against the ``x_token`` header parameter of the
# handlers in this file; it is ``None`` when HF_X_TOKEN is not set.
token = os.environ.get("HF_X_TOKEN")

# Application object for this service.
app = FastAPI()
def read_root(request: Request):
    """Print basic details of the incoming request and return a greeting.

    The request headers and the client's host and port are written to
    stdout, then a fixed payload is returned.

    Args:
        request: The incoming request.

    Returns:
        The dict ``{"Hello": "World!"}``.
    """
    print(request.headers)
    # ``request.client`` may be None when the server does not report a peer
    # address, so it must not be dereferenced unconditionally.
    client = request.client
    print(client.host if client is not None else None)
    print(client.port if client is not None else None)
    return {"Hello": "World!"}
def read_json():
    """Return the fixed greeting payload wrapped in a ``JSONResponse``."""
    greeting = {"Hello": "World!"}
    return JSONResponse(content=greeting)
async def get_subtitle(
    url: str,
    subtype: str = "srt",
    lang: str = "en",
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Fetch a subtitle for ``url`` and return it as JSON.

    Args:
        url: URL passed through to ``fetchSubtitle`` (presumably a video
            URL -- confirm against ``fetchYoutubeSubtitle``).
        subtype: Forwarded to ``fetchSubtitle`` as ``subType``.
        lang: Forwarded to ``fetchSubtitle`` as ``lang``.
        proxy: Optional value forwarded to ``fetchSubtitle`` as ``proxy``.
        x_token: Token header sent by the caller; must equal the
            module-level ``token``.

    Returns:
        A ``JSONResponse`` wrapping whatever ``fetchSubtitle`` returns, or a
        401 ``JSONResponse`` with body ``{"error": "Invalid token"}`` when
        the token does not match.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None, so a request
    # that sends no token header passes this check -- confirm that is intended.
    if token != x_token:
        # Keep the original body but answer with an error status, so clients
        # that inspect the status code do not treat the failure as success.
        # ``download`` already answers 401 for the same condition.
        return JSONResponse({"error": "Invalid token"}, status_code=401)
    subtitle = await fetchSubtitle(url, lang=lang, subType=subtype, proxy=proxy)
    return JSONResponse(content=subtitle)
async def get_subtitleUrls(
    url: str,
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
):
    """Look up the subtitle URLs for ``url`` and return them as JSON.

    Args:
        url: URL passed through to ``fetchSubtitleUrls``.
        proxy: Optional value forwarded to ``fetchSubtitleUrls`` as ``proxy``.
        x_token: Token header sent by the caller; must equal the
            module-level ``token``.

    Returns:
        A ``JSONResponse`` wrapping whatever ``fetchSubtitleUrls`` returns,
        or a 401 ``JSONResponse`` with body ``{"error": "Invalid token"}``
        when the token does not match.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None, so a request
    # that sends no token header passes this check -- confirm that is intended.
    if token != x_token:
        # Keep the original body but answer with an error status, so clients
        # that inspect the status code do not treat the failure as success.
        # ``download`` already answers 401 for the same condition.
        return JSONResponse({"error": "Invalid token"}, status_code=401)
    subtitles = await fetchSubtitleUrls(url, proxy=proxy)
    return JSONResponse(content=subtitles)
def download_file(content, chunk_size):
    """Yield ``content`` in consecutive slices of at most ``chunk_size`` items.

    Args:
        content: A sliceable sequence with a length (e.g. ``str`` or ``bytes``).
        chunk_size: Maximum length of each yielded slice; must be positive.

    Yields:
        The non-empty slices of ``content`` in order. Nothing is yielded for
        empty content.

    Raises:
        ValueError: On first iteration, if ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # Stepping by chunk_size visits exactly the chunk start offsets, so no
    # trailing empty chunk is produced when the length is an exact multiple
    # of chunk_size.
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]
async def download(
    url: str,
    fileName: str,
    fileType: str,
    info: str,  # download info
    proxy: str | None = None,
    x_token: Annotated[str | None, Header()] = None,
    request: Request = None,
):
    """Fetch a subtitle described by ``info`` and stream it as an attachment.

    Args:
        url: URL passed through to ``fetchSubtitleByInfo``.
        fileName: Base name used for the attachment's file name.
        fileType: Passed to ``fetchSubtitleByInfo`` and used as the
            attachment's file extension.
        info: JSON-encoded download info; the decoded value is passed to
            ``fetchSubtitleByInfo``.
        proxy: Optional value forwarded to ``fetchSubtitleByInfo`` as ``proxy``.
        x_token: Token header sent by the caller; must equal the
            module-level ``token``.
        request: The incoming request. No longer read; kept so the signature
            stays unchanged.

    Returns:
        A ``StreamingResponse`` that sends the subtitle in 8192-item chunks
        with ``text/plain`` content type and an attachment disposition.

    Raises:
        HTTPException: 401 if the token does not match, 400 if ``info`` is
            not valid JSON, 500 if fetching or building the response fails.
    """
    # NOTE(review): when HF_X_TOKEN is unset, ``token`` is None, so a request
    # that sends no token header passes this check -- confirm that is intended.
    if token != x_token:
        raise HTTPException(status_code=401, detail="Invalid token")

    try:
        dlInfo = json.loads(info)
    except (ValueError, RecursionError) as err:
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input.
        raise HTTPException(status_code=400, detail="Invalid params") from err

    try:
        subtitle = await fetchSubtitleByInfo(url, fileType, dlInfo, proxy=proxy)
        # Build the response headers from scratch. Echoing the incoming
        # request headers would send the caller's secret token, host and
        # cookie headers back out, and could duplicate Content-Type.
        #
        # The UTF-8 bytes of the name are re-read as latin-1, presumably so
        # non-ASCII names survive latin-1 header encoding -- TODO confirm.
        # NOTE(review): fileName and fileType are interpolated unescaped; a
        # double quote in either breaks the quoted filename.
        headers = {
            "Content-Type": "text/plain",
            "Content-Disposition": (
                f'attachment; filename="{fileName.encode("utf-8").decode("latin-1")}.{fileType}"'
            ),
        }
        return StreamingResponse(
            download_file(subtitle, 8192), headers=headers, status_code=200
        )
    except Exception as e:
        print(e)
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Internal Server Error") from e