Spaces:
Sleeping
Sleeping
| import os | |
| import streamlit as st | |
| from PIL import Image | |
| import pandas as pd | |
| from datetime import datetime | |
| import requests | |
| from geopy.geocoders import Nominatim | |
| import folium | |
| from streamlit_folium import st_folium | |
| import cv2 | |
| import numpy as np | |
| from huggingface_hub import snapshot_download | |
| from transformers import ( | |
| AutoFeatureExtractor, | |
| AutoModelForImageClassification, | |
| ConvNextConfig, | |
| pipeline, | |
| ) | |
| st.set_page_config(page_title="Skin Cancer Dashboard", layout="wide") | |
| # --- Configuration --- | |
| # Ensure you have set your Hugging Face token as an environment variable: | |
| #export HF_TOKEN="YOUR_TOKEN_HERE" | |
| MODEL_NAME = "Anwarkh1/Skin_Cancer-Image_Classification" | |
| LLM_NAME = "google/flan-t5-xl" | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| DATA_DIR = "data/harvard_dataset" # Path where you download and unpack the Harvard Dataverse dataset | |
| DIARY_CSV = "diary.csv" | |
| # Initialize session state defaults | |
| if 'initialized' not in st.session_state: | |
| st.session_state['label'] = None | |
| st.session_state['score'] = None | |
| st.session_state['mole_id'] = '' | |
| st.session_state['geo_location'] = '' | |
| st.session_state['chat_history'] = [] | |
| st.session_state['initialized'] = True | |
| # Initialize geolocator for free geocoding | |
| geolocator = Nominatim(user_agent="skin-dashboard", timeout = 10) | |
| def load_image_model(token: str): | |
| return pipeline( | |
| "image-classification", | |
| feature_extractor=AutoFeatureExtractor.from_pretrained( | |
| MODEL_NAME, | |
| #subfolder="Skin_Cancer-Image_Classification", | |
| use_auth_token=token | |
| ), | |
| model=AutoModelForImageClassification.from_pretrained( | |
| MODEL_NAME, | |
| #subfolder="Skin_Cancer-Image_Classification", | |
| use_auth_token=token | |
| ), | |
| device=0 # or -1 for CPU | |
| ) | |
| def load_llm(token: str): | |
| return pipeline( | |
| "text2text-generation", | |
| model=LLM_NAME, | |
| device_map="auto", # or device=0 for single GPU / -1 for CPU | |
| max_length=10000, | |
| num_beams=5, | |
| no_repeat_ngram_size=2, | |
| early_stopping=True, | |
| ) | |
| classifier = load_image_model(HF_TOKEN) if HF_TOKEN else None | |
| explainer = load_llm(HF_TOKEN) if HF_TOKEN else None | |
| # --- Diary Init ---- | |
| if not os.path.exists(DIARY_CSV): | |
| pd.DataFrame( | |
| columns=["timestamp", "image_path", "mole_id", "geo_location", "label", "score", | |
| "body_location", "prior_consultation", "pain", "itch"] | |
| ).to_csv(DIARY_CSV, index=False) | |
| # --- Save entry helper | |
| def save_entry(img_path: str, mole_id: str, geo_location: str, | |
| label: str, score: float, | |
| body_location: str, prior_consult: str, pain: str, itch: str): | |
| df = pd.read_csv(DIARY_CSV) | |
| entry = { | |
| "timestamp": datetime.now().isoformat(), | |
| "image_path": img_path, | |
| "mole_id": mole_id, | |
| "geo_location": geo_location, | |
| "label": label, | |
| "score": float(score), | |
| "body_location": body_location, | |
| "prior_consultation": prior_consult, | |
| "pain": pain, | |
| "itch": itch | |
| } | |
| df.loc[len(df)] = entry | |
| df.to_csv(DIARY_CSV, index=False) | |
| # --- Preprocessing Functions --- | |
| def remove_hair(img: np.ndarray) -> np.ndarray: | |
| gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) | |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (17, 17)) | |
| blackhat = cv2.morphologyEx(gray, cv2.MORPH_BLACKHAT, kernel) | |
| _, mask = cv2.threshold(blackhat, 10, 255, cv2.THRESH_BINARY) | |
| return cv2.inpaint(img, mask, 1, cv2.INPAINT_TELEA) | |
| def preprocess(img: Image.Image, size: int = 224) -> Image.Image: | |
| arr = np.array(img) | |
| bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) | |
| bgr = remove_hair(bgr) | |
| bgr = cv2.bilateralFilter(bgr, d=9, sigmaColor=75, sigmaSpace=75) | |
| lab = cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB) | |
| l, a, b = cv2.split(lab) | |
| clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) | |
| cl = clahe.apply(l) | |
| merged = cv2.merge((cl, a, b)) | |
| bgr = cv2.cvtColor(merged, cv2.COLOR_LAB2BGR) | |
| h, w = bgr.shape[:2] | |
| scale = size / max(h, w) | |
| nh, nw = int(h*scale), int(w*scale) | |
| resized = cv2.resize(bgr, (nw, nh), interpolation=cv2.INTER_AREA) | |
| canvas = np.full((size, size, 3), 128, dtype=np.uint8) | |
| top, left = (size-nh)//2, (size-nw)//2 | |
| canvas[top:top+nh, left:left+nw] = resized | |
| rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB) | |
| return Image.fromarray(rgb) | |
| # -----Streamlit layout ---- | |
| st.title("🩺 Skin Cancer Recognition Dashboard") | |
| menu = ["Scan Mole","Chat","Diary", "Dataset Explorer"] | |
| choice = st.sidebar.selectbox("Navigation", menu) | |
| # --- Initialize Scan a Mole --- | |
| if choice == "Scan Mole": | |
| st.header("🔍 Scan a Mole") | |
| if not classifier: | |
| st.error("Missing HF_TOKEN.") | |
| st.stop() | |
| upload = st.file_uploader("Upload a skin image", type=["jpg","jpeg","png"]) | |
| if not upload: | |
| st.info("Please upload an image to begin.") | |
| st.stop() | |
| raw = Image.open(upload).convert("RGB") | |
| st.image(raw, caption="Original", use_container_width=True) | |
| proc = preprocess(raw) | |
| st.image(proc, caption="Preprocessed", use_container_width=True) | |
| mole = st.text_input("Mole ID") | |
| city = st.text_input("Geographic location") | |
| body = st.selectbox("Body location", ["Face","Scalp","Neck","Chest","Back","Arm","Hand","Leg","Foot","Other"]) | |
| prior = st.radio("Prior consult?", ["Yes","No"], horizontal=True) | |
| pain = st.radio("Pain?", ["Yes","No"], horizontal=True) | |
| itch = st.radio("Itch?", ["Yes","No"], horizontal=True) | |
| if st.button("Classify"): | |
| if not mole or not city: | |
| st.error("Enter ID and location.") | |
| else: | |
| with st.spinner("Analyzing..."): | |
| out = classifier(proc) | |
| lbl, scr = out[0]["label"], out[0]["score"] | |
| save_dir = os.path.join("scans", f"{mole}_{datetime.now().timestamp()}.png") | |
| os.makedirs(os.path.dirname(save_dir), exist_ok=True) | |
| raw.save(save_dir) | |
| save_entry(save_dir, mole, city, lbl, scr, body, prior, pain, itch) | |
| st.session_state.update({ | |
| 'label': lbl, | |
| 'score': scr, | |
| 'mole_id': mole, | |
| 'geo_location': city | |
| }) | |
| if st.session_state['label']: | |
| st.success(f"Prediction: {st.session_state['label']} (score {st.session_state['score']:.2f})") | |
| if explainer: | |
| with st.spinner("Explaining..."): | |
| text = explainer(f"Explain {st.session_state['label']} and recommendation.")[0]['generated_text'] | |
| st.markdown("### Explanation"); st.write(text) | |
| loc = geolocator.geocode(st.session_state['geo_location']) | |
| if loc: | |
| m = folium.Map([loc.latitude, loc.longitude], zoom_start=12) | |
| folium.Marker([loc.latitude, loc.longitude], "You").add_to(m) | |
| resp = requests.post( | |
| "https://overpass-api.de/api/interpreter", | |
| data={"data": f"[out:json];node(around:5000,{loc.latitude},{loc.longitude})[~\"^(amenity|healthcare)$\"~\"clinic|doctors\"];out;"} | |
| ) | |
| for el in resp.json().get('elements', []): | |
| tags = el.get('tags', {}); | |
| lat = el.get('lat') or el['center']['lat']; lon = el.get('lon') or el['center']['lon'] | |
| folium.Marker([lat, lon], tags.get('name','Clinic')).add_to(m) | |
| st.markdown("### Nearby Clinics"); st_folium(m, width=700) | |
| # --- Chat Tab --- | |
| elif choice == "Chat": | |
| st.header("💬 Follow-Up Chat") | |
| if not st.session_state['label']: | |
| st.info("Please perform a scan first in the 'Scan Mole' tab.") | |
| else: | |
| lbl = st.session_state['label'] | |
| scr = st.session_state['score'] | |
| mid = st.session_state['mole_id'] | |
| gloc = st.session_state['geo_location'] | |
| st.markdown(f"**Context:** prediction for **{mid}** at **{gloc}** is **{lbl}** (confidence {scr:.2f}).") | |
| # New user message comes first for immediate loop | |
| user_q = st.chat_input("Ask a follow-up question:", key="chat_input") | |
| if user_q and explainer: | |
| st.session_state['chat_history'].append({'role':'user','content':user_q}) | |
| system_p = "You are a dermatology assistant. Provide concise medical advice without clarifying questions." | |
| tpl = ( | |
| f"{system_p}\nContext: prediction is {lbl} with confidence {scr:.2f}.\n" | |
| f"User: {user_q}\nAssistant:" | |
| ) | |
| with st.spinner("Generating response..."): | |
| reply = explainer(tpl)[0]['generated_text'] | |
| st.session_state['chat_history'].append({'role':'assistant','content':reply}) | |
| # Display the updated chat history | |
| for msg in st.session_state['chat_history']: | |
| prefix = 'You' if msg['role']=='user' else 'AI' | |
| st.markdown(f"**{prefix}:** {msg['content']}") | |
| # --- Diary Page --- | |
| elif choice == "Diary": | |
| st.header("📖 Skin Cancer Diary") | |
| df = pd.read_csv(DIARY_CSV) | |
| df['timestamp'] = pd.to_datetime(df['timestamp']) | |
| if df.empty: | |
| st.info("No diary entries yet.") | |
| else: | |
| mole_ids = sorted(df['mole_id'].unique()) | |
| sel = st.selectbox("Select Mole to View", ['All'] + mole_ids, key="diary_sel") | |
| if sel == 'All': | |
| # Display moles in columns (max 3 per row) | |
| chunks = [mole_ids[i:i+3] for i in range(0, len(mole_ids), 3)] | |
| for group in chunks: | |
| cols = st.columns(len(group)) | |
| for col, mid in zip(cols, group): | |
| with col: | |
| st.subheader(mid) | |
| entries = df[df['mole_id'] == mid].sort_values('timestamp') | |
| # Show image timeline | |
| for _, row in entries.iterrows(): | |
| if os.path.exists(row['image_path']): | |
| st.image( | |
| row['image_path'], | |
| width=150, | |
| caption=f"{row['timestamp'].strftime('%Y-%m-%d')} — {row['score']:.2f}" | |
| ) | |
| st.write(f"Total scans: {len(entries)}") | |
| else: | |
| # Detailed view for a single mole | |
| entries = df[df['mole_id'] == sel].sort_values('timestamp') | |
| if entries.empty: | |
| st.warning(f"No entries for {sel}.") | |
| else: | |
| # Score over time | |
| st.line_chart(entries.set_index('timestamp')['score']) | |
| st.markdown("#### Image Timeline") | |
| for _, row in entries.iterrows(): | |
| if os.path.exists(row['image_path']): | |
| st.image( | |
| row['image_path'], | |
| width=200, | |
| caption=( | |
| f"{row['timestamp'].strftime('%Y-%m-%d %H:%M')} — " | |
| f"Score: {row['score']:.2f}" | |
| ) | |
| ) | |
| st.markdown("#### Details") | |
| st.dataframe( | |
| entries[ | |
| ['timestamp','geo_location','label','score', | |
| 'body_location','prior_consultation','pain','itch'] | |
| ] | |
| .rename(columns={ | |
| 'timestamp':'Time','geo_location':'Location', | |
| 'label':'Diagnosis','score':'Confidence', | |
| 'body_location':'Body Part','prior_consultation':'Prior Consult', | |
| 'pain':'Pain','itch':'Itch' | |
| }) | |
| .sort_values('Time', ascending=False) | |
| ) | |
| else: | |
| st.header("📂 Dataset Explorer") | |
| st.write("Preview images from the Harvard Skin Cancer Dataset") | |
| # pick up to 15 image files | |
| image_files = [ | |
| f for f in os.listdir(DATA_DIR) | |
| if os.path.isfile(os.path.join(DATA_DIR, f)) | |
| and f.lower().endswith((".jpg", ".jpeg", ".png")) | |
| ][:15] | |
| for i in range(0, len(image_files), 3): | |
| cols = st.columns(3) | |
| for col, fn in zip(cols, image_files[i : i + 3]): | |
| path = os.path.join(DATA_DIR, fn) | |
| img = Image.open(path) | |
| col.image(img, use_container_width=True) | |
| col.caption(fn) | |
| st.sidebar.markdown("---") | |
| st.sidebar.write("Dataset powered by Harvard Dataverse [DBW86T]") | |
| st.sidebar.write(f"Model: {MODEL_NAME}") | |
| st.sidebar.write(f"LLM: {LLM_NAME}") | |
| if __name__ == '__main__': | |
| st.write() | |