Spaces:
Running
Running
| import os | |
| import asyncio | |
| from typing import Optional | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| import uvicorn | |
| import logging | |
| import json | |
| import re | |
| from io import BytesIO | |
| import math | |
| import time | |
| import fitz | |
| from PIL import Image, ImageEnhance, ImageFilter | |
| import google.generativeai as genai | |
| from google.generativeai.types import HarmCategory, HarmBlockThreshold | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler(), | |
| logging.FileHandler("floor_plan_api.log") | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY') | |
| if not GOOGLE_API_KEY: | |
| logger.warning("GOOGLE_API_KEY not set!") | |
| else: | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| os.makedirs("uploads", exist_ok=True) | |
| class FloorPlanQuery(BaseModel): | |
| description: Optional[str] = None | |
| class RoomQuery(BaseModel): | |
| room_name: str | |
| exact_match: bool = False | |
| class PDF: | |
| def __init__(self, filename, content_type): | |
| self.filename = filename | |
| self.content_type = content_type | |
| self.id = re.sub(r'[^a-zA-Z0-9]', '_', filename) | |
| self.processed = False | |
| self.error = None | |
| self.images = [] | |
| self.page_count = 0 | |
| self.file_type = "pdf" if content_type == "application/pdf" else "image" | |
| self.measurement_info = { | |
| "scale": 100, | |
| "ceiling_height": 2.4, | |
| "room_dimensions": {} | |
| } | |
| self.analysis_result = None | |
| def to_dict(self): | |
| return { | |
| "id": self.id, | |
| "filename": self.filename, | |
| "content_type": self.content_type, | |
| "file_type": self.file_type, | |
| "processed": self.processed, | |
| "error": self.error, | |
| "page_count": self.page_count if self.file_type == "pdf" else None, | |
| "image_count": len(self.images) if self.images else 0, | |
| "measurement_info": self.measurement_info, | |
| "has_analysis": self.analysis_result is not None, | |
| "room_count": len(self.analysis_result) if self.analysis_result else 0 | |
| } | |
| class FloorPlanProcessor: | |
| def __init__(self): | |
| self.model = genai.GenerativeModel('gemini-2.5-pro') | |
| self.pdfs = {} | |
| self.supported_image_formats = { | |
| "image/jpeg": ".jpg", | |
| "image/png": ".png", | |
| "image/gif": ".gif", | |
| "image/bmp": ".bmp", | |
| "image/tiff": ".tiff", | |
| "image/webp": ".webp" | |
| } | |
| async def process_upload(self, file_content, filename, content_type): | |
| pdf_id = re.sub(r'[^a-zA-Z0-9]', '_', filename) | |
| logger.info(f"Processing {filename} (ID: {pdf_id})") | |
| pdf = PDF(filename, content_type) | |
| self.pdfs[pdf_id] = pdf | |
| try: | |
| extension = ".pdf" if content_type == "application/pdf" else self.supported_image_formats.get(content_type, ".png") | |
| file_path = f"uploads/{pdf_id}{extension}" | |
| with open(file_path, "wb") as f: | |
| f.write(file_content) | |
| if content_type == "application/pdf": | |
| await self.extract_images_from_pdf(pdf, file_content) | |
| elif content_type in self.supported_image_formats: | |
| await self.process_image(pdf, file_content) | |
| else: | |
| raise ValueError(f"Unsupported type: {content_type}") | |
| pdf.processed = True | |
| logger.info(f"Processing complete: {pdf_id}") | |
| return pdf_id | |
| except Exception as e: | |
| logger.error(f"Error processing {filename}: {str(e)}", exc_info=True) | |
| pdf.error = str(e) | |
| return pdf_id | |
| async def process_image(self, pdf, file_content): | |
| try: | |
| img = Image.open(BytesIO(file_content)) | |
| logger.info(f"Image: {img.width}x{img.height}") | |
| pdf.images.append(img) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Image error: {str(e)}") | |
| pdf.error = str(e) | |
| return False | |
| async def extract_images_from_pdf(self, pdf, file_content): | |
| try: | |
| pdf_document = fitz.open(stream=file_content, filetype="pdf") | |
| pdf.page_count = len(pdf_document) | |
| images = [] | |
| pages_to_process = min(3, pdf.page_count) # process first 3 pages | |
| for page_num in range(pages_to_process): | |
| page = pdf_document[page_num] | |
| try: | |
| # Force full-page rendering (ignore embedded images) | |
| pix = page.get_pixmap(matrix=fitz.Matrix(2, 2), alpha=False) | |
| try: | |
| img = Image.open(BytesIO(pix.tobytes("png"))) | |
| except: | |
| img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples) | |
| images.append(img) | |
| logger.info(f"Rendered full page {page_num+1}: {img.size}") | |
| except Exception as e: | |
| logger.error(f"Page render failed [{page_num+1}]: {e}") | |
| if not images: | |
| raise ValueError("Rendering failed for all pages") | |
| pdf.images = images | |
| return True | |
| except Exception as e: | |
| pdf.error = str(e) | |
| logger.error(f"PDF extraction failed: {e}", exc_info=True) | |
| return False | |
| def _select_single_best_image(self, images): | |
| """Select the single best image""" | |
| if len(images) == 1: | |
| return images[0] | |
| # Score by area (largest = best for floor plans) | |
| scored = [(img.size[0] * img.size[1], img) for img in images] | |
| scored.sort(reverse=True, key=lambda x: x[0]) | |
| best = scored[0][1] | |
| logger.info(f"Selected best from {len(images)} images") | |
| return best | |
| def _preprocess_floor_plan(self, image): | |
| """ | |
| Enhance floor plan image for better analysis | |
| - Improves contrast (helps model see room boundaries better) | |
| - Sharpens edges (makes text/dimensions clearer) | |
| - Removes noise (reduces confusion from scan artifacts) | |
| """ | |
| try: | |
| logger.info(f"Preprocessing image: {image.size}") | |
| # Step 1: Convert to grayscale for processing | |
| if image.mode != 'L': | |
| gray = image.convert('L') | |
| else: | |
| gray = image | |
| # Step 2: Enhance contrast (1.5x = moderate boost) | |
| enhancer = ImageEnhance.Contrast(gray) | |
| gray = enhancer.enhance(1.5) | |
| logger.info("✓ Contrast enhanced") | |
| # Step 3: Sharpen to make text/lines clearer | |
| gray = gray.filter(ImageFilter.SHARPEN) | |
| logger.info("✓ Sharpened") | |
| # Step 4: Remove noise with median filter | |
| gray = gray.filter(ImageFilter.MedianFilter(size=3)) | |
| logger.info("✓ Noise removed") | |
| # Step 5: Convert back to RGB for Gemini | |
| result = gray.convert('RGB') | |
| logger.info(f"✓ Preprocessing complete: {result.size}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Preprocessing error: {str(e)}") | |
| return image # Return original if preprocessing fails | |
| def _optimize_image(self, image, target_size=2048): | |
| """Optimize image for analysis""" | |
| if image.mode not in ('RGB', 'L'): | |
| image = image.convert('RGB') | |
| width, height = image.size | |
| if width > target_size or height > target_size: | |
| ratio = target_size / max(width, height) | |
| new_width = int(width * ratio) | |
| new_height = int(height * ratio) | |
| image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| logger.info(f"Resized: {width}x{height} → {new_width}x{new_height}") | |
| return image | |
| async def analyze_floor_plan(self, pdf_id, description=None): | |
| pdf = self.pdfs.get(pdf_id) | |
| if not pdf: | |
| raise ValueError(f"PDF {pdf_id} not found") | |
| if not pdf.images: | |
| raise ValueError(f"No images in {pdf_id}") | |
| logger.info(f"\n{'='*70}") | |
| logger.info(f"Analyzing: {pdf_id}") | |
| logger.info(f"Images: {len(pdf.images)}") | |
| logger.info(f"{'='*70}") | |
| # Use ONLY the first/best image for single file analysis | |
| best_image = self._select_single_best_image(pdf.images) | |
| best_image = self._preprocess_floor_plan(best_image) | |
| optimized_image = self._optimize_image(best_image, target_size=2048) | |
| logger.info(f"Using single image: {optimized_image.size[0]}x{optimized_image.size[1]}px") | |
| # Try analysis with extended timeout | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| logger.info(f"\nAttempt {attempt + 1}/{max_retries}") | |
| result = await self._analyze_with_gemini( | |
| optimized_image, | |
| pdf.measurement_info, | |
| description, | |
| timeout=600, | |
| attempt=attempt | |
| ) | |
| if result and len(result) > 0: | |
| logger.info(f"✓ SUCCESS: {len(result)} rooms detected") | |
| return result | |
| except asyncio.TimeoutError: | |
| logger.warning(f"Timeout on attempt {attempt + 1}") | |
| if attempt < max_retries - 1: | |
| await asyncio.sleep(10) | |
| continue | |
| except Exception as e: | |
| error_str = str(e) | |
| logger.error(f"Attempt {attempt + 1} error: {error_str}") | |
| # Check for retryable errors | |
| if any(k in error_str.lower() for k in ['504', '503', '429', 'timeout', 'deadline']): | |
| if attempt < max_retries - 1: | |
| wait = 15 * (attempt + 1) | |
| logger.info(f"Waiting {wait}s before retry...") | |
| await asyncio.sleep(wait) | |
| continue | |
| # Non-retryable error | |
| logger.error(f"Non-retryable error: {error_str}") | |
| raise | |
| logger.warning("All attempts failed, using fallback") | |
| return self._generate_fallback(pdf.measurement_info) | |
| async def _analyze_with_gemini(self, image, measurement_info, description, timeout, attempt=0): | |
| """Analyze with Gemini API""" | |
| prompt = self._create_detailed_prompt(description, measurement_info) | |
| # Adjust parameters per attempt | |
| temperature = 0.2 if attempt == 0 else 0.3 | |
| max_tokens = 16384 | |
| logger.info(f"Config: temp={temperature}, max_tokens={max_tokens}") | |
| start_time = time.time() | |
| loop = asyncio.get_event_loop() | |
| # Create safety settings with correct format | |
| safety_settings = { | |
| HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, | |
| HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, | |
| HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, | |
| HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, | |
| } | |
| def make_request(): | |
| return self.model.generate_content( | |
| [prompt, image], | |
| generation_config=genai.GenerationConfig( | |
| temperature=temperature, | |
| max_output_tokens=max_tokens, | |
| top_p=0.95, | |
| top_k=40, | |
| ), | |
| safety_settings=safety_settings, | |
| request_options={'timeout': timeout} | |
| ) | |
| try: | |
| response = await asyncio.wait_for( | |
| loop.run_in_executor(None, make_request), | |
| timeout=timeout + 30 | |
| ) | |
| elapsed = time.time() - start_time | |
| logger.info(f"Response in {elapsed:.1f}s ({len(response.text)} chars)") | |
| # Extract JSON | |
| parsed = self._extract_json(response.text) | |
| if parsed and len(parsed) > 0: | |
| validated = self._validate_measurements(parsed, measurement_info) | |
| logger.info(f"Validated {len(validated)} rooms") | |
| return validated | |
| else: | |
| logger.warning("No valid JSON found") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Gemini API error: {str(e)}") | |
| raise | |
| def _create_detailed_prompt(self, description, measurement_info): | |
| """Create detailed prompt optimized for Norwegian floor plans""" | |
| prompt = f"""Du er en ekspert på norske plantegninger. Analyser denne plantegningen nøye og ekstraher ALL rom med komplette detaljer. | |
| Returner KUN en JSON-array i dette eksakte formatet: | |
| [ | |
| {{ | |
| "name": "Living Room", | |
| "name_no": "Stue", | |
| "area_m2": 0.0, | |
| "position": "beskrivelse av plassering", | |
| "dimensions_m": {{"width": 0.0, "length": 0.0}}, | |
| "windows": 0, | |
| "window_positions": ["vegg plassering"], | |
| "doors": 0, | |
| "door_positions": ["plassering"], | |
| "connected_rooms": ["Tilstøtende rom"], | |
| "has_external_access": false, | |
| "ceiling_height_m": {measurement_info['ceiling_height']}, | |
| "furniture": [], | |
| "estimated": false | |
| }} | |
| ] | |
| KRITISKE INSTRUKSJONER: | |
| 1. Finn og inkluder HVERT ENESTE rom som er synlig på plantegningen | |
| 2. Les romnavnene nøyaktig som de står på tegningen (f.eks. "SOV 1", "KJØKKEN", "STUE", "BAD", etc.) | |
| 3. Les de eksakte arealene som er vist på planen (f.eks. "25.5 m²", "12.3 m²", etc.) | |
| 4. Hvis bredde × lengde vises, bruk dem nøyaktig | |
| 5. Hvis bare areal vises, beregn omtrentlige dimensjoner: bredde ≈ √areal, lengde ≈ √areal | |
| 6. Tell vinduer nøye - se etter vinduessymboler i veggene | |
| 7. Tell dører - se etter dørsvingsymboler | |
| 8. Identifiser hvilke vegger som har vinduer/dører (nord, sør, øst, vest) | |
| 9. List tilstøtende rom som har forbindelse til hvert rom | |
| 10. Sjekk om rommet har direkte utgang til uteområde | |
| 11. Sett estimated=false KUN hvis du kan lese eksakte mål, ellers true | |
| 12. Hvis du ser møbelsymboler eller etiketter, list dem | |
| 13. Returner KUN JSON-arrayen - absolutt ingen forklaringer, ingen markdown-blokker, ingen ekstra tekst | |
| Norske romtyper å se etter: | |
| - Soverom (SOV, Soverom, Bedroom) | |
| - Kjøkken (Kitchen) | |
| - Stue (Living room, Salon) | |
| - Bad/Baderom (Bathroom, Vask, WC) | |
| - Toalett (WC, Toilet) | |
| - Gang/Korridor (Hallway) | |
| - Entré (Entrance, Inngang) | |
| - Bod/Garderobe (Storage, Closet, Skap) | |
| - Kontor (Office, Arbeidsrom) | |
| - Vaskerom (Laundry, Vaskeri) | |
| - Terrasse/Balkong (Terrace, Balcony, Uteplass) | |
| - Garasje (Garage, Biloppstilling) | |
| - Spisestue (Dining room) | |
| - Sportsbod (Sports storage) | |
| - Tech/Teknisk rom (Technical room) | |
| - Vindfang (Mudroom) | |
| - Trapperom (Stairwell, Trapp) | |
| - Loft/Hems (Attic, Loft) | |
| - Kjeller (Basement) | |
| Målestokk: 1:{measurement_info['scale']} | |
| Standard takhøyde: {measurement_info['ceiling_height']}m | |
| """ | |
| if description: | |
| prompt += f"\n\nBrukeren ga denne konteksten: {description}" | |
| return prompt | |
| def _extract_json(self, text): | |
| """Extract JSON from response""" | |
| if not text: | |
| return None | |
| # Remove markdown | |
| text = text.strip() | |
| text = re.sub(r'```(?:json|javascript)?\s*', '', text) | |
| text = text.strip('`').strip() | |
| # Try direct parse | |
| try: | |
| data = json.loads(text) | |
| if isinstance(data, list) and len(data) > 0: | |
| return data | |
| except json.JSONDecodeError: | |
| pass | |
| # Find JSON array | |
| patterns = [ | |
| r'\[\s*\{[\s\S]*?\}\s*\]', | |
| r'\[[\s\S]*?\]', | |
| ] | |
| for pattern in patterns: | |
| matches = list(re.finditer(pattern, text)) | |
| for match in sorted(matches, key=lambda m: len(m.group(0)), reverse=True): | |
| try: | |
| data = json.loads(match.group(0)) | |
| if isinstance(data, list) and len(data) > 0: | |
| return data | |
| except: | |
| continue | |
| logger.warning(f"Could not extract JSON from: {text[:300]}...") | |
| return None | |
| def _validate_measurements(self, data, measurement_info): | |
| """Validate and fix room measurements""" | |
| if not isinstance(data, list): | |
| return [] | |
| ceiling = measurement_info.get('ceiling_height', 2.4) | |
| for room in data: | |
| # Ensure required fields | |
| room.setdefault("name", "Unknown") | |
| room.setdefault("name_no", room["name"]) | |
| room.setdefault("ceiling_height_m", ceiling) | |
| room.setdefault("windows", 0) | |
| room.setdefault("doors", 1) | |
| room.setdefault("estimated", False) | |
| room.setdefault("furniture", []) | |
| room.setdefault("connected_rooms", []) | |
| room.setdefault("window_positions", []) | |
| room.setdefault("door_positions", []) | |
| # Fix dimensions | |
| if "dimensions_m" not in room: | |
| room["dimensions_m"] = {"width": 0, "length": 0} | |
| width = room["dimensions_m"].get("width", 0) | |
| length = room["dimensions_m"].get("length", 0) | |
| if width > 0 and length > 0: | |
| room["area_m2"] = round(width * length, 1) | |
| elif room.get("area_m2", 0) > 0: | |
| side = math.sqrt(room["area_m2"]) | |
| room["dimensions_m"]["width"] = round(side, 1) | |
| room["dimensions_m"]["length"] = round(side, 1) | |
| room["estimated"] = True | |
| else: | |
| room["dimensions_m"] = {"width": 3.0, "length": 3.0} | |
| room["area_m2"] = 9.0 | |
| room["estimated"] = True | |
| return data | |
| def _generate_fallback(self, measurement_info): | |
| """Generate fallback structure""" | |
| ceiling = measurement_info.get('ceiling_height', 2.4) | |
| return [ | |
| { | |
| "name": "Living Room", "name_no": "Stue", | |
| "area_m2": 35.0, "position": "center", | |
| "dimensions_m": {"width": 6.0, "length": 5.8}, | |
| "windows": 2, "doors": 2, | |
| "ceiling_height_m": ceiling, | |
| "estimated": True, | |
| "furniture": [], | |
| "connected_rooms": [], | |
| "window_positions": [], | |
| "door_positions": [], | |
| "has_external_access": False | |
| } | |
| ] | |
| app = FastAPI( | |
| title="Floor Plan API", | |
| version="1.0.7", | |
| docs_url="/" | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"] | |
| ) | |
| processor = FloorPlanProcessor() | |
| async def get_status(): | |
| return { | |
| "status": "running", | |
| "pdfs_count": len(processor.pdfs), | |
| "model": "gemini-2.5-pro" | |
| } | |
| async def get_pdfs(): | |
| return {"pdfs": [pdf.to_dict() for pdf in processor.pdfs.values()]} | |
| async def get_pdf(pdf_id: str): | |
| if pdf_id not in processor.pdfs: | |
| raise HTTPException(status_code=404, detail="PDF not found") | |
| return processor.pdfs[pdf_id].to_dict() | |
| async def upload_pdf(file: UploadFile = File(...)): | |
| content_type = file.content_type.lower() | |
| supported = ["application/pdf"] + list(processor.supported_image_formats.keys()) | |
| if content_type not in supported: | |
| return JSONResponse( | |
| status_code=400, | |
| content={"error": "Unsupported file type"} | |
| ) | |
| try: | |
| file_content = await file.read() | |
| pdf_id = await processor.process_upload(file_content, file.filename, content_type) | |
| pdf_info = processor.pdfs[pdf_id].to_dict() | |
| return { | |
| "message": "Upload successful", | |
| "pdf_id": pdf_id, | |
| "pdf_info": pdf_info | |
| } | |
| except Exception as e: | |
| logger.error(f"Upload error: {str(e)}") | |
| return JSONResponse(status_code=500, content={"error": str(e)}) | |
| async def analyze_pdf(pdf_id: str, query: FloorPlanQuery = None): | |
| if pdf_id not in processor.pdfs: | |
| raise HTTPException(status_code=404, detail="PDF not found") | |
| pdf = processor.pdfs[pdf_id] | |
| if not pdf.processed: | |
| return JSONResponse(status_code=400, content={"error": "Still processing"}) | |
| if not pdf.images: | |
| return JSONResponse(status_code=400, content={"error": "No images"}) | |
| try: | |
| description = query.description if query else None | |
| start_time = time.time() | |
| result = await asyncio.wait_for( | |
| processor.analyze_floor_plan(pdf_id, description), | |
| timeout=1200 | |
| ) | |
| elapsed = time.time() - start_time | |
| pdf.analysis_result = result | |
| is_fallback = any( | |
| room.get("estimated") and len(result) <= 2 | |
| for room in result | |
| ) | |
| return { | |
| "message": "Analysis complete", | |
| "pdf_id": pdf_id, | |
| "measurement_info": pdf.measurement_info, | |
| "rooms": result, | |
| "analysis_time_seconds": round(elapsed, 1), | |
| "is_estimated": is_fallback, | |
| "room_count": len(result) | |
| } | |
| except Exception as e: | |
| logger.error(f"Analysis error: {str(e)}", exc_info=True) | |
| try: | |
| fallback = processor._generate_fallback(pdf.measurement_info) | |
| return { | |
| "message": "Error - using fallback", | |
| "pdf_id": pdf_id, | |
| "rooms": fallback, | |
| "is_estimated": True, | |
| "error": str(e) | |
| } | |
| except: | |
| return JSONResponse( | |
| status_code=500, | |
| content={"error": str(e), "pdf_id": pdf_id} | |
| ) | |
| async def find_room(pdf_id: str, query: RoomQuery): | |
| if pdf_id not in processor.pdfs: | |
| raise HTTPException(status_code=404, detail="PDF not found") | |
| pdf = processor.pdfs[pdf_id] | |
| if not pdf.analysis_result: | |
| raise HTTPException(status_code=400, content={"error": "Not analyzed yet"}) | |
| found = [] | |
| name_lower = query.room_name.lower() | |
| for room in pdf.analysis_result: | |
| en = room.get("name", "").lower() | |
| no = room.get("name_no", "").lower() | |
| if query.exact_match: | |
| if en == name_lower or no == name_lower: | |
| found.append(room) | |
| else: | |
| if name_lower in en or name_lower in no: | |
| found.append(room) | |
| if not found: | |
| raise HTTPException(status_code=404, content={"error": "Room not found"}) | |
| if len(found) == 1: | |
| return {"message": "Room found", "pdf_id": pdf_id, "room": found[0]} | |
| return { | |
| "message": f"Found {len(found)} rooms", | |
| "pdf_id": pdf_id, | |
| "rooms": found | |
| } | |
| async def startup_event(): | |
| os.makedirs("uploads", exist_ok=True) | |
| os.makedirs("logs", exist_ok=True) | |
| logger.info("\n" + "="*60) | |
| logger.info("Floor Plan API - Optimized Version") | |
| logger.info(f"Model: gemini-2.5-pro") | |
| logger.info(f"With Image Preprocessing: YES") | |
| logger.info(f"API Key: {'SET' if GOOGLE_API_KEY else 'NOT SET'}") | |
| logger.info(f"Port: 7860") | |
| logger.info("="*60 + "\n") | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |