Spaces:
Sleeping
Sleeping
| import os, re, json, requests | |
| from urllib.parse import urlparse, parse_qs, urljoin | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| from typing import Optional, Dict, List, Any | |
| import logging | |
# Setup logging
# The root logger is configured at INFO level; note that the functions in this
# module still emit their diagnostics with print() rather than through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Default API token, read from the environment (empty string when unset).
# The resolve pipelines fall back to it when no token is passed explicitly.
API_TOKEN = os.getenv("AREALPLANER_API_TOKEN", "")
# Timeout in seconds handed to every requests call; overridable via HTTP_TIMEOUT.
HTTP_TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "30"))
# Matches https URLs on a *blob.core.windows.net host whose path contains ".pdf",
# including anything that follows (e.g. a query string) up to whitespace, a quote
# or an angle bracket. Matching is case-insensitive.
BLOB_RE = re.compile(r'https://[a-z0-9.-]*blob\.core\.windows\.net/[^\s"\'<>]+\.pdf[^\s"\'<>]*', re.IGNORECASE)
| def _std_headers(): | |
| return { | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
| "Accept": "application/pdf,text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
| "Referer": "https://www.arealplaner.no/", | |
| "Accept-Language": "en-US,en;q=0.9,nb;q=0.8", | |
| } | |
def debug_blob_url_content(blob_url: str, doc_name: str = "Unknown") -> Dict[str, Any]:
    """Probe a blob URL and report whether it actually serves a PDF.

    Prints the SAS-related query parameters found in the URL, issues a HEAD
    request, then reads at most the first 1 KB of the body and sniffs its type.

    Args:
        blob_url: URL to probe.
        doc_name: Label used only in the printed diagnostics.

    Returns:
        A dict whose ``status`` is ``"head_failed"``, ``"head_error"``,
        ``"get_error"`` or ``"success"``. On success it also carries
        ``content_type``, ``content_length`` (number of bytes actually read),
        ``is_pdf``, ``is_html``, ``is_xml``, ``first_bytes`` (hex) and
        ``content_preview``.
    """
    print(f"\n{'='*80}")
    print(f"π DEBUGGING BLOB URL CONTENT")
    print(f"Document: {doc_name}")
    print(f"URL: {blob_url}")
    print(f"{'='*80}")
    # Parse URL to check SAS token expiration
    parsed = urlparse(blob_url)
    query_params = parse_qs(parsed.query)
    print(f"\nπ URL ANALYSIS:")
    print(f" Host: {parsed.netloc}")
    print(f" Path: {parsed.path}")
    print(f" SAS Version (sv): {query_params.get('sv', ['Not found'])[0]}")
    print(f" Expiry (se): {query_params.get('se', ['Not found'])[0]}")
    print(f" Resource (sr): {query_params.get('sr', ['Not found'])[0]}")
    print(f" Permissions (sp): {query_params.get('sp', ['Not found'])[0]}")
    # Test HEAD request
    print(f"\nπ HEAD REQUEST TEST:")
    try:
        head_response = requests.head(blob_url, timeout=HTTP_TIMEOUT)
        print(f" Status Code: {head_response.status_code}")
        print(f" Headers:")
        for key, value in head_response.headers.items():
            print(f" {key}: {value}")
        if head_response.status_code != 200:
            print(f" β HEAD request failed with status {head_response.status_code}")
            return {
                "status": "head_failed",
                "status_code": head_response.status_code,
                "headers": dict(head_response.headers)
            }
    except Exception as e:
        print(f" β HEAD request error: {e}")
        return {"status": "head_error", "error": str(e)}
    # Test partial GET request (first 1KB)
    print(f"\nπ PARTIAL GET REQUEST TEST (first 1KB):")
    try:
        headers = {"Range": "bytes=0-1023"}
        # stream=True: a server that ignores the Range header must not make us
        # download the whole document; only the first chunk is ever read and the
        # connection is released when the with-block exits.
        with requests.get(blob_url, headers=headers, timeout=HTTP_TIMEOUT, stream=True) as partial_response:
            status_code = partial_response.status_code
            content_type = partial_response.headers.get('Content-Type', 'Unknown')
            content = next(partial_response.iter_content(chunk_size=1024), b"")
        print(f" Status Code: {status_code}")
        print(f" Content-Length: {len(content)} bytes")
        # Check if it's a PDF
        first_bytes = content[:10]
        print(f" First 10 bytes (hex): {first_bytes.hex()}")
        print(f" First 10 bytes (ascii): {repr(first_bytes)}")
        lowered = content.lower()
        is_pdf = content.startswith(b'%PDF-')
        is_html = b'<html' in lowered or b'<!doctype' in lowered
        is_xml = content.startswith((b'<?xml', b'<'))
        print(f" Is PDF: {'β YES' if is_pdf else 'β NO'}")
        print(f" Is HTML: {'β οΈ YES' if is_html else 'β NO'}")
        print(f" Is XML: {'β οΈ YES' if is_xml else 'β NO'}")
        # If it's text content, show some of it. errors='ignore' means the
        # decode itself cannot fail, so no exception handler is needed here.
        if is_html or is_xml or not is_pdf:
            text_content = content.decode('utf-8', errors='ignore')[:500]
            print(f" Content Preview:")
            print(f" {repr(text_content)}")
        return {
            "status": "success",
            "content_type": content_type,
            "content_length": len(content),
            "is_pdf": is_pdf,
            "is_html": is_html,
            "is_xml": is_xml,
            "first_bytes": first_bytes.hex(),
            "content_preview": content.decode('utf-8', errors='ignore')[:200] if not is_pdf else "PDF content"
        }
    except Exception as e:
        print(f" β Partial GET request error: {e}")
        return {"status": "get_error", "error": str(e)}
def test_alternative_pdf_access(tenant: str, doc_id: int, api_token: str) -> List[Dict[str, Any]]:
    """Probe a fixed set of candidate endpoints for a document's PDF.

    Each candidate gets a HEAD request (following redirects). When the HEAD
    reports a PDF content type or status 200, the first 1 KB is requested and
    checked for the ``%PDF-`` signature.

    Returns:
        One result dict per candidate, in the order tried. ``status`` is
        ``"success"``, ``"not_pdf"``, ``"failed"`` or ``"error"`` and every
        entry carries an ``is_pdf`` flag.
    """
    separator = '=' * 60
    print(f"\n{separator}")
    print(f"π§ TESTING ALTERNATIVE PDF ACCESS METHODS")
    print(f"Document ID: {doc_id}")
    print(f"{separator}")
    request_headers = _std_headers()
    if api_token:
        request_headers["x-waapi-token"] = api_token
    # Candidate endpoints, tried in this order
    web_base = f"https://www.arealplaner.no/{tenant}/dokumenter/{doc_id}"
    api_base = f"https://api.arealplaner.no/api/kunder/{tenant}/dokumenter/{doc_id}"
    candidates = [
        web_base,
        f"{web_base}/download",
        f"{api_base}/download",
        f"{api_base}/fil",
        f"{api_base}/direkteurl",
    ]
    outcomes = []
    for candidate in candidates:
        print(f"\nπ Trying: {candidate}")
        try:
            # HEAD first, to decide whether a GET is worthwhile
            probe = requests.head(candidate, headers=request_headers, allow_redirects=True, timeout=HTTP_TIMEOUT)
            print(f" HEAD Status: {probe.status_code}")
            print(f" Final URL: {probe.url}")
            print(f" Content-Type: {probe.headers.get('Content-Type', 'Unknown')}")
            reported_type = probe.headers.get('Content-Type', '').lower()
            if 'pdf' not in reported_type and probe.status_code != 200:
                outcomes.append({
                    "endpoint": candidate,
                    "status": "failed",
                    "status_code": probe.status_code,
                    "is_pdf": False
                })
                continue
            # Promising: fetch the first bytes and look for the PDF signature
            ranged_headers = {**request_headers, "Range": "bytes=0-1023"}
            sample = requests.get(candidate, headers=ranged_headers,
                                  allow_redirects=True, timeout=HTTP_TIMEOUT)
            looks_like_pdf = sample.content.startswith(b'%PDF-')
            print(f" GET Status: {sample.status_code}")
            print(f" Is PDF: {'β YES' if looks_like_pdf else 'β NO'}")
            if not looks_like_pdf:
                # Show what came back instead
                snippet = sample.content[:200].decode('utf-8', errors='ignore')
                print(f" Content preview: {repr(snippet)}")
                outcomes.append({
                    "endpoint": candidate,
                    "status": "not_pdf",
                    "content_preview": snippet,
                    "is_pdf": False
                })
                continue
            print(f" π FOUND WORKING PDF ENDPOINT!")
            outcomes.append({
                "endpoint": candidate,
                "status": "success",
                "final_url": sample.url,
                "content_type": sample.headers.get('Content-Type'),
                "is_pdf": True
            })
        except Exception as err:
            print(f" β Error: {err}")
            outcomes.append({
                "endpoint": candidate,
                "status": "error",
                "error": str(err),
                "is_pdf": False
            })
    return outcomes
def parse_step1_url(url: str):
    """Split a step-1 URL into ``(tenant, kommunenummer, planidentifikasjon)``.

    The tenant is the first non-empty path segment; the other two values are
    read from the query string.

    Raises:
        ValueError: if the path has no segments or any of the three values
            is missing.
    """
    pieces = urlparse(url)
    segments = list(filter(None, pieces.path.split("/")))
    if len(segments) == 0:
        raise ValueError("Invalid Step-1 URL path")
    tenant = segments[0]
    query = parse_qs(pieces.query)

    def first_value(name):
        # parse_qs maps each key to a list of values; take the first, if any
        return query.get(name, [None])[0]

    kommunenummer = first_value("kommunenummer")
    planident = first_value("planidentifikasjon")
    print(f"Parsed URL - Tenant: {tenant}, Kommune: {kommunenummer}, Plan: {planident}")
    if tenant and kommunenummer and planident:
        return tenant, kommunenummer, planident
    raise ValueError("Missing tenant/kommunenummer/planidentifikasjon")
def get_internal_plan_id(tenant: str, kommunenummer: str, planident: str, api_token: str):
    """Look up the API's internal integer id for a plan.

    The token is first sent as the ``apitoken`` query parameter; on a 401 the
    request is retried with the token in the ``x-waapi-token`` header.

    Returns:
        The plan id as an ``int``. It is taken from a bare JSON integer, from
        the ``id`` / ``planId`` / ``arealplanId`` key of a JSON object, or from
        an all-digit plain-text body.

    Raises:
        RuntimeError: if no id can be extracted from the response.
        requests.RequestException: on transport or HTTP errors.
    """
    def _is_plan_id(value):
        # bool is a subclass of int; a JSON true/false is not a plan id
        return isinstance(value, int) and not isinstance(value, bool)

    url = f"https://api.arealplaner.no/api/gi/kunder/{tenant}/arealplaner/id/{kommunenummer}/{planident}"
    print(f"Fetching internal plan ID from: {url}")
    try:
        r = requests.get(url, params={"apitoken": api_token}, timeout=HTTP_TIMEOUT)
        if r.status_code == 401:
            r = requests.get(url, headers={"x-waapi-token": api_token}, timeout=HTTP_TIMEOUT)
        r.raise_for_status()
        print(f"Internal plan ID response status: {r.status_code}")
        print(f"Internal plan ID response text: {r.text}")
        # Try to parse as JSON first, then as plain text.
        # Response.json() may raise json.JSONDecodeError or, when simplejson is
        # installed, simplejson's error; both derive from ValueError.
        try:
            data = r.json()
        except ValueError:
            pass
        else:
            print(f"Internal plan ID JSON data: {data}")
            if _is_plan_id(data):
                return data
            if isinstance(data, dict):
                for k in ("id", "planId", "arealplanId"):
                    v = data.get(k)
                    if _is_plan_id(v):
                        return v
        # Try as plain text
        txt = r.text.strip()
        if txt.isdigit():
            return int(txt)
        raise RuntimeError(f"Unexpected internal plan id response: {txt[:200]}")
    except Exception as e:
        print(f"Error getting internal plan ID: {e}")
        raise
def get_documents(tenant: str, internal_plan_id: int, api_token: str):
    """Fetch the document list of a plan.

    Returns:
        The decoded JSON list returned by the API.

    Raises:
        RuntimeError: on a 401 response or when the payload is not a list.
        requests.RequestException: on other transport or HTTP errors.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner/{internal_plan_id}/dokumenter"
    print(f"Fetching documents from: {endpoint}")
    auth_headers = {"x-waapi-token": api_token}
    try:
        response = requests.get(endpoint, headers=auth_headers, timeout=HTTP_TIMEOUT)
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Documents response status: {response.status_code}")
        payload = response.json()
        payload_is_list = isinstance(payload, list)
        if payload_is_list:
            count_label = len(payload)
        else:
            count_label = 'Not a list'
        print(f"Documents count: {count_label}")
        if payload_is_list:
            return payload
        raise RuntimeError("Documents response not a list")
    except Exception as err:
        # Log and let the caller decide how to handle the failure
        print(f"Error getting documents: {err}")
        raise
def get_planning_treatments(tenant: str, internal_plan_id: int, api_token: str):
    """Fetch a plan's treatments and return the documents attached to them.

    Only treatments that are dicts with a list under ``dokumenter`` contribute;
    their documents are concatenated in response order.

    Raises:
        RuntimeError: on a 401 response or when the payload is not a list.
        requests.RequestException: on other transport or HTTP errors.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner/{internal_plan_id}/behandlinger?includeDokumenter=true"
    print(f"Fetching planning treatments from: {endpoint}")
    auth_headers = {"x-waapi-token": api_token}
    try:
        response = requests.get(endpoint, headers=auth_headers, timeout=HTTP_TIMEOUT)
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Planning treatments response status: {response.status_code}")
        payload = response.json()
        payload_is_list = isinstance(payload, list)
        if payload_is_list:
            count_label = len(payload)
        else:
            count_label = 'Not a list'
        print(f"Planning treatments count: {count_label}")
        if not payload_is_list:
            raise RuntimeError("Planning treatments response not a list")
        # Flatten the per-treatment document lists into one list
        collected = [
            attached
            for entry in payload
            if isinstance(entry, dict) and isinstance(entry.get('dokumenter'), list)
            for attached in entry['dokumenter']
        ]
        print(f"Total treatment documents found: {len(collected)}")
        return collected
    except Exception as err:
        print(f"Error getting planning treatments: {err}")
        raise
def get_exemptions(tenant: str, internal_plan_id: int, api_token: str):
    """Fetch the exemption (dispensasjoner) list of a plan.

    Returns:
        The decoded JSON list returned by the API.

    Raises:
        RuntimeError: on a 401 response or when the payload is not a list.
        requests.RequestException: on other transport or HTTP errors.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner/{internal_plan_id}/dispensasjoner"
    print(f"Fetching exemptions from: {endpoint}")
    auth_headers = {"x-waapi-token": api_token}
    try:
        response = requests.get(endpoint, headers=auth_headers, timeout=HTTP_TIMEOUT)
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Exemptions response status: {response.status_code}")
        payload = response.json()
        payload_is_list = isinstance(payload, list)
        if payload_is_list:
            count_label = len(payload)
        else:
            count_label = 'Not a list'
        print(f"Exemptions count: {count_label}")
        if payload_is_list:
            return payload
        raise RuntimeError("Exemptions response not a list")
    except Exception as err:
        print(f"Error getting exemptions: {err}")
        raise
def get_single_document_info(tenant: str, doc_id: int, api_token: str) -> Optional[Dict[str, Any]]:
    """Fetch one document's metadata and check that it is the one asked for.

    Returns:
        The decoded JSON object when its ``id`` equals ``doc_id``; ``None``
        when the ids differ, the payload is not an object, or the request
        fails for any reason (the failure is printed, not raised).
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/dokumenter/{doc_id}"
    request_headers = _std_headers()
    if api_token:
        request_headers["x-waapi-token"] = api_token
    try:
        print(f"Fetching document info for ID {doc_id}")
        response = requests.get(endpoint, headers=request_headers, timeout=HTTP_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
        payload_is_dict = isinstance(payload, dict)
        if not (payload_is_dict and payload.get('id') == doc_id):
            print(f"ERROR: API returned wrong document (expected ID {doc_id}, got {payload.get('id') if payload_is_dict else 'invalid response'})")
            return None
        print(f"Successfully fetched document {doc_id}: {payload.get('dokumentnavn')}")
        return payload
    except Exception as err:
        print(f"Failed to fetch document {doc_id}: {err}")
        return None
def find_working_pdf_url(tenant: str, doc_id: int, doc_name: str, api_token: str) -> Optional[str]:
    """Return a URL that serves the document as a PDF, or ``None``.

    The document's ``direkteUrl`` is probed first; if it is absent or does not
    yield a PDF, the alternative endpoints are probed and the final URL of the
    first one that returned a PDF is used.
    """
    separator = '=' * 80
    print(f"\n{separator}")
    print(f"π FINDING WORKING PDF URL")
    print(f"Document: {doc_name} (ID: {doc_id})")
    print(f"{separator}")
    # Step 1: fresh document metadata; without it nothing else can be tried
    metadata = get_single_document_info(tenant, doc_id, api_token)
    if not metadata:
        return None
    # Step 2: probe direkteUrl when the metadata has one
    direct = metadata.get('direkteUrl')
    if direct:
        print(f"\nπ Testing direkteUrl: {direct}")
        probe = debug_blob_url_content(direct, doc_name)
        direct_ok = probe.get('status') == 'success' and probe.get('is_pdf')
        if direct_ok:
            print(f"β direkteUrl works!")
            return direct
        print(f"β direkteUrl doesn't work: {probe.get('status')}")
    # Step 3: fall back to the alternative endpoints
    print(f"\nπ Testing alternative access methods...")
    attempts = test_alternative_pdf_access(tenant, doc_id, api_token)
    first_hit = next(
        (attempt for attempt in attempts
         if attempt.get('status') == 'success' and attempt.get('is_pdf')),
        None,
    )
    if first_hit is None:
        print(f"β No working PDF URL found for document {doc_name}")
        return None
    resolved = first_hit.get('final_url')
    print(f"β Found working PDF URL: {resolved}")
    return resolved
def split_rulebook_and_planning(docs):
    """Pick the rule-book document and return it with the remaining documents.

    A document counts as "bestemmelser" when its ``dokumenttype`` equals that
    word (case-insensitively) or its ``dokumenttypeId`` is 5. The rule book is
    the bestemmelser document with the greatest ``dokumentdato`` (the earliest
    in the list on ties); with no such document the first document is used,
    and ``None`` when ``docs`` is empty.

    Returns:
        ``(rule_doc, planning_docs)`` where ``planning_docs`` holds every
        document that is not the rule-book object itself.
    """
    print("Splitting documents into rulebook and planning documents")

    def is_bestemmelser(doc):
        type_name = (doc.get("dokumenttype") or "").lower()
        return type_name == "bestemmelser" or doc.get("dokumenttypeId") == 5

    def date_key(doc):
        return doc.get("dokumentdato") or ""

    candidates = list(filter(is_bestemmelser, docs))
    print(f"Found {len(candidates)} bestemmelser documents")
    if candidates:
        # max() keeps the first of equally-dated candidates
        rule_doc = max(candidates, key=date_key)
    elif docs:
        rule_doc = docs[0]
    else:
        rule_doc = None
    # Identity comparison: an equal-looking duplicate still counts as a planning doc
    planning_docs = [doc for doc in docs if doc is not rule_doc]
    print(f"Rule document: {rule_doc.get('dokumentnavn') if rule_doc else 'None'}")
    print(f"Planning documents: {len(planning_docs)}")
    return rule_doc, planning_docs
def _fetch_direkte_url(tenant: str, doc_id: Any, api_token: Optional[str]) -> Optional[str]:
    """Return the ``direkteUrl`` the API reports for one document, or ``None``."""
    # Same token fallback as the resolve pipelines: an explicit token wins,
    # otherwise the environment-provided API_TOKEN is used.
    token = (api_token or API_TOKEN or "").strip()
    url = f"https://api.arealplaner.no/api/kunder/{tenant}/dokumenter/{doc_id}"
    headers = {"x-waapi-token": token, "Accept": "application/json"}
    try:
        response = requests.get(url, headers=headers, timeout=HTTP_TIMEOUT)
        if response.status_code != 200:
            print(f"Retry for document {doc_id} returned HTTP {response.status_code}")
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # Best effort: report the failure and leave the entry marked as failed
        print(f"Retry for document {doc_id} failed: {e}")
        return None
    if not isinstance(data, dict):
        return None
    return data.get('direkteUrl') or None


def fix_failed_documents(result: Dict, api_token: Optional[str]) -> Dict:
    """Retry failed entries of a resolve result using the API's ``direkteUrl``.

    Every entry (the rule book and each planning document) whose ``status`` is
    ``"failed"`` and whose ``link`` is ``None`` is looked up once more; when
    the API reports a ``direkteUrl`` the entry's ``link`` is set to it and its
    ``status`` becomes ``"success"``. Entries that cannot be fixed are left
    untouched.

    Args:
        result: Result dict as produced by the resolve pipeline; it is
            modified in place.
        api_token: API token; when empty or ``None`` the module-level
            ``API_TOKEN`` is used.

    Returns:
        The same ``result`` object.
    """
    tenant = result["inputs"]["tenant"]
    entries = []
    if result.get("rule_book"):
        entries.append(result["rule_book"])
    entries.extend(result["planning_documents"])
    for entry in entries:
        if entry["status"] == "failed" and entry["link"] is None:
            direkte_url = _fetch_direkte_url(tenant, entry["id"], api_token)
            if direkte_url:
                entry["link"] = direkte_url
                entry["status"] = "success"
    return result
def _api_provided_link(doc: Dict[str, Any]) -> Optional[str]:
    """Return the first link found directly on a document dict, or ``None``.

    Checked in order: the document's own ``url``, the ``url`` of its first
    ``dokumenter`` entry, and its ``direkteUrl``.
    """
    attachments = doc.get("dokumenter")
    attachment_url = None
    # The first attachment may be missing, not a dict, or carry no "url" key;
    # none of those cases may abort processing of the whole list.
    if isinstance(attachments, list) and attachments and isinstance(attachments[0], dict):
        attachment_url = attachments[0].get("url")
    return doc.get("url") or attachment_url or doc.get("direkteUrl")


def process_document_list(
    docs: List[Dict],
    tenant: str,
    api_token: str,
    debug_mode: bool,
    doc_type: str = "document",
    use_blob: bool = True
) -> List[Dict[str, Any]]:
    """Resolve a link for every document in ``docs``.

    Args:
        docs: Document dicts as returned by the API.
        tenant: Tenant the documents belong to.
        api_token: API token forwarded to the lookup helpers.
        debug_mode: With ``use_blob``, selects the full probing path
            (``find_working_pdf_url``) instead of a plain ``direkteUrl`` lookup.
        doc_type: Label used in the printed progress messages.
        use_blob: When ``False`` no request is made and the link is taken
            from the document dict itself.

    Returns:
        One dict per document that has an id, with keys ``id``, ``name``,
        ``type``, ``date``, ``link``, ``source`` (the original dict) and
        ``status`` (``"success"`` when a link was found, else ``"failed"``).
        Documents without an id are skipped.
    """
    processed_docs = []
    total = len(docs)
    for position, doc in enumerate(docs, start=1):
        doc_id = doc.get('id')
        doc_name = doc.get('dokumentnavn') or doc.get('name', f"Unknown {doc_type}")
        if not doc_id:
            print(f"β οΈ Skipping {doc_type} without ID: {doc_name}")
            continue
        print(f"\n{'='*100}")
        print(f"PROCESSING {doc_type.upper()} {position}/{total}: {doc_name}")
        print(f"{'='*100}")
        if not use_blob:
            # For other-documents: just use the API-provided link directly
            working_url = _api_provided_link(doc)
        elif debug_mode:
            # Normal blob resolution (rule books / planning docs), with probing
            working_url = find_working_pdf_url(tenant, doc_id, doc_name, api_token)
        else:
            doc_data = get_single_document_info(tenant, doc_id, api_token)
            working_url = doc_data.get('direkteUrl') if doc_data else None
        processed_docs.append({
            "id": doc_id,
            "name": doc_name,
            "type": doc.get("dokumenttype") or doc.get("type"),
            "date": doc.get("dokumentdato") or doc.get("date"),
            "link": working_url,
            "source": doc,
            "status": "success" if working_url else "failed"
        })
    return processed_docs
def comprehensive_resolve_pipeline(step1_url: str, api_token: Optional[str] = None, debug_mode: bool = True):
    """Resolve the rule book and planning documents of the plan behind a step-1 URL.

    Steps: parse the URL, look up the internal plan id, list the plan's
    documents, split off the rule book, then resolve a link for each document.

    Args:
        step1_url: Plan URL containing tenant, kommunenummer and planidentifikasjon.
        api_token: API token; falls back to the module-level ``API_TOKEN``.
        debug_mode: When true, links are found via ``find_working_pdf_url``;
            otherwise the document's ``direkteUrl`` is used as-is.

    Returns:
        A dict with ``inputs``, ``rule_book``, ``planning_documents`` and
        ``debug_info``. Any exception is printed and re-raised.
    """
    wide_rule = '=' * 100
    print("π COMPREHENSIVE AREALPLANER RESOLVER")
    print("=" * 80)
    api_token = (api_token or API_TOKEN or "").strip()
    if api_token == "":
        print("β οΈ Warning: No API token provided")
    try:
        # Identify the plan
        tenant, kommunenummer, planident = parse_step1_url(step1_url)
        internal_id = get_internal_plan_id(tenant, kommunenummer, planident, api_token)
        print(f"Internal Plan ID: {internal_id}")
        # List its documents
        docs = get_documents(tenant, internal_id, api_token)
        print(f"Found {len(docs)} documents:")
        for position, entry in enumerate(docs, start=1):
            print(f" {position}. {entry.get('dokumentnavn')} (ID: {entry.get('id')}, Type: {entry.get('dokumenttype')})")
        rule_doc, planning_docs = split_rulebook_and_planning(docs)
        # Resolve the rule book, if there is one
        rule_book_entry = None
        if rule_doc:
            rule_id = rule_doc.get('id')
            rule_name = rule_doc.get('dokumentnavn')
            print(f"\n{wide_rule}")
            print(f"PROCESSING RULE BOOK: {rule_name}")
            print(f"{wide_rule}")
            if debug_mode:
                rule_link = find_working_pdf_url(tenant, rule_id, rule_name, api_token)
            else:
                # Simple mode - just take direkteUrl from the metadata
                rule_meta = get_single_document_info(tenant, rule_id, api_token)
                rule_link = rule_meta.get('direkteUrl') if rule_meta else None
            rule_book_entry = {
                "id": rule_id,
                "name": rule_name,
                "type": rule_doc.get("dokumenttype"),
                "date": rule_doc.get("dokumentdato"),
                "link": rule_link,
                "status": "success" if rule_link else "failed"
            }
        # Resolve the remaining documents
        planning_entries = process_document_list(planning_docs, tenant, api_token, debug_mode, "planning document")
        result = {
            "inputs": {
                "tenant": tenant,
                "kommunenummer": kommunenummer,
                "planidentifikasjon": planident,
                "internal_plan_id": internal_id,
                "step1_url": step1_url,
            },
            "rule_book": rule_book_entry,
            "planning_documents": planning_entries,
            "debug_info": {} if debug_mode else None
        }
        print(f"\n{wide_rule}")
        print("π RESOLUTION SUMMARY")
        print(f"{wide_rule}")
        print(f"Rule book: {'β SUCCESS' if result['rule_book'] and result['rule_book']['link'] else 'β FAILED'}")
        print(f"Planning documents: {sum(1 for doc in result['planning_documents'] if doc['link'])} / {len(result['planning_documents'])} successful")
        return result
    except Exception as err:
        print(f"β Pipeline error: {err}")
        raise
def get_plans_by_coordinates(tenant: str, knr: str, gnr: str, bnr: str, api_token: str):
    """Fetch the plans registered for a property (knr, gnr, bnr).

    Only page ``"1"`` of the listing is requested.

    Returns:
        The decoded JSON list returned by the API.

    Raises:
        RuntimeError: on a 401 response or when the payload is not a list.
        requests.RequestException: on other transport or HTTP errors.
    """
    endpoint = f"https://api.arealplaner.no/api/kunder/{tenant}/arealplaner"
    query = {"knr": knr, "gnr": gnr, "bnr": bnr, "page": "1"}
    print(f"Fetching plans by coordinates from: {endpoint}")
    print(f"Parameters: knr={knr}, gnr={gnr}, bnr={bnr}")
    auth_headers = {"x-waapi-token": api_token}
    try:
        response = requests.get(endpoint, headers=auth_headers, params=query, timeout=HTTP_TIMEOUT)
        if response.status_code == 401:
            raise RuntimeError("Unauthorized: invalid x-waapi-token")
        response.raise_for_status()
        print(f"Plans response status: {response.status_code}")
        payload = response.json()
        payload_is_list = isinstance(payload, list)
        if payload_is_list:
            count_label = len(payload)
        else:
            count_label = 'Not a list'
        print(f"Plans count: {count_label}")
        if not payload_is_list:
            raise RuntimeError("Plans response not a list")
        # Log every plan that came back
        for position, plan in enumerate(payload, start=1):
            print(f" {position}. {plan.get('planNavn')} (ID: {plan.get('id')}, Type: {plan.get('planType')})")
        return payload
    except Exception as err:
        print(f"Error getting plans by coordinates: {err}")
        raise
def find_kommuneplanens_arealdel(plans: List[Dict]) -> Optional[Dict]:
    """Return the first plan whose ``planType`` is 'Kommuneplanens arealdel'.

    Surrounding whitespace in ``planType`` is ignored. Plans whose
    ``planType`` is missing, ``None`` or not a string are skipped.

    Returns:
        The matching plan dict, or ``None`` when there is none.
    """
    print("Looking for 'Kommuneplanens arealdel' plan...")
    for plan in plans:
        raw_type = plan.get('planType')
        # The key may be present with a null value, in which case a .get()
        # default would not apply; treat anything non-string as "no type".
        plan_type = raw_type.strip() if isinstance(raw_type, str) else ''
        if plan_type == 'Kommuneplanens arealdel':
            print(f"β Found Kommuneplanens arealdel: {plan.get('planNavn')} (ID: {plan.get('id')})")
            return plan
    print("β No 'Kommuneplanens arealdel' plan found")
    return None
def parse_coordinates_url(url: str):
    """Return the tenant, i.e. the first non-empty path segment of ``url``.

    Raises:
        ValueError: if the URL path has no segments.
    """
    path = urlparse(url).path
    segments = list(filter(None, path.split("/")))
    if len(segments) == 0:
        raise ValueError("Invalid coordinates URL path")
    tenant = segments[0]
    print(f"Parsed URL - Tenant: {tenant}")
    if tenant:
        return tenant
    raise ValueError("Missing tenant in URL")
def comprehensive_resolve_kommuneplanens(coordinates_url: str, knr: str, gnr: str, bnr: str, api_token: Optional[str] = None, debug_mode: bool = True):
    """Resolve the documents of the 'Kommuneplanens arealdel' plan for a property.

    Steps: take the tenant from ``coordinates_url``, list the plans for
    (knr, gnr, bnr), pick the 'Kommuneplanens arealdel' plan, list its
    documents, split off the rule book, then resolve a link for each document.

    Args:
        coordinates_url: URL whose first path segment names the tenant.
        knr, gnr, bnr: Property identifiers passed to the plan lookup.
        api_token: API token; falls back to the module-level ``API_TOKEN``.
        debug_mode: When true, links are found via ``find_working_pdf_url``;
            otherwise the document's ``direkteUrl`` is used as-is.

    Returns:
        A dict with ``inputs``, ``kommuneplan_info``, ``rule_book``,
        ``planning_documents`` and ``debug_info``.

    Raises:
        RuntimeError: when no 'Kommuneplanens arealdel' plan is found.
        Any other exception from the steps above is printed and re-raised.
    """
    wide_rule = '=' * 100
    print("π COMPREHENSIVE KOMMUNEPLANENS RESOLVER")
    print("=" * 80)
    print(f"Coordinates: knr={knr}, gnr={gnr}, bnr={bnr}")
    api_token = (api_token or API_TOKEN or "").strip()
    if api_token == "":
        print("β οΈ Warning: No API token provided")
    try:
        # Tenant comes from the URL; the plan comes from the property lookup
        tenant = parse_coordinates_url(coordinates_url)
        plans = get_plans_by_coordinates(tenant, knr, gnr, bnr, api_token)
        kommuneplan = find_kommuneplanens_arealdel(plans)
        if not kommuneplan:
            raise RuntimeError("No 'Kommuneplanens arealdel' plan found for the given coordinates")
        plan_id = kommuneplan.get('id')
        plan_name = kommuneplan.get('planNavn')
        print(f"Using plan: {plan_name} (ID: {plan_id})")
        # Documents of the chosen plan
        docs = get_documents(tenant, plan_id, api_token)
        print(f"Found {len(docs)} documents for kommuneplan:")
        for position, entry in enumerate(docs, start=1):
            print(f" {position}. {entry.get('dokumentnavn')} (ID: {entry.get('id')}, Type: {entry.get('dokumenttype')})")
        # Same split as for regular plans
        rule_doc, planning_docs = split_rulebook_and_planning(docs)
        # Resolve the rule book, if there is one
        rule_book_entry = None
        if rule_doc:
            rule_id = rule_doc.get('id')
            rule_name = rule_doc.get('dokumentnavn')
            print(f"\n{wide_rule}")
            print(f"PROCESSING KOMMUNEPLAN RULE BOOK: {rule_name}")
            print(f"{wide_rule}")
            if debug_mode:
                rule_link = find_working_pdf_url(tenant, rule_id, rule_name, api_token)
            else:
                # Simple mode - just take direkteUrl from the metadata
                rule_meta = get_single_document_info(tenant, rule_id, api_token)
                rule_link = rule_meta.get('direkteUrl') if rule_meta else None
            rule_book_entry = {
                "id": rule_id,
                "name": rule_name,
                "type": rule_doc.get("dokumenttype"),
                "date": rule_doc.get("dokumentdato"),
                "link": rule_link,
                "status": "success" if rule_link else "failed"
            }
        # Resolve the remaining documents
        planning_entries = process_document_list(planning_docs, tenant, api_token, debug_mode, "kommuneplan document")
        result = {
            "inputs": {
                "tenant": tenant,
                "knr": knr,
                "gnr": gnr,
                "bnr": bnr,
                "coordinates_url": coordinates_url,
                "kommuneplan_id": plan_id,
                "kommuneplan_name": plan_name,
            },
            "kommuneplan_info": {
                "id": plan_id,
                "name": plan_name,
                "type": kommuneplan.get('planType'),
                "status": kommuneplan.get('planStatus'),
                "iKraft": kommuneplan.get('iKraft'),
            },
            "rule_book": rule_book_entry,
            "planning_documents": planning_entries,
            "debug_info": {} if debug_mode else None
        }
        print(f"\n{wide_rule}")
        print("π KOMMUNEPLANENS RESOLUTION SUMMARY")
        print(f"{wide_rule}")
        print(f"Kommuneplan: {plan_name}")
        print(f"Rule book: {'β SUCCESS' if result['rule_book'] and result['rule_book']['link'] else 'β FAILED'}")
        print(f"Planning documents: {sum(1 for doc in result['planning_documents'] if doc['link'])} / {len(result['planning_documents'])} successful")
        return result
    except Exception as err:
        print(f"β Kommuneplanens pipeline error: {err}")
        raise
def comprehensive_resolve_other_documents(step1_url: str, api_token: Optional[str] = None, debug_mode: bool = True):
    """Resolve the treatment and exemption documents of the plan behind a step-1 URL.

    A failure while fetching treatments or exemptions is printed and that
    category is treated as empty; failures while parsing the URL or looking
    up the plan id are printed and re-raised. Links are taken directly from
    the API-provided document dicts (no blob probing).

    Returns:
        A dict with ``inputs``, ``planning_treatments``, ``exemptions`` and
        ``debug_info``.
    """
    wide_rule = '=' * 100
    print("π COMPREHENSIVE AREALPLANER OTHER DOCUMENTS RESOLVER")
    print("=" * 80)
    api_token = (api_token or API_TOKEN or "").strip()
    if api_token == "":
        print("β οΈ Warning: No API token provided")
    try:
        # Identify the plan
        tenant, kommunenummer, planident = parse_step1_url(step1_url)
        internal_id = get_internal_plan_id(tenant, kommunenummer, planident, api_token)
        print(f"Internal Plan ID: {internal_id}")
        # Treatments: best effort
        treatment_docs = []
        try:
            treatment_docs = get_planning_treatments(tenant, internal_id, api_token)
            print(f"Found {len(treatment_docs)} treatment documents")
        except Exception as fetch_err:
            print(f"Error getting planning treatments: {fetch_err}")
            treatment_docs = []
        # Exemptions: best effort
        exemption_docs = []
        try:
            exemption_docs = get_exemptions(tenant, internal_id, api_token)
            print(f"Found {len(exemption_docs)} exemption documents")
        except Exception as fetch_err:
            print(f"Error getting exemptions: {fetch_err}")
            exemption_docs = []
        treatment_entries = process_document_list(treatment_docs, tenant, api_token, debug_mode, "planning treatment", use_blob=False)
        exemption_entries = process_document_list(exemption_docs, tenant, api_token, debug_mode, "exemption", use_blob=False)
        result = {
            "inputs": {
                "tenant": tenant,
                "kommunenummer": kommunenummer,
                "planidentifikasjon": planident,
                "internal_plan_id": internal_id,
                "step1_url": step1_url,
            },
            "planning_treatments": treatment_entries,
            "exemptions": exemption_entries,
            "debug_info": {} if debug_mode else None
        }
        print(f"\n{wide_rule}")
        print("π OTHER DOCUMENTS RESOLUTION SUMMARY")
        print(f"{wide_rule}")
        print(f"Planning treatments: {sum(1 for doc in result['planning_treatments'] if doc['link'])} / {len(result['planning_treatments'])} successful")
        print(f"Exemptions: {sum(1 for doc in result['exemptions'] if doc['link'])} / {len(result['exemptions'])} successful")
        return result
    except Exception as err:
        print(f"β Other documents pipeline error: {err}")
        raise
| # FastAPI Application | |
class ResolveRequest(BaseModel):
    """Request body for the handlers that resolve a plan from a step-1 URL."""
    # Plan URL; the pipelines hand it to parse_step1_url.
    step1_url: str
    # Optional API token; the pipelines fall back to API_TOKEN when it is empty.
    api_token: Optional[str] = None
    # Forwarded by resolve_other_documents only; resolve_with_debug,
    # resolve_simple and debug_specific_document hard-code the mode instead.
    debug_mode: Optional[bool] = True
class KommuneplansRequest(BaseModel):
    """Request body for resolve_kommuneplanens."""
    # URL whose first path segment is used as the tenant (see parse_coordinates_url).
    coordinates_url: str
    # Property identifiers, sent as the knr/gnr/bnr query parameters of the plan lookup.
    knr: str
    gnr: str
    bnr: str
    # Optional API token; the pipeline falls back to API_TOKEN when it is empty.
    api_token: Optional[str] = None
    # Forwarded to comprehensive_resolve_kommuneplanens as debug_mode.
    debug_mode: Optional[bool] = True
class QuickTestRequest(BaseModel):
    """Request body for test_blob_url_endpoint."""
    # URL handed to debug_blob_url_content for probing.
    blob_url: str
# Application object; it is passed to uvicorn.run() in the __main__ block.
app = FastAPI(title="Arealplaner Blob URL Resolver with Debugging")
# Registered on the app: without a path-operation decorator the function is
# never exposed as a route.
@app.get("/")
def root():
    """Return a small index of the service's endpoints."""
    return {
        "ok": True,
        "endpoints": {
            "health": "/health",
            "docs": "/docs",
            "resolve": {"POST": "/resolve"},
            "resolve-simple": {"POST": "/resolve-simple"},
            "other-documents": {"POST": "/other-documents"},
            "kommuneplanens": {"POST": "/kommuneplanens"},
            "test-blob": {"POST": "/test-blob"},
            "debug": {"POST": "/debug"}
        }
    }
# Registered at the path that root() advertises under "health".
@app.get("/health")
def health():
    """Liveness probe; always reports ``{"ok": True}``."""
    return {"ok": True}
# Registered at the path that root() advertises under "resolve".
@app.post("/resolve")
def resolve_with_debug(req: ResolveRequest):
    """Full resolution with debugging, followed by a retry of failed documents.

    Raises:
        HTTPException: status 500 carrying the error text when any step fails.
    """
    try:
        result = comprehensive_resolve_pipeline(req.step1_url, req.api_token, debug_mode=True)
        # The retry must use the same token the pipeline used: the request's
        # token when given, otherwise the environment-provided API_TOKEN.
        token = (req.api_token or API_TOKEN or "").strip()
        return fix_failed_documents(result, token)
    except Exception as e:
        print(f"Error in resolve endpoint: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Registered at the path that root() advertises under "resolve-simple".
@app.post("/resolve-simple")
def resolve_simple(req: ResolveRequest):
    """Simple resolution without extensive debugging (``debug_mode=False``).

    Raises:
        HTTPException: status 500 carrying the error text when the pipeline fails.
    """
    try:
        return comprehensive_resolve_pipeline(req.step1_url, req.api_token, debug_mode=False)
    except Exception as e:
        print(f"Error in resolve-simple endpoint: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Registered at the path that root() advertises under "other-documents".
@app.post("/other-documents")
def resolve_other_documents(req: ResolveRequest):
    """Resolve other documents (planning treatments and exemptions).

    Raises:
        HTTPException: status 500 carrying the error text when the pipeline fails.
    """
    try:
        return comprehensive_resolve_other_documents(req.step1_url, req.api_token, req.debug_mode)
    except Exception as e:
        print(f"Error in other-documents endpoint: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Registered at the path that root() advertises under "kommuneplanens".
@app.post("/kommuneplanens")
def resolve_kommuneplanens(req: KommuneplansRequest):
    """Resolve Kommuneplanens arealdel documents by property coordinates.

    Raises:
        HTTPException: status 500 carrying the error text when the pipeline fails.
    """
    try:
        return comprehensive_resolve_kommuneplanens(
            req.coordinates_url,
            req.knr,
            req.gnr,
            req.bnr,
            req.api_token,
            req.debug_mode
        )
    except Exception as e:
        print(f"Error in kommuneplanens endpoint: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Registered at the path that root() advertises under "test-blob".
@app.post("/test-blob")
def test_blob_url_endpoint(req: QuickTestRequest):
    """Quick test of a specific blob URL; returns the probe's result dict.

    Raises:
        HTTPException: status 500 carrying the error text when the probe raises.
    """
    try:
        return debug_blob_url_content(req.blob_url, "Test Document")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Registered at the path that root() advertises under "debug".
@app.post("/debug")
def debug_specific_document(req: ResolveRequest):
    """Debug a specific plan: run the pipeline with ``debug_mode=True``.

    Raises:
        HTTPException: status 500 carrying the error text when the pipeline fails.
    """
    try:
        return comprehensive_resolve_pipeline(req.step1_url, req.api_token, debug_mode=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Script entry point: run both resolvers once against sample inputs,
    # print what they found, then start the HTTP server.

    def _print_entry(label, entry):
        # One line per document, plus its URL when a link was resolved
        marker = 'β ' if entry['link'] else 'β'
        print(f"{label}: {entry['name']} - {marker}")
        if entry['link']:
            print(f" URL: {entry['link']}")

    def _print_outcome(outcome, rule_label, doc_label):
        if outcome['rule_book']:
            _print_entry(rule_label, outcome['rule_book'])
        for entry in outcome['planning_documents']:
            _print_entry(doc_label, entry)

    wide_rule = "=" * 100
    # Test directly
    test_url = "https://www.arealplaner.no/asker3203/gi?funksjon=VisPlan&kommunenummer=3203&planidentifikasjon=0627191"
    print("π§ͺ RUNNING DIRECT TEST")
    print("=" * 80)
    try:
        result = comprehensive_resolve_pipeline(test_url, API_TOKEN, debug_mode=True)
        print("\n" + wide_rule)
        print("π FINAL RESULTS")
        print(wide_rule)
        _print_outcome(result, "Rule Book", "Planning Doc")
        # Test kommuneplanens
        print("\n" + wide_rule)
        print("π§ͺ TESTING KOMMUNEPLANENS")
        print(wide_rule)
        test_koordinat_url = "https://www.arealplaner.no/asker3203/gi?funksjon=VisPlan&kommunenummer=3203&planidentifikasjon=022077"
        kommuneplan_result = comprehensive_resolve_kommuneplanens(
            test_koordinat_url, "3203", "114", "426", API_TOKEN, debug_mode=True
        )
        print("\nπ KOMMUNEPLANENS RESULTS")
        print("=" * 60)
        _print_outcome(kommuneplan_result, "Kommuneplan Rule Book", "Kommuneplan Doc")
    except Exception as err:
        # A failed self-test is reported but does not stop the server from starting
        print(f"β Test failed: {err}")
        import traceback
        traceback.print_exc()
    # Start server
    import uvicorn
    port = int(os.getenv("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=port)