# NOTE: Hugging Face Space status banner ("Spaces: Running") — scrape residue, not part of the module.
| """ | |
| core/admin_tasks.py | |
| Centralized admin / maintenance functions used by both the Gradio UI (app.py) | |
| and the FastAPI admin endpoints (api.py). These are synchronous as in your | |
| current setup and return friendly status strings for display. | |
| """ | |
| import os | |
| import json | |
| import shutil | |
| import glob | |
| import traceback | |
| try: | |
| import pandas as pd | |
| except Exception: | |
| pd = None | |
| try: | |
| import faiss | |
| except Exception: | |
| faiss = None | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| except Exception: | |
| SentenceTransformer = None | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| # functions from your project (should exist) | |
| # rebuild_faiss_from_glossary should return (index, metas) like before. | |
| try: | |
| from core.vector_sync import rebuild_faiss_from_glossary, _upload_to_dataset | |
| except Exception: | |
| rebuild_faiss_from_glossary = None | |
| _upload_to_dataset = None | |
| # Optional web loader | |
| try: | |
| from core.web_loader import web_crawler_loader | |
| except Exception: | |
| web_crawler_loader = None | |
| PERSISTENT_DIR = "/home/user/app/persistent" | |
| DATASET_INDEX_REPO = os.environ.get("DATASET_INDEX_REPO", "essprasad/CT-Chat-Index") | |
| DATASET_DOCS_REPO = os.environ.get("DATASET_DOCS_REPO", "essprasad/CT-Chat-Docs") | |
def _ensure_dirs():
    """Make sure the persistent storage directory exists (idempotent)."""
    os.makedirs(PERSISTENT_DIR, exist_ok=True)
def clear_index():
    """Delete the local FAISS index, its metadata, and document caches.

    Returns:
        A newline-joined report of deletions (and any failures), or an
        informational message when nothing was found to delete.
    """
    targets = (
        os.path.join(PERSISTENT_DIR, "faiss.index"),
        os.path.join(PERSISTENT_DIR, "faiss.index.meta.json"),
        os.path.join(PERSISTENT_DIR, "glossary.json"),
        "/home/user/app/data/docs_cache",
        "/home/user/app/runtime_faiss",
    )
    report = []
    for target in targets:
        try:
            if os.path.isdir(target):
                shutil.rmtree(target, ignore_errors=True)
                report.append(f"🗑️ Deleted folder: {target}")
            elif os.path.exists(target):
                os.remove(target)
                report.append(f"🗑️ Deleted file: {target}")
        except Exception as exc:
            report.append(f"⚠️ Failed to delete {target}: {exc}")
    return "\n".join(report) if report else "ℹ️ No cache files found."
def rebuild_glossary():
    """Run the project glossary builder and report the outcome.

    Delegates to core.glossary_builder.rebuild_and_upload and converts any
    failure into a friendly status string for UI display.
    """
    try:
        from core.glossary_builder import rebuild_and_upload
    except Exception as e:
        return f"⚠️ Cannot import glossary builder: {e}"
    try:
        rebuild_and_upload()
    except Exception as e:
        tb = traceback.format_exc()
        return f"⚠️ Glossary rebuild failed: {e}\n{tb}"
    return "✅ Glossary rebuilt and uploaded successfully."
def _excel_rows_to_entries(file_name, sheet, df):
    """Extract glossary-style metadata entries from one worksheet.

    Best-effort heuristics: only sheets with a "Glossary Term" (or bare
    "term") header are indexed; the term falls back to the first column.
    """
    cols = [c.lower() for c in df.columns.astype(str)]
    if not any("glossary term" in c or c.strip() == "term" for c in cols):
        return []
    # BUGFIX: drop fully-empty rows BEFORE blanking NaNs — the original
    # fillna("").dropna(how="all") order made dropna a no-op.
    df = df.dropna(how="all").fillna("")
    entries = []
    for _, row in df.iterrows():
        term = str(row.get("Glossary Term", "") or row.get("term", "")).strip()
        if not term:
            # Fall back to the first cell of the row.
            try:
                term = str(row.iloc[0]).strip()
            except Exception:
                term = ""
        if not term:
            continue
        combined = " ".join(str(x) for x in row.values if str(x).strip())
        entries.append({
            "file": file_name,
            "sheet": sheet,
            "term": term,
            "type": "excel",
            "text": combined,
            "source": file_name,
        })
    return entries


def _collect_excel_entries():
    """Download Excel docs from the docs dataset and extract their glossary rows."""
    try:
        repo_files = list_repo_files(DATASET_DOCS_REPO, repo_type="dataset")
        excel_files = [f for f in repo_files if f.lower().endswith((".xls", ".xlsx"))]
    except Exception:
        return []
    entries = []
    for file_name in excel_files:
        try:
            fp = hf_hub_download(repo_id=DATASET_DOCS_REPO, filename=file_name, repo_type="dataset")
            try:
                sheets = pd.read_excel(fp, sheet_name=None)
            except Exception:
                sheets = {}
            for sheet, df in sheets.items():
                if isinstance(df, pd.DataFrame):
                    entries.extend(_excel_rows_to_entries(file_name, sheet, df))
        except Exception:
            # Non-fatal: skip workbooks that fail to download or parse.
            continue
    return entries


def _append_embedded(index, metas, model, entries):
    """Embed entry texts and add them to the FAISS index and metadata list.

    Returns True when the vectors were added (metas extended in lockstep),
    False when the index rejected them (incompatible or None).
    """
    texts = [e["text"] for e in entries]
    embs = model.encode(texts, show_progress_bar=False, convert_to_numpy=True).astype("float32")
    try:
        faiss.normalize_L2(embs)
        index.add(embs)
        if isinstance(metas, list):
            metas.extend(entries)
        return True
    except Exception:
        return False


def _persist_index(index, metas):
    """Write index + metadata under PERSISTENT_DIR and best-effort upload them."""
    faiss_path = os.path.join(PERSISTENT_DIR, "faiss.index")
    meta_path = os.path.join(PERSISTENT_DIR, "faiss.index.meta.json")
    if faiss is not None and hasattr(faiss, "write_index"):
        faiss.write_index(index, faiss_path)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(metas, f, indent=2)
    if _upload_to_dataset is not None:
        try:
            _upload_to_dataset(faiss_path, meta_path, DATASET_INDEX_REPO)
        except Exception:
            pass


def rebuild_index(force_download_glossary: bool = False):
    """
    Rebuild the FAISS index from glossary.json, Excel docs, and (optionally)
    crawled web content.

    Args:
        force_download_glossary: when True, re-download glossary.json from the
            index dataset even if a local copy exists.

    Returns:
        A human-readable status string for UI display.
    """
    _ensure_dirs()
    try:
        if rebuild_faiss_from_glossary is None:
            return "⚠️ rebuild_faiss_from_glossary is not available in core.vector_sync."
        glossary_path = os.path.join(PERSISTENT_DIR, "glossary.json")
        # Fetch glossary.json from the HF dataset when missing (or forced).
        if not os.path.exists(glossary_path) or force_download_glossary:
            try:
                downloaded = hf_hub_download(
                    repo_id=DATASET_INDEX_REPO,
                    filename="persistent/glossary.json",
                    repo_type="dataset",
                )
                shutil.copy2(downloaded, glossary_path)
            except Exception as e:
                # BUGFIX: the old code aborted unconditionally here despite its
                # stated intent to continue; only abort when there is no local
                # copy to fall back on.
                if not os.path.exists(glossary_path):
                    return f"⚠️ Could not download glossary.json from {DATASET_INDEX_REPO}: {e}"
        # Base index from the glossary via the project helper.
        index, metas = rebuild_faiss_from_glossary(glossary_path=glossary_path)
        loaded = len(metas) if isinstance(metas, (list, tuple)) else 0

        embeddings_available = SentenceTransformer is not None and faiss is not None
        model = None  # shared embedder, created lazily on first use

        # Index Excel content from the docs dataset.
        if embeddings_available:
            excel_entries = _collect_excel_entries()
            if excel_entries:
                model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
                if _append_embedded(index, metas, model, excel_entries) and isinstance(metas, list):
                    loaded = len(metas)

        # Optionally fetch & embed crawled web content.
        if web_crawler_loader is not None and embeddings_available:
            try:
                web_entries = web_crawler_loader(
                    urls_file="/home/user/app/data/urls.txt",
                    cache_path=os.path.join(PERSISTENT_DIR, "web_cache.json"),
                    max_pages=2,
                    timeout=15,
                    force_refresh=False,
                )
                # BUGFIX: keep metadata aligned with the index — the old code
                # embedded only texts longer than 50 chars but extended metas
                # with every entry, misaligning vectors and metadata.
                usable = [w for w in (web_entries or []) if len(w.get("text", "")) > 50]
                if usable:
                    if model is None:
                        model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
                    if _append_embedded(index, metas, model, usable) and isinstance(metas, list):
                        loaded = len(metas)
            except Exception:
                pass

        # Persist (and best-effort upload) the rebuilt artifacts.
        try:
            _persist_index(index, metas)
        except Exception:
            pass
        return f"✅ Rebuild complete: {loaded} entries."
    except Exception as e:
        tb = traceback.format_exc()
        return f"⚠️ Rebuild failed: {e}\n{tb}"
def reset_faiss_cache():
    """
    Fully reset the FAISS/glossary state: wipe persistent and runtime caches,
    then rebuild the glossary and the index from scratch.

    Returns:
        All step status messages joined with newlines.
    """
    status = []
    cache_paths = (
        os.path.join(PERSISTENT_DIR, "faiss.index"),
        os.path.join(PERSISTENT_DIR, "faiss.index.meta.json"),
        os.path.join(PERSISTENT_DIR, "glossary.json"),
        os.path.join(PERSISTENT_DIR, "web_cache.json"),
        "/home/user/app/runtime_faiss",
    )
    # Step 1: best-effort removal of every cached artifact.
    try:
        for path in cache_paths:
            try:
                if os.path.isdir(path):
                    shutil.rmtree(path, ignore_errors=True)
                elif os.path.exists(path):
                    os.remove(path)
            except Exception:
                pass
        status.append("🧹 Persistent FAISS + glossary caches cleared.")
    except Exception as e:
        status.append(f"⚠️ Failed clearing caches: {e}")
    # Step 2: rebuild glossary, then index, collecting each step's report.
    for step_name, step in (("glossary", rebuild_glossary), ("index", rebuild_index)):
        try:
            status.append(step())
        except Exception as e:
            status.append(f"⚠️ Rebuild {step_name} failed: {e}")
    return "\n".join(status)