Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.staticfiles import StaticFiles | |
| from pytubefix import YouTube | |
| from pytubefix.exceptions import PytubeFixError | |
| from concurrent.futures import ThreadPoolExecutor | |
| import asyncio | |
| import aiohttp | |
| import aiofiles | |
| import tempfile | |
| import uuid | |
| import base64 | |
| import io | |
| import os | |
| import random | |
| import traceback | |
| import string | |
| import json | |
| app = FastAPI() | |
| MODAL_BASE_URL = "https://sxqib--api-fastapi-app.modal.run" | |
| def generate_hash(length=12): | |
| # Characters that can appear in the hash | |
| characters = string.ascii_lowercase + string.digits | |
| # Generate a random string of the specified length | |
| hash_string = ''.join(random.choice(characters) for _ in range(length)) | |
| return hash_string | |
| async def read_root(): | |
| return {"message": "Saqib's API"} | |
| # Create a directory to store MP3 files if it doesn't exist | |
| AUDIO_DIR = "audio_files" | |
| os.makedirs(AUDIO_DIR, exist_ok=True) | |
| # Create a directory for storing output files | |
| OUTPUT_DIR = "output" | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| # Mount the audio directory | |
| app.mount("/audio", StaticFiles(directory=AUDIO_DIR), name="audio") | |
| # Mount the output directory | |
| app.mount("/output", StaticFiles(directory=OUTPUT_DIR), name="output") | |
| thread_pool = ThreadPoolExecutor(max_workers=2) | |
| async def run_ffmpeg_async(ffmpeg_command): | |
| loop = asyncio.get_running_loop() | |
| await loop.run_in_executor(thread_pool, ffmpeg_command) | |
| async def download_file(url: str, suffix: str): | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as response: | |
| if response.status != 200: | |
| raise HTTPException(status_code=400, detail=f"Failed to download file from {url}") | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: | |
| temp_file.write(await response.read()) | |
| return temp_file.name | |
| async def add_audio_to_image(request: Request): | |
| try: | |
| # Generate a unique filename | |
| output_filename = f"{uuid.uuid4()}.mp4" | |
| output_path = os.path.join(OUTPUT_DIR, output_filename) | |
| # Call the modal API with the request data and download the output file | |
| data = await request.json() | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(f"{MODAL_BASE_URL}/add_audio_to_image", json=data) as response: | |
| if response.status != 200: | |
| raise HTTPException(status_code=500, detail="Failed to process request") | |
| output_data = await response.read() | |
| async with aiofiles.open(output_path, "wb") as f: | |
| await f.write(output_data) | |
| # Return the URL path to the output file | |
| return f"https://sxqib-api.hf.space/output/{output_filename}" | |
| except Exception as e: | |
| print(f"An error occurred: {str(e)}") | |
| print(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") | |
| async def add_audio_to_video(request: Request): | |
| try: | |
| # Generate a unique filename | |
| output_filename = f"{uuid.uuid4()}.mp4" | |
| output_path = os.path.join(OUTPUT_DIR, output_filename) | |
| # Call the modal API with the request data and download the output file | |
| data = await request.json() | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(f"{MODAL_BASE_URL}/add_audio_to_video", json=data) as response: | |
| if response.status != 200: | |
| raise HTTPException(status_code=500, detail="Failed to process request") | |
| output_data = await response.read() | |
| async with aiofiles.open(output_path, "wb") as f: | |
| await f.write(output_data) | |
| # Return the URL path to the output file | |
| return f"https://sxqib-api.hf.space/output/{output_filename}" | |
| except Exception as e: | |
| print(f"An error occurred: {str(e)}") | |
| print(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") | |
| async def concatenate_videos(request: Request): | |
| try: | |
| # Generate a unique filename for the output | |
| output_filename = f"{uuid.uuid4()}.mp4" | |
| output_path = os.path.join(OUTPUT_DIR, output_filename) | |
| # Call the modal API with the request data and download the output file | |
| data = await request.json() | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(f"{MODAL_BASE_URL}/concatenate_videos", json=data) as response: | |
| if response.status != 200: | |
| raise HTTPException(status_code=500, detail="Failed to process request") | |
| output_data = await response.read() | |
| async with aiofiles.open(output_path, "wb") as f: | |
| await f.write(output_data) | |
| # Return the URL path to the output file | |
| return f"https://sxqib-api.hf.space/output/{output_filename}" | |
| except Exception as e: | |
| print(f"An error occurred: {str(e)}") | |
| print(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") | |
| async def get_audio(request: Request): | |
| try: | |
| # Generate a unique filename | |
| unique_filename = f"{uuid.uuid4().hex}.mp3" | |
| out_file = os.path.join(AUDIO_DIR, unique_filename) | |
| # Call the modal API with the request parameters and download the output file | |
| data = request.query_params | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(f"{MODAL_BASE_URL}/get_audio", params=data) as response: | |
| if response.status != 200: | |
| raise HTTPException(status_code=500, detail="Failed to process request") | |
| audio_data = await response.read() | |
| async with aiofiles.open(out_file, "wb") as f: | |
| await f.write(audio_data) | |
| # Return the URL path to the output file | |
| return f"https://sxqib-api.hf.space/audio/{unique_filename}" | |
| except Exception as e: | |
| print(f"An error occurred: {str(e)}") | |
| print(traceback.format_exc()) | |
| raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") | |
| async def whisper(request: Request): | |
| data = await request.json() # Extracting JSON data from request | |
| if "audio_url" not in data: | |
| raise HTTPException(status_code=400, detail="audio_url not found in request") | |
| url = data["audio_url"] | |
| headers = { | |
| 'Accept': 'application/json, text/plain, */*', | |
| 'Accept-Language': 'en-US,en;q=0.9', | |
| 'Cache-Control': 'no-cache', | |
| 'Connection': 'keep-alive', | |
| 'Content-Type': 'application/json', | |
| 'DNT': '1', | |
| 'Origin': 'https://deepinfra.com', | |
| 'Pragma': 'no-cache', | |
| 'Referer': 'https://deepinfra.com/', | |
| 'Sec-Fetch-Dest': 'empty', | |
| 'Sec-Fetch-Mode': 'cors', | |
| 'Sec-Fetch-Site': 'same-site', | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0', | |
| 'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"', | |
| 'sec-ch-ua-mobile': '?0', | |
| 'sec-ch-ua-platform': '"Windows"', | |
| } | |
| # Async HTTP request to get the audio file | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as resp: | |
| if resp.status != 200: | |
| return f"Failed to download audio: {resp.status}" | |
| audio_data = await resp.read() | |
| # Encode the audio data to base64 | |
| audio_base64 = base64.b64encode(audio_data).decode("utf-8") | |
| json_data = '{"audio": "' + audio_base64 + '"}' | |
| # Post request to the API | |
| async with session.post(f'https://api.deepinfra.com/v1/inference/openai/whisper-large', headers=headers, data=json_data) as post_resp: | |
| if post_resp.status != 200: | |
| return f"API request failed: {post_resp.status}" | |
| return await post_resp.json() | |
| async def img2location(request: Request): | |
| request_json = await request.json() | |
| image_url = request_json.get("image_url", None) | |
| if not image_url: | |
| raise HTTPException(status_code=400, detail="image_url not found in request") | |
| headers = { | |
| 'accept': '*/*', | |
| 'accept-language': 'en-US,en;q=0.9', | |
| 'cache-control': 'no-cache', | |
| 'dnt': '1', | |
| 'origin': 'https://geospy.ai', | |
| 'pragma': 'no-cache', | |
| 'priority': 'u=1, i', | |
| 'referer': 'https://geospy.ai/', | |
| 'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"', | |
| 'sec-ch-ua-mobile': '?0', | |
| 'sec-ch-ua-platform': '"Windows"', | |
| 'sec-fetch-dest': 'empty', | |
| 'sec-fetch-mode': 'cors', | |
| 'sec-fetch-site': 'cross-site', | |
| 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0', | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| # Fetch the image from the URL | |
| async with session.get(image_url) as img_response: | |
| if img_response.status != 200: | |
| return f"Failed to fetch image: HTTP {img_response.status}" | |
| image_data = await img_response.read() | |
| # Using BytesIO to handle the byte content | |
| data = aiohttp.FormData() | |
| data.add_field('image', io.BytesIO(image_data), filename="image.png", content_type='image/png') | |
| # Sending the POST request | |
| async with session.post("https://locate-image-7cs5mab6na-uc.a.run.app/", headers=headers, data=data) as response: | |
| if response.status != 200: | |
| return f"Failed to upload image: HTTP {response.status}" | |
| json_response = await response.json() | |
| if json_response["message"]["latitude"] and json_response["message"]["longitude"]: | |
| json_response["message"]["latitude"] = str(json_response["message"]["latitude"]) | |
| json_response["message"]["longitude"] = str(json_response["message"]["longitude"]) | |
| return json_response | |
| async def pixart_sigma(request: Request): | |
| request_json = await request.json() | |
| prompt = request_json.get("prompt", None) | |
| negative_prompt = request_json.get("negative_prompt", "") | |
| style = request_json.get("style", "(No style)") | |
| use_negative_prompt = request_json.get("use_negative_prompt", True) | |
| num_imgs = request_json.get("num_imgs", 1) | |
| seed = request_json.get("seed", 0) | |
| width = request_json.get("width", 1024) | |
| height = request_json.get("height", 1024) | |
| schedule = request_json.get("schedule", "DPM-Solver") | |
| dpms_guidance_scale = request_json.get("dpms_guidance_scale", 4.5) | |
| sas_guidance_scale = request_json.get("sas_guidance_scale", 3) | |
| dpms_inference_steps = request_json.get("dpms_inference_steps", 14) | |
| sas_inference_steps = request_json.get("sas_inference_steps", 25) | |
| randomize_seed = request_json.get("randomize_seed", True) | |
| hash = generate_hash() | |
| headers = { | |
| 'accept': '*/*' | |
| } | |
| params = { | |
| '__theme': 'light', | |
| } | |
| json_data = { | |
| 'data': [ | |
| prompt, | |
| negative_prompt, | |
| style, | |
| use_negative_prompt, | |
| num_imgs, | |
| seed, | |
| width, | |
| height, | |
| schedule, | |
| dpms_guidance_scale, | |
| sas_guidance_scale, | |
| dpms_inference_steps, | |
| sas_inference_steps, | |
| randomize_seed, | |
| ], | |
| 'event_data': None, | |
| 'fn_index': 3, | |
| 'trigger_id': 7, | |
| 'session_hash': hash, | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post(f'https://pixart-alpha-pixart-sigma.hf.space/queue/join', params=params, headers=headers, json=json_data, ssl=False) as response: | |
| print(response.status) | |
| params = { | |
| 'session_hash': hash, | |
| } | |
| async with session.get(f'https://pixart-alpha-pixart-sigma.hf.space/queue/data', params=params, headers=headers, ssl=False) as response: | |
| async for line in response.content: | |
| try: | |
| if line: | |
| line = line.decode('utf-8') | |
| line = line.replace('data: ', '') | |
| line_json = json.loads(line) | |
| if line_json["msg"] == "process_completed": | |
| image_url = line_json["output"]["data"][0][0]["image"]["url"] | |
| return {"image_url": image_url} | |
| except: | |
| pass | |
| # if __name__ == "__main__": | |
| # import uvicorn | |
| # uvicorn.run(app, host="0.0.0.0", port=8000) |