# app.py
import os
import re
from functools import lru_cache
import gradio as gr
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Set the tokenizers parallelism flag before any embedding work runs.
# NOTE(review): presumably this silences the Hugging Face tokenizers
# fork/parallelism warning — confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# =========================================================
# 1) LOAD PDF + BUILD VECTOR STORE
# =========================================================
# NOTE: everything in this section executes at import time, so importing the
# module loads the PDF and builds the index.
# The path is relative, so it is resolved against the current working directory.
PDF_FILE = "data.pdf"
loader = PyPDFLoader(PDF_FILE)
documents = loader.load()
# Split the loaded documents into overlapping chunks before embedding.
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
)
docs = text_splitter.split_documents(documents)
# Embed every chunk and index the vectors in a FAISS store; `db` is the
# store queried by the retrieval helpers further down the file.
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
db = FAISS.from_documents(docs, embeddings)
# =========================================================
# 2) HELPER FUNCTIONS
# =========================================================
def normalize_lines(text: str) -> list:
    """Split *text* into stripped, non-empty lines.

    Args:
        text: Raw text (for example the content of one document chunk).
            ``None`` and the empty string are accepted.

    Returns:
        A list of lines with surrounding whitespace removed; lines that are
        empty after stripping are dropped. An empty list is returned for
        ``None`` or empty input.
    """
    # Tolerate missing text the same way the other text helpers in this
    # file do, instead of raising AttributeError on None.
    if not text:
        return []
    stripped_lines = (raw.strip() for raw in text.splitlines())
    return [line for line in stripped_lines if line]
def html_escape(text: str) -> str:
    """Escape *text* so it can be embedded safely in HTML markup.

    Args:
        text: Value to escape. Non-string values are converted with ``str``;
            ``None`` yields an empty string.

    Returns:
        The text with ``&``, ``<``, ``>`` and ``"`` replaced by their HTML
        character entities.
    """
    if text is None:
        return ""
    return (
        str(text)
        # "&" must be handled first so the ampersands introduced by the
        # later replacements are not escaped a second time.
        .replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )
def nl2br(text: str) -> str:
    """Escape *text* for HTML and turn newlines into line-break tags.

    Args:
        text: Value to render; passed through ``html_escape`` first, so
            ``None`` yields an empty string.

    Returns:
        The escaped text with every ``"\\n"`` replaced by ``<br>``.
    """
    # Escape first, then insert the tag, so the inserted markup itself is
    # not escaped.
    return html_escape(text).replace("\n", "<br>")
def clean_compare_text(text: str) -> str:
    """Normalize *text* into a canonical form for equality comparison.

    The text is lower-cased, whitespace runs are collapsed to single spaces,
    and every character outside ``a-z 0-9 [ ] ( ) _ = / , . -`` and space is
    removed.

    Args:
        text: Text to normalize; ``None`` or empty yields ``""``.

    Returns:
        The normalized string, with no leading, trailing or repeated spaces.
    """
    if not text:
        return ""
    # Collapse whitespace first so newlines/tabs become spaces instead of
    # being deleted by the character filter below.
    normalized = re.sub(r"\s+", " ", text.lower())
    normalized = re.sub(r"[^a-z0-9\[\]\(\)_=/,.\- ]", "", normalized)
    # Dropping a symbol that sat between two spaces (or at an edge) leaves
    # doubled/trailing spaces; collapse again so texts that differ only by
    # such symbols still compare equal.
    return re.sub(r" {2,}", " ", normalized).strip()
def clean_formula_text(text: str) -> str:
    """Reduce a formula to a compact form for comparison.

    Lower-cases the text, drops everything from ``--`` to the end of each
    line, and removes all whitespace.

    Args:
        text: Formula text; ``None`` or empty yields ``""``.

    Returns:
        The compacted formula string.
    """
    if not text:
        return ""
    # "." does not match newlines, so this trims each line separately.
    without_trailing = re.sub(r"--.*", "", text.lower())
    return re.sub(r"\s+", "", without_trailing)
def is_metadata_line(line: str) -> bool:
    """Tell whether *line* is a field label or heading rather than content.

    The line is lower-cased and stripped, then tested against a fixed list
    of label patterns (exact labels such as ``name``, prefixes such as
    ``kpi id`` and ``measure name``, and lines beginning with ``#``).

    Args:
        line: A single line of extracted text.

    Returns:
        ``True`` if any label pattern matches, otherwise ``False``.
    """
    candidate = line.lower().strip()
    label_patterns = (
        r"^name$",
        r"^kpi id",
        r"^measure name",
        r"^description$",
        r"^definition$",
        r"^business meaning$",
        r"^category$",
        r"^owner$",
        r"^source$",
        r"^dashboard$",
        r"^glossary$",
        r"^#",
    )
    for pattern in label_patterns:
        if re.search(pattern, candidate):
            return True
    return False
def looks_like_formula_start(line: str) -> bool:
    """Heuristically decide whether *line* looks like part of a formula.

    A line qualifies when its lower-cased form contains one of the known
    function/keyword markers anywhere in the line, when it contains both
    ``[`` and ``]``, or when it contains ``=``.

    Args:
        line: A single line of extracted text.

    Returns:
        ``True`` if any of the heuristics fires, otherwise ``False``.
    """
    lowered = line.lower().strip()
    markers = (
        "calculate(",
        "sum(",
        "count(",
        "distinctcount(",
        "divide(",
        "if(",
        "filter(",
        "removefilters(",
        "all(",
        "average(",
        "var ",
        "return",
        "switch(",
        "countrows(",
        "summarize(",
        "lookupvalue(",
        "selectedvalue(",
    )
    has_marker = any(marker in lowered for marker in markers)
    has_bracket_pair = "[" in line and "]" in line
    return has_marker or has_bracket_pair or "=" in line
def extract_formula_block(lines):
    """Pull the formula portion out of a list of text lines.

    Scanning starts at the first line accepted by
    ``looks_like_formula_start`` and keeps collecting stripped lines until a
    metadata line (per ``is_metadata_line``) is reached while the running
    parenthesis balance is not positive. A final pass truncates the
    collected lines at the first metadata line that is not the first entry.

    Args:
        lines: Sequence of text lines to scan.

    Returns:
        The collected formula lines joined with newlines and stripped, or
        ``""`` when nothing formula-like was found.
    """

    def paren_delta(fragment):
        # Net number of parentheses opened by this fragment.
        return fragment.count("(") - fragment.count(")")

    collected = []
    started = False
    open_parens = 0
    total = len(lines)
    for idx, raw in enumerate(lines):
        current = raw.strip()
        if not started:
            if looks_like_formula_start(current):
                started = True
                collected.append(current)
                open_parens += paren_delta(current)
            continue
        # A label outside any open parenthesis ends the formula.
        if open_parens <= 0 and is_metadata_line(current):
            break
        collected.append(current)
        open_parens += paren_delta(current)
        if open_parens <= 0:
            # Peek ahead: stop right away when the next line is a label.
            following = lines[idx + 1].strip() if idx + 1 < total else ""
            if following and is_metadata_line(following):
                break

    if not collected:
        # NOTE(review): this fallback looks unreachable, because the scan
        # above applies the same predicate to every line — confirm.
        collected = [raw for raw in lines if looks_like_formula_start(raw)]

    # Cut at the first label that follows the opening formula line.
    kept = []
    for entry in collected:
        if kept and is_metadata_line(entry):
            break
        kept.append(entry)
    return "\n".join(kept).strip()
def extract_label_block(lines, labels):
    """Return the text that follows the first line matching one of *labels*.

    Label matching is case-insensitive on the stripped line. Lines after the
    label are gathered until a metadata line (per ``is_metadata_line``) that
    is not itself one of the wanted labels is reached.

    Args:
        lines: List of text lines to search.
        labels: Label strings that may introduce the block.

    Returns:
        The gathered lines joined by single spaces and stripped, or ``""``
        when no label line is present.
    """
    wanted = {label.lower() for label in labels}
    start_idx = next(
        (pos + 1 for pos, text in enumerate(lines) if text.lower().strip() in wanted),
        None,
    )
    if start_idx is None:
        return ""

    gathered = []
    for raw in lines[start_idx:]:
        current = raw.strip()
        if is_metadata_line(current) and current.lower() not in wanted:
            break
        gathered.append(current)
    return " ".join(gathered).strip()
def extract_named_field(lines, possible_labels):
    """Return the line that immediately follows a label line.

    Args:
        lines: List of text lines to search.
        possible_labels: Label strings to look for (case-insensitive match
            on the stripped line).

    Returns:
        The stripped line after the first matching label that has a
        successor, or ``""`` when there is none.
    """
    targets = {label.lower() for label in possible_labels}
    last_index = len(lines) - 1
    for position, text in enumerate(lines):
        # A label on the very last line has no value line after it.
        if position < last_index and text.lower().strip() in targets:
            return lines[position + 1].strip()
    return ""
def remove_formula_from_text(lines, formula_text):
    """Drop every line that also appears in *formula_text*.

    Args:
        lines: List of text lines.
        formula_text: Newline-separated formula text.

    Returns:
        *lines* itself when the formula is blank; otherwise a new list
        without the lines whose stripped form equals a stripped, non-empty
        formula line.
    """
    if not formula_text.strip():
        return lines
    formula_set = set(filter(None, map(str.strip, formula_text.splitlines())))
    return [entry for entry in lines if entry.strip() not in formula_set]
def build_business_meaning(audience, measure_name, kpi_name):
    """Compose an audience-specific business-meaning sentence.

    Args:
        audience: ``"Leadership"`` or ``"Analytics User"`` select dedicated
            wording; any other value gets the general business-user wording.
        measure_name: Preferred subject of the sentence.
        kpi_name: Used when *measure_name* is empty; ``"This KPI"`` is used
            when both are empty.

    Returns:
        The generated sentence.
    """
    subject = measure_name or kpi_name or "This KPI"
    if audience == "Leadership":
        tail = (
            "helps leadership monitor overall performance, coverage, "
            "and execution trends for strategic decision-making."
        )
    elif audience == "Analytics User":
        tail = (
            "is used in reporting and analysis. It should be interpreted "
            "along with filters, source logic, and any deduplication or exclusion rules."
        )
    else:
        tail = (
            "helps business users understand what is being tracked and how "
            "this KPI reflects field activity, engagement, or performance."
        )
    return f"{subject} {tail}"
def build_notes(lines, kpi_name, kpi_id, measure_name, page_no):
    """Assemble the Markdown "notes" section for one KPI entry.

    Args:
        lines: Text lines from which additional context is taken.
        kpi_name: KPI name; omitted from the notes when empty.
        kpi_id: KPI identifier; omitted when empty.
        measure_name: Power BI measure name; omitted when empty.
        page_no: Zero-based page index, shown one-based; omitted when
            ``None``.

    Returns:
        The note paragraphs separated by blank lines, or
        ``"No additional notes found."`` when there is nothing to report.
    """
    header_fields = (
        ("KPI Name", kpi_name),
        ("KPI ID", kpi_id),
        ("Power BI Measure", measure_name),
    )
    notes = [f"**{label}:** {value}" for label, value in header_fields if value]
    if page_no is not None:
        notes.append(f"**Page:** {page_no + 1}")

    skip_labels = {
        "name", "description", "definition", "kpi id from kpi glossary",
        "measure name in the pbi", "business meaning", "category"
    }
    context_lines = []
    for entry in lines:
        is_label = entry.lower().strip() in skip_labels or is_metadata_line(entry)
        if is_label or len(entry) < 2:
            continue
        context_lines.append(entry)

    if context_lines:
        # Only the first three remaining lines are shown as context.
        joined = " ".join(context_lines[:3])
        notes.append(f"**Additional Context:** {joined}")

    if not notes:
        return "No additional notes found."
    return "\n\n".join(notes)
def parse_doc_entry(doc, audience):
    """Turn one retrieved document chunk into a structured KPI entry.

    Args:
        doc: Object exposing ``page_content`` (text) and ``metadata`` (a
            mapping that may contain ``"page"``).
        audience: Audience label forwarded to ``build_business_meaning``.

    Returns:
        A dict with the keys ``page``, ``kpi_name``, ``kpi_id``,
        ``measure_name``, ``definition``, ``business``, ``formula`` and
        ``notes``. Name/ID/measure fall back to ``"Not found"``.
    """
    all_lines = normalize_lines(doc.page_content)
    formula_text = extract_formula_block(all_lines)
    # Field extraction works on the text with the formula lines removed.
    body_lines = remove_formula_from_text(all_lines, formula_text)

    kpi_name = extract_named_field(body_lines, ["Name"])
    kpi_id = extract_named_field(body_lines, ["KPI ID from KPI Glossary", "KPI ID"])
    measure_name = extract_named_field(body_lines, ["Measure name in the PBI", "Measure Name"])

    definition_text = extract_label_block(body_lines, ["Description", "Definition"])
    if not definition_text:
        # No labelled definition: fall back to lines containing typical
        # definition phrasing and keep the first three.
        cue_phrases = (
            "number of",
            "count of",
            "unique",
            "per day",
            "per calendar",
            "calculated as",
            "rate of",
            "ratio of",
        )
        cue_lines = [
            entry
            for entry in body_lines
            if any(cue in entry.lower() for cue in cue_phrases)
        ]
        definition_text = " ".join(cue_lines[:3]).strip()
    definition_text = definition_text or "Definition not found clearly in the source extract."
    formula_text = formula_text or "Formula not found in source extract."

    page_no = doc.metadata.get("page", None)
    business_text = build_business_meaning(audience, measure_name, kpi_name)
    notes_text = build_notes(body_lines, kpi_name, kpi_id, measure_name, page_no)

    entry = {
        "page": page_no,
        "kpi_name": kpi_name or "Not found",
        "kpi_id": kpi_id or "Not found",
        "measure_name": measure_name or "Not found",
        "definition": definition_text,
        "business": business_text,
        "formula": formula_text,
        "notes": notes_text,
    }
    return entry
def doc_is_distinct(doc_a, doc_b):
    """Report whether two documents differ in their leading content.

    Args:
        doc_a: First document (object with ``page_content``) or ``None``.
        doc_b: Second document or ``None``.

    Returns:
        ``True`` when either document is ``None`` or when the normalized
        first 700 characters of their contents differ; ``False`` otherwise.
    """
    if doc_a is None or doc_b is None:
        return True
    head_a, head_b = (
        clean_compare_text(document.page_content[:700])
        for document in (doc_a, doc_b)
    )
    return head_a != head_b
def get_top_two_distinct_docs(question):
    """Fetch the best match for *question* plus the next distinct match.

    Queries the module-level vector store ``db`` for ten scored results.

    Args:
        question: Query text.

    Returns:
        A ``(first, second)`` tuple: the top-ranked document and the first
        lower-ranked document that ``doc_is_distinct`` considers different
        from it. ``second`` is ``None`` when no such document exists, and
        both are ``None`` when the search returns nothing.
    """
    scored_hits = db.similarity_search_with_score(question, k=10)
    if not scored_hits:
        return None, None
    ranked = [document for document, _score in scored_hits]
    best, *others = ranked
    runner_up = next(
        (candidate for candidate in others if doc_is_distinct(best, candidate)),
        None,
    )
    return best, runner_up
def compare_same(value1, value2, formula=False):
    """Compare two values after normalization.

    Args:
        value1: First text.
        value2: Second text.
        formula: When true, normalize with ``clean_formula_text``;
            otherwise with ``clean_compare_text``.

    Returns:
        ``True`` if the normalized values are equal.
    """
    normalizer = clean_formula_text if formula else clean_compare_text
    return normalizer(value1) == normalizer(value2)
def build_summary_cards(entry1, entry2=None):
def badge(text, kind="default"):
return f"{html_escape(text)}"
page1 = f"Page {entry1['page'] + 1}" if entry1 and entry1['page'] is not None else "Page not found"
cards = [
f"""
Fast KPI explanations with structured sections for Definition, Business Meaning, Formula, Notes, and Comparison. If the same KPI appears in more than one place in the PDF, the Copilot compares both occurrences side by side and highlights whether they are same or different.