chari-md's picture
init commit
fffcd69 unverified
import io
import os
import uuid
from typing import Any, Dict, List
import boto3
import ifcopenshell
from mcp.server.fastmcp import FastMCP
from starlette.requests import Request
from starlette.responses import PlainTextResponse
def main():
# Create an MCP server
mcp = FastMCP("IfcOpenShell MCP Server", stateless_http=True)
# Store for IFC models
models = {}
@mcp.tool()
def load_model(username: str, filename: str) -> str:
"""
Load an IFC model from S3 using environment variable for bucket name.
Args:
username: The username/email directory in S3 (e.g., 'giulia.daleo2G24H').
filename: The IFC filename to load (e.g., 'hotel.ifc').
Returns:
A model_id that can be used in subsequent calls.
"""
try:
# Get bucket name from environment variable
bucket_name = os.environ.get("S3_BUCKET_NAME")
if not bucket_name:
raise Exception("S3_BUCKET_NAME environment variable is not set")
# Set user email from parameter
user_email = username
file_name = filename
# Construct the object key
object_key = f"{user_email}/{file_name}"
# Create S3 client
s3_client = boto3.client("s3")
# Create a BytesIO object to store the file data in memory
file_data = io.BytesIO()
# Download the file directly to memory
s3_client.download_fileobj(bucket_name, object_key, file_data)
# save file locally
file_path = f"/tmp/{file_name}"
with open(file_path, "wb") as f:
f.write(file_data.getvalue())
# Load the model directly from the in-memory file
model = ifcopenshell.open(file_path)
# Clean up the local file if needed
os.remove(file_path)
# Generate a unique ID for this model
model_id = str(uuid.uuid4())
# Store the model for later use
models[model_id] = {
"model": model,
"source": {
"bucket": bucket_name,
"key": object_key,
"user_email": user_email,
"file_name": file_name,
},
}
return model_id
except Exception as e:
raise Exception(f"Failed to load model: {str(e)}")
@mcp.tool()
def filter_by_category(model_id: str, category: str) -> List[Dict[str, Any]]:
"""
Filter elements in the IFC model by category.
Args:
model_id: The ID of the model returned by load_model_from_s3.
category: The IFC category to filter by (e.g., "IfcDoor", "IfcWindow").
Returns:
A list of elements matching the category.
"""
if model_id not in models:
raise Exception(f"Model with ID {model_id} not found")
model = models[model_id]["model"]
try:
# Get elements by category
elements = model.by_type(category)
# Convert elements to a serializable format
result = []
for element in elements:
element_data = {
"id": element.id(),
"type": element.is_a(),
"guid": element.GlobalId if hasattr(element, "GlobalId") else None,
"name": element.Name if hasattr(element, "Name") else None,
}
# Add properties if available
if hasattr(element, "IsDefinedBy"):
properties = {}
for definition in element.IsDefinedBy:
if definition.is_a("IfcRelDefinesByProperties"):
property_set = definition.RelatingPropertyDefinition
if property_set.is_a("IfcPropertySet"):
for prop in property_set.HasProperties:
if prop.is_a("IfcPropertySingleValue"):
properties[prop.Name] = (
str(prop.NominalValue.wrappedValue)
if prop.NominalValue
else None
)
element_data["properties"] = properties
result.append(element_data)
return result
except Exception as e:
raise Exception(f"Error filtering by category: {str(e)}")
@mcp.tool()
def get_element_by_id(model_id: str, element_id: int) -> Dict[str, Any]:
"""
Get a specific element by its ID.
Args:
model_id: The ID of the model returned by load_model_from_s3.
element_id: The ID of the element to retrieve.
Returns:
The element details.
"""
if model_id not in models:
raise Exception(f"Model with ID {model_id} not found")
model = models[model_id]["model"]
try:
element = model.by_id(element_id)
element_data = {
"id": element.id(),
"type": element.is_a(),
"guid": element.GlobalId if hasattr(element, "GlobalId") else None,
"name": element.Name if hasattr(element, "Name") else None,
}
# Add properties if available
if hasattr(element, "IsDefinedBy"):
properties = {}
for definition in element.IsDefinedBy:
if definition.is_a("IfcRelDefinesByProperties"):
property_set = definition.RelatingPropertyDefinition
if property_set.is_a("IfcPropertySet"):
for prop in property_set.HasProperties:
if prop.is_a("IfcPropertySingleValue"):
properties[prop.Name] = (
str(prop.NominalValue.wrappedValue)
if prop.NominalValue
else None
)
element_data["properties"] = properties
return element_data
except Exception as e:
raise Exception(f"Error getting element by ID: {str(e)}")
# @server.tool()
# def get_model_info(model_id: str) -> Dict[str, Any]:
# """
# Get information about the loaded IFC model.
# Args:
# model_id: The ID of the model returned by load_model_from_s3.
# Returns:
# Information about the model.
# """
# if model_id not in models:
# raise Exception(f"Model with ID {model_id} not found")
# model = models[model_id]["model"]
# try:
# # Get the project
# project = model.by_type("IfcProject")[0]
# # Basic model info
# info = {
# "model_id": model_id,
# "schema": model.schema,
# "source": models[model_id]["source"],
# "project_name": project.Name if hasattr(project, "Name") else None,
# "category_counts": {},
# }
# # Count the number of elements by category
# for element in model.by_type("Ifc*"):
# category = element.is_a()
# if category in info["category_counts"]:
# info["category_counts"][category] += 1
# else:
# info["category_counts"][category] = 1
# return info
# except Exception as e:
# raise Exception(f"Error getting model info: {str(e)}")
@mcp.custom_route("/health", methods=["GET"])
async def health(request: Request) -> PlainTextResponse:
"""
A basic health check endpoint. Returns HTTP 200 with body "OK".
"""
return PlainTextResponse("OK")
@mcp.custom_route("/clear", methods=["GET"])
async def clear(request: Request) -> PlainTextResponse:
"""
Remove a model from memory using its model_id.
Query Parameters:
model_id: The ID of the model to remove from memory.
Returns:
Confirmation message as plain text.
"""
try:
# Extract model_id from query parameters
model_id = request.query_params.get("model_id")
# Validate parameters
if not model_id:
return PlainTextResponse(
"Error: 'model_id' query parameter is required", status_code=400
)
# Check if model exists
if model_id not in models:
return PlainTextResponse(
f"Error: Model with ID {model_id} not found", status_code=404
)
# Store model info for the response
source_info = models[model_id]["source"]
filename = source_info.get("file_name", "unknown")
# Remove the model from the dictionary
del models[model_id]
# Return success message
return PlainTextResponse(
f"Model {model_id} ({filename}) successfully removed from memory"
)
except Exception as e:
return PlainTextResponse(f"Error: {str(e)}", status_code=500)
@mcp.custom_route("/load", methods=["GET"])
async def load(request: Request) -> PlainTextResponse:
"""
Load an IFC model via HTTP request.
Query Parameters:
username: The username/email directory in S3 (e.g., 'giulia.daleo2G24H').
filename: The IFC filename to load (e.g., 'hotel.ifc').
Returns:
The model_id as plain text that can be used in subsequent calls.
"""
try:
# Extract query parameters
username = request.query_params.get("username")
filename = request.query_params.get("filename")
# Validate parameters
if not username or not filename:
return PlainTextResponse(
"Error: Both 'username' and 'filename' query parameters are required",
status_code=400,
)
# Get bucket name from environment variable
bucket_name = os.environ.get("S3_BUCKET_NAME")
if not bucket_name:
return PlainTextResponse(
"Error: S3_BUCKET_NAME environment variable is not set",
status_code=500,
)
# Set user email from parameter
user_email = username
file_name = filename
# Construct the object key
object_key = f"{user_email}/{file_name}"
# Create S3 client
s3_client = boto3.client("s3")
# Create a BytesIO object to store the file data in memory
file_data = io.BytesIO()
# Download the file directly to memory
try:
s3_client.download_fileobj(bucket_name, object_key, file_data)
except Exception as s3_error:
return PlainTextResponse(
f"Error downloading file from S3: {str(s3_error)}", status_code=404
)
# save file locally
file_path = f"/tmp/{file_name}"
with open(file_path, "wb") as f:
f.write(file_data.getvalue())
# Load the model directly from the in-memory file
try:
model = ifcopenshell.open(file_path)
except Exception as model_error:
# Clean up temp file if it exists
if os.path.exists(file_path):
os.remove(file_path)
return PlainTextResponse(
f"Error loading IFC model: {str(model_error)}", status_code=500
)
# Clean up the local file if needed
os.remove(file_path)
# Generate a unique ID for this model
model_id = str(uuid.uuid4())
# Store the model for later use
models[model_id] = {
"model": model,
"source": {
"bucket": bucket_name,
"key": object_key,
"user_email": user_email,
"file_name": file_name,
},
}
# Return the model_id as plain text
return PlainTextResponse(model_id)
except Exception as e:
return PlainTextResponse(f"Error: {str(e)}", status_code=500)
import uvicorn
port = int(os.environ.get("PORT", 7860))
uvicorn.run(mcp.streamable_http_app(), host="0.0.0.0", port=port)
return 0