# app.py
import os
import re
from functools import lru_cache

import gradio as gr
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# =========================================================
# 1) LOAD PDF + BUILD VECTOR STORE
# =========================================================
# NOTE: everything in this section runs at import time: the PDF is read,
# split into overlapping chunks, embedded, and indexed into FAISS.
PDF_FILE = "data.pdf"

loader = PyPDFLoader(PDF_FILE)
documents = loader.load()

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
)
docs = text_splitter.split_documents(documents)

embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
db = FAISS.from_documents(docs, embeddings)


# =========================================================
# 2) HELPER FUNCTIONS
# =========================================================
def normalize_lines(text: str):
    """Return the non-blank lines of *text*, each stripped of outer whitespace."""
    return [line.strip() for line in text.splitlines() if line.strip()]


def html_escape(text: str) -> str:
    """Escape ``&``, ``<``, ``>`` and ``"`` so *text* can be embedded in HTML.

    ``None`` yields an empty string; any other value is converted with
    ``str()`` first.
    """
    if text is None:
        return ""
    return (
        str(text)
        # "&" must be handled first, otherwise the ampersands introduced by
        # the later replacements would themselves be escaped again.
        .replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )


def nl2br(text: str) -> str:
    """HTML-escape *text* and turn each newline into a ``<br>`` tag."""
    return html_escape(text).replace("\n", "<br>")


def clean_compare_text(text: str) -> str:
    """Normalize free text for equality comparison.

    Lower-cases, collapses whitespace runs to one space, and drops every
    character outside the small allow-list in the second pattern.
    """
    if not text:
        return ""
    text = text.lower().strip()
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"[^a-z0-9\[\]\(\)_=/,.\- ]", "", text)
    return text


def clean_formula_text(text: str) -> str:
    """Normalize a formula for comparison.

    Lower-cases, removes everything from ``--`` to the end of each line,
    then removes all whitespace.
    """
    if not text:
        return ""
    text = text.lower()
    text = re.sub(r"--.*", "", text)
    text = re.sub(r"\s+", "", text)
    return text


def is_metadata_line(line: str) -> bool:
    """Return True if *line* matches one of the known field-label patterns."""
    lowered = line.lower().strip()
    metadata_patterns = [
        r"^name$",
        r"^kpi id",
        r"^measure name",
        r"^description$",
        r"^definition$",
        r"^business meaning$",
        r"^category$",
        r"^owner$",
        r"^source$",
        r"^dashboard$",
        r"^glossary$",
        r"^#",
    ]
    return any(re.search(pattern, lowered) for pattern in metadata_patterns)


def looks_like_formula_start(line: str) -> bool:
    """Heuristically decide whether *line* could belong to a formula.

    A line qualifies when it contains any of the listed keywords anywhere
    (not only at the start), contains both ``[`` and ``]``, or contains ``=``.
    """
    lowered = line.lower().strip()
    formula_starts = [
        "calculate(",
        "sum(",
        "count(",
        "distinctcount(",
        "divide(",
        "if(",
        "filter(",
        "removefilters(",
        "all(",
        "average(",
        "var ",
        "return",
        "switch(",
        "countrows(",
        "summarize(",
        "lookupvalue(",
        "selectedvalue(",
    ]
    if any(keyword in lowered for keyword in formula_starts):
        return True
    if "[" in line and "]" in line:
        return True
    if "=" in line:
        return True
    return False


def extract_formula_block(lines):
    """Collect the first formula-looking run of *lines* and return it as text.

    Scanning starts at the first line accepted by
    ``looks_like_formula_start`` and continues, tracking parenthesis balance,
    until a metadata line is reached (or is next) while parentheses are
    balanced. Returns the collected lines joined by newlines, or ``""`` if
    no line qualifies.
    """
    formula_lines = []
    in_formula = False
    paren_balance = 0

    for i, line in enumerate(lines):
        stripped = line.strip()

        if not in_formula and looks_like_formula_start(stripped):
            in_formula = True
            formula_lines.append(stripped)
            paren_balance += stripped.count("(") - stripped.count(")")
            continue

        if in_formula:
            # A label only terminates the formula once every "(" is closed.
            if is_metadata_line(stripped) and paren_balance <= 0:
                break
            formula_lines.append(stripped)
            paren_balance += stripped.count("(") - stripped.count(")")
            if paren_balance <= 0:
                next_line = lines[i + 1].strip() if i + 1 < len(lines) else ""
                if next_line and is_metadata_line(next_line):
                    break

    # Fallback kept from the original: gather every formula-looking line if
    # the scan above produced nothing.
    if not formula_lines:
        for line in lines:
            if looks_like_formula_start(line):
                formula_lines.append(line)

    # Cut the result at the first label that appears after the opening line.
    cleaned = []
    for line in formula_lines:
        if len(cleaned) > 0 and is_metadata_line(line):
            break
        cleaned.append(line)

    return "\n".join(cleaned).strip()
# NOTE(review): the user-facing strings in this section contain no HTML tags,
# and a few locals (badge ``kind``, ``overall_class``) are not referenced by
# the text they accompany. Presumably markup was lost upstream — confirm and
# restore it here if so.
_SUMMARY_PLACEHOLDER = "\nAsk a KPI question to see the summary cards.\n"
_COMPARISON_PLACEHOLDER = "\nComparison results will appear here.\n"


def extract_label_block(lines, labels):
    """Return the text that follows the first line equal to one of *labels*.

    Matching is case-insensitive on the stripped line. Lines after the label
    are collected until another metadata label (one not in *labels*) is hit,
    then joined with single spaces. Returns ``""`` if no label line exists.
    """
    start_idx = None
    wanted = [lbl.lower() for lbl in labels]

    for i, line in enumerate(lines):
        if line.lower().strip() in wanted:
            start_idx = i + 1
            break

    if start_idx is None:
        return ""

    collected = []
    for j in range(start_idx, len(lines)):
        current = lines[j].strip()
        if is_metadata_line(current) and current.lower() not in wanted:
            break
        collected.append(current)

    return " ".join(collected).strip()


def extract_named_field(lines, possible_labels):
    """Return the stripped line right after the first matching label line.

    A label on the very last line has no value and is skipped. Returns ``""``
    when nothing is found.
    """
    wanted = [lbl.lower() for lbl in possible_labels]
    for i, line in enumerate(lines):
        if line.lower().strip() in wanted:
            if i + 1 < len(lines):
                return lines[i + 1].strip()
    return ""


def remove_formula_from_text(lines, formula_text):
    """Return *lines* without any line that also appears in *formula_text*."""
    if not formula_text.strip():
        return lines
    formula_lines = {ln.strip() for ln in formula_text.splitlines() if ln.strip()}
    return [ln for ln in lines if ln.strip() not in formula_lines]


def build_business_meaning(audience, measure_name, kpi_name):
    """Return a canned, audience-specific sentence about the KPI.

    The subject is *measure_name*, else *kpi_name*, else ``"This KPI"``.
    Any audience other than "Leadership" or "Analytics User" gets the
    business-user wording.
    """
    base_name = measure_name or kpi_name or "This KPI"

    if audience == "Leadership":
        return (
            f"{base_name} helps leadership monitor overall performance, coverage, "
            f"and execution trends for strategic decision-making."
        )

    if audience == "Analytics User":
        return (
            f"{base_name} is used in reporting and analysis. It should be interpreted "
            f"along with filters, source logic, and any deduplication or exclusion rules."
        )

    return (
        f"{base_name} helps business users understand what is being tracked and how "
        f"this KPI reflects field activity, engagement, or performance."
    )


def build_notes(lines, kpi_name, kpi_id, measure_name, page_no):
    """Build the Markdown text for the Notes tab.

    Emits one bold-labelled paragraph per available identifier, the 1-based
    page number, and up to three non-label lines as additional context.
    """
    notes = []

    if kpi_name:
        notes.append(f"**KPI Name:** {kpi_name}")
    if kpi_id:
        notes.append(f"**KPI ID:** {kpi_id}")
    if measure_name:
        notes.append(f"**Power BI Measure:** {measure_name}")
    if page_no is not None:
        # page_no is displayed 1-based.
        notes.append(f"**Page:** {page_no + 1}")

    extra_info = []
    skip_labels = {
        "name",
        "description",
        "definition",
        "kpi id from kpi glossary",
        "measure name in the pbi",
        "business meaning",
        "category",
    }

    for line in lines:
        lowered = line.lower().strip()
        if lowered in skip_labels:
            continue
        if is_metadata_line(line):
            continue
        if len(line) < 2:
            continue
        extra_info.append(line)

    if extra_info:
        joined = " ".join(extra_info[:3])
        notes.append(f"**Additional Context:** {joined}")

    if not notes:
        return "No additional notes found."

    return "\n\n".join(notes)


def parse_doc_entry(doc, audience):
    """Parse one retrieved chunk into the dict consumed by the UI builders.

    Reads ``doc.page_content`` and ``doc.metadata["page"]``. Missing
    identifiers become ``"Not found"``; a missing definition or formula is
    replaced by an explanatory sentence.
    """
    context = doc.page_content
    lines = normalize_lines(context)

    formula_text = extract_formula_block(lines)
    non_formula_lines = remove_formula_from_text(lines, formula_text)

    kpi_name = extract_named_field(non_formula_lines, ["Name"])
    kpi_id = extract_named_field(
        non_formula_lines, ["KPI ID from KPI Glossary", "KPI ID"]
    )
    measure_name = extract_named_field(
        non_formula_lines, ["Measure name in the PBI", "Measure Name"]
    )

    definition_text = extract_label_block(
        non_formula_lines, ["Description", "Definition"]
    )

    if not definition_text:
        # No labelled definition: fall back to lines that read like one.
        definition_phrases = (
            "number of",
            "count of",
            "unique",
            "per day",
            "per calendar",
            "calculated as",
            "rate of",
            "ratio of",
        )
        heuristic_lines = [
            line
            for line in non_formula_lines
            if any(phrase in line.lower() for phrase in definition_phrases)
        ]
        definition_text = " ".join(heuristic_lines[:3]).strip()

    if not definition_text:
        definition_text = "Definition not found clearly in the source extract."

    if not formula_text:
        formula_text = "Formula not found in source extract."

    page_no = doc.metadata.get("page", None)
    business_text = build_business_meaning(audience, measure_name, kpi_name)
    notes_text = build_notes(non_formula_lines, kpi_name, kpi_id, measure_name, page_no)

    return {
        "page": page_no,
        "kpi_name": kpi_name or "Not found",
        "kpi_id": kpi_id or "Not found",
        "measure_name": measure_name or "Not found",
        "definition": definition_text,
        "business": business_text,
        "formula": formula_text,
        "notes": notes_text,
    }


def doc_is_distinct(doc_a, doc_b):
    """Return True unless both docs share the same normalized first 700 chars."""
    if doc_a is None or doc_b is None:
        return True
    a = clean_compare_text(doc_a.page_content[:700])
    b = clean_compare_text(doc_b.page_content[:700])
    return a != b


def get_top_two_distinct_docs(question):
    """Return the best hit for *question* and the next hit that differs from it.

    Searches the module-level ``db`` with ``k=10``. Returns ``(None, None)``
    when there are no hits, and ``(first, None)`` when every other hit looks
    the same as the first.
    """
    results = db.similarity_search_with_score(question, k=10)
    top_docs = [doc for doc, _score in results]

    if not top_docs:
        return None, None

    first = top_docs[0]
    second = None
    for candidate in top_docs[1:]:
        if doc_is_distinct(first, candidate):
            second = candidate
            break

    return first, second


def compare_same(value1, value2, formula=False):
    """Return True if the two values are equal after normalization.

    ``formula=True`` selects the formula normalizer instead of the text one.
    """
    if formula:
        return clean_formula_text(value1) == clean_formula_text(value2)
    return clean_compare_text(value1) == clean_compare_text(value2)


def _entries_match(entry1, entry2):
    """Return True when name, ID, measure, definition and formula all agree."""
    return (
        compare_same(entry1["kpi_name"], entry2["kpi_name"])
        and compare_same(entry1["kpi_id"], entry2["kpi_id"])
        and compare_same(entry1["measure_name"], entry2["measure_name"])
        and compare_same(entry1["definition"], entry2["definition"])
        and compare_same(entry1["formula"], entry2["formula"], formula=True)
    )


def build_summary_cards(entry1, entry2=None):
    """Render the summary-card text for the primary entry.

    Produces one card each for KPI name, KPI ID and PBI measure, plus a
    comparison-status card that reflects whether *entry2* was supplied and
    whether it matches *entry1*.
    """

    def badge(text, kind="default"):
        # ``kind`` is accepted but does not affect the returned text.
        return f"{html_escape(text)}"

    page1 = (
        f"Page {entry1['page'] + 1}"
        if entry1 and entry1["page"] is not None
        else "Page not found"
    )

    cards = [
        (
            "\n"
            "KPI Name\n"
            f"{html_escape(entry1['kpi_name'])}\n"
            f"{badge(page1, 'info')}\n"
        ),
        (
            "\n"
            "KPI ID\n"
            f"{html_escape(entry1['kpi_id'])}\n"
            f"{badge('Glossary reference', 'neutral')}\n"
        ),
        (
            "\n"
            "PBI Measure\n"
            f"{html_escape(entry1['measure_name'])}\n"
            f"{badge('Primary match', 'success')}\n"
        ),
    ]

    compare_hint = "One occurrence found"
    compare_kind = "neutral"

    if entry2:
        same_all = _entries_match(entry1, entry2)
        compare_hint = "Occurrences are same" if same_all else "Differences detected"
        compare_kind = "success" if same_all else "warning"

    checked_text = "2 matches checked" if entry2 else "1 match checked"
    cards.append(
        "\n"
        "Comparison Status\n"
        f"{html_escape(compare_hint)}\n"
        f"{badge(checked_text, compare_kind)}\n"
    )

    return "\n" + "".join(cards) + "\n"


def compare_field_status(value1, value2, formula=False):
    """Return ``"same"`` or ``"different"`` for a pair of field values."""
    return "same" if compare_same(value1, value2, formula=formula) else "different"


def build_side_by_side_comparison(entry1, entry2):
    """Render the field-by-field comparison of two parsed entries.

    Returns a short message instead when fewer than two entries are given.
    """
    if not entry1 and not entry2:
        return "\nNo relevant KPI entry found.\n"

    if entry1 and not entry2:
        page_text = (
            f"Page {entry1['page'] + 1}"
            if entry1["page"] is not None
            else "Unknown page"
        )
        return (
            "\n"
            f"Only one occurrence found in the PDF ({html_escape(page_text)}). "
            "No second occurrence available for comparison.\n"
        )

    if not entry1 or not entry2:
        return "\nOnly one occurrence available for comparison.\n"

    same_all = _entries_match(entry1, entry2)

    # Not referenced by the text below (see the review note at the top of
    # this section); kept so the status is available to restored markup.
    overall_class = "success" if same_all else "warning"
    overall_text = (
        "Both occurrences are the same"
        if same_all
        else "Differences found between the two occurrences"
    )

    page1 = f"Page {entry1['page'] + 1}" if entry1["page"] is not None else "Unknown"
    page2 = f"Page {entry2['page'] + 1}" if entry2["page"] is not None else "Unknown"

    fields = [
        ("KPI Name", entry1["kpi_name"], entry2["kpi_name"], False),
        ("KPI ID", entry1["kpi_id"], entry2["kpi_id"], False),
        ("Power BI Measure", entry1["measure_name"], entry2["measure_name"], False),
        ("Definition", entry1["definition"], entry2["definition"], False),
        ("Formula", entry1["formula"], entry2["formula"], True),
    ]

    rows = []
    for label, left_val, right_val, is_formula in fields:
        status = compare_field_status(left_val, right_val, formula=is_formula)
        status_text = "SAME" if status == "same" else "DIFFERENT"
        rows.append(
            "\n"
            f"{html_escape(label)}\n"
            f"{status_text}\n"
            "Occurrence 1\n"
            f"{nl2br(left_val or 'Not found')}\n"
            "Occurrence 2\n"
            f"{nl2br(right_val or 'Not found')}\n"
        )

    return (
        "\n"
        f"{html_escape(overall_text)}\n"
        "Occurrence 1\n"
        f"{html_escape(page1)}\n"
        f"{html_escape(entry1['kpi_name'])}\n"
        "Occurrence 2\n"
        f"{html_escape(page2)}\n"
        f"{html_escape(entry2['kpi_name'])}\n"
        f"{''.join(rows)}\n"
    )


# =========================================================
# 3) MAIN QUERY FUNCTION
# =========================================================
@lru_cache(maxsize=100)
def get_answer(question, audience):
    """Answer a KPI question for the given audience.

    Returns a 6-tuple in the order of the submit button's outputs:
    summary cards, definition, business meaning, formula, notes, comparison.
    Results are memoized per ``(question, audience)`` pair.
    """
    if not question or not question.strip():
        return (
            _SUMMARY_PLACEHOLDER,
            "Please enter a KPI question.",
            "",
            "",
            "",
            "\nNo comparison available.\n",
        )

    doc1, doc2 = get_top_two_distinct_docs(question)

    if doc1 is None:
        return (
            "\nNo KPI match found.\n",
            "Definition not found.",
            "",
            "",
            "",
            "\nNo relevant KPI entry found.\n",
        )

    entry1 = parse_doc_entry(doc1, audience)
    entry2 = parse_doc_entry(doc2, audience) if doc2 else None

    summary_html = build_summary_cards(entry1, entry2)
    comparison_html = build_side_by_side_comparison(entry1, entry2)

    return (
        summary_html,
        entry1["definition"],
        entry1["business"],
        entry1["formula"],
        entry1["notes"],
        comparison_html,
    )


def clear_all():
    """Return reset values for every component wired to the Clear button.

    The tuple has one value per entry of ``clear_btn.click(outputs=...)``,
    in the same order: question, audience, summary cards, definition,
    business meaning, formula, notes, comparison.
    """
    return (
        "",                       # question
        "Business User",          # audience (the dropdown's initial value)
        _SUMMARY_PLACEHOLDER,     # summary_cards
        "",                       # definition
        "",                       # business
        "",                       # formula
        "",                       # notes
        _COMPARISON_PLACEHOLDER,  # comparison
    )


# =========================================================
# 4) UI
# =========================================================
CUSTOM_CSS = """ """

with gr.Blocks() as demo:
    gr.HTML(CUSTOM_CSS)

    gr.HTML(
        "\n"
        "💊 Pharma KPI Copilot\n"
        "\n"
        "Fast KPI explanations with structured sections for Definition, "
        "Business Meaning, Formula, Notes, and Comparison. "
        "If the same KPI appears in more than one place in the PDF, "
        "the Copilot compares both occurrences side by side and highlights "
        "whether they are same or different.\n"
        "\n"
    )

    with gr.Row():
        with gr.Column(scale=4, elem_classes=["panel"]):
            question = gr.Textbox(
                label="Ask KPI question",
                placeholder="e.g. Rep Parent Calls, What is HCP Reach?, Calls/Day",
                lines=2,
            )
            audience = gr.Dropdown(
                choices=["Business User", "Analytics User", "Leadership"],
                value="Business User",
                label="Explain for",
            )

            submit_btn = gr.Button("Submit", variant="primary")
            clear_btn = gr.Button("Clear")

            gr.HTML(
                "\n"
                "Tip: Ask with KPI name, measure name, or business term. "
                "Example: Rep Parent Calls or How is Calls/Day calculated?\n"
            )

            gr.Examples(
                examples=[
                    ["Rep Parent Calls", "Business User"],
                    ["What is HCP Reach?", "Business User"],
                    ["How is Calls/Day calculated?", "Analytics User"],
                    ["Why is HCP Reach important?", "Leadership"],
                ],
                inputs=[question, audience],
            )

        with gr.Column(scale=8, elem_classes=["panel"]):
            summary_cards = gr.HTML(_SUMMARY_PLACEHOLDER)

            with gr.Tab("Definition"):
                definition = gr.Markdown()

            with gr.Tab("Business Meaning"):
                business = gr.Markdown()

            with gr.Tab("Formula"):
                formula = gr.Textbox(label="Formula", lines=14)

            with gr.Tab("Notes"):
                notes = gr.Markdown()

            with gr.Tab("Comparison"):
                comparison = gr.HTML(_COMPARISON_PLACEHOLDER)

    submit_btn.click(
        fn=get_answer,
        inputs=[question, audience],
        outputs=[summary_cards, definition, business, formula, notes, comparison],
    )

    # clear_all must return exactly one value per component listed here.
    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=[
            question,
            audience,
            summary_cards,
            definition,
            business,
            formula,
            notes,
            comparison,
        ],
    )

demo.launch()