Spaces:
Paused
Paused
File size: 3,867 Bytes
8ab869c f6054c8 8ab869c d1ca95b ced06cb 8ab869c f6054c8 8ab869c f6054c8 8ab869c f6054c8 8ab869c e551281 8ab869c fcccf3d 8ab869c 6b9a605 f6054c8 8ab869c f6054c8 8ab869c 6b9a605 f6054c8 6b9a605 2d84f49 6b9a605 2d84f49 6b9a605 f6054c8 2d84f49 ced06cb 8ab869c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | import os
import shutil
import logging
import tempfile
import zipfile
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from typing import List
import subprocess
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__) # '__name__' will use the module's name
app = FastAPI()
def remove_tmp(dir_path: str):
# Cleanup: Remove the temporary directory and its contents
shutil.rmtree(dir_path)
@app.post("/upload_zip/")
async def upload_zip(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
if not file.filename.endswith('.zip'):
raise HTTPException(status_code=400, detail="File must be a zip archive")
tmpdir = tempfile.mkdtemp()
try:
# Save the zip file to the temporary directory
zip_path = os.path.join(tmpdir, file.filename)
with open(zip_path, 'wb') as buffer:
shutil.copyfileobj(file.file, buffer)
# Extract the zip file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(tmpdir)
# Find all jpg files in the extracted directory
image_paths = [os.path.join(tmpdir, f) for f in os.listdir(tmpdir) if f.endswith('.jpg')]
if not image_paths:
raise HTTPException(status_code=400, detail="No .jpg files found in the zip archive")
# Prepare the output directory
output_dir = os.path.join(tmpdir, "output")
os.makedirs(output_dir, exist_ok=True)
# Define the model weights path
weights_path = "/app/dust3r/checkpoints/DUSt3R_ViTLarge_BaseDecoder_512_dpt.pth"
# # Check if CUDA is available and set the device
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Run the 3D model generation script
command = [
# "--silent",
"python", "/app/dust3r/demo.py",
"--images", *image_paths,
"--weights", weights_path,
# "--device", device,
"--output_dir", output_dir
]
try:
result = subprocess.run(command, check=True, capture_output=True, text=True)
logger.info("Subprocess output:", result.stdout)
logger.warning("Subprocess error:", result.stderr)
except subprocess.CalledProcessError as e:
logger.error("Subprocess failed with error:", e.stderr)
raise HTTPException(status_code=500, detail=f"3D model generation failed: {str(e)}")
# Debug: List files in the output directory
logger.debug("Files in output directory:", os.listdir(output_dir))
# Find the .glb file in the output directory
glb_files = [os.path.join(output_dir, f) for f in os.listdir(output_dir) if f.endswith('.glb')]
if not glb_files:
raise HTTPException(status_code=500, detail=f"No .glb file generated in {output_dir}")
glb_file_path = glb_files[0]
# Debug: Check if the file exists
if not os.path.exists(glb_file_path):
raise HTTPException(status_code=500, detail=f"File {glb_file_path} does not exist")
# Optionally, remove the temporary directory if you don't need it anymore
background_tasks.add_task(remove_tmp, tmpdir)
# Return the .glb file as a response
return FileResponse(glb_file_path, media_type='application/octet-stream', filename=os.path.basename(glb_file_path))
except Exception as e:
logger.error("Error generating the scene.", exc_info=True)
return JSONResponse({'error': 'Error generating the scene.', 'detail': str(e)}, status_code=500)
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=7860)
|