import os
import json
import tempfile
import base64
import fitz # PyMuPDF
import pandas as pd
import streamlit as st
from data_loader import insert_material_rows
from page_files.categorized.Backend.upload_backend import (
call_gemini_from_bytes,
convert_to_dataframe,
create_zip,
extract_images,
save_matched_images,
save_single_image_with_property,
)
def inject_upload_page_styles():
    """Emit the upload page's style markup through ``st.markdown``.

    The markup is passed with ``unsafe_allow_html=True`` so Streamlit renders
    it as raw HTML instead of escaping it. The literal below contains only a
    newline.
    """
    page_markup = """
"""
    st.markdown(page_markup, unsafe_allow_html=True)
def render_top_bar():
    """Render the page's top bar (optional logo plus the "AIM Composites" title).

    Reads ``logo.png`` (resolved against the current working directory) and
    base64-encodes it. If anything in that step fails, the bar is rendered
    without a logo. The result is emitted with ``st.markdown`` and
    ``unsafe_allow_html=True``.
    """
    logo_html = ""
    try:
        with open("logo.png", "rb") as fh:
            # Encode the raw image bytes as base64 text.
            logo_b64 = base64.b64encode(fh.read()).decode()
        # NOTE(review): as received, the f-string below is split across two
        # physical lines and never references ``logo_b64``. This looks like
        # markup that was lost upstream -- confirm against the original file.
        logo_html = f"
"
    except Exception:
        # Best effort: any failure while loading the logo leaves the bar
        # without one instead of failing the page.
        logo_html = ""
    # NOTE(review): this f-string is also split across physical lines as
    # received; presumably wrapper markup was stripped -- confirm upstream.
    st.markdown(
        f"
{logo_html}AIM Composites
",
        unsafe_allow_html=True,
    )
# Sections offered in the "Property Type" selectbox for each material class.
_PROPERTY_CATEGORIES = {
    "Polymer": [
        "Thermal",
        "Mechanical",
        "Processing",
        "Physical",
        "Descriptive",
    ],
    "Fiber": [
        "Mechanical",
        "Physical",
        "Thermal",
        "Descriptive",
    ],
    "Composite": [
        "Mechanical",
        "Thermal",
        "Processing",
        "Physical",
        "Descriptive",
        "Composition / Reinforcement",
        "Architecture / Structure",
    ],
}

# Property names offered in the "Property Name" selectbox, keyed by material
# class and then by section.
_PROPERTY_NAMES = {
    "Polymer": {
        "Thermal": [
            "Glass transition temperature (Tg)",
            "Melting temperature (Tm)",
            "Crystallization temperature (Tc)",
            "Degree of crystallinity",
            "Decomposition temperature",
        ],
        "Mechanical": [
            "Tensile modulus",
            "Tensile strength",
            "Elongation at break",
            "Flexural modulus",
            "Impact strength",
        ],
        "Processing": [
            "Melt flow index (MFI)",
            "Processing temperature",
            "Cooling rate",
            "Mold shrinkage",
        ],
        "Physical": [
            "Density",
            "Specific gravity",
        ],
        "Descriptive": [
            "Material grade",
            "Manufacturer",
        ],
    },
    "Fiber": {
        "Mechanical": [
            "Tensile modulus",
            "Tensile strength",
            "Strain to failure",
        ],
        "Physical": [
            "Density",
            "Fiber diameter",
        ],
        "Thermal": [
            "Decomposition temperature",
        ],
        "Descriptive": [
            "Fiber type",
            "Surface treatment",
        ],
    },
    "Composite": {
        "Mechanical": [
            "Longitudinal modulus (E1)",
            "Transverse modulus (E2)",
            "Shear modulus (G12)",
            "Poissons ratio (V12)",
            "Tensile strength (fiber direction)",
            "Interlaminar shear strength",
        ],
        "Thermal": [
            "Glass transition temperature (matrix)",
            "Coefficient of thermal expansion (CTE)",
        ],
        "Processing": [
            "Curing temperature",
            "Curing pressure",
        ],
        "Physical": [
            "Density",
        ],
        "Descriptive": [
            "Laminate type",
        ],
        "Composition / Reinforcement": [
            "Fiber volume fraction",
            "Fiber weight fraction",
            "Fiber type",
            "Matrix type",
        ],
        "Architecture / Structure": [
            "Weave type",
            "Ply orientation",
            "Number of plies",
            "Stacking sequence",
        ],
    },
}

# Selectbox entry that switches the form to a free-text property name.
_CUSTOM_PROPERTY_OPTION = "Something else"


def _render_identification_card():
    """Render the class / section / property pickers and return the selection.

    Returns:
        A ``(material_class, property_category, property_name)`` tuple. An
        element is ``None`` while the user has not reached or completed that
        step; ``property_name`` is ``""`` when the custom option is chosen but
        nothing has been typed.
    """
    with st.container(border=False, key="material_ident_card"):
        # NOTE(review): as received, this literal was split across two
        # physical lines, which is not valid for a single-quoted string. It is
        # written here with an explicit "\n" and the same visible text. The
        # leading "i" suggests icon markup may have been lost -- confirm
        # against the upstream file.
        st.markdown("iMaterial Identification\n", unsafe_allow_html=True)

        class_col, category_col = st.columns(2)
        with class_col:
            material_class = st.selectbox(
                "Material Class",
                ("Polymer", "Fiber", "Composite"),
                index=None,
                placeholder="Choose material class",
                key="manual_material_class",
            )
        with category_col:
            if material_class:
                property_category = st.selectbox(
                    "Property Type",
                    _PROPERTY_CATEGORIES[material_class],
                    index=None,
                    placeholder="Choose property type",
                    key="manual_property_category",
                )
            else:
                property_category = None
                # Disabled placeholder so the layout stays the same before a
                # material class has been chosen.
                st.selectbox(
                    "Property Type",
                    ["Choose material class first"],
                    index=0,
                    disabled=True,
                    key="manual_property_category_disabled",
                )

        if not (material_class and property_category):
            return material_class, property_category, None

        property_name = st.selectbox(
            "Property Name",
            _PROPERTY_NAMES[material_class][property_category]
            + [_CUSTOM_PROPERTY_OPTION],
            index=None,
            placeholder="Choose property",
            key="manual_property_name",
        )
        if property_name != _CUSTOM_PROPERTY_OPTION:
            return material_class, property_category, property_name

        custom_property_name = st.text_input(
            "Custom Property Name",
            placeholder="Type property name",
            key="manual_custom_property_name",
        ).strip()
        return material_class, property_category, custom_property_name


def input_form():
    """Render the manual single-property entry form and persist a submission.

    The data-entry form is shown only after a material class, a property
    section and a property name (catalog or custom) have been chosen. On
    submit, one row is assembled into a DataFrame and passed to
    ``insert_material_rows``.

    Returns:
        ``True`` only when the form was submitted in this run and
        ``insert_material_rows`` reported more than zero inserted rows;
        ``False`` in every other case (nothing selected yet, not submitted,
        validation failure, insert error, or zero rows inserted).
    """
    material_class, property_category, selected_property_name = (
        _render_identification_card()
    )
    if not (material_class and property_category and selected_property_name):
        return False

    with st.container(border=False, key="material_form_card"):
        with st.form("user_input"):
            st.subheader("Enter Data")
            # The two required fields are stripped so that whitespace-only
            # input fails validation, matching the custom property name,
            # which is stripped as well.
            material_name = st.text_input("Material Name").strip()
            material_abbr = st.text_input("Material Abbreviation")
            value = st.text_input("Value").strip()
            unit = st.text_input("Unit (SI)")
            english = st.text_input("English Units")
            test_condition = st.text_input("Test Condition")
            comments = st.text_area("Comments")

            if not st.form_submit_button("Submit"):
                return False

            if not (material_name and value):
                st.error("Material name and value are required.")
                return False

            input_db = pd.DataFrame(
                [
                    {
                        "material_class": material_class,
                        "material_name": material_name,
                        "material_abbreviation": material_abbr,
                        "section": property_category,
                        "property_name": selected_property_name,
                        "value": value,
                        "unit": unit,
                        "english": english,
                        "test_condition": test_condition,
                        "comments": comments,
                    }
                ]
            )
            try:
                inserted = insert_material_rows(input_db)
            except Exception as exc:
                # UI boundary: report any persistence failure to the user
                # rather than crashing the page.
                st.error(f"Failed to save to PostgreSQL: {exc}")
                return False

            # ``inserted`` is compared against 0, so it is presumably a row
            # count -- confirm against ``data_loader.insert_material_rows``.
            if inserted <= 0:
                st.error("No rows were inserted into PostgreSQL.")
                return False

            # Drop every ``st.cache_data`` entry so later reads do not serve
            # results cached before this insert.
            st.cache_data.clear()
            st.success("Property added successfully to PostgreSQL.")
            st.dataframe(input_db)
            return True
def _fresh_pdf_state():
    """Return new default values for every PDF-related session-state key.

    A fresh dict is built on each call so the mutable defaults (list, dict,
    DataFrame) are never shared between resets.
    """
    return {
        "image_results": [],
        "pdf_processed": False,
        "current_pdf_name": None,
        "form_submitted": False,
        "pdf_data_extracted": False,
        "pdf_extracted_df": pd.DataFrame(),
        "saved_image_mapping": {},
    }


def _init_pdf_state():
    """Create any PDF-related session-state key that does not exist yet."""
    for key, default in _fresh_pdf_state().items():
        if key not in st.session_state:
            st.session_state[key] = default


def _reset_pdf_state(current_pdf_name=None):
    """Reset all PDF-related session state and record the active PDF name.

    Also drops ``pdf_extracted_meta`` so metadata from an earlier PDF cannot
    be shown next to a different document.
    """
    for key, default in _fresh_pdf_state().items():
        st.session_state[key] = default
    st.session_state.current_pdf_name = current_pdf_name
    if "pdf_extracted_meta" in st.session_state:
        del st.session_state["pdf_extracted_meta"]


def _plot_metadata(include_filenames):
    """Summarise ``st.session_state.image_results`` as JSON-serialisable dicts.

    Args:
        include_filenames: When true, each entry also carries an ``images``
            list with the ``filename`` of every image in that result.
    """
    summary = []
    for result in st.session_state.image_results:
        entry = {
            "caption": result["caption"],
            "page": result["page"],
            "image_count": len(result["image_data"]),
        }
        if include_filenames:
            entry["images"] = [img["filename"] for img in result["image_data"]]
        summary.append(entry)
    return summary


def _remove_plot_image(idx, p_idx):
    """Delete one image, drop its result if now empty, and rerun the script."""
    results = st.session_state.image_results
    del results[idx]["image_data"][p_idx]
    if not results[idx]["image_data"]:
        del results[idx]
    st.rerun()


def _render_material_tab(pdf_path, pdf_name):
    """Render the "Material Data" tab: extract once, display, offer to add."""
    st.subheader("Material Properties Data")

    if not st.session_state.pdf_data_extracted:
        with st.spinner(" Extracting material data..."):
            with open(pdf_path, "rb") as f:
                pdf_bytes = f.read()
            data = call_gemini_from_bytes(pdf_bytes, pdf_name)
            if data:
                df = convert_to_dataframe(data)
                if not df.empty:
                    # Cached in session state so later reruns skip extraction.
                    st.session_state.pdf_extracted_df = df
                    st.session_state.pdf_data_extracted = True
                    st.session_state.pdf_extracted_meta = data
                else:
                    st.warning("No data extracted")
            else:
                st.error("Failed to extract data from PDF")

    df = st.session_state.pdf_extracted_df
    if df.empty:
        return

    data = st.session_state.get("pdf_extracted_meta", {})
    st.success(f"Extracted {len(df)} properties")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("Material", data.get("material_name", "N/A"))
    with col2:
        st.metric("Abbreviation", data.get("material_abbreviation", "N/A"))
    st.dataframe(df, use_container_width=True, height=400)

    st.subheader("Assign Material Category")
    extracted_material_class = st.selectbox(
        "Select category for this material",
        ["Polymer", "Fiber", "Composite"],
        index=None,
        placeholder="Required before adding to database",
    )
    if not st.button("+Add to Database"):
        return
    if not extracted_material_class:
        st.error("Please select a material category before adding.")
        return

    # Tag a copy: the cached frame must not be mutated, and the object stored
    # under ``user_uploaded_data`` must not alias it. Otherwise a later click
    # with a different category would rewrite rows that were already added.
    tagged_df = df.copy()
    tagged_df["material_class"] = extracted_material_class
    tagged_df["material_type"] = extracted_material_class

    if st.session_state.image_results:
        with st.spinner("Saving matched plot images..."):
            saved_images = save_matched_images(
                tagged_df,
                st.session_state.image_results,
                save_dir="images",
            )
        if saved_images:
            st.success(f" Saved {len(saved_images)} plot image(s)")
            with st.expander("View saved images"):
                for img_info in saved_images:
                    st.write(
                        f"? **{img_info['property']}** ? {img_info['caption']}"
                    )
                    st.write(f" Saved to: `{img_info['path']}`")
        else:
            st.info("? No plots matched the extracted properties")

    if "user_uploaded_data" not in st.session_state:
        st.session_state["user_uploaded_data"] = tagged_df
    else:
        st.session_state["user_uploaded_data"] = pd.concat(
            [st.session_state["user_uploaded_data"], tagged_df],
            ignore_index=True,
        )
    st.success(f"Added to {extracted_material_class} database!")


def _render_plot_cards(has_extracted_data, mat_abbr, property_list):
    """Render one bordered card per extracted result with per-image actions.

    Every delete/remove/add action ends in ``st.rerun()``, so the lists are
    not iterated further after being mutated.
    """
    results = st.session_state.image_results
    saved_mapping = st.session_state.saved_image_mapping

    for idx, result in enumerate(results):
        with st.container(border=True):
            col_cap, col_btn = st.columns([0.85, 0.15])
            col_cap.markdown(f"**Page {result['page']}** - {result['caption']}")
            if col_btn.button("Delete", key=f"del_g_{idx}_{result['page']}"):
                del results[idx]
                st.rerun()

            for p_idx, img_data in enumerate(result["image_data"] or ()):
                img_unique_key = f"{idx}_{p_idx}_{result['page']}"
                st.image(img_data["array"], width=300, channels="BGR")

                if has_extracted_data:
                    col_dropdown, col_add_btn, col_remove = st.columns(
                        [0.6, 0.2, 0.2]
                    )
                    with col_dropdown:
                        selected_property = st.selectbox(
                            "Select Property",
                            options=["-- Select --"] + property_list,
                            key=f"prop_select_{img_unique_key}",
                            label_visibility="collapsed",
                        )
                    with col_add_btn:
                        if st.button(" Add", key=f"add_btn_{img_unique_key}"):
                            if selected_property and selected_property != "-- Select --":
                                filepath = save_single_image_with_property(
                                    img_data["array"],
                                    mat_abbr,
                                    selected_property,
                                    save_dir="images",
                                )
                                saved_mapping[img_unique_key] = {
                                    "property": selected_property,
                                    "caption": result["caption"],
                                    "filename": os.path.basename(filepath),
                                    "path": filepath,
                                }
                                st.success(
                                    f" Saved as `{mat_abbr}_{selected_property}.png`"
                                )
                                st.rerun()
                            else:
                                st.warning("Please select a property first")
                    with col_remove:
                        if st.button("Remove", key=f"del_s_{img_unique_key}"):
                            if img_unique_key in saved_mapping:
                                del saved_mapping[img_unique_key]
                            _remove_plot_image(idx, p_idx)
                    if img_unique_key in saved_mapping:
                        mapping = saved_mapping[img_unique_key]
                        st.info(f"Mapped to: **{mapping['property']}**")
                else:
                    col_info, col_remove = st.columns([0.8, 0.2])
                    with col_info:
                        st.caption(
                            "Extract material data first to enable property mapping"
                        )
                    with col_remove:
                        if st.button("Remove", key=f"del_s_{img_unique_key}"):
                            _remove_plot_image(idx, p_idx)
                st.divider()


def _render_plots_tab(pdf_path, paper_id):
    """Render the "Extracted Plots" tab: extract once, then list and export."""
    st.subheader("Extracted Plot Images")

    if not st.session_state.pdf_processed:
        with st.spinner(" Extracting plots from PDF..."):
            doc = fitz.open(pdf_path)
            try:
                st.session_state.image_results = extract_images(doc)
            finally:
                # Close the document even if extraction raises; an open
                # handle can block removal of the temporary directory.
                doc.close()
            st.session_state.pdf_processed = True

    if not st.session_state.image_results:
        st.warning("No plots found in PDF")
        return

    extracted_df = st.session_state.pdf_extracted_df
    has_extracted_data = not extracted_df.empty
    mat_abbr = None
    property_list = []
    if has_extracted_data:
        mat_abbr = extracted_df.iloc[0]["material_abbreviation"]
        property_list = extracted_df["property_name"].unique().tolist()
        st.info(
            f" Material: **{mat_abbr}** | {len(property_list)} properties available for mapping"
        )
    else:
        st.warning(
            " No extracted material data found. Please extract material data first (Tab 1) to enable property mapping."
        )

    subtab1, subtab2 = st.tabs([" Images", "JSON Preview"])
    with subtab1:
        st.success(f"Extracted {len(st.session_state.image_results)} plots")
        col_img, col_json, col_all = st.columns(3)
        with col_img:
            img_zip = create_zip(st.session_state.image_results, include_json=False)
            st.download_button(
                " Download Images Only",
                data=img_zip,
                file_name=f"{paper_id}_images.zip",
                mime="application/zip",
                use_container_width=True,
                key="download_images",
            )
        with col_json:
            st.download_button(
                " Download JSON",
                data=json.dumps(_plot_metadata(include_filenames=False), indent=4),
                file_name=f"{paper_id}_metadata.json",
                mime="application/json",
                use_container_width=True,
                key="download_json_top",
            )
        with col_all:
            full_zip = create_zip(st.session_state.image_results, include_json=True)
            st.download_button(
                " Download All",
                data=full_zip,
                file_name=f"{paper_id}_complete.zip",
                mime="application/zip",
                use_container_width=True,
                key="download_all",
            )
        st.divider()

        if st.session_state.saved_image_mapping:
            with st.expander(" Saved Image Mappings", expanded=False):
                for mapping_info in st.session_state.saved_image_mapping.values():
                    st.write(
                        f" **{mapping_info['caption']}** ? `{mapping_info['property']}`"
                    )
                    st.write(f" Saved as: `{mapping_info['filename']}`")
            st.divider()

        _render_plot_cards(has_extracted_data, mat_abbr, property_list)

    with subtab2:
        st.subheader("Metadata Preview")
        json_data = _plot_metadata(include_filenames=True)
        st.download_button(
            " Download JSON",
            data=json.dumps(json_data, indent=4),
            file_name=f"{paper_id}_metadata.json",
            mime="application/json",
            key="download_json_bottom",
        )
        st.json(json_data)


def main():
    """Render the upload page: manual entry form plus PDF extraction tabs.

    Flow per script run:
      1. Draw the page chrome and make sure the session-state keys exist.
      2. Render the manual form and the PDF uploader.
      3. With no PDF uploaded, reset all PDF-related state and stop.
      4. When the uploaded PDF's name differs from the one last seen, reset
         all PDF-related state, including previously extracted material data,
         so the new document is processed from scratch.
      5. If the manual form was just submitted, show notices instead of the
         extraction tabs for this run.
      6. Otherwise write the PDF to a temporary directory and render the
         "Material Data" and "Extracted Plots" tabs.
    """
    inject_upload_page_styles()
    render_top_bar()
    st.subheader("Submit Scientific Material")
    st.caption("Provide technical data and research documentation for the central repository.")

    _init_pdf_state()

    with st.container(border=True, key="ud_main_card"):
        if input_form():
            st.session_state.form_submitted = True
        # NOTE(review): as received, this literal was split across two
        # physical lines, which is not valid for a single-quoted string. It is
        # written here with an explicit "\n" and the same visible text. The
        # leading "i" suggests icon markup may have been lost -- confirm
        # against the upstream file.
        st.markdown("iResearch Documentation\n", unsafe_allow_html=True)
        uploaded_file = st.file_uploader(
            "Upload PDF (Material Datasheet or Research Paper)", type=["pdf"]
        )
        if not uploaded_file:
            st.info("Upload a PDF to extract material data and plots")

    if not uploaded_file:
        _reset_pdf_state()
        return

    paper_id = os.path.splitext(uploaded_file.name)[0].replace(" ", "_")

    if st.session_state.current_pdf_name != uploaded_file.name:
        # A different document replaced the previous one. Clear the extracted
        # material data as well, otherwise the old PDF's properties would be
        # shown and no new extraction would run.
        _reset_pdf_state(uploaded_file.name)

    if st.session_state.form_submitted:
        st.session_state.form_submitted = False
        st.info(
            "A Form was submitted. But your previous extracted data has been added already. "
            "If you want to extract more data/plots upload again"
        )
        tab1, tab2 = st.tabs(["Material Data", "Extracted Plots"])
        with tab1:
            st.info("Material data from form has been added to database.")
        with tab2:
            st.info("Plots already extracted")
        return

    tab1, tab2 = st.tabs([" Material Data", " Extracted Plots"])
    with tempfile.TemporaryDirectory() as tmpdir:
        # The file name comes from the client; keep only its final component
        # so the temporary copy cannot land outside ``tmpdir``.
        safe_name = os.path.basename(uploaded_file.name) or "upload.pdf"
        pdf_path = os.path.join(tmpdir, safe_name)
        with open(pdf_path, "wb") as f:
            f.write(uploaded_file.getbuffer())

        with tab1:
            _render_material_tab(pdf_path, uploaded_file.name)
        with tab2:
            _render_plots_tab(pdf_path, paper_id)
# Unconditional module-level call (there is no ``if __name__ == "__main__"``
# guard): the page is rendered whenever this file is executed or imported.
main()