# chatbot / app.py
# dipsikha25's picture
# Update app.py
# 764cfa2 verified
# app.py
import os
import re
from functools import lru_cache
import gradio as gr
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# =========================================================
# 1) LOAD PDF + BUILD VECTOR STORE
# =========================================================
# Path of the PDF that is loaded and indexed when this module is imported.
PDF_FILE = "data.pdf"
# Read the PDF into LangChain document objects.
loader = PyPDFLoader(PDF_FILE)
documents = loader.load()
# Split the loaded documents into overlapping chunks (chunk_size=1000,
# chunk_overlap=200) before embedding.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
)
docs = text_splitter.split_documents(documents)
# Embed every chunk with the BAAI/bge-small-en-v1.5 model and build the FAISS
# index; `db` is the module-level store queried by get_top_two_distinct_docs.
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
db = FAISS.from_documents(docs, embeddings)
# =========================================================
# 2) HELPER FUNCTIONS
# =========================================================
def normalize_lines(text: str):
    """Split *text* into lines, trim each one and drop the blank ones."""
    trimmed = (raw.strip() for raw in text.splitlines())
    return [entry for entry in trimmed if entry]
def html_escape(text: str) -> str:
    """Escape *text* so it can be embedded in HTML markup.

    ``None`` becomes an empty string; any other value is converted with
    ``str()`` first. The characters ``&``, ``<``, ``>`` and ``"`` are replaced
    by their HTML entities.

    Returns:
        The escaped string.
    """
    if text is None:
        return ""
    return (
        str(text)
        # The ampersand must be handled first so that the entities produced
        # by the following replacements are not escaped a second time.
        .replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )
def nl2br(text: str) -> str:
    """Escape *text* with ``html_escape`` and turn every newline into ``<br>``."""
    escaped = html_escape(text)
    return "<br>".join(escaped.split("\n"))
def clean_compare_text(text: str) -> str:
    """Normalise free text so two values can be compared loosely.

    The text is lower-cased and trimmed, whitespace runs are collapsed to a
    single space, and every character outside ``a-z 0-9 [ ] ( ) _ = / , . -``
    and the space is removed. Falsy input yields an empty string.
    """
    if not text:
        return ""
    lowered = text.lower().strip()
    single_spaced = re.sub(r"\s+", " ", lowered)
    return re.sub(r"[^a-z0-9\[\]\(\)_=/,.\- ]", "", single_spaced)
def clean_formula_text(text: str) -> str:
    """Normalise a formula for comparison.

    Lower-cases the text, removes everything from ``--`` to the end of each
    line, then strips all whitespace. Falsy input yields an empty string.
    """
    if not text:
        return ""
    without_comments = re.sub(r"--.*", "", text.lower())
    return re.sub(r"\s+", "", without_comments)
def is_metadata_line(line: str) -> bool:
    """Tell whether *line* is a field label or heading rather than content.

    After trimming and lower-casing, a line counts as metadata when it is
    exactly one of the known labels, or when it starts with ``kpi id``,
    ``measure name`` or ``#``.
    """
    exact_labels = {
        "name",
        "description",
        "definition",
        "business meaning",
        "category",
        "owner",
        "source",
        "dashboard",
        "glossary",
    }
    leading_markers = ("kpi id", "measure name", "#")
    candidate = line.lower().strip()
    return candidate in exact_labels or candidate.startswith(leading_markers)
def looks_like_formula_start(line: str) -> bool:
    """Heuristically decide whether *line* could begin a formula.

    Returns True when the lower-cased line contains any of the listed
    function/keyword fragments, when it contains both ``[`` and ``]``, or
    when it contains ``=``.
    """
    keyword_fragments = (
        "calculate(",
        "sum(",
        "count(",
        "distinctcount(",
        "divide(",
        "if(",
        "filter(",
        "removefilters(",
        "all(",
        "average(",
        "var ",
        "return",
        "switch(",
        "countrows(",
        "summarize(",
        "lookupvalue(",
        "selectedvalue(",
    )
    lowered = line.lower().strip()
    for fragment in keyword_fragments:
        if fragment in lowered:
            return True
    has_brackets = "[" in line and "]" in line
    return has_brackets or "=" in line
def extract_formula_block(lines):
    """Extract the formula portion of an entry as a newline-joined string.

    Scanning starts at the first line accepted by ``looks_like_formula_start``
    and collects the following lines while tracking the balance of ``(`` and
    ``)``. Collection stops at a metadata line once the parentheses are
    balanced. A second pass then cuts the collected lines at the first
    metadata line that is not the very first collected line.

    Args:
        lines: Sequence of text lines (indexable, as ``lines[i + 1]`` is read).

    Returns:
        The collected lines, each stripped, joined with ``"\\n"``; an empty
        string when no line looks like the start of a formula.
    """
    formula_lines = []
    paren_balance = 0

    for idx, raw in enumerate(lines):
        stripped = raw.strip()

        if not formula_lines:
            # Still searching for the first formula-looking line.
            if looks_like_formula_start(stripped):
                formula_lines.append(stripped)
                paren_balance += stripped.count("(") - stripped.count(")")
            continue

        if paren_balance <= 0 and is_metadata_line(stripped):
            break

        formula_lines.append(stripped)
        paren_balance += stripped.count("(") - stripped.count(")")

        if paren_balance <= 0:
            upcoming = lines[idx + 1].strip() if idx + 1 < len(lines) else ""
            if upcoming and is_metadata_line(upcoming):
                break

    # The previous fallback ("collect every formula-looking line when nothing
    # was found") was removed: it could only run when no line satisfied
    # looks_like_formula_start, so it could never add anything.

    # NOTE: this pass truncates at a metadata line even if the scan above kept
    # it because parentheses were still open.
    kept = formula_lines[:1]
    for candidate in formula_lines[1:]:
        if is_metadata_line(candidate):
            break
        kept.append(candidate)

    return "\n".join(kept).strip()
def extract_label_block(lines, labels):
    """Return the text that follows the first line matching one of *labels*.

    Matching is case-insensitive on the stripped line. Lines after the label
    are collected until a metadata line that is not itself one of the wanted
    labels is reached, then joined with single spaces. Returns an empty
    string when no label line is found.
    """
    wanted = [label.lower() for label in labels]
    start = next(
        (pos + 1 for pos, line in enumerate(lines) if line.lower().strip() in wanted),
        None,
    )
    if start is None:
        return ""

    pieces = []
    for raw in lines[start:]:
        text = raw.strip()
        if text.lower() not in wanted and is_metadata_line(text):
            break
        pieces.append(text)
    return " ".join(pieces).strip()
def extract_named_field(lines, possible_labels):
    """Return the stripped line that directly follows a label line.

    A label line is one whose stripped, lower-cased text equals one of
    *possible_labels* (compared case-insensitively). The first label that has
    a following line wins; an empty string is returned otherwise.
    """
    wanted = {label.lower() for label in possible_labels}
    # Pair every line with its successor; the last line has no successor and
    # therefore can never act as a label.
    for label_line, value_line in zip(lines, lines[1:]):
        if label_line.lower().strip() in wanted:
            return value_line.strip()
    return ""
def remove_formula_from_text(lines, formula_text):
    """Drop every line of *lines* that also appears in *formula_text*.

    Lines are compared after stripping. When *formula_text* is blank the
    original *lines* object is returned unchanged.
    """
    if formula_text.strip() == "":
        return lines

    to_drop = set()
    for part in formula_text.splitlines():
        part = part.strip()
        if part:
            to_drop.add(part)

    kept = []
    for line in lines:
        if line.strip() in to_drop:
            continue
        kept.append(line)
    return kept
def build_business_meaning(audience, measure_name, kpi_name):
    """Produce a one-sentence business explanation tailored to *audience*.

    The sentence starts with *measure_name*, falling back to *kpi_name* and
    then to ``"This KPI"``. ``"Leadership"`` and ``"Analytics User"`` get
    dedicated wording; every other audience gets the business-user wording.
    """
    subject = measure_name or kpi_name or "This KPI"

    if audience == "Leadership":
        tail = (
            " helps leadership monitor overall performance, coverage, "
            "and execution trends for strategic decision-making."
        )
    elif audience == "Analytics User":
        tail = (
            " is used in reporting and analysis. It should be interpreted "
            "along with filters, source logic, and any deduplication or exclusion rules."
        )
    else:
        tail = (
            " helps business users understand what is being tracked and how "
            "this KPI reflects field activity, engagement, or performance."
        )
    return f"{subject}{tail}"
def build_notes(lines, kpi_name, kpi_id, measure_name, page_no):
    """Assemble the Markdown shown in the Notes tab.

    Emits one bold-labelled paragraph for each non-empty identifier, the page
    number (``page_no + 1``) when *page_no* is not None, and an "Additional
    Context" paragraph made of the first three lines that are neither labels
    nor metadata and are at least two characters long. Paragraphs are
    separated by blank lines; a fixed message is returned when nothing was
    collected.
    """
    labelled_values = (
        ("KPI Name", kpi_name),
        ("KPI ID", kpi_id),
        ("Power BI Measure", measure_name),
    )
    notes = [f"**{title}:** {value}" for title, value in labelled_values if value]
    if page_no is not None:
        notes.append(f"**Page:** {page_no + 1}")

    skip_labels = {
        "name", "description", "definition", "kpi id from kpi glossary",
        "measure name in the pbi", "business meaning", "category"
    }
    extra_info = [
        line
        for line in lines
        if line.lower().strip() not in skip_labels
        and not is_metadata_line(line)
        and len(line) >= 2
    ]
    if extra_info:
        notes.append("**Additional Context:** " + " ".join(extra_info[:3]))

    return "\n\n".join(notes) if notes else "No additional notes found."
def parse_doc_entry(doc, audience):
    """Turn one retrieved document chunk into a structured KPI entry.

    Reads ``doc.page_content`` and ``doc.metadata["page"]``. The formula is
    extracted first and its lines are removed before the name, id, measure
    and definition fields are looked up. When no labelled definition exists,
    up to three lines containing typical definition phrases are used instead.

    Returns:
        A dict with the keys ``page``, ``kpi_name``, ``kpi_id``,
        ``measure_name``, ``definition``, ``business``, ``formula`` and
        ``notes``; missing identifiers are reported as ``"Not found"``.
    """
    lines = normalize_lines(doc.page_content)
    formula_text = extract_formula_block(lines)
    body_lines = remove_formula_from_text(lines, formula_text)

    kpi_name = extract_named_field(body_lines, ["Name"])
    kpi_id = extract_named_field(body_lines, ["KPI ID from KPI Glossary", "KPI ID"])
    measure_name = extract_named_field(body_lines, ["Measure name in the PBI", "Measure Name"])

    definition_text = extract_label_block(body_lines, ["Description", "Definition"])
    if not definition_text:
        # No labelled definition: fall back to lines that read like one.
        cue_phrases = (
            "number of",
            "count of",
            "unique",
            "per day",
            "per calendar",
            "calculated as",
            "rate of",
            "ratio of",
        )
        candidates = [
            line
            for line in body_lines
            if any(cue in line.lower() for cue in cue_phrases)
        ]
        definition_text = " ".join(candidates[:3]).strip()

    definition_text = definition_text or "Definition not found clearly in the source extract."
    formula_text = formula_text or "Formula not found in source extract."

    page_no = doc.metadata.get("page", None)
    entry = {
        "page": page_no,
        "kpi_name": kpi_name or "Not found",
        "kpi_id": kpi_id or "Not found",
        "measure_name": measure_name or "Not found",
        "definition": definition_text,
        "business": build_business_meaning(audience, measure_name, kpi_name),
        "formula": formula_text,
        "notes": build_notes(body_lines, kpi_name, kpi_id, measure_name, page_no),
    }
    return entry
def doc_is_distinct(doc_a, doc_b):
    """Report whether two documents differ in their first 700 characters.

    The leading text of each document is normalised with
    ``clean_compare_text`` before comparison. If either document is None the
    pair is treated as distinct.
    """
    for candidate in (doc_a, doc_b):
        if candidate is None:
            return True
    head_a, head_b = (
        clean_compare_text(item.page_content[:700]) for item in (doc_a, doc_b)
    )
    return head_a != head_b
def get_top_two_distinct_docs(question):
    """Fetch the best match for *question* plus the next distinct match.

    Queries the module-level ``db`` for the ten nearest chunks. The first
    result is always returned; the second element is the first later result
    that ``doc_is_distinct`` considers different from it, or None.

    Returns:
        ``(first, second)``; ``(None, None)`` when the search returns nothing.
    """
    ranked = [doc for doc, _score in db.similarity_search_with_score(question, k=10)]
    if not ranked:
        return None, None

    best, *others = ranked
    runner_up = next(
        (candidate for candidate in others if doc_is_distinct(best, candidate)),
        None,
    )
    return best, runner_up
def compare_same(value1, value2, formula=False):
    """Return True when both values are equal after normalisation.

    Uses ``clean_formula_text`` when *formula* is truthy and
    ``clean_compare_text`` otherwise.
    """
    normalise = clean_formula_text if formula else clean_compare_text
    return normalise(value1) == normalise(value2)
def build_summary_cards(entry1, entry2=None):
    """Render the row of summary cards as an HTML string.

    Three cards describe *entry1* (KPI name with page pill, KPI id, PBI
    measure). A fourth card reports the comparison status: "One occurrence
    found" when *entry2* is falsy, otherwise whether name, id, measure,
    definition and formula of both entries compare as the same.
    """

    def pill(text, kind="default"):
        return f"<span class='pill {kind}'>{html_escape(text)}</span>"

    def card(label, value, pill_html):
        # One card; *label* is inserted verbatim, *value* is escaped.
        return (
            "\n<div class='summary-card'>\n"
            f"<div class='summary-label'>{label}</div>\n"
            f"<div class='summary-value'>{html_escape(value)}</div>\n"
            f"<div class='summary-sub'>{pill_html}</div>\n"
            "</div>\n"
        )

    if entry1 and entry1["page"] is not None:
        page_label = f"Page {entry1['page'] + 1}"
    else:
        page_label = "Page not found"

    if entry2:
        compared_keys = ("kpi_name", "kpi_id", "measure_name", "definition", "formula")
        identical = all(
            compare_same(entry1[key], entry2[key], formula=(key == "formula"))
            for key in compared_keys
        )
        status_text = "Occurrences are same" if identical else "Differences detected"
        status_kind = "success" if identical else "warning"
        checked_text = "2 matches checked"
    else:
        status_text = "One occurrence found"
        status_kind = "neutral"
        checked_text = "1 match checked"

    cards = [
        card("KPI Name", entry1["kpi_name"], pill(page_label, "info")),
        card("KPI ID", entry1["kpi_id"], pill("Glossary reference", "neutral")),
        card("PBI Measure", entry1["measure_name"], pill("Primary match", "success")),
        card("Comparison Status", status_text, pill(checked_text, status_kind)),
    ]
    return "<div class='summary-grid'>" + "".join(cards) + "</div>"
def compare_field_status(value1, value2, formula=False):
    """Return ``"same"`` or ``"different"`` according to ``compare_same``."""
    if compare_same(value1, value2, formula=formula):
        return "same"
    return "different"
def build_side_by_side_comparison(entry1, entry2):
    """Render the Comparison tab as an HTML string.

    With no entries an empty-state box is returned; with only *entry1* a
    banner naming its page is returned; with only *entry2* a different
    empty-state box is returned. With both entries the result is an overall
    banner, two header cards and one row per field (KPI name, KPI id, Power
    BI measure, definition, formula) flagged SAME or DIFFERENT.
    """
    if not entry1 and not entry2:
        return "<div class='empty-state'>No relevant KPI entry found.</div>"

    if entry1 and not entry2:
        if entry1["page"] is not None:
            page_text = f"Page {entry1['page'] + 1}"
        else:
            page_text = "Unknown page"
        return (
            "\n<div class='compare-wrap single'>\n"
            "<div class='compare-banner neutral'>Only one occurrence found in the PDF "
            f"({html_escape(page_text)}). No second occurrence available for comparison.</div>\n"
            "</div>\n"
        )

    if not (entry1 and entry2):
        return "<div class='empty-state'>Only one occurrence available for comparison.</div>"

    def page_label(entry):
        if entry["page"] is None:
            return "Unknown"
        return f"Page {entry['page'] + 1}"

    def head_card(title, entry):
        return (
            "<div class='head-card'>\n"
            f"<div class='head-label'>{title}</div>\n"
            f"<div class='head-page'>{html_escape(page_label(entry))}</div>\n"
            f"<div class='head-name'>{html_escape(entry['kpi_name'])}</div>\n"
            "</div>\n"
        )

    def cell(title, value, css_class):
        return (
            "<div class='compare-cell'>\n"
            f"<div class='cell-title'>{title}</div>\n"
            f"<div class='cell-content {css_class}'>{nl2br(value or 'Not found')}</div>\n"
            "</div>\n"
        )

    field_specs = (
        ("KPI Name", "kpi_name", False),
        ("KPI ID", "kpi_id", False),
        ("Power BI Measure", "measure_name", False),
        ("Definition", "definition", False),
        ("Formula", "formula", True),
    )

    rows = []
    statuses = []
    for label, key, is_formula in field_specs:
        left_val, right_val = entry1[key], entry2[key]
        status = compare_field_status(left_val, right_val, formula=is_formula)
        statuses.append(status)
        css_class = "code-block" if is_formula else ""
        status_text = "SAME" if status == "same" else "DIFFERENT"
        rows.append(
            f"\n<div class='compare-row {status}'>\n"
            "<div class='compare-field'>\n"
            f"<div class='field-name'>{html_escape(label)}</div>\n"
            f"<div class='field-status {status}'>{status_text}</div>\n"
            "</div>\n"
            + cell("Occurrence 1", left_val, css_class)
            + cell("Occurrence 2", right_val, css_class)
            + "</div>\n"
        )

    # The overall verdict is "same" only when every compared field is.
    if all(status == "same" for status in statuses):
        overall_class = "success"
        overall_text = "Both occurrences are the same"
    else:
        overall_class = "warning"
        overall_text = "Differences found between the two occurrences"

    rows_html = "".join(rows)
    return (
        "\n<div class='compare-wrap'>\n"
        f"<div class='compare-banner {overall_class}'>{html_escape(overall_text)}</div>\n"
        "<div class='compare-head'>\n"
        + head_card("Occurrence 1", entry1)
        + head_card("Occurrence 2", entry2)
        + "</div>\n"
        "<div class='compare-table'>\n"
        f"{rows_html}\n"
        "</div>\n"
        "</div>\n"
    )
# =========================================================
# 3) MAIN QUERY FUNCTION
# =========================================================
@lru_cache(maxsize=100)
def get_answer(question, audience):
    """Answer a KPI question for the UI.

    Results are memoised per ``(question, audience)`` pair (up to 100
    entries).

    Returns:
        A 6-tuple: summary-cards HTML, definition text, business meaning,
        formula text, notes Markdown and comparison HTML. Blank questions and
        searches without a match yield placeholder messages instead.
    """

    def placeholder(summary_msg, definition_msg, comparison_msg):
        # Same 6-slot shape as a real answer, with the middle fields blank.
        return (
            f"<div class='empty-state'>{summary_msg}</div>",
            definition_msg,
            "",
            "",
            "",
            f"<div class='empty-state'>{comparison_msg}</div>",
        )

    if not (question and question.strip()):
        return placeholder(
            "Ask a KPI question to see the summary cards.",
            "Please enter a KPI question.",
            "No comparison available.",
        )

    primary_doc, secondary_doc = get_top_two_distinct_docs(question)
    if primary_doc is None:
        return placeholder(
            "No KPI match found.",
            "Definition not found.",
            "No relevant KPI entry found.",
        )

    entry1 = parse_doc_entry(primary_doc, audience)
    entry2 = parse_doc_entry(secondary_doc, audience) if secondary_doc else None

    return (
        build_summary_cards(entry1, entry2),
        entry1["definition"],
        entry1["business"],
        entry1["formula"],
        entry1["notes"],
        build_side_by_side_comparison(entry1, entry2),
    )
def clear_all():
    """Return the reset values for every component wired to the Clear button.

    The Clear button lists eight outputs, in this order: question, audience,
    summary cards, definition, business meaning, formula, notes, comparison.
    The tuple therefore has eight items in exactly that order (the previous
    version returned seven, starting with the summary HTML, which neither
    matched the number of outputs nor their order).

    Returns:
        An 8-tuple: empty question, the default audience, the summary
        placeholder, four empty text fields and the comparison placeholder.
    """
    return (
        "",  # question
        "Business User",  # audience: same value the dropdown starts with
        "<div class='empty-state'>Ask a KPI question to see the summary cards.</div>",
        "",  # definition
        "",  # business meaning
        "",  # formula
        "",  # notes
        "<div class='empty-state'>Comparison results will appear here.</div>",
    )
# =========================================================
# 4) UI
# =========================================================
# Stylesheet for the whole app, wrapped in a <style> element so it can be
# injected through a gr.HTML component. It defines the colour variables and the
# classes used by the generated markup (hero, panel, summary-*, pill, compare-*,
# head-*, field-*, cell-*, code-block, empty-state) plus two responsive
# breakpoints at 1100px and 700px.
CUSTOM_CSS = """
<style>
:root {
--bg1: #f6f8ff;
--bg2: #fafdff;
--bg3: #eef4ff;
--card: rgba(255,255,255,0.82);
--card-strong: rgba(255,255,255,0.94);
--stroke: rgba(99, 102, 241, 0.14);
--text: #14213d;
--muted: #667085;
--primary: #5b5bd6;
--primary-2: #7c4dff;
--primary-3: #3b82f6;
--success-bg: #ecfdf3;
--success-text: #067647;
--warning-bg: #fff7ed;
--warning-text: #c2410c;
--neutral-bg: #f8fafc;
--neutral-text: #475467;
--shadow: 0 18px 40px rgba(34, 55, 110, 0.10);
}
body, .gradio-container {
background: linear-gradient(135deg, var(--bg1) 0%, var(--bg2) 45%, var(--bg3) 100%) !important;
}
.gradio-container {
max-width: 1400px !important;
padding-top: 18px !important;
}
.hero {
background: linear-gradient(135deg, rgba(91,91,214,0.14), rgba(124,77,255,0.08), rgba(59,130,246,0.06));
border: 1px solid rgba(124,77,255,0.14);
box-shadow: var(--shadow);
border-radius: 26px;
padding: 26px 30px;
margin-bottom: 18px;
backdrop-filter: blur(10px);
}
.hero-title {
font-size: 34px;
font-weight: 800;
color: var(--text);
margin: 0 0 8px 0;
}
.hero-subtitle {
font-size: 15px;
color: var(--muted);
margin: 0;
line-height: 1.65;
}
.panel {
background: var(--card) !important;
border: 1px solid var(--stroke) !important;
border-radius: 22px !important;
box-shadow: var(--shadow) !important;
padding: 16px !important;
backdrop-filter: blur(12px);
}
.gr-box, .gr-panel, .gr-form, .gr-group {
border-radius: 18px !important;
}
textarea, input, .gr-textbox, .gr-dropdown {
border-radius: 16px !important;
}
button.primary, button[class*='primary'] {
background: linear-gradient(135deg, var(--primary), var(--primary-2)) !important;
border: none !important;
color: white !important;
border-radius: 16px !important;
box-shadow: 0 10px 22px rgba(91,91,214,0.22) !important;
}
button.secondary {
border-radius: 16px !important;
}
button[role='tab'] {
border-radius: 14px 14px 0 0 !important;
font-weight: 700 !important;
}
button[role='tab'][aria-selected='true'] {
color: var(--primary) !important;
border-bottom: 3px solid var(--primary) !important;
}
.kpi-note {
background: rgba(255,255,255,0.68);
border: 1px dashed rgba(91,91,214,0.18);
border-radius: 16px;
padding: 12px 14px;
color: var(--muted);
font-size: 13px;
margin-top: 8px;
}
.summary-grid {
display: grid;
grid-template-columns: repeat(4, minmax(0, 1fr));
gap: 14px;
margin-bottom: 16px;
}
.summary-card {
background: linear-gradient(180deg, var(--card-strong), rgba(255,255,255,0.72));
border: 1px solid rgba(91,91,214,0.12);
border-radius: 20px;
padding: 16px;
box-shadow: 0 12px 28px rgba(56, 72, 122, 0.08);
min-height: 122px;
}
.summary-label {
color: var(--muted);
font-size: 12px;
font-weight: 700;
letter-spacing: 0.04em;
text-transform: uppercase;
margin-bottom: 10px;
}
.summary-value {
color: var(--text);
font-size: 20px;
font-weight: 800;
line-height: 1.25;
word-break: break-word;
}
.summary-sub {
margin-top: 14px;
}
.pill {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 7px 11px;
border-radius: 999px;
font-size: 12px;
font-weight: 700;
}
.pill.info {
background: rgba(59,130,246,0.12);
color: #1d4ed8;
}
.pill.success {
background: rgba(16,185,129,0.14);
color: #047857;
}
.pill.warning {
background: rgba(245,158,11,0.16);
color: #b45309;
}
.pill.neutral {
background: rgba(100,116,139,0.12);
color: #475467;
}
.compare-wrap {
display: flex;
flex-direction: column;
gap: 14px;
}
.compare-banner {
padding: 14px 16px;
border-radius: 16px;
font-weight: 800;
font-size: 14px;
border: 1px solid transparent;
}
.compare-banner.success {
background: var(--success-bg);
color: var(--success-text);
border-color: rgba(6,118,71,0.12);
}
.compare-banner.warning {
background: var(--warning-bg);
color: var(--warning-text);
border-color: rgba(194,65,12,0.12);
}
.compare-banner.neutral {
background: var(--neutral-bg);
color: var(--neutral-text);
border-color: rgba(71,84,103,0.12);
}
.compare-head {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 14px;
}
.head-card {
background: rgba(255,255,255,0.82);
border: 1px solid rgba(99,102,241,0.12);
border-radius: 18px;
padding: 16px;
}
.head-label {
color: var(--muted);
font-size: 12px;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.04em;
}
.head-page {
color: var(--primary);
font-size: 13px;
font-weight: 700;
margin-top: 6px;
}
.head-name {
color: var(--text);
font-size: 18px;
font-weight: 800;
margin-top: 8px;
}
.compare-table {
display: flex;
flex-direction: column;
gap: 12px;
}
.compare-row {
display: grid;
grid-template-columns: 220px 1fr 1fr;
gap: 12px;
align-items: stretch;
}
.compare-field, .compare-cell {
background: rgba(255,255,255,0.82);
border: 1px solid rgba(99,102,241,0.10);
border-radius: 18px;
padding: 14px;
}
.compare-row.same .compare-field {
background: linear-gradient(180deg, #f0fdf4, #ffffff);
}
.compare-row.different .compare-field {
background: linear-gradient(180deg, #fff7ed, #ffffff);
}
.field-name {
color: var(--text);
font-weight: 800;
font-size: 15px;
}
.field-status {
display: inline-block;
margin-top: 12px;
padding: 6px 10px;
border-radius: 999px;
font-size: 11px;
font-weight: 800;
letter-spacing: 0.05em;
}
.field-status.same {
background: rgba(16,185,129,0.14);
color: #047857;
}
.field-status.different {
background: rgba(245,158,11,0.16);
color: #b45309;
}
.cell-title {
color: var(--muted);
font-size: 12px;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.04em;
margin-bottom: 8px;
}
.cell-content {
color: var(--text);
font-size: 14px;
line-height: 1.6;
white-space: normal;
word-break: break-word;
}
.code-block {
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace;
background: #f8fafc;
border: 1px solid rgba(148,163,184,0.16);
border-radius: 14px;
padding: 12px;
white-space: pre-wrap;
}
.empty-state {
background: rgba(255,255,255,0.74);
border: 1px dashed rgba(91,91,214,0.20);
border-radius: 18px;
padding: 18px;
color: var(--muted);
}
@media (max-width: 1100px) {
.summary-grid {
grid-template-columns: repeat(2, minmax(0, 1fr));
}
.compare-row {
grid-template-columns: 1fr;
}
.compare-head {
grid-template-columns: 1fr;
}
}
@media (max-width: 700px) {
.summary-grid {
grid-template-columns: 1fr;
}
}
</style>
"""
# Build the Gradio interface: a hero banner, an input column on the left and a
# results column (summary cards plus five tabs) on the right.
# NOTE(review): the source lost its indentation; the nesting below is
# reconstructed from the component structure - confirm against the original.
with gr.Blocks() as demo:
    # Inject the stylesheet before any styled markup is rendered.
    gr.HTML(CUSTOM_CSS)
    gr.HTML("""
<div class='hero'>
<div class='hero-title'>💊 Pharma KPI Copilot</div>
<p class='hero-subtitle'>
Fast KPI explanations with structured sections for <b>Definition</b>, <b>Business Meaning</b>, <b>Formula</b>, <b>Notes</b>, and <b>Comparison</b>.
If the same KPI appears in more than one place in the PDF, the Copilot compares both occurrences side by side and highlights whether they are same or different.
</p>
</div>
""")
    with gr.Row():
        # Left column: question input, audience selector, buttons and examples.
        with gr.Column(scale=4, elem_classes=["panel"]):
            question = gr.Textbox(
                label="Ask KPI question",
                placeholder="e.g. Rep Parent Calls, What is HCP Reach?, Calls/Day",
                lines=2,
            )
            audience = gr.Dropdown(
                choices=["Business User", "Analytics User", "Leadership"],
                value="Business User",
                label="Explain for",
            )
            submit_btn = gr.Button("Submit", variant="primary")
            clear_btn = gr.Button("Clear")
            gr.HTML(
                "<div class='kpi-note'>Tip: Ask with KPI name, measure name, or business term. Example: <b>Rep Parent Calls</b> or <b>How is Calls/Day calculated?</b></div>"
            )
            gr.Examples(
                examples=[
                    ["Rep Parent Calls", "Business User"],
                    ["What is HCP Reach?", "Business User"],
                    ["How is Calls/Day calculated?", "Analytics User"],
                    ["Why is HCP Reach important?", "Leadership"],
                ],
                inputs=[question, audience],
            )
        # Right column: summary cards followed by one tab per answer section.
        with gr.Column(scale=8, elem_classes=["panel"]):
            summary_cards = gr.HTML("<div class='empty-state'>Ask a KPI question to see the summary cards.</div>")
            with gr.Tab("Definition"):
                definition = gr.Markdown()
            with gr.Tab("Business Meaning"):
                business = gr.Markdown()
            with gr.Tab("Formula"):
                formula = gr.Textbox(label="Formula", lines=14)
            with gr.Tab("Notes"):
                notes = gr.Markdown()
            with gr.Tab("Comparison"):
                comparison = gr.HTML("<div class='empty-state'>Comparison results will appear here.</div>")
    # Submit: get_answer returns six values, one per component listed here.
    submit_btn.click(
        fn=get_answer,
        inputs=[question, audience],
        outputs=[summary_cards, definition, business, formula, notes, comparison],
    )
    # Clear: eight components are listed, so clear_all has to supply eight
    # values in this same order.
    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=[question, audience, summary_cards, definition, business, formula, notes, comparison],
    )
# Start the web server as soon as the module is executed.
demo.launch()