Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import tempfile | |
| from fastapi import FastAPI, File, UploadFile, Form | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from PIL import Image, UnidentifiedImageError | |
| import uvicorn | |
| from routes.segmentation import segment_chess_board | |
| from routes.detection import detect_pieces | |
| from routes.fen_generator import gen_fen | |
| from routes.chess_review import analyze_pgn | |
| from typing import List, Dict, Any, Union | |
| from pydantic import BaseModel | |
| import asyncio | |
| import sys | |
| import tracemalloc | |
| from fastapi import requests | |
| import base64 | |
# Start tracing memory allocations as soon as this module is imported.
tracemalloc.start()

# Only on Windows: install the proactor event-loop policy before the
# server creates its event loop.
if sys.platform == "win32":
    asyncio.set_event_loop_policy(
        asyncio.WindowsProactorEventLoopPolicy()
    )

# The ASGI application object served by uvicorn at the bottom of this file.
app = FastAPI()
class DetectionResults(BaseModel):
    """Pydantic model grouping three list-valued detection fields."""

    # Element types are not constrained here; each field accepts any list.
    boxes: list
    confidences: list
    classes: list
class FileUpload(BaseModel):
    """Request body carrying an uploaded file as text."""

    # getReview base64-decodes this value before writing it to disk.
    file_data: str
async def read_root():
    """Return a fixed, hard-coded profile payload."""
    payload = {}
    payload["name"] = "Narendra"
    payload["age"] = 20
    payload["Gender"] = "Male"
    return payload
async def get_fen(
    file: UploadFile = File(),
    perspective: str = Form("w"),
    next_to_move: str = Form("w"),
):
    """Build a FEN string from an uploaded chess-board image.

    Args:
        file: The uploaded image file.
        perspective: Side the board is viewed from, "w" or "b".
        next_to_move: Side to move next, "w" or "b".

    Returns:
        A JSONResponse with {"FEN": ...} and status 200 on success.
        Status 400 is returned for invalid form values, an empty upload,
        an unreadable image, or an error payload from segmentation or
        detection. Status 500 is returned when FEN generation yields
        nothing or an unexpected exception occurs.
    """
    valid_sides = ("w", "b")

    # Bad form values are the caller's mistake, so report them as 400
    # (client error) rather than 500 (server error).
    if perspective not in valid_sides:
        return JSONResponse(
            content={"error": "Perspective should be w (white) or b (black)"},
            status_code=400,
        )
    if next_to_move not in valid_sides:
        return JSONResponse(
            content={"error": "next to move should be w (white) or b (black)"},
            status_code=400,
        )

    try:
        image_content = await file.read()
        if not image_content:
            return JSONResponse(
                content={"error": "Empty file uploaded"}, status_code=400
            )

        try:
            image = Image.open(io.BytesIO(image_content))
        except UnidentifiedImageError:
            return JSONResponse(
                content={"error": "Invalid image format"}, status_code=400
            )

        segmented_image = await segment_chess_board(image)
        # A dict result from segmentation is treated as an error payload
        # and forwarded to the client as-is.
        if isinstance(segmented_image, dict):
            return JSONResponse(content=segmented_image, status_code=400)

        segmented_image = segmented_image.resize((224, 224))

        detection_results = await detect_pieces(segmented_image)
        if "error" in detection_results:
            return JSONResponse(content=detection_results, status_code=400)

        fen = gen_fen(detection_results, perspective, next_to_move)
        if not fen:
            return JSONResponse(
                content={
                    "error": "FEN generation failed",
                    "details": "Invalid input data",
                },
                status_code=500,
            )

        return JSONResponse(content={"FEN": fen}, status_code=200)
    except Exception as e:
        # Top-level boundary: turn any unexpected failure into a JSON 500.
        return JSONResponse(
            content={"error": "Unexpected error occurred", "details": str(e)},
            status_code=500,
        )
async def getReview(file_upload: FileUpload):
    """Return a text-based, overall review of a game from a base64 PGN.

    Args:
        file_upload: Request body whose ``file_data`` holds the
            base64-encoded PGN file.

    Returns:
        The value produced by ``analyze_pgn`` on success. Otherwise a
        JSONResponse: status 400 for an empty payload, malformed base64,
        or a falsy analysis result; status 500 for any unexpected error.
    """
    print(os.getcwd())
    print("call recieved")

    if not file_upload.file_data:
        return JSONResponse(
            content={"error": "Empty file uploaded"}, status_code=400
        )

    try:
        try:
            file_data = base64.b64decode(file_upload.file_data)
        except ValueError:
            # binascii.Error is a ValueError subclass; malformed base64 is
            # a client error, not a server failure.
            return JSONResponse(
                content={"error": "Invalid base64 data"}, status_code=400
            )

        tmp_file_path = None
        try:
            # analyze_pgn takes a file path, so persist the upload first.
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=".pgn"
            ) as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(file_data)

            analysis_result = analyze_pgn(tmp_file_path)
        finally:
            # delete=False means nothing removes the file for us; clean it
            # up even when writing or analysis raises.
            if tmp_file_path is not None:
                os.remove(tmp_file_path)

        if not analysis_result:
            return JSONResponse(
                content={"error": "No game found in the PGN file"},
                status_code=400,
            )
        return analysis_result
    except Exception as e:
        # Top-level boundary: turn any unexpected failure into a JSON 500.
        return JSONResponse(
            content={"error": "Unexpected error occurred", "details": str(e)},
            status_code=500,
        )
if __name__ == "__main__":
    # When run as a script, serve the app on all interfaces, port 7860.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )