# areaplanner / app.py
# iplotnor's picture
# Update app.py
# 1c37bfb verified
import os, re, json, requests
from urllib.parse import urlparse, parse_qs, urljoin
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, List, Any
import logging
# Setup logging: INFO-level root configuration plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Default API token, read from the AREALPLANER_API_TOKEN environment variable
# (empty string when unset). The resolver pipelines fall back to it when the
# caller supplies no token of their own.
API_TOKEN = os.getenv("AREALPLANER_API_TOKEN", "")
# Value handed to `timeout=` on the `requests` calls in this file; taken from
# the HTTP_TIMEOUT environment variable, defaulting to 30.
HTTP_TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "30"))
# Case-insensitive pattern for https URLs on a *.blob.core.windows.net host
# that contain ".pdf", stopping at whitespace, quotes or angle brackets.
BLOB_RE = re.compile(r'https://[a-z0-9.-]*blob\.core\.windows\.net/[^\s"\'<>]+\.pdf[^\s"\'<>]*', re.IGNORECASE)
def _std_headers():
return {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "application/pdf,text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://www.arealplaner.no/",
"Accept-Language": "en-US,en;q=0.9,nb;q=0.8",
}
def debug_blob_url_content(blob_url: str, doc_name: str = "Unknown") -> Dict[str, Any]:
    """Debug what's actually being returned by the blob URL.

    Prints the parsed URL and its SAS-style query parameters (sv/se/sr/sp),
    issues a HEAD request, then a ranged GET for the first 1 KB and sniffs
    whether the payload looks like a PDF, HTML or XML.

    Args:
        blob_url: URL to probe.
        doc_name: Label used only in the printed report.

    Returns:
        A dict whose "status" is one of:

        * "head_failed" - HEAD answered with a non-200 status
          (plus "status_code" and "headers");
        * "head_error"  - the HEAD request raised (plus "error");
        * "get_error"   - the ranged GET or its inspection raised (plus "error");
        * "success"     - the GET completed; the dict then carries
          "content_type", "content_length", "is_pdf", "is_html", "is_xml",
          "first_bytes" (hex) and "content_preview". "success" alone does
          NOT mean the payload is a PDF; callers must check "is_pdf".
    """
    print(f"\n{'='*80}")
    print("πŸ” DEBUGGING BLOB URL CONTENT")
    print(f"Document: {doc_name}")
    print(f"URL: {blob_url}")
    print(f"{'='*80}")

    # Parse URL to check SAS token expiration
    parsed = urlparse(blob_url)
    query_params = parse_qs(parsed.query)
    print("\nπŸ“Š URL ANALYSIS:")
    print(f" Host: {parsed.netloc}")
    print(f" Path: {parsed.path}")
    print(f" SAS Version (sv): {query_params.get('sv', ['Not found'])[0]}")
    print(f" Expiry (se): {query_params.get('se', ['Not found'])[0]}")
    print(f" Resource (sr): {query_params.get('sr', ['Not found'])[0]}")
    print(f" Permissions (sp): {query_params.get('sp', ['Not found'])[0]}")

    # Test HEAD request
    print("\nπŸ” HEAD REQUEST TEST:")
    try:
        head_response = requests.head(blob_url, timeout=HTTP_TIMEOUT)
        print(f" Status Code: {head_response.status_code}")
        print(" Headers:")
        for key, value in head_response.headers.items():
            print(f" {key}: {value}")
        if head_response.status_code != 200:
            print(f" ❌ HEAD request failed with status {head_response.status_code}")
            return {
                "status": "head_failed",
                "status_code": head_response.status_code,
                "headers": dict(head_response.headers)
            }
    except Exception as e:
        # Best-effort probe: report the failure to the caller instead of raising.
        print(f" ❌ HEAD request error: {e}")
        return {"status": "head_error", "error": str(e)}

    # Test partial GET request (first 1KB)
    print("\nπŸ” PARTIAL GET REQUEST TEST (first 1KB):")
    try:
        headers = {"Range": "bytes=0-1023"}
        partial_response = requests.get(blob_url, headers=headers, timeout=HTTP_TIMEOUT)
        print(f" Status Code: {partial_response.status_code}")
        print(f" Content-Length: {len(partial_response.content)} bytes")

        # Check if it's a PDF
        content = partial_response.content
        first_bytes = content[:10]
        print(f" First 10 bytes (hex): {first_bytes.hex()}")
        print(f" First 10 bytes (ascii): {repr(first_bytes)}")
        lowered = content.lower()
        is_pdf = content.startswith(b'%PDF-')
        is_html = b'<html' in lowered or b'<!doctype' in lowered
        is_xml = content.startswith(b'<?xml') or content.startswith(b'<')
        print(f" Is PDF: {'βœ… YES' if is_pdf else '❌ NO'}")
        print(f" Is HTML: {'⚠️ YES' if is_html else 'βœ… NO'}")
        print(f" Is XML: {'⚠️ YES' if is_xml else 'βœ… NO'}")

        # Decode once. errors='ignore' drops undecodable bytes rather than
        # raising, so no exception handler is needed around the preview.
        decoded = content.decode('utf-8', errors='ignore')

        # If it's text content, show some of it
        if is_html or is_xml or not is_pdf:
            print(" Content Preview:")
            print(f" {repr(decoded[:500])}")

        return {
            "status": "success",
            "content_type": partial_response.headers.get('Content-Type', 'Unknown'),
            "content_length": len(content),
            "is_pdf": is_pdf,
            "is_html": is_html,
            "is_xml": is_xml,
            "first_bytes": first_bytes.hex(),
            "content_preview": decoded[:200] if not is_pdf else "PDF content"
        }
    except Exception as e:
        # Best-effort probe: report the failure to the caller instead of raising.
        print(f" ❌ Partial GET request error: {e}")
        return {"status": "get_error", "error": str(e)}
def test_alternative_pdf_access(tenant: str, doc_id: int, api_token: str) -> List[Dict[str, Any]]:
    """Probe a fixed set of candidate download endpoints for a document.

    Each candidate is first requested with HEAD (following redirects). When
    the HEAD answer is a 200 or advertises a PDF content type, the first 1 KB
    is fetched with a ranged GET and checked for the ``%PDF-`` signature.

    Returns:
        One dict per candidate, in probing order, with "endpoint", "status"
        ("success", "not_pdf", "failed" or "error") and "is_pdf", plus
        status-specific details ("final_url"/"content_type",
        "content_preview", "status_code" or "error").
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print("πŸ”§ TESTING ALTERNATIVE PDF ACCESS METHODS")
    print(f"Document ID: {doc_id}")
    print(banner)

    request_headers = _std_headers()
    if api_token:
        request_headers["x-waapi-token"] = api_token

    # Candidate endpoints: two on the public site, three on the API host.
    site_base = f"https://www.arealplaner.no/{tenant}/dokumenter/{doc_id}"
    api_base = f"https://api.arealplaner.no/api/kunder/{tenant}/dokumenter/{doc_id}"
    candidates = (
        site_base,
        f"{site_base}/download",
        f"{api_base}/download",
        f"{api_base}/fil",
        f"{api_base}/direkteurl",
    )

    outcomes = []
    for candidate in candidates:
        print(f"\nπŸ” Trying: {candidate}")
        try:
            probe = requests.head(
                candidate,
                headers=request_headers,
                allow_redirects=True,
                timeout=HTTP_TIMEOUT,
            )
            print(f" HEAD Status: {probe.status_code}")
            print(f" Final URL: {probe.url}")
            print(f" Content-Type: {probe.headers.get('Content-Type', 'Unknown')}")

            mime = probe.headers.get('Content-Type', '').lower()
            if 'pdf' not in mime and probe.status_code != 200:
                # Nothing promising behind this endpoint; record and move on.
                outcomes.append({
                    "endpoint": candidate,
                    "status": "failed",
                    "status_code": probe.status_code,
                    "is_pdf": False
                })
                continue

            # Promising: fetch only the first kilobyte and sniff the signature.
            ranged_headers = {**request_headers, "Range": "bytes=0-1023"}
            sample = requests.get(
                candidate,
                headers=ranged_headers,
                allow_redirects=True,
                timeout=HTTP_TIMEOUT,
            )
            looks_like_pdf = sample.content.startswith(b'%PDF-')
            print(f" GET Status: {sample.status_code}")
            print(f" Is PDF: {'βœ… YES' if looks_like_pdf else '❌ NO'}")

            if not looks_like_pdf:
                # Show what we got instead
                snippet = sample.content[:200].decode('utf-8', errors='ignore')
                print(f" Content preview: {repr(snippet)}")
                outcomes.append({
                    "endpoint": candidate,
                    "status": "not_pdf",
                    "content_preview": snippet,
                    "is_pdf": False
                })
                continue

            print(" πŸŽ‰ FOUND WORKING PDF ENDPOINT!")
            outcomes.append({
                "endpoint": candidate,
                "status": "success",
                "final_url": sample.url,
                "content_type": sample.headers.get('Content-Type'),
                "is_pdf": True
            })
        except Exception as e:
            print(f" ❌ Error: {e}")
            outcomes.append({
                "endpoint": candidate,
                "status": "error",
                "error": str(e),
                "is_pdf": False
            })
    return outcomes
def parse_step1_url(url: str):
    """Extract (tenant, kommunenummer, planidentifikasjon) from a Step-1 URL.

    The tenant is the first non-empty path segment; the other two values are
    the first occurrences of the ``kommunenummer`` and ``planidentifikasjon``
    query parameters.

    Raises:
        ValueError: if the path has no segments or any of the three values
            is missing.
    """
    parsed = urlparse(url)
    segments = list(filter(None, parsed.path.split("/")))
    if len(segments) == 0:
        raise ValueError("Invalid Step-1 URL path")

    query = parse_qs(parsed.query)

    def first_value(name):
        # parse_qs maps each key to a list; absent keys yield None here.
        return query[name][0] if name in query else None

    tenant = segments[0]
    kommunenummer = first_value("kommunenummer")
    planident = first_value("planidentifikasjon")
    print(f"Parsed URL - Tenant: {tenant}, Kommune: {kommunenummer}, Plan: {planident}")

    if not tenant or not kommunenummer or not planident:
        raise ValueError("Missing tenant/kommunenummer/planidentifikasjon")
    return tenant, kommunenummer, planident
def get_internal_plan_id(tenant: str, kommunenummer: str, planident: str, api_token: str):
    """Get the internal plan ID.

    Calls the ``gi`` lookup endpoint with the token as an ``apitoken`` query
    parameter and, if that is answered with 401, retries once with the token
    in the ``x-waapi-token`` header.

    The response is accepted in three shapes: a bare JSON integer, a JSON
    object holding an integer under "id", "planId" or "arealplanId", or a
    plain-text string of digits.

    Returns:
        The internal plan id as an int.

    Raises:
        RuntimeError: if the response matches none of the accepted shapes.
        Exception: any error from the HTTP call (including the one raised by
            ``raise_for_status``) is printed and re-raised unchanged.
    """
    url = f"https://api.arealplaner.no/api/gi/kunder/{tenant}/arealplaner/id/{kommunenummer}/{planident}"
    print(f"Fetching internal plan ID from: {url}")

    def is_plan_id(value):
        # bool is a subclass of int; a JSON true/false is not a plan id.
        return isinstance(value, int) and not isinstance(value, bool)

    try:
        r = requests.get(url, params={"apitoken": api_token}, timeout=HTTP_TIMEOUT)
        if r.status_code == 401:
            r = requests.get(url, headers={"x-waapi-token": api_token}, timeout=HTTP_TIMEOUT)
        r.raise_for_status()
        print(f"Internal plan ID response status: {r.status_code}")
        print(f"Internal plan ID response text: {r.text}")

        # Try to parse as JSON first, then as plain text
        try:
            data = r.json()
        except ValueError:
            # ValueError is the common base of every JSON decode error that
            # Response.json() can raise (json / simplejson / requests' own),
            # so the plain-text fallback below is always reached.
            pass
        else:
            print(f"Internal plan ID JSON data: {data}")
            if is_plan_id(data):
                return data
            if isinstance(data, dict):
                for k in ("id", "planId", "arealplanId"):
                    v = data.get(k)
                    if is_plan_id(v):
                        return v

        # Try as plain text
        txt = r.text.strip()
        if txt.isdigit():
            return int(txt)
        raise RuntimeError(f"Unexpected internal plan id response: {txt[:200]}")
    except Exception as e:
        print(f"Error getting internal plan ID: {e}")
        raise
def get_documents(tenant: str, internal_plan_id: int, api_token: str):
    """Fetch the document list of a plan from the arealplaner API.

    Returns:
        The decoded JSON list.

    Raises:
        RuntimeError: on a 401 answer or when the payload is not a list.
        Exception: any other failure is printed and re-raised unchanged.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner/{internal_plan_id}/dokumenter"
    print(f"Fetching documents from: {endpoint}")
    try:
        response = requests.get(
            endpoint,
            headers={"x-waapi-token": api_token},
            timeout=HTTP_TIMEOUT,
        )
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Documents response status: {response.status_code}")

        payload = response.json()
        payload_is_list = isinstance(payload, list)
        count_label = len(payload) if payload_is_list else 'Not a list'
        print(f"Documents count: {count_label}")

        if payload_is_list:
            return payload
        raise RuntimeError("Documents response not a list")
    except Exception as e:
        print(f"Error getting documents: {e}")
        raise
def get_planning_treatments(tenant: str, internal_plan_id: int, api_token: str):
    """Fetch a plan's treatments and return their attached documents, flattened.

    Treatments that are not dicts, that lack a 'dokumenter' key, or whose
    'dokumenter' value is not a list contribute nothing.

    Returns:
        A single list holding the documents of all treatments, in order.

    Raises:
        RuntimeError: on a 401 answer or when the payload is not a list.
        Exception: any other failure is printed and re-raised unchanged.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner/{internal_plan_id}/behandlinger?includeDokumenter=true"
    print(f"Fetching planning treatments from: {endpoint}")
    try:
        response = requests.get(
            endpoint,
            headers={"x-waapi-token": api_token},
            timeout=HTTP_TIMEOUT,
        )
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Planning treatments response status: {response.status_code}")

        payload = response.json()
        payload_is_list = isinstance(payload, list)
        count_label = len(payload) if payload_is_list else 'Not a list'
        print(f"Planning treatments count: {count_label}")
        if not payload_is_list:
            raise RuntimeError("Planning treatments response not a list")

        # Extract documents from treatments
        flattened = [
            attached
            for treatment in payload
            if isinstance(treatment, dict)
            and 'dokumenter' in treatment
            and isinstance(treatment['dokumenter'], list)
            for attached in treatment['dokumenter']
        ]
        print(f"Total treatment documents found: {len(flattened)}")
        return flattened
    except Exception as e:
        print(f"Error getting planning treatments: {e}")
        raise
def get_exemptions(tenant: str, internal_plan_id: int, api_token: str):
    """Fetch the exemption ("dispensasjoner") list of a plan.

    Returns:
        The decoded JSON list.

    Raises:
        RuntimeError: on a 401 answer or when the payload is not a list.
        Exception: any other failure is printed and re-raised unchanged.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner/{internal_plan_id}/dispensasjoner"
    print(f"Fetching exemptions from: {endpoint}")
    try:
        response = requests.get(
            endpoint,
            headers={"x-waapi-token": api_token},
            timeout=HTTP_TIMEOUT,
        )
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Exemptions response status: {response.status_code}")

        payload = response.json()
        payload_is_list = isinstance(payload, list)
        count_label = len(payload) if payload_is_list else 'Not a list'
        print(f"Exemptions count: {count_label}")

        if payload_is_list:
            return payload
        raise RuntimeError("Exemptions response not a list")
    except Exception as e:
        print(f"Error getting exemptions: {e}")
        raise
def get_single_document_info(tenant: str, doc_id: int, api_token: str) -> Optional[Dict[str, Any]]:
    """Fetch one document record and verify it is the one that was asked for.

    Returns:
        The decoded JSON dict when its 'id' equals ``doc_id``; otherwise
        ``None``. Request or decoding failures are printed and also yield
        ``None`` - this function does not raise.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/dokumenter/{doc_id}"
    request_headers = _std_headers()
    if api_token:
        request_headers["x-waapi-token"] = api_token

    try:
        print(f"Fetching document info for ID {doc_id}")
        response = requests.get(endpoint, headers=request_headers, timeout=HTTP_TIMEOUT)
        response.raise_for_status()
        payload = response.json()

        payload_is_dict = isinstance(payload, dict)
        returned_id = payload.get('id') if payload_is_dict else 'invalid response'
        if not payload_is_dict or returned_id != doc_id:
            print(f"ERROR: API returned wrong document (expected ID {doc_id}, got {returned_id})")
            return None

        print(f"Successfully fetched document {doc_id}: {payload.get('dokumentnavn')}")
        return payload
    except Exception as e:
        print(f"Failed to fetch document {doc_id}: {e}")
        return None
def find_working_pdf_url(tenant: str, doc_id: int, doc_name: str, api_token: str) -> Optional[str]:
    """Locate a URL that really serves the document as a PDF.

    Steps: refetch the document record; if it carries a ``direkteUrl`` that
    probes as a PDF, return it; otherwise try the alternative endpoints and
    return the final URL of the first one that served a PDF.

    Returns:
        A URL string, or ``None`` when the record cannot be fetched or no
        candidate served a PDF.
    """
    banner = '=' * 80
    print(f"\n{banner}")
    print("πŸ” FINDING WORKING PDF URL")
    print(f"Document: {doc_name} (ID: {doc_id})")
    print(banner)

    # Step 1: Get fresh document info
    record = get_single_document_info(tenant, doc_id, api_token)
    if not record:
        return None

    # Step 2: Test direkteUrl if available
    direkte_url = record.get('direkteUrl')
    if direkte_url:
        print(f"\nπŸ“‹ Testing direkteUrl: {direkte_url}")
        probe = debug_blob_url_content(direkte_url, doc_name)
        if probe.get('status') == 'success' and probe.get('is_pdf'):
            print("βœ… direkteUrl works!")
            return direkte_url
        print(f"❌ direkteUrl doesn't work: {probe.get('status')}")

    # Step 3: Try alternative access methods
    print("\nπŸ“‹ Testing alternative access methods...")
    attempts = test_alternative_pdf_access(tenant, doc_id, api_token)

    # Find the first working PDF endpoint
    winner = next(
        (
            attempt
            for attempt in attempts
            if attempt.get('status') == 'success' and attempt.get('is_pdf')
        ),
        None,
    )
    if winner is not None:
        working_url = winner.get('final_url')
        print(f"βœ… Found working PDF URL: {working_url}")
        return working_url

    print(f"❌ No working PDF URL found for document {doc_name}")
    return None
def split_rulebook_and_planning(docs):
    """Pick the rule book out of ``docs`` and return it with the remaining docs.

    A document counts as "bestemmelser" when its ``dokumenttype`` equals that
    word (case-insensitively) or its ``dokumenttypeId`` is 5. The rule book is
    the bestemmelser document with the greatest ``dokumentdato`` (first one
    wins on ties); when there is none, the first document is used; with no
    documents at all it is ``None``.

    Returns:
        ``(rule_doc, planning_docs)`` where ``planning_docs`` holds every
        document that is not the very same object as ``rule_doc``.
    """
    print("Splitting documents into rulebook and planning documents")

    def is_bestemmelser(doc):
        if (doc.get("dokumenttype") or "").lower() == "bestemmelser":
            return True
        return doc.get("dokumenttypeId") == 5

    provisions = list(filter(is_bestemmelser, docs))
    print(f"Found {len(provisions)} bestemmelser documents")

    if provisions:
        # max() returns the first maximal element, matching a stable
        # descending sort followed by taking index 0.
        rule_doc = max(provisions, key=lambda doc: doc.get("dokumentdato") or "")
    elif docs:
        rule_doc = docs[0]
    else:
        rule_doc = None

    planning_docs = [doc for doc in docs if doc is not rule_doc]
    rule_name = rule_doc.get('dokumentnavn') if rule_doc else 'None'
    print(f"Rule document: {rule_name}")
    print(f"Planning documents: {len(planning_docs)}")
    return rule_doc, planning_docs
def _fetch_direkte_url(tenant: str, doc_id, api_token: Optional[str]) -> Optional[str]:
    """Return the document's ``direkteUrl`` from the API, or None if unavailable."""
    # Same token fallback the resolver pipelines apply, so a request made
    # without an explicit token still uses the configured default.
    token = (api_token or API_TOKEN or "").strip()
    url = f"https://api.arealplaner.no/api/kunder/{tenant}/dokumenter/{doc_id}"
    headers = {"x-waapi-token": token, "Accept": "application/json"}
    try:
        response = requests.get(url, headers=headers, timeout=HTTP_TIMEOUT)
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Best-effort repair: a transport or JSON-decoding failure keeps the
        # original "failed" status, but is reported instead of hidden.
        print(f"Could not refresh link for document {doc_id}: {exc}")
        return None
    if not isinstance(data, dict):
        return None
    return data.get('direkteUrl') or None


def fix_failed_documents(result: Dict, api_token: str) -> Dict:
    """
    Post-process result to fix any failed documents by getting direkteUrl from API

    Every entry (the rule book and each planning document) whose status is
    "failed" and whose link is None is looked up once more; when the API
    returns a ``direkteUrl`` the entry's "link" is set to it and its "status"
    to "success". Entries that cannot be repaired are left untouched.

    Args:
        result: Pipeline result dict; it must contain ``inputs.tenant`` and
            ``planning_documents``, and may contain ``rule_book``.
        api_token: Token for the ``x-waapi-token`` header; when empty or
            None the module-level API_TOKEN is used.

    Returns:
        The same ``result`` object, updated in place.
    """
    tenant = result["inputs"]["tenant"]

    entries = []
    rule_book = result.get("rule_book")
    if rule_book:
        entries.append(rule_book)
    entries.extend(result["planning_documents"])

    for entry in entries:
        if entry["status"] != "failed" or entry["link"] is not None:
            continue
        direkte_url = _fetch_direkte_url(tenant, entry["id"], api_token)
        if direkte_url:
            entry["link"] = direkte_url
            entry["status"] = "success"
    return result
def _api_provided_link(doc: Dict) -> Any:
    """Pick the link the API already supplied for ``doc`` without resolving it."""
    if doc.get("url"):
        return doc["url"]
    nested = doc.get("dokumenter")
    # Only the first nested document is consulted. It may lack a "url" key or
    # not be a dict at all, so it is inspected defensively instead of indexed.
    if isinstance(nested, list) and nested and isinstance(nested[0], dict):
        nested_url = nested[0].get("url")
        if nested_url:
            return nested_url
    return doc.get("direkteUrl")


def process_document_list(
    docs: List[Dict],
    tenant: str,
    api_token: str,
    debug_mode: bool,
    doc_type: str = "document",
    use_blob: bool = True
) -> List[Dict[str, Any]]:
    """Process a list of documents and resolve their PDF URLs.

    Args:
        docs: Raw document dicts as returned by the API.
        tenant: Tenant used for the API lookups.
        api_token: Token passed to the lookups.
        debug_mode: With ``use_blob``, selects the full probing search
            (``find_working_pdf_url``) instead of just reading ``direkteUrl``
            from a fresh document record.
        doc_type: Label used in log output and in the fallback name.
        use_blob: When False no request is made; the link is taken from the
            document itself: its "url", else the "url" of its first nested
            "dokumenter" entry, else its "direkteUrl".

    Returns:
        One dict per document that has an id, with "id", "name", "type",
        "date", "link", "source" (the raw document) and "status" ("success"
        when a link was found, otherwise "failed"). Documents without an id
        are skipped.
    """
    processed_docs = []
    total = len(docs)
    for position, doc in enumerate(docs, start=1):
        doc_id = doc.get('id')
        doc_name = doc.get('dokumentnavn') or doc.get('name', f"Unknown {doc_type}")
        if not doc_id:
            print(f"⚠️ Skipping {doc_type} without ID: {doc_name}")
            continue

        print(f"\n{'='*100}")
        print(f"PROCESSING {doc_type.upper()} {position}/{total}: {doc_name}")
        print(f"{'='*100}")

        if not use_blob:
            # For other-documents: just use the API-provided link directly
            working_url = _api_provided_link(doc)
        elif debug_mode:
            # Normal blob resolution (rule books / planning docs)
            working_url = find_working_pdf_url(tenant, doc_id, doc_name, api_token)
        else:
            doc_data = get_single_document_info(tenant, doc_id, api_token)
            working_url = doc_data.get('direkteUrl') if doc_data else None

        processed_docs.append({
            "id": doc_id,
            "name": doc_name,
            "type": doc.get("dokumenttype") or doc.get("type"),
            "date": doc.get("dokumentdato") or doc.get("date"),
            "link": working_url,
            "source": doc,
            "status": "success" if working_url else "failed"
        })
    return processed_docs
def comprehensive_resolve_pipeline(step1_url: str, api_token: Optional[str] = None, debug_mode: bool = True):
    """Resolve the rule book and planning documents of the plan behind a Step-1 URL.

    Flow: parse the URL, look up the internal plan id, list the plan's
    documents, split them into rule book and planning documents, then resolve
    a link for each. With ``debug_mode`` the rule-book link comes from the
    probing search (``find_working_pdf_url``); otherwise it is the
    ``direkteUrl`` of a fresh document record.

    Returns:
        A dict with "inputs", "rule_book" (dict or None),
        "planning_documents" (list) and "debug_info" ({} in debug mode,
        otherwise None).

    Raises:
        Exception: any failure is printed and re-raised unchanged.
    """
    print("πŸš€ COMPREHENSIVE AREALPLANER RESOLVER")
    print("=" * 80)

    # An explicit token wins; otherwise use the module-level default.
    api_token = (api_token or API_TOKEN or "").strip()
    if not api_token:
        print("⚠️ Warning: No API token provided")

    wide_rule = '=' * 100
    try:
        # Parse URL
        tenant, kommunenummer, planident = parse_step1_url(step1_url)

        # Get internal plan ID
        internal_id = get_internal_plan_id(tenant, kommunenummer, planident, api_token)
        print(f"Internal Plan ID: {internal_id}")

        # Get documents
        docs = get_documents(tenant, internal_id, api_token)
        print(f"Found {len(docs)} documents:")
        for position, entry in enumerate(docs, start=1):
            print(f" {position}. {entry.get('dokumentnavn')} (ID: {entry.get('id')}, Type: {entry.get('dokumenttype')})")

        # Split documents
        rule_doc, planning_docs = split_rulebook_and_planning(docs)

        # Process rule book
        rule_entry = None
        if rule_doc:
            rule_id = rule_doc.get('id')
            rule_name = rule_doc.get('dokumentnavn')
            print(f"\n{wide_rule}")
            print(f"PROCESSING RULE BOOK: {rule_name}")
            print(wide_rule)
            if debug_mode:
                rule_link = find_working_pdf_url(tenant, rule_id, rule_name, api_token)
            else:
                # Simple mode - just get direkteUrl
                fresh = get_single_document_info(tenant, rule_id, api_token)
                rule_link = fresh.get('direkteUrl') if fresh else None
            rule_entry = {
                "id": rule_id,
                "name": rule_name,
                "type": rule_doc.get("dokumenttype"),
                "date": rule_doc.get("dokumentdato"),
                "link": rule_link,
                "status": "success" if rule_link else "failed"
            }

        # Process planning documents
        planning_entries = process_document_list(planning_docs, tenant, api_token, debug_mode, "planning document")

        result = {
            "inputs": {
                "tenant": tenant,
                "kommunenummer": kommunenummer,
                "planidentifikasjon": planident,
                "internal_plan_id": internal_id,
                "step1_url": step1_url,
            },
            "rule_book": rule_entry,
            "planning_documents": planning_entries,
            "debug_info": {} if debug_mode else None
        }

        rule_label = 'βœ… SUCCESS' if rule_entry and rule_entry['link'] else '❌ FAILED'
        resolved_count = sum(1 for entry in planning_entries if entry['link'])
        print(f"\n{wide_rule}")
        print("πŸ“Š RESOLUTION SUMMARY")
        print(wide_rule)
        print(f"Rule book: {rule_label}")
        print(f"Planning documents: {resolved_count} / {len(planning_entries)} successful")
        return result
    except Exception as e:
        print(f"❌ Pipeline error: {e}")
        raise
def get_plans_by_coordinates(tenant: str, knr: str, gnr: str, bnr: str, api_token: str):
    """List the plans the API returns for a property (knr / gnr / bnr).

    Only page "1" of the listing is requested.

    Returns:
        The decoded JSON list of plans.

    Raises:
        RuntimeError: on a 401 answer or when the payload is not a list.
        Exception: any other failure is printed and re-raised unchanged.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner"
    query = dict(knr=knr, gnr=gnr, bnr=bnr, page="1")
    print(f"Fetching plans by coordinates from: {endpoint}")
    print(f"Parameters: knr={knr}, gnr={gnr}, bnr={bnr}")
    try:
        response = requests.get(
            endpoint,
            headers={"x-waapi-token": api_token},
            params=query,
            timeout=HTTP_TIMEOUT,
        )
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Plans response status: {response.status_code}")

        payload = response.json()
        payload_is_list = isinstance(payload, list)
        count_label = len(payload) if payload_is_list else 'Not a list'
        print(f"Plans count: {count_label}")
        if not payload_is_list:
            raise RuntimeError("Plans response not a list")

        # Log all found plans
        for position, plan in enumerate(payload, start=1):
            print(f" {position}. {plan.get('planNavn')} (ID: {plan.get('id')}, Type: {plan.get('planType')})")
        return payload
    except Exception as e:
        print(f"Error getting plans by coordinates: {e}")
        raise
def find_kommuneplanens_arealdel(plans: List[Dict]) -> Optional[Dict]:
    """Find the Kommuneplanens arealdel plan from the list.

    Returns the first plan whose ``planType``, with surrounding whitespace
    stripped, equals 'Kommuneplanens arealdel'. Plans whose ``planType`` is
    missing, null or not a string are skipped rather than raising.

    Returns:
        The matching plan dict, or ``None`` when there is none.
    """
    print("Looking for 'Kommuneplanens arealdel' plan...")
    for plan in plans:
        plan_type = plan.get('planType')
        # The key may be present with a null value, so test the type first
        # instead of relying on the .get() default.
        if isinstance(plan_type, str) and plan_type.strip() == 'Kommuneplanens arealdel':
            print(f"βœ… Found Kommuneplanens arealdel: {plan.get('planNavn')} (ID: {plan.get('id')})")
            return plan
    print("❌ No 'Kommuneplanens arealdel' plan found")
    return None
def parse_coordinates_url(url: str):
    """Return the tenant, i.e. the first non-empty path segment of ``url``.

    Raises:
        ValueError: if the URL path contains no non-empty segment.
    """
    path = urlparse(url).path
    tenant = next((piece for piece in path.split("/") if piece), None)
    if tenant is None:
        raise ValueError("Invalid coordinates URL path")
    print(f"Parsed URL - Tenant: {tenant}")
    # Defensive check kept from the original; the segment chosen above is
    # already guaranteed to be non-empty.
    if not tenant:
        raise ValueError("Missing tenant in URL")
    return tenant
def comprehensive_resolve_kommuneplanens(coordinates_url: str, knr: str, gnr: str, bnr: str, api_token: Optional[str] = None, debug_mode: bool = True):
    """Resolve the documents of the 'Kommuneplanens arealdel' plan for a property.

    Flow: take the tenant from ``coordinates_url``, list the plans for
    knr/gnr/bnr, pick the 'Kommuneplanens arealdel' plan, fetch its
    documents, split them into rule book and planning documents and resolve
    a link for each.

    Returns:
        A dict with "inputs", "kommuneplan_info", "rule_book" (dict or None),
        "planning_documents" (list) and "debug_info" ({} in debug mode,
        otherwise None).

    Raises:
        RuntimeError: when no 'Kommuneplanens arealdel' plan is found.
        Exception: any failure is printed and re-raised unchanged.
    """
    print("πŸš€ COMPREHENSIVE KOMMUNEPLANENS RESOLVER")
    print("=" * 80)
    print(f"Coordinates: knr={knr}, gnr={gnr}, bnr={bnr}")

    # An explicit token wins; otherwise use the module-level default.
    api_token = (api_token or API_TOKEN or "").strip()
    if not api_token:
        print("⚠️ Warning: No API token provided")

    wide_rule = '=' * 100
    try:
        # Parse URL to get tenant
        tenant = parse_coordinates_url(coordinates_url)

        # Get plans by coordinates
        plans = get_plans_by_coordinates(tenant, knr, gnr, bnr, api_token)

        # Find Kommuneplanens arealdel
        kommuneplan = find_kommuneplanens_arealdel(plans)
        if not kommuneplan:
            raise RuntimeError("No 'Kommuneplanens arealdel' plan found for the given coordinates")
        plan_id = kommuneplan.get('id')
        plan_name = kommuneplan.get('planNavn')
        print(f"Using plan: {plan_name} (ID: {plan_id})")

        # Get documents for the kommuneplan
        docs = get_documents(tenant, plan_id, api_token)
        print(f"Found {len(docs)} documents for kommuneplan:")
        for position, entry in enumerate(docs, start=1):
            print(f" {position}. {entry.get('dokumentnavn')} (ID: {entry.get('id')}, Type: {entry.get('dokumenttype')})")

        # Split documents (same logic as regular plans)
        rule_doc, planning_docs = split_rulebook_and_planning(docs)

        # Process rule book
        rule_entry = None
        if rule_doc:
            rule_id = rule_doc.get('id')
            rule_name = rule_doc.get('dokumentnavn')
            print(f"\n{wide_rule}")
            print(f"PROCESSING KOMMUNEPLAN RULE BOOK: {rule_name}")
            print(wide_rule)
            if debug_mode:
                rule_link = find_working_pdf_url(tenant, rule_id, rule_name, api_token)
            else:
                # Simple mode - just get direkteUrl
                fresh = get_single_document_info(tenant, rule_id, api_token)
                rule_link = fresh.get('direkteUrl') if fresh else None
            rule_entry = {
                "id": rule_id,
                "name": rule_name,
                "type": rule_doc.get("dokumenttype"),
                "date": rule_doc.get("dokumentdato"),
                "link": rule_link,
                "status": "success" if rule_link else "failed"
            }

        # Process planning documents
        planning_entries = process_document_list(planning_docs, tenant, api_token, debug_mode, "kommuneplan document")

        result = {
            "inputs": {
                "tenant": tenant,
                "knr": knr,
                "gnr": gnr,
                "bnr": bnr,
                "coordinates_url": coordinates_url,
                "kommuneplan_id": plan_id,
                "kommuneplan_name": plan_name,
            },
            "kommuneplan_info": {
                "id": plan_id,
                "name": plan_name,
                "type": kommuneplan.get('planType'),
                "status": kommuneplan.get('planStatus'),
                "iKraft": kommuneplan.get('iKraft'),
            },
            "rule_book": rule_entry,
            "planning_documents": planning_entries,
            "debug_info": {} if debug_mode else None
        }

        rule_label = 'βœ… SUCCESS' if rule_entry and rule_entry['link'] else '❌ FAILED'
        resolved_count = sum(1 for entry in planning_entries if entry['link'])
        print(f"\n{wide_rule}")
        print("πŸ“Š KOMMUNEPLANENS RESOLUTION SUMMARY")
        print(wide_rule)
        print(f"Kommuneplan: {plan_name}")
        print(f"Rule book: {rule_label}")
        print(f"Planning documents: {resolved_count} / {len(planning_entries)} successful")
        return result
    except Exception as e:
        print(f"❌ Kommuneplanens pipeline error: {e}")
        raise
def comprehensive_resolve_other_documents(step1_url: str, api_token: Optional[str] = None, debug_mode: bool = True):
    """Resolve a plan's "other" documents: treatment documents and exemptions.

    Fetching the treatments and fetching the exemptions are independent and
    best-effort: if either fetch fails, the error is printed and that list is
    treated as empty. Links are taken straight from the API data
    (``use_blob=False``), so no per-document lookups are made.

    Returns:
        A dict with "inputs", "planning_treatments", "exemptions" and
        "debug_info" ({} in debug mode, otherwise None).

    Raises:
        Exception: failures in URL parsing, the plan-id lookup or document
            processing are printed and re-raised unchanged.
    """
    print("πŸš€ COMPREHENSIVE AREALPLANER OTHER DOCUMENTS RESOLVER")
    print("=" * 80)

    # An explicit token wins; otherwise use the module-level default.
    api_token = (api_token or API_TOKEN or "").strip()
    if not api_token:
        print("⚠️ Warning: No API token provided")

    wide_rule = '=' * 100
    try:
        # Parse URL
        tenant, kommunenummer, planident = parse_step1_url(step1_url)

        # Get internal plan ID
        internal_id = get_internal_plan_id(tenant, kommunenummer, planident, api_token)
        print(f"Internal Plan ID: {internal_id}")

        # Get planning treatments
        treatment_docs = []
        try:
            treatment_docs = get_planning_treatments(tenant, internal_id, api_token)
            print(f"Found {len(treatment_docs)} treatment documents")
        except Exception as e:
            print(f"Error getting planning treatments: {e}")
            treatment_docs = []

        # Get exemptions
        exemption_docs = []
        try:
            exemption_docs = get_exemptions(tenant, internal_id, api_token)
            print(f"Found {len(exemption_docs)} exemption documents")
        except Exception as e:
            print(f"Error getting exemptions: {e}")
            exemption_docs = []

        # Process planning treatments, then exemptions
        treatment_entries = process_document_list(
            treatment_docs, tenant, api_token, debug_mode, "planning treatment", use_blob=False
        )
        exemption_entries = process_document_list(
            exemption_docs, tenant, api_token, debug_mode, "exemption", use_blob=False
        )

        result = {
            "inputs": {
                "tenant": tenant,
                "kommunenummer": kommunenummer,
                "planidentifikasjon": planident,
                "internal_plan_id": internal_id,
                "step1_url": step1_url,
            },
            "planning_treatments": treatment_entries,
            "exemptions": exemption_entries,
            "debug_info": {} if debug_mode else None
        }

        treatments_ok = sum(1 for entry in treatment_entries if entry['link'])
        exemptions_ok = sum(1 for entry in exemption_entries if entry['link'])
        print(f"\n{wide_rule}")
        print("πŸ“Š OTHER DOCUMENTS RESOLUTION SUMMARY")
        print(wide_rule)
        print(f"Planning treatments: {treatments_ok} / {len(treatment_entries)} successful")
        print(f"Exemptions: {exemptions_ok} / {len(exemption_entries)} successful")
        return result
    except Exception as e:
        print(f"❌ Other documents pipeline error: {e}")
        raise
# FastAPI Application
class ResolveRequest(BaseModel):
    # Request body for the /resolve, /resolve-simple, /other-documents and
    # /debug endpoints.
    # Plan page URL; the pipelines read the tenant from its path and the
    # kommunenummer / planidentifikasjon values from its query string.
    step1_url: str
    # Optional token; when omitted the pipelines fall back to API_TOKEN.
    api_token: Optional[str] = None
    # Only /other-documents forwards this flag. /resolve and /debug always
    # run with debug_mode=True and /resolve-simple with debug_mode=False.
    debug_mode: Optional[bool] = True
class KommuneplansRequest(BaseModel):
    # Request body for the /kommuneplanens endpoint.
    # URL whose first path segment is used as the tenant.
    coordinates_url: str
    # Property identifiers, sent to the plan listing as the knr / gnr / bnr
    # query parameters.
    knr: str
    gnr: str
    bnr: str
    # Optional token; when omitted the pipeline falls back to API_TOKEN.
    api_token: Optional[str] = None
    # Forwarded to the pipeline as its debug_mode argument.
    debug_mode: Optional[bool] = True
class QuickTestRequest(BaseModel):
    # Request body for the /test-blob endpoint.
    # URL handed to debug_blob_url_content for probing.
    blob_url: str
# The FastAPI application object; every route below is registered on it.
app = FastAPI(title="Arealplaner Blob URL Resolver with Debugging")
@app.get("/")
def root():
    # Index route: advertises the paths this service exposes.
    post_only = (
        "resolve",
        "resolve-simple",
        "other-documents",
        "kommuneplanens",
        "test-blob",
        "debug",
    )
    endpoints = {"health": "/health", "docs": "/docs"}
    # Every remaining route is POST-only and lives at "/<name>".
    endpoints.update({name: {"POST": f"/{name}"} for name in post_only})
    return {"ok": True, "endpoints": endpoints}
@app.get("/health")
def health():
    # Liveness probe: constant payload, no dependency checks.
    return dict(ok=True)
@app.post("/resolve")
def resolve_with_debug(req: ResolveRequest):
    """Full resolution with debugging"""
    # Runs the debug-mode pipeline, then gives every still-unresolved entry a
    # second chance via fix_failed_documents. Any failure becomes an HTTP 500
    # whose detail is the exception text.
    try:
        resolved = comprehensive_resolve_pipeline(req.step1_url, req.api_token, debug_mode=True)
        return fix_failed_documents(resolved, req.api_token)
    except Exception as exc:
        print(f"Error in resolve endpoint: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/resolve-simple")
def resolve_simple(req: ResolveRequest):
    """Simple resolution without extensive debugging"""
    # Always runs with debug_mode=False, whatever the request body says.
    # Any failure becomes an HTTP 500 whose detail is the exception text.
    try:
        outcome = comprehensive_resolve_pipeline(req.step1_url, req.api_token, debug_mode=False)
    except Exception as exc:
        print(f"Error in resolve-simple endpoint: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return outcome
@app.post("/other-documents")
def resolve_other_documents(req: ResolveRequest):
    """Resolve other documents (planning treatments and exemptions)"""
    # The only ResolveRequest route that honours the request's debug_mode.
    # Any failure becomes an HTTP 500 whose detail is the exception text.
    try:
        outcome = comprehensive_resolve_other_documents(
            step1_url=req.step1_url,
            api_token=req.api_token,
            debug_mode=req.debug_mode,
        )
    except Exception as exc:
        print(f"Error in other-documents endpoint: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return outcome
@app.post("/kommuneplanens")
def resolve_kommuneplanens(req: KommuneplansRequest):
    """Resolve Kommuneplanens arealdel documents by property coordinates"""
    # Forwards every request field to the pipeline. Any failure becomes an
    # HTTP 500 whose detail is the exception text.
    try:
        outcome = comprehensive_resolve_kommuneplanens(
            coordinates_url=req.coordinates_url,
            knr=req.knr,
            gnr=req.gnr,
            bnr=req.bnr,
            api_token=req.api_token,
            debug_mode=req.debug_mode,
        )
    except Exception as exc:
        print(f"Error in kommuneplanens endpoint: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return outcome
@app.post("/test-blob")
def test_blob_url_endpoint(req: QuickTestRequest):
    """Quick test of a specific blob URL"""
    # Probes the URL under the fixed label "Test Document" and returns the
    # probe report. Any failure becomes an HTTP 500 with the exception text.
    try:
        return debug_blob_url_content(req.blob_url, "Test Document")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/debug")
def debug_specific_document(req: ResolveRequest):
    """Debug a specific plan - shows detailed information"""
    # Same debug-mode pipeline as /resolve, but without the
    # fix_failed_documents post-processing step. Any failure becomes an
    # HTTP 500 whose detail is the exception text.
    try:
        outcome = comprehensive_resolve_pipeline(req.step1_url, req.api_token, debug_mode=True)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return outcome
# Script entry point: runs two live smoke tests against the remote API
# (printing their results), then starts the HTTP server.
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation was lost; the server start-up is assumed to belong to this
# guard rather than to module level - confirm against the original file.
if __name__ == "__main__":
    # Test directly
    test_url = "https://www.arealplaner.no/asker3203/gi?funksjon=VisPlan&kommunenummer=3203&planidentifikasjon=0627191"
    print("πŸ§ͺ RUNNING DIRECT TEST")
    print("=" * 80)
    try:
        result = comprehensive_resolve_pipeline(test_url, API_TOKEN, debug_mode=True)
        print("\n" + "="*100)
        print("πŸ“‹ FINAL RESULTS")
        print("="*100)
        if result['rule_book']:
            rb = result['rule_book']
            print(f"Rule Book: {rb['name']} - {'βœ…' if rb['link'] else '❌'}")
            if rb['link']:
                print(f" URL: {rb['link']}")
        for doc in result['planning_documents']:
            print(f"Planning Doc: {doc['name']} - {'βœ…' if doc['link'] else '❌'}")
            if doc['link']:
                print(f" URL: {doc['link']}")
        # Test kommuneplanens
        print("\n" + "="*100)
        print("πŸ§ͺ TESTING KOMMUNEPLANENS")
        print("="*100)
        # Only the tenant (first path segment) of this URL is used by the
        # kommuneplanens pipeline; the plan is selected via knr/gnr/bnr.
        test_koordinat_url = "https://www.arealplaner.no/asker3203/gi?funksjon=VisPlan&kommunenummer=3203&planidentifikasjon=022077"
        kommuneplan_result = comprehensive_resolve_kommuneplanens(
            test_koordinat_url, "3203", "114", "426", API_TOKEN, debug_mode=True
        )
        print("\nπŸ“‹ KOMMUNEPLANENS RESULTS")
        print("="*60)
        if kommuneplan_result['rule_book']:
            rb = kommuneplan_result['rule_book']
            print(f"Kommuneplan Rule Book: {rb['name']} - {'βœ…' if rb['link'] else '❌'}")
            if rb['link']:
                print(f" URL: {rb['link']}")
        for doc in kommuneplan_result['planning_documents']:
            print(f"Kommuneplan Doc: {doc['name']} - {'βœ…' if doc['link'] else '❌'}")
            if doc['link']:
                print(f" URL: {doc['link']}")
    except Exception as e:
        # A failing smoke test is reported with a traceback but must not
        # prevent the server from starting.
        print(f"❌ Test failed: {e}")
        import traceback
        traceback.print_exc()
    # Start server
    import uvicorn
    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    port = int(os.getenv("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=port)