Spaces:
Sleeping
Sleeping
| import io, os, re | |
| from typing import List, Dict | |
| import streamlit as st | |
| import pandas as pd | |
| # --- PDF text | |
| import pdfplumber | |
| from pypdf import PdfReader | |
| # --- OCR | |
| from pdf2image import convert_from_bytes | |
| import pytesseract | |
| from PIL import Image | |
| # ====================================================================== | |
| # SCHEMA TABELLA | |
| # ====================================================================== | |
# Column names of the output table, in display/export order.
SCHEMA = [
    "Piece",
    "SKU",
    "Title",
    "Capacity",
    "% Recycled",
    "Weight",
    "Color",
    "Material / Resin",
    "Class",
    "Source File",
    "Component",
    "Function",
    "General description of the packaging",
    "Material Ref GCAS",
    "Material Family",
]
| # ====================================================================== | |
| # FUNZIONI BASE PDF / OCR | |
| # ====================================================================== | |
def extract_text_pages(pdf_bytes: bytes) -> List[str]:
    """Return the embedded text of each PDF page, one string per page.

    pdfplumber is tried first. If it fails, or opens the file but finds no
    text on any page, pypdf is tried as a second extractor. Extraction is
    best-effort: any error from either library is swallowed and the function
    returns whatever the other one produced, or ``[]`` when the document
    cannot be read at all.

    Args:
        pdf_bytes: Raw content of the PDF file.

    Returns:
        A list with one (possibly empty) string per page.
    """
    plumber_pages = []
    try:
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            plumber_pages = [page.extract_text() or "" for page in pdf.pages]
    except Exception:
        # Deliberate best-effort: fall through to the second extractor.
        plumber_pages = []
    if any(text.strip() for text in plumber_pages):
        return plumber_pages

    # pdfplumber failed or produced only blank pages; give pypdf a chance
    # before the caller has to resort to OCR.
    try:
        reader = PdfReader(io.BytesIO(pdf_bytes))
        pypdf_pages = [(page.extract_text() or "") for page in reader.pages]
    except Exception:
        return plumber_pages
    if any(text.strip() for text in pypdf_pages):
        return pypdf_pages
    # Neither extractor found text: keep the pdfplumber page list when there
    # is one, otherwise whatever pypdf returned.
    return plumber_pages or pypdf_pages
def run_ocr(pdf_bytes: bytes, lang: str, dpi: int, tesseract_cmd: str | None) -> List[str]:
    """Rasterize the PDF and return the Tesseract OCR text of each page image.

    Args:
        pdf_bytes: Raw content of the PDF file.
        lang: Language string passed to Tesseract unchanged.
        dpi: Resolution used when converting the PDF to images.
        tesseract_cmd: Path of the Tesseract executable. When truthy it is
            written to ``pytesseract.pytesseract.tesseract_cmd``, a
            module-level setting that stays in effect for later calls.

    Returns:
        One string per rendered image; an empty OCR result becomes ``""``.
    """
    if tesseract_cmd:
        pytesseract.pytesseract.tesseract_cmd = tesseract_cmd

    page_images = convert_from_bytes(pdf_bytes, dpi=dpi)
    tess_config = "--psm 6 -c preserve_interword_spaces=1"

    texts = []
    for image in page_images:
        recognized = pytesseract.image_to_string(image, lang=lang, config=tess_config)
        texts.append(recognized or "")
    return texts
| # ====================================================================== | |
| # PARSER CAMPI TESTUALI | |
| # ====================================================================== | |
# Identifier (5+ characters from letters, digits, - _ / .) that follows a
# "Name", "SKU", "Part" or "Part No." label, optionally separated by ':' or '#'.
SKU_RE = re.compile(
    r"\b(?:Name|SKU|Part(?:\s*No\.?)?)\s*[:#]?\s*([A-Z0-9\-_/\.]{5,})",
    re.IGNORECASE,
)
# Rest of the line after a "Title:" / "Title -" label.
TITLE_RE = re.compile(
    r"\bTitle\s*[:\-]\s*(.+)",
    re.IGNORECASE,
)
# Run of letters and spaces that directly follows the word "Class".
CLASS_RE = re.compile(
    r"\bClass\s*([A-Za-z ]+)",
    re.IGNORECASE,
)
| def _first(text, pat): | |
| m = pat.search(text or "") | |
| return m.group(1).strip() if m else "" | |
def capacity_from(t):
    """Extract the first volume such as "1.5 L" or "500 ml" from *t*.

    The number may use ',' or '.' as decimal separator (normalized to '.').
    The unit is matched case-insensitively and rendered as "L" (for L/Liter)
    or "ml". Returns ``""`` when *t* is falsy or contains no volume.
    """
    found = re.search(r"([0-9]+(?:[.,][0-9]+)?)\s*(L|Liter|ml|mL)\b", t or "", re.I)
    if found is None:
        return ""
    amount, unit = found.groups()
    amount = amount.replace(",", ".")
    unit = unit.upper().replace("LITER", "L").replace("ML", "ml")
    return f"{amount} {unit}"
def color_from(t):
    """Extract a colour description from *t*.

    First looks for a "Part Color" / "Color" label (case-insensitive) and
    returns the run of letters and spaces after it. Otherwise looks for an
    upper-case phrase containing one of GREEN, WHITE, BLACK, BLUE, RED or
    CLEAR. Returns ``""`` when nothing is found or *t* is falsy.
    """
    # Normalize None/"" like the sibling parsers do, so re.search never
    # receives a non-string.
    text = t or ""
    labelled = re.search(r"(?:Part\s*Color|Color)\s*[:\-]?\s*([A-Z ]{3,})", text, re.I)
    if labelled:
        return labelled.group(1).strip()
    # Case-sensitive on purpose: only all-caps phrases count here.
    shouted = re.search(r"\b([A-Z ]{4,}(?:GREEN|WHITE|BLACK|BLUE|RED|CLEAR)[A-Z ]*)\b", text)
    return shouted.group(1).strip() if shouted else ""
def material_from(t):
    """Return the material/resin description found in *t*.

    The first line containing the whole word "RESIN" (case-insensitive) is
    returned stripped. If no such line exists, the shortest
    "SERIOPLAST ... RESIN" span on a single line is used instead (this still
    catches e.g. "RESINS", which fails the whole-word test). Returns ``""``
    when nothing matches or *t* is falsy.
    """
    # Normalize once so both the line scan and the fallback search are safe
    # for None input.
    text = t or ""
    for line in text.splitlines():
        if re.search(r"\bRESIN\b", line, re.I):
            return line.strip()
    fallback = re.search(r"(SERIOPLAST.*?RESIN)", text, re.I)
    return fallback.group(1).strip() if fallback else ""
| # ====================================================================== | |
| # WEIGHT PARSER → restituisce esattamente "94±3g" | |
| # ====================================================================== | |
# Weight value on the same line as the word "weight", at most 80 characters
# after it. Group 1 is either "<number> ± <tolerance> <unit>" (the tolerance
# sign may also be written "+/-" or "+-") or a plain "<number> <unit>", with
# unit mg, g or kg. Case-insensitive, verbose mode.
_WEIGHT_VALUE_PATTERN = r"""(?ix)
\bweight\b
[^\n\r]{0,80}?
(
(?:\d+(?:[.,]\d+)?\s*
(?:±|\+/?-|\+-)\s*
\d+(?:[.,]\d+)?\s*
(?:mg|g|kg))
|
(?:\d+(?:[.,]\d+)?\s*(?:mg|g|kg))
)
"""
WEIGHT_VALUE_RE = re.compile(_WEIGHT_VALUE_PATTERN)
| def _normalize_weight(s: str) -> str: | |
| s = (s or "").strip() | |
| s = s.replace(" ", "") | |
| s = s.replace("+/-", "±").replace("+-", "±") | |
| s = s.replace(",", ".") | |
| return s | |
def weight_from(t: str) -> str:
    """Return the first weight found in *t*, normalized (e.g. "94±3g").

    The whole text is searched with ``WEIGHT_VALUE_RE`` first. If that finds
    nothing, every line mentioning "weight" is searched again with the same
    value pattern but without the 80-character distance limit. Returns ``""``
    when *t* is falsy or no weight is found.
    """
    if not t:
        return ""

    hit = WEIGHT_VALUE_RE.search(t)
    if hit is not None:
        return _normalize_weight(hit.group(1))

    line_pattern = r"(?ix)\bweight\b[^\n\r]*?((?:\d+(?:[.,]\d+)?\s*(?:±|\+/?-|\+-)\s*\d+(?:[.,]\d+)?\s*(?:mg|g|kg))|(?:\d+(?:[.,]\d+)?\s*(?:mg|g|kg)))"
    for row in t.splitlines():
        if "weight" not in row.lower():
            continue
        hit = re.search(line_pattern, row)
        if hit is not None:
            return _normalize_weight(hit.group(1))
    return ""
| # ====================================================================== | |
| # ALTRE FUNZIONI | |
| # ====================================================================== | |
| _ALLOWED_PIECES = { | |
| "ribbon":"ribbon","bottle":"bottle","film bundle":"film bundle","container":"container", | |
| "label - adhesive":"LABEL - ADHESIVE","label adhesive":"LABEL - ADHESIVE","label-adhesive":"LABEL - ADHESIVE", | |
| "label - back":"LABEL - BACK","back label":"LABEL - BACK","label back":"LABEL - BACK","closure":"CLOSURE" | |
| } | |
| _PACK_COMP_TYPE_RE = re.compile(r"Packaging\s+Component\s+Type\s*[:\-]?\s*([^\n\r]+)", re.I) | |
def _normalize_piece(s):
    """Map free text to a canonical piece name from ``_ALLOWED_PIECES``.

    The text is stripped, lower-cased and has whitespace runs collapsed to a
    single space; the value of the first key (in dict order) contained in it
    is returned, or ``""`` when no key matches.
    """
    collapsed = re.sub(r"\s+", " ", (s or "").strip().lower())
    return next(
        (canonical for keyword, canonical in _ALLOWED_PIECES.items() if keyword in collapsed),
        "",
    )
def piece_from(t, cls):
    """Determine the "Piece" value for a document.

    An explicit "Packaging Component Type" line wins when it normalizes to a
    known piece. Otherwise the class string *cls* is scanned for keywords, in
    this order: bottle, cap, corrugated, label. Returns ``""`` when neither
    source yields a piece.
    """
    declared = _PACK_COMP_TYPE_RE.search(t or "")
    if declared:
        normalized = _normalize_piece(declared.group(1))
        if normalized:
            return normalized

    if not cls:
        return ""

    lowered = cls.lower()
    keyword_to_piece = (
        ("bottle", "bottle"),
        ("cap", "CLOSURE"),
        ("corrugated", "container"),
        ("label", "LABEL - BACK"),
    )
    for keyword, piece in keyword_to_piece:
        if keyword in lowered:
            return piece
    return ""
# Packaging level keyword: "Primary", "Secondary", "Secondary or Tertiary"
# or "Tertiary" (case-insensitive, whole words).
FUNCTION_RE = re.compile(
    r"\b(Primary|Secondary(?:\s*or\s*Tertiary)?|Tertiary)\b",
    flags=re.IGNORECASE,
)
def component_from(t, piece, cls):
    """Determine the "Component" value for a document.

    Returns "Labels" when the text mentions "label" anywhere
    (case-insensitive); otherwise *piece* when it is non-empty; otherwise
    "Bottle" when *cls* mentions "bottle"; otherwise ``""``.
    """
    # Tolerate None/"" text like the other field parsers instead of failing
    # on None.lower().
    if "label" in (t or "").lower():
        return "Labels"
    if piece:
        return piece
    if cls and "bottle" in cls.lower():
        return "Bottle"
    return ""
def function_from(t):
    """Return the first packaging-level keyword in *t*, title-cased.

    The keyword is whatever ``FUNCTION_RE`` captures; ``""`` is returned when
    *t* is falsy or contains no such keyword.
    """
    found = FUNCTION_RE.search(t or "")
    if not found:
        return ""
    return found.group(1).title()
def material_ref_gcas_from(t):
    """Collect every standalone 7- to 9-digit number in *t*.

    Duplicates are removed and the numbers are joined with ", " in
    ascending string order. Returns ``""`` when none are found or *t* is
    falsy.
    """
    codes = re.findall(r"\b(\d{7,9})\b", t or "")
    if not codes:
        return ""
    unique_codes = list(set(codes))
    unique_codes.sort()
    return ", ".join(unique_codes)
def material_family_from(t):
    """Classify the text into one of the known material families.

    A family name appearing verbatim (case-insensitive) wins, checked in the
    order listed below. Failing that, the abbreviation "HDPE" (upper-case,
    whole word) maps to "Monolayer HDPE" and "PP" (any case, whole word) maps
    to "Polypropylene (PP)". Returns ``""`` when nothing matches or *t* is
    falsy.
    """
    text = t or ""
    lowered = text.lower()
    # "Rigid Paper – Corrugated Case" must be tested before "Paper": the
    # shorter name is a substring of the longer one and would otherwise
    # always win, making the corrugated-case family unreachable.
    families = (
        "Monolayer HDPE",
        "Polypropylene (PP)",
        "Rigid Paper – Corrugated Case",
        "Paper",
    )
    for family in families:
        if family.lower() in lowered:
            return family
    if re.search(r"\bHDPE\b", text):
        return "Monolayer HDPE"
    if re.search(r"\bPP\b", text, re.I):
        return "Polypropylene (PP)"
    return ""
| # ====================================================================== | |
| # PARSER PRINCIPALE | |
| # ====================================================================== | |
def parse_record(pages: List[str], source_name: str) -> Dict[str, str]:
    """Build one output row from the text of a document's pages.

    The pages are joined with newlines and every field parser is run over
    the combined text. Capacity is looked for in the title first and in the
    full text second. "% Recycled" is always "–", "General description of the
    packaging" is always empty, and a missing weight is reported as "–".

    Args:
        pages: Text of each page; a falsy value is treated as one empty page.
        source_name: File name stored in the "Source File" column.

    Returns:
        A dict with one string value per output column.
    """
    text = "\n".join(pages or [""])

    title = _first(text, TITLE_RE)
    cls = _first(text, CLASS_RE)
    piece = piece_from(text, cls)
    capacity = capacity_from(title) or capacity_from(text)

    record = {
        "Piece": piece or "",
        "SKU": _first(text, SKU_RE) or "",
        "Title": title or "",
        "Capacity": capacity or "",
        "% Recycled": "–",
        "Weight": weight_from(text) or "–",
        "Color": color_from(text) or "",
        "Material / Resin": material_from(text) or "",
        "Class": cls or "",
        "Source File": source_name,
        "Component": component_from(text, piece, cls) or "",
        "Function": function_from(text) or "",
        "General description of the packaging": "",
        "Material Ref GCAS": material_ref_gcas_from(text) or "",
        "Material Family": material_family_from(text) or "",
    }
    return record
| # ====================================================================== | |
| # STREAMLIT UI | |
| # ====================================================================== | |
# Page setup and header.
st.set_page_config(page_title="PDF → Table (OCR-ready)", layout="wide")
st.title("📄→📊 PDF → Table (OCR-ready)")
st.caption("Estrae automaticamente i campi, incluso il peso dalle immagini OCR.")

# Sidebar: file upload and OCR settings.
# NOTE(review): the original indentation was lost; the run button is assumed
# to live inside the sidebar together with the other inputs — confirm.
with st.sidebar:
    files = st.file_uploader("Seleziona PDF", type=["pdf"], accept_multiple_files=True)
    st.markdown("---")
    st.subheader("OCR")
    ocr_fallback = st.checkbox("Usa OCR se non c'è testo", value=True)
    ocr_lang = st.text_input("Lingue OCR (comma)", value="eng,ita")
    ocr_dpi = st.number_input("DPI OCR", 200, 600, 300, 50)
    tess_path = st.text_input("Percorso Tesseract (se non nel PATH)", value="")
    run_btn = st.button("▶️ Estrai")

# Nothing to do until the user asks for extraction and has uploaded files.
if not run_btn:
    st.info("Carica i PDF e premi **Estrai**.")
    st.stop()
if not files:
    st.warning("Nessun PDF caricato.")
    st.stop()

# Tesseract expects languages joined with '+'; default to English.
lang = "+".join([p.strip() for p in ocr_lang.split(",") if p.strip()]) or "eng"
tess_cmd = tess_path.strip() or None

rows, errors = [], []
for up in files:
    try:
        raw = up.read()
        pages = extract_text_pages(raw)

        # Fall back to OCR when the PDF has no embedded text at all.
        used_ocr = False
        if ocr_fallback and not any((p or "").strip() for p in pages):
            pages = run_ocr(raw, lang=lang, dpi=int(ocr_dpi), tesseract_cmd=tess_cmd)
            used_ocr = True

        rec = parse_record(pages, up.name)

        # If the weight is still missing, look for it in the OCR text. Skip
        # this when the pages already are OCR output: repeating the OCR with
        # the same parameters is slow and the weight search already ran on
        # that text.
        weight_missing = not rec.get("Weight") or rec["Weight"] == "–"
        if weight_missing and ocr_fallback and not used_ocr:
            ocr_pages = run_ocr(raw, lang=lang, dpi=int(ocr_dpi), tesseract_cmd=tess_cmd)
            w_ocr = weight_from("\n".join(ocr_pages))
            if w_ocr:
                rec["Weight"] = w_ocr

        rows.append(rec)
    except Exception as e:
        # Per-file boundary: record the failure and continue with the next file.
        errors.append((up.name, str(e)))

if errors:
    with st.expander("Errori"):
        for n, e in errors:
            st.error(f"{n}: {e}")

# Result table and downloads.
df = pd.DataFrame(rows, columns=SCHEMA)
st.success(f"Creat{ 'e' if len(df)!=1 else 'a' } {len(df)} riga/e.")
st.dataframe(df, use_container_width=True)

c1, c2 = st.columns(2)
with c1:
    st.download_button("⬇️ CSV", df.to_csv(index=False).encode("utf-8"), "table.csv", "text/csv")
with c2:
    bio = io.BytesIO()
    with pd.ExcelWriter(bio, engine="openpyxl") as xw:
        df.to_excel(xw, index=False, sheet_name="data")
    st.download_button("⬇️ Excel", bio.getvalue(), "table.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")