# Source path: REMB/src/api/mvp_api.py
# Repository page residue: "Cuong2004's picture"
# Last commit message: update agent/mcp/tool, add algo jupyter
# Last commit hash: 56e31ec
"""
AIOptimize™ MVP API
FastAPI backend for industrial estate planning optimization
Per MVP-24h.md specification
"""
import os
import io
import json
import zipfile
from datetime import datetime
from typing import Optional, List, Dict, Any
from pathlib import Path
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel
from shapely.geometry import Polygon, shape
# Import services
from src.services.session_manager import session_manager
from src.services.gemini_service import gemini_service
from src.algorithms.ga_optimizer import SimpleGAOptimizer
from src.export.dxf_exporter import DXFExporter
from src.models.domain import Layout, Plot, PlotType, SiteBoundary, LayoutMetrics
# Built-in demo boundary: a GeoJSON Feature holding one rectangular ring
# (500 x 400 in the file's coordinate units), served by /api/sample-data.
SAMPLE_BOUNDARY = {
    "type": "Feature",
    "geometry": {
        "type": "Polygon",
        "coordinates": [
            [[0, 0], [500, 0], [500, 400], [0, 400], [0, 0]],
        ],
    },
    "properties": {"name": "Sample Industrial Site"},
}
# === Pydantic Models ===
# Response body shared by the three upload endpoints
# (/api/upload-boundary, /api/upload-boundary-json, /api/upload-dxf).
class UploadResponse(BaseModel):
    # Id of the session created for this upload; later requests pass it back.
    session_id: str
    # The boundary as GeoJSON (echoed input, or a Feature built from the DXF).
    boundary: Dict[str, Any]
    # Polygon statistics computed on upload: area, perimeter, bounds, centroid
    # (the DXF endpoint additionally adds "dxf_source").
    metadata: Dict[str, Any]
# Request body for /api/generate-layouts.
class GenerateRequest(BaseModel):
    # Session whose stored boundary should be optimized.
    session_id: str
    # Forwarded to SimpleGAOptimizer(target_plots=...).
    target_plots: int = 8
    # Forwarded to SimpleGAOptimizer(setback=...).
    # NOTE(review): presumably metres, matching the "m²" labels used on export - confirm.
    setback: float = 50.0
# Request body for /api/chat.
class ChatRequest(BaseModel):
    # Session whose layouts and boundary metadata give the chat its context.
    session_id: str
    # The user's message; it is also appended to the session chat history.
    message: str
# Response body for /api/chat, built from the dict returned by gemini_service.chat().
class ChatResponse(BaseModel):
    # The assistant's reply text.
    message: str
    # The "model" value reported by gemini_service for this reply.
    model: str
# Request body for /api/export-dxf.
class ExportRequest(BaseModel):
    # Session that holds the generated layouts.
    session_id: str
    # Compared against each stored layout's "id" key to pick the one to export.
    option_id: int
# Response body for /api/health.
class HealthResponse(BaseModel):
    # The health endpoint always reports "healthy".
    status: str
    # API version string (the endpoint reports "1.0.0").
    version: str
    # Mirrors gemini_service.is_available.
    gemini_available: bool
# === FastAPI App ===
# Application object that every endpoint in this module is registered on.
app = FastAPI(
    title="AIOptimize™ API",
    description="AI-Powered Industrial Estate Planning Engine",
    version="1.0.0"
)
# CORS for frontend
# NOTE(review): wildcard origins together with allow_credentials=True lets any
# site make credentialed requests; restrict allow_origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # Allow all for development
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Import and include Agent API router
# NOTE(review): imported here rather than at the top of the file, presumably
# to avoid a circular import with src.api.agent_api - confirm before moving.
from src.api.agent_api import router as agent_router
app.include_router(agent_router)
# Initialize optimizer and exporter
# NOTE(review): neither module-level instance is referenced in the visible part
# of this file (generate_layouts builds its own optimizer) - check other users
# before removing.
ga_optimizer = SimpleGAOptimizer()
dxf_exporter = DXFExporter()
# === API Endpoints ===
@app.get("/api/health", response_model=HealthResponse)
async def health_check():
    """Report service status, API version and whether Gemini is available."""
    payload = {
        "status": "healthy",
        "version": "1.0.0",
        "gemini_available": gemini_service.is_available,
    }
    return HealthResponse(**payload)
@app.get("/api/sample-data")
async def get_sample_data():
    """Return the module's built-in sample boundary (a GeoJSON Feature)."""
    sample_feature = SAMPLE_BOUNDARY
    return sample_feature
def _parse_boundary_geojson(geojson_data: Any):
    """Extract the outer ring and a Shapely polygon from a GeoJSON object.

    Accepts a ``Feature``, a ``FeatureCollection`` (only its first feature is
    used) or a bare ``Polygon`` geometry.

    Args:
        geojson_data: Decoded GeoJSON, normally a dict.

    Returns:
        A ``(coords, polygon)`` tuple: the first coordinate ring exactly as
        supplied, and the polygon built from it (repaired with ``buffer(0)``
        when the ring is not valid).

    Raises:
        HTTPException: 400 when the payload is not a supported GeoJSON object,
            has no usable coordinates, or cannot form a polygon.
    """
    if not isinstance(geojson_data, dict):
        raise HTTPException(400, "Invalid GeoJSON format")

    geojson_type = geojson_data.get("type")
    if geojson_type == "Feature":
        geometry = geojson_data.get("geometry")
    elif geojson_type == "FeatureCollection":
        features = geojson_data.get("features")
        if not isinstance(features, list) or not features:
            raise HTTPException(400, "No features in FeatureCollection")
        first_feature = features[0]
        geometry = first_feature.get("geometry") if isinstance(first_feature, dict) else None
    elif geojson_type == "Polygon":
        geometry = geojson_data
    else:
        raise HTTPException(400, "Invalid GeoJSON format")

    # A null geometry or an empty "coordinates" list is a client error, not a
    # server fault, so guard before indexing into the ring list.
    rings = geometry.get("coordinates") if isinstance(geometry, dict) else None
    coords = rings[0] if isinstance(rings, list) and rings else None
    if not coords:
        raise HTTPException(400, "No coordinates found")

    try:
        polygon = Polygon(coords)
    except (TypeError, ValueError) as exc:
        # Too few points, or coordinates that are not a ring of x/y pairs.
        raise HTTPException(400, f"Invalid polygon coordinates: {exc}") from exc

    if not polygon.is_valid:
        polygon = polygon.buffer(0)
    if polygon.is_empty:
        raise HTTPException(400, "Boundary polygon is empty")
    return coords, polygon


def _register_boundary(geojson_data: Dict[str, Any], coords, polygon) -> UploadResponse:
    """Compute boundary metadata, store it in a new session and build the response."""
    metadata = {
        "area": polygon.area,
        "perimeter": polygon.length,
        "bounds": list(polygon.bounds),
        "centroid": [polygon.centroid.x, polygon.centroid.y]
    }
    session = session_manager.create_session()
    session_manager.set_boundary(
        session.id,
        boundary=geojson_data,
        coords=coords,
        metadata=metadata
    )
    return UploadResponse(
        session_id=session.id,
        boundary=geojson_data,
        metadata=metadata
    )


@app.post("/api/upload-boundary", response_model=UploadResponse)
async def upload_boundary(file: UploadFile = File(None), geojson: str = Form(None)):
    """
    Upload site boundary (GeoJSON)
    Accepts either file upload or JSON string

    A new session is created for every successful upload.

    Raises:
        HTTPException: 400 for missing, undecodable or unsupported boundary
            data; 500 for any other failure while processing it.
    """
    try:
        # The uploaded file takes precedence over the form field.
        if file and file.filename:
            content = await file.read()
            geojson_data = json.loads(content)
        elif geojson:
            geojson_data = json.loads(geojson)
        else:
            raise HTTPException(400, "No boundary data provided")
        coords, polygon = _parse_boundary_geojson(geojson_data)
        return _register_boundary(geojson_data, coords, polygon)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # UnicodeDecodeError covers uploaded bytes that are not valid UTF-8/16/32.
        raise HTTPException(400, f"Invalid JSON format: {str(e)}")
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(500, f"Error processing boundary: {str(e)}")


# Alternative JSON endpoint for easier frontend integration
class UploadBoundaryRequest(BaseModel):
    # GeoJSON Feature, FeatureCollection or Polygon geometry.
    geojson: Dict[str, Any]


@app.post("/api/upload-boundary-json", response_model=UploadResponse)
async def upload_boundary_json(request: UploadBoundaryRequest):
    """Upload site boundary via JSON body

    Same validation and session handling as ``/api/upload-boundary``; only the
    transport differs.

    Raises:
        HTTPException: 400 for unsupported or unusable GeoJSON; 500 for any
            other failure.
    """
    try:
        geojson_data = request.geojson
        coords, polygon = _parse_boundary_geojson(geojson_data)
        return _register_boundary(geojson_data, coords, polygon)
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(500, f"Error: {str(e)}")
@app.post("/api/upload-dxf", response_model=UploadResponse)
async def upload_dxf(file: UploadFile = File(...)):
    """
    Upload site boundary from DXF or DWG file
    Parses closed LWPOLYLINE / POLYLINE entities to extract the site boundary
    polygon; the largest valid polygon by area is used.

    DWG uploads are accepted by extension but cannot be parsed by ezdxf, so
    they are answered with a 400 asking for a DXF conversion.

    Raises:
        HTTPException: 400 for a missing or unsupported file, an unreadable or
            corrupt drawing, or a drawing with no closed polygons; 500 for any
            other parsing failure.
    """
    import ezdxf
    import tempfile

    if not file.filename:
        raise HTTPException(400, "No file provided")
    filename_lower = file.filename.lower()
    is_dwg = filename_lower.endswith('.dwg')
    if not (filename_lower.endswith('.dxf') or is_dwg):
        raise HTTPException(400, "Please upload a valid .dxf or .dwg file")

    tmp_path = None
    try:
        # Save to temp file for ezdxf to read
        content = await file.read()
        suffix = '.dwg' if is_dwg else '.dxf'
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            # Record the path first so the finally-block can clean up even if
            # the write itself fails.
            tmp_path = tmp.name
            tmp.write(content)

        # Parse DXF/DWG
        try:
            doc = ezdxf.readfile(tmp_path)
        except (IOError, ezdxf.DXFStructureError) as e:
            # IOError: not a DXF file; DXFStructureError: invalid or corrupt
            # DXF. Both are problems with the upload, hence 400.
            if is_dwg:
                # ezdxf cannot read DWG files directly
                raise HTTPException(
                    400,
                    "DWG file format requires conversion. "
                    "Please convert your DWG file to DXF using AutoCAD, LibreCAD, or an online converter, "
                    "then upload the DXF file. "
                    "Alternatively, most CAD software can export/save as DXF format."
                )
            raise HTTPException(400, f"Failed to read file: {str(e)}")

        # Find closed polylines
        polygons = []
        for entity in doc.modelspace():
            entity_type = entity.dxftype()
            if entity_type == 'LWPOLYLINE':
                if not entity.closed:
                    continue
                points = list(entity.get_points())
            elif entity_type == 'POLYLINE':
                if not entity.is_closed:
                    continue
                points = list(entity.points())
            else:
                continue
            if len(points) < 3:
                continue
            coords = [(p[0], p[1]) for p in points]
            coords.append(coords[0])  # Close polygon
            poly = Polygon(coords)
            if poly.is_valid:
                polygons.append((poly, coords))

        if not polygons:
            raise HTTPException(400, "No closed polygons found in DXF file")

        # Get largest polygon as site boundary
        polygon, coords = max(polygons, key=lambda x: x[0].area)

        # Create GeoJSON boundary
        geojson_data = {
            "type": "Feature",
            "geometry": {
                "type": "Polygon",
                "coordinates": [coords]
            },
            "properties": {"source": file.filename}
        }
        # Calculate metadata
        metadata = {
            "area": polygon.area,
            "perimeter": polygon.length,
            "bounds": list(polygon.bounds),
            "centroid": [polygon.centroid.x, polygon.centroid.y],
            "dxf_source": file.filename
        }
        # Create session
        session = session_manager.create_session()
        session_manager.set_boundary(
            session.id,
            boundary=geojson_data,
            coords=coords,
            metadata=metadata
        )
        print(f"[{suffix.upper()[1:]}] Parsed {file.filename}: {len(coords)-1} vertices, area={polygon.area:.0f}m²")
        return UploadResponse(
            session_id=session.id,
            boundary=geojson_data,
            metadata=metadata
        )
    except HTTPException:
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(500, f"DXF parsing error: {str(e)}")
    finally:
        # Clean up temp file on every path, not only on success.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)
@app.post("/api/generate-layouts")
async def generate_layouts(request: GenerateRequest):
    """
    Run the genetic-algorithm optimizer on the session's stored boundary.

    The options returned by the optimizer are saved on the session and sent
    back together with their count.

    Raises:
        HTTPException: 404 if the session does not exist, 400 if it has no
            boundary yet, 500 if the optimizer raises.
    """
    sid = request.session_id
    session = session_manager.get_session(sid)
    if not session:
        raise HTTPException(404, "Session not found")
    if not session.boundary_coords:
        raise HTTPException(400, "No boundary uploaded for this session")

    try:
        # A fresh optimizer per request, configured from the request body.
        ga = SimpleGAOptimizer(
            setback=request.setback,
            target_plots=request.target_plots,
        )
        layout_options = ga.optimize(session.boundary_coords)
        session_manager.set_layouts(sid, layout_options)
        result = {
            "session_id": sid,
            "options": layout_options,
            "count": len(layout_options),
        }
        return result
    except Exception as e:
        raise HTTPException(500, f"Optimization failed: {str(e)}")
@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Answer a user question about the session's layouts via gemini_service.

    Both the user message and the assistant reply are appended to the
    session's chat history.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    sid = request.session_id
    session = session_manager.get_session(sid)
    if not session:
        raise HTTPException(404, "Session not found")

    # Record the question before asking the model.
    session_manager.add_chat_message(sid, "user", request.message)

    reply = gemini_service.chat(
        message=request.message,
        layouts=session.layouts,
        boundary_metadata=session.metadata,
    )

    # Record the answer along with the model that produced it.
    session_manager.add_chat_message(sid, "assistant", reply["message"], reply["model"])
    return ChatResponse(**reply)
@app.post("/api/export-dxf")
async def export_dxf(request: ExportRequest):
    """
    Export one stored layout option as a downloadable DXF file.

    Raises:
        HTTPException: 404 if the session or the requested option is missing,
            400 if no layouts were generated, 500 if the export fails.
    """
    session = session_manager.get_session(request.session_id)
    if not session:
        raise HTTPException(404, "Session not found")
    if not session.layouts:
        raise HTTPException(400, "No layouts generated")

    # First stored layout whose "id" equals the requested option id.
    wanted_id = request.option_id
    option = next(
        (candidate for candidate in session.layouts if candidate.get("id") == wanted_id),
        None,
    )
    if not option:
        raise HTTPException(404, f"Option {request.option_id} not found")

    try:
        layout_obj = _create_layout_from_option(option, session)
        dxf_bytes = _export_layout_to_bytes(layout_obj, option)

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"option_{request.option_id}_{stamp}.dxf"
        download_headers = {"Content-Disposition": f"attachment; filename={filename}"}
        return StreamingResponse(
            io.BytesIO(dxf_bytes),
            media_type="application/x-autocad-dxf",
            headers=download_headers,
        )
    except Exception as e:
        raise HTTPException(500, f"Export failed: {str(e)}")
@app.post("/api/export-all-dxf")
async def export_all_dxf(session_id: str = Form(...)):
    """
    Export every stored layout option as one ZIP of DXF files.

    The archive is built in memory, with one DXF entry per layout named from
    the option's id and name.

    Raises:
        HTTPException: 404 if the session is missing, 400 if no layouts were
            generated, 500 if building the archive fails.
    """
    session = session_manager.get_session(session_id)
    if not session:
        raise HTTPException(404, "Session not found")
    if not session.layouts:
        raise HTTPException(400, "No layouts generated")

    try:
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zf:
            for option in session.layouts:
                layout_obj = _create_layout_from_option(option, session)
                dxf_bytes = _export_layout_to_bytes(layout_obj, option)
                safe_name = option.get('name', 'layout').replace(' ', '_')
                entry_name = f"option_{option.get('id', 0)}_{safe_name}.dxf"
                zf.writestr(entry_name, dxf_bytes)
        # Rewind so the response streams the archive from its first byte.
        archive.seek(0)

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"layouts_{stamp}.zip"
        return StreamingResponse(
            archive,
            media_type="application/zip",
            headers={"Content-Disposition": f"attachment; filename={filename}"},
        )
    except Exception as e:
        raise HTTPException(500, f"Export failed: {str(e)}")
@app.get("/api/session/{session_id}")
async def get_session(session_id: str):
    """Return the session's ``to_dict()`` representation, or 404 if unknown."""
    found = session_manager.get_session(session_id)
    if found:
        return found.to_dict()
    raise HTTPException(404, "Session not found")
# === Helper Functions ===
def _create_layout_from_option(option: Dict, session, setback: float = 50.0) -> Layout:
    """Convert GA option to Layout object for DXF export

    Args:
        option: One layout option dict. Its ``"plots"`` entries supply either
            ``"coords"`` or ``"x"``/``"y"``/``"width"``/``"height"``; its
            ``"metrics"`` dict may supply ``"total_area"`` and ``"total_plots"``.
        session: Session object; only ``boundary_coords`` is read.
        setback: Inward offset used to compute the buildable area. Defaults to
            50.0, the value that used to be hard-coded here.

    Returns:
        A ``Layout`` with site boundary, plots and metrics filled in.
    """
    from shapely.geometry import box

    # Create site boundary
    boundary_poly = Polygon(session.boundary_coords)
    site = SiteBoundary(
        geometry=boundary_poly,
        area_sqm=boundary_poly.area
    )
    site.buildable_area_sqm = boundary_poly.buffer(-setback).area

    # Create layout
    layout = Layout(site_boundary=site)

    # Add plots
    plots = []
    for index, plot_data in enumerate(option.get("plots", []), start=1):
        coords = plot_data.get("coords", [])
        if coords:
            plot_geom = Polygon(coords)
        else:
            # No explicit outline: fall back to an axis-aligned rectangle.
            plot_geom = box(
                plot_data["x"],
                plot_data["y"],
                plot_data["x"] + plot_data["width"],
                plot_data["y"] + plot_data["height"]
            )
        plots.append(
            Plot(
                id=f"P{index}",
                geometry=plot_geom,
                area_sqm=plot_data.get("area", plot_geom.area),
                type=PlotType.INDUSTRIAL,
                width_m=plot_data.get("width", 50),
                depth_m=plot_data.get("height", 50)
            )
        )
    layout.plots = plots

    # Set metrics
    metrics = option.get("metrics", {})
    layout.metrics = LayoutMetrics(
        total_area_sqm=site.area_sqm,
        sellable_area_sqm=metrics.get("total_area", 0),
        green_space_area_sqm=0,
        road_area_sqm=0,
        num_plots=metrics.get("total_plots", len(plots)),
        is_compliant=True
    )
    # Guard against a zero-area site to avoid dividing by zero.
    total_area = layout.metrics.total_area_sqm
    layout.metrics.sellable_ratio = (
        layout.metrics.sellable_area_sqm / total_area if total_area > 0 else 0
    )
    return layout
def _export_layout_to_bytes(layout: Layout, option: Dict, setback: float = 50.0) -> bytes:
    """Export layout to DXF bytes

    Draws the site boundary, its inward setback outline, every plot with an id
    label and an area annotation, and a four-line title block below the site.

    Args:
        layout: Layout whose ``site_boundary`` and ``plots`` are drawn.
        option: Source option dict; ``"name"`` and ``"metrics"`` feed the
            title block.
        setback: Inward offset of the setback outline. Defaults to 50.0, the
            value that used to be hard-coded here.

    Returns:
        The R2010 DXF document as UTF-8 encoded bytes.
    """
    import ezdxf

    # Create DXF document
    doc = ezdxf.new(dxfversion="R2010")
    msp = doc.modelspace()

    # Setup layers (values are AutoCAD Color Index numbers)
    layers = {
        'BOUNDARY': {'color': 7},     # White
        'SETBACK': {'color': 1},      # Red
        'PLOTS': {'color': 5},        # Blue
        'LABELS': {'color': 7},       # White
        'ANNOTATIONS': {'color': 2},  # Yellow
        'TITLEBLOCK': {'color': 7}    # White
    }
    for name, props in layers.items():
        doc.layers.add(name, color=props['color'])

    site = layout.site_boundary
    boundary_geom = site.geometry if site else None

    # Draw boundary
    if boundary_geom:
        coords = list(boundary_geom.exterior.coords)
        msp.add_lwpolyline(coords, dxfattribs={'layer': 'BOUNDARY', 'closed': True})

        # Draw setback zone. Shrinking a concave site can split it into
        # several polygons (a MultiPolygon), so draw each resulting part.
        inner = boundary_geom.buffer(-setback)
        if not inner.is_empty:
            for part in getattr(inner, "geoms", [inner]):
                msp.add_lwpolyline(
                    list(part.exterior.coords),
                    dxfattribs={'layer': 'SETBACK', 'closed': True}
                )

    # Draw plots
    for plot in layout.plots:
        if not plot.geometry:
            continue
        coords = list(plot.geometry.exterior.coords)
        msp.add_lwpolyline(coords, dxfattribs={'layer': 'PLOTS', 'closed': True})

        # Add label
        centroid = plot.geometry.centroid
        msp.add_text(
            plot.id,
            dxfattribs={
                'layer': 'LABELS',
                'height': 5,
                'insert': (centroid.x, centroid.y)
            }
        )
        # Add area annotation just below the label
        msp.add_text(
            f"{plot.area_sqm:.0f}m²",
            dxfattribs={
                'layer': 'ANNOTATIONS',
                'height': 3,
                'insert': (centroid.x, centroid.y - 8)
            }
        )

    # Add title block (needs the boundary geometry for its anchor point)
    if boundary_geom:
        bounds = boundary_geom.bounds
        minx, miny = bounds[0], bounds[1]
        option_metrics = option.get('metrics', {})
        title_lines = [
            f"AIOptimize™ - {option.get('name', 'Layout')}",
            f"Plots: {option_metrics.get('total_plots', 0)}",
            f"Total Area: {option_metrics.get('total_area', 0):.0f} m²",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        ]
        # Stack the lines downward, starting 20 units below the site's minimum y.
        y = miny - 20
        for line in title_lines:
            msp.add_text(
                line,
                dxfattribs={
                    'layer': 'TITLEBLOCK',
                    'height': 4,
                    'insert': (minx, y)
                }
            )
            y -= 8

    # Save to bytes
    stream = io.StringIO()
    doc.write(stream)
    return stream.getvalue().encode('utf-8')
# Run with: uvicorn src.api.mvp_api:app --reload --port 8000
# Direct-execution entry point: serves the app with uvicorn on all network
# interfaces (0.0.0.0), port 8000, without --reload.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)