# rag / app.py
# Donlagon007 - "Update app.py" - ec441c7 (verified)
def _simple_response(self, scenario_params: dict, stats: dict) -> str:
"""Simple fallback response without LLM"""
if not scenario_params:
return f"""Based on the analysis results, colorectal cancer screening reduces cancer incidence from {stats['cancer_rate_noscreening']:.2%} to {stats['cancer_rate_screening']:.2%} ({stats.get('cancer_reduction_pct', 0):.1f}% reduction) and advanced cancer from {stats['advca_rate_noscreening']:.2%} to {stats['advca_rate_screening']:.2%} ({stats.get('advca_reduction_pct', 0):.1f}% reduction). The cancer prevention effect is {stats['cancer_prevented_rate']:.2%} and the advanced cancer prevention rate is {stats['advca_prevented_rate']:.2%}.
If you have specific parameter adjustments in mind, please provide them for a detailed comparison of the screening effectiveness with the adjusted parameters."""
response = f"""
πŸ“Š Analysis Results (Based on {stats['total_cases']:,} real cases with fixed random seed):
🎯 Cancer Incidence:
- No screening: {stats['cancer_rate_noscreening']:.2%}
- With screening: {stats['cancer_rate_screening']:.2%}
- Reduction: {stats.get('cancer_reduction_pct', 0):.1f}%
🎯 Advanced Cancer Incidence:
- No screening: {stats['advca_rate_noscreening']:.2%}
- With screening: {stats['advca_rate_screening']:.2%}
- Reduction: {stats.get('advca_reduction_pct', 0):.1f}%
πŸ”¬ Prevention Effects:
- Cancer prevention effect: {stats['cancer_prevented_rate']:.2%}
- Advanced cancer prevention: {stats['advca_prevented_rate']:.2%}
"""
if scenario_params:
response += f"\nπŸ”§ Parameter Adjustments:\n"
for param, value in scenario_params.items():
if param == 'screening_interval':
response += f" - {param}: {value} years\n"
else:
response += f" - {param}: {value:.1%}\n"
return response # Colorectal Cancer Screening AI Analysis System - English Version
import pandas as pd
import numpy as np
import random
import re
import json
import os
from typing import Dict, List, Optional
from langchain_openai import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage
import pyreadstat
import requests
# Set API key.
# A key that is already present in the environment (for example one injected
# as a deployment secret) is kept; the placeholder below is used only when the
# variable is unset. Never commit a real key to source control.
os.environ.setdefault("OPENAI_API_KEY", "OPENAI_API_KEY")  # Please replace with your actual API key
class ScreeningSimulator:
    """Simulate a colorectal cancer screening programme on individual histories.

    Every data row is expected to provide ``vstate50`` ... ``vstate80``
    columns holding the health state at each age without screening; a missing
    or null value is treated as state 1. State codes as used by this class:

    * 1      - healthy (also the state written after a successful treatment)
    * 2, 8   - adenoma (8 is counted as overdiagnosis when treated)
    * 3, 9   - advanced adenoma (9 is counted as overdiagnosis when treated)
    * 4      - cancer
    * 5      - advanced cancer
    * 6, 7   - dead / terminal: no further screening; both count as cancer
               and 7 additionally counts as advanced cancer

    Attributes:
        params: Default screening parameters (probabilities in 0..1 and the
            screening interval in years).
        data: Sampled input DataFrame, set by ``load_data``.
        results: Per-individual outcome DataFrame, set by
            ``simulate_screening``.
        random_seed, numpy_seed: Fixed seeds that make sampling and
            simulation reproducible.
    """

    DATA_URL = "https://www.dropbox.com/scl/fi/p3n3g7h3wifzs26y0ylah/matched_with_error_all_noid.sas7bdat?rlkey=il8x2ur5xf1n5ivs84rwdlyto&st=jk0vcb2f&dl=1"
    DATA_FILENAME = "matched_with_error_all_noid.sas7bdat"

    # Age range covered by the vstate columns, and the screening window.
    _FIRST_AGE = 50
    _LAST_AGE = 80
    _LAST_SCREENING_AGE = 74

    # Which sensitivity parameter applies to which health state.
    _SENSITIVITY_KEY_BY_STATE = {
        2: 'sensitivity_state2', 8: 'sensitivity_state2',   # adenoma
        3: 'sensitivity_state3', 9: 'sensitivity_state3',   # advanced adenoma
        4: 'sensitivity_state4',                            # cancer
        5: 'sensitivity_state5',                            # advanced cancer
    }

    _TERMINAL_STATES = (6, 7)
    _CANCER_STATES = (4, 5, 6, 7)
    _ADVANCED_CANCER_STATES = (5, 7)

    def __init__(self):
        self.params = {
            'participation_rate': 0.40,
            'sensitivity_state2': 0.44,
            'sensitivity_state3': 0.52,
            'sensitivity_state4': 0.61,
            'sensitivity_state5': 0.70,
            'referral_rate': 0.70,
            'treatment_success': 0.90,
            'screening_interval': 2,  # years between screenings
        }
        self.data = None
        self.results = None
        # Fixed random seeds for reproducible results
        self.random_seed = 246810
        self.numpy_seed = 13579

    @staticmethod
    def _download_if_missing(url, out_path):
        """Download ``url`` to ``out_path`` unless that file already exists.

        The payload is streamed into ``out_path + ".part"`` and moved into
        place only after the transfer has completed, so an interrupted
        download never leaves a truncated file that a later run would accept
        as "already exists".

        Raises:
            requests.RequestException: On connection errors, timeouts or a
                non-success HTTP status.
        """
        if os.path.exists(out_path):
            print("File already exists, skipping download.")
            return

        print("Downloading file from Dropbox...")
        tmp_path = out_path + ".part"
        try:
            # (connect timeout, read timeout) so a stalled server cannot hang
            # start-up forever.
            with requests.get(url, stream=True, timeout=(10, 60)) as r:
                r.raise_for_status()
                with open(tmp_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            os.replace(tmp_path, out_path)
        finally:
            # Only still present when something above failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        print("Download complete.")

    def load_data(self):
        """Download the dataset if needed and keep a reproducible 1% sample.

        Returns:
            True once ``self.data`` has been populated.
        """
        # Set random seed for reproducible sampling
        np.random.seed(self.numpy_seed)
        random.seed(self.random_seed)

        self._download_if_missing(self.DATA_URL, self.DATA_FILENAME)

        full_data, _meta = pyreadstat.read_sas7bdat(self.DATA_FILENAME)
        sample_size = int(len(full_data) * 0.01)
        self.data = full_data.sample(
            n=sample_size, random_state=self.random_seed
        ).reset_index(drop=True)
        return True

    def simulate_screening(self, custom_params=None):
        """Simulate the screening process for each individual.

        Args:
            custom_params: Optional mapping whose entries override
                ``self.params`` for this run only.

        Returns:
            DataFrame with one row of boolean outcome indicators per
            individual; it is also stored in ``self.results``.

        Raises:
            RuntimeError: If no data has been loaded.
            ValueError: If the screening interval is smaller than 1 year.
        """
        if self.data is None:
            raise RuntimeError("No data loaded: call load_data() before simulate_screening().")

        # Set random seed for reproducible simulation
        random.seed(self.random_seed)
        np.random.seed(self.numpy_seed)

        params = {**self.params, **custom_params} if custom_params else self.params

        screening_interval = int(params['screening_interval'])
        if screening_interval < 1:
            # range() rejects a zero step and a negative step would silently
            # disable screening altogether.
            raise ValueError(
                f"screening_interval must be at least 1 year, got {params['screening_interval']!r}"
            )

        available_columns = set(self.data.columns)
        state_columns = [
            (age, f'vstate{age}')
            for age in range(self._FIRST_AGE, self._LAST_AGE + 1)
        ]

        records = []
        for _, row in self.data.iterrows():
            # Natural-history health state by age; missing values mean healthy.
            vstate = {}
            for age, col in state_columns:
                if col in available_columns and pd.notna(row[col]):
                    vstate[age] = int(row[col])
                else:
                    vstate[age] = 1
            records.append(self._simulate_individual(vstate, params, screening_interval))

        self.results = pd.DataFrame(records)
        return self.results

    def _simulate_individual(self, vstate, params, screening_interval):
        """Run the screening rounds for one individual and return the outcome row.

        The order of the ``random.random()`` draws is part of the reproducible
        behaviour and must not be changed.
        """
        # Intervention states start out identical to the natural history.
        istate = vstate.copy()

        participated = False
        detected = False
        had_referral = False
        intervention_success = False
        intervention_age = None

        # Share of never-participants is 0% for now; the draw is still made so
        # that the random stream stays aligned.
        never_participate = random.random() < 0

        if not never_participate:
            for screening_age in range(self._FIRST_AGE, self._LAST_SCREENING_AGE + 1, screening_interval):
                current_state = istate[screening_age]

                # Skip if already dead or in terminal state
                if current_state in self._TERMINAL_STATES:
                    continue

                # Does the person attend this screening round?
                if not (random.random() <= params['participation_rate']):
                    continue
                participated = True

                # Detection probability depends on the current state; states
                # without a sensitivity (e.g. healthy) cannot be detected and
                # consume no random draw.
                sensitivity_key = self._SENSITIVITY_KEY_BY_STATE.get(current_state)
                detection_prob = params[sensitivity_key] if sensitivity_key else 0
                if not (detection_prob > 0 and random.random() <= detection_prob):
                    continue
                detected = True

                # Referral for further investigation (colonoscopy)
                if not (random.random() <= params['referral_rate']):
                    continue
                had_referral = True

                # Treatment success
                if not (random.random() <= params['treatment_success']):
                    continue
                intervention_success = True
                intervention_age = screening_age

                # Reset this and all future states to healthy (state 1), then
                # stop screening this individual.
                for treat_age in range(screening_age, self._LAST_AGE + 1):
                    istate[treat_age] = 1
                break

        # State that was treated, taken from the natural history.
        detected_state = vstate[intervention_age] if intervention_success else None

        return {
            'cancer_noscreening': any(s in self._CANCER_STATES for s in vstate.values()),
            'cancer_screening': any(s in self._CANCER_STATES for s in istate.values()),
            'advca_noscreening': any(s in self._ADVANCED_CANCER_STATES for s in vstate.values()),
            'advca_screening': any(s in self._ADVANCED_CANCER_STATES for s in istate.values()),
            'participated': participated,
            'detected': detected,
            'had_referral': had_referral,
            'intervention_success': intervention_success,
            # Any adenoma treated (states 2, 3, 8, 9)
            'adenoma_detected': detected_state in (2, 3, 8, 9),
            # Advanced adenoma treated (states 3, 9)
            'advanced_adenoma_detected': detected_state in (3, 9),
            # Overdiagnosis of adenoma (states 8, 9)
            'overdiag_adenoma': detected_state in (8, 9),
            # Overdiagnosis of advanced adenoma (state 9)
            'overdiag_advanced_adenoma': detected_state == 9,
        }

    def analyze_results(self):
        """Analyze simulation results and calculate key metrics.

        Returns:
            Dict of rates, counts and relative reductions. The
            ``*_reduction_pct`` / ``*_reduction_rate`` entries are 0.0 when
            the corresponding no-screening rate is not positive.

        Raises:
            RuntimeError: If ``simulate_screening`` has not been run yet.
        """
        if self.results is None:
            raise RuntimeError("No results available: call simulate_screening() before analyze_results().")
        df = self.results

        # Overdiagnosis rate calculation - two types
        adenoma_detected_count = df['adenoma_detected'].sum()
        advanced_adenoma_detected_count = df['advanced_adenoma_detected'].sum()
        overdiag_adenoma_count = df['overdiag_adenoma'].sum()
        overdiag_advanced_adenoma_count = df['overdiag_advanced_adenoma'].sum()

        overdiag_adenoma_rate = (
            overdiag_adenoma_count / adenoma_detected_count
            if adenoma_detected_count > 0 else 0
        )
        overdiag_advanced_adenoma_rate = (
            overdiag_advanced_adenoma_count / advanced_adenoma_detected_count
            if advanced_adenoma_detected_count > 0 else 0
        )

        stats = {
            'total_cases': len(df),
            'cancer_rate_noscreening': df['cancer_noscreening'].mean(),
            'cancer_rate_screening': df['cancer_screening'].mean(),
            'cancer_prevented_rate': (df['cancer_noscreening'] & ~df['cancer_screening']).mean(),
            'advca_rate_noscreening': df['advca_noscreening'].mean(),
            'advca_rate_screening': df['advca_screening'].mean(),
            'advca_prevented_rate': (df['advca_noscreening'] & ~df['advca_screening']).mean(),
            'participation_rate': df['participated'].mean(),
            'detection_rate': df['detected'].mean(),
            'referral_rate': df['had_referral'].mean(),
            'intervention_success_rate': df['intervention_success'].mean(),
            # Two types of overdiagnosis rates
            'overdiag_adenoma_rate': overdiag_adenoma_rate,
            'overdiag_advanced_adenoma_rate': overdiag_advanced_adenoma_rate,
            # Detailed counts
            'adenoma_detected_count': adenoma_detected_count,
            'advanced_adenoma_detected_count': advanced_adenoma_detected_count,
            'overdiag_adenoma_count': overdiag_adenoma_count,
            'overdiag_advanced_adenoma_count': overdiag_advanced_adenoma_count,
        }

        # Relative reduction percentages; always present so consumers do not
        # have to guard against missing keys.
        for prefix in ('cancer', 'advca'):
            baseline = stats[f'{prefix}_rate_noscreening']
            screened = stats[f'{prefix}_rate_screening']
            if baseline > 0:
                stats[f'{prefix}_reduction_pct'] = (baseline - screened) / baseline * 100
                stats[f'{prefix}_reduction_rate'] = 1 - (screened / baseline)
            else:
                stats[f'{prefix}_reduction_pct'] = 0.0
                stats[f'{prefix}_reduction_rate'] = 0.0

        return stats
class CancerScreeningLLMInterface:
    """Question-answering front end for a ``ScreeningSimulator``.

    A user question is turned into parameter adjustments (through the LLM,
    with a keyword/regex fallback), the simulator is run with them, and the
    resulting statistics are formatted as text.
    """

    _SENSITIVITY_PARAMS = (
        'sensitivity_state2',
        'sensitivity_state3',
        'sensitivity_state4',
        'sensitivity_state5',
    )
    # Parameters that are probabilities in the range 0..1.
    _RATE_PARAMS = (
        'participation_rate',
        'referral_rate',
        'treatment_success',
    ) + _SENSITIVITY_PARAMS

    def __init__(self, simulator: "ScreeningSimulator"):
        self.simulator = simulator
        self.llm = ChatOpenAI(temperature=0.3, model="gpt-3.5-turbo", max_tokens=1000)

    @classmethod
    def _sanitize_params(cls, raw) -> dict:
        """Keep only known parameters with usable numeric values.

        Rates larger than 1 are read as percentages (60 -> 0.60), the same
        convention ``_simple_parse`` uses; rates outside 0..1 after that,
        non-numeric values, unknown keys and intervals below 1 year are
        dropped.
        """
        if not isinstance(raw, dict):
            return {}
        clean = {}
        for key, value in raw.items():
            # bool is a subclass of int but is never a meaningful value here.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            if key == 'screening_interval':
                if value >= 1:
                    clean[key] = int(value)
            elif key in cls._RATE_PARAMS:
                rate = value / 100 if value > 1 else value
                if 0 <= rate <= 1:
                    clean[key] = rate
        return clean

    def parse_user_query(self, user_input: str) -> dict:
        """Parse user question and extract screening parameter adjustments.

        The LLM is asked for a flat JSON object, whose content is validated
        before use. If the LLM call or the JSON decoding fails, the regex
        based ``_simple_parse`` is used instead.

        Returns:
            Mapping of parameter name to new value; empty when the question
            asks for no adjustment.
        """
        prompt = "\n".join([
            "",
            "Please analyze the user's question and extract screening parameter adjustments:",
            f"User question: {user_input}",
            "Parameter descriptions:",
            "1. participation_rate: Currently 40%",
            "2. referral_rate: Currently 70%",
            "3. treatment_success: Currently 90%",
            "4. sensitivity_state2/3/4/5: Currently 44%/52%/61%/70%",
            "5. screening_interval: Currently 2 years (between screenings)",
            "Return only JSON format with parameters the user wants to adjust.",
            'Example: {"participation_rate": 0.60, "referral_rate": 0.90, "screening_interval": 1}',
            "If no parameter adjustments, return: {}",
            "",
        ])

        try:
            response = self.llm.invoke([HumanMessage(content=prompt)])
            json_match = re.search(r'\{[^}]*\}', response.content)
            if json_match:
                return self._sanitize_params(json.loads(json_match.group()))
            return {}
        except Exception:
            # Best effort: any LLM or decoding failure falls back to keywords.
            return self._simple_parse(user_input)

    def _simple_parse(self, user_input: str) -> dict:
        """Simple fallback parsing using keywords and the first number found.

        All rate parameters mentioned in the text receive the first number of
        the text (values above 1 are read as percentages). Interval phrases
        such as "annual" or "every 2 years" are recognised on their own; a
        numeric interval needs the word "interval" or "frequency".
        """
        text = user_input.lower()
        params = {}

        numbers = re.findall(r'(\d+(?:\.\d+)?)', text)
        first = float(numbers[0]) if numbers else None

        def mentions(*keywords):
            return any(keyword in text for keyword in keywords)

        if first is not None:
            rate = first / 100 if first > 1 else first
            if mentions('participation', 'participate'):
                params['participation_rate'] = rate
            if mentions('referral', 'colonoscopy', 'enhance'):
                params['referral_rate'] = rate
            if mentions('treatment', 'success'):
                params['treatment_success'] = rate
            if mentions('sensitivity', 'detection'):
                # Apply to all sensitivity parameters
                for key in self._SENSITIVITY_PARAMS:
                    params[key] = rate

        # Handle common expressions first; they need no number in the text.
        if mentions('annual', 'yearly', 'every year'):
            params['screening_interval'] = 1
        elif mentions('biennial', 'every 2 years', 'two years'):
            params['screening_interval'] = 2
        elif first is not None and mentions('interval', 'frequency') and int(first) >= 1:
            # Intervals that truncate to 0 would break the simulation loop.
            params['screening_interval'] = int(first)

        return params

    @staticmethod
    def _basic_summary(stats: dict) -> str:
        """Return the prose summary used when no parameter was adjusted."""
        return (
            "Based on the analysis results, colorectal cancer screening reduces cancer incidence "
            f"from {stats['cancer_rate_noscreening']:.2%} to {stats['cancer_rate_screening']:.2%} "
            f"({stats.get('cancer_reduction_pct', 0):.1f}% reduction) and advanced cancer "
            f"from {stats['advca_rate_noscreening']:.2%} to {stats['advca_rate_screening']:.2%} "
            f"({stats.get('advca_reduction_pct', 0):.1f}% reduction). "
            f"The cancer prevention effect is {stats['cancer_prevented_rate']:.2%} "
            f"and the advanced cancer prevention rate is {stats['advca_prevented_rate']:.2%}."
            "\n"
            "If you have specific parameter adjustments in mind, please provide them "
            "for a detailed comparison of the screening effectiveness with the adjusted parameters."
        )

    def generate_response(self, user_input: str, scenario_params: dict, stats: dict) -> str:
        """Generate the answer text based on analysis results.

        Args:
            user_input: The original question (not used for formatting).
            scenario_params: Parsed parameter adjustments; may be empty.
            stats: Statistics of the run made with ``scenario_params``.

        Returns:
            A summary when nothing was adjusted, otherwise a comparison
            against a baseline run. Producing the comparison re-runs the
            simulator with default parameters, which replaces
            ``simulator.results``. If that fails, the plain report from
            ``_simple_response`` is returned.
        """
        # For basic effectiveness questions without parameter changes
        if not scenario_params:
            return self._basic_summary(stats)

        # For questions with parameter adjustments, provide comparison
        try:
            # Run baseline simulation for comparison
            self.simulator.simulate_screening()
            base = self.simulator.analyze_results()

            arrow = "β†’"
            cancer_shift = (stats['cancer_rate_screening'] - base['cancer_rate_screening']) * 100
            advca_shift = (stats['advca_rate_screening'] - base['advca_rate_screening']) * 100
            prevented_shift = (stats['cancer_prevented_rate'] - base['cancer_prevented_rate']) * 100

            lines = [
                f"With the parameter adjustment of {scenario_params}:",
                "**Cancer Incidence:**",
                f"- Baseline: {base['cancer_rate_noscreening']:.2%} {arrow} {base['cancer_rate_screening']:.2%} ({base.get('cancer_reduction_pct', 0):.1f}% reduction)",
                f"- Adjusted: {stats['cancer_rate_noscreening']:.2%} {arrow} {stats['cancer_rate_screening']:.2%} ({stats.get('cancer_reduction_pct', 0):.1f}% reduction)",
                f"- Change in cancer rate: {cancer_shift:+.2f} percentage points",
                "**Advanced Cancer:**",
                f"- Baseline: {base['advca_rate_noscreening']:.2%} {arrow} {base['advca_rate_screening']:.2%} ({base.get('advca_reduction_pct', 0):.1f}% reduction)",
                f"- Adjusted: {stats['advca_rate_noscreening']:.2%} {arrow} {stats['advca_rate_screening']:.2%} ({stats.get('advca_reduction_pct', 0):.1f}% reduction)",
                f"- Change in advanced cancer rate: {advca_shift:+.2f} percentage points",
                "**Key Changes:**",
                f"- Cancer prevention effect: {base['cancer_prevented_rate']:.2%} {arrow} {stats['cancer_prevented_rate']:.2%} ({prevented_shift:+.2f} pp)",
                "",
            ]
            return "\n".join(lines)
        except Exception:
            return self._simple_response(scenario_params, stats)

    def _simple_response(self, scenario_params: dict, stats: dict) -> str:
        """Simple fallback report that needs neither the LLM nor a baseline run.

        Reads the keys produced by ``ScreeningSimulator.analyze_results``.
        """
        lines = [
            "",
            f"πŸ“Š Analysis Results (Based on {stats['total_cases']:,} real cases):",
            "🎯 Cancer Incidence:",
            f"- Natural history: {stats['cancer_rate_noscreening']:.2%}",
            f"- With screening: {stats['cancer_rate_screening']:.2%}",
            f"- Prevention effect: {stats['cancer_prevented_rate']:.2%}",
            "🎯 Advanced Cancer Incidence:",
            f"- Natural history: {stats['advca_rate_noscreening']:.2%}",
            f"- With screening: {stats['advca_rate_screening']:.2%}",
            f"- Prevention effect: {stats['advca_prevented_rate']:.2%}",
            "πŸ”¬ Screening Process:",
            f"- Participation rate: {stats['participation_rate']:.2%}",
            f"- Successful intervention rate: {stats['intervention_success_rate']:.2%}",
            "",
        ]
        response = "\n".join(lines)

        if scenario_params:
            response += "\nπŸ”§ Parameter Adjustments:\n"
            for param, value in scenario_params.items():
                if param == 'screening_interval':
                    # The interval is a number of years, not a proportion.
                    response += f" - {param}: {value} years\n"
                elif isinstance(value, (int, float)):
                    response += f" - {param}: {value:.1%}\n"
                else:
                    # Never let the fallback itself fail on an odd value.
                    response += f" - {param}: {value}\n"

        return response

    def answer_question(self, user_input: str) -> str:
        """Main method to answer user questions.

        Parses the question, runs the simulation with the parsed parameters
        and returns the formatted answer.
        """
        scenario_params = self.parse_user_query(user_input)

        # Debug: Show what parameters were parsed
        if scenario_params:
            print(f"πŸ”§ Parsed parameters: {scenario_params}")

        self.simulator.simulate_screening(scenario_params)
        stats = self.simulator.analyze_results()

        if stats is None:
            return "Analysis failed, please try again."

        return self.generate_response(user_input, scenario_params, stats)
import gradio as gr
# Create simulator and interface only once, at import time, so that they
# persist between calls of the Gradio handler defined further down.
# The order matters: the data is loaded before the interface is built around
# the simulator.
simulator = ScreeningSimulator()
simulator.load_data()
llm_interface = CancerScreeningLLMInterface(simulator)
def answer_question_gradio(question):
    """Gradio handler: answer one question and never raise.

    Args:
        question: Text typed by the user; None, empty and whitespace-only
            input are all treated as "no question".

    Returns:
        The answer text, a prompt to enter a question, or an error message
        when answering failed.
    """
    # Guard against None as well as blank text before calling .strip().
    if not question or not question.strip():
        return "Please enter a question."
    try:
        return llm_interface.answer_question(question)
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing.
        return f"❌ Error: {e}"
# Create the Gradio Interface: one multi-line text box for the question and
# one for the answer, both wired to answer_question_gradio.
iface = gr.Interface(
    fn=answer_question_gradio,
    inputs=gr.Textbox(lines=5, label="πŸ™‹β€β™€οΈ Please enter your question"),
    outputs=gr.Textbox(lines=5, label="πŸ€– AI Analyst"),
    title="πŸ”¬ Colorectal Cancer Screening AI Analysis System",
    description="Ask a question about the effectiveness of CRC screening in Taiwanβ€”for example, regarding screening intervals, attendance rates, referral rates, or treatment success rates."
)
# Start the web UI only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    iface.launch(share=True)