# Source: Hugging Face Space (status when captured: Sleeping)
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| import gradio as gr | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import uuid | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| import subprocess, sys | |
| subprocess.run([sys.executable, "-m", "pip", "install", "-q", "llama-cpp-python==0.3.12"], check=True) | |
| import llama_cpp | |
| from huggingface_hub import hf_hub_download | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct | |
# =========================
# CONFIG: FILL THESE IN
# =========================
# Hugging Face Hub repository ids for the embedding model, the chat LLM and
# the Piper voice.
REPO_EMBED = "mixedbread-ai/mxbai-embed-large-v1"
REPO_LLM = "unsloth/Phi-4-mini-instruct-GGUF"
REPO_PIPER = "nardocolin/nardocolin-pipertts"
# File names inside the repositories above (passed to hf_hub_download).
EMBED_FILE = "gguf/mxbai-embed-large-v1-f16.gguf"
LLM_FILE = "Phi-4-mini-instruct-Q4_K_M.gguf"
PIPER_ONNX = "high/colin-voice_high.onnx"
PIPER_JSON = "high/colin-voice_high.onnx.json"
# Vector size used when the Qdrant collection is created; presumably the
# output dimension of the embedding model -- confirm if the model changes.
EMBED_DIM = 1024
# Name of the Qdrant collection that stores the embedded text chunks.
COLLECTION_NAME = "data"
# =========================
# PATHS / DIRECTORIES
# =========================
# All working directories live next to this script.
SPACE_DIR = Path(__file__).parent
DATA_DIR = SPACE_DIR / "data"  # holds structured-cv.json
EMB_DIR = SPACE_DIR / "embeddings"  # storage path handed to the Qdrant client
AUDIO_DIR = SPACE_DIR / "audio"  # generated TTS .wav files
# Create the directories at import time; exist_ok=True makes this a no-op
# when they are already present.
DATA_DIR.mkdir(exist_ok=True)
EMB_DIR.mkdir(exist_ok=True)
AUDIO_DIR.mkdir(exist_ok=True)
# Source document the RAG index is built from.
STRUCTURED_JSON = DATA_DIR / "structured-cv.json"
# =========================
# DOWNLOAD WEIGHTS
# =========================
# Runs at import time. Each call's result is used later as a file path
# (model_path for llama.cpp, --model / --config for the piper CLI).
embed_path = hf_hub_download(REPO_EMBED, EMBED_FILE)
llm_path = hf_hub_download(REPO_LLM, LLM_FILE)
piper_onnx = hf_hub_download(REPO_PIPER, PIPER_ONNX)
piper_json = hf_hub_download(REPO_PIPER, PIPER_JSON)
# =========================
# LOAD MODELS (CPU)
# =========================
# Embedding model: loaded with embedding=True; used only via create_embedding().
embedding_llm = llama_cpp.Llama(
    model_path=embed_path,
    embedding=True,
    verbose=False
)
# Chat model: used via create_chat_completion(); n_ctx sets the context size.
llm = llama_cpp.Llama(
    model_path=llm_path,
    n_ctx=8192,
    verbose=False
)
# =========================
# QDRANT (LOCAL, FILE BACKEND)
# =========================
# Client constructed with path= (the EMB_DIR directory) instead of a server URL.
client = QdrantClient(path=str(EMB_DIR))
def qdrant_collection_exists() -> bool:
    """Report whether COLLECTION_NAME is present in the Qdrant store.

    Any error raised while listing the collections is treated as
    "collection not present" and yields False.
    """
    try:
        known_names = {col.name for col in client.get_collections().collections}
    except Exception:
        return False
    return COLLECTION_NAME in known_names
def ensure_collection():
    """Create the COLLECTION_NAME collection unless it already exists.

    The collection stores EMBED_DIM-sized vectors compared with cosine distance.
    """
    if not qdrant_collection_exists():
        vector_settings = VectorParams(size=EMBED_DIM, distance=Distance.COSINE)
        client.create_collection(
            collection_name=COLLECTION_NAME,
            vectors_config=vector_settings,
        )
| # ========================= | |
| # RAG BUILD FROM STRUCTURED JSON | |
| # ========================= | |
| def _extract_texts_from_structured_json(d: Dict[str, Any]) -> List[str]: | |
| texts: List[str] = [] | |
| # summary | |
| if d.get("summary"): | |
| texts.append(d["summary"]) | |
| # professional_focus | |
| pf = d.get("professional_focus", {}) | |
| for lst_key in ("problem_solving_style", "leadership_and_teamwork"): | |
| for item in pf.get(lst_key, []) or []: | |
| texts.append(item) | |
| # technical_philosophy | |
| tp = d.get("technical_philosophy", {}) | |
| if tp.get("title"): | |
| texts.append(tp["title"]) | |
| for pt in tp.get("points", []) or []: | |
| texts.append(pt) | |
| # education details | |
| edu = d.get("education", {}) | |
| if edu.get("degree"): | |
| texts.append(f"{edu.get('institution','')} – {edu['degree']}") | |
| for det in edu.get("details", []) or []: | |
| texts.append(det) | |
| # projects | |
| for p in d.get("projects", []) or []: | |
| if p.get("title"): | |
| texts.append(p["title"]) | |
| if p.get("organization"): | |
| texts.append(p["organization"]) | |
| for c in p.get("contributions", []) or []: | |
| texts.append(c) | |
| if p.get("key_takeaways"): | |
| texts.append(p["key_takeaways"]) | |
| if p.get("technical_deep_dive"): | |
| texts.append(p["technical_deep_dive"]) | |
| # experience | |
| for e in d.get("experience", []) or []: | |
| if e.get("role") and e.get("company"): | |
| texts.append(f"{e['role']} @ {e['company']}") | |
| if e.get("description"): | |
| texts.append(e["description"]) | |
| # skills (flatten) | |
| skills = d.get("skills", {}) | |
| for k, v in skills.items(): | |
| if isinstance(v, list): | |
| for item in v: | |
| if isinstance(item, dict): | |
| # spoken_languages entries | |
| lang = item.get("language") | |
| prof = item.get("proficiency") | |
| if lang and prof: | |
| texts.append(f"{lang} – {prof}") | |
| else: | |
| texts.append(str(item)) | |
| # personal_info (light) | |
| pi = d.get("personal_info", {}) | |
| for key in ("name", "email", "linkedin", "website"): | |
| if pi.get(key): | |
| texts.append(str(pi[key])) | |
| # Deduplicate & trim | |
| final = [] | |
| seen = set() | |
| for t in texts: | |
| t = (t or "").strip() | |
| if not t: | |
| continue | |
| if t in seen: | |
| continue | |
| seen.add(t) | |
| final.append(t) | |
| return final | |
def build_rag_from_structured_json() -> int:
    """(Re)build the Qdrant collection from data/structured-cv.json.

    Returns:
        The number of points written, or 0 when the document yields no text.

    Raises:
        FileNotFoundError: if structured-cv.json does not exist.
    """
    if not STRUCTURED_JSON.exists():
        raise FileNotFoundError("structured-cv.json not found in ./data")
    cv_data = json.loads(STRUCTURED_JSON.read_text(encoding="utf-8"))

    texts = _extract_texts_from_structured_json(cv_data)
    if not texts:
        return 0

    # Embed the texts in batches; every text becomes one point whose payload
    # keeps the original string.
    batch_size = 32
    points: List[PointStruct] = []
    for start in range(0, len(texts), batch_size):
        chunk = texts[start:start + batch_size]
        embedded = embedding_llm.create_embedding(chunk)["data"]
        points.extend(
            PointStruct(
                id=str(uuid.uuid4()),
                vector=item["embedding"],
                payload={"text": text},
            )
            for text, item in zip(chunk, embedded)
        )

    # Drop any previous collection so a rebuild does not accumulate
    # duplicates; a failing delete is ignored.
    try:
        client.delete_collection(collection_name=COLLECTION_NAME)
    except Exception:
        pass
    ensure_collection()
    client.upsert(collection_name=COLLECTION_NAME, points=points, wait=True)
    return len(points)
| # ========================= | |
| # LLM + RAG INFERENCE | |
| # ========================= | |
def clean_text_for_tts(text: str) -> str:
    """Return *text* reduced to ASCII with whitespace runs collapsed.

    Every non-ASCII character is dropped, each run of whitespace becomes a
    single space, and leading/trailing whitespace is removed.
    """
    ascii_only = text.encode("ascii", "ignore").decode("ascii")
    # split() with no argument cuts on whitespace runs and discards the
    # leading/trailing ones, so joining with one space normalises spacing.
    return " ".join(ascii_only.split())
# System-prompt template for the chat model. It has two str.format
# placeholders: {context} (retrieved knowledge-base text) and {question}
# (the user's question).
PERSONA_TEMPLATE = """You are Colin Salvatore Nardo, a Computing Science student at the University of Glasgow. Your persona is friendly, down-to-earth, and you must always speak in the first person ("I", "my", "me"). Your entire knowledge and experiences are defined EXCLUSIVELY by the "Knowledge Base" below.
CRITICAL RULES
1) ZERO FABRICATION. Only use the Knowledge Base; otherwise say you don't have those details.
2) You ARE Colin. Never reveal you are an AI.
3) Keep answers concise and natural.
Knowledge Base:
{context}
Question: {question}
"""
def rag_context(query: str, k: int = 5) -> str:
    """Retrieve the stored texts most similar to *query*.

    Args:
        query: The user question to embed and search with.
        k: Maximum number of hits to request from Qdrant.

    Returns:
        The ``text`` payloads of the hits joined by blank lines, or ``""``
        when there are no usable hits or retrieval fails. Retrieval is best
        effort: this function never raises, but failures are printed so a
        missing or broken index can be told apart from "no matches".
    """
    try:
        vec = embedding_llm.create_embedding(query)["data"][0]["embedding"]
        hits = client.search(collection_name=COLLECTION_NAME, query_vector=vec, limit=k)
    except Exception as exc:
        print(f"[RAG] Retrieval failed: {exc}")
        return ""

    # A hit without payload (or without text) is skipped on its own instead
    # of discarding the whole context.
    snippets = [(hit.payload or {}).get("text", "") for hit in hits or []]
    return "\n\n".join(snippet for snippet in snippets if snippet)
def llm_answer(question: str, history: List[Dict[str, str]]) -> str:
    """Answer *question* with the chat model, grounded in retrieved context.

    Args:
        question: The user's current message.
        history: Earlier chat messages as role/content dicts; only the last
            eight are forwarded to the model.

    Returns:
        The model's reply with surrounding whitespace stripped.
    """
    knowledge = rag_context(question, k=5)
    system_prompt = PERSONA_TEMPLATE.format(context=knowledge, question=question)
    messages = [
        {"role": "system", "content": system_prompt},
        # (Optional) include short history
        *history[-8:],
        {"role": "user", "content": question},
    ]
    completion = llm.create_chat_completion(messages=messages, stream=False)
    reply = completion["choices"][0]["message"]["content"]
    return reply.strip()
def synthesize_tts(text: str) -> str | None:
    """Render *text* to a wav file with the ``piper`` command-line tool.

    The text is cleaned with ``clean_text_for_tts`` and piped to piper on
    stdin; the audio is written to a uniquely named file in ``AUDIO_DIR``.

    Args:
        text: The text to speak.

    Returns:
        The path of the generated wav file, or ``None`` when nothing is left
        to speak after cleaning, piper cannot be started, it times out, or it
        exits with a non-zero status.
    """
    text = clean_text_for_tts(text)
    if not text:
        # Nothing speakable is left (e.g. the reply was entirely non-ASCII).
        return None

    wav_path = AUDIO_DIR / f"resp_{uuid.uuid4().hex}.wav"
    cmd = [
        "piper",
        "--model", piper_onnx,
        "--config", piper_json,
        "--output_file", str(wav_path)
    ]
    try:
        # Unlike Popen.communicate(timeout=...), subprocess.run kills and
        # reaps the child when the timeout expires, so a hung piper process
        # is not left running.
        result = subprocess.run(cmd, input=text + "\n", text=True, timeout=60)
    except (OSError, subprocess.SubprocessError) as exc:
        print(f"[TTS] Piper failed: {exc}")
        wav_path.unlink(missing_ok=True)  # drop any partial output
        return None

    if result.returncode == 0 and wav_path.exists():
        return str(wav_path)
    wav_path.unlink(missing_ok=True)  # drop any partial output
    return None
| # ========================= | |
| # BOOTSTRAP: ensure RAG exists (build once) | |
| # ========================= | |
# Build the index only when no collection exists yet. Any failure (for
# example a missing structured-cv.json) is reported and startup continues.
try:
    if qdrant_collection_exists():
        print("[RAG] Existing collection found; skipping rebuild.")
    else:
        n = build_rag_from_structured_json()
        print(f"[RAG] Built collection with {n} chunks.")
except Exception as e:
    print(f"[RAG] Skipped build: {e}")
| # ========================= | |
| # GRADIO UI | |
| # ========================= | |
with gr.Blocks(title="Colin-AI (CPU) — Local LLM + RAG + TTS") as demo:
    gr.Markdown("### Colin-AI — CPU-only demo (phi-4-mini + Qdrant RAG + Piper TTS)")
    with gr.Row():
        chat = gr.Chatbot(height=360)
    with gr.Row():
        q = gr.Textbox(label="Ask Colin", placeholder="Ask something…", scale=4)
        send = gr.Button("Send", scale=1)
    with gr.Row():
        tts_toggle = gr.Checkbox(value=True, label="Speak reply (Piper)")
        audio_out = gr.Audio(label="TTS", type="filepath")
    # Conversation kept as a flat list of {"role", "content"} dicts in which
    # user and assistant messages alternate.
    state = gr.State([])
    # Latest answer, handed from respond() to the TTS step.
    last_answer = gr.State("")

    def _history_to_pairs(history):
        """Convert the flat role/content history into (user, assistant) pairs for the Chatbot."""
        pairs = []
        for i in range(0, len(history), 2):
            u = history[i]["content"]
            a = history[i + 1]["content"] if i + 1 < len(history) else ""
            pairs.append((u, a))
        return pairs

    def respond(user_msg, history):
        """Handle one user message.

        Returns the Chatbot pairs, the new answer (None when the input was
        blank) and the updated history.
        """
        if not user_msg or not user_msg.strip():
            # The Chatbot is fed (user, assistant) pairs everywhere else;
            # returning the raw dict history here handed it a different format.
            return _history_to_pairs(history), None, history
        ans = llm_answer(user_msg, history)
        history = history + [{"role": "user", "content": user_msg}, {"role": "assistant", "content": ans}]
        return _history_to_pairs(history), ans, history

    def maybe_tts(answer_text, tts_on):
        """Synthesize the answer when TTS is enabled and there is text; otherwise return None."""
        if not tts_on or not answer_text:
            return None
        return synthesize_tts(answer_text)

    # Button click and Enter in the textbox run the same two-step chain:
    # generate the answer first, then optionally speak it.
    send.click(respond, [q, state], [chat, last_answer, state]) \
        .then(maybe_tts, [last_answer, tts_toggle], [audio_out])
    q.submit(respond, [q, state], [chat, last_answer, state]) \
        .then(maybe_tts, [last_answer, tts_toggle], [audio_out])

    gr.Markdown("---")
    rebuild_btn = gr.Button("Build / Refresh RAG from structured-cv.json")
    rebuild_log = gr.Markdown()

    def rebuild():
        """Rebuild the RAG index and return a status line for the UI."""
        try:
            n = build_rag_from_structured_json()
            return f"✅ RAG rebuilt with {n} chunks."
        except Exception as e:
            return f"❌ RAG rebuild failed: {e}"

    rebuild_btn.click(fn=rebuild, inputs=None, outputs=rebuild_log)

demo.launch()