quiz-api-server / app /_main.py
baxin's picture
Rename app/main.py to app/_main.py
d4e9581 verified
import io
import os
import traceback
import zipfile
from fastapi import FastAPI, HTTPException, status
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from opentrons.simulate import simulate, format_runlog
app = FastAPI()
class Protocol(BaseModel):
name: str
content: str
@app.get("/")
async def root():
return {"message": "Opentrons simulation API is running."}
@app.get("/protocols")
async def list_saved_protocols():
"""
storageディレクトリに保存されているプロトコルファイルの一覧を返します。
"""
storage_dir = "storage"
if not os.path.isdir(storage_dir):
# ディレクトリが存在しない場合は空のリストを返す
return {"protocols": []}
try:
# os.listdirはブロッキングI/Oなのでスレッドプールで実行
file_list = await run_in_threadpool(os.listdir, storage_dir)
return {"protocols": sorted(file_list)}
except Exception as e:
print(f"Error listing files: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An error occurred while listing the protocols."
)
@app.get("/protocols/download")
async def download_all_protocols():
"""
storageディレクトリ内のすべてのプロトコルをzipファイルにまとめてダウンロードします。
"""
storage_dir = "storage"
if not os.path.isdir(storage_dir) or not os.listdir(storage_dir):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No protocols found to download."
)
# メモリ上でzipファイルを作成
zip_io = io.BytesIO()
with zipfile.ZipFile(zip_io, mode='w', compression=zipfile.ZIP_DEFLATED) as temp_zip:
# run_in_threadpoolの中でファイルI/Oを実行
def create_zip_in_thread():
for root, _, files in os.walk(storage_dir):
for file in files:
file_path = os.path.join(root, file)
# zipファイル内でのパスをルートからの相対パスに設定
temp_zip.write(file_path, os.path.relpath(file_path, storage_dir))
await run_in_threadpool(create_zip_in_thread)
# BytesIOのポインタを先頭に戻す
zip_io.seek(0)
return StreamingResponse(
content=zip_io,
media_type="application/zip",
headers={"Content-Disposition": "attachment; filename=protocols.zip"}
)
@app.post("/simulate")
async def simulate_protocol(protocol: Protocol):
"""
プロトコルの内容を受け取り、シミュレーションのみを実行します。
ファイルへの保存は行いません。
"""
try:
protocol_file = io.StringIO(protocol.content)
run_log, _ = await run_in_threadpool(
simulate,
protocol_file=protocol_file,
file_name=protocol.name
)
return {
"protocol_name": protocol.name,
"run_status": "success",
"run_log": format_runlog(run_log)
}
except Exception as e:
print(f"Simulation Error: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"message": "Failed to simulate protocol.",
"error_type": type(e).__name__,
"error_details": str(e),
"traceback": traceback.format_exc()
}
)
@app.post("/protocols")
async def save_and_simulate_protocol(protocol: Protocol):
"""
プロトコルをファイルに保存し、続けてシミュレーションを実行します。
保存とシミュレーション両方の結果を返します。
"""
# --- 1. プロトコルファイルを保存 ---
storage_dir = "storage"
os.makedirs(storage_dir, exist_ok=True)
file_path = os.path.join(storage_dir, protocol.name)
try:
# スレッドプールでファイルを保存し、実際に保存されたパスを受け取る
saved_path = await run_in_threadpool(save_protocol_file, protocol.content, file_path)
save_message = f"Protocol '{os.path.basename(saved_path)}' saved successfully."
except Exception as e:
print(f"File saving error: {e}")
# ファイル保存に失敗した場合は、ここで処理を中断してエラーを返す
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An error occurred while saving the file."
)
# --- 2. シミュレーションを実行 ---
try:
protocol_file = io.StringIO(protocol.content)
# スレッドプールでシミュレーションを実行
run_log, _ = await run_in_threadpool(
simulate,
protocol_file=protocol_file,
file_name=protocol.name
)
# 保存とシミュレーション両方の成功を返す
return {
"save_status": "success",
"save_message": save_message,
"protocol_name": protocol.name,
"run_status": "success",
"run_log": format_runlog(run_log)
}
except Exception as e:
# ファイル保存は成功したが、シミュレーションでエラーが発生した場合
print(f"Simulation Error after save: {e}")
# 保存が成功したことを伝えつつ、シミュレーションのエラー情報を返す
return {
"save_status": "success",
"save_message": save_message,
"protocol_name": protocol.name,
"run_status": "failure",
"error_details": str(e),
"traceback": traceback.format_exc()
}
def save_protocol_file(content: str, file_path: str) -> str:
"""
指定されたパスにテキストコンテンツを保存します。
ファイル名が重複する場合は、連番を付与して新しいパスに保存します。
実際に保存したファイルのパスを返します。
"""
base, ext = os.path.splitext(file_path)
counter = 1
new_path = file_path
while os.path.exists(new_path):
new_path = f"{base}_{counter}{ext}"
counter += 1
with open(new_path, 'w') as f:
f.write(content)
return new_path