# Hugging Face Spaces status banner captured with the source (not code):
# Spaces: Runtime error
# Runtime error
import os
import time
import gradio as gr
import re
import requests
import pandas as pd
import faiss
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer
import PyPDF2

# Fix vs. original: the script previously ran `os.system("huggingface-cli login")`,
# which waits for interactive input and therefore hangs forever on a headless
# Space, and `os.system("apt-get update")`, which installs nothing. Authentication
# is handled non-interactively below via the HUGGINGFACEHUB_API_TOKEN env var,
# which is already passed to `from_pretrained`.

# Load the Llama-3.2-3B-Instruct model and tokenizer.
MODEL_NAME = "meta-llama/Llama-3.2-3B-Instruct"
print("Loading Llama model...")
API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=API_TOKEN)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, token=API_TOKEN)

# Sentence encoder used to embed documents and queries for FAISS retrieval.
encoder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Paths for required files.
INDEX_PATH = "blood_test_index.faiss"  # persisted FAISS index of document embeddings
CSV_PATH = "rag_documents.csv"         # RAG source documents (must contain a 'Content' column)
def generate_faiss_index():
    """Build a FAISS L2 index over the 'Content' column of CSV_PATH.

    Embeds every document row with the module-level sentence encoder and
    persists the resulting index to INDEX_PATH.

    Raises:
        ValueError: if the CSV has no 'Content' column.
    """
    documents = pd.read_csv(CSV_PATH)
    if "Content" not in documents.columns:
        raise ValueError("The CSV file must contain a 'Content' column.")
    print("Encoding sentences...")
    vectors = encoder.encode(documents["Content"].tolist()).astype("float32")
    print("Creating FAISS index...")
    flat_index = faiss.IndexFlatL2(vectors.shape[1])
    flat_index.add(vectors)
    faiss.write_index(flat_index, INDEX_PATH)
    print("FAISS index generated successfully!")
# Bootstrap: build the FAISS index on first run (requires the document CSV),
# then load both the index and the document table used for retrieval.
if not os.path.exists(INDEX_PATH):
    print("Generating FAISS index...")
    if not os.path.exists(CSV_PATH):
        raise FileNotFoundError(f"The required file '{CSV_PATH}' is missing!")
    generate_faiss_index()

print("Loading FAISS index...")
index = faiss.read_index(INDEX_PATH)
rag_df = pd.read_csv(CSV_PATH)
def load_thresholds(file_path="blood_test_thresholds.csv"):
    """Load per-parameter reference ranges from a CSV file.

    The CSV must contain 'Parameter', 'Low', 'High' and 'Unit' columns.

    Returns:
        dict mapping parameter name -> {"low": ..., "high": ..., "unit": ...}.
    """
    table = pd.read_csv(file_path)
    return {
        record["Parameter"]: {
            "low": record["Low"],
            "high": record["High"],
            "unit": record["Unit"],
        }
        for _, record in table.iterrows()
    }
def extract_text_from_pdf(pdf_path):
    """Return the concatenated extracted text of every page in a PDF.

    Pages that yield no text (extract_text() returns None) contribute "".
    """
    with open(pdf_path, "rb") as handle:
        reader = PyPDF2.PdfReader(handle)
        pages = [page.extract_text() or "" for page in reader.pages]
    return "".join(pages)
def retrieve_context(query, index, document_df):
    """Return the 3 nearest documents to `query`, joined into one string.

    Embeds the query with the module-level encoder, searches the FAISS
    index, and looks up the matching rows' 'Content' in `document_df`.
    """
    embedded = encoder.encode([query]).astype("float32")
    _, neighbor_ids = index.search(embedded, k=3)
    snippets = [document_df.iloc[row]["Content"] for row in neighbor_ids[0]]
    return " ".join(snippets)
def generate_response_with_llama(flagged_abnormality, context):
    """Generate medical advice for a flagged abnormality with the Llama model.

    Args:
        flagged_abnormality: human-readable status, e.g. "Low (10.2 g/dL)".
        context: retrieved RAG text; only the first 300 characters are used.

    Returns:
        The decoded model output (includes the prompt prefix).
    """
    prompt = (
        f"Flagged Abnormality: {flagged_abnormality}\n"
        f"Context: {context[:300]}.\n"
        f"Provide specific and actionable medical advice for the abnormality:"
    )
    inputs = tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True)
    # Fixes vs. original:
    #  - pass attention_mask explicitly (avoids the transformers warning and
    #    wrong results with padded inputs);
    #  - do_sample=True so top_p/temperature actually take effect (they are
    #    silently ignored under greedy decoding);
    #  - max_new_tokens instead of max_length: the old max_length=150 counted
    #    prompt tokens, so a long prompt (up to 512 tokens) left no room for
    #    the generated answer.
    outputs = model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_new_tokens=150,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        do_sample=True,
        top_p=0.9,
        temperature=0.7,
        pad_token_id=tokenizer.eos_token_id,
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)
def analyze_blood_report(extracted_text, thresholds):
    """Parse report text, flag out-of-range values, and build recommendations.

    Args:
        extracted_text: raw text extracted from the PDF report.
        thresholds: mapping parameter name -> {"low", "high", "unit"}.

    Returns:
        (flagged, recommendations): `flagged` maps parameter -> "Low/High (...)"
        status strings; `recommendations` maps each flagged parameter to
        LLM-generated advice grounded in retrieved context.
    """
    # "<name> [:|-] <number> [unit]" — name is letters/spaces/#, value numeric.
    pattern = re.compile(r"([A-Za-z\s\#]+)\s*[:\-]?\s*([\d.]+)\s*([a-zA-Z/\s]*)")
    measurements = {}
    for raw_line in extracted_text.split("\n"):
        stripped = raw_line.strip()
        if not stripped:
            continue
        found = pattern.search(stripped)
        if not found:
            continue
        name = found.group(1).strip()
        number = found.group(2).strip()
        unit = found.group(3).strip()
        try:
            measurements[name] = {"value": float(number), "unit": unit}
        except ValueError:
            continue  # malformed numeric field; skip this line

    flagged = {}
    for name, entry in measurements.items():
        limits = thresholds.get(name)
        if limits is None:
            continue  # parameter without a known reference range
        value, unit = entry["value"], entry["unit"]
        if value < limits["low"]:
            flagged[name] = f"Low ({value} {unit})"
        elif value > limits["high"]:
            flagged[name] = f"High ({value} {unit})"

    recommendations = {}
    for name, status in flagged.items():
        query = f"The blood test result for {name} is {status}."
        context = retrieve_context(query, index, rag_df)
        recommendations[name] = generate_response_with_llama(status, context)
    return flagged, recommendations
def process_pdf(pdf_path):
    """Full pipeline for one uploaded PDF: extract text, analyze, package.

    Returns a JSON-serializable dict for the Gradio JSON output pane.
    """
    reference_ranges = load_thresholds()
    report_text = extract_text_from_pdf(pdf_path)
    flagged, recommendations = analyze_blood_report(report_text, reference_ranges)
    return {"Flagged Abnormalities": flagged, "Recommendations": recommendations}
# Gradio UI: one file-upload input (passed to process_pdf as a filesystem
# path) and a JSON output pane for the analysis dict.
interface = gr.Interface(
    fn=process_pdf,
    inputs=gr.File(type="filepath", label="Upload Blood Test PDF"),
    outputs="json",
    title="Blood Test Analyzer (PDF)",
    description=(
        "Upload a PDF blood test report. This tool extracts data, flags "
        "abnormalities, and provides medical recommendations."
    ),
)

if __name__ == "__main__":
    interface.launch(share=True)