Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import csv | |
| import re | |
| import unicodedata | |
| from dataclasses import dataclass | |
| from datetime import date, datetime | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Optional, Set, Tuple | |
| from openpyxl import load_workbook | |
| import xlrd | |
| from .session_store import SessionStore | |
# Sheet titles looked up in uploaded workbooks (see _find_sheet for matching).
GENERAL_SHEET = "general information"
HEADINGS_SHEET = "headings"
# ITEMS_SHEET is tried first; ITEMS_SHEET_ALT is the fallback spelling.
ITEMS_SHEET = "item spesific"
ITEMS_SHEET_ALT = "item specific"

# File suffixes (lower-case, with leading dot) treated as image files.
IMAGE_EXTENSIONS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".webp",
    ".bmp",
    ".tif",
    ".tiff",
}

# Captures a run of text (no comma/semicolon/newline) ending in an image suffix.
IMAGE_NAME_RE = re.compile(r"(?i)([^,;\n\r]+?\.(?:jpe?g|png|gif|webp|bmp|tiff?))")

# Separators between image references inside a single cell.
IMAGE_REF_SPLIT_RE = re.compile(r"[;\n\r]+")

# Leading label such as "Fig 2:", "Figure -" or "Image 3 -" in front of a reference.
IMAGE_REF_PREFIX_RE = re.compile(
    r"^(?:fig(?:ure)?|image)\s*\d*\s*[:\-]\s*",
    re.IGNORECASE,
)
@dataclass
class PhotoLookup:
    """Index of uploaded photo ids keyed by normalised file name.

    The class is constructed with keyword arguments
    (``PhotoLookup(by_exact=..., by_stem=...)``), which requires the
    ``@dataclass``-generated ``__init__``; without the decorator that call
    raises ``TypeError``.
    """

    # Normalised full file name (including extension) -> ids of matching uploads.
    by_exact: Dict[str, Set[str]]
    # Normalised file name without image extension -> ids of matching uploads.
    by_stem: Dict[str, Set[str]]
| def _normalize_text(value: str) -> str: | |
| return " ".join(str(value or "").strip().lower().split()) | |
| def _cell_to_str(value: object) -> str: | |
| if value is None: | |
| return "" | |
| if isinstance(value, (datetime, date)): | |
| return value.strftime("%Y-%m-%d") | |
| if isinstance(value, float): | |
| if value.is_integer(): | |
| return str(int(value)) | |
| return str(value) | |
| return str(value).strip() | |
| def _merge_text(primary: str, secondary: str) -> str: | |
| primary = (primary or "").strip() | |
| secondary = (secondary or "").strip() | |
| if not secondary: | |
| return primary | |
| if not primary: | |
| return secondary | |
| if secondary in primary: | |
| return primary | |
| return f"{primary} - {secondary}" | |
def _parse_general_info(rows: Iterable[Iterable[object]]) -> Dict[str, str]:
    """Read key/value pairs from the first two columns of each row.

    The first cell, normalised with ``_normalize_text``, is the key and the
    second cell, rendered with ``_cell_to_str``, is the value. Rows with an
    empty key or an empty value are ignored; a later row with the same key
    overwrites an earlier one.
    """
    info: Dict[str, str] = {}
    for row in rows:
        cells = list(row)
        key = _normalize_text(cells[0]) if cells else ""
        if not key:
            continue
        raw_value = cells[1] if len(cells) > 1 else ""
        text = _cell_to_str(raw_value)
        if text:
            info[key] = text
    return info
def _find_sheet(sheets: Dict[str, object], target: str) -> Optional[object]:
    """Return the sheet registered under *target*, tolerating naming variations.

    An exact key match wins. Otherwise the first sheet whose normalised,
    space-free title contains the normalised, space-free *target* is
    returned. ``None`` is returned when nothing matches.
    """
    if target in sheets:
        return sheets[target]

    wanted = _normalize_text(target).replace(" ", "")
    if not wanted:
        return None

    return next(
        (
            sheet
            for title, sheet in sheets.items()
            if wanted in _normalize_text(title).replace(" ", "")
        ),
        None,
    )
def _parse_headings(rows: Iterable[Iterable[object]]) -> List[Dict[str, str]]:
    """Extract ``{"number": ..., "name": ...}`` entries from a headings sheet.

    The first five rows are scanned for a header row containing a
    "heading number"/"number" or "heading name"/"name" column. Data rows
    start after that header row, or after the first row when no header is
    recognised. For each non-empty data row:

    * values are read from the detected number/name columns;
    * if both are empty and the row has at least two cells, the first two
      cells are used as number and name;
    * if the name is still empty, the first cell is parsed as a combined
      ``"<number> <separator> <name>"`` value such as ``"3 - Pumps"``.

    Rows that yield neither a number nor a name are skipped.
    """
    headings: List[Dict[str, str]] = []
    table = [list(row) for row in rows]
    if not table:
        return headings

    header_row_index: Optional[int] = None
    number_idx: Optional[int] = None
    name_idx: Optional[int] = None
    for idx, row in enumerate(table[:5]):
        for col_idx, cell in enumerate(row):
            header = _normalize_text(cell)
            if "heading number" in header or header == "number":
                number_idx = col_idx
            if "heading name" in header or header == "name":
                name_idx = col_idx
        if number_idx is not None or name_idx is not None:
            header_row_index = idx
            break

    # Without a recognised header row the first row is still treated as one.
    start_index = header_row_index + 1 if header_row_index is not None else 1

    for row in table[start_index:]:
        if not any(_cell_to_str(cell) for cell in row):
            continue

        number = ""
        if number_idx is not None and number_idx < len(row):
            number = _cell_to_str(row[number_idx])
        name = ""
        if name_idx is not None and name_idx < len(row):
            name = _cell_to_str(row[name_idx])

        if not number and not name:
            if len(row) >= 2:
                number = _cell_to_str(row[0])
                name = _cell_to_str(row[1])
            if not name:
                # Single-cell form, e.g. "3 - Pumps". The previous pattern used
                # "\\d"/"\\s" inside a raw string, which matches a literal
                # backslash and therefore never matched real data. The name
                # must start with a non-digit so "12" is not split into 1 + 2.
                combined = _cell_to_str(row[0] if row else "")
                match = re.match(r"^(\d+)\s*[-–.]?\s*(\D.*)$", combined)
                if match:
                    number = match.group(1)
                    name = match.group(2).strip()

        if number or name:
            headings.append({"number": number, "name": name})
    return headings
def _header_map(headers: List[str]) -> Dict[str, List[int]]:
    """Map normalised header names to the column indices that carry them.

    Each non-empty header is registered under its ``_normalize_text`` form
    and, when different, under a compact form with everything except
    ``a-z0-9`` removed. Duplicate headers accumulate indices in column order.
    """
    mapping: Dict[str, List[int]] = {}
    for column, raw in enumerate(headers):
        name = _normalize_text(raw)
        if not name:
            continue
        keys = [name]
        compact = re.sub(r"[^a-z0-9]", "", name)
        if compact and compact != name:
            keys.append(compact)
        for key in keys:
            mapping.setdefault(key, []).append(column)
    return mapping
def _clean_image_ref(value: str) -> str:
    """Reduce a raw image reference to a bare file name.

    Surrounding whitespace, quotes and brackets are stripped, any directory
    part (``/`` or ``\\`` separated) is removed, and a leading
    "fig"/"figure"/"image" label matched by ``IMAGE_REF_PREFIX_RE`` is dropped.
    """
    trimmed = str(value or "").strip().strip(" \t\r\n'\"[](){}")
    if not trimmed:
        return ""
    # Keep only what follows the last path separator.
    leaf = trimmed.replace("\\", "/").rpartition("/")[2]
    return IMAGE_REF_PREFIX_RE.sub("", leaf).strip()
def _extract_image_names(value: str, *, allow_stem_without_ext: bool) -> List[str]:
    """Pull image file references out of free-form cell text.

    The text is split on semicolons/newlines and then on commas. In each
    cleaned piece, names with a recognised image suffix are collected. A
    piece without such a name is kept as-is only when
    *allow_stem_without_ext* is true and it contains an ASCII letter or
    digit. The result keeps first-seen order without duplicates.
    """
    if not value:
        return []

    found: List[str] = []
    for section in IMAGE_REF_SPLIT_RE.split(str(value)):
        # str.split returns [section] when there is no comma.
        for chunk in section.split(","):
            cleaned = _clean_image_ref(chunk)
            if not cleaned:
                continue

            explicit = IMAGE_NAME_RE.findall(cleaned)
            if explicit:
                candidates = [_clean_image_ref(hit) for hit in explicit]
            elif allow_stem_without_ext and re.search(r"[A-Za-z0-9]", cleaned):
                candidates = [cleaned]
            else:
                candidates = []

            for candidate in candidates:
                if candidate and candidate not in found:
                    found.append(candidate)
    return found
def _find_reference_value(cells: List[object]) -> str:
    """Guess an item reference from a row that has no usable reference column.

    The first cell that looks like a dotted reference (``1.2``, ``3.4.1a``)
    is returned. Failing that, the first cell is returned when it consists
    only of digits. Otherwise the result is ``""``.
    """
    dotted_ref = re.compile(r"^\d+(?:\.\d+)+[a-z]?$", re.IGNORECASE)
    numeric_ref = re.compile(r"^\d+$")

    texts = [_cell_to_str(cell) for cell in cells]
    dotted = next((text for text in texts if text and dotted_ref.match(text)), None)
    if dotted is not None:
        return dotted
    if texts and numeric_ref.match(texts[0]):
        return texts[0]
    return ""
def _image_column_indices(headers: List[str]) -> Dict[int, int]:
    """Map image slot numbers (1-6) to the column index of their header.

    A header qualifies when its normalised, space-free form contains
    ``image``/``img``, optionally followed by ``name``, followed by a number,
    e.g. "Image Name 1", "img2" or "Image3". Numbers outside 1-6 are ignored
    and the first column seen for a slot wins.
    """
    indices: Dict[int, int] = {}
    for idx, raw in enumerate(headers):
        name = _normalize_text(raw).replace(" ", "")
        if not name:
            continue
        # The previous pattern used "\\d" inside a raw string, which matches a
        # literal backslash followed by "d", so no header could ever match.
        match = re.search(r"(?:image|img)(?:name)?([0-9]+)", name)
        if not match:
            continue
        number = int(match.group(1))
        if 1 <= number <= 6 and number not in indices:
            indices[number] = idx
    return indices
def _row_value(row: List[object], index: Optional[int]) -> str:
    """Return the text of ``row[index]``, or ``""`` when the column is absent.

    *index* may be ``None`` (column not found) or lie past the end of a
    short row; both cases yield an empty string.
    """
    if index is None or index >= len(row):
        return ""
    return _cell_to_str(row[index])
def _parse_items(rows: Iterable[Iterable[object]]) -> List[Dict[str, str | List[str]]]:
    """Convert the rows of an items sheet into item dictionaries.

    The first row is the header row. Every later non-empty row produces a
    dict with the keys ``reference``, ``area``, ``functional_location``,
    ``item_description``, ``category``, ``priority``, ``required_action``,
    ``figure_caption`` and ``image_names`` (at most six unique names).

    Column indices are compared against ``None`` rather than chained with
    ``or``: index 0 is a valid column, and the ``or`` form silently ignored
    any field whose header sat in the first column.
    """
    table = list(rows)
    if not table:
        return []

    max_images = 6
    headers = [_cell_to_str(cell) for cell in table[0]]
    mapping = _header_map(headers)
    image_indices = _image_column_indices(headers)

    def indices_for(name: str) -> List[int]:
        """All column indices registered for header *name*."""
        return mapping.get(_normalize_text(name)) or []

    def first_index(*names: str) -> Optional[int]:
        """First column index of the first header in *names* that exists."""
        for name in names:
            found = indices_for(name)
            if found:
                return found[0]
        return None

    def longest_value(cells: List[object], name: str) -> str:
        """Longest text among all columns sharing header *name*."""
        candidates = [_row_value(cells, idx) for idx in indices_for(name)]
        return max(candidates, key=len) if candidates else ""

    def join_parts(first: str, second: str) -> str:
        """Join the non-empty parts with " - "."""
        return " - ".join([part for part in (first, second) if part])

    def add_images(target: List[str], raw_value: str, *, allow_stem: bool) -> None:
        """Append new image names from *raw_value* until the cap is reached."""
        for candidate in _extract_image_names(
            raw_value, allow_stem_without_ext=allow_stem
        ):
            if len(target) >= max_images:
                return
            if candidate not in target:
                target.append(candidate)

    # Header lookups do not depend on the row, so resolve them once.
    ref_index = first_index("ref", "reference")
    area_index = first_index("area", "heading name", "heading")
    location_index = first_index("functional location", "location")
    condition_index = first_index("condition description")
    action_type_index = first_index("action type")
    figure_description_index = first_index("figure description")
    category_index = first_index("category")
    priority_index = first_index("priority")

    image_columns: List[Optional[int]] = []
    for number in range(1, max_images + 1):
        column = image_indices.get(number)
        if column is None:
            column = first_index(f"image name {number}", f"image {number}")
        image_columns.append(column)

    items: List[Dict[str, str | List[str]]] = []
    for row in table[1:]:
        cells = list(row)
        if not any(_cell_to_str(cell) for cell in cells):
            continue

        item_desc = longest_value(cells, "item description")
        condition_desc = _row_value(cells, condition_index)
        if condition_desc and condition_desc not in item_desc:
            item_desc = join_parts(item_desc, condition_desc)

        # Fall back to pattern-based detection when no reference column exists.
        reference = _row_value(cells, ref_index) or _find_reference_value(cells)

        required_action = longest_value(cells, "required action")
        action_type = _row_value(cells, action_type_index)
        if action_type and action_type not in required_action:
            required_action = join_parts(action_type, required_action)

        figure_caption = longest_value(cells, "figure caption")
        figure_description = _row_value(cells, figure_description_index)
        if figure_description and figure_description not in figure_caption:
            figure_caption = join_parts(figure_caption, figure_description)

        image_names: List[str] = []
        for column in image_columns:
            if len(image_names) >= max_images:
                break
            raw_value = _row_value(cells, column)
            if raw_value:
                add_images(image_names, raw_value, allow_stem=True)

        # With fewer than two names from the image columns, scan every cell
        # for explicit file names (an image suffix is required here).
        if len(image_names) < 2:
            for cell in cells:
                if len(image_names) >= max_images:
                    break
                value = _cell_to_str(cell)
                if value:
                    add_images(image_names, value, allow_stem=False)

        items.append(
            {
                "reference": reference,
                "area": _row_value(cells, area_index),
                "functional_location": _row_value(cells, location_index),
                "item_description": item_desc,
                "category": _row_value(cells, category_index),
                "priority": _row_value(cells, priority_index),
                "required_action": required_action,
                "figure_caption": figure_caption,
                "image_names": [name for name in image_names if name],
            }
        )
    return items
def _parse_csv(path: Path) -> Dict[str, object]:
    """Parse a CSV export that contains only the items table.

    The file is read as UTF-8 (a leading BOM is tolerated). The result has
    the same shape as the workbook parsers: an empty ``general`` dict, an
    empty ``headings`` list and the parsed ``items``.
    """
    with path.open("r", encoding="utf-8-sig", newline="") as handle:
        rows = list(csv.reader(handle))
    return {
        "general": {},
        # A list, matching what _parse_headings returns for workbooks.
        "headings": [],
        "items": _parse_items(rows),
    }
def _parse_excel(path: Path) -> Dict[str, object]:
    """Parse an ``.xlsx``/``.xlsm`` workbook into general info, headings and items.

    Sheets are located by title via ``_find_sheet``. A missing sheet yields
    an empty container of the matching type: ``{}`` for general info and
    ``[]`` for headings and items.
    """
    # data_only=True asks openpyxl for cached formula results, not formula text.
    workbook = load_workbook(path, data_only=True)
    sheets = {sheet.title.strip().lower(): sheet for sheet in workbook.worksheets}

    general_sheet = _find_sheet(sheets, GENERAL_SHEET)
    headings_sheet = _find_sheet(sheets, HEADINGS_SHEET)
    items_sheet = _find_sheet(sheets, ITEMS_SHEET) or _find_sheet(
        sheets, ITEMS_SHEET_ALT
    )

    return {
        "general": _parse_general_info(general_sheet.values) if general_sheet else {},
        # A list, matching the return type of _parse_headings.
        "headings": _parse_headings(headings_sheet.values) if headings_sheet else [],
        "items": _parse_items(items_sheet.values) if items_sheet else [],
    }
def _parse_xls(path: Path) -> Dict[str, object]:
    """Parse a legacy ``.xls`` workbook into general info, headings and items.

    Sheets are located by name via ``_find_sheet``. A missing sheet yields
    an empty container of the matching type: ``{}`` for general info and
    ``[]`` for headings and items.

    xlrd returns date cells as float serial numbers. They are converted to
    ``datetime`` here so ``_cell_to_str`` renders them as ``YYYY-MM-DD``
    instead of a bare serial number.
    """
    workbook = xlrd.open_workbook(path)
    sheets = {sheet.name.strip().lower(): sheet for sheet in workbook.sheets()}

    def cell_value(cell: xlrd.sheet.Cell) -> object:
        """Return the cell's value, converting date serials to datetime."""
        if cell.ctype != xlrd.XL_CELL_DATE:
            return cell.value
        try:
            return xlrd.xldate_as_datetime(cell.value, workbook.datemode)
        except (xlrd.XLDateError, ValueError, OverflowError):
            # Keep the raw serial rather than fail on a malformed date cell.
            return cell.value

    def sheet_rows(sheet: xlrd.sheet.Sheet) -> Iterable[List[object]]:
        """Yield each row of *sheet* as a list of plain values."""
        for row_idx in range(sheet.nrows):
            yield [cell_value(cell) for cell in sheet.row(row_idx)]

    general_sheet = _find_sheet(sheets, GENERAL_SHEET)
    headings_sheet = _find_sheet(sheets, HEADINGS_SHEET)
    items_sheet = _find_sheet(sheets, ITEMS_SHEET) or _find_sheet(
        sheets, ITEMS_SHEET_ALT
    )

    return {
        "general": (
            _parse_general_info(sheet_rows(general_sheet)) if general_sheet else {}
        ),
        # A list, matching the return type of _parse_headings.
        "headings": (
            _parse_headings(sheet_rows(headings_sheet)) if headings_sheet else []
        ),
        "items": _parse_items(sheet_rows(items_sheet)) if items_sheet else [],
    }
| def _normalize_key(value: str) -> str: | |
| text = str(value or "").strip().replace("\\", "/").split("/")[-1] | |
| if not text: | |
| return "" | |
| text = unicodedata.normalize("NFKD", text) | |
| text = "".join(ch for ch in text if not unicodedata.combining(ch)) | |
| text = re.sub(r"\s+", " ", text).strip().lower() | |
| return re.sub(r"[^a-z0-9]", "", text) | |
def _normalize_name(name: str) -> str:
    """Return the lookup key for the full file name (extension included)."""
    file_name = Path(name).name
    return _normalize_key(file_name)
def _normalize_stem(name: str) -> str:
    """Return the lookup key for a file name with its image extension removed.

    Only suffixes listed in ``IMAGE_EXTENSIONS`` are stripped; any other
    suffix stays part of the key. Empty input yields ``""``.
    """
    leaf = str(name or "").replace("\\", "/").rpartition("/")[2].strip()
    if not leaf:
        return ""
    suffix = Path(leaf).suffix.lower()
    if suffix in IMAGE_EXTENSIONS:
        leaf = leaf[: -len(suffix)]
    return _normalize_key(leaf)
| def _add_lookup_value(mapping: Dict[str, Set[str]], key: str, file_id: str) -> None: | |
| if not key: | |
| return | |
| values = mapping.setdefault(key, set()) | |
| values.add(file_id) | |
def _build_photo_lookup(uploads: List[dict]) -> PhotoLookup:
    """Index uploaded photos by normalised full name and by normalised stem.

    Each upload is read through its ``"name"`` and ``"id"`` entries; uploads
    missing either one are skipped.
    """
    by_exact: Dict[str, Set[str]] = {}
    by_stem: Dict[str, Set[str]] = {}
    for upload in uploads:
        file_name = upload.get("name") or ""
        file_id = upload.get("id")
        if file_name and file_id:
            _add_lookup_value(by_exact, _normalize_name(file_name), file_id)
            _add_lookup_value(by_stem, _normalize_stem(file_name), file_id)
    return PhotoLookup(by_exact=by_exact, by_stem=by_stem)
def _resolve_photo_id(name: str, lookup: PhotoLookup) -> Optional[str]:
    """Resolve an image reference to one uploaded photo id, if unambiguous.

    A reference with an image extension prefers the exact-name index and
    falls back to the stem index. A reference without one prefers the stem
    index and falls back to the exact-name index. An index only counts when
    it holds exactly one id for the reference; otherwise ``None`` is returned.
    """
    ref = str(name or "").strip()
    if not ref:
        return None

    leaf = ref.replace("\\", "/").split("/")[-1]
    has_suffix = Path(leaf).suffix.lower() in IMAGE_EXTENSIONS
    exact_matches = lookup.by_exact.get(_normalize_name(ref), set())
    stem_matches = lookup.by_stem.get(_normalize_stem(ref), set())

    if has_suffix:
        preference = (exact_matches, stem_matches)
    else:
        preference = (stem_matches, exact_matches)

    for matches in preference:
        if len(matches) == 1:
            return next(iter(matches))
    return None
def _collect_image_refs(names: List[str]) -> List[str]:
    """Flatten raw image-name entries into unique cleaned references.

    Every entry is passed through ``_extract_image_names`` with extension-less
    stems allowed; first-seen order is kept.
    """
    refs: List[str] = []
    for entry in names:
        extracted = _extract_image_names(str(entry), allow_stem_without_ext=True)
        refs.extend(
            # dict.fromkeys de-duplicates within this entry while keeping order.
            ref for ref in dict.fromkeys(extracted) if ref and ref not in refs
        )
    return refs
def _photo_ids_for_names(names: List[str], lookup: PhotoLookup) -> Tuple[List[str], List[str], List[str]]:
    """Resolve image names against *lookup*.

    Returns ``(ids, unresolved, refs)``: the unique photo ids that were
    resolved, the references that could not be resolved, and all cleaned
    references in the order they were encountered.
    """
    refs = _collect_image_refs(names)
    ids: List[str] = []
    unresolved: List[str] = []
    for ref in refs:
        photo_id = _resolve_photo_id(ref, lookup)
        if not photo_id:
            if ref not in unresolved:
                unresolved.append(ref)
            continue
        if photo_id not in ids:
            ids.append(photo_id)
    return ids, unresolved, refs
def populate_session_from_data_files(
    store: SessionStore, session: dict
) -> dict:
    """Fill *session* with sections built from its best uploaded data file.

    Among ``session["uploads"]["data_files"]`` a workbook (``.xlsx``,
    ``.xlsm``, ``.xls``) is preferred over a ``.csv``. The chosen file is
    parsed and:

    * ``document_no`` and ``inspection_date`` are copied onto the session
      when present in the general information;
    * ``session["headings"]`` is replaced with the parsed headings;
    * one single-page section is created per item and stored through
      ``store.set_sections``;
    * ``selected_photo_ids`` is set when any image reference resolved to an
      uploaded photo.

    The session is returned unchanged when there is no data file, the file
    cannot be located on disk, or its suffix is not supported.
    """
    data_files = session.get("uploads", {}).get("data_files", []) or []
    if not data_files:
        return session

    def score(file_meta: dict) -> int:
        """Rank a data file: workbooks 2, CSV 1, anything else 0."""
        name = (file_meta.get("name") or "").lower()
        if name.endswith(".csv"):
            return 1
        return 2 if name.endswith((".xlsx", ".xlsm", ".xls")) else 0

    # max() returns the first best-scoring entry, like a stable descending sort.
    target = max(data_files, key=score)
    path = store.resolve_upload_path(session, target.get("id", ""))
    if not path or not path.exists():
        return session

    parsers = {
        ".xlsx": _parse_excel,
        ".xlsm": _parse_excel,
        ".xls": _parse_xls,
        ".csv": _parse_csv,
    }
    parser = parsers.get(path.suffix.lower())
    if parser is None:
        return session
    parsed = parser(path)

    general = parsed.get("general") or {}
    headings = parsed.get("headings") or []
    items = parsed.get("items") or []

    # Update session-wide fields if provided
    document_no = general.get("document no") or general.get("document number") or ""
    if document_no:
        session["document_no"] = document_no
    inspection_date = general.get("inspection date")
    if inspection_date:
        session["inspection_date"] = inspection_date

    photo_lookup = _build_photo_lookup(
        session.get("uploads", {}).get("photos", []) or []
    )

    if isinstance(headings, dict):
        headings = [{"number": key, "name": value} for key, value in headings.items()]
    session["headings"] = headings if isinstance(headings, list) else []

    # The logo comes from the general sheet and is the same for every item.
    company_logo = (
        general.get("client logo image name")
        or general.get("client logo")
        or general.get("company logo")
        or ""
    )
    item_fields = (
        "reference",
        "area",
        "functional_location",
        "item_description",
        "category",
        "priority",
        "required_action",
        "figure_caption",
    )

    sections: List[dict] = []
    selected_photo_ids: List[str] = []
    for position, item in enumerate(items, start=1):
        template = {
            "inspection_date": inspection_date or session.get("inspection_date", ""),
            "inspector": general.get("inspector", ""),
            "document_no": document_no or session.get("document_no", ""),
            "company_logo": company_logo,
        }
        template.update((field, item.get(field, "")) for field in item_fields)

        photo_ids, unresolved_refs, normalized_refs = _photo_ids_for_names(
            item.get("image_names", []) or [], photo_lookup
        )
        template["image_name_refs"] = normalized_refs
        if unresolved_refs:
            template["unresolved_image_refs"] = unresolved_refs

        selected_photo_ids.extend(
            # photo_ids is already duplicate-free, so one membership test suffices.
            photo_id for photo_id in photo_ids if photo_id not in selected_photo_ids
        )

        page = {
            "items": [],
            "template": template,
            "photo_ids": photo_ids,
            "page_template": "repex:standard",
            "blank": False,
            "variant": "full",
        }
        title = item.get("reference") or item.get("area") or f"Section {position}"
        sections.append({"id": None, "title": title, "pages": [page]})

    if sections:
        if selected_photo_ids:
            session["selected_photo_ids"] = selected_photo_ids
        store.set_sections(session, sections)
    return session
def reconcile_session_image_links(store: SessionStore, session: dict) -> dict:
    """Re-link page image references to the photos currently uploaded.

    For every page whose template carries ``image_name_refs``, the
    references are resolved against the uploaded photos. Newly resolved ids
    are appended to the page's ``photo_ids`` and to the session's
    ``selected_photo_ids``; ``image_name_refs`` is rewritten in cleaned form;
    ``unresolved_image_refs`` is updated, or removed when nothing is left
    unresolved. ``store.update_session`` is called only if something changed.
    The session is returned unchanged when it has no uploaded photos.
    """
    uploads = session.get("uploads", {}).get("photos", []) or []
    if not uploads:
        return session

    lookup = _build_photo_lookup(uploads)
    sections = store.ensure_sections(session)
    original_selection = list(session.get("selected_photo_ids") or [])
    selected_photo_ids = list(original_selection)
    changed = False

    all_pages = (page for section in sections for page in section.get("pages") or [])
    for page in all_pages:
        template = page.get("template")
        if not isinstance(template, dict):
            continue

        # Stored references may be a single string or a list of values.
        refs_raw = template.get("image_name_refs") or []
        refs: List[str] = []
        if isinstance(refs_raw, str):
            refs = _collect_image_refs([refs_raw])
        elif isinstance(refs_raw, list):
            refs = _collect_image_refs([str(value) for value in refs_raw if value])
        if not refs:
            continue

        resolved_ids, unresolved_refs, normalized_refs = _photo_ids_for_names(
            refs, lookup
        )

        existing = [
            value
            for value in (page.get("photo_ids") or [])
            if isinstance(value, str) and value
        ]
        additions = [photo_id for photo_id in resolved_ids if photo_id not in existing]
        merged = existing + additions
        if merged != existing:
            page["photo_ids"] = merged
            changed = True

        for photo_id in merged:
            if photo_id not in selected_photo_ids:
                selected_photo_ids.append(photo_id)

        if template.get("image_name_refs") != normalized_refs:
            template["image_name_refs"] = normalized_refs
            changed = True

        if unresolved_refs:
            if template.get("unresolved_image_refs") != unresolved_refs:
                template["unresolved_image_refs"] = unresolved_refs
                changed = True
        elif "unresolved_image_refs" in template:
            template.pop("unresolved_image_refs", None)
            changed = True

    if selected_photo_ids != original_selection:
        session["selected_photo_ids"] = selected_photo_ids
        changed = True

    if changed:
        store.update_session(session)
    return session