# iplotnor's picture
# Update app.py
# 9ed1d07 verified
import os
import asyncio
from typing import Optional
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import uvicorn
import logging
import json
import re
from io import BytesIO
import math
import time
import fitz
from PIL import Image, ImageEnhance, ImageFilter
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold
# Send log records to both the console and a log file in the working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(), logging.FileHandler("floor_plan_api.log")],
)
logger = logging.getLogger(__name__)

# The Gemini client is only configured when a key is present in the environment;
# without one the service still starts but logs a warning.
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)
else:
    logger.warning("GOOGLE_API_KEY not set!")

# Uploaded files are written here by FloorPlanProcessor.process_upload.
os.makedirs("uploads", exist_ok=True)
class FloorPlanQuery(BaseModel):
    """Optional request body for the analyze endpoint."""

    # Free-text context from the user; when provided it is appended to the
    # prompt that is sent to the model.
    description: Optional[str] = None
class RoomQuery(BaseModel):
    """Request body for looking up rooms in a previously analyzed floor plan."""

    # Name to search for; compared case-insensitively against both the
    # "name" and "name_no" fields of each room.
    room_name: str
    # True -> the name must equal the room name; False -> substring match.
    exact_match: bool = False
class PDF:
    """In-memory record of one uploaded floor plan (a PDF or a single image).

    Holds the rendered page images, processing state and, once available,
    the analysis result for the upload.
    """

    def __init__(self, filename, content_type):
        """Create the record for an upload.

        Args:
            filename: Original name of the uploaded file. May be None or
                empty, in which case the id falls back to "upload".
            content_type: MIME type reported for the upload.
        """
        self.filename = filename
        self.content_type = content_type
        # The id doubles as a dictionary key and as part of a file name, so
        # every character outside [a-zA-Z0-9] is replaced by an underscore.
        # A missing/empty filename would otherwise raise in re.sub or yield
        # an empty id.
        self.id = re.sub(r'[^a-zA-Z0-9]', '_', filename or "upload")
        self.processed = False
        self.error = None
        self.images = []
        self.page_count = 0
        self.file_type = "pdf" if content_type == "application/pdf" else "image"
        # Defaults used for the prompt and for validating measurements.
        self.measurement_info = {
            "scale": 100,
            "ceiling_height": 2.4,
            "room_dimensions": {}
        }
        self.analysis_result = None

    def to_dict(self):
        """Return a JSON-serializable summary (images themselves are omitted)."""
        rooms = self.analysis_result
        return {
            "id": self.id,
            "filename": self.filename,
            "content_type": self.content_type,
            "file_type": self.file_type,
            "processed": self.processed,
            "error": self.error,
            # Page count is only meaningful for PDFs.
            "page_count": self.page_count if self.file_type == "pdf" else None,
            "image_count": len(self.images) if self.images else 0,
            "measurement_info": self.measurement_info,
            "has_analysis": rooms is not None,
            "room_count": len(rooms) if rooms else 0
        }
class FloorPlanProcessor:
    """Turn uploaded floor plans (PDF or image) into structured room data.

    Uploads are rendered to PIL images, one image is preprocessed and sent to
    a Gemini model together with a Norwegian-language prompt, and the JSON
    answer is parsed and normalized into a list of room dictionaries.
    All uploads are kept in memory in ``self.pdfs`` keyed by their id.
    """

    def __init__(self):
        """Create the Gemini model handle and the empty upload registry."""
        self.model = genai.GenerativeModel('gemini-2.5-pro')
        self.pdfs = {}
        # MIME type -> file extension used when the upload is written to disk.
        self.supported_image_formats = {
            "image/jpeg": ".jpg",
            "image/png": ".png",
            "image/gif": ".gif",
            "image/bmp": ".bmp",
            "image/tiff": ".tiff",
            "image/webp": ".webp"
        }

    async def process_upload(self, file_content, filename, content_type):
        """Register an upload, store it on disk and extract its images.

        Never raises for processing problems: failures are recorded in
        ``pdf.error`` and the id is returned either way.

        Args:
            file_content: Raw bytes of the uploaded file.
            filename: Original file name (None/empty falls back to "upload").
            content_type: MIME type of the upload.

        Returns:
            The id under which the upload is stored in ``self.pdfs``.
        """
        # A missing filename would make the id derivation fail before the
        # error handling below is even reached.
        filename = filename or "upload"
        pdf = PDF(filename, content_type)
        # Use the id computed by the record itself so key and record agree.
        pdf_id = pdf.id
        logger.info(f"Processing {filename} (ID: {pdf_id})")
        self.pdfs[pdf_id] = pdf
        try:
            # Reject unsupported types before anything is written to disk.
            if content_type == "application/pdf":
                extension = ".pdf"
            elif content_type in self.supported_image_formats:
                extension = self.supported_image_formats[content_type]
            else:
                raise ValueError(f"Unsupported type: {content_type}")

            file_path = f"uploads/{pdf_id}{extension}"
            with open(file_path, "wb") as f:
                f.write(file_content)

            if content_type == "application/pdf":
                ok = await self.extract_images_from_pdf(pdf, file_content)
            else:
                ok = await self.process_image(pdf, file_content)

            # "processed" means the attempt finished; a failed extraction is
            # reported through pdf.error (set by the helpers above).
            pdf.processed = True
            if ok:
                logger.info(f"Processing complete: {pdf_id}")
            else:
                logger.warning(f"Processing finished with error for {pdf_id}: {pdf.error}")
            return pdf_id
        except Exception as e:
            logger.error(f"Error processing {filename}: {str(e)}", exc_info=True)
            pdf.error = str(e)
            return pdf_id

    async def process_image(self, pdf, file_content):
        """Open an uploaded image and attach it to ``pdf``.

        Returns:
            True on success, False if the bytes could not be opened as an
            image (the message is stored in ``pdf.error``).
        """
        try:
            img = Image.open(BytesIO(file_content))
            logger.info(f"Image: {img.width}x{img.height}")
            pdf.images.append(img)
            return True
        except Exception as e:
            logger.error(f"Image error: {str(e)}")
            pdf.error = str(e)
            return False

    async def extract_images_from_pdf(self, pdf, file_content):
        """Render the first pages of a PDF to images and attach them to ``pdf``.

        At most three pages are rendered, each at 2x zoom without alpha.
        Pages that fail to render are skipped.

        Returns:
            True if at least one page was rendered, otherwise False (the
            message is stored in ``pdf.error``).
        """
        pdf_document = None
        try:
            pdf_document = fitz.open(stream=file_content, filetype="pdf")
            pdf.page_count = len(pdf_document)
            images = []
            pages_to_process = min(3, pdf.page_count)  # process first 3 pages
            for page_num in range(pages_to_process):
                page = pdf_document[page_num]
                try:
                    # Force full-page rendering (ignore embedded images)
                    pix = page.get_pixmap(matrix=fitz.Matrix(2, 2), alpha=False)
                    try:
                        img = Image.open(BytesIO(pix.tobytes("png")))
                    except Exception:
                        # Fall back to building the image from raw samples.
                        img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
                    images.append(img)
                    logger.info(f"Rendered full page {page_num+1}: {img.size}")
                except Exception as e:
                    logger.error(f"Page render failed [{page_num+1}]: {e}")
            if not images:
                raise ValueError("Rendering failed for all pages")
            pdf.images = images
            return True
        except Exception as e:
            pdf.error = str(e)
            logger.error(f"PDF extraction failed: {e}", exc_info=True)
            return False
        finally:
            # The rendered images hold their own pixel data, so the document
            # can (and should) be released once rendering is done.
            if pdf_document is not None:
                pdf_document.close()

    def _select_single_best_image(self, images):
        """Return the image with the largest pixel area (first one on ties).

        Raises:
            ValueError: If ``images`` is empty.
        """
        if not images:
            raise ValueError("No images to select from")
        if len(images) == 1:
            return images[0]
        # Score by area (largest = best for floor plans)
        best = max(images, key=lambda img: img.size[0] * img.size[1])
        logger.info(f"Selected best from {len(images)} images")
        return best

    def _preprocess_floor_plan(self, image):
        """Enhance a floor plan image before it is sent to the model.

        Steps: grayscale conversion, 1.5x contrast boost, sharpen, 3x3 median
        filter, then conversion back to RGB. If any step fails, the original
        image is returned unchanged.
        """
        try:
            logger.info(f"Preprocessing image: {image.size}")
            # Step 1: Convert to grayscale for processing
            if image.mode != 'L':
                gray = image.convert('L')
            else:
                gray = image
            # Step 2: Enhance contrast (1.5x = moderate boost)
            enhancer = ImageEnhance.Contrast(gray)
            gray = enhancer.enhance(1.5)
            logger.info("✓ Contrast enhanced")
            # Step 3: Sharpen to make text/lines clearer
            gray = gray.filter(ImageFilter.SHARPEN)
            logger.info("✓ Sharpened")
            # Step 4: Remove noise with median filter
            gray = gray.filter(ImageFilter.MedianFilter(size=3))
            logger.info("✓ Noise removed")
            # Step 5: Convert back to RGB for Gemini
            result = gray.convert('RGB')
            logger.info(f"✓ Preprocessing complete: {result.size}")
            return result
        except Exception as e:
            logger.error(f"Preprocessing error: {str(e)}")
            return image  # Return original if preprocessing fails

    def _optimize_image(self, image, target_size=2048):
        """Convert to RGB/L if needed and shrink so no side exceeds ``target_size``.

        The aspect ratio is preserved; images already within the limit are
        not resized.
        """
        if image.mode not in ('RGB', 'L'):
            image = image.convert('RGB')
        width, height = image.size
        if width > target_size or height > target_size:
            ratio = target_size / max(width, height)
            # Clamp to 1px so extreme aspect ratios never produce a zero side.
            new_width = max(1, int(width * ratio))
            new_height = max(1, int(height * ratio))
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            logger.info(f"Resized: {width}x{height} -> {new_width}x{new_height}")
        return image

    async def analyze_floor_plan(self, pdf_id, description=None):
        """Analyze a stored upload and return its list of room dictionaries.

        Makes up to three attempts. Timeouts and errors whose message looks
        transient (504/503/429/timeout/deadline) are retried after a pause.
        When all attempts end in a timeout or without usable data, a
        placeholder result from ``_generate_fallback`` is returned.

        Args:
            pdf_id: Id of an upload in ``self.pdfs``.
            description: Optional user-supplied context for the prompt.

        Raises:
            ValueError: If the id is unknown or the upload has no images.
            Exception: The last model error if it is not retryable or the
                retries are exhausted.
        """
        pdf = self.pdfs.get(pdf_id)
        if not pdf:
            raise ValueError(f"PDF {pdf_id} not found")
        if not pdf.images:
            raise ValueError(f"No images in {pdf_id}")
        logger.info(f"\n{'='*70}")
        logger.info(f"Analyzing: {pdf_id}")
        logger.info(f"Images: {len(pdf.images)}")
        logger.info(f"{'='*70}")
        # Use ONLY the first/best image for single file analysis
        best_image = self._select_single_best_image(pdf.images)
        best_image = self._preprocess_floor_plan(best_image)
        optimized_image = self._optimize_image(best_image, target_size=2048)
        logger.info(f"Using single image: {optimized_image.size[0]}x{optimized_image.size[1]}px")
        # Try analysis with extended timeout
        max_retries = 3
        retryable_markers = ('504', '503', '429', 'timeout', 'deadline')
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                logger.info(f"\nAttempt {attempt + 1}/{max_retries}")
                result = await self._analyze_with_gemini(
                    optimized_image,
                    pdf.measurement_info,
                    description,
                    timeout=600,
                    attempt=attempt
                )
                if result:
                    logger.info(f"✓ SUCCESS: {len(result)} rooms detected")
                    return result
            except asyncio.TimeoutError:
                logger.warning(f"Timeout on attempt {attempt + 1}")
                if not is_last_attempt:
                    await asyncio.sleep(10)
                continue
            except Exception as e:
                error_str = str(e)
                logger.error(f"Attempt {attempt + 1} error: {error_str}")
                # Check for retryable errors
                retryable = any(k in error_str.lower() for k in retryable_markers)
                if retryable and not is_last_attempt:
                    wait = 15 * (attempt + 1)
                    logger.info(f"Waiting {wait}s before retry...")
                    await asyncio.sleep(wait)
                    continue
                # Keep the log truthful: a transient error on the final
                # attempt is not the same as a non-retryable one.
                if retryable:
                    logger.error(f"Retries exhausted: {error_str}")
                else:
                    logger.error(f"Non-retryable error: {error_str}")
                raise
        logger.warning("All attempts failed, using fallback")
        return self._generate_fallback(pdf.measurement_info)

    async def _analyze_with_gemini(self, image, measurement_info, description, timeout, attempt=0):
        """Send one analysis request to Gemini and parse the answer.

        The blocking client call runs in the default executor and is awaited
        with ``timeout + 30`` seconds.
        NOTE(review): a timeout only abandons the wait here; the worker
        thread itself is presumably not interrupted.

        Returns:
            The validated room list, or None if no usable JSON was found.

        Raises:
            asyncio.TimeoutError: If the request exceeds the time limit.
            Exception: Any error raised by the client call or while reading
                the response text.
        """
        prompt = self._create_detailed_prompt(description, measurement_info)
        # Adjust parameters per attempt
        temperature = 0.2 if attempt == 0 else 0.3
        max_tokens = 16384
        logger.info(f"Config: temp={temperature}, max_tokens={max_tokens}")
        start_time = time.time()
        # We are inside a coroutine, so the running loop is the right one.
        loop = asyncio.get_running_loop()
        # Create safety settings with correct format
        safety_settings = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
        }

        def make_request():
            return self.model.generate_content(
                [prompt, image],
                generation_config=genai.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=max_tokens,
                    top_p=0.95,
                    top_k=40,
                ),
                safety_settings=safety_settings,
                request_options={'timeout': timeout}
            )

        try:
            response = await asyncio.wait_for(
                loop.run_in_executor(None, make_request),
                timeout=timeout + 30
            )
            elapsed = time.time() - start_time
            response_text = response.text
            logger.info(f"Response in {elapsed:.1f}s ({len(response_text)} chars)")
            # Extract JSON
            parsed = self._extract_json(response_text)
            if parsed:
                validated = self._validate_measurements(parsed, measurement_info)
                logger.info(f"Validated {len(validated)} rooms")
                return validated
            logger.warning("No valid JSON found")
            return None
        except Exception as e:
            logger.error(f"Gemini API error: {str(e)}")
            raise

    def _create_detailed_prompt(self, description, measurement_info):
        """Build the Norwegian analysis prompt.

        Missing ``ceiling_height`` / ``scale`` entries fall back to 2.4 and
        100, matching the defaults used elsewhere in this class.

        Args:
            description: Optional user context appended at the end.
            measurement_info: Mapping with "ceiling_height" and "scale".
        """
        ceiling_height = measurement_info.get('ceiling_height', 2.4)
        scale = measurement_info.get('scale', 100)
        prompt = f"""Du er en ekspert på norske plantegninger. Analyser denne plantegningen nøye og ekstraher ALL rom med komplette detaljer.
Returner KUN en JSON-array i dette eksakte formatet:
[
{{
"name": "Living Room",
"name_no": "Stue",
"area_m2": 0.0,
"position": "beskrivelse av plassering",
"dimensions_m": {{"width": 0.0, "length": 0.0}},
"windows": 0,
"window_positions": ["vegg plassering"],
"doors": 0,
"door_positions": ["plassering"],
"connected_rooms": ["Tilstøtende rom"],
"has_external_access": false,
"ceiling_height_m": {ceiling_height},
"furniture": [],
"estimated": false
}}
]
KRITISKE INSTRUKSJONER:
1. Finn og inkluder HVERT ENESTE rom som er synlig på plantegningen
2. Les romnavnene nøyaktig som de står på tegningen (f.eks. "SOV 1", "KJØKKEN", "STUE", "BAD", etc.)
3. Les de eksakte arealene som er vist på planen (f.eks. "25.5 m²", "12.3 m²", etc.)
4. Hvis bredde × lengde vises, bruk dem nøyaktig
5. Hvis bare areal vises, beregn omtrentlige dimensjoner: bredde ≈ √areal, lengde ≈ √areal
6. Tell vinduer nøye - se etter vinduessymboler i veggene
7. Tell dører - se etter dørsvingsymboler
8. Identifiser hvilke vegger som har vinduer/dører (nord, sør, øst, vest)
9. List tilstøtende rom som har forbindelse til hvert rom
10. Sjekk om rommet har direkte utgang til uteområde
11. Sett estimated=false KUN hvis du kan lese eksakte mål, ellers true
12. Hvis du ser møbelsymboler eller etiketter, list dem
13. Returner KUN JSON-arrayen - absolutt ingen forklaringer, ingen markdown-blokker, ingen ekstra tekst
Norske romtyper å se etter:
- Soverom (SOV, Soverom, Bedroom)
- Kjøkken (Kitchen)
- Stue (Living room, Salon)
- Bad/Baderom (Bathroom, Vask, WC)
- Toalett (WC, Toilet)
- Gang/Korridor (Hallway)
- Entré (Entrance, Inngang)
- Bod/Garderobe (Storage, Closet, Skap)
- Kontor (Office, Arbeidsrom)
- Vaskerom (Laundry, Vaskeri)
- Terrasse/Balkong (Terrace, Balcony, Uteplass)
- Garasje (Garage, Biloppstilling)
- Spisestue (Dining room)
- Sportsbod (Sports storage)
- Tech/Teknisk rom (Technical room)
- Vindfang (Mudroom)
- Trapperom (Stairwell, Trapp)
- Loft/Hems (Attic, Loft)
- Kjeller (Basement)
Målestokk: 1:{scale}
Standard takhøyde: {ceiling_height}m
"""
        if description:
            prompt += f"\n\nBrukeren ga denne konteksten: {description}"
        return prompt

    @staticmethod
    def _is_room_list(value):
        """Return True for a non-empty list that contains at least one dict."""
        return isinstance(value, list) and any(isinstance(item, dict) for item in value)

    def _extract_json(self, text):
        """Extract the room array from a model response.

        Markdown code fences are stripped first. The whole text is tried as
        JSON; if that does not yield a room list, every ``[`` is tried as the
        start of a JSON array, which copes with surrounding prose, wrapper
        objects and nested arrays. Lists without any dict entries (e.g. an
        inner list of strings) are never returned.

        Returns:
            A non-empty list containing room dictionaries, or None.
        """
        if not text:
            return None
        # Remove markdown
        text = text.strip()
        text = re.sub(r'```(?:json|javascript)?\s*', '', text)
        text = text.strip('`').strip()
        # Try direct parse
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = None
        if self._is_room_list(data):
            return data
        # Find JSON array: decode from each opening bracket, first hit wins
        # (the outermost array starts before any array nested inside it).
        decoder = json.JSONDecoder()
        start = text.find('[')
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                candidate = None
            if self._is_room_list(candidate):
                return candidate
            start = text.find('[', start + 1)
        logger.warning(f"Could not extract JSON from: {text[:300]}...")
        return None

    @staticmethod
    def _to_float(value):
        """Coerce a model-supplied value to a finite float; 0.0 if impossible."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        return number if math.isfinite(number) else 0.0

    def _validate_measurements(self, data, measurement_info):
        """Fill in missing room fields and make area/dimensions consistent.

        Entries that are not dictionaries are dropped. Numeric fields may
        arrive as null or strings and are coerced. Per room:
        - width and length both positive: area is set to width * length;
        - otherwise a positive area yields square dimensions (sqrt of area)
          and the room is marked estimated;
        - otherwise the room becomes an estimated 3.0 x 3.0 m placeholder.

        Returns:
            The list of normalized room dictionaries ([] if ``data`` is not
            a list).
        """
        if not isinstance(data, list):
            return []
        ceiling = measurement_info.get('ceiling_height', 2.4)
        rooms = []
        for room in data:
            if not isinstance(room, dict):
                logger.warning(f"Skipping non-object room entry: {room!r}")
                continue
            # Ensure required fields (a null/blank name counts as missing so
            # later string operations on names are safe)
            name = room.get("name")
            if not isinstance(name, str) or not name.strip():
                room["name"] = "Unknown"
            name_no = room.get("name_no")
            if not isinstance(name_no, str) or not name_no.strip():
                room["name_no"] = room["name"]
            room.setdefault("ceiling_height_m", ceiling)
            room.setdefault("windows", 0)
            room.setdefault("doors", 1)
            room.setdefault("estimated", False)
            room.setdefault("furniture", [])
            room.setdefault("connected_rooms", [])
            room.setdefault("window_positions", [])
            room.setdefault("door_positions", [])
            # Fix dimensions
            dims = room.get("dimensions_m")
            if not isinstance(dims, dict):
                dims = {"width": 0, "length": 0}
                room["dimensions_m"] = dims
            width = self._to_float(dims.get("width"))
            length = self._to_float(dims.get("length"))
            area = self._to_float(room.get("area_m2"))
            if width > 0 and length > 0:
                dims["width"] = width
                dims["length"] = length
                room["area_m2"] = round(width * length, 1)
            elif area > 0:
                side = round(math.sqrt(area), 1)
                dims["width"] = side
                dims["length"] = side
                room["area_m2"] = area
                room["estimated"] = True
            else:
                room["dimensions_m"] = {"width": 3.0, "length": 3.0}
                room["area_m2"] = 9.0
                room["estimated"] = True
            rooms.append(room)
        return rooms

    def _generate_fallback(self, measurement_info):
        """Return a single placeholder living room, marked as estimated."""
        ceiling = measurement_info.get('ceiling_height', 2.4)
        return [
            {
                "name": "Living Room", "name_no": "Stue",
                "area_m2": 35.0, "position": "center",
                "dimensions_m": {"width": 6.0, "length": 5.8},
                "windows": 2, "doors": 2,
                "ceiling_height_m": ceiling,
                "estimated": True,
                "furniture": [],
                "connected_rooms": [],
                "window_positions": [],
                "door_positions": [],
                "has_external_access": False
            }
        ]
# The interactive API docs are served at the site root.
app = FastAPI(title="Floor Plan API", version="1.0.7", docs_url="/")

# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive CORS policy - confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# One shared processor instance; it keeps every upload in memory.
processor = FloorPlanProcessor()
@app.get("/status")
async def get_status():
    """Report that the service is up, how many uploads are stored and the model name."""
    stored_uploads = len(processor.pdfs)
    return {"status": "running", "pdfs_count": stored_uploads, "model": "gemini-2.5-pro"}
@app.get("/pdfs")
async def get_pdfs():
    """List the summaries of all stored uploads."""
    summaries = []
    for record in processor.pdfs.values():
        summaries.append(record.to_dict())
    return {"pdfs": summaries}
@app.get("/pdf/{pdf_id}")
async def get_pdf(pdf_id: str):
    """Return the summary of one stored upload, or 404 if the id is unknown."""
    record = processor.pdfs.get(pdf_id)
    if record is None:
        raise HTTPException(status_code=404, detail="PDF not found")
    return record.to_dict()
@app.post("/upload")
async def upload_pdf(file: UploadFile = File(...)):
    """Accept a PDF or image upload and extract its page images.

    Responses:
        200: upload stored and images extracted.
        400: missing or unsupported content type.
        422: the file was accepted but could not be processed; the body
             carries the error together with the upload's id and summary.
        500: unexpected failure.
    """
    # The content type is optional in multipart uploads; a missing one is
    # treated as unsupported instead of crashing on None.lower().
    content_type = (file.content_type or "").lower()
    supported = {"application/pdf", *processor.supported_image_formats}
    if content_type not in supported:
        return JSONResponse(
            status_code=400,
            content={"error": "Unsupported file type"}
        )
    try:
        file_content = await file.read()
        # The filename is optional as well; the id is derived from it.
        filename = file.filename or "upload"
        pdf_id = await processor.process_upload(file_content, filename, content_type)
        pdf = processor.pdfs[pdf_id]
        pdf_info = pdf.to_dict()
        # process_upload records failures on the upload instead of raising,
        # so check for them rather than claiming success.
        if pdf.error:
            return JSONResponse(
                status_code=422,
                content={"error": pdf.error, "pdf_id": pdf_id, "pdf_info": pdf_info}
            )
        return {
            "message": "Upload successful",
            "pdf_id": pdf_id,
            "pdf_info": pdf_info
        }
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return JSONResponse(status_code=500, content={"error": str(e)})
@app.post("/analyze/{pdf_id}")
async def analyze_pdf(pdf_id: str, query: FloorPlanQuery = None):
    """Run the room analysis for a stored upload.

    On success the rooms are stored on the upload and returned. If the
    analysis raises, a placeholder result is returned together with the
    error message (best effort), and only if even that fails a 500.

    Responses:
        404: unknown id.
        400: the upload failed processing, is not processed, or has no images.
    """
    pdf = processor.pdfs.get(pdf_id)
    if pdf is None:
        raise HTTPException(status_code=404, detail="PDF not found")
    # Uploads are processed during the upload request, so a recorded error
    # with no images means processing failed - not that it is still running.
    if pdf.error and not pdf.images:
        return JSONResponse(status_code=400, content={"error": f"Processing failed: {pdf.error}"})
    if not pdf.processed:
        return JSONResponse(status_code=400, content={"error": "Still processing"})
    if not pdf.images:
        return JSONResponse(status_code=400, content={"error": "No images"})
    try:
        description = query.description if query else None
        start_time = time.time()
        result = await asyncio.wait_for(
            processor.analyze_floor_plan(pdf_id, description),
            timeout=1200
        )
        elapsed = time.time() - start_time
        pdf.analysis_result = result
        # Heuristic: a tiny result containing estimated rooms is treated as
        # the placeholder/fallback outcome.
        is_fallback = len(result) <= 2 and any(room.get("estimated") for room in result)
        return {
            "message": "Analysis complete",
            "pdf_id": pdf_id,
            "measurement_info": pdf.measurement_info,
            "rooms": result,
            "analysis_time_seconds": round(elapsed, 1),
            "is_estimated": is_fallback,
            "room_count": len(result)
        }
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}", exc_info=True)
        try:
            fallback = processor._generate_fallback(pdf.measurement_info)
            return {
                "message": "Error - using fallback",
                "pdf_id": pdf_id,
                "rooms": fallback,
                "is_estimated": True,
                "error": str(e)
            }
        except Exception:
            return JSONResponse(
                status_code=500,
                content={"error": str(e), "pdf_id": pdf_id}
            )
@app.post("/room/{pdf_id}")
async def find_room(pdf_id: str, query: RoomQuery):
    """Look up rooms by name in the stored analysis result of an upload.

    Names are compared case-insensitively against both "name" and
    "name_no"; ``exact_match`` switches between equality and substring
    matching. A single hit is returned under "room", several under "rooms".

    Raises:
        HTTPException: 404 for an unknown id or no matching room, 400 if the
            upload has not been analyzed yet.
    """
    pdf = processor.pdfs.get(pdf_id)
    if pdf is None:
        raise HTTPException(status_code=404, detail="PDF not found")
    if not pdf.analysis_result:
        # HTTPException takes `detail`; passing `content` raised a TypeError
        # and turned this into a 500.
        raise HTTPException(status_code=400, detail="Not analyzed yet")

    name_lower = query.room_name.lower()

    def matches(room):
        # `or ""` guards against names that are present but null.
        en = (room.get("name") or "").lower()
        no = (room.get("name_no") or "").lower()
        if query.exact_match:
            return name_lower in (en, no)
        return name_lower in en or name_lower in no

    found = [room for room in pdf.analysis_result if matches(room)]
    if not found:
        raise HTTPException(status_code=404, detail="Room not found")
    if len(found) == 1:
        return {"message": "Room found", "pdf_id": pdf_id, "room": found[0]}
    return {
        "message": f"Found {len(found)} rooms",
        "pdf_id": pdf_id,
        "rooms": found
    }
@app.on_event("startup")
async def startup_event():
    """Create the working directories and log a startup banner."""
    for directory in ("uploads", "logs"):
        os.makedirs(directory, exist_ok=True)

    rule = "=" * 60
    key_state = 'SET' if GOOGLE_API_KEY else 'NOT SET'
    logger.info("\n" + rule)
    logger.info("Floor Plan API - Optimized Version")
    logger.info("Model: gemini-2.5-pro")
    logger.info("With Image Preprocessing: YES")
    logger.info(f"API Key: {key_state}")
    logger.info("Port: 7860")
    logger.info(rule + "\n")
# When executed as a script, serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)