"""Streamlit page for submitting material data.

The page offers two entry paths:

* a manual form that inserts a single property row through
  ``insert_material_rows``;
* a PDF upload whose material properties and plot images are extracted by
  the helpers in ``upload_backend`` and can then be mapped and saved.
"""

import base64
import json
import os
import tempfile

import fitz  # PyMuPDF
import pandas as pd
import streamlit as st

from data_loader import insert_material_rows
from page_files.categorized.Backend.upload_backend import (
    call_gemini_from_bytes,
    convert_to_dataframe,
    create_zip,
    extract_images,
    save_matched_images,
    save_single_image_with_property,
)

# Material classes offered by both the manual form and the PDF workflow.
_MATERIAL_CLASSES = ("Polymer", "Fiber", "Composite")

# Option that reveals a free-text field for a property missing from the lists.
_CUSTOM_PROPERTY_OPTION = "Something else"

# Placeholder entry of the per-image property dropdown.
_NO_PROPERTY_SELECTED = "-- Select --"

# material class -> property category -> selectable property names.
_PROPERTY_NAMES = {
    "Polymer": {
        "Thermal": [
            "Glass transition temperature (Tg)",
            "Melting temperature (Tm)",
            "Crystallization temperature (Tc)",
            "Degree of crystallinity",
            "Decomposition temperature",
        ],
        "Mechanical": [
            "Tensile modulus",
            "Tensile strength",
            "Elongation at break",
            "Flexural modulus",
            "Impact strength",
        ],
        "Processing": [
            "Melt flow index (MFI)",
            "Processing temperature",
            "Cooling rate",
            "Mold shrinkage",
        ],
        "Physical": [
            "Density",
            "Specific gravity",
        ],
        "Descriptive": [
            "Material grade",
            "Manufacturer",
        ],
    },
    "Fiber": {
        "Mechanical": [
            "Tensile modulus",
            "Tensile strength",
            "Strain to failure",
        ],
        "Physical": [
            "Density",
            "Fiber diameter",
        ],
        "Thermal": [
            "Decomposition temperature",
        ],
        "Descriptive": [
            "Fiber type",
            "Surface treatment",
        ],
    },
    "Composite": {
        "Mechanical": [
            "Longitudinal modulus (E1)",
            "Transverse modulus (E2)",
            "Shear modulus (G12)",
            "Poissons ratio (V12)",
            "Tensile strength (fiber direction)",
            "Interlaminar shear strength",
        ],
        "Thermal": [
            "Glass transition temperature (matrix)",
            "Coefficient of thermal expansion (CTE)",
        ],
        "Processing": [
            "Curing temperature",
            "Curing pressure",
        ],
        "Physical": [
            "Density",
        ],
        "Descriptive": [
            "Laminate type",
        ],
        "Composition / Reinforcement": [
            "Fiber volume fraction",
            "Fiber weight fraction",
            "Fiber type",
            "Matrix type",
        ],
        "Architecture / Structure": [
            "Weave type",
            "Ply orientation",
            "Number of plies",
            "Stacking sequence",
        ],
    },
}

# Derived from _PROPERTY_NAMES so the two tables cannot drift apart; dict
# insertion order keeps the categories in the order they are declared above.
_PROPERTY_CATEGORIES = {
    material_class: list(sections)
    for material_class, sections in _PROPERTY_NAMES.items()
}


def inject_upload_page_styles():
    """Emit the page-level markup block via ``st.markdown``."""
    # NOTE(review): the literal below contains only whitespace; if a <style>
    # block was intended here it is missing from this file - confirm.
    st.markdown(
        """ """,
        unsafe_allow_html=True,
    )


def render_top_bar():
    """Render the top bar, including the logo text when ``logo.png`` is readable."""
    logo_html = ""
    try:
        with open("logo.png", "rb") as fh:
            # NOTE(review): logo_b64 is computed but never interpolated into
            # logo_html; the markup that embedded it appears to be missing.
            logo_b64 = base64.b64encode(fh.read()).decode()  # noqa: F841
        logo_html = "AIM"
    except OSError:
        # A missing or unreadable logo is not an error: render without it.
        logo_html = ""
    st.markdown(
        f"\n{logo_html}AIM Composites\n",
        unsafe_allow_html=True,
    )


def _save_manual_entry(row):
    """Insert one manually entered property row and report the outcome.

    Args:
        row: mapping of column name to value for the single row to insert.

    Returns:
        True when ``insert_material_rows`` reports at least one inserted row,
        False when it raises or reports none.
    """
    input_db = pd.DataFrame([row])
    try:
        inserted = insert_material_rows(input_db)
    except Exception as exc:  # UI boundary: surface any backend failure.
        st.error(f"Failed to save to PostgreSQL: {exc}")
        return False
    if inserted <= 0:
        st.error("No rows were inserted into PostgreSQL.")
        return False
    # Cached queries may now be stale, so drop them.
    st.cache_data.clear()
    st.success("Property added successfully to PostgreSQL.")
    st.dataframe(input_db)
    return True


def input_form():
    """Render the manual property-entry widgets and handle submission.

    The data form is only shown once a material class, a property category
    and a property name (predefined or custom) have been chosen.

    Returns:
        True only when the form was submitted and the row was inserted;
        False in every other case (nothing selected, not submitted,
        validation failure, insert failure).
    """
    with st.container(border=False, key="material_ident_card"):
        st.markdown("\niMaterial Identification\n", unsafe_allow_html=True)
        col_a, col_b = st.columns(2)
        with col_a:
            material_class = st.selectbox(
                "Material Class",
                _MATERIAL_CLASSES,
                index=None,
                placeholder="Choose material class",
                key="manual_material_class",
            )
        with col_b:
            if material_class:
                property_category = st.selectbox(
                    "Property Type",
                    _PROPERTY_CATEGORIES[material_class],
                    index=None,
                    placeholder="Choose property type",
                    key="manual_property_category",
                )
            else:
                property_category = None
                # Disabled stand-in so the layout does not jump once a class
                # is picked.
                st.selectbox(
                    "Property Type",
                    ["Choose material class first"],
                    index=0,
                    disabled=True,
                    key="manual_property_category_disabled",
                )

        if material_class and property_category:
            # Concatenation builds a new list; the module constant is untouched.
            property_options = _PROPERTY_NAMES[material_class][property_category] + [
                _CUSTOM_PROPERTY_OPTION
            ]
            property_name = st.selectbox(
                "Property Name",
                property_options,
                index=None,
                placeholder="Choose property",
                key="manual_property_name",
            )
        else:
            property_name = None

        custom_property_name = ""
        if property_name == _CUSTOM_PROPERTY_OPTION:
            custom_property_name = st.text_input(
                "Custom Property Name",
                placeholder="Type property name",
                key="manual_custom_property_name",
            ).strip()

        selected_property_name = (
            custom_property_name
            if property_name == _CUSTOM_PROPERTY_OPTION
            else property_name
        )

    if not (material_class and property_category and selected_property_name):
        return False

    with st.container(border=False, key="material_form_card"):
        with st.form("user_input"):
            st.subheader("Enter Data")
            material_name = st.text_input("Material Name")
            material_abbr = st.text_input("Material Abbreviation")
            value = st.text_input("Value")
            unit = st.text_input("Unit (SI)")
            english = st.text_input("English Units")
            test_condition = st.text_input("Test Condition")
            comments = st.text_area("Comments")
            submitted = st.form_submit_button("Submit")

            if not submitted:
                return False

            # Strip so whitespace-only input does not pass as "provided".
            material_name = material_name.strip()
            value = value.strip()
            if not (material_name and value):
                st.error("Material name and value are required.")
                return False

            return _save_manual_entry(
                {
                    "material_class": material_class,
                    "material_name": material_name,
                    "material_abbreviation": material_abbr,
                    "section": property_category,
                    "property_name": selected_property_name,
                    "value": value,
                    "unit": unit,
                    "english": english,
                    "test_condition": test_condition,
                    "comments": comments,
                }
            )


def _pdf_state_defaults():
    """Return fresh default values for every session key this page owns."""
    # Built on each call so the mutable defaults are never shared.
    return {
        "image_results": [],
        "pdf_processed": False,
        "current_pdf_name": None,
        "form_submitted": False,
        "pdf_data_extracted": False,
        "pdf_extracted_df": pd.DataFrame(),
        "pdf_extracted_meta": {},
        "saved_image_mapping": {},
    }


def _init_session_state():
    """Create any missing session keys without touching existing ones."""
    for key, default in _pdf_state_defaults().items():
        if key not in st.session_state:
            st.session_state[key] = default


def _reset_pdf_state(pdf_name=None):
    """Discard everything derived from the previous PDF.

    Args:
        pdf_name: name of the newly uploaded file, or None when the uploader
            is empty.
    """
    for key, default in _pdf_state_defaults().items():
        st.session_state[key] = default
    st.session_state.current_pdf_name = pdf_name


def _extract_plots(uploaded_file):
    """Write the upload to a temporary file and run ``extract_images`` on it.

    Returns:
        Whatever ``extract_images`` returns for the opened document.
    """
    # basename() keeps a client-supplied name from escaping the temp directory.
    safe_name = os.path.basename(uploaded_file.name) or "upload.pdf"
    with tempfile.TemporaryDirectory() as tmpdir:
        pdf_path = os.path.join(tmpdir, safe_name)
        with open(pdf_path, "wb") as fh:
            fh.write(uploaded_file.getbuffer())
        doc = fitz.open(pdf_path)
        try:
            return extract_images(doc)
        finally:
            # Close even on failure so the temp directory can be removed.
            doc.close()


def _render_material_data_tab(uploaded_file):
    """Render tab 1: extract (once per PDF) and display material properties."""
    st.subheader("Material Properties Data")

    if not st.session_state.pdf_data_extracted:
        with st.spinner(" Extracting material data..."):
            data = call_gemini_from_bytes(
                uploaded_file.getvalue(), uploaded_file.name
            )
            if data:
                extracted = convert_to_dataframe(data)
                if not extracted.empty:
                    st.session_state.pdf_extracted_df = extracted
                    st.session_state.pdf_data_extracted = True
                    st.session_state.pdf_extracted_meta = data
                else:
                    st.warning("No data extracted")
            else:
                st.error("Failed to extract data from PDF")

    df = st.session_state.pdf_extracted_df
    if df.empty:
        return

    data = st.session_state.get("pdf_extracted_meta", {})
    st.success(f"Extracted {len(df)} properties")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("Material", data.get("material_name", "N/A"))
    with col2:
        st.metric("Abbreviation", data.get("material_abbreviation", "N/A"))
    st.dataframe(df, use_container_width=True, height=400)

    st.subheader("Assign Material Category")
    extracted_material_class = st.selectbox(
        "Select category for this material",
        list(_MATERIAL_CLASSES),
        index=None,
        placeholder="Required before adding to database",
    )

    if not st.button("+Add to Database"):
        return
    if not extracted_material_class:
        st.error("Please select a material category before adding.")
        return

    df["material_class"] = extracted_material_class
    df["material_type"] = extracted_material_class

    if st.session_state.image_results:
        with st.spinner("Saving matched plot images..."):
            saved_images = save_matched_images(
                df,
                st.session_state.image_results,
                save_dir="images",
            )
        if saved_images:
            st.success(f" Saved {len(saved_images)} plot image(s)")
            with st.expander("View saved images"):
                for img_info in saved_images:
                    st.write(
                        f"? **{img_info['property']}** ? {img_info['caption']}"
                    )
                    st.write(f" Saved to: `{img_info['path']}`")
        else:
            st.info("? No plots matched the extracted properties")

    if "user_uploaded_data" not in st.session_state:
        st.session_state["user_uploaded_data"] = df
    else:
        st.session_state["user_uploaded_data"] = pd.concat(
            [st.session_state["user_uploaded_data"], df],
            ignore_index=True,
        )
    st.success(f"Added to {extracted_material_class} database!")


def _render_download_buttons(paper_id):
    """Render the three download buttons above the image list."""
    col_img, col_json, col_all = st.columns(3)
    with col_img:
        img_zip = create_zip(st.session_state.image_results, include_json=False)
        st.download_button(
            " Download Images Only",
            data=img_zip,
            file_name=f"{paper_id}_images.zip",
            mime="application/zip",
            use_container_width=True,
            key="download_images",
        )
    with col_json:
        json_data = [
            {
                "caption": r["caption"],
                "page": r["page"],
                "image_count": len(r["image_data"]),
            }
            for r in st.session_state.image_results
        ]
        st.download_button(
            " Download JSON",
            data=json.dumps(json_data, indent=4),
            file_name=f"{paper_id}_metadata.json",
            mime="application/json",
            use_container_width=True,
            key="download_json_top",
        )
    with col_all:
        full_zip = create_zip(st.session_state.image_results, include_json=True)
        st.download_button(
            " Download All",
            data=full_zip,
            file_name=f"{paper_id}_complete.zip",
            mime="application/zip",
            use_container_width=True,
            key="download_all",
        )


def _remove_image(idx, p_idx):
    """Delete one image; drop its parent result too once it has no images."""
    results = st.session_state.image_results
    del results[idx]["image_data"][p_idx]
    if len(results[idx]["image_data"]) == 0:
        del results[idx]


def _render_image_cards(has_extracted_data, mat_abbr, property_list):
    """Render one bordered card per extracted result with per-image controls.

    Args:
        has_extracted_data: whether property mapping controls should be shown.
        mat_abbr: material abbreviation passed to the image saver.
        property_list: property names offered in each image's dropdown.
    """
    # Every mutation below is followed by st.rerun(), which stops this run,
    # so the list is never modified while it is still being iterated.
    # NOTE(review): mapping keys embed list positions, so after a deletion the
    # remaining saved mappings may point at different images - confirm.
    for idx, result in enumerate(st.session_state.image_results):
        with st.container(border=True):
            col_cap, col_btn = st.columns([0.85, 0.15])
            col_cap.markdown(f"**Page {result['page']}** - {result['caption']}")
            if col_btn.button("Delete", key=f"del_g_{idx}_{result['page']}"):
                del st.session_state.image_results[idx]
                st.rerun()

            for p_idx, img_data in enumerate(result["image_data"] or []):
                img_unique_key = f"{idx}_{p_idx}_{result['page']}"
                st.image(img_data["array"], width=300, channels="BGR")

                if has_extracted_data:
                    col_dropdown, col_add_btn, col_remove = st.columns(
                        [0.6, 0.2, 0.2]
                    )
                    with col_dropdown:
                        selected_property = st.selectbox(
                            "Select Property",
                            options=[_NO_PROPERTY_SELECTED] + property_list,
                            key=f"prop_select_{img_unique_key}",
                            label_visibility="collapsed",
                        )
                    with col_add_btn:
                        if st.button(" Add", key=f"add_btn_{img_unique_key}"):
                            if (
                                selected_property
                                and selected_property != _NO_PROPERTY_SELECTED
                            ):
                                filepath = save_single_image_with_property(
                                    img_data["array"],
                                    mat_abbr,
                                    selected_property,
                                    save_dir="images",
                                )
                                st.session_state.saved_image_mapping[
                                    img_unique_key
                                ] = {
                                    "property": selected_property,
                                    "caption": result["caption"],
                                    "filename": os.path.basename(filepath),
                                    "path": filepath,
                                }
                                st.success(
                                    f" Saved as `{mat_abbr}_{selected_property}.png`"
                                )
                                st.rerun()
                            else:
                                st.warning("Please select a property first")
                    with col_remove:
                        if st.button("Remove", key=f"del_s_{img_unique_key}"):
                            st.session_state.saved_image_mapping.pop(
                                img_unique_key, None
                            )
                            _remove_image(idx, p_idx)
                            st.rerun()
                    if img_unique_key in st.session_state.saved_image_mapping:
                        mapping = st.session_state.saved_image_mapping[img_unique_key]
                        st.info(f"Mapped to: **{mapping['property']}**")
                else:
                    col_info, col_remove = st.columns([0.8, 0.2])
                    with col_info:
                        st.caption(
                            "Extract material data first to enable property mapping"
                        )
                    with col_remove:
                        if st.button("Remove", key=f"del_s_{img_unique_key}"):
                            _remove_image(idx, p_idx)
                            st.rerun()
                st.divider()


def _render_plots_tab(uploaded_file, paper_id):
    """Render tab 2: extract (once per PDF) and manage plot images."""
    st.subheader("Extracted Plot Images")

    if not st.session_state.pdf_processed:
        with st.spinner(" Extracting plots from PDF..."):
            st.session_state.image_results = _extract_plots(uploaded_file)
            st.session_state.pdf_processed = True

    if not st.session_state.image_results:
        st.warning("No plots found in PDF")
        return

    extracted_df = st.session_state.pdf_extracted_df
    has_extracted_data = not extracted_df.empty
    mat_abbr = None
    property_list = []
    if has_extracted_data:
        mat_abbr = extracted_df.iloc[0]["material_abbreviation"]
        property_list = extracted_df["property_name"].unique().tolist()
        st.info(
            f" Material: **{mat_abbr}** | {len(property_list)} properties available for mapping"
        )
    else:
        st.warning(
            " No extracted material data found. Please extract material data first (Tab 1) to enable property mapping."
        )

    subtab1, subtab2 = st.tabs([" Images", "JSON Preview"])

    with subtab1:
        st.success(f"Extracted {len(st.session_state.image_results)} plots")
        _render_download_buttons(paper_id)
        st.divider()

        if st.session_state.saved_image_mapping:
            with st.expander(" Saved Image Mappings", expanded=False):
                for mapping_info in st.session_state.saved_image_mapping.values():
                    st.write(
                        f" **{mapping_info['caption']}** ? `{mapping_info['property']}`"
                    )
                    st.write(f" Saved as: `{mapping_info['filename']}`")
            st.divider()

        _render_image_cards(has_extracted_data, mat_abbr, property_list)

    with subtab2:
        st.subheader("Metadata Preview")
        json_data = [
            {
                "caption": r["caption"],
                "page": r["page"],
                "image_count": len(r["image_data"]),
                "images": [img["filename"] for img in r["image_data"]],
            }
            for r in st.session_state.image_results
        ]
        st.download_button(
            " Download JSON",
            data=json.dumps(json_data, indent=4),
            file_name=f"{paper_id}_metadata.json",
            mime="application/json",
            key="download_json_bottom",
        )
        st.json(json_data)


def main():
    """Render the whole upload page: manual form, PDF uploader and result tabs."""
    inject_upload_page_styles()
    render_top_bar()
    st.subheader("Submit Scientific Material")
    st.caption(
        "Provide technical data and research documentation for the central repository."
    )
    _init_session_state()

    with st.container(border=True, key="ud_main_card"):
        if input_form():
            st.session_state.form_submitted = True
        st.markdown("\niResearch Documentation\n", unsafe_allow_html=True)
        uploaded_file = st.file_uploader(
            "Upload PDF (Material Datasheet or Research Paper)", type=["pdf"]
        )
        if not uploaded_file:
            st.info("Upload a PDF to extract material data and plots")

    if not uploaded_file:
        _reset_pdf_state()
        return

    paper_id = os.path.splitext(uploaded_file.name)[0].replace(" ", "_")

    if st.session_state.current_pdf_name != uploaded_file.name:
        # A file can be swapped without the uploader ever being empty, so the
        # extracted table and metadata must be cleared here too; otherwise the
        # previous PDF's data would be shown for the new one.
        _reset_pdf_state(uploaded_file.name)

    if st.session_state.form_submitted:
        st.session_state.form_submitted = False
        st.info(
            "A Form was submitted. But your previous extracted data has been added already. "
            "If you want to extract more data/plots upload again"
        )
        tab1, tab2 = st.tabs(["Material Data", "Extracted Plots"])
        with tab1:
            st.info("Material data from form has been added to database.")
        with tab2:
            st.info("Plots already extracted")
        return

    tab1, tab2 = st.tabs([" Material Data", " Extracted Plots"])
    with tab1:
        _render_material_data_tab(uploaded_file)
    with tab2:
        _render_plots_tab(uploaded_file, paper_id)


# Executed on every script run; left unguarded so that existing loaders that
# rely on running this module to render the page keep working.
main()