# Colorectal Cancer Screening AI Analysis System - English Version
"""Gradio front-end for a colorectal cancer screening micro-simulation.

The module wires three pieces together:

* ``ScreeningSimulator`` replays a screening programme over per-person
  health-state trajectories (columns ``vstate50`` .. ``vstate80``).
* ``CancerScreeningLLMInterface`` turns a free-text question into parameter
  adjustments (via an LLM, with a regex fallback) and renders the results.
* A Gradio ``Interface`` exposing ``answer_question_gradio``.
"""
import json
import os
import random
import re
from typing import Dict, List, Optional

import gradio as gr
import numpy as np
import pandas as pd
import pyreadstat
import requests
from langchain.schema import SystemMessage, HumanMessage
from langchain_openai import ChatOpenAI

# Set API Key.  Replace the placeholder with your actual API key, or export
# OPENAI_API_KEY before starting the app.  ``setdefault`` is used so that a real
# key already present in the environment is not clobbered by the placeholder.
os.environ.setdefault("OPENAI_API_KEY", "OPENAI_API_KEY")

# Parameters that are probabilities (fractions in [0, 1]).
_RATE_PARAMS = (
    'participation_rate',
    'referral_rate',
    'treatment_success',
    'sensitivity_state2',
    'sensitivity_state3',
    'sensitivity_state4',
    'sensitivity_state5',
)


def _effectiveness_summary(stats: dict) -> str:
    """Return the one-paragraph summary used when no parameter was adjusted."""
    return (
        f"Based on the analysis results, colorectal cancer screening reduces cancer incidence "
        f"from {stats['cancer_rate_noscreening']:.2%} to {stats['cancer_rate_screening']:.2%} "
        f"({stats.get('cancer_reduction_pct', 0):.1f}% reduction) and advanced cancer "
        f"from {stats['advca_rate_noscreening']:.2%} to {stats['advca_rate_screening']:.2%} "
        f"({stats.get('advca_reduction_pct', 0):.1f}% reduction). "
        f"The cancer prevention effect is {stats['cancer_prevented_rate']:.2%} and the advanced "
        f"cancer prevention rate is {stats['advca_prevented_rate']:.2%}. "
        f"If you have specific parameter adjustments in mind, please provide them for a detailed "
        f"comparison of the screening effectiveness with the adjusted parameters."
    )


def _format_adjustments(scenario_params: dict) -> str:
    """Render the "Parameter Adjustments" section of a fallback report.

    ``screening_interval`` is shown in years; every other numeric value is
    shown as a percentage.  Non-numeric values are printed verbatim instead of
    raising a formatting error.
    """
    parts = ["\n🔧 Parameter Adjustments:\n"]
    for param, value in scenario_params.items():
        if param == 'screening_interval':
            parts.append(f"   - {param}: {value} years\n")
        elif isinstance(value, (int, float)) and not isinstance(value, bool):
            parts.append(f"   - {param}: {value:.1%}\n")
        else:
            parts.append(f"   - {param}: {value}\n")
    return "".join(parts)


def _simple_response(self, scenario_params: dict, stats: dict) -> str:
    """Simple fallback response without LLM.

    Module-level variant kept for backward compatibility; ``self`` is accepted
    but not used.

    Args:
        self: Ignored.
        scenario_params: Parsed parameter adjustments (may be empty).
        stats: Metrics dictionary as produced by
            ``ScreeningSimulator.analyze_results``.

    Returns:
        A plain-text report.  With no adjustments the short effectiveness
        summary is returned instead of the itemised report.
    """
    if not scenario_params:
        return _effectiveness_summary(stats)

    response = "\n".join([
        "",
        f"📊 Analysis Results (Based on {stats['total_cases']:,} real cases with fixed random seed):",
        "",
        "🎯 Cancer Incidence:",
        f"   - No screening: {stats['cancer_rate_noscreening']:.2%}",
        f"   - With screening: {stats['cancer_rate_screening']:.2%}",
        f"   - Reduction: {stats.get('cancer_reduction_pct', 0):.1f}%",
        "",
        "🎯 Advanced Cancer Incidence:",
        f"   - No screening: {stats['advca_rate_noscreening']:.2%}",
        f"   - With screening: {stats['advca_rate_screening']:.2%}",
        f"   - Reduction: {stats.get('advca_reduction_pct', 0):.1f}%",
        "",
        "🔬 Prevention Effects:",
        f"   - Cancer prevention effect: {stats['cancer_prevented_rate']:.2%}",
        f"   - Advanced cancer prevention: {stats['advca_prevented_rate']:.2%}",
        "",
    ])
    return response + _format_adjustments(scenario_params)


class ScreeningSimulator:
    """Simulate a screening programme over observed health-state trajectories.

    Attributes are plain data:
        params: Default programme parameters (rates as fractions, interval in years).
        data: Sampled input table, set by ``load_data``.
        results: Per-individual outcome table, set by ``simulate_screening``.
        random_seed / numpy_seed: Fixed seeds so repeated runs are reproducible.
    """

    def __init__(self):
        self.params = {
            'participation_rate': 0.40,
            'sensitivity_state2': 0.44,
            'sensitivity_state3': 0.52,
            'sensitivity_state4': 0.61,
            'sensitivity_state5': 0.70,
            'referral_rate': 0.70,
            'treatment_success': 0.90,
            'screening_interval': 2  # years between screenings
        }
        self.data = None
        self.results = None

        # Fixed random seeds for reproducible results
        self.random_seed = 246810
        self.numpy_seed = 13579

    @staticmethod
    def _download_if_missing(url, out_path):
        """Download ``url`` to ``out_path`` unless the file already exists.

        The payload is streamed to a temporary ``.part`` file and moved into
        place only after the transfer finished, so an interrupted download
        never leaves a truncated file that later runs would mistake for a
        complete one.

        Raises:
            requests.RequestException: On connection problems, timeouts or a
                non-success HTTP status.
        """
        if os.path.exists(out_path):
            print("File already exists, skipping download.")
            return

        print("Downloading file from Dropbox...")
        partial_path = out_path + ".part"
        # The timeout bounds connection set-up and each idle gap between
        # chunks, not the total transfer time.
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        os.replace(partial_path, out_path)
        print("Download complete.")

    def load_data(self):
        """Load and sample the screening data.

        Downloads the SAS dataset if it is not cached locally, reads it and
        keeps a reproducible 1% random sample in ``self.data``.

        Returns:
            True once the sample has been stored.
        """
        # Set random seed for reproducible sampling
        np.random.seed(self.numpy_seed)
        random.seed(self.random_seed)

        DROPBOX_URL = "https://www.dropbox.com/scl/fi/p3n3g7h3wifzs26y0ylah/matched_with_error_all_noid.sas7bdat?rlkey=il8x2ur5xf1n5ivs84rwdlyto&st=jk0vcb2f&dl=1"
        LOCAL_FILENAME = "matched_with_error_all_noid.sas7bdat"

        self._download_if_missing(DROPBOX_URL, LOCAL_FILENAME)

        # NOTE: an alternative source used previously (currently disabled) was
        # hf_hub_download(repo_id="Donlagon007/iaccs2025",
        #                 filename="matched_with_error_all_noid.sas7bdat").
        full_data, meta = pyreadstat.read_sas7bdat(LOCAL_FILENAME)

        sample_size = int(len(full_data) * 0.01)
        self.data = full_data.sample(n=sample_size, random_state=self.random_seed).reset_index(drop=True)

        return True

    def simulate_screening(self, custom_params=None):
        """Simulate the screening process for each individual.

        For every row of ``self.data`` the natural-history states at ages
        50-80 are read from ``vstate50`` .. ``vstate80`` (missing values are
        treated as state 1).  Screening rounds run from age 50 up to age 74 at
        the configured interval; a round can lead to participation, detection,
        referral and successful treatment, each decided by one random draw.
        After a successful treatment all states from that age onward are set
        to 1 and screening stops for that person.

        Args:
            custom_params: Optional overrides merged on top of ``self.params``.

        Returns:
            DataFrame with one row of boolean outcome indicators per
            individual (also stored in ``self.results``).

        Raises:
            RuntimeError: If ``load_data`` has not been called yet.
        """
        if self.data is None:
            raise RuntimeError("No data loaded; call load_data() before simulate_screening().")

        # Set random seed for reproducible simulation
        random.seed(self.random_seed)
        np.random.seed(self.numpy_seed)

        params = {**self.params, **custom_params} if custom_params else self.params

        # range() rejects a zero step, so never go below one year.
        screening_interval = max(1, int(params['screening_interval']))

        # Detection probability by current state: 2/8 adenoma, 3/9 advanced
        # adenoma, 4 cancer, 5 advanced cancer.  Any other state is undetectable.
        sensitivity = {
            2: params['sensitivity_state2'],
            8: params['sensitivity_state2'],
            3: params['sensitivity_state3'],
            9: params['sensitivity_state3'],
            4: params['sensitivity_state4'],
            5: params['sensitivity_state5'],
        }

        state_columns = {age: f'vstate{age}' for age in range(50, 81)}
        available_columns = set(self.data.columns)

        results = []

        for _, row in self.data.iterrows():
            # Extract vstate50-vstate80 (health states by age)
            vstate = {}
            for age, col in state_columns.items():
                if col in available_columns and pd.notna(row[col]):
                    vstate[age] = int(row[col])
                else:
                    vstate[age] = 1

            # Initialize intervention state (same as natural history initially)
            istate = vstate.copy()

            # Screening variables
            participated = False
            detected = False
            had_referral = False
            intervention_success = False
            intervention_age = None

            # Determine if this person never participates (0% for now).  The
            # draw is kept even though the threshold is 0 so that the random
            # stream, and therefore every seeded result, stays unchanged.
            never_participate = random.random() < 0

            if not never_participate:
                # Screening process: starts at age 50, with custom interval, until age 74
                for screening_age in range(50, 75, screening_interval):
                    current_state = istate[screening_age]

                    # Skip if already dead or in terminal state
                    if current_state in (6, 7):
                        continue

                    # Check if participates in this screening round
                    if random.random() > params['participation_rate']:
                        continue
                    participated = True

                    # Detection probability based on current state; no draw is
                    # consumed when the state cannot be detected.
                    detection_prob = sensitivity.get(current_state, 0)
                    if not (detection_prob > 0 and random.random() <= detection_prob):
                        continue
                    detected = True

                    # Referral for further investigation (colonoscopy)
                    if random.random() > params['referral_rate']:
                        continue
                    had_referral = True

                    # Treatment success
                    if random.random() > params['treatment_success']:
                        continue
                    intervention_success = True
                    intervention_age = screening_age

                    # Reset all future states to healthy (state 1)
                    for treat_age in range(screening_age, 81):
                        istate[treat_age] = 1

                    # Stop screening once an intervention was successful
                    break

            # Calculate outcome indicators
            cancer_noscreening = any(state in (4, 5, 6, 7) for state in vstate.values())
            cancer_screening = any(state in (4, 5, 6, 7) for state in istate.values())

            advca_noscreening = any(state in (5, 7) for state in vstate.values())
            advca_screening = any(state in (5, 7) for state in istate.values())

            # Overdiagnosis calculation - two types.  The state is taken from
            # the natural history at the age of the successful intervention.
            detected_state = vstate[intervention_age] if intervention_success else None

            adenoma_detected = detected_state in (2, 3, 8, 9)   # Any adenoma detected
            overdiag_adenoma = detected_state in (8, 9)          # Overdiagnosis adenoma (8,9)
            advanced_adenoma_detected = detected_state in (3, 9)  # Advanced adenoma detected
            overdiag_advanced_adenoma = detected_state == 9      # Overdiagnosis advanced adenoma (9)

            results.append({
                'cancer_noscreening': cancer_noscreening,
                'cancer_screening': cancer_screening,
                'advca_noscreening': advca_noscreening,
                'advca_screening': advca_screening,
                'participated': participated,
                'detected': detected,
                'had_referral': had_referral,
                'intervention_success': intervention_success,
                'adenoma_detected': adenoma_detected,
                'advanced_adenoma_detected': advanced_adenoma_detected,
                'overdiag_adenoma': overdiag_adenoma,
                'overdiag_advanced_adenoma': overdiag_advanced_adenoma
            })

        self.results = pd.DataFrame(results)
        return self.results

    def analyze_results(self):
        """Analyze simulation results and calculate key metrics.

        Returns:
            Dictionary of rates, counts and relative reductions computed from
            ``self.results``, or ``None`` when there are no results to analyze
            (no simulation run yet, or an empty result table).
        """
        df = self.results
        if df is None or df.empty:
            return None

        # Overdiagnosis rate calculation - two types
        adenoma_detected_count = df['adenoma_detected'].sum()
        advanced_adenoma_detected_count = df['advanced_adenoma_detected'].sum()
        overdiag_adenoma_count = df['overdiag_adenoma'].sum()
        overdiag_advanced_adenoma_count = df['overdiag_advanced_adenoma'].sum()

        overdiag_adenoma_rate = (
            overdiag_adenoma_count / adenoma_detected_count if adenoma_detected_count > 0 else 0
        )
        overdiag_advanced_adenoma_rate = (
            overdiag_advanced_adenoma_count / advanced_adenoma_detected_count
            if advanced_adenoma_detected_count > 0 else 0
        )

        stats = {
            'total_cases': len(df),
            'cancer_rate_noscreening': df['cancer_noscreening'].mean(),
            'cancer_rate_screening': df['cancer_screening'].mean(),
            'cancer_prevented_rate': (df['cancer_noscreening'] & ~df['cancer_screening']).mean(),
            'advca_rate_noscreening': df['advca_noscreening'].mean(),
            'advca_rate_screening': df['advca_screening'].mean(),
            'advca_prevented_rate': (df['advca_noscreening'] & ~df['advca_screening']).mean(),
            'participation_rate': df['participated'].mean(),
            'detection_rate': df['detected'].mean(),
            'referral_rate': df['had_referral'].mean(),
            'intervention_success_rate': df['intervention_success'].mean(),
            # Two types of overdiagnosis rates
            'overdiag_adenoma_rate': overdiag_adenoma_rate,
            'overdiag_advanced_adenoma_rate': overdiag_advanced_adenoma_rate,
            # Detailed counts
            'adenoma_detected_count': adenoma_detected_count,
            'advanced_adenoma_detected_count': advanced_adenoma_detected_count,
            'overdiag_adenoma_count': overdiag_adenoma_count,
            'overdiag_advanced_adenoma_count': overdiag_advanced_adenoma_count
        }

        # Relative reduction percentages.  When the no-screening rate is zero
        # there is nothing to reduce, so the reduction is reported as 0.
        for prefix in ('cancer', 'advca'):
            without = stats[f'{prefix}_rate_noscreening']
            with_screening = stats[f'{prefix}_rate_screening']
            if without > 0:
                stats[f'{prefix}_reduction_pct'] = (without - with_screening) / without * 100
                stats[f'{prefix}_reduction_rate'] = 1 - (with_screening / without)
            else:
                stats[f'{prefix}_reduction_pct'] = 0.0
                stats[f'{prefix}_reduction_rate'] = 0.0

        return stats


class CancerScreeningLLMInterface:
    """Natural-language front-end for a ``ScreeningSimulator``."""

    def __init__(self, simulator: ScreeningSimulator):
        self.simulator = simulator
        self.llm = ChatOpenAI(temperature=0.3, model="gpt-3.5-turbo", max_tokens=1000)

    @staticmethod
    def _as_fraction(value: float) -> float:
        """Interpret numbers above 1 as percentages (60 -> 0.60)."""
        return value / 100 if value > 1 else value

    @classmethod
    def _clean_params(cls, raw) -> dict:
        """Keep only known parameters with usable numeric values.

        The LLM output is untrusted: unknown keys, strings, booleans and
        negative numbers are dropped, rates given on a percent scale are
        converted to fractions and the interval is forced to a whole number of
        at least one year.
        """
        cleaned = {}
        if not isinstance(raw, dict):
            return cleaned
        for name, value in raw.items():
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            if value < 0:
                continue
            if name == 'screening_interval':
                cleaned[name] = max(1, int(value))
            elif name in _RATE_PARAMS:
                cleaned[name] = cls._as_fraction(value)
        return cleaned

    def parse_user_query(self, user_input: str) -> dict:
        """Parse user question and extract screening parameter adjustments.

        The LLM is asked to answer with a flat JSON object; the first
        ``{...}`` group in its reply is decoded and sanitised.  If the LLM
        call or the decoding fails, the regex based ``_simple_parse`` is used.

        Returns:
            Mapping of parameter name to new value; empty when the question
            does not ask for any adjustment.
        """
        prompt = (
            "\n"
            "Please analyze the user's question and extract screening parameter adjustments:\n"
            "\n"
            f"User question: {user_input}\n"
            "\n"
            "Parameter descriptions:\n"
            "1. participation_rate: Currently 40%\n"
            "2. referral_rate: Currently 70%\n"
            "3. treatment_success: Currently 90%\n"
            "4. sensitivity_state2/3/4/5: Currently 44%/52%/61%/70%\n"
            "5. screening_interval: Currently 2 years (between screenings)\n"
            "\n"
            "Return only JSON format with parameters the user wants to adjust.\n"
            'Example: {"participation_rate": 0.60, "referral_rate": 0.90, "screening_interval": 1}\n'
            "If no parameter adjustments, return: {}\n"
        )

        try:
            response = self.llm.invoke([HumanMessage(content=prompt)])
            json_match = re.search(r'\{[^}]*\}', response.content)
            if json_match:
                return self._clean_params(json.loads(json_match.group()))
            return {}
        except Exception:
            # Best effort: any LLM/network/decoding failure falls back to the
            # offline parser instead of failing the request.
            return self._simple_parse(user_input)

    def _simple_parse(self, user_input: str) -> dict:
        """Simple fallback parsing using regex.

        The first number in the text is applied to every parameter family
        whose keyword occurs in the text.  Interval wording such as "annual"
        or "biennial" is recognised even when the text contains no number.
        """
        text = user_input.lower()
        params = {}

        numbers = re.findall(r'(\d+(?:\.\d+)?)', text)
        first_number = float(numbers[0]) if numbers else None

        def mentions(*keywords):
            return any(keyword in text for keyword in keywords)

        if first_number is not None:
            rate = self._as_fraction(first_number)

            if mentions('participation', 'participate'):
                params['participation_rate'] = rate

            if mentions('referral', 'colonoscopy', 'enhance'):
                params['referral_rate'] = rate

            if mentions('treatment', 'success'):
                params['treatment_success'] = rate

            if mentions('sensitivity', 'detection'):
                # Apply to all sensitivity parameters
                for state in (2, 3, 4, 5):
                    params[f'sensitivity_state{state}'] = rate

        # Handle common expressions first; they do not need a number.
        if mentions('annual', 'yearly', 'every year'):
            params['screening_interval'] = 1
        elif mentions('biennial', 'every 2 years', 'two years'):
            params['screening_interval'] = 2
        elif first_number is not None and mentions('interval', 'frequency'):
            # An interval below one year cannot be simulated.
            params['screening_interval'] = max(1, int(first_number))

        return params

    def generate_response(self, user_input: str, scenario_params: dict, stats: dict) -> str:
        """Generate AI response based on analysis results.

        Args:
            user_input: The original question (currently not used in the text).
            scenario_params: Parsed parameter adjustments.
            stats: Metrics of the simulation run with ``scenario_params``.

        Returns:
            The short effectiveness summary when nothing was adjusted,
            otherwise a baseline-versus-adjusted comparison.  If building the
            comparison fails, the simple fallback report is returned.
        """
        # For basic effectiveness questions without parameter changes
        if not scenario_params:
            return _effectiveness_summary(stats)

        # For questions with parameter adjustments, provide comparison
        try:
            # Run baseline simulation for comparison (this replaces
            # simulator.results with the baseline run).
            self.simulator.simulate_screening()
            baseline_stats = self.simulator.analyze_results()

            cancer_delta = (stats['cancer_rate_screening'] - baseline_stats['cancer_rate_screening']) * 100
            advca_delta = (stats['advca_rate_screening'] - baseline_stats['advca_rate_screening']) * 100
            prevented_delta = (stats['cancer_prevented_rate'] - baseline_stats['cancer_prevented_rate']) * 100

            return "\n".join([
                f"With the parameter adjustment of {scenario_params}:",
                "",
                "**Cancer Incidence:**",
                f"- Baseline: {baseline_stats['cancer_rate_noscreening']:.2%} → "
                f"{baseline_stats['cancer_rate_screening']:.2%} "
                f"({baseline_stats.get('cancer_reduction_pct', 0):.1f}% reduction)",
                f"- Adjusted: {stats['cancer_rate_noscreening']:.2%} → "
                f"{stats['cancer_rate_screening']:.2%} "
                f"({stats.get('cancer_reduction_pct', 0):.1f}% reduction)",
                f"- Change in cancer rate: {cancer_delta:+.2f} percentage points",
                "",
                "**Advanced Cancer:**",
                f"- Baseline: {baseline_stats['advca_rate_noscreening']:.2%} → "
                f"{baseline_stats['advca_rate_screening']:.2%} "
                f"({baseline_stats.get('advca_reduction_pct', 0):.1f}% reduction)",
                f"- Adjusted: {stats['advca_rate_noscreening']:.2%} → "
                f"{stats['advca_rate_screening']:.2%} "
                f"({stats.get('advca_reduction_pct', 0):.1f}% reduction)",
                f"- Change in advanced cancer rate: {advca_delta:+.2f} percentage points",
                "",
                "**Key Changes:**",
                f"- Cancer prevention effect: {baseline_stats['cancer_prevented_rate']:.2%} → "
                f"{stats['cancer_prevented_rate']:.2%} ({prevented_delta:+.2f} pp)",
                "",
            ])

        except Exception:
            # Best effort: fall back to a report that only needs ``stats``.
            return self._simple_response(scenario_params, stats)

    def _simple_response(self, scenario_params: Dict, stats: Dict) -> str:
        """Simple fallback response without LLM.

        Uses the metric names produced by ``ScreeningSimulator.analyze_results``.

        Args:
            scenario_params: Parsed parameter adjustments (may be empty).
            stats: Metrics dictionary of the simulation run.

        Returns:
            An itemised plain-text report, followed by the list of parameter
            adjustments when there are any.
        """
        response = "\n".join([
            "",
            f"📊 Analysis Results (Based on {stats['total_cases']:,} real cases):",
            "",
            "🎯 Cancer Incidence:",
            f"   - Natural history: {stats['cancer_rate_noscreening']:.2%}",
            f"   - With screening: {stats['cancer_rate_screening']:.2%}",
            f"   - Prevention effect: {stats['cancer_prevented_rate']:.2%}",
            "",
            "🎯 Advanced Cancer Incidence:",
            f"   - Natural history: {stats['advca_rate_noscreening']:.2%}",
            f"   - With screening: {stats['advca_rate_screening']:.2%}",
            f"   - Prevention effect: {stats['advca_prevented_rate']:.2%}",
            "",
            "🔬 Screening Process:",
            f"   - Participation rate: {stats['participation_rate']:.2%}",
            f"   - Successful intervention rate: {stats['intervention_success_rate']:.2%}",
            "",
        ])

        if scenario_params:
            response += _format_adjustments(scenario_params)

        return response

    def answer_question(self, user_input: str) -> str:
        """Main method to answer user questions.

        Parses the question, runs the simulation with the parsed adjustments
        and renders the answer.
        """
        scenario_params = self.parse_user_query(user_input)

        # Debug: Show what parameters were parsed
        if scenario_params:
            print(f"🔧 Parsed parameters: {scenario_params}")

        self.simulator.simulate_screening(scenario_params)
        stats = self.simulator.analyze_results()

        if stats is None:
            return "Analysis failed, please try again."

        return self.generate_response(user_input, scenario_params, stats)


# Create simulator and interface only once, to persist between calls
simulator = ScreeningSimulator()
simulator.load_data()
llm_interface = CancerScreeningLLMInterface(simulator)


def answer_question_gradio(question):
    """Gradio callback: answer ``question`` or return a readable error text."""
    if not question or not question.strip():
        return "Please enter a question."
    try:
        answer = llm_interface.answer_question(question)
        return answer
    except Exception as e:
        # UI boundary: show the failure to the user instead of crashing the app.
        return f"❌ Error: {e}"


# Create the Gradio Interface
iface = gr.Interface(
    fn=answer_question_gradio,
    inputs=gr.Textbox(lines=5, label="🙋‍♀️ Please enter your question"),
    outputs=gr.Textbox(lines=5, label="🤖 AI Analyst"),
    title="🔬 Colorectal Cancer Screening AI Analysis System",
    description="Ask a question about the effectiveness of CRC screening in Taiwan—for example, regarding screening intervals, attendance rates, referral rates, or treatment success rates."
)

if __name__ == "__main__":
    iface.launch(share=True)