|
|
import os |
|
|
import json |
|
|
import re |
|
|
from typing import Dict, Any, List |
|
|
|
|
|
import gradio as gr |
|
|
from docx import Document |
|
|
from deepdiff import DeepDiff |
|
|
import mysql.connector |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Maps each university's display name — exactly as it appears as a heading in
# the handbook DOCX — to its primary-key id in the database.
# NOTE(review): the ids are non-contiguous (5, 8, 11-13, 15 are absent) —
# presumably rows retired upstream; confirm against the DB before reusing ids.
UNIVERSITY_ID_MAP = {
    "Indiana University of Pennsylvania (IUP)": 1,
    "Missouri State University": 2,
    "University Of Kentucky (UK)": 3,
    "University of Louisville (UofL)": 4,
    "University of Delaware (UD)": 6,
    "Grand Valley State University": 7,
    "Quinnipiac University": 9,
    "William Jessup University": 10,
    "Wilkes University": 14,
    "University of South Dakota (USD)": 16,
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_db_connection():
    """Open and return a new MySQL connection configured from DB_* env vars.

    Defaults to localhost:3306 as ``root`` with an empty password and an
    empty database name when the corresponding variables are unset.
    """
    settings = {
        "host": os.getenv("DB_HOST", "localhost"),
        "port": int(os.getenv("DB_PORT", "3306")),
        "user": os.getenv("DB_USER", "root"),
        "password": os.getenv("DB_PASSWORD", ""),
        "database": os.getenv("DB_NAME", ""),
    }
    return mysql.connector.connect(**settings)
|
|
|
|
|
|
|
|
def fetch_section_json(university_id: int, section_key: str):
    """Return the stored section JSON for (university_id, section_key).

    Returns the parsed dict, or None when the row is missing, the stored
    value is empty/NULL, or the stored value is not valid JSON.
    """
    conn = get_db_connection()
    try:
        # Bind cursor before the inner try: the original code created it
        # inside the try and then hit UnboundLocalError in `finally` (masking
        # the real error and skipping conn.close()) if conn.cursor() raised.
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT section_json
                FROM university_handbook_sections
                WHERE university_id=%s AND section_key=%s
                LIMIT 1
                """,
                (university_id, section_key),
            )
            row = cursor.fetchone()
            if not row or not row[0]:
                return None
            try:
                return json.loads(row[0])
            # Only decoding problems should map to None; anything else
            # (e.g. a DB error) should propagate, unlike the original
            # blanket `except Exception`.
            except (ValueError, TypeError):
                return None
        finally:
            if cursor is not None:
                cursor.close()
    finally:
        # Guaranteed even if the cursor was never created or failed to close.
        conn.close()
|
|
|
|
|
|
|
|
def update_section_json(university_id: int, section_key: str, new_data: Dict[str, Any]):
    """Serialize *new_data* and overwrite the stored section JSON.

    Commits on success. The UPDATE silently affects zero rows when the
    (university_id, section_key) row does not exist.
    """
    conn = get_db_connection()
    try:
        # Bind cursor before the inner try: the original code created it
        # inside the try and then hit UnboundLocalError in `finally` (masking
        # the real error and skipping conn.close()) if conn.cursor() raised.
        cursor = None
        try:
            cursor = conn.cursor()
            # ensure_ascii=False keeps non-ASCII handbook text readable in the DB.
            new_json = json.dumps(new_data, ensure_ascii=False)
            cursor.execute(
                """
                UPDATE university_handbook_sections
                SET section_json=%s
                WHERE university_id=%s AND section_key=%s
                """,
                (new_json, university_id, section_key),
            )
            conn.commit()
        finally:
            if cursor is not None:
                cursor.close()
    finally:
        # Guaranteed even if the cursor was never created or failed to close.
        conn.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_text(t: str) -> str:
    """Collapse every run of whitespace in *t* to a single space.

    ``str.split()`` with no argument already discards leading and trailing
    whitespace, so the original trailing ``.strip()`` was a no-op and has
    been removed.
    """
    return " ".join(t.split())
|
|
|
|
|
|
|
|
def split_doc_by_university(doc: Document) -> Dict[str, List[str]]:
    """Split the handbook paragraphs into per-university blocks.

    A block starts at a paragraph that equals (or begins with) a university
    name from UNIVERSITY_ID_MAP and runs up to — not including — the next
    university heading, or to the end of the document.
    """
    paragraphs = [normalize_text(p.text) for p in doc.paragraphs if p.text.strip()]

    indices: List[tuple[int, str]] = []
    for i, p in enumerate(paragraphs):
        for uni in UNIVERSITY_ID_MAP:
            if p == uni or p.startswith(uni):
                indices.append((i, uni))
                # Fix: stop after the first match. Without this, a paragraph
                # matching more than one name would register duplicate start
                # indices, producing an empty/overlapping block after sorting.
                break

    indices.sort(key=lambda x: x[0])

    uni_blocks: Dict[str, List[str]] = {}
    for idx, (start, uni_name) in enumerate(indices):
        end = indices[idx + 1][0] if idx + 1 < len(indices) else len(paragraphs)
        uni_blocks[uni_name] = paragraphs[start:end]
    return uni_blocks
|
|
|
|
|
|
|
|
def parse_overview_block(block: List[str]) -> Dict[str, Any]:
    """
    Parse the top 'overview' section (Founded, Total Students, etc.)
    in a robust way that doesn't assume colons are always present.

    Returns a dict with any of: founded, total_students,
    postgraduate_students, acceptance_rate, location,
    tuition_out_of_state_yearly.
    """
    data: Dict[str, Any] = {}

    def after_colon(line: str) -> str:
        """Safely return the part after ':' if present, else empty string."""
        parts = line.split(":", 1)
        return parts[1].strip() if len(parts) > 1 else ""

    def digits_of(line: str) -> str:
        """All decimal digits from the text after the colon (or the whole line).

        Factored out: this value-then-strip-digits sequence was duplicated
        four times in the original.
        """
        value = after_colon(line) or line
        return re.sub(r"[^\d]", "", value)

    for raw_line in block:
        line = raw_line.strip()
        if not line:
            continue

        if line.startswith("Founded"):
            digits = digits_of(line)
            if digits:
                data["founded"] = int(digits)

        elif line.startswith("Total Students"):
            digits = digits_of(line)
            if digits:
                data["total_students"] = int(digits)

        elif "Postgraduate" in line:
            digits = digits_of(line)
            # NOTE: unlike "Founded"/"Total Students", this key is written
            # even when no digits are found (value None) — preserved as-is.
            data["postgraduate_students"] = int(digits) if digits else None

        elif line.startswith("Acceptance rate"):
            # Kept as raw text (e.g. "85%"), not parsed to a number.
            data["acceptance_rate"] = after_colon(line) or line

        elif line.startswith("Location"):
            data["location"] = after_colon(line) or line

        elif "Tuition" in line:
            digits = digits_of(line)
            # Same None-on-missing-digits behavior as postgraduate_students.
            data["tuition_out_of_state_yearly"] = int(digits) if digits else None

    return data
|
|
|
|
|
|
|
|
def extract_between(block: List[str], start: str, stops: List[str]) -> List[str]:
    """Collect the non-empty lines strictly after the first line containing
    *start*, up to (not including) the first line containing any of *stops*.

    Returns an empty list when the start marker never appears.
    """
    collected: List[str] = []
    inside = False
    for line in block:
        if not inside:
            # The marker line itself is excluded from the output.
            if start in line:
                inside = True
            continue
        if any(stop in line for stop in stops):
            break
        if line.strip():
            collected.append(line)
    return collected
|
|
|
|
|
|
|
|
def parse_benefits_block(block: List[str]) -> Dict[str, Any]:
    """Extract the 'Benefits' lines for a university block.

    The section runs from the benefits heading up to the qualification
    blurb; each line is whitespace-normalized.
    """
    start_marker = "Benefits for ISP students at this school"
    stop_markers = ["To qualify for The International Scholars Program"]
    raw_lines = extract_between(block, start_marker, stop_markers)
    return {"benefits": [normalize_text(line) for line in raw_lines]}
|
|
|
|
|
|
|
|
def parse_programs_block(block: List[str]) -> Dict[str, Any]:
    """Parse the per-university program table into a list of program dicts.

    The DOCX table arrives flattened to one cell per line; rows are
    reconstructed positionally: program name, designation, entrance exam,
    then a variable-length run of career-pathway lines terminated by a
    line starting with "TIER" (the funding category).
    """
    # Program lines sit between the qualification blurb and the next
    # university heading.
    lines = extract_between(
        block,
        "To qualify for The International Scholars Program",
        list(UNIVERSITY_ID_MAP.keys()),
    )
    # Column-header cells repeat in the flattened table; drop them wholesale.
    headers = {
        "Program",
        "Designation",
        "Entrance Exam Required",
        "Examples of Career Pathways",
        "Funding Category",
    }

    cleaned = [l for l in lines if l not in headers]

    programs: List[Dict[str, Any]] = []
    i = 0
    while i < len(cleaned):

        # A complete row needs at least name + designation + exam + one
        # trailing line; a shorter tail is treated as trailing noise.
        if len(cleaned) - i < 4:
            break

        name = cleaned[i]
        designation = cleaned[i + 1]
        exam = cleaned[i + 2]
        careers: List[str] = []
        j = i + 3
        # Everything up to the "TIER..." line is a career pathway.
        while j < len(cleaned) and not cleaned[j].startswith("TIER"):
            careers.append(cleaned[j])
            j += 1
        # NOTE(review): if a row's TIER line is missing, this loop consumes
        # the remainder of the block as careers and records an empty
        # funding_category — confirm the handbook always includes it.
        tier = cleaned[j] if j < len(cleaned) else ""

        programs.append(
            {
                "program_name": name,
                "designation": designation,
                "entrance_exam": exam,
                "career_pathways": careers,
                "funding_category": tier,
            }
        )
        # Resume after the TIER line (or past the end when it was absent).
        i = j + 1

    return {"programs": programs}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_university_block(uni_name: str, block: List[str]) -> Dict[str, Any]:
    """
    Given all lines for a single university block, return a dict with:
    - overview
    - benefits
    - programs

    Sections whose parse produced an empty/falsy value are omitted.
    """
    sections = {
        "overview": parse_overview_block(block),
        "benefits": parse_benefits_block(block),
        "programs": parse_programs_block(block),
    }
    # Insertion order matches the original overview/benefits/programs order.
    return {key: value for key, value in sections.items() if value}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_full_sync(docx_file):
    """Parse the uploaded handbook and push every changed section to the DB.

    For each known university: parse its block, diff each section against
    the stored JSON, and write only the sections that differ. Returns a
    newline-joined human-readable log of the run.
    """
    if docx_file is None:
        return "No handbook file uploaded."

    try:
        # Gradio's File component exposes the temp path via .name.
        handbook = Document(docx_file.name)
    except Exception as e:
        return f"Error reading DOCX: {e}"

    blocks = split_doc_by_university(handbook)
    log_lines: List[str] = []
    updated_count = 0

    for uni_name, uni_id in UNIVERSITY_ID_MAP.items():
        uni_block = blocks.get(uni_name)
        if not uni_block:
            log_lines.append(f"[WARN] Missing block: {uni_name}")
            continue

        parsed = parse_university_block(uni_name, uni_block)
        if not parsed:
            log_lines.append(f"[WARN] Cannot parse: {uni_name}")
            continue

        for key, new_json in parsed.items():
            # Only the three known section kinds are synced.
            if key not in ("overview", "benefits", "programs"):
                continue

            old_json = fetch_section_json(uni_id, key)
            # ignore_order: list reordering alone is not a change.
            diff = DeepDiff(old_json or {}, new_json, ignore_order=True)
            if not diff:
                log_lines.append(f"[OK] {uni_name} [{key}] unchanged.")
                continue

            try:
                update_section_json(uni_id, key, new_json)
                log_lines.append(f"[UPDATED] {uni_name} [{key}] updated.")
                updated_count += 1
            except Exception as e:
                # Keep syncing the remaining sections; just record the failure.
                log_lines.append(f"[ERROR] {uni_name} [{key}]: {e}")

    log_lines.append(f"\nTotal sections updated: {updated_count}")
    return "\n".join(log_lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Inline logo as a data URI.
# NOTE(review): the base64 payload is a literal "..." placeholder here —
# confirm the real payload is supplied before shipping.
LOGO_SRC = "data:image/svg+xml;base64,..."

# ISP brand palette.
ISP_PRIMARY = "#062A4D"  # dark navy — header background
ISP_GOLD = "#D6A229"     # accent gold — buttons
ISP_BG = "#F5F7FA"       # light grey — page background

# Raw <style> block injected into the Gradio page via gr.HTML.
# Doubled braces {{ }} escape literal CSS braces inside the f-string.
CUSTOM_CSS = f"""
<style>
#isp-header {{
    background: {ISP_PRIMARY};
    padding: 20px;
    border-radius: 10px;
    display: flex;
    align-items: center;
    gap: 20px;
}}
#isp-header h1 {{
    color: white;
    margin: 0;
    font-size: 28px;
}}
#isp-logo {{
    height: 60px;
}}
button {{
    background-color: {ISP_GOLD} !important;
    color: black !important;
    border-radius: 8px !important;
    font-weight: bold !important;
}}
.gradio-container {{
    background: {ISP_BG} !important;
}}
</style>
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks(title="Automated Handbook Sync Data Pipeline") as demo:
    # Inject the ISP theme; gr.HTML renders the raw <style> block.
    gr.HTML(CUSTOM_CSS)

    # Branded header bar with inline logo.
    gr.HTML(
        f"""
        <div id='isp-header'>
            <img id='isp-logo' src='{LOGO_SRC}' alt='ISP Logo'/>
            <h1>ISP Handbook → Data Pipeline Sync (Full Auto)</h1>
        </div>
        """
    )

    # Usage instructions shown above the controls.
    gr.Markdown(
        """
        Upload the official ISP Handbook (.docx).
        This tool will compare, detect differences, and update changed sections.
        ---
        """
    )

    # Controls: file upload -> sync log -> run button.
    file_input = gr.File(label="Upload Handbook DOCX", file_types=[".docx"])
    log_output = gr.Textbox(label="Sync Log", lines=30)
    run_btn = gr.Button("Run Full Sync")

    # Wire the button to the sync routine; its return string fills the log box.
    run_btn.click(run_full_sync, inputs=file_input, outputs=log_output)

# Launch only when executed as a script, not when imported.
if __name__ == "__main__":
    demo.launch()
|
|
|