"""Floor Plan API.

FastAPI service that accepts PDF or image uploads of floor plans, renders
them to images, and asks a Gemini model to extract a JSON list of rooms.
"""

import asyncio
import json
import logging
import math
import os
import re
import time
from io import BytesIO
from typing import Optional

import fitz
import google.generativeai as genai
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from google.generativeai.types import HarmBlockThreshold, HarmCategory
from PIL import Image, ImageEnhance, ImageFilter
from pydantic import BaseModel

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("floor_plan_api.log"),
    ],
)
logger = logging.getLogger(__name__)

# Single source of truth for the model name (used by the processor, /status
# and the startup banner).
MODEL_NAME = "gemini-2.5-pro"

GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
if not GOOGLE_API_KEY:
    logger.warning("GOOGLE_API_KEY not set!")
else:
    genai.configure(api_key=GOOGLE_API_KEY)

os.makedirs("uploads", exist_ok=True)


def _as_positive_float(value):
    """Return *value* as a finite float greater than zero, otherwise 0.0.

    Model output is untrusted: numbers may arrive as strings, ``None`` or
    other junk. Booleans are rejected explicitly because ``True`` would
    otherwise count as 1.0.
    """
    if isinstance(value, bool):
        return 0.0
    try:
        number = float(value)
    except (TypeError, ValueError):
        return 0.0
    if math.isfinite(number) and number > 0:
        return number
    return 0.0


class FloorPlanQuery(BaseModel):
    """Optional request body for ``POST /analyze/{pdf_id}``."""

    description: Optional[str] = None


class RoomQuery(BaseModel):
    """Request body for ``POST /room/{pdf_id}``."""

    room_name: str
    exact_match: bool = False


class PDF:
    """In-memory record of one uploaded file (PDF or image) and its state."""

    def __init__(self, filename, content_type):
        self.filename = filename
        self.content_type = content_type
        # The ID is the filename with every non-alphanumeric char replaced,
        # so two uploads with the same filename share (and overwrite) an ID.
        self.id = re.sub(r'[^a-zA-Z0-9]', '_', filename)
        self.processed = False
        self.error = None
        self.images = []
        self.page_count = 0
        self.file_type = "pdf" if content_type == "application/pdf" else "image"
        self.measurement_info = {
            "scale": 100,
            "ceiling_height": 2.4,
            "room_dimensions": {},
        }
        self.analysis_result = None

    def to_dict(self) -> dict:
        """Return a JSON-serialisable summary (images themselves excluded)."""
        return {
            "id": self.id,
            "filename": self.filename,
            "content_type": self.content_type,
            "file_type": self.file_type,
            "processed": self.processed,
            "error": self.error,
            "page_count": self.page_count if self.file_type == "pdf" else None,
            "image_count": len(self.images) if self.images else 0,
            "measurement_info": self.measurement_info,
            "has_analysis": self.analysis_result is not None,
            "room_count": len(self.analysis_result) if self.analysis_result else 0,
        }


class FloorPlanProcessor:
    """Stores uploads in memory and runs the Gemini floor-plan analysis."""

    def __init__(self):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.pdfs = {}
        self.supported_image_formats = {
            "image/jpeg": ".jpg",
            "image/png": ".png",
            "image/gif": ".gif",
            "image/bmp": ".bmp",
            "image/tiff": ".tiff",
            "image/webp": ".webp",
        }

    async def process_upload(self, file_content, filename, content_type):
        """Persist an upload to ``uploads/`` and extract its image(s).

        Returns the upload ID in every case. Failures are not raised; they
        are recorded on the stored ``PDF`` object's ``error`` attribute.
        """
        pdf = PDF(filename, content_type)
        pdf_id = pdf.id
        logger.info(f"Processing {filename} (ID: {pdf_id})")
        self.pdfs[pdf_id] = pdf

        try:
            if content_type == "application/pdf":
                extension = ".pdf"
            else:
                extension = self.supported_image_formats.get(content_type, ".png")
            file_path = f"uploads/{pdf_id}{extension}"

            with open(file_path, "wb") as f:
                f.write(file_content)

            if content_type == "application/pdf":
                await self.extract_images_from_pdf(pdf, file_content)
            elif content_type in self.supported_image_formats:
                await self.process_image(pdf, file_content)
            else:
                raise ValueError(f"Unsupported type: {content_type}")

            pdf.processed = True
            logger.info(f"Processing complete: {pdf_id}")
            return pdf_id
        except Exception as e:
            logger.error(f"Error processing {filename}: {str(e)}", exc_info=True)
            pdf.error = str(e)
            return pdf_id

    async def process_image(self, pdf, file_content):
        """Open an uploaded image and attach it to *pdf*.

        Returns True on success; on failure sets ``pdf.error`` and returns
        False.
        """
        try:
            img = Image.open(BytesIO(file_content))
            logger.info(f"Image: {img.width}x{img.height}")
            pdf.images.append(img)
            return True
        except Exception as e:
            logger.error(f"Image error: {str(e)}")
            pdf.error = str(e)
            return False

    async def extract_images_from_pdf(self, pdf, file_content):
        """Render up to the first three PDF pages to images at 2x zoom.

        Returns True on success; on failure sets ``pdf.error`` and returns
        False. A page that fails to render is logged and skipped.
        """
        try:
            pdf_document = fitz.open(stream=file_content, filetype="pdf")
            try:
                pdf.page_count = len(pdf_document)
                images = []
                pages_to_process = min(3, pdf.page_count)  # process first 3 pages

                for page_num in range(pages_to_process):
                    page = pdf_document[page_num]
                    try:
                        # Force full-page rendering (ignore embedded images)
                        pix = page.get_pixmap(matrix=fitz.Matrix(2, 2), alpha=False)
                        try:
                            img = Image.open(BytesIO(pix.tobytes("png")))
                        except Exception:
                            # Fall back to building the image from raw samples.
                            img = Image.frombytes(
                                "RGB", (pix.width, pix.height), pix.samples
                            )
                        images.append(img)
                        logger.info(f"Rendered full page {page_num+1}: {img.size}")
                    except Exception as e:
                        logger.error(f"Page render failed [{page_num+1}]: {e}")
            finally:
                # The rendered images hold their own byte buffers, so the
                # document can be released as soon as rendering is done.
                pdf_document.close()

            if not images:
                raise ValueError("Rendering failed for all pages")

            pdf.images = images
            return True
        except Exception as e:
            pdf.error = str(e)
            logger.error(f"PDF extraction failed: {e}", exc_info=True)
            return False

    def _select_single_best_image(self, images):
        """Return the image with the largest pixel area.

        Ties resolve to the earliest image. *images* must be non-empty.
        """
        if len(images) == 1:
            return images[0]

        # Score by area (largest = best for floor plans)
        best = max(images, key=lambda img: img.size[0] * img.size[1])
        logger.info(f"Selected best from {len(images)} images")
        return best

    def _preprocess_floor_plan(self, image):
        """Enhance a floor plan image before analysis.

        Pipeline: grayscale -> 1.5x contrast -> sharpen -> 3x3 median filter
        -> back to RGB. If any step fails, the original image is returned
        unchanged.
        """
        try:
            logger.info(f"Preprocessing image: {image.size}")

            # Step 1: Convert to grayscale for processing
            gray = image if image.mode == 'L' else image.convert('L')

            # Step 2: Enhance contrast (1.5x = moderate boost)
            gray = ImageEnhance.Contrast(gray).enhance(1.5)
            logger.info("✓ Contrast enhanced")

            # Step 3: Sharpen to make text/lines clearer
            gray = gray.filter(ImageFilter.SHARPEN)
            logger.info("✓ Sharpened")

            # Step 4: Remove noise with median filter
            gray = gray.filter(ImageFilter.MedianFilter(size=3))
            logger.info("✓ Noise removed")

            # Step 5: Convert back to RGB for Gemini
            result = gray.convert('RGB')
            logger.info(f"✓ Preprocessing complete: {result.size}")
            return result
        except Exception as e:
            logger.error(f"Preprocessing error: {str(e)}")
            return image  # Return original if preprocessing fails

    def _optimize_image(self, image, target_size=2048):
        """Convert to RGB/L if needed and downscale so no side exceeds *target_size*.

        Aspect ratio is preserved. Images already within bounds are not
        resized.
        """
        if image.mode not in ('RGB', 'L'):
            image = image.convert('RGB')

        width, height = image.size
        if width > target_size or height > target_size:
            ratio = target_size / max(width, height)
            # Clamp to 1px so extreme aspect ratios cannot produce a 0-sized side.
            new_width = max(1, int(width * ratio))
            new_height = max(1, int(height * ratio))
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            logger.info(f"Resized: {width}x{height} → {new_width}x{new_height}")

        return image

    async def analyze_floor_plan(self, pdf_id, description=None):
        """Analyse the best image of an upload and return a list of room dicts.

        Makes up to three attempts. Timeouts and empty results fall through
        to the next attempt; if every attempt ends that way, a fallback
        result is returned. Errors that look transient (504/503/429/timeout/
        deadline in the message) are retried with a growing delay; any other
        error, or a transient one on the last attempt, is re-raised.

        Raises:
            ValueError: if *pdf_id* is unknown or has no images.
        """
        pdf = self.pdfs.get(pdf_id)
        if not pdf:
            raise ValueError(f"PDF {pdf_id} not found")
        if not pdf.images:
            raise ValueError(f"No images in {pdf_id}")

        logger.info(f"\n{'='*70}")
        logger.info(f"Analyzing: {pdf_id}")
        logger.info(f"Images: {len(pdf.images)}")
        logger.info(f"{'='*70}")

        # Use ONLY the first/best image for single file analysis
        best_image = self._select_single_best_image(pdf.images)
        best_image = self._preprocess_floor_plan(best_image)
        optimized_image = self._optimize_image(best_image, target_size=2048)
        logger.info(
            f"Using single image: {optimized_image.size[0]}x{optimized_image.size[1]}px"
        )

        # Try analysis with extended timeout
        max_retries = 3
        retryable_markers = ['504', '503', '429', 'timeout', 'deadline']

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                logger.info(f"\nAttempt {attempt + 1}/{max_retries}")
                result = await self._analyze_with_gemini(
                    optimized_image,
                    pdf.measurement_info,
                    description,
                    timeout=600,
                    attempt=attempt,
                )
                if result:
                    logger.info(f"✓ SUCCESS: {len(result)} rooms detected")
                    return result
            except asyncio.TimeoutError:
                logger.warning(f"Timeout on attempt {attempt + 1}")
                if not is_last_attempt:
                    await asyncio.sleep(10)
                continue
            except Exception as e:
                error_str = str(e)
                logger.error(f"Attempt {attempt + 1} error: {error_str}")

                # Check for retryable errors
                lowered = error_str.lower()
                retryable = any(marker in lowered for marker in retryable_markers)
                if retryable and not is_last_attempt:
                    wait = 15 * (attempt + 1)
                    logger.info(f"Waiting {wait}s before retry...")
                    await asyncio.sleep(wait)
                    continue

                if retryable:
                    logger.error(f"Retries exhausted: {error_str}")
                else:
                    logger.error(f"Non-retryable error: {error_str}")
                raise

        logger.warning("All attempts failed, using fallback")
        return self._generate_fallback(pdf.measurement_info)

    async def _analyze_with_gemini(self, image, measurement_info, description,
                                   timeout, attempt=0):
        """Send the prompt and image to Gemini and return validated rooms.

        The blocking SDK call runs in the default executor and is bounded by
        ``timeout + 30`` seconds. Returns None when no usable JSON array is
        found in the response. Any exception is logged and re-raised.
        """
        prompt = self._create_detailed_prompt(description, measurement_info)

        # Adjust parameters per attempt
        temperature = 0.2 if attempt == 0 else 0.3
        max_tokens = 16384

        logger.info(f"Config: temp={temperature}, max_tokens={max_tokens}")

        start_time = time.time()
        loop = asyncio.get_running_loop()

        # Create safety settings with correct format
        safety_settings = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
        }

        def make_request():
            return self.model.generate_content(
                [prompt, image],
                generation_config=genai.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=max_tokens,
                    top_p=0.95,
                    top_k=40,
                ),
                safety_settings=safety_settings,
                request_options={'timeout': timeout},
            )

        try:
            response = await asyncio.wait_for(
                loop.run_in_executor(None, make_request),
                timeout=timeout + 30,
            )

            elapsed = time.time() - start_time
            response_text = response.text
            logger.info(f"Response in {elapsed:.1f}s ({len(response_text)} chars)")

            # Extract JSON
            parsed = self._extract_json(response_text)
            if parsed:
                validated = self._validate_measurements(parsed, measurement_info)
                logger.info(f"Validated {len(validated)} rooms")
                return validated

            logger.warning("No valid JSON found")
            return None
        except Exception as e:
            logger.error(f"Gemini API error: {str(e)}")
            raise

    def _create_detailed_prompt(self, description, measurement_info):
        """Build the Norwegian-language analysis prompt.

        Interpolates ``measurement_info['scale']`` and
        ``measurement_info['ceiling_height']``; appends *description* as user
        context when it is non-empty.
        """
        prompt = f"""Du er en ekspert på norske plantegninger. Analyser denne plantegningen nøye og ekstraher ALL rom med komplette detaljer.

Returner KUN en JSON-array i dette eksakte formatet:
[
  {{
    "name": "Living Room",
    "name_no": "Stue",
    "area_m2": 0.0,
    "position": "beskrivelse av plassering",
    "dimensions_m": {{"width": 0.0, "length": 0.0}},
    "windows": 0,
    "window_positions": ["vegg plassering"],
    "doors": 0,
    "door_positions": ["plassering"],
    "connected_rooms": ["Tilstøtende rom"],
    "has_external_access": false,
    "ceiling_height_m": {measurement_info['ceiling_height']},
    "furniture": [],
    "estimated": false
  }}
]

KRITISKE INSTRUKSJONER:
1. Finn og inkluder HVERT ENESTE rom som er synlig på plantegningen
2. Les romnavnene nøyaktig som de står på tegningen (f.eks. "SOV 1", "KJØKKEN", "STUE", "BAD", etc.)
3. Les de eksakte arealene som er vist på planen (f.eks. "25.5 m²", "12.3 m²", etc.)
4. Hvis bredde × lengde vises, bruk dem nøyaktig
5. Hvis bare areal vises, beregn omtrentlige dimensjoner: bredde ≈ √areal, lengde ≈ √areal
6. Tell vinduer nøye - se etter vinduessymboler i veggene
7. Tell dører - se etter dørsvingsymboler
8. Identifiser hvilke vegger som har vinduer/dører (nord, sør, øst, vest)
9. List tilstøtende rom som har forbindelse til hvert rom
10. Sjekk om rommet har direkte utgang til uteområde
11. Sett estimated=false KUN hvis du kan lese eksakte mål, ellers true
12. Hvis du ser møbelsymboler eller etiketter, list dem
13. Returner KUN JSON-arrayen - absolutt ingen forklaringer, ingen markdown-blokker, ingen ekstra tekst

Norske romtyper å se etter:
- Soverom (SOV, Soverom, Bedroom)
- Kjøkken (Kitchen)
- Stue (Living room, Salon)
- Bad/Baderom (Bathroom, Vask, WC)
- Toalett (WC, Toilet)
- Gang/Korridor (Hallway)
- Entré (Entrance, Inngang)
- Bod/Garderobe (Storage, Closet, Skap)
- Kontor (Office, Arbeidsrom)
- Vaskerom (Laundry, Vaskeri)
- Terrasse/Balkong (Terrace, Balcony, Uteplass)
- Garasje (Garage, Biloppstilling)
- Spisestue (Dining room)
- Sportsbod (Sports storage)
- Tech/Teknisk rom (Technical room)
- Vindfang (Mudroom)
- Trapperom (Stairwell, Trapp)
- Loft/Hems (Attic, Loft)
- Kjeller (Basement)

Målestokk: 1:{measurement_info['scale']}
Standard takhøyde: {measurement_info['ceiling_height']}m
"""

        if description:
            prompt += f"\n\nBrukeren ga denne konteksten: {description}"

        return prompt

    def _extract_json(self, text):
        """Return the first usable non-empty JSON array found in *text*, or None.

        Markdown code fences are stripped first. If the whole text is not a
        JSON array, every top-level ``[...]`` that decodes cleanly is
        collected; arrays whose first element is an object are preferred,
        then the longest one.
        """
        if not text:
            return None

        # Remove markdown
        text = text.strip()
        text = re.sub(r'```(?:json|javascript)?\s*', '', text)
        text = text.strip('`').strip()

        # Try direct parse
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(data, list) and data:
                return data

        # Scan for embedded arrays. raw_decode understands nesting and string
        # escapes, which a regex over brackets cannot do reliably.
        decoder = json.JSONDecoder()
        candidates = []
        position = text.find('[')
        while position != -1:
            try:
                data, end = decoder.raw_decode(text, position)
            except json.JSONDecodeError:
                position = text.find('[', position + 1)
                continue
            if isinstance(data, list) and data:
                candidates.append((isinstance(data[0], dict), end - position, data))
            # Resume after this array so its nested arrays are not re-scanned.
            position = text.find('[', end)

        if candidates:
            return max(candidates, key=lambda candidate: candidate[:2])[2]

        logger.warning(f"Could not extract JSON from: {text[:300]}...")
        return None

    def _validate_measurements(self, data, measurement_info):
        """Fill in missing room fields and reconcile dimensions with area.

        Per room:
        - width and length both positive -> area is recomputed from them;
        - otherwise a positive area -> a square of that area is assumed and
          the room is marked estimated;
        - otherwise -> 3.0 x 3.0 m / 9.0 m2 placeholder, marked estimated.

        Entries that are not dicts are dropped; non-numeric measurement
        values are treated as missing. Returns [] if *data* is not a list.
        """
        if not isinstance(data, list):
            return []

        ceiling = measurement_info.get('ceiling_height', 2.4)
        validated = []

        for room in data:
            if not isinstance(room, dict):
                logger.warning(f"Skipping non-object room entry: {room!r}")
                continue

            # Ensure required fields
            room.setdefault("name", "Unknown")
            room.setdefault("name_no", room["name"])
            room.setdefault("ceiling_height_m", ceiling)
            room.setdefault("windows", 0)
            room.setdefault("doors", 1)
            room.setdefault("estimated", False)
            room.setdefault("furniture", [])
            room.setdefault("connected_rooms", [])
            room.setdefault("window_positions", [])
            room.setdefault("door_positions", [])

            # Fix dimensions
            dimensions = room.get("dimensions_m")
            if not isinstance(dimensions, dict):
                dimensions = {"width": 0, "length": 0}
                room["dimensions_m"] = dimensions

            width = _as_positive_float(dimensions.get("width", 0))
            length = _as_positive_float(dimensions.get("length", 0))
            area = _as_positive_float(room.get("area_m2", 0))

            if width and length:
                dimensions["width"] = width
                dimensions["length"] = length
                room["area_m2"] = round(width * length, 1)
            elif area:
                side = round(math.sqrt(area), 1)
                dimensions["width"] = side
                dimensions["length"] = side
                room["area_m2"] = area
                room["estimated"] = True
            else:
                room["dimensions_m"] = {"width": 3.0, "length": 3.0}
                room["area_m2"] = 9.0
                room["estimated"] = True

            validated.append(room)

        return validated

    def _generate_fallback(self, measurement_info):
        """Return a single placeholder living room, marked as estimated."""
        ceiling = measurement_info.get('ceiling_height', 2.4)

        return [
            {
                "name": "Living Room",
                "name_no": "Stue",
                "area_m2": 35.0,
                "position": "center",
                "dimensions_m": {"width": 6.0, "length": 5.8},
                "windows": 2,
                "doors": 2,
                "ceiling_height_m": ceiling,
                "estimated": True,
                "furniture": [],
                "connected_rooms": [],
                "window_positions": [],
                "door_positions": [],
                "has_external_access": False,
            }
        ]


app = FastAPI(
    title="Floor Plan API",
    version="1.0.7",
    docs_url="/",
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

processor = FloorPlanProcessor()


@app.get("/status")
async def get_status():
    """Report service liveness, number of stored uploads and the model name."""
    return {
        "status": "running",
        "pdfs_count": len(processor.pdfs),
        "model": MODEL_NAME,
    }


@app.get("/pdfs")
async def get_pdfs():
    """List summaries of every stored upload."""
    return {"pdfs": [pdf.to_dict() for pdf in processor.pdfs.values()]}


@app.get("/pdf/{pdf_id}")
async def get_pdf(pdf_id: str):
    """Return the summary of one upload, or 404 if the ID is unknown."""
    if pdf_id not in processor.pdfs:
        raise HTTPException(status_code=404, detail="PDF not found")
    return processor.pdfs[pdf_id].to_dict()


@app.post("/upload")
async def upload_pdf(file: UploadFile = File(...)):
    """Accept a PDF or supported image, store it and extract its image(s).

    Responds 400 for unsupported content types and 500 on unexpected errors.
    Processing failures are reported through ``pdf_info["error"]``.
    """
    # Clients may omit the content type or filename; guard against None.
    content_type = (file.content_type or "").lower()
    filename = file.filename or "upload"
    supported = ["application/pdf"] + list(processor.supported_image_formats.keys())

    if content_type not in supported:
        return JSONResponse(
            status_code=400,
            content={"error": "Unsupported file type"},
        )

    try:
        file_content = await file.read()
        pdf_id = await processor.process_upload(file_content, filename, content_type)
        pdf_info = processor.pdfs[pdf_id].to_dict()

        return {
            "message": "Upload successful",
            "pdf_id": pdf_id,
            "pdf_info": pdf_info,
        }
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.post("/analyze/{pdf_id}")
async def analyze_pdf(pdf_id: str, query: Optional[FloorPlanQuery] = None):
    """Run the floor-plan analysis for an upload and return its rooms.

    On any analysis error a fallback room list is returned together with the
    error message; a 500 is returned only if building the fallback fails too.
    """
    if pdf_id not in processor.pdfs:
        raise HTTPException(status_code=404, detail="PDF not found")

    pdf = processor.pdfs[pdf_id]

    if not pdf.processed:
        # A recorded error means processing failed for good; do not claim it
        # is still in progress.
        if pdf.error:
            return JSONResponse(
                status_code=400,
                content={"error": f"Processing failed: {pdf.error}"},
            )
        return JSONResponse(status_code=400, content={"error": "Still processing"})

    if not pdf.images:
        return JSONResponse(status_code=400, content={"error": "No images"})

    try:
        description = query.description if query else None

        start_time = time.time()
        result = await asyncio.wait_for(
            processor.analyze_floor_plan(pdf_id, description),
            timeout=1200,
        )
        elapsed = time.time() - start_time

        pdf.analysis_result = result

        # Heuristic: a tiny result containing estimated rooms is treated as
        # the fallback / estimated output.
        is_fallback = len(result) <= 2 and any(
            room.get("estimated") for room in result
        )

        return {
            "message": "Analysis complete",
            "pdf_id": pdf_id,
            "measurement_info": pdf.measurement_info,
            "rooms": result,
            "analysis_time_seconds": round(elapsed, 1),
            "is_estimated": is_fallback,
            "room_count": len(result),
        }
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}", exc_info=True)

        try:
            fallback = processor._generate_fallback(pdf.measurement_info)
            return {
                "message": "Error - using fallback",
                "pdf_id": pdf_id,
                "rooms": fallback,
                "is_estimated": True,
                "error": str(e),
            }
        except Exception:
            return JSONResponse(
                status_code=500,
                content={"error": str(e), "pdf_id": pdf_id},
            )


@app.post("/room/{pdf_id}")
async def find_room(pdf_id: str, query: RoomQuery):
    """Look up rooms by English or Norwegian name in a finished analysis.

    Matching is case-insensitive: equality when ``exact_match`` is set,
    substring otherwise. A single hit is returned under ``room``, several
    under ``rooms``.
    """
    if pdf_id not in processor.pdfs:
        raise HTTPException(status_code=404, detail="PDF not found")

    pdf = processor.pdfs[pdf_id]

    # HTTPException has no ``content`` argument (only ``detail``); use a
    # JSONResponse so the body keeps the {"error": ...} shape used elsewhere.
    if not pdf.analysis_result:
        return JSONResponse(status_code=400, content={"error": "Not analyzed yet"})

    found = []
    name_lower = query.room_name.lower()

    for room in pdf.analysis_result:
        # Names come from model output and may be missing or non-string.
        en = str(room.get("name") or "").lower()
        no = str(room.get("name_no") or "").lower()

        if query.exact_match:
            matched = name_lower in (en, no)
        else:
            matched = name_lower in en or name_lower in no

        if matched:
            found.append(room)

    if not found:
        return JSONResponse(status_code=404, content={"error": "Room not found"})

    if len(found) == 1:
        return {"message": "Room found", "pdf_id": pdf_id, "room": found[0]}

    return {
        "message": f"Found {len(found)} rooms",
        "pdf_id": pdf_id,
        "rooms": found,
    }


@app.on_event("startup")
async def startup_event():
    """Create working directories and log a startup banner."""
    os.makedirs("uploads", exist_ok=True)
    os.makedirs("logs", exist_ok=True)

    logger.info("\n" + "="*60)
    logger.info("Floor Plan API - Optimized Version")
    logger.info(f"Model: {MODEL_NAME}")
    logger.info("With Image Preprocessing: YES")
    logger.info(f"API Key: {'SET' if GOOGLE_API_KEY else 'NOT SET'}")
    logger.info("Port: 7860")
    logger.info("="*60 + "\n")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)