Spaces:
Running
Running
| import os | |
| import re | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter | |
| from langchain_chroma import Chroma | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
# Runs before the environment lookups below; presumably populates the
# process environment from a local .env file (python-dotenv) — confirm.
load_dotenv()

# --- Ingestion settings -----------------------------------------------------
DATA_DIR = Path("data")                 # build_index() globs *.md from here
DB_DIR = "db"                           # path handed to chromadb.PersistentClient
COLLECTION_NAME = "handbook_docs"
EMBED_MODEL = "BAAI/bge-small-en-v1.5"
CHUNK_SIZE = 2000                       # passed to RecursiveCharacterTextSplitter
CHUNK_OVERLAP = 200

# --- Chroma Cloud credentials (each is None when the variable is unset) -----
CHROMA_API_KEY = os.environ.get("CHROMA_API_KEY")
CHROMA_TENANT = os.environ.get("CHROMA_TENANT")
CHROMA_DATABASE = os.environ.get("CHROMA_DATABASE")

# Only the API key decides between the cloud and local clients.
USE_CHROMA_CLOUD = bool(CHROMA_API_KEY)
def get_chroma_client():
    """Build the chromadb client used for indexing.

    Returns a ``chromadb.PersistentClient`` rooted at ``DB_DIR`` unless
    ``USE_CHROMA_CLOUD`` is truthy, in which case a ``chromadb.CloudClient``
    is created from the ``CHROMA_*`` module constants.
    """
    # Imported lazily so chromadb is only loaded when a client is requested.
    import chromadb

    if not USE_CHROMA_CLOUD:
        return chromadb.PersistentClient(path=DB_DIR)

    cloud_settings = {
        "api_key": CHROMA_API_KEY,
        "tenant": CHROMA_TENANT,
        "database": CHROMA_DATABASE,
    }
    return chromadb.CloudClient(**cloud_settings)
# (markdown prefix, metadata key) pairs for heading levels 1-3, passed to
# MarkdownHeaderTextSplitter in load_and_split().
HEADERS_TO_SPLIT = [("#" * level, f"h{level}") for level in (1, 2, 3)]
# Maps a regex for an underlined table anchor in the source text to the
# replacement text (label plus published-sheet URL) that expand_table_links()
# substitutes for it.  Both replacements use the same "Label (title — url)"
# shape; the Table 3 entry previously lacked the opening parenthesis, which
# left an unmatched ")" attached to the end of its URL.
_TABLE_LINK_MAP = {
    r'<u>\s*Table\s*2\s*</u>': (
        "Table 2 (NPTEL Dep/Free Electives — "
        "https://docs.google.com/spreadsheets/d/e/2PACX-1vSJXV0JECyoQvgWvBlVxO13G0KRm5a1qNCRBa7rAw8GDY4e0cfm1KiVCwlgs_ed80ObtzQ1rfx_JWIR/pub?gid=399341609&single=true)"
    ),
    r'<u>\s*NPTEL-Table\s*3\s*</u>': (
        "Table 3 (NPTEL HS/MG Electives — "
        "https://docs.google.com/spreadsheets/d/e/2PACX-1vSJXV0JECyoQvgWvBlVxO13G0KRm5a1qNCRBa7rAw8GDY4e0cfm1KiVCwlgs_ed80ObtzQ1rfx_JWIR/pub?gid=1418834182&single=true)"
    ),
}
def expand_table_links(text: str) -> str:
    """Substitute every anchor matched by ``_TABLE_LINK_MAP`` with its replacement.

    Each key of ``_TABLE_LINK_MAP`` is applied as a case-insensitive regex
    and every match is replaced by the mapped label-plus-URL text.
    """
    expanded = text
    for anchor_regex, resolved in _TABLE_LINK_MAP.items():
        expanded = re.compile(anchor_regex, re.IGNORECASE).sub(resolved, expanded)
    return expanded
def _extract_table_rows(table_html: str) -> list[list[str]]:
    """Extract the cell texts of an HTML table as a list of rows.

    Rowspan/colspan attributes are ignored.  For each cell, ``<br>`` tags
    become a single space (the same treatment ``_parse_cell`` applies), all
    other tags are removed, and runs of whitespace - including line breaks
    inside the cell - are collapsed to one space so a cell can never spill
    over several lines of a pipe table.  Rows whose cells are all empty are
    dropped.

    Returns:
        One list of cleaned cell strings per non-empty ``<tr>``.
    """
    flags = re.DOTALL | re.IGNORECASE

    def clean(cell_html: str) -> str:
        # Turn <br> into a separator first; otherwise the words on either
        # side would be fused together once the tags are stripped.
        text = re.sub(r'<br\s*/?>', ' ', cell_html, flags=re.IGNORECASE)
        text = re.sub(r'<[^>]+>', '', text)
        return ' '.join(text.split())

    result = []
    for row_html in re.findall(r'<tr[^>]*>(.*?)</tr>', table_html, flags):
        cells = re.findall(r'<t[hd][^>]*>(.*?)</t[hd]>', row_html, flags)
        cleaned = [clean(cell) for cell in cells]
        if any(cleaned):
            result.append(cleaned)
    return result
def _parse_cell(attrs: str, content: str) -> dict:
    """Parse one table cell into its text and span sizes.

    Args:
        attrs: Raw attribute text of the ``<td>``/``<th>`` tag.
        content: Inner HTML of the cell.

    Returns:
        ``{'text': str, 'rowspan': int, 'colspan': int}``.  A span defaults
        to 1 when the attribute is missing, has no numeric value (for example
        ``rowspan=""``), or is 0, so callers always receive a span >= 1.
    """
    text = re.sub(r'<br\s*/?>', ' ', content, flags=re.IGNORECASE)
    text = re.sub(r'<[^>]+>', '', text).strip()

    def span(name: str) -> int:
        # A single search covers both "is it there" and "what is its value",
        # so a present-but-malformed attribute can no longer hit
        # ``None.group``.  The lookbehind keeps e.g. ``data-rowspan`` from
        # matching, and whitespace around "=" is tolerated.
        found = re.search(
            r'(?<![\w-])' + name + r'\s*=\s*["\']?\s*(\d+)',
            attrs,
            re.IGNORECASE,
        )
        return max(int(found.group(1)), 1) if found else 1

    return {'text': text, 'rowspan': span('rowspan'), 'colspan': span('colspan')}
def _extract_cells_raw(table_html: str) -> list[list[dict]]:
    """Return every ``<tr>`` of the table as a list of ``_parse_cell`` dicts.

    Unlike ``_extract_table_rows`` this keeps each cell's rowspan/colspan and
    does not drop empty rows.
    """
    flags = re.DOTALL | re.IGNORECASE
    parsed_rows = []
    for row_html in re.findall(r'<tr[^>]*>(.*?)</tr>', table_html, flags):
        cell_matches = re.findall(r'<t[hd]([^>]*)>(.*?)</t[hd]>', row_html, flags)
        parsed_rows.append(
            [_parse_cell(cell_attrs, inner) for cell_attrs, inner in cell_matches]
        )
    return parsed_rows
def _expand_rowspans(raw_rows: list, n_cols: int) -> list[list[str]]:
    """Flatten cells with rowspan/colspan into a rectangular grid of strings.

    Args:
        raw_rows: Rows of cell dicts with 'text', 'rowspan' and 'colspan' keys.
        n_cols: Width of the output grid; every returned row has this length.

    Returns:
        One list of ``n_cols`` strings per input row.  A spanning cell's text
        is repeated in every grid position it covers; positions with no cell
        are filled with ''.
    """
    grid = []
    # column index -> (text, number of further rows the text must fill)
    pending = {}
    for cells in raw_rows:
        remaining = iter(cells)
        filled = []
        col = 0
        while col < n_cols:
            held = pending.get(col)
            if held is not None:
                # This column is still covered by a rowspan from a row above.
                text, rows_left = held
                filled.append(text)
                if rows_left > 1:
                    pending[col] = (text, rows_left - 1)
                else:
                    del pending[col]
                col += 1
                continue

            cell = next(remaining, None)
            if cell is None:
                # The row ran out of cells: pad the position.
                filled.append('')
                col += 1
                continue

            width = cell['colspan']
            filled.extend([cell['text']] * width)
            if cell['rowspan'] > 1:
                for offset in range(width):
                    pending[col + offset] = (cell['text'], cell['rowspan'] - 1)
            col += width
        # A wide colspan can overshoot the grid; cut back to n_cols.
        grid.append(filled[:n_cols])
    return grid
# --- OPPE schedule table conversion ---
# Heading text that marks an OPPE schedule table (looked for just before, or
# at the start of, a table by html_tables_to_markdown).
_OPPE_SCHEDULE_HEADING = re.compile(r'OPPE\s+SCHEDULE', flags=re.I)
# "OPPE 2 (Day 3)" / "OPPE 2 (Day 4)" labels; a table containing one is also
# treated as an OPPE schedule table even without the heading.
_OPPE_CONTINUATION = re.compile(r'OPPE\s*2\s*\(Day\s*[34]\)', flags=re.I)
def _is_oppe_header_row(row_cells: list) -> bool:
    """Return True when any cell's text is exactly 'Exam', 'Python' or 'Timing'."""
    header_markers = ('Exam', 'Python', 'Timing')
    return any(cell['text'] in header_markers for cell in row_cells)
def _is_date_row(grid_row: list) -> bool:
    """Return True when the row holds one distinct non-blank value containing a 20xx year.

    This is the shape a full-width date cell takes after its colspan has been
    expanded: the same text repeated, with any other positions blank.
    """
    distinct = {cell for cell in grid_row if cell.strip()}
    if len(distinct) != 1:
        return False
    (only_value,) = distinct
    return re.search(r'\b20\d\d\b', only_value) is not None
def _oppe_table_to_prose(table_html: str, inherited_date: str = '') -> tuple:
    """Convert an OPPE schedule table (or fragment) into prose lines.

    A produced line looks like:
        "OPPE1 (Day 1) on Saturday, August 1, 2026, 2.30 PM to 4.30 PM: Python"

    Args:
        table_html: HTML of one ``<table>`` block.
        inherited_date: Date text carried over from an earlier fragment; used
            until this table supplies a date row of its own.

    Returns:
        ``(lines, last_date_seen)``.  ``lines`` is empty when the table has no
        rows or no row recognised by ``_is_oppe_header_row``; in that case
        ``inherited_date`` is returned unchanged.  When no date is known at
        all, the " on <date>" part is left out instead of emitting "on ,".
    """
    raw_rows = _extract_cells_raw(table_html)
    if not raw_rows:
        return [], inherited_date

    # Built once as a set: it is consulted for every row below.
    header_indices = {i for i, r in enumerate(raw_rows) if _is_oppe_header_row(r)}
    if not header_indices:
        return [], inherited_date

    # Column layout comes from the first header row, with colspans expanded.
    first_hdr = raw_rows[min(header_indices)]
    headers = [c['text'] for c in first_hdr for _ in range(c['colspan'])]
    n_cols = len(headers)

    col_exam = next((i for i, h in enumerate(headers) if h == 'Exam'), 0)
    # Substring match so the column is found whatever the capitalisation of
    # the leading "T".
    col_timing = next((i for i, h in enumerate(headers) if 'iming' in h), 1)
    subject_cols = [
        (i, h) for i, h in enumerate(headers)
        if i not in (col_exam, col_timing) and h.strip()
    ]

    data_rows = [row for i, row in enumerate(raw_rows) if i not in header_indices]
    grid = _expand_rowspans(data_rows, n_cols)

    current_date = inherited_date
    current_exam = ''
    lines: list = []

    for row in grid:
        if len(row) < 2:
            continue

        if _is_date_row(row):
            current_date = next(c for c in row if c.strip())
            continue

        exam = row[col_exam].strip() if col_exam < len(row) else ''
        timing = row[col_timing].strip() if col_timing < len(row) else ''

        if exam and exam != 'Exam':
            current_exam = exam

        # A subject is listed when its column is non-blank and the cell is
        # not just date text.
        subjects = [
            h for i, h in subject_cols
            if i < len(row) and row[i].strip() and not re.search(r'\b20\d\d\b', row[i])
        ]

        if subjects and timing and current_exam:
            date_part = f" on {current_date}" if current_date else ""
            lines.append(f"{current_exam}{date_part}, {timing}: {', '.join(subjects)}")

    return lines, current_date
def _electives_table_to_prose(table_html: str) -> str:
    """Convert the Department Core/Elective Courses table into per-term prose.

    Each of the terms "May 2026", "Sep 2026" and "Jan 2027" gets one sentence
    listing every course whose cell in that term's column is 'Y', e.g.:

        Courses offered in May 2026: Software Engineering (BSCS3001, Core_BP),
        Deep Learning (BSCS3004, Core_BD), ...

    Falls back to ``_table_to_markdown`` when the table does not have the
    expected shape: no header row containing "Course ID", a missing required
    column, or no course flagged for any term.  The last case previously
    returned only blank lines, which removed the table's content entirely.

    Returns:
        The sentences separated and surrounded by blank lines, or the pipe
        table produced by the fallback.
    """
    rows = _extract_table_rows(table_html)

    # Find the header row that contains "Course ID".
    header_idx = next(
        (i for i, r in enumerate(rows) if any("Course ID" in c for c in r)), None
    )
    if header_idx is None:
        return _table_to_markdown(table_html)

    header = rows[header_idx]

    def col(name):
        """Index of the first header cell containing *name* (case-insensitive), or None."""
        wanted = name.lower()
        return next((i for i, h in enumerate(header) if wanted in h.lower()), None)

    idx_id = col("Course ID")
    idx_name = col("Course Name")
    idx_type = col("Course Type")    # optional
    idx_level = col("Course Level")  # optional
    terms = [(label, col(label)) for label in ("May 2026", "Sep 2026", "Jan 2027")]

    if idx_id is None or idx_name is None or any(idx is None for _, idx in terms):
        return _table_to_markdown(table_html)

    data_rows = rows[header_idx + 1:]

    def cell(row, idx):
        """Stripped cell text, or '' when the column is absent or the row is short."""
        return row[idx].strip() if idx is not None and idx < len(row) else ''

    def describe(row):
        # "Name (ID[, Type][, Level])" - type and level only when present.
        details = [cell(row, idx_id)]
        details.extend(
            extra for extra in (cell(row, idx_type), cell(row, idx_level)) if extra
        )
        return f"{cell(row, idx_name)} ({', '.join(details)})"

    lines = []
    for term_label, term_idx in terms:
        courses = [
            describe(r) for r in data_rows
            if len(r) > term_idx and r[term_idx].strip().upper() == 'Y'
        ]
        if courses:
            lines.append(f"Courses offered in {term_label}: {', '.join(courses)}.")

    if not lines:
        # Nothing was recognised as offered: keep the table rather than
        # replacing it with empty output.
        return _table_to_markdown(table_html)

    return "\n\n" + "\n\n".join(lines) + "\n\n"
def _table_to_markdown(table_html: str) -> str:
    """Render an HTML table as a Markdown pipe table.

    This is the default conversion and also the fallback of the specialised
    electives/OPPE converters.  The first extracted row is treated as the
    header and a ``---`` separator row of the same width is placed after it.
    When no rows can be extracted the HTML is returned unchanged.
    """
    cell_rows = _extract_table_rows(table_html)
    if not cell_rows:
        return table_html

    def pipe_line(cells):
        return '| ' + ' | '.join(cells) + ' |'

    table_lines = [pipe_line(cells) for cells in cell_rows]
    table_lines[1:1] = [pipe_line(['---'] * len(cell_rows[0]))]
    return '\n' + '\n'.join(table_lines) + '\n'
# Caption text expected shortly before the electives table in the source
# document; html_tables_to_markdown() searches for it in the text preceding
# each table.
_ELECTIVES_TABLE_HEADING = re.compile(
    r'Table\s+1\s*:\s*Department\s+Core/Elective\s+Courses', flags=re.I
)
# Matches a continuation fragment: a <table> that goes straight into <tbody>
# (no <thead>) and whose first cell is a course ID such as BSMA3014.
_ELECTIVES_CONTINUATION = re.compile(
    r'^<table[^>]*>\s*<tbody>\s*<tr>\s*<td>\s*BS[A-Z0-9]+\s*</td>',
    re.IGNORECASE,
)


def stitch_electives_table(text: str) -> str:
    """Merge page-break continuation fragments back into the preceding table.

    The Google Docs export inserts page-break boilerplate mid-table, which
    yields several separate ``<table>`` blocks for one logical table.  Every
    table matching ``_ELECTIVES_CONTINUATION`` has its ``<tr>`` rows appended
    to the closest preceding table that is not itself a continuation; the
    fragment and the text directly in front of it are then removed.

    Compared with a plain ``re.sub`` based merge this:
      * inserts the rows by slicing, so backslashes in the row HTML are never
        interpreted as replacement-template escapes;
      * keeps merging into the same target across several consecutive
        fragments instead of losing the rows of the second one;
      * also handles a target table that has no ``</tbody>``;
      * leaves a fragment untouched when no complete row can be read from it.

    Args:
        text: Document text containing HTML tables.

    Returns:
        The text with continuation fragments folded into their tables.
    """
    flags = re.DOTALL | re.IGNORECASE
    table_pattern = re.compile(r'(<table[^>]*>.*?</table>)', flags)
    # With one capturing group, split() alternates: [text, table, text, ...],
    # so tables sit at the odd indices.
    parts = table_pattern.split(text)

    target = None  # index of the table that continuation rows are merged into
    for i in range(1, len(parts), 2):
        fragment = parts[i]
        if target is None or not _ELECTIVES_CONTINUATION.match(fragment.strip()):
            target = i
            continue

        extra_rows = re.findall(r'<tr[^>]*>.*?</tr>', fragment, flags)
        closing = re.search(r'(</tbody>\s*)?</table>\s*$', parts[target], flags)
        if not extra_rows or closing is None:
            # Nothing can be merged safely: keep the fragment as its own table.
            target = i
            continue

        tail = '\n</tbody>\n</table>' if closing.group(1) else '\n</table>'
        parts[target] = parts[target][:closing.start()] + '\n'.join(extra_rows) + tail
        parts[i] = ''      # remove the now-merged fragment
        parts[i - 1] = ''  # remove the page-break noise in front of it

    return ''.join(parts)
def html_tables_to_markdown(text: str) -> str:
    """Replace every HTML table in *text* with a text representation.

    Steps:
      1. Stitch the page-break-split electives table back into one block.
      2. A table preceded (within 300 characters) by the electives heading
         becomes term-grouped prose sentences.
      3. A table recognised as an OPPE schedule becomes a bulleted list of
         prose lines; the last date seen is carried over to later OPPE
         tables so page-break fragments inherit it.
      4. Any other table - or an OPPE table that yields no lines - becomes a
         Markdown pipe table.
    """
    stitched = stitch_electives_table(text)
    any_table = re.compile(r'<table[^>]*>.*?</table>', re.DOTALL | re.IGNORECASE)
    last_oppe_date = ''

    def looks_like_oppe(html: str, lead_in: str) -> bool:
        if _OPPE_SCHEDULE_HEADING.search(lead_in):
            return True
        if _OPPE_SCHEDULE_HEADING.search(html[:200]):
            return True
        return bool(_OPPE_CONTINUATION.search(html))

    def render(found):
        nonlocal last_oppe_date
        html = found.group(0)
        begin = found.start()
        lead_in = stitched[max(0, begin - 300): begin]

        if _ELECTIVES_TABLE_HEADING.search(lead_in):
            return _electives_table_to_prose(html)

        if looks_like_oppe(html, lead_in):
            entries, last_oppe_date = _oppe_table_to_prose(
                html, inherited_date=last_oppe_date
            )
            if entries:
                bullets = "\n".join(f"- {entry}" for entry in entries)
                return "\n\nOPPE Schedule (May 2026 Term):\n" + bullets + "\n\n"

        return _table_to_markdown(html)

    return any_table.sub(render, stitched)
def clean_markdown(raw_text: str) -> str:
    """Strip export boilerplate from a handbook document and normalise it.

    Order of operations:
      1. Expand the Table 2/3 anchors (before any URL-stripping pattern runs).
      2. Delete each boilerplate pattern in turn, case-insensitively and with
         ``^``/``$`` matching per line.
      3. Convert HTML tables to prose / Markdown.
      4. Collapse runs of spaces/tabs and of blank lines, trim the ends, and
         re-join lines broken between a letter or comma and a lowercase letter.
    """
    noise_flags = re.IGNORECASE | re.MULTILINE
    boilerplate = (
        # print timestamp lines, e.g. "5/12/26, 3:45 PM ..."
        r'\d{1,2}/\d{1,2}/\d{2,4},\s+\d{1,2}:\d{2}\s+[AP]M[^\n]*',
        # Google Docs publishing banner variants
        r'Google Docs(?:\s+icon|\s+logo)?\s+Published using Google Docs[^\n]*',
        r'\d+/\d+\s+info\s+icon\s+Published\s+using\s+Google\s+Docs[^\n]*',
        r'Published\s+using\s+Google\s+Docs[^\n]*',
        r'\binfo\s+icon\b[^\n]*',
        r'IITM\s+BS\s+Degree\s+Programme\s*[-–]\s*Student\s+Hand\w*[^\n]*',
        r'Report\s+abuse\s+Learn\s+more[^\n]*',
        r'Updated\s+automatically\s+every\s+\d+\s+minutes[^\n]*',
        # truncated self-referential doc URLs
        r'https://docs\.google\.com/document/[^\s]+\.{3,}[^\n]*',
        # doc URL followed by page counter
        r'(?<=\n)https://docs\.google\.com/document/\S+(?=\s*\n\s*\d{1,3}/\d{2,3})',
        # "N/M" counters at end of line (but not ".../100")
        r'(?<![\d>=])\b\d{1,3}/(?!100\b)\d{2,3}\b(?!\d)\s*(?=\n|$)',
        # repeated document-title lines
        r'^#{1,3}\s*BS-DS_\s*May\s*2026\s*Grading\s*document\s*\(Student\)\s*$',
        r'^BS-DS_\s*May\s*2026\s*Grading\s*document\s*\(Student\)\s*$',
        r'^Updated\s+automatically\s+every\s+\d+\s+minutes\s*$',
        r'^#\s+BS-DS_[^\n]*Grading\s+document[^\n]*$',
        r'^BS-DS_[^\n]*Grading\s+document[^\n]*$',
        # a line holding only an "N/M" counter
        r'^\d{1,3}/\d{2,3}\s*$',
    )

    cleaned = expand_table_links(raw_text)
    for noise_regex in boilerplate:
        cleaned = re.compile(noise_regex, noise_flags).sub('', cleaned)

    cleaned = html_tables_to_markdown(cleaned)

    cleaned = re.sub(r'[ \t]{2,}', ' ', cleaned)
    cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()
    return re.sub(r'([a-zA-Z,])\n([a-z])', r'\1 \2', cleaned)
def load_and_split(md_path: Path) -> list[Document]:
    """Read one Markdown file, clean it and split it into chunk documents.

    The cleaned text is split on the headings in ``HEADERS_TO_SPLIT`` (headers
    kept in the text) and then into pieces of at most ``CHUNK_SIZE``
    characters with ``CHUNK_OVERLAP`` overlap.  Each chunk that has heading
    metadata is wrapped in ``[Course: h1 > h2 > h3]`` ... ``[/Course: ...]``
    markers, and every chunk gets ``metadata["source"]`` set to the file name.

    Args:
        md_path: Path of the Markdown file (read as UTF-8).

    Returns:
        The list of chunk documents; the chunk count is also printed.
    """
    cleaned = clean_markdown(md_path.read_text(encoding="utf-8"))

    by_header = MarkdownHeaderTextSplitter(
        headers_to_split_on=HEADERS_TO_SPLIT,
        strip_headers=False,
        return_each_line=False,
    ).split_text(cleaned)

    chunks = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", " ", ""],
    ).split_documents(by_header)

    for chunk in chunks:
        levels = (chunk.metadata.get(key, "") for key in ("h1", "h2", "h3"))
        header_path = " > ".join(level for level in levels if level)
        if header_path:
            # The wrapper is added after splitting, so a chunk's final text
            # can be longer than CHUNK_SIZE.
            chunk.page_content = (
                f"[Course: {header_path}]\n"
                f"{chunk.page_content}\n"
                f"[/Course: {header_path}]"
            )
        chunk.metadata["source"] = md_path.name

    print(f" {md_path.name}: {len(chunks)} chunks")
    return chunks
def build_index():
    """Chunk every ``*.md`` file in ``DATA_DIR`` and index the chunks in Chroma.

    Files are processed in sorted order.  The embedding model is created only
    after at least one chunk has been produced, so an empty or missing data
    directory exits without loading the model.  Chunks are written to the
    collection ``COLLECTION_NAME`` through the client returned by
    ``get_chroma_client()``.  Progress is reported with ``print``; nothing is
    returned.
    """
    all_docs: list[Document] = []
    for md_path in sorted(DATA_DIR.glob("*.md")):
        print(f"\nProcessing: {md_path.name}")
        all_docs.extend(load_and_split(md_path))

    if not all_docs:
        print("No valid documents found. Exiting.")
        return

    print(f"Loading embedding model: {EMBED_MODEL}")
    embeddings = HuggingFaceEmbeddings(
        model_name=EMBED_MODEL,
        encode_kwargs={"normalize_embeddings": True},
    )

    target = "Chroma Cloud" if USE_CHROMA_CLOUD else f"local dir '{DB_DIR}'"
    print(f"\nEmbedding and indexing {len(all_docs)} chunks into {target}...")

    client = get_chroma_client()
    # NOTE(review): no ids are supplied and the collection is not cleared, so
    # re-running against an existing collection presumably appends a second
    # copy of every chunk - confirm, and consider deterministic ids.
    vectorstore = Chroma.from_documents(
        documents=all_docs,
        embedding=embeddings,
        client=client,
        collection_name=COLLECTION_NAME,
    )
    # Reads the wrapped collection through a private attribute of the store.
    print(f"\n✅ Ingestion complete! {vectorstore._collection.count()} chunks stored.")
# Run the ingestion only when executed as a script, not on import.
if __name__ == "__main__":
    build_index()