""" ComplyFlow AI - Streamlit Demo UI """ from __future__ import annotations import os import re import sys from io import BytesIO import html from pathlib import Path from typing import Optional import streamlit as st from dotenv import load_dotenv # Allow running as a script from the project root PROJECT_ROOT = Path(__file__).resolve().parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from services.decision import analyze_document from services.retrieval import retrieve_policies load_dotenv() def _read_pdf(file_bytes: bytes) -> str: try: from pypdf import PdfReader except Exception as exc: raise RuntimeError("pypdf is not installed. Run: pip install pypdf") from exc reader = PdfReader(BytesIO(file_bytes)) pages = [page.extract_text() or "" for page in reader.pages] return "\n".join(pages).strip() def _read_text_file(file_bytes: bytes) -> str: return file_bytes.decode("utf-8", errors="ignore").strip() def _load_sample_doc(sample_path: Path) -> str: return sample_path.read_text(encoding="utf-8").strip() def _get_sample_docs() -> list[Path]: sample_dir = PROJECT_ROOT / "data" / "sample_docs" if not sample_dir.exists(): return [] return sorted(sample_dir.glob("*.txt")) def _ensure_qdrant_hint() -> Optional[str]: qdrant_url = os.getenv("QDRANT_URL") qdrant_path = os.getenv("QDRANT_PATH") if not qdrant_url and not qdrant_path: return ( "Set `QDRANT_PATH` for local mode or `QDRANT_URL` for server mode " "in your `.env`." ) return None st.set_page_config(page_title="ComplyFlow AI", page_icon="✅", layout="wide") st.markdown( """ """, unsafe_allow_html=True, ) st.markdown( """
ComplyFlow AI
AI Compliance Decision System
Automates compliance decisions by comparing documents against policies, scoring risk, and producing a clear audit trail for review.
""", unsafe_allow_html=True, ) hint = _ensure_qdrant_hint() if hint: st.warning(hint) col_left, col_right = st.columns([1, 2], gap="large") with col_left: st.markdown("
Input
", unsafe_allow_html=True) sample_docs = _get_sample_docs() sample_names = ["(None)"] + [p.name for p in sample_docs] selected_sample = st.selectbox("Sample document", sample_names, index=0) uploaded = st.file_uploader("Upload a document (TXT or PDF)", type=["txt", "pdf"]) doc_text = st.text_area("Or paste text here", height=220, placeholder="Paste document text...") top_k = 5 threshold = 0.5 run_clicked = st.button("Analyze", type="primary", use_container_width=True) def _resolve_document_text() -> str: if selected_sample and selected_sample != "(None)": sample_path = PROJECT_ROOT / "data" / "sample_docs" / selected_sample return _load_sample_doc(sample_path) if uploaded is not None: file_bytes = uploaded.read() if uploaded.name.lower().endswith(".pdf"): return _read_pdf(file_bytes) return _read_text_file(file_bytes) return doc_text.strip() def _parse_policy_summary(summary: str) -> tuple[list[str], str]: summary_lines: list[str] = [] for raw_line in summary.splitlines(): line = raw_line.strip() if not line: continue if line.startswith(("* ", "- ")): summary_lines.append(line[2:].strip()) else: summary_lines.append(line) conclusion = "" if "Conclusion:" in summary: head, tail = summary.split("Conclusion:", 1) summary_lines = [seg.strip(" *") for seg in head.split("*") if seg.strip(" *")] or summary_lines conclusion = tail.strip() if not summary_lines: summary_lines = [summary] merged: list[str] = [] pending_label: str | None = None for line in summary_lines: if re.fullmatch(r"[A-Za-z][A-Za-z\s]{0,30}:", line): pending_label = line.rstrip(":") continue if pending_label: merged.append(f"{pending_label}: {line}") pending_label = None else: merged.append(line) if pending_label: merged.append(f"{pending_label}:") return merged, conclusion def _to_bullets(text: str, max_items: int | None = None) -> list[str]: if not text: return [] lines = [line.strip() for line in text.splitlines() if line.strip()] bullets = [] for line in lines: if line.startswith(("* ", "- ")): bullets.append(line[2:].strip()) else: bullets.append(line) if not bullets and text.strip(): bullets = [text.strip()] if max_items is None: return bullets return bullets[:max_items] def _format_bold(text: str) -> str: escaped = html.escape(text or "") return re.sub(r"\*\*(.+?)\*\*", r"\1", escaped) def _format_label_value(text: str) -> str: match = re.match(r"^([A-Za-z][A-Za-z\s]{0,30}):\s*(.+)$", text) if match: label = match.group(1).strip() value = match.group(2).strip() return ( f"{_format_bold(label)}: " f"{_format_bold(value)}" ) return _format_bold(text) def _render_audit_trail(trail: list[dict]) -> None: for step in trail: step_name = step.get("step", "step") if step_name == "policy_agent": summary = step.get("summary") or "No summary available." summary_lines, conclusion = _parse_policy_summary(summary) used_llm = "Yes" if step.get("used_llm") else "No" with st.expander(f"Policy Agent (LLM: {used_llm})", expanded=False): pills_html = "".join( [ f"
" f"{_format_label_value(item)}
" for i, item in enumerate(summary_lines) ] ) st.markdown(f"
{pills_html}
", unsafe_allow_html=True) if conclusion: st.markdown( f"
Conclusion: {_format_bold(conclusion)}
", unsafe_allow_html=True, ) elif step_name == "risk_agent": score = step.get("score") explanation = step.get("explanation") or "" score_value = score if isinstance(score, (int, float)) else 0 card_class = "cf-card cf-card-warn" if score_value >= 50 else "cf-card cf-card-safe" st.markdown( f"
Risk Agent
" f"Score: {score}
{explanation}
", unsafe_allow_html=True, ) elif step_name == "workflow_agent": decision = step.get("decision") rationale = step.get("rationale") or "" decision_text = str(decision or "").lower() decision_class = "cf-card cf-card-safe" if decision_text == "approve" else "cf-card cf-card-warn" st.markdown( f"
Workflow Agent
" f"Decision: {decision}
{rationale}
", unsafe_allow_html=True, ) def _render_policy_findings(findings: list[dict]) -> None: if not findings: st.info("No policy findings returned.") return for finding in findings: policy_id = finding.get("policy_id", "UNKNOWN") title = finding.get("title", "Untitled") category = finding.get("category", "General") score = finding.get("relevance", "N/A") violation = "Yes" if finding.get("possible_violation") else "No" notes = finding.get("notes") or [] hits = finding.get("keyword_hits") or [] card_class = "cf-card cf-card-warn" if finding.get("possible_violation") else "cf-card cf-card-safe" status_chip = "cf-chip-warn" if finding.get("possible_violation") else "cf-chip-safe" status_text = "Violation" if finding.get("possible_violation") else "Compliant" st.markdown( f"
" f"{policy_id}: {title} " f"{category}" f"{status_text}
" f"Relevance: {score} | Possible violation: {violation}
" f"{'
'.join(notes) if notes else 'No notes.'}" f"{'
Hits: ' + ', '.join(hits) + '' if hits else ''}" f"
", unsafe_allow_html=True, ) with col_right: st.markdown("
Results
", unsafe_allow_html=True) if run_clicked: try: text = _resolve_document_text() if not text: st.error("Please provide a document (upload, sample, or pasted text).") else: with st.spinner("Retrieving policies and analyzing..."): policies = retrieve_policies(text, top_k=top_k, similarity_threshold=threshold) result = analyze_document(text, policies) st.success("Analysis complete.") with st.expander("Result Details", expanded=True): st.markdown("
Result Details
", unsafe_allow_html=True) dcol1, dcol2, dcol3 = st.columns([1, 1, 2]) with dcol1: st.markdown("**Decision**") st.metric("Decision", result["decision"]) with dcol2: st.markdown("**Risk Score**") st.metric("Risk Score", result["score"]) with dcol3: st.markdown(" ") ex_col, sum_col = st.columns([1, 1]) with ex_col: st.markdown("**Explanation**") exp_bullets = _to_bullets(result["explanation"]) st.markdown( f"
" f"{''.join([f'
• {_format_bold(item)}
' for item in exp_bullets])}" f"
", unsafe_allow_html=True, ) with sum_col: st.markdown("**Policy Summary**") policy_summary = next( ( step.get("summary") for step in result.get("audit_trail", []) if step.get("step") == "policy_agent" ), "", ) if policy_summary: lines, conclusion = _parse_policy_summary(policy_summary) st.markdown( f"
" f"{''.join([f'
• {_format_bold(item)}
' for item in lines[:4]])}" f"{f'
Conclusion: {_format_bold(conclusion)}
' if conclusion else ''}" f"
", unsafe_allow_html=True, ) else: st.markdown( "
No summary available.
", unsafe_allow_html=True, ) st.markdown("
Details
", unsafe_allow_html=True) dcol_left, dcol_right = st.columns([1, 1]) with dcol_left: with st.expander("Audit Trail", expanded=False): _render_audit_trail(result["audit_trail"]) with dcol_right: with st.expander("Policy Findings", expanded=False): _render_policy_findings(result["policy_findings"]) except Exception as exc: st.error(f"Error: {exc}") else: st.info("Run analysis to see results.")