Spaces:
Running
Running
# Standard library
import json
import os
import re
from typing import Any, List

# Third-party
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
from langchain_groq import ChatGroq

# Pull GROQ_API_KEY (and friends) from a local .env file into the environment.
load_dotenv()
# --- 1. Config ---
# Seed row shown in the field editor on first load.
DEFAULT_FIELDS = [{"name": "number", "datatype": "int", "description": "Description of the item"}]
# Editor datatype label -> schema type string. Identity mapping today, but
# lookups go through .get(..., "str") so unknown labels fall back to "str".
TYPE_MAPPING_STR = {"int": "int", "float": "float", "str": "str"}
def normalize_fields(fields: Any) -> List[dict]:
    """Coerce a DataFrame or list of rows into a uniform list of field dicts.

    Each output dict has string-valued "name", "datatype" (defaulting to
    "str" when blank), and "description" keys. Unsupported inputs and any
    unexpected failure yield an empty list.
    """
    try:
        if isinstance(fields, pd.DataFrame):
            rows = fields.fillna("").to_dict(orient="records")
        elif isinstance(fields, list):
            rows = fields
        else:
            return []
        return [
            {
                "name": str(row.get("name", "")).strip(),
                "datatype": str(row.get("datatype", "str")).strip() or "str",
                "description": str(row.get("description", "")).strip(),
            }
            for row in rows
            if isinstance(row, dict)
        ]
    except Exception:
        # Best-effort normalizer: a malformed editor state must never crash the UI.
        return []
def generate_schema_json(fields: Any) -> str:
    """Render the user-defined field rows as a JSON-schema-like string.

    Rows with a blank name are skipped; every kept field is both listed in
    "required" and marked nullable (missing values come back as null).
    """
    properties = {}
    required = []
    for field in normalize_fields(fields):
        name = field.get("name", "").strip()
        if not name:
            continue
        required.append(name)
        properties[name] = {
            # Unknown datatype labels degrade to "str".
            "type": TYPE_MAPPING_STR.get(field.get("datatype", "str"), "str"),
            "description": field.get("description", ""),
            "nullable": True,
        }
    return json.dumps(
        {
            "type": "object",
            "properties": properties,
            "required": required,
            "additionalProperties": False,
        },
        indent=2,
    )
def is_valid_text(text: str) -> bool:
    """Guardrail: True only when *text* contains at least one non-whitespace char."""
    stripped = (text or "").strip()
    return len(stripped) > 0
| def parse_json_from_text(text: str) -> dict | None: | |
| """Extract JSON object from model response text.""" | |
| try: | |
| # 1) direct JSON | |
| parsed = json.loads(text) | |
| return parsed if isinstance(parsed, dict) else None | |
| except Exception: | |
| pass | |
| try: | |
| # 2) fenced code block | |
| fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, flags=re.DOTALL | re.IGNORECASE) | |
| if fenced: | |
| parsed = json.loads(fenced.group(1)) | |
| return parsed if isinstance(parsed, dict) else None | |
| except Exception: | |
| pass | |
| try: | |
| # 3) first object-looking block | |
| obj = re.search(r"(\{.*\})", text, flags=re.DOTALL) | |
| if obj: | |
| parsed = json.loads(obj.group(1)) | |
| return parsed if isinstance(parsed, dict) else None | |
| except Exception: | |
| pass | |
| return None | |
def cast_to_dtype(value: Any, dtype: str) -> Any:
    """Cast *value* to the requested dtype ("int", "float", anything else -> str).

    Returns None for a None input or when the cast fails, so one bad field
    never aborts the whole extraction.
    """
    if value is None:
        return None
    try:
        if dtype == "int":
            if isinstance(value, (int, float)):
                return int(value)
            # LLMs frequently emit numbers as strings like "4.0";
            # int("4.0") raises ValueError, so go through float first.
            return int(float(value))
        if dtype == "float":
            return float(value)
        return str(value)
    except Exception:
        # Uncastable value (e.g. "abc" as int) degrades to None, matching
        # the schema's "nullable" contract.
        return None
def extract_structured(fields: Any, unstructured_text: str) -> dict | str:
    """
    Extract structured data from unstructured text based on user-defined fields.

    Args:
        fields: A list of dicts or a pd.DataFrame with columns
            [name, datatype, description].
        unstructured_text: Raw text to extract data from.

    Returns:
        A dict keyed by field name on success, or a human-readable error
        string on any failure (callers dispatch on the return type).
    """
    if not is_valid_text(unstructured_text):
        return "Input text is empty. Please provide some text to extract from."

    # Build a flat name -> {type, description} schema from the user rows.
    normalized_fields = normalize_fields(fields)
    schema_properties = {}
    field_order = []  # preserves row order for the output dict
    for f in normalized_fields:
        field_name = f.get("name", "").strip()
        if not field_name:
            continue
        if not field_name.isidentifier():
            return f"Invalid field name '{field_name}'. Use letters, numbers, and underscores only."
        field_type = TYPE_MAPPING_STR.get(f.get("datatype", "str"), "str")
        schema_properties[field_name] = {
            "type": field_type,
            "description": f.get("description", ""),
        }
        field_order.append(field_name)
    if not schema_properties:
        return "Please add at least one valid field before extraction."

    # FIX: fail fast with a readable message. Previously a missing key could
    # raise during ChatGroq construction (outside the try) and crash the app.
    api_key = os.getenv("GROQ_API_KEY")
    if not api_key:
        return "GROQ_API_KEY is not set. Add it to your environment or .env file."

    # Initialize LLM
    llm = ChatGroq(
        model="openai/gpt-oss-120b",
        temperature=0,  # deterministic output for extraction
        api_key=api_key,
    )

    # Extract structured data
    try:
        schema_json = json.dumps(schema_properties, indent=2)
        response = llm.invoke(
            "Extract information from the text below.\n"
            "Return ONLY one valid JSON object and no extra text.\n"
            "Use exactly the fields in this schema.\n"
            "If a value is missing, return null.\n\n"
            f"Schema:\n{schema_json}\n\n"
            f"Text:\n{unstructured_text}"
        )
        content = response.content if hasattr(response, "content") else str(response)
        # Some providers return a list of content parts; join the text parts.
        if isinstance(content, list):
            content = "".join(
                part.get("text", "") if isinstance(part, dict) else str(part)
                for part in content
            )
        parsed = parse_json_from_text(str(content))
        # FIX: compare against None explicitly -- an empty JSON object {}
        # is a successful parse (all fields missing), not a failure.
        if parsed is None:
            return f"Could not parse JSON from model output: {content}"
        # Coerce output to the requested schema and order; absent or
        # uncastable values become None.
        cleaned = {}
        for field_name in field_order:
            dtype = schema_properties[field_name]["type"]
            cleaned[field_name] = cast_to_dtype(parsed.get(field_name), dtype)
        return cleaned
    except Exception as e:
        return f"Error during extraction: {str(e)}"
def render_styles():
    """Inject the app's shared CSS classes (main-title, sub-title,
    block-header) into the page via a raw <style> block."""
    st.markdown(
        """
        <style>
        .main-title {
            font-size: 34px;
            font-weight: 700;
            margin-bottom: 4px;
        }
        .sub-title {
            color: #6b7280;
            margin-bottom: 20px;
        }
        .block-header {
            font-size: 22px;
            font-weight: 600;
            margin: 8px 0 8px 0;
        }
        </style>
        """,
        unsafe_allow_html=True,
    )
def main():
    """Render the Streamlit UI.

    Left column: editable field table plus the unstructured-text input and
    the extraction trigger. Right column: the extraction result as a
    transposed Field/Value table, or an error message.
    """
    st.set_page_config(page_title="Dynamic Extraction", layout="wide")
    render_styles()
    st.markdown('<div class="main-title">Dynamic Invoice Extraction</div>', unsafe_allow_html=True)
    st.markdown('<div class="sub-title">Json structured output</div>', unsafe_allow_html=True)

    # Session state survives Streamlit reruns; seed everything up front.
    if "fields_df" not in st.session_state:
        st.session_state.fields_df = pd.DataFrame(DEFAULT_FIELDS)
    if "generated_schema" not in st.session_state:
        st.session_state.generated_schema = ""
    if "structured_result" not in st.session_state:
        st.session_state.structured_result = ""
    if "structured_result_json" not in st.session_state:
        st.session_state.structured_result_json = {}

    left_col, right_col = st.columns(2)

    with left_col:
        st.markdown('<div class="block-header">Define Entities / Fields</div>', unsafe_allow_html=True)
        if st.button("+ Add Field", width="stretch"):
            st.session_state.fields_df = pd.concat(
                [st.session_state.fields_df, pd.DataFrame([{"name": "", "datatype": "str", "description": ""}])],
                ignore_index=True,
            )
        edited_df = st.data_editor(
            st.session_state.fields_df,
            width="stretch",
            num_rows="dynamic",
            column_config={
                "name": st.column_config.TextColumn("name"),
                "datatype": st.column_config.SelectboxColumn("datatype", options=["str", "int", "float"]),
                "description": st.column_config.TextColumn("description"),
            },
            key="fields_editor",
        )
        st.session_state.fields_df = edited_df

        st.markdown('<div class="block-header">Paste Unstructured Text</div>', unsafe_allow_html=True)
        unstructured_text = st.text_area(
            "Example: https://huggingface.co/spaces/opendatalab/MinerU",
            # FIX: "mqarkdown" -> "markdown" in the user-facing default text.
            "Click on the above link and extract the markdown text from that page and paste it here...",
            placeholder="Paste your text here...",
            height=220,
        )

        if st.button("Extract Structured Data", type="primary", width="stretch"):
            with st.spinner("Extracting structured data..."):
                result = extract_structured(st.session_state.fields_df, unstructured_text)
                # extract_structured returns a dict on success, an error string otherwise.
                if isinstance(result, dict):
                    st.session_state.structured_result_json = result
                    st.session_state.structured_result = ""
                else:
                    st.session_state.structured_result_json = {}
                    st.session_state.structured_result = result

    with right_col:
        st.markdown("### Structured Output (Transposed Table)")
        if st.session_state.structured_result_json:
            # One row per extracted field: columns [Field, Value].
            transposed_df = (
                pd.DataFrame([st.session_state.structured_result_json])
                .T.reset_index()
                .rename(columns={"index": "Field", 0: "Value"})
            )
            st.dataframe(transposed_df, width="stretch", hide_index=True)
        elif st.session_state.structured_result:
            st.error(st.session_state.structured_result)
        else:
            st.info("Run extraction to see transposed table output.")


if __name__ == "__main__":
    main()