import io import os import traceback import zipfile from fastapi import FastAPI, HTTPException, status from fastapi.concurrency import run_in_threadpool from fastapi.responses import StreamingResponse from pydantic import BaseModel from opentrons.simulate import simulate, format_runlog app = FastAPI() class Protocol(BaseModel): name: str content: str @app.get("/") async def root(): return {"message": "Opentrons simulation API is running."} @app.get("/protocols") async def list_saved_protocols(): """ storageディレクトリに保存されているプロトコルファイルの一覧を返します。 """ storage_dir = "storage" if not os.path.isdir(storage_dir): # ディレクトリが存在しない場合は空のリストを返す return {"protocols": []} try: # os.listdirはブロッキングI/Oなのでスレッドプールで実行 file_list = await run_in_threadpool(os.listdir, storage_dir) return {"protocols": sorted(file_list)} except Exception as e: print(f"Error listing files: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An error occurred while listing the protocols." ) @app.get("/protocols/download") async def download_all_protocols(): """ storageディレクトリ内のすべてのプロトコルをzipファイルにまとめてダウンロードします。 """ storage_dir = "storage" if not os.path.isdir(storage_dir) or not os.listdir(storage_dir): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No protocols found to download." ) # メモリ上でzipファイルを作成 zip_io = io.BytesIO() with zipfile.ZipFile(zip_io, mode='w', compression=zipfile.ZIP_DEFLATED) as temp_zip: # run_in_threadpoolの中でファイルI/Oを実行 def create_zip_in_thread(): for root, _, files in os.walk(storage_dir): for file in files: file_path = os.path.join(root, file) # zipファイル内でのパスをルートからの相対パスに設定 temp_zip.write(file_path, os.path.relpath(file_path, storage_dir)) await run_in_threadpool(create_zip_in_thread) # BytesIOのポインタを先頭に戻す zip_io.seek(0) return StreamingResponse( content=zip_io, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=protocols.zip"} ) @app.post("/simulate") async def simulate_protocol(protocol: Protocol): """ プロトコルの内容を受け取り、シミュレーションのみを実行します。 ファイルへの保存は行いません。 """ try: protocol_file = io.StringIO(protocol.content) run_log, _ = await run_in_threadpool( simulate, protocol_file=protocol_file, file_name=protocol.name ) return { "protocol_name": protocol.name, "run_status": "success", "run_log": format_runlog(run_log) } except Exception as e: print(f"Simulation Error: {e}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail={ "message": "Failed to simulate protocol.", "error_type": type(e).__name__, "error_details": str(e), "traceback": traceback.format_exc() } ) @app.post("/protocols") async def save_and_simulate_protocol(protocol: Protocol): """ プロトコルをファイルに保存し、続けてシミュレーションを実行します。 保存とシミュレーション両方の結果を返します。 """ # --- 1. プロトコルファイルを保存 --- storage_dir = "storage" os.makedirs(storage_dir, exist_ok=True) file_path = os.path.join(storage_dir, protocol.name) try: # スレッドプールでファイルを保存し、実際に保存されたパスを受け取る saved_path = await run_in_threadpool(save_protocol_file, protocol.content, file_path) save_message = f"Protocol '{os.path.basename(saved_path)}' saved successfully." except Exception as e: print(f"File saving error: {e}") # ファイル保存に失敗した場合は、ここで処理を中断してエラーを返す raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An error occurred while saving the file." ) # --- 2. シミュレーションを実行 --- try: protocol_file = io.StringIO(protocol.content) # スレッドプールでシミュレーションを実行 run_log, _ = await run_in_threadpool( simulate, protocol_file=protocol_file, file_name=protocol.name ) # 保存とシミュレーション両方の成功を返す return { "save_status": "success", "save_message": save_message, "protocol_name": protocol.name, "run_status": "success", "run_log": format_runlog(run_log) } except Exception as e: # ファイル保存は成功したが、シミュレーションでエラーが発生した場合 print(f"Simulation Error after save: {e}") # 保存が成功したことを伝えつつ、シミュレーションのエラー情報を返す return { "save_status": "success", "save_message": save_message, "protocol_name": protocol.name, "run_status": "failure", "error_details": str(e), "traceback": traceback.format_exc() } def save_protocol_file(content: str, file_path: str) -> str: """ 指定されたパスにテキストコンテンツを保存します。 ファイル名が重複する場合は、連番を付与して新しいパスに保存します。 実際に保存したファイルのパスを返します。 """ base, ext = os.path.splitext(file_path) counter = 1 new_path = file_path while os.path.exists(new_path): new_path = f"{base}_{counter}{ext}" counter += 1 with open(new_path, 'w') as f: f.write(content) return new_path