Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import uuid | |
| from typing import Any, Dict, List | |
| import boto3 | |
| import ifcopenshell | |
| from mcp.server.fastmcp import FastMCP | |
| from starlette.requests import Request | |
| from starlette.responses import PlainTextResponse | |
| def main(): | |
| # Create an MCP server | |
| mcp = FastMCP("IfcOpenShell MCP Server", stateless_http=True) | |
| # Store for IFC models | |
| models = {} | |
| def load_model(username: str, filename: str) -> str: | |
| """ | |
| Load an IFC model from S3 using environment variable for bucket name. | |
| Args: | |
| username: The username/email directory in S3 (e.g., 'giulia.daleo2G24H'). | |
| filename: The IFC filename to load (e.g., 'hotel.ifc'). | |
| Returns: | |
| A model_id that can be used in subsequent calls. | |
| """ | |
| try: | |
| # Get bucket name from environment variable | |
| bucket_name = os.environ.get("S3_BUCKET_NAME") | |
| if not bucket_name: | |
| raise Exception("S3_BUCKET_NAME environment variable is not set") | |
| # Set user email from parameter | |
| user_email = username | |
| file_name = filename | |
| # Construct the object key | |
| object_key = f"{user_email}/{file_name}" | |
| # Create S3 client | |
| s3_client = boto3.client("s3") | |
| # Create a BytesIO object to store the file data in memory | |
| file_data = io.BytesIO() | |
| # Download the file directly to memory | |
| s3_client.download_fileobj(bucket_name, object_key, file_data) | |
| # save file locally | |
| file_path = f"/tmp/{file_name}" | |
| with open(file_path, "wb") as f: | |
| f.write(file_data.getvalue()) | |
| # Load the model directly from the in-memory file | |
| model = ifcopenshell.open(file_path) | |
| # Clean up the local file if needed | |
| os.remove(file_path) | |
| # Generate a unique ID for this model | |
| model_id = str(uuid.uuid4()) | |
| # Store the model for later use | |
| models[model_id] = { | |
| "model": model, | |
| "source": { | |
| "bucket": bucket_name, | |
| "key": object_key, | |
| "user_email": user_email, | |
| "file_name": file_name, | |
| }, | |
| } | |
| return model_id | |
| except Exception as e: | |
| raise Exception(f"Failed to load model: {str(e)}") | |
| def filter_by_category(model_id: str, category: str) -> List[Dict[str, Any]]: | |
| """ | |
| Filter elements in the IFC model by category. | |
| Args: | |
| model_id: The ID of the model returned by load_model_from_s3. | |
| category: The IFC category to filter by (e.g., "IfcDoor", "IfcWindow"). | |
| Returns: | |
| A list of elements matching the category. | |
| """ | |
| if model_id not in models: | |
| raise Exception(f"Model with ID {model_id} not found") | |
| model = models[model_id]["model"] | |
| try: | |
| # Get elements by category | |
| elements = model.by_type(category) | |
| # Convert elements to a serializable format | |
| result = [] | |
| for element in elements: | |
| element_data = { | |
| "id": element.id(), | |
| "type": element.is_a(), | |
| "guid": element.GlobalId if hasattr(element, "GlobalId") else None, | |
| "name": element.Name if hasattr(element, "Name") else None, | |
| } | |
| # Add properties if available | |
| if hasattr(element, "IsDefinedBy"): | |
| properties = {} | |
| for definition in element.IsDefinedBy: | |
| if definition.is_a("IfcRelDefinesByProperties"): | |
| property_set = definition.RelatingPropertyDefinition | |
| if property_set.is_a("IfcPropertySet"): | |
| for prop in property_set.HasProperties: | |
| if prop.is_a("IfcPropertySingleValue"): | |
| properties[prop.Name] = ( | |
| str(prop.NominalValue.wrappedValue) | |
| if prop.NominalValue | |
| else None | |
| ) | |
| element_data["properties"] = properties | |
| result.append(element_data) | |
| return result | |
| except Exception as e: | |
| raise Exception(f"Error filtering by category: {str(e)}") | |
| def get_element_by_id(model_id: str, element_id: int) -> Dict[str, Any]: | |
| """ | |
| Get a specific element by its ID. | |
| Args: | |
| model_id: The ID of the model returned by load_model_from_s3. | |
| element_id: The ID of the element to retrieve. | |
| Returns: | |
| The element details. | |
| """ | |
| if model_id not in models: | |
| raise Exception(f"Model with ID {model_id} not found") | |
| model = models[model_id]["model"] | |
| try: | |
| element = model.by_id(element_id) | |
| element_data = { | |
| "id": element.id(), | |
| "type": element.is_a(), | |
| "guid": element.GlobalId if hasattr(element, "GlobalId") else None, | |
| "name": element.Name if hasattr(element, "Name") else None, | |
| } | |
| # Add properties if available | |
| if hasattr(element, "IsDefinedBy"): | |
| properties = {} | |
| for definition in element.IsDefinedBy: | |
| if definition.is_a("IfcRelDefinesByProperties"): | |
| property_set = definition.RelatingPropertyDefinition | |
| if property_set.is_a("IfcPropertySet"): | |
| for prop in property_set.HasProperties: | |
| if prop.is_a("IfcPropertySingleValue"): | |
| properties[prop.Name] = ( | |
| str(prop.NominalValue.wrappedValue) | |
| if prop.NominalValue | |
| else None | |
| ) | |
| element_data["properties"] = properties | |
| return element_data | |
| except Exception as e: | |
| raise Exception(f"Error getting element by ID: {str(e)}") | |
| # @server.tool() | |
| # def get_model_info(model_id: str) -> Dict[str, Any]: | |
| # """ | |
| # Get information about the loaded IFC model. | |
| # Args: | |
| # model_id: The ID of the model returned by load_model_from_s3. | |
| # Returns: | |
| # Information about the model. | |
| # """ | |
| # if model_id not in models: | |
| # raise Exception(f"Model with ID {model_id} not found") | |
| # model = models[model_id]["model"] | |
| # try: | |
| # # Get the project | |
| # project = model.by_type("IfcProject")[0] | |
| # # Basic model info | |
| # info = { | |
| # "model_id": model_id, | |
| # "schema": model.schema, | |
| # "source": models[model_id]["source"], | |
| # "project_name": project.Name if hasattr(project, "Name") else None, | |
| # "category_counts": {}, | |
| # } | |
| # # Count the number of elements by category | |
| # for element in model.by_type("Ifc*"): | |
| # category = element.is_a() | |
| # if category in info["category_counts"]: | |
| # info["category_counts"][category] += 1 | |
| # else: | |
| # info["category_counts"][category] = 1 | |
| # return info | |
| # except Exception as e: | |
| # raise Exception(f"Error getting model info: {str(e)}") | |
| async def health(request: Request) -> PlainTextResponse: | |
| """ | |
| A basic health check endpoint. Returns HTTP 200 with body "OK". | |
| """ | |
| return PlainTextResponse("OK") | |
| async def clear(request: Request) -> PlainTextResponse: | |
| """ | |
| Remove a model from memory using its model_id. | |
| Query Parameters: | |
| model_id: The ID of the model to remove from memory. | |
| Returns: | |
| Confirmation message as plain text. | |
| """ | |
| try: | |
| # Extract model_id from query parameters | |
| model_id = request.query_params.get("model_id") | |
| # Validate parameters | |
| if not model_id: | |
| return PlainTextResponse( | |
| "Error: 'model_id' query parameter is required", status_code=400 | |
| ) | |
| # Check if model exists | |
| if model_id not in models: | |
| return PlainTextResponse( | |
| f"Error: Model with ID {model_id} not found", status_code=404 | |
| ) | |
| # Store model info for the response | |
| source_info = models[model_id]["source"] | |
| filename = source_info.get("file_name", "unknown") | |
| # Remove the model from the dictionary | |
| del models[model_id] | |
| # Return success message | |
| return PlainTextResponse( | |
| f"Model {model_id} ({filename}) successfully removed from memory" | |
| ) | |
| except Exception as e: | |
| return PlainTextResponse(f"Error: {str(e)}", status_code=500) | |
| async def load(request: Request) -> PlainTextResponse: | |
| """ | |
| Load an IFC model via HTTP request. | |
| Query Parameters: | |
| username: The username/email directory in S3 (e.g., 'giulia.daleo2G24H'). | |
| filename: The IFC filename to load (e.g., 'hotel.ifc'). | |
| Returns: | |
| The model_id as plain text that can be used in subsequent calls. | |
| """ | |
| try: | |
| # Extract query parameters | |
| username = request.query_params.get("username") | |
| filename = request.query_params.get("filename") | |
| # Validate parameters | |
| if not username or not filename: | |
| return PlainTextResponse( | |
| "Error: Both 'username' and 'filename' query parameters are required", | |
| status_code=400, | |
| ) | |
| # Get bucket name from environment variable | |
| bucket_name = os.environ.get("S3_BUCKET_NAME") | |
| if not bucket_name: | |
| return PlainTextResponse( | |
| "Error: S3_BUCKET_NAME environment variable is not set", | |
| status_code=500, | |
| ) | |
| # Set user email from parameter | |
| user_email = username | |
| file_name = filename | |
| # Construct the object key | |
| object_key = f"{user_email}/{file_name}" | |
| # Create S3 client | |
| s3_client = boto3.client("s3") | |
| # Create a BytesIO object to store the file data in memory | |
| file_data = io.BytesIO() | |
| # Download the file directly to memory | |
| try: | |
| s3_client.download_fileobj(bucket_name, object_key, file_data) | |
| except Exception as s3_error: | |
| return PlainTextResponse( | |
| f"Error downloading file from S3: {str(s3_error)}", status_code=404 | |
| ) | |
| # save file locally | |
| file_path = f"/tmp/{file_name}" | |
| with open(file_path, "wb") as f: | |
| f.write(file_data.getvalue()) | |
| # Load the model directly from the in-memory file | |
| try: | |
| model = ifcopenshell.open(file_path) | |
| except Exception as model_error: | |
| # Clean up temp file if it exists | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| return PlainTextResponse( | |
| f"Error loading IFC model: {str(model_error)}", status_code=500 | |
| ) | |
| # Clean up the local file if needed | |
| os.remove(file_path) | |
| # Generate a unique ID for this model | |
| model_id = str(uuid.uuid4()) | |
| # Store the model for later use | |
| models[model_id] = { | |
| "model": model, | |
| "source": { | |
| "bucket": bucket_name, | |
| "key": object_key, | |
| "user_email": user_email, | |
| "file_name": file_name, | |
| }, | |
| } | |
| # Return the model_id as plain text | |
| return PlainTextResponse(model_id) | |
| except Exception as e: | |
| return PlainTextResponse(f"Error: {str(e)}", status_code=500) | |
| import uvicorn | |
| port = int(os.environ.get("PORT", 7860)) | |
| uvicorn.run(mcp.streamable_http_app(), host="0.0.0.0", port=port) | |
| return 0 | |