# Source: Hugging Face Space app.py by mlbench123 (commit 5915e23, verified)
"""
FastAPI Service for Construction Scope Validation
Deploy on Hugging Face Spaces - Flattened File Structure
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
import json
import numpy as np
import os
import shutil
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import re
# FastAPI application instance; this metadata appears in the auto-generated docs.
app = FastAPI(
    title="Construction Scope Validator API",
    description="Validates and enriches LLM-generated construction scope with DB data",
    version="1.0.0"
)
#---------------------------
# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; acceptable for a public demo Space, but tighten for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============= MODEL LOADING WITH FLAT STRUCTURE =============
print("="*60)
print("LOADING MODEL...")
print("="*60)
def setup_model_structure():
    """
    Recreate the folder layout sentence-transformers expects when the model
    files were uploaded flattened into the repository root.
    """
    # Nothing to do when both module directories are already present.
    if os.path.exists('1_Pooling') and os.path.exists('2_Normalize'):
        return
    print("Creating temporary model structure...")
    os.makedirs('1_Pooling', exist_ok=True)
    os.makedirs('2_Normalize', exist_ok=True)
    # Mean-pooling configuration for a 384-dim MiniLM-style encoder.
    pooling_settings = {
        "word_embedding_dimension": 384,
        "pooling_mode_cls_token": False,
        "pooling_mode_mean_tokens": True,
        "pooling_mode_max_tokens": False,
        "pooling_mode_mean_sqrt_len_tokens": False
    }
    with open('1_Pooling/config.json', 'w') as cfg:
        json.dump(pooling_settings, cfg, indent=2)
    # The Normalize module takes no options; an empty config is sufficient.
    with open('2_Normalize/config.json', 'w') as cfg:
        json.dump({}, cfg)
    print("βœ“ Created 1_Pooling/config.json")
    print("βœ“ Created 2_Normalize/config.json")
# Setup structure before loading model
setup_model_structure()
try:
    # A trained model lives flattened in the repo root: transformer config,
    # sentence-transformers config, plus weights in either of two formats.
    model_files = ['config.json', 'sentence_bert_config.json']
    has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
    has_model = all(os.path.exists(f) for f in model_files) and has_weights
    if has_model:
        print("βœ“ Model files found in root directory")
        print("Loading trained model...")
        embedding_model = SentenceTransformer('./', device='cpu')
        print("βœ… Trained model loaded successfully!")
    else:
        print("⚠️ Model not found, using base model...")
        embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
        print("βœ… Base model loaded successfully!")
except Exception as e:
    # Any load failure (corrupt weights, bad config) falls back to the public
    # base checkpoint so the service can still start.
    print(f"❌ Error loading trained model: {e}")
    print("Falling back to base model...")
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
    print("βœ… Base model loaded successfully!")
print("="*60)
# ============= DATA MODELS =============
class LLMScopeItem(BaseModel):
    """One scope line proposed by the LLM: stage/task/material plus quantity."""
    stage: str
    task: str
    material: str
    quantity: float
    unit: str
class LLMAreaScope(BaseModel):
    """All scope items the LLM proposed for a single area (room)."""
    area: str
    items: List[LLMScopeItem]
class LLMScopeRequest(BaseModel):
    """Request body for /validate: the full LLM-generated scope of work."""
    scope_of_work: List[LLMAreaScope]
class ValidatedMaterial(BaseModel):
    """A DB material recommended for a task, with a 0-1-ish confidence score."""
    materialId: int
    name: str
    material: str
    unit: str
    price: float
    margin: float
    categories: List[str]
    confidence_score: float
class ValidatedTask(BaseModel):
    """A DB task matched to an LLM item, with its recommended materials."""
    taskId: int
    task: str
    displayName: str
    unit: str
    stageId: int
    roomArea: List[str]
    confidence_score: float
    recommended_materials: List[ValidatedMaterial]
class ValidatedStage(BaseModel):
    """A DB stage grouping validated tasks; priority orders stages in output."""
    stageId: int
    stage: str
    priority: int
    confidence_score: float
    tasks: List[ValidatedTask]
class ValidatedArea(BaseModel):
    """An LLM area after room matching; roomId is None when no DB room matched."""
    roomId: Optional[int]
    name: str
    roomType: str
    matched: bool
    confidence_score: float
    stages: List[ValidatedStage]
class ValidatedResponse(BaseModel):
    """Response body for /validate: validated areas plus aggregate counts."""
    areas: List[ValidatedArea]
    summary: Dict[str, Any]
# ============= HELPER FUNCTION =============
def parse_room_area(room_area_value):
    """Normalize a roomArea value (None, list, JSON string, or other scalar)
    into a plain list of entries."""
    if room_area_value is None:
        return []
    if isinstance(room_area_value, list):
        return room_area_value
    if not isinstance(room_area_value, str):
        # Fallback: stringify any other scalar and wrap it in a list.
        return [str(room_area_value)]
    try:
        decoded = json.loads(room_area_value)
    except json.JSONDecodeError:
        # Not JSON at all -- treat the raw string as a single entry.
        return [room_area_value]
    return decoded if isinstance(decoded, list) else [str(decoded)]
# ============= DATABASE LOADERS =============
class DatabaseLoader:
    """Holds the JSONL reference data (stages/tasks/materials/rooms) and the
    pre-computed sentence embeddings aligned with each list."""

    def __init__(self):
        # Raw records, one dict per JSONL line.
        self.stages = []
        self.tasks = []
        self.materials = []
        self.rooms = []
        # Embedding matrices aligned index-for-index with the lists above.
        self.stage_embeddings = None
        self.task_embeddings = None
        self.material_embeddings = None

    @staticmethod
    def _read_jsonl(path):
        """Parse one JSON record per non-blank line of *path*."""
        with open(path, 'r', encoding='utf-8') as handle:
            return [json.loads(row) for row in handle if row.strip()]

    def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
        """Load the four JSONL data files into memory."""
        print(f"Loading {stages_file}...")
        self.stages = self._read_jsonl(stages_file)
        print(f"Loading {tasks_file}...")
        self.tasks = self._read_jsonl(tasks_file)
        print(f"Loading {materials_file}...")
        self.materials = self._read_jsonl(materials_file)
        print(f"Loading {rooms_file}...")
        self.rooms = self._read_jsonl(rooms_file)
        print(f"βœ… Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
              f"{len(self.materials)} materials, {len(self.rooms)} rooms")

    def initialize_embeddings(self):
        """Pre-compute embeddings for every stage/task/material name."""
        print("Computing stage embeddings...")
        self.stage_embeddings = embedding_model.encode(
            [record['stage'] for record in self.stages], show_progress_bar=True)
        print("Computing task embeddings...")
        self.task_embeddings = embedding_model.encode(
            [record['task'] for record in self.tasks], show_progress_bar=True)
        print("Computing material embeddings...")
        self.material_embeddings = embedding_model.encode(
            [record['material'] for record in self.materials], show_progress_bar=True)
        print("βœ… Embeddings ready!")
# Global DB instance
# Module-level singleton shared by all request handlers; populated at startup.
db = DatabaseLoader()
# ============= MATCHING FUNCTIONS =============
def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
    """Return (stage_record, score) for the DB stage most similar to
    *llm_stage*, or (None, 0.0) when the best score is below *threshold*."""
    encoded = embedding_model.encode([llm_stage])
    scores = cosine_similarity(encoded, db.stage_embeddings)[0]
    winner = np.argmax(scores)
    top_score = scores[winner]
    if top_score < threshold:
        return None, 0.0
    return db.stages[winner], top_score
def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
    """Find the closest matching room from the DB.

    Tries a case-insensitive exact name match first (confidence 1.0), then
    falls back to embedding similarity. Returns (room, score), or (None, 0.0)
    when no room clears *threshold*.
    """
    llm_area_lower = llm_area.lower()
    for room in db.rooms:
        if room['name'].lower() == llm_area_lower:
            return room, 1.0
    # Fix: the original re-encoded every room name on each call. Room data is
    # static after startup, so compute the embeddings once and cache them on db.
    room_embeddings = getattr(db, 'room_embeddings', None)
    if room_embeddings is None:
        room_texts = [r['name'] for r in db.rooms]
        room_embeddings = embedding_model.encode(room_texts)
        db.room_embeddings = room_embeddings
    query_embedding = embedding_model.encode([llm_area])
    similarities = cosine_similarity(query_embedding, room_embeddings)[0]
    best_idx = np.argmax(similarities)
    best_score = similarities[best_idx]
    if best_score >= threshold:
        return db.rooms[best_idx], best_score
    return None, 0.0
def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]:
    """Return up to *top_k* (task, similarity) pairs for tasks in *stage_id*,
    ranked by embedding similarity to *llm_task*, best first."""
    # Fix: the original used db.tasks.index(t) per matching task -- O(n) each
    # (O(n^2) total) and it returns the FIRST equal dict, selecting the wrong
    # embedding row whenever two task records compare equal. Enumerate once.
    stage_tasks = []
    task_indices = []
    for idx, task in enumerate(db.tasks):
        if task['stageId'] == stage_id:
            stage_tasks.append(task)
            task_indices.append(idx)
    if not stage_tasks:
        return []
    query_embedding = embedding_model.encode([llm_task])
    stage_task_embeddings = db.task_embeddings[task_indices]
    similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
    top_indices = np.argsort(similarities)[-top_k:][::-1]
    return [(stage_tasks[idx], similarities[idx]) for idx in top_indices]
def extract_keywords(text: str) -> List[str]:
    """Return lowercase words from *text* longer than two characters,
    excluding common scope-of-work stop words (order preserved)."""
    ignored = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
    keywords = []
    for word in re.findall(r'\b\w+\b', text.lower()):
        if len(word) > 2 and word not in ignored:
            keywords.append(word)
    return keywords
def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> List[tuple]:
    """Score DB materials against a task plus an LLM material description.

    The score blends keyword hits on the material name (+2 each), keyword
    hits on its categories (+1 each), and embedding similarity to
    *llm_material* (weighted x5). Returns up to *top_k* (material, score)
    pairs, best first; materials scoring 0 are dropped.
    """
    task_keywords = extract_keywords(task['task'])
    llm_keywords = extract_keywords(llm_material)
    all_keywords = set(task_keywords + llm_keywords)
    # Keep (index, material) pairs so the pre-computed embedding row can be
    # addressed directly. Fix: the original called db.materials.index(material)
    # inside the loop -- O(n) per candidate (O(n^2) total) and wrong for
    # duplicate records (index() returns the first equal dict).
    indexed_materials = [
        (idx, m) for idx, m in enumerate(db.materials)
        if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
    ]
    if not indexed_materials:
        indexed_materials = list(enumerate(db.materials))
    # Fix: encode the query once, not once per candidate material.
    query_embedding = embedding_model.encode([llm_material])
    scored_materials = []
    for material_idx, material in indexed_materials:
        score = 0.0
        material_text = material['material'].lower()
        for keyword in all_keywords:
            if keyword in material_text:
                score += 2.0
        categories_str = ' '.join(material.get('categories', [])).lower()
        for keyword in all_keywords:
            if keyword in categories_str:
                score += 1.0
        material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
        semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
        score += semantic_score * 5.0
        if score > 0:
            scored_materials.append((material, score))
    scored_materials.sort(key=lambda x: x[1], reverse=True)
    return scored_materials[:top_k]
# ============= VALIDATION PIPELINE =============
def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse:
    """Main validation pipeline.

    For each LLM-proposed area: match the area to a DB room, match each
    item's stage to a DB stage, pick the best DB task within that stage, and
    attach recommended materials. Items whose stage or task cannot be matched
    are silently skipped. Returns the validated areas plus aggregate counts.
    """
    validated_areas = []
    for area_scope in llm_scope.scope_of_work:
        matched_room, room_confidence = find_best_room(area_scope.area)
        # Bucket keyed by stageId so repeated stages merge their tasks.
        validated_stages_dict = {}
        for item in area_scope.items:
            matched_stage, stage_confidence = find_best_stage(item.stage)
            if not matched_stage:
                continue
            stage_id = matched_stage['stageId']
            if stage_id not in validated_stages_dict:
                validated_stages_dict[stage_id] = {
                    'stage_data': matched_stage,
                    'confidence': stage_confidence,
                    'tasks': []
                }
            task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3)
            if not task_matches:
                continue
            best_task, task_confidence = task_matches[0]
            material_matches = find_materials_for_task(
                best_task, item.material, item.unit, top_k=5
            )
            # Material score is a keyword+embedding blend; /10 squeezes it
            # into a rough 0-1 confidence band (not strictly bounded).
            validated_materials = [
                ValidatedMaterial(
                    materialId=m['materialId'],
                    name=m['name'],
                    material=m['material'],
                    unit=m['unit'] or 'unit',
                    price=float(m['price']),
                    margin=float(m['margin']),
                    categories=m['categories'],
                    confidence_score=round(score / 10.0, 2)
                )
                for m, score in material_matches
            ]
            validated_task = ValidatedTask(
                taskId=best_task['taskId'],
                task=best_task['task'],
                displayName=best_task['displayName'],
                unit=best_task['unit'],
                stageId=best_task['stageId'],
                # roomArea may be stored as JSON text; normalize to a list.
                roomArea=parse_room_area(best_task['roomArea']),
                confidence_score=round(task_confidence, 2),
                recommended_materials=validated_materials
            )
            validated_stages_dict[stage_id]['tasks'].append(validated_task)
        validated_stages = [
            ValidatedStage(
                stageId=stage_data['stage_data']['stageId'],
                stage=stage_data['stage_data']['stage'],
                priority=stage_data['stage_data']['priority'],
                confidence_score=round(stage_data['confidence'], 2),
                tasks=stage_data['tasks']
            )
            for stage_data in validated_stages_dict.values()
        ]
        # Present stages in construction order (lower priority value first).
        validated_stages.sort(key=lambda x: x.priority)
        validated_area = ValidatedArea(
            roomId=matched_room['id'] if matched_room else None,
            name=matched_room['name'] if matched_room else area_scope.area,
            roomType=matched_room['roomType'] if matched_room else 'unknown',
            matched=matched_room is not None,
            confidence_score=round(room_confidence, 2),
            stages=validated_stages
        )
        validated_areas.append(validated_area)
    # Aggregate counts; avg_confidence averages AREA confidences only.
    summary = {
        'total_areas': len(validated_areas),
        'total_stages': sum(len(a.stages) for a in validated_areas),
        'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages),
        'total_materials': sum(
            len(t.recommended_materials)
            for a in validated_areas
            for s in a.stages
            for t in s.tasks
        ),
        'matched_areas': sum(1 for a in validated_areas if a.matched),
        'avg_confidence': round(
            np.mean([a.confidence_score for a in validated_areas]), 2
        ) if validated_areas else 0.0
    }
    return ValidatedResponse(areas=validated_areas, summary=summary)
# ============= API ENDPOINTS =============
@app.get("/")
async def root():
    """Service banner: name, version, and whether data/model are in place."""
    model_kind = "trained" if os.path.exists('model.safetensors') else "base"
    return {
        "service": "Construction Scope Validator",
        "version": "1.0.0",
        "status": "running",
        "data_loaded": len(db.stages) > 0,
        "model_type": model_kind
    }
@app.get("/health")
async def health():
    """Readiness info: per-table row counts plus embedding/model status."""
    model_kind = "trained" if os.path.exists('model.safetensors') else "base"
    return {
        "status": "healthy",
        "stages_loaded": len(db.stages),
        "tasks_loaded": len(db.tasks),
        "materials_loaded": len(db.materials),
        "rooms_loaded": len(db.rooms),
        "embeddings_ready": db.stage_embeddings is not None,
        "model_type": model_kind
    }
@app.post("/validate", response_model=ValidatedResponse)
async def validate_scope_endpoint(request: LLMScopeRequest):
    """Validate LLM-generated scope against the database.

    Returns the enriched scope; responds HTTP 500 with a traceback string in
    the detail field when the pipeline fails.
    """
    # Fail fast before entering the generic handler below.
    if not db.stages:
        raise HTTPException(status_code=500, detail="Database not loaded")
    try:
        return validate_scope(request)
    except HTTPException:
        # Fix: the original's broad `except Exception` also caught the
        # deliberately raised HTTPException and re-wrapped it with a
        # misleading traceback. Let HTTPExceptions propagate untouched.
        raise
    except Exception as e:
        import traceback
        error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}"
        raise HTTPException(status_code=500, detail=error_detail)
@app.post("/match-stage")
async def match_stage(stage_name: str):
    """Test endpoint: match a single stage name against the DB stages."""
    stage, score = find_best_stage(stage_name)
    if not stage:
        return {"input": stage_name, "matched": None, "confidence": 0.0}
    return {
        "input": stage_name,
        "matched": stage,
        "confidence": round(score, 2)
    }
@app.post("/match-room")
async def match_room(room_name: str):
    """Test endpoint: match a single room name against the DB rooms."""
    room, score = find_best_room(room_name)
    if not room:
        return {"input": room_name, "matched": None, "confidence": 0.0}
    return {
        "input": room_name,
        "matched": room,
        "confidence": round(score, 2)
    }
# ============= STARTUP =============
@app.on_event("startup")
async def startup_event():
    """Load the JSONL reference data and pre-compute embeddings on startup."""
    try:
        print("\n" + "="*60)
        print("STARTING UP...")
        print("="*60)
        data_files = {
            'stages_file': 'stages.json',
            'tasks_file': 'tasks.json',
            'materials_file': 'materials.json',
            'rooms_file': 'rooms.json',
        }
        db.load_data(**data_files)
        db.initialize_embeddings()
        print("\n" + "="*60)
        print("βœ… SERVICE READY!")
        print("="*60)
    except Exception as e:
        # Swallow the error so the app stays up and /health can report state.
        print(f"\n❌ STARTUP ERROR: {e}")
        import traceback
        traceback.print_exc()
# Run a local dev server when executed directly (HF Spaces expects port 7860).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
# """
# FastAPI Service for Construction Scope Validation
# Deploy on Hugging Face Spaces
# """
# from fastapi import FastAPI, HTTPException
# from fastapi.middleware.cors import CORSMiddleware
# from pydantic import BaseModel, Field
# from typing import List, Optional, Dict, Any
# import json
# import numpy as np
# import os
# from sentence_transformers import SentenceTransformer
# from sklearn.metrics.pairwise import cosine_similarity
# import re
# app = FastAPI(
# title="Construction Scope Validator API",
# description="Validates and enriches LLM-generated construction scope with DB data",
# version="1.0.0"
# )
# # CORS middleware
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
# # Load embedding model (cached globally)
# print("="*60)
# print("LOADING MODEL...")
# print("="*60)
# try:
# model_files = ['config.json', 'sentence_bert_config.json']
# has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
# has_model = all(os.path.exists(f) for f in model_files) and has_weights
# if has_model:
# print("βœ“ Trained model files found in root directory")
# print("Loading trained model...")
# embedding_model = SentenceTransformer('./', device='cpu')
# print("βœ… Trained model loaded successfully!")
# else:
# print("⚠️ Trained model not found, using base model...")
# embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# print("βœ… Base model loaded successfully!")
# except Exception as e:
# print(f"❌ Error loading trained model: {e}")
# print("Falling back to base model...")
# embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# print("βœ… Base model loaded successfully!")
# print("="*60)
# # ============= DATA MODELS =============
# class LLMScopeItem(BaseModel):
# stage: str
# task: str
# material: str
# quantity: float
# unit: str
# class LLMAreaScope(BaseModel):
# area: str
# items: List[LLMScopeItem]
# class LLMScopeRequest(BaseModel):
# scope_of_work: List[LLMAreaScope]
# class ValidatedMaterial(BaseModel):
# materialId: int
# name: str
# material: str
# unit: str
# price: float
# margin: float
# categories: List[str]
# confidence_score: float
# class ValidatedTask(BaseModel):
# taskId: int
# task: str
# displayName: str
# unit: str
# stageId: int
# roomArea: List[str]
# confidence_score: float
# recommended_materials: List[ValidatedMaterial]
# class ValidatedStage(BaseModel):
# stageId: int
# stage: str
# priority: int
# confidence_score: float
# tasks: List[ValidatedTask]
# class ValidatedArea(BaseModel):
# roomId: Optional[int]
# name: str
# roomType: str
# matched: bool
# confidence_score: float
# stages: List[ValidatedStage]
# class ValidatedResponse(BaseModel):
# areas: List[ValidatedArea]
# summary: Dict[str, Any]
# # ============= HELPER FUNCTION =============
# def parse_room_area(room_area_value):
# """
# Parse roomArea field which might be a string, list, or None
# Returns a proper list of strings
# """
# if room_area_value is None:
# return []
# # If it's already a list, return it
# if isinstance(room_area_value, list):
# return room_area_value
# # If it's a string, try to parse it as JSON
# if isinstance(room_area_value, str):
# try:
# parsed = json.loads(room_area_value)
# if isinstance(parsed, list):
# return parsed
# return [str(parsed)]
# except json.JSONDecodeError:
# # If JSON parsing fails, treat it as a single item
# return [room_area_value]
# # Fallback: convert to string and wrap in list
# return [str(room_area_value)]
# # ============= DATABASE LOADERS =============
# class DatabaseLoader:
# def __init__(self):
# self.stages = []
# self.tasks = []
# self.materials = []
# self.rooms = []
# self.stage_embeddings = None
# self.task_embeddings = None
# self.material_embeddings = None
# def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
# """Load JSON data files"""
# print(f"Loading {stages_file}...")
# with open(stages_file, 'r', encoding='utf-8') as f:
# self.stages = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {tasks_file}...")
# with open(tasks_file, 'r', encoding='utf-8') as f:
# self.tasks = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {materials_file}...")
# with open(materials_file, 'r', encoding='utf-8') as f:
# self.materials = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {rooms_file}...")
# with open(rooms_file, 'r', encoding='utf-8') as f:
# self.rooms = [json.loads(line) for line in f if line.strip()]
# print(f"βœ… Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
# f"{len(self.materials)} materials, {len(self.rooms)} rooms")
# def initialize_embeddings(self):
# """Pre-compute embeddings for fast lookup"""
# print("Computing stage embeddings...")
# stage_texts = [s['stage'] for s in self.stages]
# self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True)
# print("Computing task embeddings...")
# task_texts = [t['task'] for t in self.tasks]
# self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True)
# print("Computing material embeddings...")
# material_texts = [m['material'] for m in self.materials]
# self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True)
# print("βœ… Embeddings ready!")
# # Global DB instance
# db = DatabaseLoader()
# # ============= MATCHING FUNCTIONS =============
# def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
# """Find closest matching stage from DB"""
# query_embedding = embedding_model.encode([llm_stage])
# similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0]
# best_idx = np.argmax(similarities)
# best_score = similarities[best_idx]
# if best_score >= threshold:
# return db.stages[best_idx], best_score
# return None, 0.0
# def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
# """Find closest matching room from DB"""
# llm_area_lower = llm_area.lower()
# # Exact match first
# for room in db.rooms:
# if room['name'].lower() == llm_area_lower:
# return room, 1.0
# # Fuzzy match
# room_texts = [r['name'] for r in db.rooms]
# query_embedding = embedding_model.encode([llm_area])
# room_embeddings = embedding_model.encode(room_texts)
# similarities = cosine_similarity(query_embedding, room_embeddings)[0]
# best_idx = np.argmax(similarities)
# best_score = similarities[best_idx]
# if best_score >= threshold:
# return db.rooms[best_idx], best_score
# return None, 0.0
# def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]:
# """Find relevant tasks for a stage matching LLM task description"""
# stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id]
# if not stage_tasks:
# return []
# task_indices = [db.tasks.index(t) for t in stage_tasks]
# query_embedding = embedding_model.encode([llm_task])
# stage_task_embeddings = db.task_embeddings[task_indices]
# similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
# top_indices = np.argsort(similarities)[-top_k:][::-1]
# results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices]
# return results
# def extract_keywords(text: str) -> List[str]:
# """Extract meaningful keywords from text"""
# stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
# words = re.findall(r'\b\w+\b', text.lower())
# return [w for w in words if w not in stop_words and len(w) > 2]
# def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> List[tuple]:
# """Find materials matching task requirements"""
# task_keywords = extract_keywords(task['task'])
# llm_keywords = extract_keywords(llm_material)
# all_keywords = set(task_keywords + llm_keywords)
# compatible_materials = [
# m for m in db.materials
# if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
# ]
# if not compatible_materials:
# compatible_materials = db.materials
# scored_materials = []
# for material in compatible_materials:
# score = 0.0
# material_text = material['material'].lower()
# for keyword in all_keywords:
# if keyword in material_text:
# score += 2.0
# categories_str = ' '.join(material.get('categories', [])).lower()
# for keyword in all_keywords:
# if keyword in categories_str:
# score += 1.0
# material_idx = db.materials.index(material)
# query_embedding = embedding_model.encode([llm_material])
# material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
# semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
# score += semantic_score * 5.0
# if score > 0:
# scored_materials.append((material, score))
# scored_materials.sort(key=lambda x: x[1], reverse=True)
# return scored_materials[:top_k]
# # ============= VALIDATION PIPELINE =============
# def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse:
# """Main validation pipeline"""
# validated_areas = []
# for area_scope in llm_scope.scope_of_work:
# matched_room, room_confidence = find_best_room(area_scope.area)
# validated_stages_dict = {}
# for item in area_scope.items:
# matched_stage, stage_confidence = find_best_stage(item.stage)
# if not matched_stage:
# continue
# stage_id = matched_stage['stageId']
# if stage_id not in validated_stages_dict:
# validated_stages_dict[stage_id] = {
# 'stage_data': matched_stage,
# 'confidence': stage_confidence,
# 'tasks': []
# }
# task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3)
# if not task_matches:
# continue
# best_task, task_confidence = task_matches[0]
# material_matches = find_materials_for_task(
# best_task, item.material, item.unit, top_k=5
# )
# validated_materials = [
# ValidatedMaterial(
# materialId=m['materialId'],
# name=m['name'],
# material=m['material'],
# unit=m['unit'] or 'unit',
# price=float(m['price']),
# margin=float(m['margin']),
# categories=m['categories'],
# confidence_score=round(score / 10.0, 2)
# )
# for m, score in material_matches
# ]
# # FIX: Parse roomArea properly
# validated_task = ValidatedTask(
# taskId=best_task['taskId'],
# task=best_task['task'],
# displayName=best_task['displayName'],
# unit=best_task['unit'],
# stageId=best_task['stageId'],
# roomArea=parse_room_area(best_task['roomArea']), # <-- FIXED HERE
# confidence_score=round(task_confidence, 2),
# recommended_materials=validated_materials
# )
# validated_stages_dict[stage_id]['tasks'].append(validated_task)
# validated_stages = [
# ValidatedStage(
# stageId=stage_data['stage_data']['stageId'],
# stage=stage_data['stage_data']['stage'],
# priority=stage_data['stage_data']['priority'],
# confidence_score=round(stage_data['confidence'], 2),
# tasks=stage_data['tasks']
# )
# for stage_data in validated_stages_dict.values()
# ]
# validated_stages.sort(key=lambda x: x.priority)
# validated_area = ValidatedArea(
# roomId=matched_room['id'] if matched_room else None,
# name=matched_room['name'] if matched_room else area_scope.area,
# roomType=matched_room['roomType'] if matched_room else 'unknown',
# matched=matched_room is not None,
# confidence_score=round(room_confidence, 2),
# stages=validated_stages
# )
# validated_areas.append(validated_area)
# summary = {
# 'total_areas': len(validated_areas),
# 'total_stages': sum(len(a.stages) for a in validated_areas),
# 'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages),
# 'total_materials': sum(
# len(t.recommended_materials)
# for a in validated_areas
# for s in a.stages
# for t in s.tasks
# ),
# 'matched_areas': sum(1 for a in validated_areas if a.matched),
# 'avg_confidence': round(
# np.mean([a.confidence_score for a in validated_areas]), 2
# ) if validated_areas else 0.0
# }
# return ValidatedResponse(areas=validated_areas, summary=summary)
# # ============= API ENDPOINTS =============
# @app.get("/")
# async def root():
# return {
# "service": "Construction Scope Validator",
# "version": "1.0.0",
# "status": "running",
# "data_loaded": len(db.stages) > 0,
# "model_type": "trained" if os.path.exists('model.safetensors') else "base"
# }
# @app.get("/health")
# async def health():
# return {
# "status": "healthy",
# "stages_loaded": len(db.stages),
# "tasks_loaded": len(db.tasks),
# "materials_loaded": len(db.materials),
# "rooms_loaded": len(db.rooms),
# "embeddings_ready": db.stage_embeddings is not None,
# "model_type": "trained" if os.path.exists('model.safetensors') else "base"
# }
# @app.post("/validate", response_model=ValidatedResponse)
# async def validate_scope_endpoint(request: LLMScopeRequest):
# """
# Validate LLM-generated scope against database
# Returns enriched data with matched stages, tasks, materials, and confidence scores
# """
# try:
# if not db.stages:
# raise HTTPException(status_code=500, detail="Database not loaded")
# result = validate_scope(request)
# return result
# except Exception as e:
# import traceback
# error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}"
# raise HTTPException(status_code=500, detail=error_detail)
# @app.post("/match-stage")
# async def match_stage(stage_name: str):
# """Test endpoint: match a single stage name"""
# matched_stage, confidence = find_best_stage(stage_name)
# if matched_stage:
# return {
# "input": stage_name,
# "matched": matched_stage,
# "confidence": round(confidence, 2)
# }
# return {"input": stage_name, "matched": None, "confidence": 0.0}
# @app.post("/match-room")
# async def match_room(room_name: str):
# """Test endpoint: match a single room name"""
# matched_room, confidence = find_best_room(room_name)
# if matched_room:
# return {
# "input": room_name,
# "matched": matched_room,
# "confidence": round(confidence, 2)
# }
# return {"input": room_name, "matched": None, "confidence": 0.0}
# # ============= STARTUP =============
# @app.on_event("startup")
# async def startup_event():
# """Load data and initialize embeddings on startup"""
# try:
# print("\n" + "="*60)
# print("STARTING UP...")
# print("="*60)
# db.load_data(
# stages_file='stages.json',
# tasks_file='tasks.json',
# materials_file='materials.json',
# rooms_file='rooms.json'
# )
# db.initialize_embeddings()
# print("\n" + "="*60)
# print("βœ… SERVICE READY!")
# print("="*60)
# except Exception as e:
# print(f"\n❌ STARTUP ERROR: {e}")
# print("Make sure JSON files are in the correct location")
# import traceback
# traceback.print_exc()
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=7860)
# """
# FastAPI Service for Construction Scope Validation
# Deploy on Hugging Face Spaces
# """
# from fastapi import FastAPI, HTTPException
# from fastapi.middleware.cors import CORSMiddleware
# from pydantic import BaseModel, Field
# from typing import List, Optional, Dict, Any
# import json
# import numpy as np
# import os
# from sentence_transformers import SentenceTransformer
# from sklearn.metrics.pairwise import cosine_similarity
# import re
# app = FastAPI(
# title="Construction Scope Validator API",
# description="Validates and enriches LLM-generated construction scope with DB data",
# version="1.0.0"
# )
# # CORS middleware
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
# # Load embedding model (cached globally)
# # Try to load trained model from root, fallback to base model
# print("="*60)
# print("LOADING MODEL...")
# print("="*60)
# try:
# # Check if trained model files exist in root
# # Check if trained model files exist in root
# model_files = ['config.json', 'sentence_bert_config.json']
# # Check for either pytorch_model.bin or model.safetensors
# has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
# has_model = all(os.path.exists(f) for f in model_files) and has_weights
# if has_model:
# print("βœ“ Trained model files found in root directory")
# print("Loading trained model...")
# embedding_model = SentenceTransformer('./', device='cpu')
# print("βœ… Trained model loaded successfully!")
# else:
# print("⚠️ Trained model not found, using base model...")
# embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# print("βœ… Base model loaded successfully!")
# except Exception as e:
# print(f"❌ Error loading trained model: {e}")
# print("Falling back to base model...")
# embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
# print("βœ… Base model loaded successfully!")
# print("="*60)
# # ============= DATA MODELS =============
# class LLMScopeItem(BaseModel):
# stage: str
# task: str
# material: str
# quantity: float
# unit: str
# class LLMAreaScope(BaseModel):
# area: str
# items: List[LLMScopeItem]
# class LLMScopeRequest(BaseModel):
# scope_of_work: List[LLMAreaScope]
# class ValidatedMaterial(BaseModel):
# materialId: int
# name: str
# material: str
# unit: str
# price: float
# margin: float
# categories: List[str]
# confidence_score: float
# class ValidatedTask(BaseModel):
# taskId: int
# task: str
# displayName: str
# unit: str
# stageId: int
# roomArea: List[str]
# confidence_score: float
# recommended_materials: List[ValidatedMaterial]
# class ValidatedStage(BaseModel):
# stageId: int
# stage: str
# priority: int
# confidence_score: float
# tasks: List[ValidatedTask]
# class ValidatedArea(BaseModel):
# roomId: Optional[int]
# name: str
# roomType: str
# matched: bool
# confidence_score: float
# stages: List[ValidatedStage]
# class ValidatedResponse(BaseModel):
# areas: List[ValidatedArea]
# summary: Dict[str, Any]
# # ============= DATABASE LOADERS =============
# class DatabaseLoader:
# def __init__(self):
# self.stages = []
# self.tasks = []
# self.materials = []
# self.rooms = []
# self.stage_embeddings = None
# self.task_embeddings = None
# self.material_embeddings = None
# def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
# """Load JSON data files"""
# print(f"Loading {stages_file}...")
# with open(stages_file, 'r', encoding='utf-8') as f:
# self.stages = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {tasks_file}...")
# with open(tasks_file, 'r', encoding='utf-8') as f:
# self.tasks = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {materials_file}...")
# with open(materials_file, 'r', encoding='utf-8') as f:
# self.materials = [json.loads(line) for line in f if line.strip()]
# print(f"Loading {rooms_file}...")
# with open(rooms_file, 'r', encoding='utf-8') as f:
# self.rooms = [json.loads(line) for line in f if line.strip()]
# print(f"βœ… Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
# f"{len(self.materials)} materials, {len(self.rooms)} rooms")
# def initialize_embeddings(self):
# """Pre-compute embeddings for fast lookup"""
# print("Computing stage embeddings...")
# stage_texts = [s['stage'] for s in self.stages]
# self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True)
# print("Computing task embeddings...")
# task_texts = [t['task'] for t in self.tasks]
# self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True)
# print("Computing material embeddings...")
# material_texts = [m['material'] for m in self.materials]
# self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True)
# print("βœ… Embeddings ready!")
# # Global DB instance
# db = DatabaseLoader()
# # ============= MATCHING FUNCTIONS =============
# def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
# """Find closest matching stage from DB"""
# query_embedding = embedding_model.encode([llm_stage])
# similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0]
# best_idx = np.argmax(similarities)
# best_score = similarities[best_idx]
# if best_score >= threshold:
# return db.stages[best_idx], best_score
# return None, 0.0
# def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
# """Find closest matching room from DB"""
# llm_area_lower = llm_area.lower()
# # Exact match first
# for room in db.rooms:
# if room['name'].lower() == llm_area_lower:
# return room, 1.0
# # Fuzzy match
# room_texts = [r['name'] for r in db.rooms]
# query_embedding = embedding_model.encode([llm_area])
# room_embeddings = embedding_model.encode(room_texts)
# similarities = cosine_similarity(query_embedding, room_embeddings)[0]
# best_idx = np.argmax(similarities)
# best_score = similarities[best_idx]
# if best_score >= threshold:
# return db.rooms[best_idx], best_score
# return None, 0.0
# def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]:
# """Find relevant tasks for a stage matching LLM task description"""
# # Filter tasks by stage
# stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id]
# if not stage_tasks:
# return []
# # Compute similarities
# task_indices = [db.tasks.index(t) for t in stage_tasks]
# query_embedding = embedding_model.encode([llm_task])
# stage_task_embeddings = db.task_embeddings[task_indices]
# similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
# # Get top K
# top_indices = np.argsort(similarities)[-top_k:][::-1]
# results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices]
# return results
# def extract_keywords(text: str) -> List[str]:
# """Extract meaningful keywords from text"""
# # Remove common words
# stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
# words = re.findall(r'\b\w+\b', text.lower())
# return [w for w in words if w not in stop_words and len(w) > 2]
# def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> List[tuple]:
# """Find materials matching task requirements"""
# task_keywords = extract_keywords(task['task'])
# llm_keywords = extract_keywords(llm_material)
# all_keywords = set(task_keywords + llm_keywords)
# # Filter by unit compatibility
# compatible_materials = [
# m for m in db.materials
# if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
# ]
# if not compatible_materials:
# # Fallback: allow any unit
# compatible_materials = db.materials
# # Score materials
# scored_materials = []
# for material in compatible_materials:
# score = 0.0
# material_text = material['material'].lower()
# # Keyword matching
# for keyword in all_keywords:
# if keyword in material_text:
# score += 2.0
# # Category matching
# categories_str = ' '.join(material.get('categories', [])).lower()
# for keyword in all_keywords:
# if keyword in categories_str:
# score += 1.0
# # Embedding similarity
# material_idx = db.materials.index(material)
# query_embedding = embedding_model.encode([llm_material])
# material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
# semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
# score += semantic_score * 5.0
# if score > 0:
# scored_materials.append((material, score))
# # Sort and return top K
# scored_materials.sort(key=lambda x: x[1], reverse=True)
# return scored_materials[:top_k]
# # ============= VALIDATION PIPELINE =============
# def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse:
# """Main validation pipeline"""
# validated_areas = []
# for area_scope in llm_scope.scope_of_work:
# # Match room/area
# matched_room, room_confidence = find_best_room(area_scope.area)
# validated_stages_dict = {}
# for item in area_scope.items:
# # Match stage
# matched_stage, stage_confidence = find_best_stage(item.stage)
# if not matched_stage:
# continue # Skip if stage not found
# stage_id = matched_stage['stageId']
# # Initialize stage if new
# if stage_id not in validated_stages_dict:
# validated_stages_dict[stage_id] = {
# 'stage_data': matched_stage,
# 'confidence': stage_confidence,
# 'tasks': []
# }
# # Match task
# task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3)
# if not task_matches:
# continue
# best_task, task_confidence = task_matches[0]
# # Match materials
# material_matches = find_materials_for_task(
# best_task,
# item.material,
# item.unit,
# top_k=5
# )
# validated_materials = [
# ValidatedMaterial(
# materialId=m['materialId'],
# name=m['name'],
# material=m['material'],
# unit=m['unit'] or 'unit',
# price=float(m['price']),
# margin=float(m['margin']),
# categories=m['categories'],
# confidence_score=round(score / 10.0, 2)
# )
# for m, score in material_matches
# ]
# validated_task = ValidatedTask(
# taskId=best_task['taskId'],
# task=best_task['task'],
# displayName=best_task['displayName'],
# unit=best_task['unit'],
# stageId=best_task['stageId'],
# roomArea=best_task['roomArea'],
# confidence_score=round(task_confidence, 2),
# recommended_materials=validated_materials
# )
# validated_stages_dict[stage_id]['tasks'].append(validated_task)
# # Build validated stages list
# validated_stages = [
# ValidatedStage(
# stageId=stage_data['stage_data']['stageId'],
# stage=stage_data['stage_data']['stage'],
# priority=stage_data['stage_data']['priority'],
# confidence_score=round(stage_data['confidence'], 2),
# tasks=stage_data['tasks']
# )
# for stage_data in validated_stages_dict.values()
# ]
# # Sort stages by priority
# validated_stages.sort(key=lambda x: x.priority)
# validated_area = ValidatedArea(
# roomId=matched_room['id'] if matched_room else None,
# name=matched_room['name'] if matched_room else area_scope.area,
# roomType=matched_room['roomType'] if matched_room else 'unknown',
# matched=matched_room is not None,
# confidence_score=round(room_confidence, 2),
# stages=validated_stages
# )
# validated_areas.append(validated_area)
# # Build summary
# summary = {
# 'total_areas': len(validated_areas),
# 'total_stages': sum(len(a.stages) for a in validated_areas),
# 'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages),
# 'total_materials': sum(
# len(t.recommended_materials)
# for a in validated_areas
# for s in a.stages
# for t in s.tasks
# ),
# 'matched_areas': sum(1 for a in validated_areas if a.matched),
# 'avg_confidence': round(
# np.mean([a.confidence_score for a in validated_areas]), 2
# ) if validated_areas else 0.0
# }
# return ValidatedResponse(areas=validated_areas, summary=summary)
# # ============= API ENDPOINTS =============
# @app.get("/")
# async def root():
# return {
# "service": "Construction Scope Validator",
# "version": "1.0.0",
# "status": "running",
# "data_loaded": len(db.stages) > 0,
# "model_type": "trained" if os.path.exists('pytorch_model.bin') else "base"
# }
# @app.get("/health")
# async def health():
# return {
# "status": "healthy",
# "stages_loaded": len(db.stages),
# "tasks_loaded": len(db.tasks),
# "materials_loaded": len(db.materials),
# "rooms_loaded": len(db.rooms),
# "embeddings_ready": db.stage_embeddings is not None,
# "model_type": "trained" if os.path.exists('pytorch_model.bin') else "base"
# }
# @app.post("/validate", response_model=ValidatedResponse)
# async def validate_scope_endpoint(request: LLMScopeRequest):
# """
# Validate LLM-generated scope against database
# Returns enriched data with:
# - Matched stages from DB
# - Matched tasks from DB
# - Recommended materials with pricing
# - Confidence scores for all matches
# """
# try:
# if not db.stages:
# raise HTTPException(status_code=500, detail="Database not loaded")
# result = validate_scope(request)
# return result
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Validation error: {str(e)}")
# @app.post("/match-stage")
# async def match_stage(stage_name: str):
# """Test endpoint: match a single stage name"""
# matched_stage, confidence = find_best_stage(stage_name)
# if matched_stage:
# return {
# "input": stage_name,
# "matched": matched_stage,
# "confidence": round(confidence, 2)
# }
# return {"input": stage_name, "matched": None, "confidence": 0.0}
# @app.post("/match-room")
# async def match_room(room_name: str):
# """Test endpoint: match a single room name"""
# matched_room, confidence = find_best_room(room_name)
# if matched_room:
# return {
# "input": room_name,
# "matched": matched_room,
# "confidence": round(confidence, 2)
# }
# return {"input": room_name, "matched": None, "confidence": 0.0}
# # ============= STARTUP =============
# @app.on_event("startup")
# async def startup_event():
# """Load data and initialize embeddings on startup"""
# try:
# print("\n" + "="*60)
# print("STARTING UP...")
# print("="*60)
# # Check what files are available
# print("\nFiles in root directory:")
# for file in os.listdir('.'):
# print(f" - {file}")
# # Load data
# db.load_data(
# stages_file='stages.json',
# tasks_file='tasks.json',
# materials_file='materials.json',
# rooms_file='rooms.json'
# )
# db.initialize_embeddings()
# print("\n" + "="*60)
# print("βœ… SERVICE READY!")
# print("="*60)
# except Exception as e:
# print(f"\n❌ STARTUP ERROR: {e}")
# print("Make sure JSON files are in the correct location")
# import traceback
# traceback.print_exc()
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=7860)