Spaces:
Runtime error
Runtime error
| import os.path | |
| from fastapi import FastAPI, Response, UploadFile, File | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import uvicorn | |
| import subprocess | |
| import random | |
| import string | |
| import cv2 | |
| import os | |
# Application object that the HTTP endpoints of this module attach to.
app = FastAPI()

# Origins allowed to make cross-origin requests ("*" matches every origin).
origins = ["*"]

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy -- confirm this is intended before exposing the
# service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def generate_random_filename(length=10):
    """Return a random name made of ASCII letters and digits.

    Args:
        length: Number of characters to generate (defaults to 10).

    Returns:
        A string of ``length`` characters drawn with ``random.choice``.
    """
    alphabet = string.ascii_letters + string.digits
    picks = [random.choice(alphabet) for _ in range(length)]
    return ''.join(picks)
def save_video(image_folder):
    """Assemble the ``.jpg`` frames in *image_folder* into an MP4 video.

    Frames are written in sorted filename order at 30 fps with the ``mp4v``
    codec. The first frame that can be decoded determines the video size;
    frames that cannot be decoded are skipped.

    Args:
        image_folder: Directory holding the ``.jpg`` frames. The video is
            written to ``output_video.mp4`` inside the same directory.

    Returns:
        The path of the written video, or ``None`` when the directory does
        not exist, contains fewer than two ``.jpg`` files, none of the frames
        can be decoded, or the video writer cannot be opened.
    """
    try:
        images = sorted(
            name for name in os.listdir(image_folder) if name.endswith(".jpg")
        )
    except (FileNotFoundError, NotADirectoryError):
        # Nothing was recorded (the frame directory was never created).
        return None
    if len(images) < 2:
        return None

    output_video = os.path.join(image_folder, 'output_video.mp4')
    video = None
    try:
        for image in images:
            frame = cv2.imread(os.path.join(image_folder, image))
            if frame is None:
                # cv2.imread signals an unreadable/corrupt file with None.
                continue
            if video is None:
                # Open the writer lazily so its size matches a real frame.
                height, width = frame.shape[:2]
                video = cv2.VideoWriter(
                    output_video,
                    cv2.VideoWriter_fourcc(*'mp4v'),
                    30,
                    (width, height),
                )
                if not video.isOpened():
                    return None
            video.write(frame)
    finally:
        # Always flush and close the file, even if a write fails midway.
        # No GUI windows are created in this function, so there is nothing
        # for cv2.destroyAllWindows() to clean up.
        if video is not None:
            video.release()

    if video is None:
        return None
    return output_video
def run_scenario_script():
    """Return a fixed greeting payload.

    NOTE(review): an ``async def`` with the same name is defined later in
    this file and rebinds ``run_scenario_script``; no route decorator is
    visible on either definition -- confirm how this one is meant to be
    reached.

    Returns:
        The dictionary ``{'msg': 'hi'}``.
    """
    greeting = {'msg': 'hi'}
    return greeting
async def run_scenario_script(file: UploadFile = File(...)):
    """Run an uploaded scenario and return the recorded result as an MP4.

    The upload is stored in a temporary ``.xosc`` file, then
    ``scenario_runner.py`` and ``autopilot_control_offscreen.py`` are started
    as child processes. Once both have exited, the ``.jpg`` frames found in
    the run's save directory (presumably written there by the autopilot
    script) are assembled into a video with ``save_video``.

    NOTE(review): the original code passed an empty string to
    ``-openscenario`` and never used the upload; passing the stored file's
    path is the presumed intent -- confirm the flag accepts a file path.

    Args:
        file: The uploaded scenario file.

    Returns:
        A ``video/mp4`` response carrying the video bytes, or a ``text/plain``
        response with the body ``"No video."`` when no video was produced.
    """
    import asyncio
    import tempfile

    file_content = await file.read()

    # TODO: make the Python interpreter path configurable.
    cur_dir = os.path.join('./temp', generate_random_filename(length=4))

    # Store the raw bytes under a server-chosen name: no decoding is needed,
    # and the client-supplied filename is never used to build a path.
    with tempfile.NamedTemporaryFile(suffix='.xosc', delete=False) as scenario_file:
        scenario_file.write(file_content)
        scenario_path = scenario_file.name

    try:
        runner = subprocess.Popen(
            ['python', 'scenario_runner.py', '-openscenario', scenario_path]
        )
        autopilot = subprocess.Popen(
            ['python', 'autopilot_control_offscreen.py', '-save_dir', cur_dir]
        )
        # Wait in worker threads so the event loop keeps serving other
        # requests while the child processes run.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, runner.wait)
        await loop.run_in_executor(None, autopilot.wait)
    finally:
        os.remove(scenario_path)

    # The save directory is not created here; if it never appeared there are
    # no frames to assemble.
    video_path = save_video(cur_dir) if os.path.isdir(cur_dir) else None
    if video_path is None:
        return Response(content="No video.", media_type="text/plain")

    with open(video_path, 'rb') as video_file:
        video_data = video_file.read()
    return Response(content=video_data, media_type="video/mp4")
if __name__ == '__main__':
    # Serve the app when this file is run as a script. The application is
    # passed as the import string 'scenario_api:app', with auto-reload on,
    # listening on all interfaces at port 8456.
    server_options = {"host": "0.0.0.0", "port": 8456, "reload": True}
    uvicorn.run('scenario_api:app', **server_options)