# streamlit-web-crawler / src /pages /03_Document_Generator.py
# Muhammad Risqi Firdaus
# fix: set signing date to today
# f71c9cb
import streamlit as st
import requests
import datetime
import pandas as pd
import copy
import uuid
import os
import sys
from dotenv import load_dotenv
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from src.document_generator_models import (
DocumentType,
DocumentSchema,
DOCUMENT_REGISTRY,
REPRESENTATIVES,
PrefillDocumentRequest,
DocChatRequest,
GenerateDraftRequest,
ExportDocumentRequest,
)
# Load variables from a .env file (python-dotenv) before the configuration below is read.
load_dotenv()
# Base URL of the backend API; taken from the URL_CE_BOT environment variable,
# falling back to a local server.
API_BASE_URL = os.getenv("URL_CE_BOT", "http://localhost:5000")
# Date captured when this script is executed; used as the default signing date.
TODAY = datetime.date.today()
# ─────────────────────────────────────────────────────────
# GENERIC SESSION STATE
# Keys: doc_{doc_type}_{suffix} (one set per registered doc type)
# ─────────────────────────────────────────────────────────
def _sk(doc_type: str, suffix: str) -> str:
"""Compose a session-state key from doc type + suffix."""
return f"doc_{doc_type}_{suffix}"
def _init_all_doc_states():
    """Seed session state with the default slots of every registered document type.

    Slots that already exist are left untouched, so stored values are not
    overwritten when the script runs again.
    """
    state = st.session_state
    for doc_type in DOCUMENT_REGISTRY:
        # Built inside the loop so every document type gets its own list
        # objects and its own session UUID.
        slot_defaults = (
            ("data", None),
            ("chat_messages", []),
            ("api_history", []),
            ("history_stack", []),
            ("redo_stack", []),
            ("session_uuid", str(uuid.uuid4())),
            ("app_id", ""),
            ("draft_content", ""),
            ("draft_generated", False),
            ("doc_structure", None),
            ("doc_url", None),
        )
        for suffix, default in slot_defaults:
            slot = _sk(doc_type, suffix)
            if slot not in state:
                state[slot] = default


_init_all_doc_states()
# ─────────────────────────────────────────────────────────
# UNDO / REDO
# ─────────────────────────────────────────────────────────
def _clear_widget_keys(doc_type: str):
    """Forget the widget state of one document type.

    Removes every widget key reported by the registered schema, plus the raw
    draft editor key, from session state so the widgets are rebuilt from their
    ``value=`` argument on the next render. Missing keys are ignored.
    """
    stale_keys = [
        *DOCUMENT_REGISTRY[doc_type].widget_keys(),
        f"{doc_type}_draft_editor",
    ]
    for stale in stale_keys:
        st.session_state.pop(stale, None)
def _snapshot_state(doc_type: str, data) -> dict:
    """Capture one undo/redo snapshot for a document type.

    Args:
        doc_type: Document type whose session-state slots are captured.
        data: Form data to store in the snapshot. It is passed explicitly
            because ``push_history`` receives the data to remember from its
            caller rather than reading it from session state.

    Returns:
        A dict that shares no mutable objects with session state.
    """
    state = st.session_state
    return {
        "data": copy.deepcopy(data),
        "chat": copy.deepcopy(state[_sk(doc_type, "chat_messages")]),
        "api": copy.deepcopy(state[_sk(doc_type, "api_history")]),
        "draft": state[_sk(doc_type, "draft_content")],
        "draft_generated": state[_sk(doc_type, "draft_generated")],
        # Captured so that undoing a load/clear does not leave the restored
        # data paired with a different application ID or a wiped structure.
        "app_id": state[_sk(doc_type, "app_id")],
        "doc_structure": copy.deepcopy(state[_sk(doc_type, "doc_structure")]),
    }


def _restore_state(doc_type: str, snapshot: dict) -> None:
    """Write a snapshot back into session state and reset the form widgets."""
    state = st.session_state
    state[_sk(doc_type, "data")] = snapshot["data"]
    state[_sk(doc_type, "chat_messages")] = snapshot["chat"]
    state[_sk(doc_type, "api_history")] = snapshot["api"]
    state[_sk(doc_type, "draft_content")] = snapshot.get("draft", "")
    state[_sk(doc_type, "draft_generated")] = snapshot.get("draft_generated", False)
    # Snapshots recorded before these keys existed simply leave them untouched.
    for suffix in ("app_id", "doc_structure"):
        if suffix in snapshot:
            state[_sk(doc_type, suffix)] = snapshot[suffix]
    # Widgets must be dropped so they re-read the restored values.
    _clear_widget_keys(doc_type)


def push_history(doc_type: str, data):
    """Record an undo point for ``doc_type`` and invalidate the redo stack.

    Args:
        doc_type: Document type the snapshot belongs to.
        data: Form data to remember; ``None`` means there is nothing to
            undo back to, so no snapshot is recorded.
    """
    if data is None:
        return
    st.session_state[_sk(doc_type, "history_stack")].append(_snapshot_state(doc_type, data))
    # A new edit starts a new timeline; previously undone states are discarded.
    st.session_state[_sk(doc_type, "redo_stack")].clear()


def do_undo(doc_type: str):
    """Step back to the most recent undo point, if any.

    The current state is pushed onto the redo stack first so it can be
    re-applied with ``do_redo``.
    """
    history = st.session_state[_sk(doc_type, "history_stack")]
    if not history:
        return
    current_data = st.session_state[_sk(doc_type, "data")]
    st.session_state[_sk(doc_type, "redo_stack")].append(_snapshot_state(doc_type, current_data))
    _restore_state(doc_type, history.pop())


def do_redo(doc_type: str):
    """Re-apply the most recently undone state, if any.

    The current state is pushed back onto the undo stack first.
    """
    redo = st.session_state[_sk(doc_type, "redo_stack")]
    if not redo:
        return
    current_data = st.session_state[_sk(doc_type, "data")]
    st.session_state[_sk(doc_type, "history_stack")].append(_snapshot_state(doc_type, current_data))
    _restore_state(doc_type, redo.pop())
# ─────────────────────────────────────────────────────────
# API HELPERS (doc-type-agnostic)
# ─────────────────────────────────────────────────────────
def call_doc_chat_api(query, history, session_uuid, current_document_content="", structure=None):
    """POST /generate-document/chat — stateless revision.

    Returns:
        ``(response_json, None)`` on success, or ``(None, error_message)`` when
        building the request, sending it, or decoding the reply raises.
    """
    try:
        endpoint = f"{API_BASE_URL}/generate-document/chat"
        chat_request = DocChatRequest(
            query=query,
            history=history,
            session_uuid=session_uuid,
            current_document_content=current_document_content,
            structure=structure,
        )
        payload = chat_request.model_dump(exclude_none=True)
        response = requests.post(endpoint, json=payload, timeout=120)
        response.raise_for_status()
        return response.json(), None
    except Exception as exc:
        return None, str(exc)
def call_generate_draft(schema: DocumentSchema, form_data, session_uuid=None):
    """POST /generate-document/draft — returns Markdown, no Google Doc created.

    The field structure sent with the request is derived from ``schema.fields``.

    Returns:
        ``(response_json, None)`` on success, or ``(None, error_message)`` when
        any step raises.
    """
    try:
        structure = []
        for schema_field in schema.fields:
            structure.append(
                {
                    "key": schema_field.data_key,
                    "label": schema_field.label,
                    "description": schema_field.description or schema_field.placeholder,
                    "required": False,
                }
            )
        draft_request = GenerateDraftRequest(
            doc_type=str(schema.document_type),
            data=form_data or {},
            structure=structure,
            session_uuid=session_uuid,
        )
        endpoint = f"{API_BASE_URL}/generate-document/draft"
        payload = draft_request.model_dump(exclude_none=True)
        response = requests.post(endpoint, json=payload, timeout=120)
        response.raise_for_status()
        return response.json(), None
    except Exception as exc:
        return None, str(exc)
def call_export_document(document_content, title=None):
    """POST /export-document — converts Markdown to Google Doc.

    Returns:
        ``(response_json, None)`` on success, or ``(None, error_message)`` when
        any step raises.
    """
    try:
        export_request = ExportDocumentRequest(document_content=document_content, title=title)
        endpoint = f"{API_BASE_URL}/export-document"
        payload = export_request.model_dump(exclude_none=True)
        response = requests.post(endpoint, json=payload, timeout=120)
        response.raise_for_status()
        return response.json(), None
    except Exception as exc:
        return None, str(exc)
# ─────────────────────────────────────────────────────────
# FORM RENDERING (driven by DocumentSchema metadata)
# ─────────────────────────────────────────────────────────
def _get_field_value(data: dict, field) -> str:
"""Read a field's current value, respecting nested_under."""
if field.nested_under:
return (data.get(field.nested_under) or {}).get(field.data_key, "") or ""
return data.get(field.data_key, "") or ""
def _apply_field_change(updated_data: dict, original_data: dict, field, new_val: str):
"""Write a changed value into updated_data, respecting nested_under."""
if field.nested_under:
if updated_data.get(field.nested_under) is None:
updated_data[field.nested_under] = copy.deepcopy(
original_data.get(field.nested_under) or {}
)
updated_data[field.nested_under][field.data_key] = new_val
else:
updated_data[field.data_key] = new_val
def _render_section(
    schema: DocumentSchema,
    section_key: str,
    data: dict,
    updated_data: dict,
    disabled_keys: frozenset = frozenset(),
) -> bool:
    """
    Draw one section of the form: a subheader followed by one widget per field.

    The widget kind follows ``field.field_type`` ("select", "text_area", anything
    else is a single-line text input). Fields whose data_key is listed in
    disabled_keys are shown read-only and never written back.

    Returns True when at least one editable field differs from the value in
    ``data``; the new values are written into ``updated_data``.
    """
    st.subheader(schema.sections[section_key])
    any_changed = False
    for field in schema.fields_in_section(section_key):
        current = _get_field_value(data, field)
        read_only = field.data_key in disabled_keys
        kind = field.field_type
        if kind == "select":
            # The blank entry lets a select field stay unset.
            choices = [""] + (field.options or [])
            selected = choices.index(current) if current in choices else 0
            entered = st.selectbox(
                field.label,
                choices,
                index=selected,
                key=field.widget_key,
                disabled=read_only,
            )
        elif kind == "text_area":
            entered = st.text_area(
                field.label,
                value=current,
                placeholder=field.placeholder,
                key=field.widget_key,
                disabled=read_only,
            )
        else:
            entered = st.text_input(
                field.label,
                value=current,
                placeholder=field.placeholder,
                key=field.widget_key,
                disabled=read_only,
            )
        if read_only or entered == current:
            continue
        _apply_field_change(updated_data, data, field, entered)
        any_changed = True
    return any_changed
def render_doc_form(schema: DocumentSchema):
    """
    Render the editable form driven entirely by DocumentSchema.

    Layout rules:
    - 1 section → full width
    - 2+ sections → first N//2 sections in left column, remainder in right column

    Special elements:
    - has_trip_type=True → radio above columns
    - has_group_members=True → data editor below columns (shown when trip_type=="Group"
      or when the schema has no trip_type toggle)

    Widget values are compared against the stored data; when anything differs
    (including the auto-filled LOA, signing-date and representative fields) the
    previous data is pushed onto the undo history and the updated copy is
    stored in session state.
    """
    doc_type = str(schema.document_type)
    data = st.session_state[_sk(doc_type, "data")]
    # All edits go into a copy so `data` stays available as the undo snapshot.
    updated_data = copy.deepcopy(data)
    form_changed = False
    st.subheader("✏️ Edit Details")

    # ── Trip type radio ──────────────────────────────────
    trip_type = None
    if schema.has_trip_type:
        opts = ["Individual", "Group"]
        cur = data.get("trip_type", "Individual")
        if cur not in opts:
            cur = "Individual"
        trip_type = st.radio(
            "Trip Type", opts,
            index=opts.index(cur),
            horizontal=True,
            key=f"{doc_type}_trip_type",
        )
        if trip_type != cur:
            updated_data["trip_type"] = trip_type
            form_changed = True

    # ── Section columns ──────────────────────────────────
    # Rep detail fields are always read-only — only updated via the rep_name dropdown
    rep_disabled = frozenset({"rep_id", "rep_address", "rep_city"})
    # For adult LOA, passport_name and signing city are always derived from grantor — disable them
    loa_auth_disabled = (
        frozenset({"passport_name", "city"})
        if schema.document_type == DocumentType.LETTER_OF_AUTHORIZATION
        else frozenset()
    )

    def _disabled_for(sk: str) -> frozenset:
        """Return the read-only data keys for the section named ``sk``."""
        if sk == "representative":
            return rep_disabled
        if sk == "authorization":
            return loa_auth_disabled
        return frozenset()

    section_keys = list(schema.sections.keys())
    if len(section_keys) == 1:
        changed = _render_section(schema, section_keys[0], data, updated_data, _disabled_for(section_keys[0]))
        form_changed = form_changed or changed
    else:
        mid = len(section_keys) // 2  # e.g. 3 sections → mid=1 (left:1, right:2)
        left_sections = section_keys[:mid]
        right_sections = section_keys[mid:]
        col1, col2 = st.columns(2)
        with col1:
            for sk in left_sections:
                changed = _render_section(schema, sk, data, updated_data, _disabled_for(sk))
                form_changed = form_changed or changed
        with col2:
            for sk in right_sections:
                changed = _render_section(schema, sk, data, updated_data, _disabled_for(sk))
                form_changed = form_changed or changed

    # ── Group members data editor ────────────────────────
    if schema.has_group_members:
        show_group = (trip_type == "Group") if schema.has_trip_type else True
        if show_group:
            st.subheader("👥 Group Members")
            gm_list = data.get("group_members", []) or []
            gm_cols = ["relationship", "name", "dob", "occupation", "nationality", "passport_number"]
            # An empty list is shown as a single blank row so there is something to type into.
            gm_df = pd.DataFrame(
                gm_list or [dict.fromkeys(gm_cols, "")],
                columns=gm_cols,
            )
            # Baseline is what the editor is actually shown. Comparing against
            # the stored list instead would report the blank placeholder row as
            # a user edit, persisting it and pushing an undo entry on first render.
            shown_gm = gm_df.to_dict("records")
            edited_gm = st.data_editor(
                gm_df,
                num_rows="dynamic",
                use_container_width=True,
                key=f"{doc_type}_group_members_editor",
            )
            new_gm = edited_gm.to_dict("records")
            if new_gm != shown_gm:
                updated_data["group_members"] = new_gm
                form_changed = True

    # ── Auto-fill auth details from grantor (adult LOA only) ──────────
    # passport_name always mirrors grantor_name; signing city mirrors grantor_city
    if schema.document_type == DocumentType.LETTER_OF_AUTHORIZATION:
        for src_key, dst_key in (("grantor_name", "passport_name"), ("grantor_city", "city")):
            src_val = updated_data.get(src_key) or data.get(src_key, "")
            dst_val = data.get(dst_key, "")
            if src_val and src_val != dst_val:
                updated_data[dst_key] = src_val
                # Drop the mirrored widget's state so it re-renders with the new value.
                for f in schema.fields:
                    if f.data_key == dst_key:
                        st.session_state.pop(f.widget_key, None)
                form_changed = True

    # ── Auto-fill signing date to today for LOA types (when empty) ────
    if schema.document_type in (DocumentType.LETTER_OF_AUTHORIZATION, DocumentType.LETTER_OF_AUTHORIZATION_MINOR):
        if not (updated_data.get("date") or data.get("date", "")):
            updated_data["date"] = TODAY.strftime("%d %B %Y")
            for f in schema.fields:
                if f.data_key == "date":
                    st.session_state.pop(f.widget_key, None)
            form_changed = True

    # ── Auto-fill representative fields when rep_name dropdown changes ──
    old_rep_name = data.get("rep_name", "")
    new_rep_name = updated_data.get("rep_name", old_rep_name)
    if old_rep_name != new_rep_name and new_rep_name in REPRESENTATIVES:
        rep_info = REPRESENTATIVES[new_rep_name]
        updated_data.update(rep_info)
        # Clear widget keys for auto-filled fields so they re-render with new values
        for f in schema.fields:
            if f.data_key in rep_info:
                st.session_state.pop(f.widget_key, None)
        form_changed = True

    # ── Persist changes ──────────────────────────────────
    if form_changed:
        push_history(doc_type, data)
        st.session_state[_sk(doc_type, "data")] = updated_data
# ─────────────────────────────────────────────────────────
# FULL TAB RENDERER
# ─────────────────────────────────────────────────────────
def render_document_tab(schema: DocumentSchema):
    """Render the complete UI for one document type inside its tab.

    In order: the Application ID row (load / clear), the editable form, the
    undo/redo/generate toolbar and, once a draft exists, the draft preview with
    export, raw Markdown editor and revision chat. All state lives in
    ``st.session_state`` under keys built from the schema's document type.

    NOTE(review): several UI labels below contain characters that look
    mis-encoded (mojibake); confirm the file's encoding before changing them.
    """
    doc_type = str(schema.document_type)

    def sk(suffix: str) -> str:
        # Shorthand for this tab's session-state keys.
        return _sk(doc_type, suffix)

    st.header(f"{schema.icon} {schema.title}")
    st.caption(
        "Load applicant data by Application ID, edit the fields directly, "
        "chat to revise, then export the Google Doc."
    )

    # ── Application ID row ──────────────────────────────
    load_c1, load_c2, load_c3 = st.columns([2, 1, 3])
    with load_c1:
        app_id_input = st.text_input(
            "Application ID",
            value=st.session_state[sk("app_id")],
            placeholder="Enter Application ID (e.g. 5786)",
            key=f"{doc_type}_app_id_input",
            label_visibility="collapsed",
        )
    with load_c2:
        load_clicked = st.button(
            "πŸ” Load from DB",
            use_container_width=True,
            key=f"{doc_type}_load_btn",
        )
    with load_c3:
        # The status caption and clear button only appear once data is loaded.
        if st.session_state[sk("data")] is not None and st.session_state[sk("app_id")]:
            info_c, clear_c = st.columns([4, 1])
            with info_c:
                st.caption(f"βœ… Loaded: Application ID **{st.session_state[sk('app_id')]}**")
            with clear_c:
                if st.button("βœ•", key=f"{doc_type}_clear_btn", help="Clear and start over"):
                    # Snapshot first so the clear itself can be undone, then
                    # reset every per-document slot and start a new session UUID.
                    push_history(doc_type, st.session_state[sk("data")])
                    _clear_widget_keys(doc_type)
                    st.session_state[sk("data")] = None
                    st.session_state[sk("app_id")] = ""
                    st.session_state[sk("doc_url")] = None
                    st.session_state[sk("draft_content")] = ""
                    st.session_state[sk("draft_generated")] = False
                    st.session_state[sk("doc_structure")] = None
                    st.session_state[sk("chat_messages")] = []
                    st.session_state[sk("api_history")] = []
                    st.session_state[sk("session_uuid")] = str(uuid.uuid4())
                    st.rerun()

    # ── Load from DB ────────────────────────────────────
    if load_clicked:
        if app_id_input:
            try:
                # Describe the schema's fields to the backend.
                structure = [
                    {
                        "key": f.data_key,
                        "label": f.label,
                        "description": f.description or f.placeholder,
                        "required": False,
                    }
                    for f in schema.fields
                ]
                # NOTE(review): int() raises ValueError for non-numeric input;
                # that ends up in the generic "unexpected error" handler below.
                prefill_req = PrefillDocumentRequest(
                    application_id=int(app_id_input),
                    structure=structure,
                )
                resp = requests.post(
                    f"{API_BASE_URL}/prefill-document",
                    json=prefill_req.model_dump(),
                    timeout=60,
                )
                if resp.status_code == 200:
                    flat = resp.json()
                    # Drop metadata keys (_missing_required, etc.) and reconstruct
                    # nested fields (e.g. personal_details.name) from the flat response.
                    fetched = {k: v for k, v in flat.items() if not k.startswith("_")}
                    for field in schema.fields:
                        if field.nested_under and field.data_key in fetched:
                            fetched.setdefault(field.nested_under, {})
                            fetched[field.nested_under][field.data_key] = fetched.pop(field.data_key)
                    # Keep the previous data undoable, then replace all
                    # per-document state with the freshly loaded application.
                    push_history(doc_type, st.session_state[sk("data")])
                    _clear_widget_keys(doc_type)
                    st.session_state[sk("data")] = fetched
                    st.session_state[sk("app_id")] = app_id_input
                    st.session_state[sk("doc_url")] = None
                    st.session_state[sk("draft_content")] = ""
                    st.session_state[sk("draft_generated")] = False
                    st.session_state[sk("doc_structure")] = None
                    st.session_state[sk("chat_messages")] = []
                    st.session_state[sk("api_history")] = []
                    st.session_state[sk("session_uuid")] = str(uuid.uuid4())
                    # NOTE(review): the rerun follows immediately, so this
                    # message is probably never visible — confirm.
                    st.success(f"βœ… Loaded data for Application ID: {app_id_input}")
                    st.rerun()
                else:
                    st.error(f"❌ Error fetching data (Status {resp.status_code})")
                    # Show the error body as JSON when it parses, raw text otherwise.
                    try:
                        st.json(resp.json())
                    except Exception:
                        st.text(resp.text)
            except requests.exceptions.ConnectionError:
                st.error(f"❌ Connection Error: Could not connect to API at {API_BASE_URL}.")
            except Exception as e:
                st.error(f"An unexpected error occurred: {e}")
        else:
            st.warning("Please enter an Application ID.")

    st.divider()
    # Nothing below is rendered without data (None or an empty dict).
    if not st.session_state[sk("data")]:
        st.info("πŸ“Œ Enter an Application ID above to load data from the database.")
        return

    # ── Editable form ────────────────────────────────────
    render_doc_form(schema)

    # ── Toolbar: Undo / Redo / Generate Draft ───────────
    st.divider()
    tb = st.columns([1, 1, 2, 4, 2])
    with tb[0]:
        st.button(
            "↩️ Undo",
            on_click=do_undo,
            args=(doc_type,),
            disabled=not st.session_state[sk("history_stack")],
            use_container_width=True,
            key=f"{doc_type}_undo_btn",
        )
    with tb[1]:
        st.button(
            "β†ͺ️ Redo",
            on_click=do_redo,
            args=(doc_type,),
            disabled=not st.session_state[sk("redo_stack")],
            use_container_width=True,
            key=f"{doc_type}_redo_btn",
        )
    with tb[2]:
        # Number of undo points currently stored.
        h = len(st.session_state[sk("history_stack")])
        st.caption(f"πŸ“œ {h} version{'s' if h != 1 else ''}")
    with tb[4]:
        draft_clicked = st.button(
            "πŸ“ Generate Draft",
            type="primary",
            use_container_width=True,
            key=f"{doc_type}_gen_btn",
        )
    if draft_clicked:
        with st.spinner("Generating document draft..."):
            result, err = call_generate_draft(
                schema,
                st.session_state[sk("data")],
                session_uuid=st.session_state[sk("session_uuid")],
            )
        if err:
            st.error(f"❌ Error generating draft: {err}")
        elif result and result.get("document_content"):
            # Snapshot before replacing the draft so generation can be undone.
            push_history(doc_type, st.session_state[sk("data")])
            st.session_state[sk("draft_content")] = result["document_content"]
            st.session_state[sk("draft_generated")] = True
            # Store field structure for future core API handover
            st.session_state[sk("doc_structure")] = [
                {"key": f.data_key, "label": f.label, "required": False}
                for f in schema.fields
            ]
            # A new draft starts a new revision conversation.
            st.session_state[sk("chat_messages")] = []
            st.session_state[sk("api_history")] = []
            # Reset the raw editor so it shows the new draft.
            st.session_state.pop(f"{doc_type}_draft_editor", None)
            st.rerun()
        else:
            st.error("❌ Failed to generate draft.")

    # ── Draft preview + export ───────────────────────────
    if st.session_state[sk("draft_generated")]:
        st.divider()
        draft_hdr_col, draft_undo_col, draft_redo_col = st.columns([6, 1, 1])
        with draft_hdr_col:
            st.subheader("πŸ“„ Document Draft")
        # Second pair of undo/redo buttons; they need their own widget keys.
        with draft_undo_col:
            st.button(
                "↩️ Undo",
                on_click=do_undo,
                args=(doc_type,),
                disabled=not st.session_state[sk("history_stack")],
                use_container_width=True,
                key=f"{doc_type}_draft_undo_btn",
            )
        with draft_redo_col:
            st.button(
                "β†ͺ️ Redo",
                on_click=do_redo,
                args=(doc_type,),
                disabled=not st.session_state[sk("redo_stack")],
                use_container_width=True,
                key=f"{doc_type}_draft_redo_btn",
            )
        st.caption(
            "Data values are **highlighted in bold**. "
            "Use the chat below to revise, or expand the raw editor to edit directly."
        )
        # Rendered markdown preview (bold highlights are visible here)
        with st.container(border=True):
            st.markdown(st.session_state[sk("draft_content")])
        export_col, link_col = st.columns([2, 3])
        with export_col:
            export_clicked = st.button(
                "🌐 Export to Google Docs",
                type="primary",
                use_container_width=True,
                key=f"{doc_type}_export_btn",
            )
        with link_col:
            # The link only exists after a successful export.
            if st.session_state[sk("doc_url")]:
                st.link_button(
                    "πŸ“‚ Open Exported Doc",
                    st.session_state[sk("doc_url")],
                    use_container_width=True,
                )
        if export_clicked:
            data = st.session_state[sk("data")] or {}
            # Title is "<document title> – <applicant name>" when a name is
            # available under personal_details, otherwise just the document title.
            applicant_name = (data.get("personal_details") or {}).get("name") or ""
            doc_label = schema.title
            export_title = (
                f"{doc_label} – {applicant_name}".strip(" –")
                if applicant_name
                else doc_label
            )
            with st.spinner("Exporting to Google Docs..."):
                result, err = call_export_document(
                    st.session_state[sk("draft_content")],
                    title=export_title,
                )
            if err:
                st.error(f"❌ Export failed: {err}")
            elif result and result.get("url"):
                st.session_state[sk("doc_url")] = result["url"]
                st.rerun()
            else:
                st.error("❌ Export failed (no URL returned).")
        # Collapsible raw Markdown editor for direct edits
        with st.expander("✏️ Edit Raw Markdown"):
            edited_raw = st.text_area(
                "Raw editor",
                value=st.session_state[sk("draft_content")],
                height=420,
                key=f"{doc_type}_draft_editor",
                label_visibility="collapsed",
            )
            # A manual edit becomes the new draft and is recorded as an undo point.
            if edited_raw != st.session_state[sk("draft_content")]:
                push_history(doc_type, st.session_state[sk("data")])
                st.session_state[sk("draft_content")] = edited_raw
                st.rerun()

        # ── Chat ────────────────────────────────────────
        st.divider()
        st.caption(
            "πŸ’¬ Chat to revise the draft β€” "
            "e.g. *'make the second paragraph more formal'* or *'update the passport number to A9876543'*"
        )
        chat_c1, chat_c2 = st.columns([5, 1])
        with chat_c1:
            chat_text = st.text_input(
                "Chat",
                placeholder="Describe your revision...",
                key=f"{doc_type}_chat_input",
                label_visibility="collapsed",
            )
        with chat_c2:
            send_btn = st.button(
                "Send ➀",
                type="primary",
                use_container_width=True,
                key=f"{doc_type}_send_btn",
            )
        if send_btn and chat_text:
            # The user turn is recorded in both the displayed chat and the
            # history sent to the API, so the request's history already
            # includes the current query.
            st.session_state[sk("chat_messages")].append({"role": "user", "content": chat_text})
            st.session_state[sk("api_history")].append({"role": "user", "content": chat_text})
            with st.spinner("πŸ€– Revising document..."):
                result, err = call_doc_chat_api(
                    chat_text,
                    st.session_state[sk("api_history")],
                    st.session_state[sk("session_uuid")],
                    current_document_content=st.session_state[sk("draft_content")],
                    structure=st.session_state[sk("doc_structure")],
                )
            if err:
                # Errors are shown in the chat only; they are not added to api_history.
                # NOTE(review): the user turn appended above stays in api_history
                # without an assistant reply — confirm the backend tolerates that.
                st.session_state[sk("chat_messages")].append(
                    {"role": "assistant", "content": f"❌ Error: {err}"}
                )
            elif result:
                answer = result.get("answer", "Document updated.")
                st.session_state[sk("chat_messages")].append({"role": "assistant", "content": answer})
                st.session_state[sk("api_history")].append({"role": "assistant", "content": answer})
                if result.get("updated_document_content"):
                    # Record an undo point, swap in the revised draft and reset
                    # the raw editor so it shows the new text.
                    push_history(doc_type, st.session_state[sk("data")])
                    st.session_state[sk("draft_content")] = result["updated_document_content"]
                    st.session_state.pop(f"{doc_type}_draft_editor", None)
                if result.get("structure") is not None:
                    st.session_state[sk("doc_structure")] = result["structure"]
            # Drop the chat box state, then rerun to redraw the page.
            st.session_state.pop(f"{doc_type}_chat_input", None)
            st.rerun()
        elif send_btn:
            st.warning("Please type a message first.")

        # ── Chat history ─────────────────────────────────
        for msg in st.session_state[sk("chat_messages")]:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"])
# ─────────────────────────────────────────────────────────
# PAGE ENTRY POINT
# ─────────────────────────────────────────────────────────
st.set_page_config(page_title="Document Generator", layout="wide")
st.title("🌍 Document Generator")

# One tab per DOCUMENT_REGISTRY entry — registering a new DocumentSchema adds a tab.
registered = list(DOCUMENT_REGISTRY.items())
tab_labels = []
for _, registered_schema in registered:
    tab_labels.append(f"{registered_schema.icon} {registered_schema.title}")
tabs = st.tabs(tab_labels)
for position, (doc_type, schema) in enumerate(registered):
    with tabs[position]:
        render_document_tab(schema)