""" FastAPI Service for Construction Scope Validation Deploy on Hugging Face Spaces - Flattened File Structure """ from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any import json import numpy as np import os import shutil from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity import re app = FastAPI( title="Construction Scope Validator API", description="Validates and enriches LLM-generated construction scope with DB data", version="1.0.0" ) #--------------------------- # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============= MODEL LOADING WITH FLAT STRUCTURE ============= print("="*60) print("LOADING MODEL...") print("="*60) def setup_model_structure(): """ Create temporary folder structure for sentence-transformers if files are in root (flattened structure) """ # Check if we need to create structure if not os.path.exists('1_Pooling') or not os.path.exists('2_Normalize'): print("Creating temporary model structure...") # Create directories os.makedirs('1_Pooling', exist_ok=True) os.makedirs('2_Normalize', exist_ok=True) # Pooling config pooling_config = { "word_embedding_dimension": 384, "pooling_mode_cls_token": False, "pooling_mode_mean_tokens": True, "pooling_mode_max_tokens": False, "pooling_mode_mean_sqrt_len_tokens": False } with open('1_Pooling/config.json', 'w') as f: json.dump(pooling_config, f, indent=2) # Normalize config (empty is fine) with open('2_Normalize/config.json', 'w') as f: json.dump({}, f) print("✓ Created 1_Pooling/config.json") print("✓ Created 2_Normalize/config.json") # Setup structure before loading model setup_model_structure() try: model_files = ['config.json', 'sentence_bert_config.json'] has_weights = os.path.exists('pytorch_model.bin') or 
os.path.exists('model.safetensors') has_model = all(os.path.exists(f) for f in model_files) and has_weights if has_model: print("✓ Model files found in root directory") print("Loading trained model...") embedding_model = SentenceTransformer('./', device='cpu') print("✅ Trained model loaded successfully!") else: print("⚠️ Model not found, using base model...") embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu') print("✅ Base model loaded successfully!") except Exception as e: print(f"❌ Error loading trained model: {e}") print("Falling back to base model...") embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu') print("✅ Base model loaded successfully!") print("="*60) # ============= DATA MODELS ============= class LLMScopeItem(BaseModel): stage: str task: str material: str quantity: float unit: str class LLMAreaScope(BaseModel): area: str items: List[LLMScopeItem] class LLMScopeRequest(BaseModel): scope_of_work: List[LLMAreaScope] class ValidatedMaterial(BaseModel): materialId: int name: str material: str unit: str price: float margin: float categories: List[str] confidence_score: float class ValidatedTask(BaseModel): taskId: int task: str displayName: str unit: str stageId: int roomArea: List[str] confidence_score: float recommended_materials: List[ValidatedMaterial] class ValidatedStage(BaseModel): stageId: int stage: str priority: int confidence_score: float tasks: List[ValidatedTask] class ValidatedArea(BaseModel): roomId: Optional[int] name: str roomType: str matched: bool confidence_score: float stages: List[ValidatedStage] class ValidatedResponse(BaseModel): areas: List[ValidatedArea] summary: Dict[str, Any] # ============= HELPER FUNCTION ============= def parse_room_area(room_area_value): """Parse roomArea field which might be a string, list, or None""" if room_area_value is None: return [] if isinstance(room_area_value, list): return room_area_value if isinstance(room_area_value, str): try: parsed = 
json.loads(room_area_value) if isinstance(parsed, list): return parsed return [str(parsed)] except json.JSONDecodeError: return [room_area_value] return [str(room_area_value)] # ============= DATABASE LOADERS ============= class DatabaseLoader: def __init__(self): self.stages = [] self.tasks = [] self.materials = [] self.rooms = [] self.stage_embeddings = None self.task_embeddings = None self.material_embeddings = None def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str): """Load JSON data files""" print(f"Loading {stages_file}...") with open(stages_file, 'r', encoding='utf-8') as f: self.stages = [json.loads(line) for line in f if line.strip()] print(f"Loading {tasks_file}...") with open(tasks_file, 'r', encoding='utf-8') as f: self.tasks = [json.loads(line) for line in f if line.strip()] print(f"Loading {materials_file}...") with open(materials_file, 'r', encoding='utf-8') as f: self.materials = [json.loads(line) for line in f if line.strip()] print(f"Loading {rooms_file}...") with open(rooms_file, 'r', encoding='utf-8') as f: self.rooms = [json.loads(line) for line in f if line.strip()] print(f"✅ Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, " f"{len(self.materials)} materials, {len(self.rooms)} rooms") def initialize_embeddings(self): """Pre-compute embeddings for fast lookup""" print("Computing stage embeddings...") stage_texts = [s['stage'] for s in self.stages] self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True) print("Computing task embeddings...") task_texts = [t['task'] for t in self.tasks] self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True) print("Computing material embeddings...") material_texts = [m['material'] for m in self.materials] self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True) print("✅ Embeddings ready!") # Global DB instance db = DatabaseLoader() # ============= MATCHING FUNCTIONS 
============= def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple: """Find closest matching stage from DB""" query_embedding = embedding_model.encode([llm_stage]) similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0] best_idx = np.argmax(similarities) best_score = similarities[best_idx] if best_score >= threshold: return db.stages[best_idx], best_score return None, 0.0 def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple: """Find closest matching room from DB""" llm_area_lower = llm_area.lower() for room in db.rooms: if room['name'].lower() == llm_area_lower: return room, 1.0 room_texts = [r['name'] for r in db.rooms] query_embedding = embedding_model.encode([llm_area]) room_embeddings = embedding_model.encode(room_texts) similarities = cosine_similarity(query_embedding, room_embeddings)[0] best_idx = np.argmax(similarities) best_score = similarities[best_idx] if best_score >= threshold: return db.rooms[best_idx], best_score return None, 0.0 def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]: """Find relevant tasks for a stage matching LLM task description""" stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id] if not stage_tasks: return [] task_indices = [db.tasks.index(t) for t in stage_tasks] query_embedding = embedding_model.encode([llm_task]) stage_task_embeddings = db.task_embeddings[task_indices] similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0] top_indices = np.argsort(similarities)[-top_k:][::-1] results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices] return results def extract_keywords(text: str) -> List[str]: """Extract meaningful keywords from text""" stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'} words = re.findall(r'\b\w+\b', text.lower()) return [w for w in words if w not in stop_words and len(w) > 2] def find_materials_for_task(task: dict, llm_material: str, unit: str, 
top_k: int = 10) -> List[tuple]: """Find materials matching task requirements""" task_keywords = extract_keywords(task['task']) llm_keywords = extract_keywords(llm_material) all_keywords = set(task_keywords + llm_keywords) compatible_materials = [ m for m in db.materials if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None ] if not compatible_materials: compatible_materials = db.materials scored_materials = [] for material in compatible_materials: score = 0.0 material_text = material['material'].lower() for keyword in all_keywords: if keyword in material_text: score += 2.0 categories_str = ' '.join(material.get('categories', [])).lower() for keyword in all_keywords: if keyword in categories_str: score += 1.0 material_idx = db.materials.index(material) query_embedding = embedding_model.encode([llm_material]) material_embedding = db.material_embeddings[material_idx].reshape(1, -1) semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0] score += semantic_score * 5.0 if score > 0: scored_materials.append((material, score)) scored_materials.sort(key=lambda x: x[1], reverse=True) return scored_materials[:top_k] # ============= VALIDATION PIPELINE ============= def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse: """Main validation pipeline""" validated_areas = [] for area_scope in llm_scope.scope_of_work: matched_room, room_confidence = find_best_room(area_scope.area) validated_stages_dict = {} for item in area_scope.items: matched_stage, stage_confidence = find_best_stage(item.stage) if not matched_stage: continue stage_id = matched_stage['stageId'] if stage_id not in validated_stages_dict: validated_stages_dict[stage_id] = { 'stage_data': matched_stage, 'confidence': stage_confidence, 'tasks': [] } task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3) if not task_matches: continue best_task, task_confidence = task_matches[0] material_matches = find_materials_for_task( best_task, item.material, item.unit, 
top_k=5 ) validated_materials = [ ValidatedMaterial( materialId=m['materialId'], name=m['name'], material=m['material'], unit=m['unit'] or 'unit', price=float(m['price']), margin=float(m['margin']), categories=m['categories'], confidence_score=round(score / 10.0, 2) ) for m, score in material_matches ] validated_task = ValidatedTask( taskId=best_task['taskId'], task=best_task['task'], displayName=best_task['displayName'], unit=best_task['unit'], stageId=best_task['stageId'], roomArea=parse_room_area(best_task['roomArea']), confidence_score=round(task_confidence, 2), recommended_materials=validated_materials ) validated_stages_dict[stage_id]['tasks'].append(validated_task) validated_stages = [ ValidatedStage( stageId=stage_data['stage_data']['stageId'], stage=stage_data['stage_data']['stage'], priority=stage_data['stage_data']['priority'], confidence_score=round(stage_data['confidence'], 2), tasks=stage_data['tasks'] ) for stage_data in validated_stages_dict.values() ] validated_stages.sort(key=lambda x: x.priority) validated_area = ValidatedArea( roomId=matched_room['id'] if matched_room else None, name=matched_room['name'] if matched_room else area_scope.area, roomType=matched_room['roomType'] if matched_room else 'unknown', matched=matched_room is not None, confidence_score=round(room_confidence, 2), stages=validated_stages ) validated_areas.append(validated_area) summary = { 'total_areas': len(validated_areas), 'total_stages': sum(len(a.stages) for a in validated_areas), 'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages), 'total_materials': sum( len(t.recommended_materials) for a in validated_areas for s in a.stages for t in s.tasks ), 'matched_areas': sum(1 for a in validated_areas if a.matched), 'avg_confidence': round( np.mean([a.confidence_score for a in validated_areas]), 2 ) if validated_areas else 0.0 } return ValidatedResponse(areas=validated_areas, summary=summary) # ============= API ENDPOINTS ============= @app.get("/") async 
def root(): return { "service": "Construction Scope Validator", "version": "1.0.0", "status": "running", "data_loaded": len(db.stages) > 0, "model_type": "trained" if os.path.exists('model.safetensors') else "base" } @app.get("/health") async def health(): return { "status": "healthy", "stages_loaded": len(db.stages), "tasks_loaded": len(db.tasks), "materials_loaded": len(db.materials), "rooms_loaded": len(db.rooms), "embeddings_ready": db.stage_embeddings is not None, "model_type": "trained" if os.path.exists('model.safetensors') else "base" } @app.post("/validate", response_model=ValidatedResponse) async def validate_scope_endpoint(request: LLMScopeRequest): """Validate LLM-generated scope against database""" try: if not db.stages: raise HTTPException(status_code=500, detail="Database not loaded") result = validate_scope(request) return result except Exception as e: import traceback error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}" raise HTTPException(status_code=500, detail=error_detail) @app.post("/match-stage") async def match_stage(stage_name: str): """Test endpoint: match a single stage name""" matched_stage, confidence = find_best_stage(stage_name) if matched_stage: return { "input": stage_name, "matched": matched_stage, "confidence": round(confidence, 2) } return {"input": stage_name, "matched": None, "confidence": 0.0} @app.post("/match-room") async def match_room(room_name: str): """Test endpoint: match a single room name""" matched_room, confidence = find_best_room(room_name) if matched_room: return { "input": room_name, "matched": matched_room, "confidence": round(confidence, 2) } return {"input": room_name, "matched": None, "confidence": 0.0} # ============= STARTUP ============= @app.on_event("startup") async def startup_event(): """Load data and initialize embeddings on startup""" try: print("\n" + "="*60) print("STARTING UP...") print("="*60) db.load_data( stages_file='stages.json', tasks_file='tasks.json', 
materials_file='materials.json', rooms_file='rooms.json' ) db.initialize_embeddings() print("\n" + "="*60) print("✅ SERVICE READY!") print("="*60) except Exception as e: print(f"\n❌ STARTUP ERROR: {e}") import traceback traceback.print_exc() if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860) # """ # FastAPI Service for Construction Scope Validation # Deploy on Hugging Face Spaces # """ # from fastapi import FastAPI, HTTPException # from fastapi.middleware.cors import CORSMiddleware # from pydantic import BaseModel, Field # from typing import List, Optional, Dict, Any # import json # import numpy as np # import os # from sentence_transformers import SentenceTransformer # from sklearn.metrics.pairwise import cosine_similarity # import re # app = FastAPI( # title="Construction Scope Validator API", # description="Validates and enriches LLM-generated construction scope with DB data", # version="1.0.0" # ) # # CORS middleware # app.add_middleware( # CORSMiddleware, # allow_origins=["*"], # allow_credentials=True, # allow_methods=["*"], # allow_headers=["*"], # ) # # Load embedding model (cached globally) # print("="*60) # print("LOADING MODEL...") # print("="*60) # try: # model_files = ['config.json', 'sentence_bert_config.json'] # has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors') # has_model = all(os.path.exists(f) for f in model_files) and has_weights # if has_model: # print("✓ Trained model files found in root directory") # print("Loading trained model...") # embedding_model = SentenceTransformer('./', device='cpu') # print("✅ Trained model loaded successfully!") # else: # print("⚠️ Trained model not found, using base model...") # embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu') # print("✅ Base model loaded successfully!") # except Exception as e: # print(f"❌ Error loading trained model: {e}") # print("Falling back to base model...") # embedding_model = 
SentenceTransformer('all-MiniLM-L6-v2', device='cpu') # print("✅ Base model loaded successfully!") # print("="*60) # # ============= DATA MODELS ============= # class LLMScopeItem(BaseModel): # stage: str # task: str # material: str # quantity: float # unit: str # class LLMAreaScope(BaseModel): # area: str # items: List[LLMScopeItem] # class LLMScopeRequest(BaseModel): # scope_of_work: List[LLMAreaScope] # class ValidatedMaterial(BaseModel): # materialId: int # name: str # material: str # unit: str # price: float # margin: float # categories: List[str] # confidence_score: float # class ValidatedTask(BaseModel): # taskId: int # task: str # displayName: str # unit: str # stageId: int # roomArea: List[str] # confidence_score: float # recommended_materials: List[ValidatedMaterial] # class ValidatedStage(BaseModel): # stageId: int # stage: str # priority: int # confidence_score: float # tasks: List[ValidatedTask] # class ValidatedArea(BaseModel): # roomId: Optional[int] # name: str # roomType: str # matched: bool # confidence_score: float # stages: List[ValidatedStage] # class ValidatedResponse(BaseModel): # areas: List[ValidatedArea] # summary: Dict[str, Any] # # ============= HELPER FUNCTION ============= # def parse_room_area(room_area_value): # """ # Parse roomArea field which might be a string, list, or None # Returns a proper list of strings # """ # if room_area_value is None: # return [] # # If it's already a list, return it # if isinstance(room_area_value, list): # return room_area_value # # If it's a string, try to parse it as JSON # if isinstance(room_area_value, str): # try: # parsed = json.loads(room_area_value) # if isinstance(parsed, list): # return parsed # return [str(parsed)] # except json.JSONDecodeError: # # If JSON parsing fails, treat it as a single item # return [room_area_value] # # Fallback: convert to string and wrap in list # return [str(room_area_value)] # # ============= DATABASE LOADERS ============= # class DatabaseLoader: # def 
__init__(self): # self.stages = [] # self.tasks = [] # self.materials = [] # self.rooms = [] # self.stage_embeddings = None # self.task_embeddings = None # self.material_embeddings = None # def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str): # """Load JSON data files""" # print(f"Loading {stages_file}...") # with open(stages_file, 'r', encoding='utf-8') as f: # self.stages = [json.loads(line) for line in f if line.strip()] # print(f"Loading {tasks_file}...") # with open(tasks_file, 'r', encoding='utf-8') as f: # self.tasks = [json.loads(line) for line in f if line.strip()] # print(f"Loading {materials_file}...") # with open(materials_file, 'r', encoding='utf-8') as f: # self.materials = [json.loads(line) for line in f if line.strip()] # print(f"Loading {rooms_file}...") # with open(rooms_file, 'r', encoding='utf-8') as f: # self.rooms = [json.loads(line) for line in f if line.strip()] # print(f"✅ Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, " # f"{len(self.materials)} materials, {len(self.rooms)} rooms") # def initialize_embeddings(self): # """Pre-compute embeddings for fast lookup""" # print("Computing stage embeddings...") # stage_texts = [s['stage'] for s in self.stages] # self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True) # print("Computing task embeddings...") # task_texts = [t['task'] for t in self.tasks] # self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True) # print("Computing material embeddings...") # material_texts = [m['material'] for m in self.materials] # self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True) # print("✅ Embeddings ready!") # # Global DB instance # db = DatabaseLoader() # # ============= MATCHING FUNCTIONS ============= # def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple: # """Find closest matching stage from DB""" # query_embedding = 
embedding_model.encode([llm_stage]) # similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0] # best_idx = np.argmax(similarities) # best_score = similarities[best_idx] # if best_score >= threshold: # return db.stages[best_idx], best_score # return None, 0.0 # def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple: # """Find closest matching room from DB""" # llm_area_lower = llm_area.lower() # # Exact match first # for room in db.rooms: # if room['name'].lower() == llm_area_lower: # return room, 1.0 # # Fuzzy match # room_texts = [r['name'] for r in db.rooms] # query_embedding = embedding_model.encode([llm_area]) # room_embeddings = embedding_model.encode(room_texts) # similarities = cosine_similarity(query_embedding, room_embeddings)[0] # best_idx = np.argmax(similarities) # best_score = similarities[best_idx] # if best_score >= threshold: # return db.rooms[best_idx], best_score # return None, 0.0 # def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]: # """Find relevant tasks for a stage matching LLM task description""" # stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id] # if not stage_tasks: # return [] # task_indices = [db.tasks.index(t) for t in stage_tasks] # query_embedding = embedding_model.encode([llm_task]) # stage_task_embeddings = db.task_embeddings[task_indices] # similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0] # top_indices = np.argsort(similarities)[-top_k:][::-1] # results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices] # return results # def extract_keywords(text: str) -> List[str]: # """Extract meaningful keywords from text""" # stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'} # words = re.findall(r'\b\w+\b', text.lower()) # return [w for w in words if w not in stop_words and len(w) > 2] # def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> 
List[tuple]: # """Find materials matching task requirements""" # task_keywords = extract_keywords(task['task']) # llm_keywords = extract_keywords(llm_material) # all_keywords = set(task_keywords + llm_keywords) # compatible_materials = [ # m for m in db.materials # if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None # ] # if not compatible_materials: # compatible_materials = db.materials # scored_materials = [] # for material in compatible_materials: # score = 0.0 # material_text = material['material'].lower() # for keyword in all_keywords: # if keyword in material_text: # score += 2.0 # categories_str = ' '.join(material.get('categories', [])).lower() # for keyword in all_keywords: # if keyword in categories_str: # score += 1.0 # material_idx = db.materials.index(material) # query_embedding = embedding_model.encode([llm_material]) # material_embedding = db.material_embeddings[material_idx].reshape(1, -1) # semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0] # score += semantic_score * 5.0 # if score > 0: # scored_materials.append((material, score)) # scored_materials.sort(key=lambda x: x[1], reverse=True) # return scored_materials[:top_k] # # ============= VALIDATION PIPELINE ============= # def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse: # """Main validation pipeline""" # validated_areas = [] # for area_scope in llm_scope.scope_of_work: # matched_room, room_confidence = find_best_room(area_scope.area) # validated_stages_dict = {} # for item in area_scope.items: # matched_stage, stage_confidence = find_best_stage(item.stage) # if not matched_stage: # continue # stage_id = matched_stage['stageId'] # if stage_id not in validated_stages_dict: # validated_stages_dict[stage_id] = { # 'stage_data': matched_stage, # 'confidence': stage_confidence, # 'tasks': [] # } # task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3) # if not task_matches: # continue # best_task, task_confidence = task_matches[0] 
# material_matches = find_materials_for_task( # best_task, item.material, item.unit, top_k=5 # ) # validated_materials = [ # ValidatedMaterial( # materialId=m['materialId'], # name=m['name'], # material=m['material'], # unit=m['unit'] or 'unit', # price=float(m['price']), # margin=float(m['margin']), # categories=m['categories'], # confidence_score=round(score / 10.0, 2) # ) # for m, score in material_matches # ] # # FIX: Parse roomArea properly # validated_task = ValidatedTask( # taskId=best_task['taskId'], # task=best_task['task'], # displayName=best_task['displayName'], # unit=best_task['unit'], # stageId=best_task['stageId'], # roomArea=parse_room_area(best_task['roomArea']), # <-- FIXED HERE # confidence_score=round(task_confidence, 2), # recommended_materials=validated_materials # ) # validated_stages_dict[stage_id]['tasks'].append(validated_task) # validated_stages = [ # ValidatedStage( # stageId=stage_data['stage_data']['stageId'], # stage=stage_data['stage_data']['stage'], # priority=stage_data['stage_data']['priority'], # confidence_score=round(stage_data['confidence'], 2), # tasks=stage_data['tasks'] # ) # for stage_data in validated_stages_dict.values() # ] # validated_stages.sort(key=lambda x: x.priority) # validated_area = ValidatedArea( # roomId=matched_room['id'] if matched_room else None, # name=matched_room['name'] if matched_room else area_scope.area, # roomType=matched_room['roomType'] if matched_room else 'unknown', # matched=matched_room is not None, # confidence_score=round(room_confidence, 2), # stages=validated_stages # ) # validated_areas.append(validated_area) # summary = { # 'total_areas': len(validated_areas), # 'total_stages': sum(len(a.stages) for a in validated_areas), # 'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages), # 'total_materials': sum( # len(t.recommended_materials) # for a in validated_areas # for s in a.stages # for t in s.tasks # ), # 'matched_areas': sum(1 for a in validated_areas if 
a.matched), # 'avg_confidence': round( # np.mean([a.confidence_score for a in validated_areas]), 2 # ) if validated_areas else 0.0 # } # return ValidatedResponse(areas=validated_areas, summary=summary) # # ============= API ENDPOINTS ============= # @app.get("/") # async def root(): # return { # "service": "Construction Scope Validator", # "version": "1.0.0", # "status": "running", # "data_loaded": len(db.stages) > 0, # "model_type": "trained" if os.path.exists('model.safetensors') else "base" # } # @app.get("/health") # async def health(): # return { # "status": "healthy", # "stages_loaded": len(db.stages), # "tasks_loaded": len(db.tasks), # "materials_loaded": len(db.materials), # "rooms_loaded": len(db.rooms), # "embeddings_ready": db.stage_embeddings is not None, # "model_type": "trained" if os.path.exists('model.safetensors') else "base" # } # @app.post("/validate", response_model=ValidatedResponse) # async def validate_scope_endpoint(request: LLMScopeRequest): # """ # Validate LLM-generated scope against database # Returns enriched data with matched stages, tasks, materials, and confidence scores # """ # try: # if not db.stages: # raise HTTPException(status_code=500, detail="Database not loaded") # result = validate_scope(request) # return result # except Exception as e: # import traceback # error_detail = f"Validation error: {str(e)}\n{traceback.format_exc()}" # raise HTTPException(status_code=500, detail=error_detail) # @app.post("/match-stage") # async def match_stage(stage_name: str): # """Test endpoint: match a single stage name""" # matched_stage, confidence = find_best_stage(stage_name) # if matched_stage: # return { # "input": stage_name, # "matched": matched_stage, # "confidence": round(confidence, 2) # } # return {"input": stage_name, "matched": None, "confidence": 0.0} # @app.post("/match-room") # async def match_room(room_name: str): # """Test endpoint: match a single room name""" # matched_room, confidence = find_best_room(room_name) # if 
matched_room: # return { # "input": room_name, # "matched": matched_room, # "confidence": round(confidence, 2) # } # return {"input": room_name, "matched": None, "confidence": 0.0} # # ============= STARTUP ============= # @app.on_event("startup") # async def startup_event(): # """Load data and initialize embeddings on startup""" # try: # print("\n" + "="*60) # print("STARTING UP...") # print("="*60) # db.load_data( # stages_file='stages.json', # tasks_file='tasks.json', # materials_file='materials.json', # rooms_file='rooms.json' # ) # db.initialize_embeddings() # print("\n" + "="*60) # print("✅ SERVICE READY!") # print("="*60) # except Exception as e: # print(f"\n❌ STARTUP ERROR: {e}") # print("Make sure JSON files are in the correct location") # import traceback # traceback.print_exc() # if __name__ == "__main__": # import uvicorn # uvicorn.run(app, host="0.0.0.0", port=7860) # """ # FastAPI Service for Construction Scope Validation # Deploy on Hugging Face Spaces # """ # from fastapi import FastAPI, HTTPException # from fastapi.middleware.cors import CORSMiddleware # from pydantic import BaseModel, Field # from typing import List, Optional, Dict, Any # import json # import numpy as np # import os # from sentence_transformers import SentenceTransformer # from sklearn.metrics.pairwise import cosine_similarity # import re # app = FastAPI( # title="Construction Scope Validator API", # description="Validates and enriches LLM-generated construction scope with DB data", # version="1.0.0" # ) # # CORS middleware # app.add_middleware( # CORSMiddleware, # allow_origins=["*"], # allow_credentials=True, # allow_methods=["*"], # allow_headers=["*"], # ) # # Load embedding model (cached globally) # # Try to load trained model from root, fallback to base model # print("="*60) # print("LOADING MODEL...") # print("="*60) # try: # # Check if trained model files exist in root # # Check if trained model files exist in root # model_files = ['config.json', 
# NOTE(review): everything from here to the end of the file is a commented-out
# LEGACY COPY of the active implementation defined above. It is dead code kept
# for reference only — consider deleting it once the active version is
# confirmed. The region was collapsed onto a few very long lines when the file
# was flattened; it is re-expanded below to one commented statement per line.
# The stray fragment on the first line is the tail of the wrapped commented
# line `# model_files = ['config.json', 'sentence_bert_config.json']` that
# lost its `#` prefix, leaving an unparseable token in the file; it is
# re-commented here so the file can parse.
# 'sentence_bert_config.json']
#     # Check for either pytorch_model.bin or model.safetensors
#     has_weights = os.path.exists('pytorch_model.bin') or os.path.exists('model.safetensors')
#     has_model = all(os.path.exists(f) for f in model_files) and has_weights
#     if has_model:
#         print("✓ Trained model files found in root directory")
#         print("Loading trained model...")
#         embedding_model = SentenceTransformer('./', device='cpu')
#         print("✅ Trained model loaded successfully!")
#     else:
#         print("⚠️ Trained model not found, using base model...")
#         embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
#         print("✅ Base model loaded successfully!")
# except Exception as e:
#     print(f"❌ Error loading trained model: {e}")
#     print("Falling back to base model...")
#     embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
#     print("✅ Base model loaded successfully!")
# print("="*60)
#
# # ============= DATA MODELS =============
# class LLMScopeItem(BaseModel):
#     stage: str
#     task: str
#     material: str
#     quantity: float
#     unit: str
#
# class LLMAreaScope(BaseModel):
#     area: str
#     items: List[LLMScopeItem]
#
# class LLMScopeRequest(BaseModel):
#     scope_of_work: List[LLMAreaScope]
#
# class ValidatedMaterial(BaseModel):
#     materialId: int
#     name: str
#     material: str
#     unit: str
#     price: float
#     margin: float
#     categories: List[str]
#     confidence_score: float
#
# class ValidatedTask(BaseModel):
#     taskId: int
#     task: str
#     displayName: str
#     unit: str
#     stageId: int
#     roomArea: List[str]
#     confidence_score: float
#     recommended_materials: List[ValidatedMaterial]
#
# class ValidatedStage(BaseModel):
#     stageId: int
#     stage: str
#     priority: int
#     confidence_score: float
#     tasks: List[ValidatedTask]
#
# class ValidatedArea(BaseModel):
#     roomId: Optional[int]
#     name: str
#     roomType: str
#     matched: bool
#     confidence_score: float
#     stages: List[ValidatedStage]
#
# class ValidatedResponse(BaseModel):
#     areas: List[ValidatedArea]
#     summary: Dict[str, Any]
#
# # ============= DATABASE LOADERS =============
# class DatabaseLoader:
#     def __init__(self):
#         self.stages = []
#         self.tasks = []
#         self.materials = []
#         self.rooms = []
#         self.stage_embeddings = None
#         self.task_embeddings = None
#         self.material_embeddings = None
#
#     def load_data(self, stages_file: str, tasks_file: str, materials_file: str, rooms_file: str):
#         """Load JSON data files"""
#         print(f"Loading {stages_file}...")
#         with open(stages_file, 'r', encoding='utf-8') as f:
#             self.stages = [json.loads(line) for line in f if line.strip()]
#         print(f"Loading {tasks_file}...")
#         with open(tasks_file, 'r', encoding='utf-8') as f:
#             self.tasks = [json.loads(line) for line in f if line.strip()]
#         print(f"Loading {materials_file}...")
#         with open(materials_file, 'r', encoding='utf-8') as f:
#             self.materials = [json.loads(line) for line in f if line.strip()]
#         print(f"Loading {rooms_file}...")
#         with open(rooms_file, 'r', encoding='utf-8') as f:
#             self.rooms = [json.loads(line) for line in f if line.strip()]
#         print(f"✅ Loaded: {len(self.stages)} stages, {len(self.tasks)} tasks, "
#               f"{len(self.materials)} materials, {len(self.rooms)} rooms")
#
#     def initialize_embeddings(self):
#         """Pre-compute embeddings for fast lookup"""
#         print("Computing stage embeddings...")
#         stage_texts = [s['stage'] for s in self.stages]
#         self.stage_embeddings = embedding_model.encode(stage_texts, show_progress_bar=True)
#         print("Computing task embeddings...")
#         task_texts = [t['task'] for t in self.tasks]
#         self.task_embeddings = embedding_model.encode(task_texts, show_progress_bar=True)
#         print("Computing material embeddings...")
#         material_texts = [m['material'] for m in self.materials]
#         self.material_embeddings = embedding_model.encode(material_texts, show_progress_bar=True)
#         print("✅ Embeddings ready!")
#
# # Global DB instance
# db = DatabaseLoader()
#
# # ============= MATCHING FUNCTIONS =============
# def find_best_stage(llm_stage: str, threshold: float = 0.5) -> tuple:
#     """Find closest matching stage from DB"""
#     query_embedding = embedding_model.encode([llm_stage])
#     similarities = cosine_similarity(query_embedding, db.stage_embeddings)[0]
#     best_idx = np.argmax(similarities)
#     best_score = similarities[best_idx]
#     if best_score >= threshold:
#         return db.stages[best_idx], best_score
#     return None, 0.0
#
# def find_best_room(llm_area: str, threshold: float = 0.6) -> tuple:
#     """Find closest matching room from DB"""
#     llm_area_lower = llm_area.lower()
#     # Exact match first
#     for room in db.rooms:
#         if room['name'].lower() == llm_area_lower:
#             return room, 1.0
#     # Fuzzy match
#     room_texts = [r['name'] for r in db.rooms]
#     query_embedding = embedding_model.encode([llm_area])
#     room_embeddings = embedding_model.encode(room_texts)
#     similarities = cosine_similarity(query_embedding, room_embeddings)[0]
#     best_idx = np.argmax(similarities)
#     best_score = similarities[best_idx]
#     if best_score >= threshold:
#         return db.rooms[best_idx], best_score
#     return None, 0.0
#
# def find_tasks_for_stage(stage_id: int, llm_task: str, top_k: int = 5) -> List[tuple]:
#     """Find relevant tasks for a stage matching LLM task description"""
#     # Filter tasks by stage
#     stage_tasks = [t for t in db.tasks if t['stageId'] == stage_id]
#     if not stage_tasks:
#         return []
#     # Compute similarities
#     task_indices = [db.tasks.index(t) for t in stage_tasks]
#     query_embedding = embedding_model.encode([llm_task])
#     stage_task_embeddings = db.task_embeddings[task_indices]
#     similarities = cosine_similarity(query_embedding, stage_task_embeddings)[0]
#     # Get top K
#     top_indices = np.argsort(similarities)[-top_k:][::-1]
#     results = [(stage_tasks[idx], similarities[idx]) for idx in top_indices]
#     return results
#
# def extract_keywords(text: str) -> List[str]:
#     """Extract meaningful keywords from text"""
#     # Remove common words
#     stop_words = {'and', 'or', 'the', 'to', 'a', 'of', 'for', 'in', 'on', 'supply', 'install'}
#     words = re.findall(r'\b\w+\b', text.lower())
#     return [w for w in words if w not in stop_words and len(w) > 2]
#
# def find_materials_for_task(task: dict, llm_material: str, unit: str, top_k: int = 10) -> List[tuple]:
#     """Find materials matching task requirements"""
#     task_keywords = extract_keywords(task['task'])
#     llm_keywords = extract_keywords(llm_material)
#     all_keywords = set(task_keywords + llm_keywords)
#     # Filter by unit compatibility
#     compatible_materials = [
#         m for m in db.materials
#         if m['unit'] == unit or m['unit'] == 'unit' or m['unit'] is None
#     ]
#     if not compatible_materials:
#         # Fallback: allow any unit
#         compatible_materials = db.materials
#     # Score materials
#     scored_materials = []
#     for material in compatible_materials:
#         score = 0.0
#         material_text = material['material'].lower()
#         # Keyword matching
#         for keyword in all_keywords:
#             if keyword in material_text:
#                 score += 2.0
#         # Category matching
#         categories_str = ' '.join(material.get('categories', [])).lower()
#         for keyword in all_keywords:
#             if keyword in categories_str:
#                 score += 1.0
#         # Embedding similarity
#         material_idx = db.materials.index(material)
#         query_embedding = embedding_model.encode([llm_material])
#         material_embedding = db.material_embeddings[material_idx].reshape(1, -1)
#         semantic_score = cosine_similarity(query_embedding, material_embedding)[0][0]
#         score += semantic_score * 5.0
#         if score > 0:
#             scored_materials.append((material, score))
#     # Sort and return top K
#     scored_materials.sort(key=lambda x: x[1], reverse=True)
#     return scored_materials[:top_k]
#
# # ============= VALIDATION PIPELINE =============
# def validate_scope(llm_scope: LLMScopeRequest) -> ValidatedResponse:
#     """Main validation pipeline"""
#     validated_areas = []
#     for area_scope in llm_scope.scope_of_work:
#         # Match room/area
#         matched_room, room_confidence = find_best_room(area_scope.area)
#         validated_stages_dict = {}
#         for item in area_scope.items:
#             # Match stage
#             matched_stage, stage_confidence = find_best_stage(item.stage)
#             if not matched_stage:
#                 continue  # Skip if stage not found
#             stage_id = matched_stage['stageId']
#             # Initialize stage if new
#             if stage_id not in validated_stages_dict:
#                 validated_stages_dict[stage_id] = {
#                     'stage_data': matched_stage,
#                     'confidence': stage_confidence,
#                     'tasks': []
#                 }
#             # Match task
#             task_matches = find_tasks_for_stage(stage_id, item.task, top_k=3)
#             if not task_matches:
#                 continue
#             best_task, task_confidence = task_matches[0]
#             # Match materials
#             material_matches = find_materials_for_task(
#                 best_task,
#                 item.material,
#                 item.unit,
#                 top_k=5
#             )
#             validated_materials = [
#                 ValidatedMaterial(
#                     materialId=m['materialId'],
#                     name=m['name'],
#                     material=m['material'],
#                     unit=m['unit'] or 'unit',
#                     price=float(m['price']),
#                     margin=float(m['margin']),
#                     categories=m['categories'],
#                     confidence_score=round(score / 10.0, 2)
#                 )
#                 for m, score in material_matches
#             ]
#             validated_task = ValidatedTask(
#                 taskId=best_task['taskId'],
#                 task=best_task['task'],
#                 displayName=best_task['displayName'],
#                 unit=best_task['unit'],
#                 stageId=best_task['stageId'],
#                 roomArea=best_task['roomArea'],
#                 confidence_score=round(task_confidence, 2),
#                 recommended_materials=validated_materials
#             )
#             validated_stages_dict[stage_id]['tasks'].append(validated_task)
#         # Build validated stages list
#         validated_stages = [
#             ValidatedStage(
#                 stageId=stage_data['stage_data']['stageId'],
#                 stage=stage_data['stage_data']['stage'],
#                 priority=stage_data['stage_data']['priority'],
#                 confidence_score=round(stage_data['confidence'], 2),
#                 tasks=stage_data['tasks']
#             )
#             for stage_data in validated_stages_dict.values()
#         ]
#         # Sort stages by priority
#         validated_stages.sort(key=lambda x: x.priority)
#         validated_area = ValidatedArea(
#             roomId=matched_room['id'] if matched_room else None,
#             name=matched_room['name'] if matched_room else area_scope.area,
#             roomType=matched_room['roomType'] if matched_room else 'unknown',
#             matched=matched_room is not None,
#             confidence_score=round(room_confidence, 2),
#             stages=validated_stages
#         )
#         validated_areas.append(validated_area)
#     # Build summary
#     summary = {
#         'total_areas': len(validated_areas),
#         'total_stages': sum(len(a.stages) for a in validated_areas),
#         'total_tasks': sum(len(s.tasks) for a in validated_areas for s in a.stages),
#         'total_materials': sum(
#             len(t.recommended_materials)
#             for a in validated_areas
#             for s in a.stages
#             for t in s.tasks
#         ),
#         'matched_areas': sum(1 for a in validated_areas if a.matched),
#         'avg_confidence': round(
#             np.mean([a.confidence_score for a in validated_areas]), 2
#         ) if validated_areas else 0.0
#     }
#     return ValidatedResponse(areas=validated_areas, summary=summary)
#
# # ============= API ENDPOINTS =============
# @app.get("/")
# async def root():
#     return {
#         "service": "Construction Scope Validator",
#         "version": "1.0.0",
#         "status": "running",
#         "data_loaded": len(db.stages) > 0,
#         "model_type": "trained" if os.path.exists('pytorch_model.bin') else "base"
#     }
#
# @app.get("/health")
# async def health():
#     return {
#         "status": "healthy",
#         "stages_loaded": len(db.stages),
#         "tasks_loaded": len(db.tasks),
#         "materials_loaded": len(db.materials),
#         "rooms_loaded": len(db.rooms),
#         "embeddings_ready": db.stage_embeddings is not None,
#         "model_type": "trained" if os.path.exists('pytorch_model.bin') else "base"
#     }
#
# @app.post("/validate", response_model=ValidatedResponse)
# async def validate_scope_endpoint(request: LLMScopeRequest):
#     """
#     Validate LLM-generated scope against database
#     Returns enriched data with:
#     - Matched stages from DB
#     - Matched tasks from DB
#     - Recommended materials with pricing
#     - Confidence scores for all matches
#     """
#     try:
#         if not db.stages:
#             raise HTTPException(status_code=500, detail="Database not loaded")
#         result = validate_scope(request)
#         return result
#     except Exception as e:
#         raise HTTPException(status_code=500, detail=f"Validation error: {str(e)}")
#
# @app.post("/match-stage")
# async def match_stage(stage_name: str):
#     """Test endpoint: match a single stage name"""
#     matched_stage, confidence = find_best_stage(stage_name)
#     if matched_stage:
#         return {
#             "input": stage_name,
#             "matched": matched_stage,
#             "confidence": round(confidence, 2)
#         }
#     return {"input": stage_name, "matched": None, "confidence": 0.0}
#
# @app.post("/match-room")
# async def match_room(room_name: str):
#     """Test endpoint: match a single room name"""
#     matched_room, confidence = find_best_room(room_name)
#     if matched_room:
#         return {
#             "input": room_name,
#             "matched": matched_room,
#             "confidence": round(confidence, 2)
#         }
#     return {"input": room_name, "matched": None, "confidence": 0.0}
#
# # ============= STARTUP =============
# @app.on_event("startup")
# async def startup_event():
#     """Load data and initialize embeddings on startup"""
#     try:
#         print("\n" + "="*60)
#         print("STARTING UP...")
#         print("="*60)
#         # Check what files are available
#         print("\nFiles in root directory:")
#         for file in os.listdir('.'):
#             print(f"  - {file}")
#         # Load data
#         db.load_data(
#             stages_file='stages.json',
#             tasks_file='tasks.json',
#             materials_file='materials.json',
#             rooms_file='rooms.json'
#         )
#         db.initialize_embeddings()
#         print("\n" + "="*60)
#         print("✅ SERVICE READY!")
#         print("="*60)
#     except Exception as e:
#         print(f"\n❌ STARTUP ERROR: {e}")
#         print("Make sure JSON files are in the correct location")
#         import traceback
#         traceback.print_exc()
#
# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=7860)