"""File-backed session storage for job-sheet report sessions.

Handles session id validation, normalization of legacy session payloads,
page/section/template normalization, and uploaded-file management.
"""

from __future__ import annotations

import copy
import json
import re
import shutil
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Iterable, List, Optional
from uuid import uuid4

from fastapi import UploadFile
from PIL import Image, ImageOps, UnidentifiedImageError

from ..core.config import get_settings

IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
DOC_EXTS = {".pdf", ".doc", ".docx"}
DATA_EXTS = {".csv", ".xls", ".xlsx"}
# Uploaded photos with these extensions are re-encoded in place
# (EXIF orientation applied, long edge capped, metadata stripped).
EXIF_NORMALIZE_EXTS = {".jpg", ".jpeg", ".png", ".webp"}

UPLOAD_IMAGE_MAX_LONG_EDGE_PX = 2400
UPLOAD_JPEG_QUALITY = 82
UPLOAD_WEBP_QUALITY = 80

# Session ids are 32 lowercase hex characters (uuid4().hex).
SESSION_ID_RE = re.compile(r"^[0-9a-f]{32}$")

BUILTIN_PAGE_TEMPLATES = [
    {
        "id": "repex:standard",
        "name": "Standard Job Sheet",
        "description": "Observations + up to two photos.",
        "blank": False,
        "variant": "full",
        "photo_layout": "auto",
        "source": "builtin",
    },
    {
        "id": "repex:photos",
        "name": "Photo Continuation",
        "description": "Photo-only continuation page.",
        "blank": False,
        "variant": "photos",
        "photo_layout": "auto",
        "source": "builtin",
    },
    {
        "id": "repex:blank",
        "name": "Blank Canvas",
        "description": "Blank white page.",
        "blank": True,
        "variant": "full",
        "photo_layout": "auto",
        "source": "builtin",
    },
]
BUILTIN_PAGE_TEMPLATE_MAP = {item["id"]: item for item in BUILTIN_PAGE_TEMPLATES}


@dataclass
class StoredFile:
    """Metadata for a file persisted under a session's uploads directory."""

    id: str
    name: str
    size: int
    content_type: str
    category: str
    path: str


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _safe_name(name: str) -> str:
    """Strip any directory components and unsafe characters from a filename."""
    name = Path(name).name
    name = re.sub(r"[^a-zA-Z0-9._-]", "_", name)
    return name or "upload"


def _category_for(filename: str) -> str:
    ext = Path(filename).suffix.lower()
    if ext in IMAGE_EXTS:
        return "photos"
    if ext in DOC_EXTS:
        return "documents"
    if ext in DATA_EXTS:
        return "data_files"
    return "documents"


def _normalize_uploaded_photo(path: Path) -> None:
    """Re-encode an uploaded photo in place: apply EXIF orientation, cap the
    long edge, and strip metadata. Leaves the file untouched on decode errors."""
    ext = path.suffix.lower()
    if ext not in EXIF_NORMALIZE_EXTS:
        return
    try:
        with Image.open(path) as image:
            normalized = ImageOps.exif_transpose(image)
            # Pillow >= 9.1 exposes resampling filters on Image.Resampling;
            # fall back to the legacy module-level constant on older versions.
            resampling = getattr(Image, "Resampling", None)
            lanczos = resampling.LANCZOS if resampling is not None else Image.LANCZOS
            long_edge = max(normalized.width, normalized.height)
            if long_edge > UPLOAD_IMAGE_MAX_LONG_EDGE_PX:
                ratio = UPLOAD_IMAGE_MAX_LONG_EDGE_PX / float(long_edge)
                normalized = normalized.resize(
                    (
                        max(1, int(round(normalized.width * ratio))),
                        max(1, int(round(normalized.height * ratio))),
                    ),
                    lanczos,
                )
            if ext in {".jpg", ".jpeg"}:
                if normalized.mode in ("RGBA", "LA", "P"):
                    normalized = normalized.convert("RGB")
                normalized.save(
                    path,
                    format="JPEG",
                    quality=UPLOAD_JPEG_QUALITY,
                    optimize=True,
                    progressive=True,
                    exif=b"",
                )
            elif ext == ".webp":
                if normalized.mode not in ("RGB", "RGBA"):
                    normalized = normalized.convert("RGB")
                normalized.save(
                    path,
                    format="WEBP",
                    quality=UPLOAD_WEBP_QUALITY,
                    method=6,
                )
            else:  # png
                normalized.save(path, format="PNG", optimize=True)
    except (UnidentifiedImageError, OSError, ValueError, TypeError):
        # Keep original bytes if the file cannot be decoded by Pillow.
        return


def _validate_session_id(session_id: str) -> str:
    if not session_id:
        raise ValueError("Invalid session id.")
    normalized = session_id.lower()
    if not SESSION_ID_RE.match(normalized):
        raise ValueError("Invalid session id.")
    return normalized


def _merge_text(primary: str, secondary: str) -> str:
    """Combine two free-text fields, skipping blanks and avoiding duplication
    when the secondary text is already contained in the primary."""
    primary = (primary or "").strip()
    secondary = (secondary or "").strip()
    if not secondary:
        return primary
    if not primary:
        return secondary
    if secondary in primary:
        return primary
    return f"{primary} - {secondary}"


def _normalize_template_fields(template: Optional[dict]) -> dict:
    """Fold legacy template field names into their current equivalents and
    drop fields that are no longer used."""
    if not isinstance(template, dict):
        return {}
    normalized = dict(template)
    item_description = _merge_text(
        normalized.get("item_description", ""),
        normalized.pop("condition_description", ""),
    )
    if item_description:
        normalized["item_description"] = item_description
    else:
        normalized.pop("item_description", None)
    action_type = normalized.pop("action_type", "")
    required_action = _merge_text(action_type, normalized.get("required_action", ""))
    if required_action:
        normalized["required_action"] = required_action
    else:
        normalized.pop("required_action", None)
    figure_caption = _merge_text(
        normalized.get("figure_caption", ""),
        normalized.pop("figure_description", ""),
    )
    if figure_caption:
        normalized["figure_caption"] = figure_caption
    else:
        normalized.pop("figure_caption", None)
    for legacy_key in ("accompanied_by", "project", "client_site"):
        normalized.pop(legacy_key, None)
    return normalized


def _infer_template_id(page: dict) -> str:
    """Resolve a page's template id, inferring one from legacy flags when the
    explicit `page_template` field is absent."""
    template_id = str(page.get("page_template") or "").strip()
    if template_id:
        return template_id
    if page.get("blank"):
        return "repex:blank"
    if str(page.get("variant") or "").strip().lower() == "photos":
        return "repex:photos"
    return "repex:standard"


def _normalize_page_templates(templates: Optional[List[dict]]) -> List[dict]:
    """Validate custom page templates, dropping duplicates and any template
    whose id collides with a builtin."""
    normalized: List[dict] = []
    seen: set[str] = set()
    for template in templates or []:
        # Accept Pydantic v2 models, Pydantic v1 models, or plain dicts.
        if hasattr(template, "model_dump"):
            template = template.model_dump()
        elif hasattr(template, "dict"):
            template = template.dict()
        if not isinstance(template, dict):
            continue
        template_id = str(template.get("id") or "").strip()
        if not template_id or template_id in BUILTIN_PAGE_TEMPLATE_MAP:
            continue
        if template_id in seen:
            continue
        seen.add(template_id)
        name = str(template.get("name") or template_id).strip() or template_id
        variant = str(template.get("variant") or "full").strip().lower()
        if variant not in {"full", "photos"}:
            variant = "full"
        photo_layout = str(template.get("photo_layout") or "auto").strip().lower()
        if photo_layout not in {"auto", "two-column", "stacked"}:
            photo_layout = "auto"
        normalized.append(
            {
                "id": template_id,
                "name": name,
                "description": str(template.get("description") or "").strip(),
                "blank": bool(template.get("blank")),
                "variant": variant,
                "photo_layout": photo_layout,
                "source": "custom",
            }
        )
    return normalized


class SessionStore:
    """Persists sessions as JSON files under `<storage_dir>/sessions/<id>/`."""

    def __init__(self, base_dir: Optional[Path] = None) -> None:
        settings = get_settings()
        self.base_dir = (base_dir or settings.storage_dir).resolve()
        self.sessions_dir = self.base_dir / "sessions"
        self.sessions_dir.mkdir(parents=True, exist_ok=True)
        self.max_upload_bytes = settings.max_upload_mb * 1024 * 1024
        self._lock = Lock()
        self._migrate_storage()

    def _migrate_storage(self) -> None:
        """Rewrite any stored session whose on-disk shape is out of date."""
        for session_file in self.sessions_dir.glob("*/session.json"):
            try:
                raw = json.loads(session_file.read_text(encoding="utf-8"))
            except Exception:
                continue
            normalized = self._normalize_session(copy.deepcopy(raw))
            if normalized != raw:
                try:
                    session_file.write_text(
                        json.dumps(normalized, indent=2), encoding="utf-8"
                    )
                except Exception:
                    continue

    def _template_index(self, session: dict) -> dict:
        """Return builtin templates merged with the session's custom ones,
        keyed by template id. Custom templates override builtins by id."""
        custom_templates = _normalize_page_templates(session.get("page_templates") or [])
        session["page_templates"] = custom_templates
        merged = {key: dict(value) for key, value in BUILTIN_PAGE_TEMPLATE_MAP.items()}
        for template in custom_templates:
            merged[template["id"]] = template
        return merged

    def _normalize_page(self, page: dict, template_index: dict) -> dict:
        template = _normalize_template_fields(page.get("template"))
        normalized = {**page, "template": template}
        template_id = _infer_template_id(normalized)
        definition = (
            template_index.get(template_id) or BUILTIN_PAGE_TEMPLATE_MAP["repex:standard"]
        )
        normalized["page_template"] = definition["id"]
        normalized["blank"] = bool(definition.get("blank"))
        normalized["variant"] = (
            str(definition.get("variant") or normalized.get("variant") or "full")
            .strip()
            .lower()
        )
        if normalized["variant"] not in {"full", "photos"}:
            normalized["variant"] = "full"
        if normalized.get("photo_layout") is None and definition.get("photo_layout"):
            normalized["photo_layout"] = definition["photo_layout"]
        return normalized

    def list_sessions(self) -> List[dict]:
        sessions: List[dict] = []
        for session_file in sorted(self.sessions_dir.glob("*/session.json"), reverse=True):
            try:
                session = json.loads(session_file.read_text(encoding="utf-8"))
                session = self._normalize_session(session)
                sessions.append(session)
            except Exception:
                continue
        return sessions

    def create_session(self, document_no: str, inspection_date: str) -> dict:
        session_id = uuid4().hex
        now = _now_iso()
        session = {
            "id": session_id,
            "status": "ready",
            "created_at": now,
            "updated_at": now,
            "document_no": document_no,
            "inspection_date": inspection_date,
            "uploads": {"photos": [], "documents": [], "data_files": []},
            "selected_photo_ids": [],
            "page_count": 0,
            "pages": [],
            "jobsheet_sections": [],
            "headings": [],
            "page_templates": [],
        }
        self._save_session(session)
        return session

    def validate_session_id(self, session_id: str) -> str:
        return _validate_session_id(session_id)

    def get_session(self, session_id: str) -> Optional[dict]:
        session_path = self._session_file(session_id)
        if not session_path.exists():
            return None
        try:
            session = json.loads(session_path.read_text(encoding="utf-8"))
            return self._normalize_session(session)
        except Exception:
            return None

    def update_session(self, session: dict) -> None:
        session = self._normalize_session(session)
        session["updated_at"] = _now_iso()
        self._save_session(session)

    def delete_session(self, session_id: str) -> bool:
        session_dir = self._session_dir(session_id)
        if not session_dir.exists():
            return False
        with self._lock:
            shutil.rmtree(session_dir, ignore_errors=False)
        return True

    def add_uploads(self, session: dict, uploads: Iterable[StoredFile]) -> dict:
        for item in uploads:
            session["uploads"].setdefault(item.category, [])
            session["uploads"][item.category].append(
                {
                    "id": item.id,
                    "name": item.name,
                    "size": item.size,
                    "content_type": item.content_type,
                    "category": item.category,
                    "path": item.path,
                }
            )
        if not session.get("pages"):
            photo_count = len(session.get("uploads", {}).get("photos", []) or [])
            session["page_count"] = max(1, photo_count)
        self.update_session(session)
        return session

    def set_selected_photos(self, session: dict, selected_ids: List[str]) -> dict:
        session["selected_photo_ids"] = selected_ids
        if not session.get("pages"):
            session["page_count"] = max(1, len(selected_ids))
        self.update_session(session)
        return session

    def set_pages(self, session: dict, pages: List[dict]) -> dict:
        if not pages:
            pages = [{"items": []}]
        template_index = self._template_index(session)
        normalized_pages = []
        for page in pages:
            if not isinstance(page, dict):
                normalized_pages.append(
                    self._normalize_page({"items": []}, template_index)
                )
                continue
            normalized_pages.append(self._normalize_page(page, template_index))
        # Legacy compatibility: store as a single section.
        session["jobsheet_sections"] = [
            {"id": uuid4().hex, "title": "Section 1", "pages": normalized_pages}
        ]
        session["pages"] = []
        session["page_count"] = len(normalized_pages)
        self.update_session(session)
        return session

    def ensure_pages(self, session: dict) -> List[dict]:
        # Legacy compatibility: flatten sections to pages.
        sections = self.ensure_sections(session)
        pages: List[dict] = []
        for section in sections:
            pages.extend(section.get("pages") or [])
        session["page_count"] = len(pages)
        return pages

    def set_sections(self, session: dict, sections: List[dict]) -> dict:
        template_index = self._template_index(session)
        normalized: List[dict] = []
        for section in sections or []:
            if hasattr(section, "model_dump"):
                section = section.model_dump()
            elif hasattr(section, "dict"):
                section = section.dict()
            pages = section.get("pages") or []
            if pages:
                normalized_pages = []
                for page in pages:
                    if hasattr(page, "model_dump"):
                        normalized_pages.append(page.model_dump())
                    elif hasattr(page, "dict"):
                        normalized_pages.append(page.dict())
                    else:
                        normalized_pages.append(page)
                pages = normalized_pages
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            normalized.append(
                {
                    "id": section.get("id") or uuid4().hex,
                    "title": section.get("title") or "Section",
                    "pages": normalized_pages if normalized_pages else [{"items": []}],
                }
            )
        if not normalized:
            normalized = [
                {"id": uuid4().hex, "title": "Section 1", "pages": [{"items": []}]}
            ]
        session["jobsheet_sections"] = normalized
        session["pages"] = []
        session["page_count"] = sum(len(section.get("pages") or []) for section in normalized)
        self.update_session(session)
        return session

    def set_headings(self, session: dict, headings: List[dict]) -> dict:
        normalized: List[dict] = []
        for heading in headings or []:
            if hasattr(heading, "model_dump"):
                heading = heading.model_dump()
            elif hasattr(heading, "dict"):
                heading = heading.dict()
            if not isinstance(heading, dict):
                continue
            number = str(heading.get("number") or "").strip()
            name = str(heading.get("name") or "").strip()
            normalized.append({"number": number, "name": name})
        session["headings"] = normalized
        self.update_session(session)
        return session

    def set_page_templates(self, session: dict, templates: List[dict]) -> dict:
        session["page_templates"] = _normalize_page_templates(templates)
        template_index = self._template_index(session)
        # Re-normalize existing pages so they pick up the new template set.
        sections = session.get("jobsheet_sections") or []
        normalized_sections = []
        for section in sections:
            if not isinstance(section, dict):
                continue
            pages = section.get("pages") or []
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            normalized_sections.append(
                {
                    "id": section.get("id") or uuid4().hex,
                    "title": section.get("title") or "Section",
                    "pages": normalized_pages if normalized_pages else [{"items": []}],
                }
            )
        if normalized_sections:
            session["jobsheet_sections"] = normalized_sections
            session["page_count"] = sum(
                len(section.get("pages") or []) for section in normalized_sections
            )
        self.update_session(session)
        return session

    def ensure_sections(self, session: dict) -> List[dict]:
        """Return the session's sections, synthesizing a single default
        section from legacy pages or photo counts when none exist yet."""
        template_index = self._template_index(session)
        sections = session.get("jobsheet_sections") or []
        if sections:
            normalized_sections: List[dict] = []
            for section in sections:
                if not isinstance(section, dict):
                    continue
                pages = section.get("pages") or []
                normalized_pages = []
                for page in pages:
                    if not isinstance(page, dict):
                        normalized_pages.append(
                            self._normalize_page({"items": []}, template_index)
                        )
                        continue
                    normalized_pages.append(self._normalize_page(page, template_index))
                normalized_sections.append(
                    {
                        "id": section.get("id") or uuid4().hex,
                        "title": section.get("title") or "Section",
                        "pages": normalized_pages if normalized_pages else [{"items": []}],
                    }
                )
            session["jobsheet_sections"] = normalized_sections
            session["page_count"] = sum(
                len(section.get("pages") or []) for section in normalized_sections
            )
            self.update_session(session)
            return normalized_sections
        pages = session.get("pages") or []
        if not pages:
            selected_count = len(session.get("selected_photo_ids") or [])
            photo_count = len(session.get("uploads", {}).get("photos", []) or [])
            count = selected_count or photo_count or session.get("page_count", 1) or 1
            pages = [{"items": []} for _ in range(count)]
        pages = [
            self._normalize_page(
                page if isinstance(page, dict) else {"items": []}, template_index
            )
            for page in pages
        ]
        sections = [{"id": uuid4().hex, "title": "Section 1", "pages": pages}]
        session["jobsheet_sections"] = sections
        session["pages"] = []
        session["page_count"] = len(pages)
        self.update_session(session)
        return sections

    def _normalize_session(self, session: dict) -> dict:
        """Migrate legacy fields and normalize headings, templates, pages,
        and sections to the current session shape."""
        if not isinstance(session, dict):
            return session
        document_no = _merge_text(
            session.get("document_no", ""),
            session.get("project_name", ""),
        )
        if document_no:
            session["document_no"] = document_no
        session.pop("project_name", None)
        session.pop("notes", None)
        headings = session.get("headings")
        if isinstance(headings, dict):
            session["headings"] = [
                {"number": str(key).strip(), "name": str(value).strip()}
                for key, value in headings.items()
            ]
        elif isinstance(headings, list):
            normalized_headings = []
            for heading in headings:
                if hasattr(heading, "model_dump"):
                    heading = heading.model_dump()
                elif hasattr(heading, "dict"):
                    heading = heading.dict()
                if not isinstance(heading, dict):
                    continue
                number = str(heading.get("number") or "").strip()
                name = str(heading.get("name") or "").strip()
                normalized_headings.append({"number": number, "name": name})
            session["headings"] = normalized_headings
        else:
            session["headings"] = []
        session["page_templates"] = _normalize_page_templates(
            session.get("page_templates") or []
        )
        template_index = self._template_index(session)
        pages = session.get("pages") or []
        if pages:
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            session["pages"] = normalized_pages
        sections = session.get("jobsheet_sections") or []
        if sections:
            normalized_sections = []
            for section in sections:
                if not isinstance(section, dict):
                    continue
                pages = section.get("pages") or []
                normalized_pages = []
                for page in pages:
                    if not isinstance(page, dict):
                        normalized_pages.append(
                            self._normalize_page({"items": []}, template_index)
                        )
                        continue
                    normalized_pages.append(self._normalize_page(page, template_index))
                normalized_sections.append(
                    {
                        "id": section.get("id") or uuid4().hex,
                        "title": section.get("title") or "Section",
                        "pages": normalized_pages if normalized_pages else [{"items": []}],
                    }
                )
            session["jobsheet_sections"] = normalized_sections
        return session

    def save_upload(self, session_id: str, upload: UploadFile) -> StoredFile:
        """Stream an upload to disk in 1 MiB chunks, enforcing the size limit
        and normalizing photos after the write completes."""
        filename = _safe_name(upload.filename or "upload")
        ext = Path(filename).suffix
        file_id = uuid4().hex
        stored_name = f"{file_id}{ext}"
        session_dir = self._session_dir(session_id)
        uploads_dir = session_dir / "uploads"
        uploads_dir.mkdir(parents=True, exist_ok=True)
        dest = uploads_dir / stored_name
        size = 0
        try:
            with dest.open("wb") as handle:
                while True:
                    chunk = upload.file.read(1024 * 1024)
                    if not chunk:
                        break
                    size += len(chunk)
                    if size > self.max_upload_bytes:
                        raise ValueError("File exceeds maximum upload size.")
                    handle.write(chunk)
        except ValueError:
            # Remove the partial file before propagating the size error.
            dest.unlink(missing_ok=True)
            raise
        category = _category_for(filename)
        if category == "photos":
            _normalize_uploaded_photo(dest)
            size = dest.stat().st_size
        return StoredFile(
            id=file_id,
            name=filename,
            size=size,
            content_type=upload.content_type or "application/octet-stream",
            category=category,
            path=f"uploads/{stored_name}",
        )

    def _session_dir(self, session_id: str) -> Path:
        safe_id = _validate_session_id(session_id)
        path = (self.sessions_dir / safe_id).resolve()
        # Defense in depth against path traversal, on top of the regex check.
        if not str(path).startswith(str(self.sessions_dir.resolve())):
            raise ValueError("Invalid session id.")
        return path

    def session_dir(self, session_id: str) -> Path:
        return self._session_dir(session_id)

    def _session_file(self, session_id: str) -> Path:
        return self._session_dir(session_id) / "session.json"

    def _save_session(self, session: dict) -> None:
        session_dir = self._session_dir(session["id"])
        session_dir.mkdir(parents=True, exist_ok=True)
        session_path = self._session_file(session["id"])
        with self._lock:
            session_path.write_text(json.dumps(session, indent=2), encoding="utf-8")

    def resolve_upload_path(self, session: dict, file_id: str) -> Optional[Path]:
        uploads = session.get("uploads") or {}
        for items in uploads.values():
            for item in items:
                if item.get("id") == file_id:
                    relative = item.get("path")
                    if relative:
                        return self._session_dir(session["id"]) / relative
        return None