# Prosento_RepEx — server/app/services/session_store.py
# (last change: "Update template sizing/box wrapping fixes", commit dd94ad9)
from __future__ import annotations
import copy
import json
import re
import shutil
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Iterable, List, Optional
from uuid import uuid4
from fastapi import UploadFile
from PIL import Image, ImageOps, UnidentifiedImageError
from ..core.config import get_settings
# Extension groupings used to bucket uploads into categories (see _category_for).
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
DOC_EXTS = {".pdf", ".doc", ".docx"}
DATA_EXTS = {".csv", ".xls", ".xlsx"}
# Photo formats that get EXIF-normalized and re-encoded after upload
# (.gif is deliberately excluded; see _normalize_uploaded_photo).
EXIF_NORMALIZE_EXTS = {".jpg", ".jpeg", ".png", ".webp"}
# Uploaded photos are downscaled so their long edge is at most this many pixels.
UPLOAD_IMAGE_MAX_LONG_EDGE_PX = 2400
# Re-encoding quality settings used when normalizing uploaded photos.
UPLOAD_JPEG_QUALITY = 82
UPLOAD_WEBP_QUALITY = 80
# Session ids are uuid4().hex values: exactly 32 lowercase hex characters.
SESSION_ID_RE = re.compile(r"^[0-9a-f]{32}$")
# Page templates shipped with the app; custom templates may not shadow these ids
# (enforced in _normalize_page_templates).
BUILTIN_PAGE_TEMPLATES = [
    {
        "id": "repex:standard",
        "name": "Standard Job Sheet",
        "description": "Observations + up to two photos.",
        "blank": False,
        "variant": "full",
        "photo_layout": "auto",
        "source": "builtin",
    },
    {
        "id": "repex:photos",
        "name": "Photo Continuation",
        "description": "Photo-only continuation page.",
        "blank": False,
        "variant": "photos",
        "photo_layout": "auto",
        "source": "builtin",
    },
    {
        "id": "repex:blank",
        "name": "Blank Canvas",
        "description": "Blank white page.",
        "blank": True,
        "variant": "full",
        "photo_layout": "auto",
        "source": "builtin",
    },
]
# Fast id -> definition lookup for the built-in templates.
BUILTIN_PAGE_TEMPLATE_MAP = {item["id"]: item for item in BUILTIN_PAGE_TEMPLATES}
@dataclass
class StoredFile:
    """Metadata for one uploaded file persisted under a session directory."""

    id: str  # uuid4().hex identifier assigned at upload time
    name: str  # sanitized original filename (see _safe_name)
    size: int  # size in bytes on disk (post-normalization for photos)
    content_type: str  # MIME type reported by the client
    category: str  # "photos" | "documents" | "data_files"
    path: str  # path relative to the session dir, e.g. "uploads/<id>.jpg"
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _safe_name(name: str) -> str:
name = Path(name).name
name = re.sub(r"[^a-zA-Z0-9._-]", "_", name)
return name or "upload"
def _category_for(filename: str) -> str:
    """Map a filename to its upload category via the extension groupings."""
    suffix = Path(filename).suffix.lower()
    for known_exts, category in (
        (IMAGE_EXTS, "photos"),
        (DOC_EXTS, "documents"),
        (DATA_EXTS, "data_files"),
    ):
        if suffix in known_exts:
            return category
    # Unknown extensions are filed with documents.
    return "documents"
def _normalize_uploaded_photo(path: Path) -> None:
    """Normalize an uploaded photo in place.

    Bakes the EXIF orientation into the pixels, caps the long edge at
    UPLOAD_IMAGE_MAX_LONG_EDGE_PX, and re-encodes with format-appropriate
    compression. Extensions outside EXIF_NORMALIZE_EXTS (e.g. .gif) are
    left untouched, as is any file Pillow fails to decode.
    """
    ext = path.suffix.lower()
    if ext not in EXIF_NORMALIZE_EXTS:
        return
    try:
        with Image.open(path) as image:
            # Apply the EXIF orientation tag so viewers no longer need it.
            normalized = ImageOps.exif_transpose(image)
            # Image.Resampling exists on newer Pillow; fall back to the
            # legacy Image.LANCZOS constant on older versions.
            resampling = getattr(Image, "Resampling", None)
            lanczos = resampling.LANCZOS if resampling is not None else Image.LANCZOS
            long_edge = max(normalized.width, normalized.height)
            if long_edge > UPLOAD_IMAGE_MAX_LONG_EDGE_PX:
                ratio = UPLOAD_IMAGE_MAX_LONG_EDGE_PX / float(long_edge)
                normalized = normalized.resize(
                    (
                        max(1, int(round(normalized.width * ratio))),
                        max(1, int(round(normalized.height * ratio))),
                    ),
                    lanczos,
                )
            # NOTE(review): the saves below write back to the same path the
            # source was opened from, inside the `with`; this assumes the pixel
            # data is fully loaded before the file is truncated — confirm for
            # images that need neither transpose nor resize.
            if ext in {".jpg", ".jpeg"}:
                # JPEG has no alpha/palette support; flatten to RGB first.
                if normalized.mode in ("RGBA", "LA", "P"):
                    normalized = normalized.convert("RGB")
                # exif=b"" strips the original EXIF (orientation is now baked in).
                normalized.save(
                    path,
                    format="JPEG",
                    quality=UPLOAD_JPEG_QUALITY,
                    optimize=True,
                    progressive=True,
                    exif=b"",
                )
            elif ext == ".webp":
                if normalized.mode not in ("RGB", "RGBA"):
                    normalized = normalized.convert("RGB")
                # method=6 selects the slowest/highest-effort WebP encoder mode.
                normalized.save(
                    path,
                    format="WEBP",
                    quality=UPLOAD_WEBP_QUALITY,
                    method=6,
                )
            else:  # png
                normalized.save(path, format="PNG", optimize=True)
    except (UnidentifiedImageError, OSError, ValueError, TypeError):
        # Keep original bytes if the file cannot be decoded by Pillow.
        return
def _validate_session_id(session_id: str) -> str:
    """Validate and canonicalize a session id.

    Accepts exactly 32 hex characters (the shape of ``uuid4().hex``, any
    case) and returns the lowercased form.

    Raises:
        ValueError: if the id is empty or not a 32-char hex string.
    """
    if not session_id:
        raise ValueError("Invalid session id.")
    normalized = session_id.lower()
    # fullmatch, not match: with re.match the pattern's trailing "$" also
    # accepts a value ending in "\n" (e.g. "<32 hex>\n"), and that newline
    # would leak into on-disk directory names via _session_dir.
    if not SESSION_ID_RE.fullmatch(normalized):
        raise ValueError("Invalid session id.")
    return normalized
def _merge_text(primary: str, secondary: str) -> str:
primary = (primary or "").strip()
secondary = (secondary or "").strip()
if not secondary:
return primary
if not primary:
return secondary
if secondary in primary:
return primary
return f"{primary} - {secondary}"
def _normalize_template_fields(template: Optional[dict]) -> dict:
    """Fold legacy page-template keys into their current equivalents.

    Legacy -> current mapping:
      condition_description -> item_description (appended)
      action_type           -> required_action (prepended)
      figure_description    -> figure_caption (appended)
    Retired keys (accompanied_by, project, client_site) are dropped.
    Non-dict input yields an empty dict.
    """
    if not isinstance(template, dict):
        return {}
    result = dict(template)

    def _store(key: str, merged: str) -> None:
        # Keep the key only when the merged value is non-empty.
        if merged:
            result[key] = merged
        else:
            result.pop(key, None)

    _store(
        "item_description",
        _merge_text(
            result.get("item_description", ""),
            result.pop("condition_description", ""),
        ),
    )
    # Note the order: the legacy action_type comes first in the merged text.
    _store(
        "required_action",
        _merge_text(result.pop("action_type", ""), result.get("required_action", "")),
    )
    _store(
        "figure_caption",
        _merge_text(
            result.get("figure_caption", ""),
            result.pop("figure_description", ""),
        ),
    )
    for retired in ("accompanied_by", "project", "client_site"):
        result.pop(retired, None)
    return result
def _infer_template_id(page: dict) -> str:
template_id = str(page.get("page_template") or "").strip()
if template_id:
return template_id
if page.get("blank"):
return "repex:blank"
if str(page.get("variant") or "").strip().lower() == "photos":
return "repex:photos"
return "repex:standard"
def _normalize_page_templates(templates: Optional[List[dict]]) -> List[dict]:
    """Sanitize user-defined page templates.

    Converts pydantic models to dicts, drops entries with no id, entries
    shadowing a built-in template id, and duplicates; clamps "variant" and
    "photo_layout" to their allowed values; marks every result as "custom".
    """
    out: List[dict] = []
    seen_ids: set[str] = set()
    for raw in templates or []:
        if hasattr(raw, "model_dump"):  # pydantic v2 model
            raw = raw.model_dump()
        elif hasattr(raw, "dict"):  # pydantic v1 model
            raw = raw.dict()
        if not isinstance(raw, dict):
            continue
        tpl_id = str(raw.get("id") or "").strip()
        if not tpl_id or tpl_id in BUILTIN_PAGE_TEMPLATE_MAP or tpl_id in seen_ids:
            continue
        seen_ids.add(tpl_id)
        display_name = str(raw.get("name") or tpl_id).strip() or tpl_id
        variant = str(raw.get("variant") or "full").strip().lower()
        layout = str(raw.get("photo_layout") or "auto").strip().lower()
        out.append(
            {
                "id": tpl_id,
                "name": display_name,
                "description": str(raw.get("description") or "").strip(),
                "blank": bool(raw.get("blank")),
                "variant": variant if variant in {"full", "photos"} else "full",
                "photo_layout": layout
                if layout in {"auto", "two-column", "stacked"}
                else "auto",
                "source": "custom",
            }
        )
    return out
class SessionStore:
    """Filesystem-backed store for report sessions.

    Layout on disk: ``<base_dir>/sessions/<32-hex-id>/session.json`` plus a
    per-session ``uploads/`` directory. A single process-wide ``Lock``
    serializes ``session.json`` writes and session deletion; reads take no
    lock.
    """

    def __init__(self, base_dir: Optional[Path] = None) -> None:
        settings = get_settings()
        self.base_dir = (base_dir or settings.storage_dir).resolve()
        self.sessions_dir = self.base_dir / "sessions"
        self.sessions_dir.mkdir(parents=True, exist_ok=True)
        self.max_upload_bytes = settings.max_upload_mb * 1024 * 1024
        self._lock = Lock()
        # Best-effort upgrade of sessions written by older schema versions.
        self._migrate_storage()

    def _migrate_storage(self) -> None:
        """Rewrite stored sessions whose normalized form differs from disk."""
        for session_file in self.sessions_dir.glob("*/session.json"):
            try:
                raw = json.loads(session_file.read_text(encoding="utf-8"))
            except Exception:
                # Unreadable or corrupt session files are skipped, not deleted.
                continue
            # Deep copy so the inequality check compares against the original.
            normalized = self._normalize_session(copy.deepcopy(raw))
            if normalized != raw:
                try:
                    session_file.write_text(
                        json.dumps(normalized, indent=2), encoding="utf-8"
                    )
                except Exception:
                    continue

    def _template_index(self, session: dict) -> dict:
        """Return an id -> template-definition map (built-ins plus customs).

        Side effect: writes the re-normalized custom templates back onto
        ``session["page_templates"]``. Custom templates cannot shadow
        built-in ids (filtered in _normalize_page_templates).
        """
        custom_templates = _normalize_page_templates(session.get("page_templates") or [])
        session["page_templates"] = custom_templates
        # Copy the built-in dicts so callers cannot mutate the module constants.
        merged = {key: dict(value) for key, value in BUILTIN_PAGE_TEMPLATE_MAP.items()}
        for template in custom_templates:
            merged[template["id"]] = template
        return merged

    def _normalize_page(self, page: dict, template_index: dict) -> dict:
        """Normalize one page dict: legacy fields, template id, variant flags."""
        template = _normalize_template_fields(page.get("template"))
        normalized = {**page, "template": template}
        template_id = _infer_template_id(normalized)
        # Unknown template ids fall back to the standard built-in layout.
        definition = template_index.get(template_id) or BUILTIN_PAGE_TEMPLATE_MAP["repex:standard"]
        normalized["page_template"] = definition["id"]
        normalized["blank"] = bool(definition.get("blank"))
        normalized["variant"] = (
            str(definition.get("variant") or normalized.get("variant") or "full")
            .strip()
            .lower()
        )
        if normalized["variant"] not in {"full", "photos"}:
            normalized["variant"] = "full"
        # Fill photo_layout from the template only when the page left it unset.
        if normalized.get("photo_layout") is None and definition.get("photo_layout"):
            normalized["photo_layout"] = definition["photo_layout"]
        return normalized

    def list_sessions(self) -> List[dict]:
        """Return all readable sessions, sorted by directory name descending.

        NOTE(review): uuid4().hex names are not chronological, so this order
        is not strictly newest-first — confirm whether callers rely on order.
        """
        sessions: List[dict] = []
        for session_file in sorted(self.sessions_dir.glob("*/session.json"), reverse=True):
            try:
                session = json.loads(session_file.read_text(encoding="utf-8"))
                session = self._normalize_session(session)
                sessions.append(session)
            except Exception:
                continue
        return sessions

    def create_session(self, document_no: str, inspection_date: str) -> dict:
        """Create, persist, and return a new empty session."""
        session_id = uuid4().hex
        now = _now_iso()
        session = {
            "id": session_id,
            "status": "ready",
            "created_at": now,
            "updated_at": now,
            "document_no": document_no,
            "inspection_date": inspection_date,
            "uploads": {"photos": [], "documents": [], "data_files": []},
            "selected_photo_ids": [],
            "page_count": 0,
            "pages": [],
            "jobsheet_sections": [],
            "headings": [],
            "page_templates": [],
        }
        self._save_session(session)
        return session

    def validate_session_id(self, session_id: str) -> str:
        """Public wrapper around module-level session-id validation."""
        return _validate_session_id(session_id)

    def get_session(self, session_id: str) -> Optional[dict]:
        """Load and normalize a session; None when missing or unreadable."""
        session_path = self._session_file(session_id)
        if not session_path.exists():
            return None
        try:
            session = json.loads(session_path.read_text(encoding="utf-8"))
            return self._normalize_session(session)
        except Exception:
            return None

    def update_session(self, session: dict) -> None:
        """Normalize the session, stamp ``updated_at``, and persist it."""
        session = self._normalize_session(session)
        session["updated_at"] = _now_iso()
        self._save_session(session)

    def delete_session(self, session_id: str) -> bool:
        """Delete a session's directory tree; False when it does not exist.

        Raises:
            ValueError: if the session id is malformed.
            OSError: if removal fails (ignore_errors is False).
        """
        session_dir = self._session_dir(session_id)
        if not session_dir.exists():
            return False
        with self._lock:
            shutil.rmtree(session_dir, ignore_errors=False)
        return True

    def add_uploads(self, session: dict, uploads: Iterable[StoredFile]) -> dict:
        """Record stored files on the session and persist it."""
        for item in uploads:
            session["uploads"].setdefault(item.category, [])
            session["uploads"][item.category].append(
                {
                    "id": item.id,
                    "name": item.name,
                    "size": item.size,
                    "content_type": item.content_type,
                    "category": item.category,
                    "path": item.path,
                }
            )
        # Before any pages exist, default the page count to one page per photo.
        if not session.get("pages"):
            photo_count = len(session.get("uploads", {}).get("photos", []) or [])
            session["page_count"] = max(1, photo_count)
        self.update_session(session)
        return session

    def set_selected_photos(self, session: dict, selected_ids: List[str]) -> dict:
        """Store the chosen photo ids; keep page_count in sync before layout."""
        session["selected_photo_ids"] = selected_ids
        if not session.get("pages"):
            session["page_count"] = max(1, len(selected_ids))
        self.update_session(session)
        return session

    def set_pages(self, session: dict, pages: List[dict]) -> dict:
        """Replace the session's pages, wrapped in a single new section."""
        if not pages:
            pages = [{"items": []}]
        template_index = self._template_index(session)
        normalized_pages = []
        for page in pages:
            if not isinstance(page, dict):
                # Non-dict entries become empty pages rather than being dropped.
                normalized_pages.append(
                    self._normalize_page({"items": []}, template_index)
                )
                continue
            normalized_pages.append(self._normalize_page(page, template_index))
        # Legacy compatibility: store as a single section.
        session["jobsheet_sections"] = [
            {"id": uuid4().hex, "title": "Section 1", "pages": normalized_pages}
        ]
        session["pages"] = []
        session["page_count"] = len(normalized_pages)
        self.update_session(session)
        return session

    def ensure_pages(self, session: dict) -> List[dict]:
        """Return all pages flattened across sections (legacy accessor)."""
        # Legacy compatibility: flatten sections to pages.
        sections = self.ensure_sections(session)
        pages: List[dict] = []
        for section in sections:
            pages.extend(section.get("pages") or [])
        session["page_count"] = len(pages)
        return pages

    def set_sections(self, session: dict, sections: List[dict]) -> dict:
        """Replace jobsheet sections; normalizes sections, pages, and count."""
        template_index = self._template_index(session)
        normalized: List[dict] = []
        for section in sections or []:
            # Accept pydantic models (v2 model_dump, then v1 dict) or dicts.
            if hasattr(section, "model_dump"):
                section = section.model_dump()
            elif hasattr(section, "dict"):
                section = section.dict()
            pages = section.get("pages") or []
            if pages:
                # First pass: unwrap any pydantic page models to plain dicts.
                normalized_pages = []
                for page in pages:
                    if hasattr(page, "model_dump"):
                        normalized_pages.append(page.model_dump())
                    elif hasattr(page, "dict"):
                        normalized_pages.append(page.dict())
                    else:
                        normalized_pages.append(page)
                pages = normalized_pages
            # Second pass: normalize each page (non-dicts become empty pages).
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            normalized.append(
                {
                    "id": section.get("id") or uuid4().hex,
                    "title": section.get("title") or "Section",
                    "pages": normalized_pages if normalized_pages else [{"items": []}],
                }
            )
        if not normalized:
            normalized = [{"id": uuid4().hex, "title": "Section 1", "pages": [{"items": []}]}]
        session["jobsheet_sections"] = normalized
        session["pages"] = []
        session["page_count"] = sum(len(section.get("pages") or []) for section in normalized)
        self.update_session(session)
        return session

    def set_headings(self, session: dict, headings: List[dict]) -> dict:
        """Replace the session's headings with normalized number/name pairs."""
        normalized: List[dict] = []
        for heading in headings or []:
            if hasattr(heading, "model_dump"):
                heading = heading.model_dump()
            elif hasattr(heading, "dict"):
                heading = heading.dict()
            if not isinstance(heading, dict):
                continue
            number = str(heading.get("number") or "").strip()
            name = str(heading.get("name") or "").strip()
            normalized.append({"number": number, "name": name})
        session["headings"] = normalized
        self.update_session(session)
        return session

    def set_page_templates(self, session: dict, templates: List[dict]) -> dict:
        """Replace custom page templates, then re-normalize existing sections.

        Re-normalizing matters because pages that referenced a now-removed
        custom template must fall back to the standard built-in.
        """
        session["page_templates"] = _normalize_page_templates(templates)
        template_index = self._template_index(session)
        sections = session.get("jobsheet_sections") or []
        normalized_sections = []
        for section in sections:
            if not isinstance(section, dict):
                continue
            pages = section.get("pages") or []
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            normalized_sections.append(
                {
                    "id": section.get("id") or uuid4().hex,
                    "title": section.get("title") or "Section",
                    "pages": normalized_pages if normalized_pages else [{"items": []}],
                }
            )
        if normalized_sections:
            session["jobsheet_sections"] = normalized_sections
            session["page_count"] = sum(
                len(section.get("pages") or []) for section in normalized_sections
            )
        self.update_session(session)
        return session

    def ensure_sections(self, session: dict) -> List[dict]:
        """Return normalized sections, synthesizing them for legacy sessions.

        Side effect: persists the session after normalization.
        """
        template_index = self._template_index(session)
        sections = session.get("jobsheet_sections") or []
        if sections:
            normalized_sections: List[dict] = []
            for section in sections:
                if not isinstance(section, dict):
                    continue
                pages = section.get("pages") or []
                normalized_pages = []
                for page in pages:
                    if not isinstance(page, dict):
                        normalized_pages.append(
                            self._normalize_page({"items": []}, template_index)
                        )
                        continue
                    normalized_pages.append(self._normalize_page(page, template_index))
                normalized_sections.append(
                    {
                        "id": section.get("id") or uuid4().hex,
                        "title": section.get("title") or "Section",
                        "pages": normalized_pages if normalized_pages else [{"items": []}],
                    }
                )
            session["jobsheet_sections"] = normalized_sections
            session["page_count"] = sum(
                len(section.get("pages") or []) for section in normalized_sections
            )
            self.update_session(session)
            return normalized_sections
        # Legacy path: build one section from flat pages, or synthesize empty
        # pages from the selected/uploaded photo counts.
        pages = session.get("pages") or []
        if not pages:
            selected_count = len(session.get("selected_photo_ids") or [])
            photo_count = len(session.get("uploads", {}).get("photos", []) or [])
            count = selected_count or photo_count or session.get("page_count", 1) or 1
            pages = [{"items": []} for _ in range(count)]
        pages = [
            self._normalize_page(page if isinstance(page, dict) else {"items": []}, template_index)
            for page in pages
        ]
        sections = [{"id": uuid4().hex, "title": "Section 1", "pages": pages}]
        session["jobsheet_sections"] = sections
        session["pages"] = []
        session["page_count"] = len(pages)
        self.update_session(session)
        return sections

    def _normalize_session(self, session: dict) -> dict:
        """Upgrade a session dict (mutated in place) to the current schema.

        Handles legacy keys (project_name, notes), dict-shaped headings,
        pydantic heading models, and re-normalizes pages and sections.
        Non-dict input is returned unchanged.
        """
        if not isinstance(session, dict):
            return session
        # Legacy "project_name" is merged into "document_no".
        document_no = _merge_text(
            session.get("document_no", ""),
            session.get("project_name", ""),
        )
        if document_no:
            session["document_no"] = document_no
        session.pop("project_name", None)
        session.pop("notes", None)
        headings = session.get("headings")
        if isinstance(headings, dict):
            # Legacy shape: {"1": "Scope", ...} -> [{"number": ..., "name": ...}].
            session["headings"] = [
                {"number": str(key).strip(), "name": str(value).strip()}
                for key, value in headings.items()
            ]
        elif isinstance(headings, list):
            normalized_headings = []
            for heading in headings:
                if hasattr(heading, "model_dump"):
                    heading = heading.model_dump()
                elif hasattr(heading, "dict"):
                    heading = heading.dict()
                if not isinstance(heading, dict):
                    continue
                number = str(heading.get("number") or "").strip()
                name = str(heading.get("name") or "").strip()
                normalized_headings.append({"number": number, "name": name})
            session["headings"] = normalized_headings
        else:
            session["headings"] = []
        session["page_templates"] = _normalize_page_templates(
            session.get("page_templates") or []
        )
        template_index = self._template_index(session)
        pages = session.get("pages") or []
        if pages:
            normalized_pages = []
            for page in pages:
                if not isinstance(page, dict):
                    normalized_pages.append(
                        self._normalize_page({"items": []}, template_index)
                    )
                    continue
                normalized_pages.append(self._normalize_page(page, template_index))
            session["pages"] = normalized_pages
        sections = session.get("jobsheet_sections") or []
        if sections:
            normalized_sections = []
            for section in sections:
                if not isinstance(section, dict):
                    continue
                pages = section.get("pages") or []
                normalized_pages = []
                for page in pages:
                    if not isinstance(page, dict):
                        normalized_pages.append(
                            self._normalize_page({"items": []}, template_index)
                        )
                        continue
                    normalized_pages.append(self._normalize_page(page, template_index))
                normalized_sections.append(
                    {
                        "id": section.get("id") or uuid4().hex,
                        "title": section.get("title") or "Section",
                        "pages": normalized_pages if normalized_pages else [{"items": []}],
                    }
                )
            session["jobsheet_sections"] = normalized_sections
        return session

    def save_upload(self, session_id: str, upload: UploadFile) -> StoredFile:
        """Stream an UploadFile to disk and return its stored metadata.

        Enforces the configured size cap while streaming; photos are
        EXIF-normalized and recompressed after the write completes.

        Raises:
            ValueError: when the file exceeds ``max_upload_bytes`` (the
                partial file is removed first) or the session id is malformed.
        """
        filename = _safe_name(upload.filename or "upload")
        ext = Path(filename).suffix
        file_id = uuid4().hex
        stored_name = f"{file_id}{ext}"
        session_dir = self._session_dir(session_id)
        uploads_dir = session_dir / "uploads"
        uploads_dir.mkdir(parents=True, exist_ok=True)
        dest = uploads_dir / stored_name
        size = 0
        with dest.open("wb") as handle:
            while True:
                chunk = upload.file.read(1024 * 1024)  # 1 MiB per read
                if not chunk:
                    break
                size += len(chunk)
                if size > self.max_upload_bytes:
                    # Close before unlinking so removal works on Windows; the
                    # context manager's second close on a closed file is a no-op.
                    handle.close()
                    dest.unlink(missing_ok=True)
                    raise ValueError("File exceeds maximum upload size.")
                handle.write(chunk)
        category = _category_for(filename)
        if category == "photos":
            _normalize_uploaded_photo(dest)
            # Re-stat: normalization may have changed the size on disk.
            size = dest.stat().st_size
        return StoredFile(
            id=file_id,
            name=filename,
            size=size,
            content_type=upload.content_type or "application/octet-stream",
            category=category,
            path=f"uploads/{stored_name}",
        )

    def _session_dir(self, session_id: str) -> Path:
        """Resolve a validated session id to its directory path."""
        safe_id = _validate_session_id(session_id)
        path = (self.sessions_dir / safe_id).resolve()
        # Defense in depth: a hex-only id cannot escape sessions_dir anyway.
        if not str(path).startswith(str(self.sessions_dir.resolve())):
            raise ValueError("Invalid session id.")
        return path

    def session_dir(self, session_id: str) -> Path:
        """Public accessor for a session's directory path."""
        return self._session_dir(session_id)

    def _session_file(self, session_id: str) -> Path:
        """Path to a session's ``session.json`` metadata file."""
        return self._session_dir(session_id) / "session.json"

    def _save_session(self, session: dict) -> None:
        """Serialize the session to session.json under the store lock."""
        session_dir = self._session_dir(session["id"])
        session_dir.mkdir(parents=True, exist_ok=True)
        session_path = self._session_file(session["id"])
        with self._lock:
            session_path.write_text(json.dumps(session, indent=2), encoding="utf-8")

    def resolve_upload_path(self, session: dict, file_id: str) -> Optional[Path]:
        """Return the absolute path for an uploaded file id, or None."""
        uploads = session.get("uploads") or {}
        for items in uploads.values():
            for item in items:
                if item.get("id") == file_id:
                    relative = item.get("path")
                    if relative:
                        return self._session_dir(session["id"]) / relative
        return None