"""Extract material-property data and plot figures from PDF datasheets.

Workflow:
  1. ``call_gemini_from_bytes`` sends the PDF to the Gemini API with a strict
     JSON response schema and returns the parsed payload.
  2. ``convert_to_dataframe`` flattens that payload into a pandas DataFrame.
  3. ``extract_images`` rasterizes each page, finds plot-like regions with
     OpenCV, and pairs them with nearby "Figure N" captions.
  4. ``save_matched_images`` links captions to extracted property names and
     writes one representative image per matched property.
"""

import os
import re
import json
import zipfile
from io import BytesIO
from typing import Dict, Any, Optional
from collections import defaultdict
import cv2
import fitz  # PyMuPDF
import numpy as np
import pandas as pd
import requests
import streamlit as st
import base64

# Gemini credentials. API_URL is left as None when no key is configured so
# callers can fail fast with a clear message instead of sending a bad request.
API_KEY = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
API_URL = (
    "https://generativelanguage.googleapis.com/v1beta/"
    "models/gemini-2.5-flash-preview-09-2025:generateContent?key="
    f"{API_KEY}"
    if API_KEY
    else None
)

# Response schema forwarded to Gemini so the model returns strictly-typed JSON.
# NOTE: despite the key name, "mechanical_properties" holds properties from
# ALL categories (thermal, electrical, ...) — the prompt asks for a single list.
SCHEMA = {
    "type": "OBJECT",
    "properties": {
        "material_name": {"type": "STRING"},
        "material_abbreviation": {"type": "STRING"},
        "trade_grade": {
            "type": "STRING",
            "description": "Commercial or trade grade name of the material; '' if not provided",
        },
        "manufacturer": {
            "type": "STRING",
            "description": "Company or organization producing the material; '' if not provided",
        },
        "mechanical_properties": {
            "type": "ARRAY",
            "items": {
                "type": "OBJECT",
                "properties": {
                    "section": {"type": "STRING"},
                    "property_name": {"type": "STRING"},
                    "value": {"type": "STRING"},
                    "unit": {"type": "STRING"},
                    "english": {"type": "STRING"},
                    "test_condition": {"type": "STRING"},
                    "comments": {"type": "STRING"},
                },
                "required": [
                    "section",
                    "property_name",
                    "value",
                    "english",
                    "comments",
                ],
            },
        },
    },
}

# Rasterization resolution (pixels per inch) for page rendering.
DPI = 300
# Matches figure captions such as "Fig. 3" / "Figure 12" at the start of a block.
CAP_RE = re.compile(r"^(Fig\.?\s*\d+|Figure\s*\d+)\b", re.IGNORECASE)

# Characters that are illegal (or unsafe) in filenames on common filesystems.
_UNSAFE_FS_CHARS = re.compile(r'[\\/:*?"<>|]')


def _safe_filename(name: str) -> str:
    """Replace filesystem-unsafe characters in *name* with underscores.

    Property names frequently embed units such as "g/10 min"; a raw "/"
    would otherwise produce an invalid path, and ``cv2.imwrite`` fails
    silently (it returns False rather than raising), so the image would
    be quietly dropped.
    """
    return _UNSAFE_FS_CHARS.sub("_", name)


def make_abbreviation(name: str) -> str:
    """Build an uppercase initialism from *name*.

    Falls back to "UNKNOWN" for empty input, or to the first six characters
    of *name* (uppercased) when no word starts with an alphabetic character.
    """
    if not name:
        return "UNKNOWN"
    words = name.split()
    abbr = "".join(w[0] for w in words if w and w[0].isalpha()).upper()
    return abbr or name[:6].upper()


def call_gemini_from_bytes(pdf_bytes: bytes, filename: str) -> Optional[Dict[str, Any]]:
    """Send *pdf_bytes* to the Gemini API and return the parsed JSON payload.

    Returns ``None`` (after surfacing a Streamlit error where applicable)
    when the API key is missing, encoding or the HTTP request fails, or the
    response contains no JSON part. *filename* is currently unused; it is
    kept for interface compatibility with existing callers.
    """
    if not API_KEY or not API_URL:
        st.error("Missing Gemini API key. Set GEMINI_API_KEY in environment variables.")
        return None
    try:
        encoded_file = base64.b64encode(pdf_bytes).decode("utf-8")
        mime_type = "application/pdf"
    except Exception as exc:
        st.error(f"Error encoding PDF: {exc}")
        return None
    prompt = (
        "You are an expert materials scientist. From the attached PDF, extract:\n"
        "- material_name (generic material, e.g., isotactic polypropylene)\n"
        "- material_abbreviation\n"
        "- trade_grade (commercial or trade name; write '' if not provided)\n"
        "- manufacturer (company or organization producing the material; write '' if not provided)\n\n"
        "Extract ALL properties across categories (Mechanical, Thermal, Electrical, Physical, "
        "Optical, Rheological, etc.) and return them as 'mechanical_properties' (a single list).\n\n"
        "For each property, you MUST extract:\n"
        "- property_name\n"
        "- value (or range)\n"
        "- unit\n"
        "- english (converted or alternate units, e.g., psi, °F, inches; write '' if not provided)\n"
        "- test_condition\n"
        "- comments (include any notes, footnotes, standards, remarks; write '' if none)\n\n"
        "All fields including english and comments are REQUIRED.\n"
        "Respond ONLY with valid JSON following the schema."
    )
    payload = {
        "contents": [
            {
                "parts": [
                    {"text": prompt},
                    {"inlineData": {"mimeType": mime_type, "data": encoded_file}},
                ]
            }
        ],
        "generationConfig": {
            "temperature": 0,
            "responseMimeType": "application/json",
            "responseSchema": SCHEMA,
        },
    }
    try:
        response = requests.post(API_URL, json=payload, timeout=300)
        response.raise_for_status()
        data = response.json()
        candidates = data.get("candidates", [])
        if not candidates:
            return None
        parts = candidates[0].get("content", {}).get("parts", [])
        # Pick the first part whose text looks like a JSON object.
        json_text = None
        for part in parts:
            text = part.get("text", "")
            if text.strip().startswith("{"):
                json_text = text
                break
        return json.loads(json_text) if json_text else None
    except Exception as exc:
        st.error(f"Gemini API Error: {exc}")
        return None


def convert_to_dataframe(data: Dict[str, Any]) -> pd.DataFrame:
    """Flatten a Gemini payload into one DataFrame row per property.

    Material-level fields (name, abbreviation, trade grade, manufacturer)
    are repeated on every row; missing per-property fields get sensible
    fallbacks ("Mechanical", "Unknown property", "N/A", or "").
    """
    mat_name = data.get("material_name", "") or ""
    mat_abbr = data.get("material_abbreviation", "") or ""
    trade_grade = data.get("trade_grade", "") or ""
    manufacturer = data.get("manufacturer", "") or ""
    if not mat_abbr:
        mat_abbr = make_abbreviation(mat_name)
    rows = []
    for item in data.get("mechanical_properties", []):
        rows.append(
            {
                "material_name": mat_name,
                "material_abbreviation": mat_abbr,
                "trade_grade": trade_grade,
                "manufacturer": manufacturer,
                "section": item.get("section", "") or "Mechanical",
                "property_name": item.get("property_name", "") or "Unknown property",
                "value": item.get("value", "") or "N/A",
                "unit": item.get("unit", "") or "",
                "english": item.get("english", "") or "",
                "test_condition": item.get("test_condition", "") or "",
                "comments": item.get("comments", "") or "",
            }
        )
    return pd.DataFrame(rows)


def get_page_image(page):
    """Rasterize a PyMuPDF *page* at DPI and return it as a BGR ndarray."""
    pix = page.get_pixmap(matrix=fitz.Matrix(DPI / 72, DPI / 72))
    # NOTE(review): assumes a 3-channel RGB pixmap (pix.n == 3, no alpha).
    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, 3)
    return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)


def is_valid_plot_geometry(binary_crop):
    """Heuristically decide whether a binarized crop looks like a plot.

    Rejects crops that are too small (<100 px a side) or too ink-dense
    (>35%, i.e. likely a photo or solid block), then requires at least one
    long horizontal or vertical line (a plot axis) via morphological erosion.
    """
    height, width = binary_crop.shape
    if height < 100 or width < 100:
        return False
    ink_density = cv2.countNonZero(binary_crop) / (width * height)
    if ink_density > 0.35:
        return False
    h_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (width // 4, 1))
    v_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, height // 4))
    has_h = cv2.countNonZero(cv2.erode(binary_crop, h_kernel, iterations=1)) > 0
    has_v = cv2.countNonZero(cv2.erode(binary_crop, v_kernel, iterations=1)) > 0
    return has_h or has_v


def merge_boxes(rects):
    """Drop boxes fully contained in an already-kept box (15 px tolerance).

    Rects are (x, y, w, h); processing largest-first guarantees containers
    are kept before their contents are considered.
    """
    if not rects:
        return []
    rects = sorted(rects, key=lambda r: r[2] * r[3], reverse=True)
    merged = []
    for rect in rects:
        rx, ry, rw, rh = rect
        if not any(
            rx >= m[0] - 15
            and ry >= m[1] - 15
            and rx + rw <= m[0] + m[2] + 15
            and ry + rh <= m[1] + m[3] + 15
            for m in merged
        ):
            merged.append(rect)
        return merged


def extract_images(pdf_doc):
    """Detect plot regions on every page of *pdf_doc* and group them by caption.

    Each page is rasterized, binarized, and dilated; external contours of a
    plausible size (3%-80% of the page) whose geometry passes
    ``is_valid_plot_geometry`` become candidate crops. Each crop is paired
    with the nearest "Fig./Figure N" text block found below it (within 30%
    of the page height); unlabeled crops get a per-page placeholder caption.

    Returns a list of dicts: {"caption", "page", "image_data": [{"filename",
    "bytes", "array"}, ...]}. Crops sharing a caption are grouped together.
    """
    grouped_data = defaultdict(lambda: {"page": 0, "image_data": []})
    padding = 30
    for page_num, page in enumerate(pdf_doc, start=1):
        img_bgr = get_page_image(page)
        gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
        _, binary = cv2.threshold(gray, 225, 255, cv2.THRESH_BINARY_INV)
        kernel = np.ones((10, 10), np.uint8)
        dilated = cv2.dilate(binary, kernel, iterations=1)
        contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        candidates = []
        page_h, page_w = gray.shape
        for cnt in contours:
            x, y, w, h = cv2.boundingRect(cnt)
            if 0.03 < (w * h) / (page_w * page_h) < 0.8:
                if is_valid_plot_geometry(binary[y : y + h, x : x + w]):
                    candidates.append((x, y, w, h))
        final_rects = merge_boxes(candidates)
        blocks = page.get_text("blocks")
        for (cx, cy, cw, ch) in final_rects:
            best_caption = f"Figure on Page {page_num} (Unlabeled)"
            min_dist = float("inf")
            for block in blocks:
                text = block[4].strip()
                if CAP_RE.match(text):
                    # Text blocks are in PDF points; scale to pixel space.
                    cap_y = block[1] * (DPI / 72)
                    dist = cap_y - (cy + ch)
                    if 0 < dist < (page_h * 0.3) and dist < min_dist:
                        best_caption = text.replace("\n", " ")
                        min_dist = dist
            x1, y1 = max(0, cx - padding), max(0, cy - padding)
            x2, y2 = min(page_w, cx + cw + padding), min(page_h, cy + ch + padding)
            crop = img_bgr[int(y1) : int(y2), int(x1) : int(x2)]
            _, buffer = cv2.imencode(".png", crop)
            img_bytes = buffer.tobytes()
            fname = f"pg{page_num}_{cx}_{cy}.png"
            grouped_data[best_caption]["page"] = page_num
            grouped_data[best_caption]["image_data"].append(
                {"filename": fname, "bytes": img_bytes, "array": crop}
            )
    return [
        {"caption": key, "page": value["page"], "image_data": value["image_data"]}
        for key, value in grouped_data.items()
    ]


def create_zip(results, include_json=True):
    """Bundle extracted figure crops (and an optional manifest) into a ZIP.

    Returns the archive as raw bytes, suitable for a Streamlit download button.
    """
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        if include_json:
            json_data = [
                {"caption": item["caption"], "page": item["page"], "image_count": len(item["image_data"])}
                for item in results
            ]
            zf.writestr("plot_data.json", json.dumps(json_data, indent=4))
        for item in results:
            for img_data in item["image_data"]:
                zf.writestr(img_data["filename"], img_data["bytes"])
    buf.seek(0)
    return buf.getvalue()


def match_caption_to_property(caption: str, property_name: str) -> bool:
    """Decide whether a figure *caption* refers to *property_name*.

    Three tiers, case-insensitive: exact substring match; a keyword map for
    common material properties; finally, at least two significant words
    shared between caption and property name (stop words excluded).
    """
    caption_lower = caption.lower()
    prop_lower = property_name.lower()
    if prop_lower in caption_lower:
        return True
    keyword_map = {
        "tensile modulus": ["tensile", "modulus", "young", "elastic"],
        "tensile strength": ["tensile", "strength", "ultimate"],
        "elongation at break": ["elongation", "strain", "break"],
        "glass transition temperature": ["glass transition", "tg", "transition"],
        "melting temperature": ["melting", "tm", "melt"],
        "density": ["density", "specific gravity"],
        "impact strength": ["impact", "izod", "charpy"],
        "flexural modulus": ["flexural", "bending", "flex"],
        "stress": ["stress", "strain"],
        "thermal": ["thermal", "temperature", "heat"],
        "crystallinity": ["crystallinity", "crystalline", "xrd"],
    }
    for prop_key, keywords in keyword_map.items():
        if prop_key in prop_lower and any(kw in caption_lower for kw in keywords):
            return True
    prop_words = set(prop_lower.replace("(", "").replace(")", "").split())
    caption_words = set(caption_lower.replace("(", "").replace(")", "").split())
    common_words = prop_words & caption_words
    significant_words = common_words - {"the", "of", "at", "in", "a", "an"}
    return len(significant_words) >= 2


def save_matched_images(df: pd.DataFrame, image_results: list, save_dir: str = "images"):
    """Save one figure crop per extracted property whose caption matches it.

    Each property is matched at most once (first matching caption wins), and
    only the first image grouped under that caption is written. Filenames are
    sanitized because property names often carry units with "/" or other
    characters that are invalid in paths.

    Returns a list of {"property", "caption", "path"} dicts for saved images.
    """
    os.makedirs(save_dir, exist_ok=True)
    saved_images = []
    if df.empty:
        return saved_images
    mat_abbr = df.iloc[0]["material_abbreviation"]
    properties = df["property_name"].unique()
    matched_properties = set()
    for img_result in image_results:
        caption = img_result["caption"]
        for prop in properties:
            if prop in matched_properties:
                continue
            if match_caption_to_property(caption, prop):
                if img_result["image_data"]:
                    first_img = img_result["image_data"][0]
                    filename = _safe_filename(f"{mat_abbr}_{prop}.png")
                    filepath = os.path.join(save_dir, filename)
                    cv2.imwrite(filepath, first_img["array"])
                    saved_images.append({"property": prop, "caption": caption, "path": filepath})
                    matched_properties.add(prop)
                break
    return saved_images


def save_single_image_with_property(
    img_array, mat_abbr: str, property_name: str, save_dir: str = "images"
) -> str:
    """Write *img_array* as ``{mat_abbr}_{property_name}.png`` and return the path.

    The filename is sanitized so property names containing "/" or other
    filesystem-unsafe characters cannot produce an invalid path.
    """
    os.makedirs(save_dir, exist_ok=True)
    filename = _safe_filename(f"{mat_abbr}_{property_name}.png")
    filepath = os.path.join(save_dir, filename)
    cv2.imwrite(filepath, img_array)
    return filepath