import gradio as gr import os, requests, io, json import numpy as np import pandas as pd import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from groq import Groq from PIL import Image from datetime import datetime from huggingface_hub import HfApi, hf_hub_download GROQ_KEY = os.environ.get("GROQ_API_KEY", "") HF_TOKEN = os.environ.get("HF_TOKEN", "") HISTORY_REPO = "Saicharan21/cardiolab-chat-history" PAPERS_DB_REPO = "Saicharan21/cardiolab-papers-db" CARDIOLAB_MODEL = "Saicharan21/CardioLab-AI-Model" CHAT_MODELS = { "CardioLab Fine-tuned (SJSU)": "cardiolab", "Llama 3.3 70B (Best)": "llama-3.3-70b-versatile", "Llama 3.1 8B (Fast)": "llama-3.1-8b-instant", "Mixtral 8x7B": "mixtral-8x7b-32768", "Llama 4 Scout (New)": "meta-llama/llama-4-scout-17b-16e-instruct", } KNOWHOW = ("MCL: Sylgard 184 PDMS 10:1 ratio 48hr cure green laser PIV 70bpm 5L/min cardiac output 80-120mmHg. " "TGT: Arduino Uno Stepper Motor 150mL blood sampled at 0 20 40 60 minutes. " "NORMAL RANGES: TAT below 8 ng/mL. PF1.2 below 2.0 nmol/L. Free hemoglobin below 20 mg/L. Platelets above 150 thousand per uL. " "HIGH RISK: TAT above 15. PF1.2 above 3.0. Hemoglobin above 50. Platelets below 100. " "uPAD: Jaffe reaction creatinine picric acid orange-red. Normal creatinine 0.6-1.2 mg/dL. Borderline 1.2-1.5. CKD above 1.5. Stage2 1.5-3.0. Stage3-4 3.0-6.0. Stage5 above 6.0. " "MHV: 27mm SJM Regent bileaflet also trileaflet monoleaflet pediatric designs. " "PIV: green laser 532nm time-resolved. Normal velocity 0.5-2.0 m/s. Normal shear below 5 Pa. Risk above 10 Pa. " "Equipment: Heska Element HT5 hematology analyzer time-resolved PIV Tygon tubing Arduino Uno stepper motor.") # ── LOAD PAPERS + FINE-TUNED MODEL ON STARTUP ───────────────────── CHUNKS = [] METADATA = [] EMBEDDINGS = None PAPERS_LOADED = False EMBEDDER = None CARDIOLAB_TOKENIZER = None CARDIOLAB_LLM = None CARDIOLAB_MODEL_LOADED = False def load_papers(): global CHUNKS, METADATA, EMBEDDINGS, PAPERS_LOADED, EMBEDDER try: from sentence_transformers import SentenceTransformer chunks_path = hf_hub_download(repo_id=PAPERS_DB_REPO, filename="chunks.json", repo_type="dataset", token=HF_TOKEN) meta_path = hf_hub_download(repo_id=PAPERS_DB_REPO, filename="metadata.json", repo_type="dataset", token=HF_TOKEN) emb_path = hf_hub_download(repo_id=PAPERS_DB_REPO, filename="embeddings.npy", repo_type="dataset", token=HF_TOKEN) with open(chunks_path) as f: CHUNKS = json.load(f) with open(meta_path) as f: METADATA = json.load(f) EMBEDDINGS = np.load(emb_path) EMBEDDER = SentenceTransformer("all-MiniLM-L6-v2") PAPERS_LOADED = True print(f"Papers loaded: {len(CHUNKS)} chunks from {len(set(m['paper'] for m in METADATA))} papers") return True except Exception as e: print(f"Paper load error: {e}") return False def load_cardiolab_model(): global CARDIOLAB_TOKENIZER, CARDIOLAB_LLM, CARDIOLAB_MODEL_LOADED try: import torch from transformers import AutoModelForCausalLM, AutoTokenizer from peft import PeftModel print("Loading CardioLab fine-tuned model...") base_model = "TinyLlama/TinyLlama-1.1B-Chat-v1.0" CARDIOLAB_TOKENIZER = AutoTokenizer.from_pretrained(CARDIOLAB_MODEL, token=HF_TOKEN) CARDIOLAB_TOKENIZER.pad_token = CARDIOLAB_TOKENIZER.eos_token device = "cuda" if torch.cuda.is_available() else "cpu" CARDIOLAB_LLM = AutoModelForCausalLM.from_pretrained( CARDIOLAB_MODEL, token=HF_TOKEN, torch_dtype=torch.float16 if device=="cuda" else torch.float32, device_map="auto" if device=="cuda" else None, low_cpu_mem_usage=True ) CARDIOLAB_MODEL_LOADED = True print(f"CardioLab model loaded on {device}!") return True except Exception as e: print(f"CardioLab model load error: {e}") return False load_papers() load_cardiolab_model() # ── SEMANTIC SEARCH ──────────────────────────────────────────────── def search_papers(query, n=4): if not PAPERS_LOADED or EMBEDDINGS is None or EMBEDDER is None: return "", [] try: q_emb = EMBEDDER.encode([query]) norms = np.linalg.norm(EMBEDDINGS, axis=1, keepdims=True) emb_norm = EMBEDDINGS / (norms + 1e-10) q_norm = q_emb / (np.linalg.norm(q_emb) + 1e-10) scores = (emb_norm @ q_norm.T).flatten() top_idx = np.argsort(scores)[::-1][:n] context = "" results = [] seen = set() for idx in top_idx: chunk = CHUNKS[idx] meta = METADATA[idx] score = float(scores[idx]) if score > 0.25: results.append({"chunk":chunk,"paper":meta["paper"],"pillar":meta.get("pillar",""),"score":score}) if meta["paper"] not in seen: context += chr(10)+"=== FROM: "+meta["paper"]+" ==="+chr(10) seen.add(meta["paper"]) context += chunk[:500]+chr(10) return context, results except Exception as e: return "", [] def answer_with_cardiolab_model(question, paper_context=""): if not CARDIOLAB_MODEL_LOADED: return None try: import torch system = "You are CardioLab AI for SJSU Biomedical Engineering." if paper_context: system += " Use these SJSU research papers: "+paper_context[:500] prompt = f"<|system|>{system}<|user|>{question}<|assistant|>" inputs = CARDIOLAB_TOKENIZER(prompt, return_tensors="pt", truncation=True, max_length=512) device = next(CARDIOLAB_LLM.parameters()).device inputs = {k:v.to(device) for k,v in inputs.items()} with torch.no_grad(): outputs = CARDIOLAB_LLM.generate( **inputs, max_new_tokens=200, do_sample=True, temperature=0.3, pad_token_id=CARDIOLAB_TOKENIZER.eos_token_id ) response = CARDIOLAB_TOKENIZER.decode(outputs[0], skip_special_tokens=True) if "<|assistant|>" in response: answer = response.split("<|assistant|>")[-1].strip() else: answer = response[len(prompt):].strip() if len(response) > len(prompt) else response return answer if len(answer) > 20 else None except Exception as e: print(f"CardioLab model error: {e}") return None CSS = """ body, .gradio-container { background: #f7f7f8 !important; font-family: -apple-system, BlinkMacSystemFont, Segoe UI, sans-serif !important; } .tab-nav { background: #ffffff !important; border-bottom: 1px solid #e5e7eb !important; padding: 0 16px !important; display: flex !important; flex-wrap: wrap !important; } .tab-nav button { background: transparent !important; color: #6b7280 !important; border: none !important; border-bottom: 2px solid transparent !important; padding: 10px 12px !important; font-weight: 500 !important; font-size: 0.8em !important; white-space: nowrap !important; border-radius: 0 !important; } .tab-nav button:hover { color: #111827 !important; background: #f9fafb !important; } .tab-nav button.selected { color: #c1121f !important; border-bottom: 2px solid #c1121f !important; font-weight: 700 !important; background: transparent !important; } .message.user { background: #f3f4f6 !important; color: #1a202c !important; border-radius: 12px !important; } .message.bot { background: #ffffff !important; color: #1a202c !important; border-left: 3px solid #c1121f !important; } textarea { background: #ffffff !important; color: #1a202c !important; border: 1px solid #d1d5db !important; border-radius: 10px !important; } button.primary { background: #c1121f !important; color: white !important; border: none !important; border-radius: 8px !important; font-weight: 600 !important; } button.secondary { background: #f3f4f6 !important; color: #374151 !important; border: 1px solid #d1d5db !important; border-radius: 8px !important; } input[type=number] { background: #f9fafb !important; color: #1a202c !important; border: 1px solid #d1d5db !important; border-radius: 8px !important; } """ HEADER = """
""" def load_all_sessions(): if not HF_TOKEN: return {} try: path = hf_hub_download(repo_id=HISTORY_REPO, filename="chat_history.json", repo_type="dataset", token=HF_TOKEN) with open(path) as f: return json.load(f) except: return {} def save_all_sessions(sessions): if not HF_TOKEN: return False try: api2 = HfApi(token=HF_TOKEN) api2.upload_file(path_or_fileobj=json.dumps(sessions, indent=2).encode(), path_in_repo="chat_history.json", repo_id=HISTORY_REPO, repo_type="dataset", token=HF_TOKEN, commit_message="Update") return True except: return False def get_session_list(): s = load_all_sessions() return list(reversed(list(s.keys()))) if s else ["No saved sessions"] def save_session(history, name): if not history: return "Nothing to save", gr.update() if not name or not name.strip(): name = "Chat "+datetime.now().strftime("%b %d %H:%M") sessions = load_all_sessions() sessions[name] = {"messages":history,"saved_at":datetime.now().isoformat()} ok = save_all_sessions(sessions) choices = get_session_list() return ("Saved: "+name if ok else "Save failed"), gr.update(choices=choices, value=name) def load_session(name): if not name or "No saved" in name: return [], "Select a session" sessions = load_all_sessions() return (sessions[name]["messages"], "Loaded: "+name) if name in sessions else ([], "Not found") def delete_session(name): if not name or "No saved" in name: return "Select a session", gr.update() sessions = load_all_sessions() if name in sessions: del sessions[name]; save_all_sessions(sessions) choices = get_session_list() return "Deleted: "+name, gr.update(choices=choices, value=choices[0] if choices else None) return "Not found", gr.update() def new_chat(): return [], "", "New chat started" def get_pubmed_chat(query, n=3): try: r = requests.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", params={"db":"pubmed","term":query+" AND (heart valve OR hemodynamics OR microfluidic OR thrombogen OR creatinine OR CKD)","retmax":n,"retmode":"json","sort":"date","field":"tiab"},timeout=10) ids = r.json()["esearchresult"]["idlist"] return chr(10).join(["https://pubmed.ncbi.nlm.nih.gov/"+i for i in ids]) if ids else "" except: return "" # ── PHASE C: BIOGPT + CLINICALTRIALS + WEEKLY UPDATE ────────────── def search_biogpt(query): """Search BioGPT — trained on 15M PubMed papers via HuggingFace API""" if not HF_TOKEN: return "" try: headers = {"Authorization": "Bearer "+HF_TOKEN} # Use BioGPT for biomedical question answering payload = {"inputs": query+" [SEP] Answer based on biomedical literature:"} r = requests.post( "https://api-inference.huggingface.co/models/microsoft/BioGPT-Large-PubMedQA", headers=headers, json=payload, timeout=20 ) if r.status_code == 200: result = r.json() if isinstance(result, list) and len(result) > 0: text = result[0].get("generated_text","") # Extract just the answer part if "[SEP]" in text: text = text.split("[SEP]")[-1].strip() return text[:400] if text else "" return "" except: return "" def search_clinical_trials(query, n=5): """Search ClinicalTrials.gov for heart valve and CKD trials""" try: r = requests.get( "https://clinicaltrials.gov/api/v2/studies", params={ "query.term": query, "filter.overallStatus": "RECRUITING|COMPLETED", "pageSize": n, "format": "json", "fields": "NCTId,BriefTitle,OverallStatus,Phase,StartDate,Condition" }, timeout=12 ) if r.status_code != 200: return [] studies = r.json().get("studies",[]) results = [] for s in studies: proto = s.get("protocolSection",{}) ident = proto.get("identificationModule",{}) status = proto.get("statusModule",{}) nct = ident.get("nctId","") title = ident.get("briefTitle","") phase = status.get("phase","") overall = status.get("overallStatus","") if nct and title: results.append({ "nct": nct, "title": title, "status": overall, "phase": phase, "url": "https://clinicaltrials.gov/study/"+nct }) return results except: return [] def get_weekly_pubmed_update(topics=None): """Get papers published in last 7 days on CardioLab topics""" if topics is None: topics = [ "mechanical heart valve thrombogenicity", "microfluidic creatinine CKD diagnosis", "PIV hemodynamics prosthetic valve", "Mock Circulatory Loop cardiac", "bileaflet valve fluid structure interaction" ] all_new = [] try: from datetime import datetime, timedelta week_ago = (datetime.now() - timedelta(days=7)).strftime("%Y/%m/%d") for topic in topics: r = requests.get( "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", params={ "db":"pubmed", "term":topic, "mindate":week_ago, "datetype":"pdat", "retmax":3, "retmode":"json", "sort":"date" }, timeout=10 ) ids = r.json()["esearchresult"]["idlist"] for pmid in ids: all_new.append({ "pmid": pmid, "topic": topic, "url": "https://pubmed.ncbi.nlm.nih.gov/"+pmid }) return all_new except: return [] def full_research_search(query, search_model="Llama 3.3 70B (Best)"): """Complete search across ALL sources including Phase C additions""" if not query.strip(): return "Please enter a research topic." model_id = CHAT_MODELS.get(search_model, "llama-3.3-70b-versatile") expanded = expand_query_ai(query, model_id) if GROQ_KEY else query # All search sources pubmed = fetch_pubmed(expanded, n=6) scholar = fetch_scholar(expanded, n=5) europe = fetch_europe_pmc(expanded, n=4) trials = search_clinical_trials(query, n=4) weekly = get_weekly_pubmed_update() biogpt_answer = search_biogpt(query) # Format output out = "QUERY: "+query+chr(10) out += "AI EXPANDED: "+expanded+chr(10) out += "SOURCES: PubMed + Scholar + EuropePMC + ClinicalTrials + SJSU + BioGPT"+chr(10) out += "="*50+chr(10)+chr(10) # BioGPT answer first if biogpt_answer: out += "BIOGPT ANSWER (trained on 15M PubMed papers):"+chr(10) out += biogpt_answer+chr(10)+chr(10) out += "="*50+chr(10)+chr(10) # PubMed results if pubmed: out += "PUBMED ("+str(len(pubmed))+" papers):"+chr(10) for p in pubmed[:6]: out += p["title"][:85]+" ("+p["year"]+")"+chr(10) out += " "+p["url"]+chr(10)+chr(10) # Scholar results if scholar: out += "SEMANTIC SCHOLAR ("+str(len(scholar))+" papers):"+chr(10) for p in scholar[:5]: out += p["title"][:85]+" ("+p["year"]+")" if p["citations"] not in ("N/A","","0"): out += " | "+p["citations"]+" citations" out += chr(10)+" "+p["url"]+chr(10)+chr(10) # Clinical trials if trials: out += "CLINICALTRIALS.GOV ("+str(len(trials))+" trials):"+chr(10) for t in trials: out += t["title"][:80]+" | "+t["status"]+" | "+t.get("phase","")+" "+chr(10) out += " "+t["url"]+chr(10)+chr(10) # Weekly updates weekly_relevant = [w for w in weekly if any( kw in query.lower() for kw in ["valve","heart","ckd","creatinine","piv","tgt","mcl"] )] if weekly_relevant: out += "NEW THIS WEEK (last 7 days):"+chr(10) for w in weekly_relevant[:5]: out += " "+w["url"]+" ["+w["topic"][:40]+"]"+chr(10) # SJSU ScholarWorks out += chr(10)+"SJSU SCHOLARWORKS:"+chr(10) out += " https://scholarworks.sjsu.edu/do/search/?q="+requests.utils.quote(query)+"&context=6781027" return out def research_chat(message, history, chat_model="Llama 3.3 70B (Best)"): if not message.strip(): return "", history paper_context, paper_results = search_papers(message, n=4) # Use fine-tuned CardioLab model if selected if chat_model == "CardioLab Fine-tuned (SJSU)" and CARDIOLAB_MODEL_LOADED: answer = answer_with_cardiolab_model(message, paper_context) if answer: if paper_results: unique_papers = list(dict.fromkeys([r["paper"] for r in paper_results])) answer += chr(10)+chr(10)+"Sources from SJSU CardioLab papers:" for p in unique_papers[:3]: answer += chr(10)+" - "+p.replace('.pdf','').replace('_',' ') pubmed = get_pubmed_chat(message, n=2) if pubmed: answer += chr(10)+"PubMed: "+pubmed history.append({"role":"user","content":message}) history.append({"role":"assistant","content":"[CardioLab Fine-tuned Model] "+answer}) return "", history # Fall back to Groq models if not GROQ_KEY: history.append({"role":"user","content":message}) history.append({"role":"assistant","content":"Error: Add GROQ_API_KEY to Space Settings."}) return "", history try: model_id = CHAT_MODELS.get(chat_model, "llama-3.3-70b-versatile") client = Groq(api_key=GROQ_KEY) if paper_context: system_prompt = ("You are CardioLab AI for SJSU Biomedical Engineering. " "Answer using SJSU CardioLab research papers below. " "Always cite the paper name when using specific data."+chr(10)+chr(10)+ "SJSU CARDIOLAB PAPERS:"+chr(10)+paper_context+chr(10)+chr(10)+ "ADDITIONAL KNOWLEDGE: "+KNOWHOW) else: system_prompt = "You are CardioLab AI for SJSU Biomedical Engineering. Expert in MHV MCL PIV TGT uPAD CKD FSI. "+KNOWHOW msgs = [{"role":"system","content":system_prompt}] for item in history: if isinstance(item, dict): msgs.append({"role":item["role"],"content":item["content"]}) msgs.append({"role":"user","content":message}) resp = client.chat.completions.create(model=model_id, messages=msgs, max_tokens=800) answer = resp.choices[0].message.content if paper_results: unique_papers = list(dict.fromkeys([r["paper"] for r in paper_results])) answer += chr(10)+chr(10)+"Sources from SJSU CardioLab papers:" for p in unique_papers[:3]: answer += chr(10)+" - "+p.replace('.pdf','').replace('_',' ') pubmed = get_pubmed_chat(message, n=2) if pubmed: answer += chr(10)+"PubMed: "+pubmed history.append({"role":"user","content":message}) history.append({"role":"assistant","content":answer}) return "", history except Exception as e: history.append({"role":"user","content":message}) history.append({"role":"assistant","content":"Error: "+str(e)}) return "", history def voice_chat(audio, history): if audio is None: history.append({"role":"assistant","content":"Please record your question first."}) return history try: client = Groq(api_key=GROQ_KEY) with open(audio, "rb") as f: tx = client.audio.transcriptions.create(file=("audio.wav", f, "audio/wav"), model="whisper-large-v3") paper_context, _ = search_papers(tx.text, n=3) system = "You are CardioLab AI. "+KNOWHOW if paper_context: system = "You are CardioLab AI. Use these SJSU papers:"+chr(10)+paper_context+chr(10)+KNOWHOW msgs = [{"role":"system","content":system}] for item in history: if isinstance(item, dict): msgs.append({"role":item["role"],"content":item["content"]}) msgs.append({"role":"user","content":tx.text}) resp = client.chat.completions.create(model="llama-3.3-70b-versatile",messages=msgs,max_tokens=500) history.append({"role":"user","content":"Voice: "+tx.text}) history.append({"role":"assistant","content":resp.choices[0].message.content}) return history except Exception as e: history.append({"role":"assistant","content":"Voice error: "+str(e)}) return history def expand_query_ai(query): if not GROQ_KEY: return query try: client = Groq(api_key=GROQ_KEY) resp = client.chat.completions.create(model="llama-3.1-8b-instant", messages=[{"role":"system","content":"Biomedical PubMed expert. Convert to MeSH terms for heart valves hemodynamics PIV thrombogenicity FSI microfluidics CKD. Return ONLY terms."}, {"role":"user","content":"Optimize: "+query}],max_tokens=80) return resp.choices[0].message.content.strip() or query except: return query def quick_search(query, search_model="Llama 3.3 70B (Best)"): if not query.strip(): return "Please enter a topic." expanded = expand_query_ai(query) results = [] try: forced = expanded+" AND (heart valve OR hemodynamics OR microfluidic OR thrombogen OR creatinine OR PIV OR CFD OR CKD)" r = requests.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", params={"db":"pubmed","term":forced,"retmax":8,"retmode":"json","sort":"date","field":"tiab"},timeout=12) ids = r.json()["esearchresult"]["idlist"] if ids: r2 = requests.get("https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi", params={"db":"pubmed","id":",".join(ids),"retmode":"xml","rettype":"abstract"},timeout=12) import xml.etree.ElementTree as ET root = ET.fromstring(r2.content) for article in root.findall(".//PubmedArticle"): try: title = article.find(".//ArticleTitle").text or "No title" pmid = article.find(".//PMID").text or "" year_el = article.find(".//PubDate/Year") year = year_el.text if year_el is not None else "" results.append({"source":"PubMed","title":str(title),"year":year,"url":"https://pubmed.ncbi.nlm.nih.gov/"+pmid,"citations":"N/A"}) except: continue except: pass try: r = requests.get("https://api.semanticscholar.org/graph/v1/paper/search", params={"query":expanded,"limit":6,"fields":"title,year,url,citationCount"},timeout=12) for p in r.json().get("data",[]): year = p.get("year",0) or 0 if int(year) >= 2015: results.append({"source":"Scholar","title":p.get("title",""),"year":str(year),"url":p.get("url",""),"citations":str(p.get("citationCount",0))}) except: pass out = "QUERY: "+query+chr(10)+"AI EXPANDED: "+expanded+chr(10)+"="*45+chr(10)+chr(10) groups = {"PubMed":[],"Scholar":[]} seen = set() for r in results: key = r["title"][:50].lower() if key not in seen and r["url"]: seen.add(key); groups[r["source"]].append(r) for source, papers in groups.items(): if not papers: continue out += "--- "+source+" ---"+chr(10) for p in papers[:8]: out += p["title"][:85]+" ("+p["year"]+")" if p["citations"] not in ("N/A","","0"): out += " | "+p["citations"]+" citations" out += chr(10)+" "+p["url"]+chr(10)+chr(10) out += "--- SJSU ScholarWorks ---"+chr(10) out += "https://scholarworks.sjsu.edu/do/search/?q="+requests.utils.quote(query)+"&context=6781027" return out def analyze_upad_photo(image): if image is None: return None, "Upload a uPAD photo first." try: img = Image.fromarray(image) if not isinstance(image, Image.Image) else image arr = np.array(img); h,w = arr.shape[:2] y1,y2,x1,x2 = int(h*0.35),int(h*0.65),int(w*0.35),int(w*0.65) zone = arr[y1:y2,x1:x2] R,G,B = float(np.mean(zone[:,:,0])),float(np.mean(zone[:,:,1])),float(np.mean(zone[:,:,2])) c = max(0,round(0.018*(R-B)-0.3,2)) if c<1.2: s,a="Normal","Monitor annually." elif c<1.5: s,a="Borderline","Repeat in 3 months." elif c<3.0: s,a="Stage 2 CKD","Consult nephrologist." elif c<6.0: s,a="Stage 3-4 CKD","Immediate consultation." else: s,a="Stage 5 CKD","Emergency care." ri=img.copy() import PIL.ImageDraw as D; D.Draw(ri).rectangle([x1,y1,x2,y2],outline=(0,255,0),width=3) return ri,("uPAD ANALYSIS"+chr(10)+"R:"+str(round(R,1))+" G:"+str(round(G,1))+" B:"+str(round(B,1))+chr(10)+"Creatinine: "+str(c)+" mg/dL"+chr(10)+"Stage: "+s+chr(10)+"Action: "+a) except Exception as e: return None,"Error: "+str(e) def mk_chart(fn,title,bg,fg,gc,ac,pb): fig2,ax=plt.subplots(figsize=(8,5)); fig2.patch.set_facecolor(bg); ax.set_facecolor(pb) fn(ax); ax.set_title(title,color=fg,fontweight="bold",fontsize=13,pad=8) ax.tick_params(colors=ac,labelsize=10); ax.grid(True,alpha=0.3,color=gc,linestyle="--") for sp in ["top","right"]: ax.spines[sp].set_visible(False) for sp in ["bottom","left"]: ax.spines[sp].set_color(gc) plt.tight_layout(); buf=io.BytesIO(); plt.savefig(buf,format="png",facecolor=bg,bbox_inches="tight",dpi=130); buf.seek(0) res=Image.open(buf).copy(); plt.close(); return res def analyze_piv_csv(file,theme="White"): if file is None: return None,None,None,None,"Upload PIV CSV first." try: df=pd.read_csv(file.name); cols=[c.lower().strip() for c in df.columns]; df.columns=cols num_cols=df.select_dtypes(include=[np.number]).columns.tolist() if not num_cols: return None,None,None,None,"No numeric columns." bg="#fff" if theme=="White" else "#0a1628"; fg="#1a202c" if theme=="White" else "white" gc="#e2e8f0" if theme=="White" else "#2d4a8a"; ac="#4a5568" if theme=="White" else "#a8b2d8" pb="#f7fafc" if theme=="White" else "#132340" x=np.arange(len(df)) vc=next((c for c in cols if any(k in c for k in ["vel","speed","v_mag"])),num_cols[0] if num_cols else None) sc2=next((c for c in cols if any(k in c for k in ["shear","stress","tau","wss"])),num_cols[1] if len(num_cols)>1 else None) tc=next((c for c in cols if "time" in c or "frame" in c),None); xv=df[tc] if tc else x def pv(ax): if vc: ax.plot(xv,df[vc],color="#c1121f",linewidth=2.5,marker="o",markersize=5) ax.fill_between(xv,df[vc],alpha=0.15,color="#c1121f") ax.axhline(y=2.0,color="#f59e0b",linestyle="--",linewidth=2,label="Risk: 2.0 m/s") ax.set_ylabel("Velocity (m/s)",color=ac); ax.legend(fontsize=9,labelcolor=fg,facecolor=pb) def ps(ax): if sc2: xp=xv.values if tc else x ax.plot(xp,df[sc2],color="#0057a8",linewidth=2.5,marker="s",markersize=5) ax.fill_between(xp,df[sc2],alpha=0.15,color="#0057a8") ax.axhline(y=5,color="#f59e0b",linestyle="--",linewidth=2,label="Caution 5 Pa") ax.axhline(y=10,color="#c1121f",linestyle="--",linewidth=2,label="High risk 10 Pa") ax.set_ylabel("Shear (Pa)",color=ac); ax.legend(fontsize=9,labelcolor=fg,facecolor=pb) def psc(ax): if vc and sc2: s3=ax.scatter(df[vc],df[sc2],c=x,cmap="RdYlGn_r",s=90,edgecolors=fg,linewidth=0.5,zorder=5) cb=plt.colorbar(s3,ax=ax,label="Time"); cb.ax.yaxis.label.set_color(fg); cb.ax.tick_params(colors=ac) ax.axvline(x=2.0,color="#f59e0b",linestyle="--",linewidth=2); ax.axhline(y=10,color="#c1121f",linestyle="--",linewidth=2) ax.set_xlabel("Velocity (m/s)",color=ac); ax.set_ylabel("Shear (Pa)",color=ac) def psum(ax): ax.axis("off"); risk=[] st="CLINICAL SUMMARY"+chr(10)+"="*20+chr(10)+chr(10) for col in num_cols[:3]: mn=round(df[col].mean(),3); mx=round(df[col].max(),3) st+=col[:14]+":"+chr(10)+" Mean: "+str(mn)+chr(10)+" Max: "+str(mx)+chr(10)+chr(10) if "vel" in col and mx>2.0: risk.append("HIGH VELOCITY") if "shear" in col and mx>10: risk.append("HIGH SHEAR") bc="#c1121f" if risk else "#2ecc71" st+="="*20+chr(10)+("OVERALL: HIGH RISK" if risk else "OVERALL: LOW RISK") ax.text(0.05,0.97,st,transform=ax.transAxes,color=fg,fontsize=10,va="top",fontfamily="monospace", bbox=dict(boxstyle="round,pad=0.8",facecolor=pb,edgecolor=bc,linewidth=2.5)) i1=mk_chart(pv,"Velocity Profile",bg,fg,gc,ac,pb); i2=mk_chart(ps,"Wall Shear Stress",bg,fg,gc,ac,pb) i3=mk_chart(psc,"Velocity vs Shear",bg,fg,gc,ac,pb); i4=mk_chart(psum,"Clinical Summary",bg,fg,gc,ac,pb) ai="" if GROQ_KEY: try: client=Groq(api_key=GROQ_KEY) resp=client.chat.completions.create(model="llama-3.3-70b-versatile", messages=[{"role":"system","content":"PIV expert SJSU CardioLab."}, {"role":"user","content":"PIV from 27mm SJM Regent:"+chr(10)+df.describe().to_string()[:500]}],max_tokens=250) ai=chr(10)+"AI: "+resp.choices[0].message.content except: pass return i1,i2,i3,i4,"PIV: "+str(len(df))+" rows"+ai except Exception as e: return None,None,None,None,"Error: "+str(e) def analyze_tgt_csv(file,theme="White"): if file is None: return None,None,None,None,"Upload TGT CSV first." try: df=pd.read_csv(file.name); cols=[c.lower().strip() for c in df.columns]; df.columns=cols num_cols=df.select_dtypes(include=[np.number]).columns.tolist() bg="#fff" if theme=="White" else "#0a1628"; fg="#1a202c" if theme=="White" else "white" gc="#e2e8f0" if theme=="White" else "#2d4a8a"; ac="#4a5568" if theme=="White" else "#a8b2d8" pb="#f7fafc" if theme=="White" else "#132340" tc=next((c for c in cols if "time" in c or "min" in c),None) tatc=next((c for c in cols if "tat" in c),num_cols[0] if num_cols else None) pfc=next((c for c in cols if "pf" in c),num_cols[1] if len(num_cols)>1 else None) hc=next((c for c in cols if "hemo" in c),num_cols[2] if len(num_cols)>2 else None) plc=next((c for c in cols if "platelet" in c or "plt" in c),num_cols[3] if len(num_cols)>3 else None) def mk2(dc,color,yl,lim,ll,title,bar=False): def fn(ax): if dc and dc in df.columns: xp=df[tc].values if tc else range(len(df)); yp=df[dc].values if bar: bs=ax.bar(range(len(yp)),yp,color=color,alpha=0.85,edgecolor=bg,width=0.6) for b,v in zip(bs,yp): ax.text(b.get_x()+b.get_width()/2,b.get_height()+0.5,str(round(v,1)),ha="center",va="bottom",color=fg,fontsize=10,fontweight="bold") else: ax.plot(xp,yp,color=color,linewidth=3,marker="o",markersize=8) ax.fill_between(xp,yp,alpha=0.15,color=color) for xi,yi in zip(xp,yp): ax.annotate(str(round(yi,1)),(xi,yi),textcoords="offset points",xytext=(0,10),ha="center",color=fg,fontsize=10,fontweight="bold") ax.axhline(y=lim,color="#f59e0b",linestyle="--",linewidth=2.5,label=ll) ax.legend(fontsize=10,labelcolor=fg,facecolor=pb); ax.set_ylabel(yl,color=ac) mv=round(float(np.max(yp)),2) ax.set_title(title+chr(10)+"Max: "+str(mv)+" - "+("HIGH" if mv>lim else "NORMAL"),color=fg,fontweight="bold",fontsize=12) return mk_chart(fn,title,bg,fg,gc,ac,pb) i1=mk2(tatc,"#c1121f","TAT (ng/mL)",8,"Normal: 8","TAT"); i2=mk2(pfc,"#0057a8","PF1.2",2.0,"Normal: 2.0","PF1.2") i3=mk2(hc,"#2ecc71","Free Hgb (mg/L)",20,"Normal: 20","Free Hemoglobin",bar=True); i4=mk2(plc,"#e8a020","Platelets",150,"Normal>150","Platelets") ai="" if GROQ_KEY: try: client=Groq(api_key=GROQ_KEY) resp=client.chat.completions.create(model="llama-3.3-70b-versatile", messages=[{"role":"system","content":"Hematology expert. Give thrombogenicity risk."}, {"role":"user","content":"TGT:"+chr(10)+df.describe().to_string()[:500]}],max_tokens=250) ai=chr(10)+"AI: "+resp.choices[0].message.content except: pass return i1,i2,i3,i4,"TGT: "+str(len(df))+" rows"+ai except Exception as e: return None,None,None,None,"Error: "+str(e) def generate_image(prompt): if not prompt.strip(): return None,"Enter description.",""; if not HF_TOKEN: return None,"Add HF_TOKEN to Space secrets.",""; try: enhanced,desc=prompt,"" if GROQ_KEY: try: client=Groq(api_key=GROQ_KEY) resp=client.chat.completions.create(model="llama-3.3-70b-versatile", messages=[{"role":"system","content":"Format: DESCRIPTION: [2 sentences] PROMPT: [detailed image prompt]"}, {"role":"user","content":"Biomedical image: "+prompt}],max_tokens=200) full=resp.choices[0].message.content if "DESCRIPTION:" in full and "PROMPT:" in full: desc=full.split("DESCRIPTION:")[1].split("PROMPT:")[0].strip() enhanced=full.split("PROMPT:")[1].strip() except: pass headers={"Authorization":"Bearer "+HF_TOKEN,"Content-Type":"application/json"} for url in ["https://router.huggingface.co/hf-inference/models/black-forest-labs/FLUX.1-schnell", "https://router.huggingface.co/hf-inference/models/stabilityai/stable-diffusion-xl-base-1.0"]: try: r=requests.post(url,headers=headers,json={"inputs":enhanced,"parameters":{"num_inference_steps":8}},timeout=60) if r.status_code==200: return Image.open(io.BytesIO(r.content)),"Generated!",desc except: continue return None,"Models busy.",desc except Exception as e: return None,"Error: "+str(e),"" def piv_manual(v,s,h): vr="HIGH-stenosis" if float(v)>2.0 else "NORMAL" sr="HIGH-thrombosis" if float(s)>10 else "ELEVATED" if float(s)>5 else "NORMAL" return "Velocity: "+str(v)+" m/s — "+vr+chr(10)+"Shear: "+str(s)+" Pa — "+sr+chr(10)+"HR: "+str(h)+" bpm" def tgt_manual(t,p,h,pl,tm): risk=sum([float(t)>15,float(p)>2.0,float(h)>50,float(pl)<150]) return "TAT:"+str(t)+" PF1.2:"+str(p)+chr(10)+"Hemo:"+str(h)+" Plt:"+str(pl)+chr(10)+"RESULT: "+("HIGH RISK" if risk>=3 else "MODERATE" if risk>=2 else "LOW RISK") with gr.Blocks(title="CardioLab AI - SJSU", css=CSS) as demo: gr.HTML(HEADER) papers_count = len(set(m["paper"] for m in METADATA)) if PAPERS_LOADED else 0 model_status = "CardioLab Fine-tuned Model LOADED" if CARDIOLAB_MODEL_LOADED else "Fine-tuned model loading..." rag_status = f"RAG: {len(CHUNKS)} chunks from {papers_count} SJSU papers" if PAPERS_LOADED else "RAG: loading..." gr.HTML(f'''