Spaces:
Sleeping
Sleeping
| import io, os, re | |
| from typing import List, Dict | |
| import streamlit as st | |
| import pandas as pd | |
| # --- PDF text | |
| import pdfplumber | |
| from pypdf import PdfReader | |
| # --- OCR | |
| from pdf2image import convert_from_bytes | |
| import pytesseract | |
| from PIL import Image | |
| # ====================================================================== | |
| # SCHEMA TABELLA (colonne fisse) | |
| # ====================================================================== | |
| SCHEMA = [ | |
| "Piece","SKU","Title","Capacity","% Recycled","Weight","Color","Material / Resin","Class","Source File", | |
| # nuove colonne | |
| "Component","Function","General description of the packaging","Material Ref GCAS","Material Family" | |
| ] | |
| # ====================================================================== | |
| # ESTRATTORI LOW-LEVEL | |
| # ====================================================================== | |
| def extract_text_pages(pdf_bytes: bytes) -> List[str]: | |
| pages = [] | |
| # 1) pdfplumber | |
| try: | |
| with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: | |
| for p in pdf.pages: | |
| pages.append(p.extract_text() or "") | |
| except Exception: | |
| pages = [] | |
| # 2) pypdf fallback | |
| if not pages or all(not (t or "").strip() for t in pages): | |
| try: | |
| reader = PdfReader(io.BytesIO(pdf_bytes)) | |
| pages = [(p.extract_text() or "") for p in reader.pages] | |
| except Exception: | |
| pages = [] | |
| return pages | |
| def run_ocr(pdf_bytes: bytes, lang: str, dpi: int, tesseract_cmd: str | None) -> List[str]: | |
| if tesseract_cmd: | |
| pytesseract.pytesseract.tesseract_cmd = tesseract_cmd | |
| images = convert_from_bytes(pdf_bytes, dpi=dpi) | |
| texts = [] | |
| config = "--psm 6 -c preserve_interword_spaces=1" | |
| for img in images: | |
| if not isinstance(img, Image.Image): | |
| img = img.convert("RGB") | |
| texts.append(pytesseract.image_to_string(img, lang=lang, config=config) or "") | |
| return texts | |
| # --- OCR rapido SOLO per il peso (prime pagine, DPI bassi, stop appena trovato) | |
| def run_ocr_for_weight(pdf_bytes: bytes, lang: str, tesseract_cmd: str | None, max_pages: int = 2, dpi_weight: int = 200) -> str: | |
| if tesseract_cmd: | |
| pytesseract.pytesseract.tesseract_cmd = tesseract_cmd | |
| images = convert_from_bytes(pdf_bytes, dpi=dpi_weight, first_page=1, last_page=max_pages) | |
| config = "--psm 6 -c preserve_interword_spaces=1" | |
| acc = [] | |
| for img in images: | |
| if not isinstance(img, Image.Image): | |
| img = img.convert("RGB") | |
| txt = pytesseract.image_to_string(img, lang=lang, config=config) or "" | |
| w = weight_from(txt) # definita sotto | |
| if w: | |
| return w | |
| acc.append(txt) | |
| return weight_from("\n".join(acc)) or "" | |
| # ====================================================================== | |
| # PARSING DOMINIO (euristiche/regex leggere) | |
| # ====================================================================== | |
| SKU_RE = re.compile(r"\b(?:Name|SKU|Part(?:\s*No\.?)?)\s*[:#]?\s*([A-Z0-9\-_/\.]{5,})", re.I) | |
| TITLE_RE = re.compile(r"\bTitle\s*[:\-]\s*(.+)", re.I) | |
| CLASS_RE = re.compile(r"\bClass\s*([A-Za-z ]+)", re.I) | |
| def _first(text: str, pattern: re.Pattern, group: int = 1) -> str: | |
| m = pattern.search(text or "") | |
| return m.group(group).strip() if m else "" | |
| def capacity_from(text: str) -> str: | |
| m = re.search(r"([0-9]+(?:[.,][0-9]+)?)\s*(L|Liter|ml|mL)\b", text or "", re.I) | |
| if not m: return "" | |
| unit = m.group(2).upper().replace("LITER","L").replace("ML","ml") | |
| return f"{m.group(1).replace(',', '.')} {unit}" | |
| def color_from(text: str) -> str: | |
| m = re.search(r"(?:Part\s*Color|Color)\s*[:\-]?\s*([A-Z ]{3,})", text, re.I) | |
| if m: return m.group(1).strip() | |
| m = re.search(r"\b([A-Z ]{4,}(?:GREEN|TRANSPARENT|WHITE|BLACK|BLUE|RED|CLEAR)[A-Z ]*)\b", text) | |
| return (m.group(1).strip() if m else "") | |
| def material_from(text: str) -> str: | |
| for line in (text or "").splitlines(): | |
| if re.search(r"\bRESIN\b", line, re.I): | |
| return line.strip() | |
| m = re.search(r"(SERIOPLAST.*?RESIN)", text, re.I) | |
| return m.group(1).strip() if m else "" | |
| # ====================================================================== | |
| # WEIGHT: prendi TUTTA la riga a partire da "Weight ..." e normalizza spazi/OCR | |
| # Esempio: "Weight 9 4 +/- 3 g" -> "Weight 94Β±3g" | |
| # ====================================================================== | |
| WEIGHT_LINE_RE = re.compile(r"(?is)\bweight\b[^\n\r]*") | |
| def _normalize_weight_line(s: str) -> str: | |
| s = (s or "").strip() | |
| # comprimi spazi ripetuti | |
| s = re.sub(r"\s+", " ", s) | |
| # togli spazi interni tra cifre (OCR: "9 4" -> "94") | |
| s = re.sub(r"(?<=\d)\s+(?=\d)", "", s) | |
| # unifica simboli Β± | |
| s = re.sub(r"\+\s*/\s*-\s*|\+\s*-\s*", "Β±", s) | |
| s = s.replace("οΌοΌ", "Β±").replace("οΉ’", "+").replace("οΌ", "-") | |
| # rimuovi spazi attorno a Β± | |
| s = re.sub(r"\s*Β±\s*", "Β±", s) | |
| # rimuovi spazi prima dell'unitΓ | |
| s = re.sub(r"\s+(?=(?:mg|g|kg)\b)", "", s, flags=re.I) | |
| # punti/virgole | |
| s = s.replace(",", ".") | |
| return s | |
| def weight_from(text: str) -> str: | |
| if not text: | |
| return "" | |
| # preferisci la prima riga che contiene anche l'unitΓ | |
| lines = [m.group(0) for m in WEIGHT_LINE_RE.finditer(text)] | |
| for ln in lines: | |
| if re.search(r"\b(?:mg|g|kg)\b", ln, re.I): | |
| return _normalize_weight_line(ln) | |
| # se non trovata unitΓ , restituisci comunque la prima occorrenza normalizzata | |
| return _normalize_weight_line(lines[0]) if lines else "" | |
| # --------------------- PIECE da "Packaging Component Type" --------------------- | |
| _ALLOWED_PIECES = { | |
| "ribbon": "ribbon", | |
| "bottle": "bottle", | |
| "film bundle": "film bundle", | |
| "container": "container", | |
| "label - adhesive": "LABEL - ADHESIVE", | |
| "label adhesive": "LABEL - ADHESIVE", | |
| "label-adhesive": "LABEL - ADHESIVE", | |
| "label - back": "LABEL - BACK", | |
| "back label": "LABEL - BACK", | |
| "label back": "LABEL - BACK", | |
| "closure": "CLOSURE", | |
| } | |
| _PACK_COMP_TYPE_RE = re.compile( | |
| r"Packaging\s+Component\s+Type\s*[:\-]?\s*([^\n\r]+)", re.I | |
| ) | |
| def _normalize_piece(s: str) -> str: | |
| s0 = (s or "").strip() | |
| s1 = re.sub(r"\s+", " ", s0) | |
| s2 = s1.lower() | |
| s2 = s2.replace("β", "-").replace("β", "-") | |
| s2 = s2.replace("label- ", "label ").replace(" -", " - ").strip() | |
| if s2 in _ALLOWED_PIECES: | |
| return _ALLOWED_PIECES[s2] | |
| s2 = s2.replace(" ", " ") | |
| if s2 in _ALLOWED_PIECES: | |
| return _ALLOWED_PIECES[s2] | |
| for key, canon in _ALLOWED_PIECES.items(): | |
| if key in s2: | |
| return canon | |
| return "" | |
| def piece_from(text: str, cls: str) -> str: | |
| m = _PACK_COMP_TYPE_RE.search(text or "") | |
| if m: | |
| val = m.group(1) | |
| normalized = _normalize_piece(val) | |
| if normalized: | |
| return normalized | |
| m2 = re.search(r"Packaging\s*Material\s*Type\s*([^\n]+)", text or "", re.I) | |
| if m2: | |
| seg = m2.group(1) | |
| norm = _normalize_piece(seg) | |
| if norm: | |
| return norm | |
| if cls: | |
| norm = _normalize_piece(cls) | |
| if norm: | |
| return norm | |
| if "bottle" in cls.lower(): | |
| return "bottle" | |
| if "cap" in cls.lower() or "closure" in cls.lower(): | |
| return "CLOSURE" | |
| if "corrugated" in cls.lower(): | |
| return "container" | |
| if "label" in cls.lower(): | |
| return "LABEL - BACK" | |
| return "" | |
| # --- Nuove colonne: euristiche base | |
| FUNCTION_RE = re.compile(r"\b(Primary|Secondary(?:\s*or\s*Tertiary)?|Tertiary)\b", re.I) | |
| def component_from(text: str, piece: str, cls: str) -> str: | |
| txt = text.lower() | |
| if "ink" in txt and "cartridge" in txt: return "Ink cartridge" | |
| if "ink foil" in txt: return "Ink foil" | |
| if "tape" in txt: return "Tape" | |
| if "label" in txt and ("psl" in txt or "wet glue" in txt or "iml" in txt or "htl" in txt): return "Labels" | |
| if "adhesive" in txt or "hot melt" in txt: return "Adhesive" | |
| if "cartonboard" in txt or "sheet" in txt: return "Cartonboard / Sheet" | |
| if "corrugated" in txt or "case" in txt or "outercase" in txt: return "Corrugated box" | |
| if "bundle" in txt: return "Bundle" | |
| if piece: return piece | |
| if cls: | |
| if "bottle" in cls.lower(): return "Bottle" | |
| if "cap" in cls.lower(): return "Closure" | |
| if "corrugated" in cls.lower(): return "Corrugated box" | |
| if "label" in cls.lower(): return "Labels" | |
| return "" | |
| def function_from(text: str) -> str: | |
| m = FUNCTION_RE.search(text or "") | |
| return m.group(1).title() if m else "" | |
| def material_ref_gcas_from(text: str) -> str: | |
| m = re.findall(r"\b(\d{7,9})\b", text or "") | |
| if m: | |
| seen = set(); out=[] | |
| for x in m: | |
| if x not in seen: | |
| seen.add(x); out.append(x) | |
| return ", ".join(out[:3]) | |
| m2 = re.findall(r"\((\d{5,})\s*kg\s*pack\)", text or "", re.I) | |
| if m2: | |
| seen=set(); out=[] | |
| for x in m2: | |
| if x not in seen: | |
| seen.add(x); out.append(x) | |
| return ", ".join(out[:3]) | |
| return "" | |
| def material_family_from(text: str) -> str: | |
| families = [ | |
| "Monolayer HDPE","Polypropylene (PP)","Paper","Flexible Film β Mono non Metallized", | |
| "Flexible - Label PSL WGL IML HTL","Rigid Paper β Corrugated Case", | |
| "Inks and solvents","Hot melt adhesive","Wet Glue Label", | |
| "Coated paper","Wood","Ink foil","Fasson PE 85 TOP White" | |
| ] | |
| t = text or "" | |
| for fam in families: | |
| if fam.lower() in t.lower(): | |
| return fam | |
| if re.search(r"\bHDPE\b", t): return "Monolayer HDPE" | |
| if re.search(r"\bPP\b|\bPolypropylene\b", t, re.I): return "Polypropylene (PP)" | |
| if "corrugated" in t.lower(): return "Rigid Paper β Corrugated Case" | |
| if "paper" in t.lower(): return "Paper" | |
| return "" | |
| def parse_record(pages: List[str], source_name: str) -> Dict[str, str]: | |
| full = "\n".join(pages or [""]) | |
| sku = _first(full, SKU_RE) | |
| title = _first(full, TITLE_RE) | |
| cls = _first(full, CLASS_RE) | |
| cap = capacity_from(title) or capacity_from(full) | |
| color = color_from(full) | |
| material = material_from(full) | |
| piece = piece_from(full, cls) | |
| # nuove colonne | |
| comp = component_from(full, piece, cls) | |
| func = function_from(full) | |
| gcas = material_ref_gcas_from(full) | |
| mfam = material_family_from(full) | |
| # WEIGHT: prendi l'intera riga "Weight ..." | |
| wght = weight_from(full) | |
| return { | |
| "Piece": piece or "", | |
| "SKU": sku or "", | |
| "Title": title or "", | |
| "Capacity": cap or "", | |
| "% Recycled": "β", | |
| "Weight": wght or "β", | |
| "Color": color or "", | |
| "Material / Resin": material or "", | |
| "Class": cls or "", | |
| "Source File": source_name, | |
| "Component": comp or "", | |
| "Function": func or "", | |
| "General description of the packaging": "", | |
| "Material Ref GCAS": gcas or "", | |
| "Material Family": mfam or "" | |
| } | |
| # ====================================================================== | |
| # UI STREAMLIT | |
| # ====================================================================== | |
| st.set_page_config(page_title="PDF β Table (OCR-ready)", layout="wide") | |
| st.title("πβπ PDF β Table (OCR-ready)") | |
| st.caption("Carica PDF (anche scansioni). Compilo la tabella con i campi richiesti; OCR mirato per il peso.") | |
| with st.sidebar: | |
| files = st.file_uploader("Seleziona PDF", type=["pdf"], accept_multiple_files=True) | |
| st.markdown("---") | |
| st.subheader("OCR") | |
| ocr_fallback = st.checkbox("Usa OCR se non c'Γ¨ testo", value=True) | |
| ocr_lang = st.text_input("Lingue OCR (comma)", value="eng,ita") | |
| ocr_dpi = st.number_input("DPI OCR", 200, 600, 300, 50) | |
| tess_path = st.text_input("Percorso Tesseract (se non nel PATH)", value="") | |
| run_btn = st.button("βΆοΈ Estrai") | |
| if not run_btn: | |
| st.info("Carica i PDF e premi **Estrai**.") | |
| st.stop() | |
| if not files: | |
| st.warning("Nessun PDF caricato.") | |
| st.stop() | |
| lang = "+".join([p.strip() for p in ocr_lang.split(",") if p.strip()]) or "eng" | |
| tess_cmd = tess_path.strip() or None | |
| rows, errors = [], [] | |
| for up in files: | |
| try: | |
| raw = up.read() | |
| pages = extract_text_pages(raw) | |
| # Se il PDF non ha testo estraibile, OCR completo una sola volta | |
| if ocr_fallback and not any((p or "").strip() for p in pages): | |
| pages = run_ocr(raw, lang=lang, dpi=int(ocr_dpi), tesseract_cmd=tess_cmd) | |
| rec = parse_record(pages, up.name) | |
| # Se Weight Γ¨ vuoto, OCR rapido (prime pagine) e stop appena trovato | |
| if (not rec.get("Weight") or rec["Weight"] == "β") and ocr_fallback: | |
| w_ocr = run_ocr_for_weight(raw, lang=lang, tesseract_cmd=tess_cmd, max_pages=2, dpi_weight=200) | |
| if w_ocr: | |
| rec["Weight"] = w_ocr | |
| rows.append(rec) | |
| except Exception as e: | |
| errors.append((up.name, str(e))) | |
| if errors: | |
| with st.expander("Errori"): | |
| for name, err in errors: | |
| st.error(f"{name}: {err}") | |
| df = pd.DataFrame(rows, columns=SCHEMA) | |
| st.success(f"Creat{ 'e' if len(df)!=1 else 'a' } {len(df)} riga/e.") | |
| st.dataframe(df, use_container_width=True) | |
| c1, c2 = st.columns(2) | |
| with c1: | |
| st.download_button("β¬οΈ CSV", df.to_csv(index=False).encode("utf-8"), "table.csv", "text/csv") | |
| with c2: | |
| bio = io.BytesIO() | |
| with pd.ExcelWriter(bio, engine="openpyxl") as xw: | |
| df.to_excel(xw, index=False, sheet_name="data") | |
| st.download_button("β¬οΈ Excel", bio.getvalue(), "table.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") | |