Spaces:
Sleeping
Sleeping
def _simple_response(self, scenario_params: dict, stats: dict) -> str:
    """Build a plain-text summary of the screening statistics without the LLM.

    NOTE(review): this function is defined at module level yet takes ``self``;
    it looks like a pasted replacement for
    ``CancerScreeningLLMInterface._simple_response`` - confirm where it belongs.

    Args:
        self: Unused; kept so the signature matches the method form.
        scenario_params: Adjusted parameters (name -> value). When empty or
            ``None`` a one-paragraph effectiveness summary is returned.
        stats: Statistics dict providing ``total_cases``, the
            ``*_rate_noscreening`` / ``*_rate_screening`` / ``*_prevented_rate``
            entries for ``cancer`` and ``advca``, and optionally the
            ``*_reduction_pct`` entries (default 0).

    Returns:
        The formatted report text.

    Raises:
        KeyError: if a required statistic is missing from ``stats``.
    """
    cancer_cut = stats.get('cancer_reduction_pct', 0)
    advca_cut = stats.get('advca_reduction_pct', 0)

    if not scenario_params:
        return (
            f"Based on the analysis results, colorectal cancer screening reduces cancer incidence "
            f"from {stats['cancer_rate_noscreening']:.2%} to {stats['cancer_rate_screening']:.2%} "
            f"({cancer_cut:.1f}% reduction) and advanced cancer "
            f"from {stats['advca_rate_noscreening']:.2%} to {stats['advca_rate_screening']:.2%} "
            f"({advca_cut:.1f}% reduction). "
            f"The cancer prevention effect is {stats['cancer_prevented_rate']:.2%} and the "
            f"advanced cancer prevention rate is {stats['advca_prevented_rate']:.2%}.\n"
            "If you have specific parameter adjustments in mind, please provide them for a "
            "detailed comparison of the screening effectiveness with the adjusted parameters."
        )

    lines = [
        "",
        f"📊 Analysis Results (Based on {stats['total_cases']:,} real cases with fixed random seed):",
        "🎯 Cancer Incidence:",
        f"- No screening: {stats['cancer_rate_noscreening']:.2%}",
        f"- With screening: {stats['cancer_rate_screening']:.2%}",
        f"- Reduction: {cancer_cut:.1f}%",
        "🎯 Advanced Cancer Incidence:",
        f"- No screening: {stats['advca_rate_noscreening']:.2%}",
        f"- With screening: {stats['advca_rate_screening']:.2%}",
        f"- Reduction: {advca_cut:.1f}%",
        "🔬 Prevention Effects:",
        f"- Cancer prevention effect: {stats['cancer_prevented_rate']:.2%}",
        f"- Advanced cancer prevention: {stats['advca_prevented_rate']:.2%}",
        "",
    ]
    response = "\n".join(lines)

    # scenario_params is known to be non-empty here (early return above).
    response += "\n🔧 Parameter Adjustments:\n"
    for param, value in scenario_params.items():
        if param == 'screening_interval':
            response += f" - {param}: {value} years\n"
        elif isinstance(value, (int, float)):
            response += f" - {param}: {value:.1%}\n"
        else:
            # Non-numeric values cannot take the percent format; show them
            # verbatim rather than letting this fallback raise.
            response += f" - {param}: {value}\n"
    return response


# Colorectal Cancer Screening AI Analysis System - English Version
| import pandas as pd | |
| import numpy as np | |
| import random | |
| import re | |
| import json | |
| import os | |
| from typing import Dict, List, Optional | |
| from langchain_openai import ChatOpenAI | |
| from langchain.schema import SystemMessage, HumanMessage | |
| import pyreadstat | |
| import requests | |
# Set API Key
# A key already present in the environment (for example a deployment secret)
# is kept; the placeholder below is only a fallback - please replace it with
# your actual API key.
os.environ.setdefault("OPENAI_API_KEY", "OPENAI_API_KEY")
class ScreeningSimulator:
    """Simulate a colorectal cancer screening programme on per-person histories.

    Each row of the loaded data is read through the columns ``vstate50`` ..
    ``vstate80`` (health state at each age without screening); a missing
    column or value is treated as state 1.

    State codes as they are used by the simulation:
        1     - healthy (also the state assigned after a successful intervention)
        2, 8  - adenoma (a treated state 8 is counted as overdiagnosis)
        3, 9  - advanced adenoma (a treated state 9 is counted as overdiagnosis)
        4     - cancer
        5     - advanced cancer
        6, 7  - dead / terminal; no screening is offered. Both count as a
                cancer outcome, 7 additionally as advanced cancer.

    Attributes:
        params: Default screening parameters (probabilities as fractions,
            ``screening_interval`` in years).
        data: Sampled input DataFrame, ``None`` until ``load_data`` ran.
        results: Per-person outcome DataFrame of the latest simulation.
        random_seed / numpy_seed: Fixed seeds that make runs reproducible.
    """

    # Column order of the per-person result table.
    _RESULT_COLUMNS = [
        'cancer_noscreening',
        'cancer_screening',
        'advca_noscreening',
        'advca_screening',
        'participated',
        'detected',
        'had_referral',
        'intervention_success',
        'adenoma_detected',
        'advanced_adenoma_detected',
        'overdiag_adenoma',
        'overdiag_advanced_adenoma',
    ]

    def __init__(self):
        self.params = {
            'participation_rate': 0.40,
            'sensitivity_state2': 0.44,
            'sensitivity_state3': 0.52,
            'sensitivity_state4': 0.61,
            'sensitivity_state5': 0.70,
            'referral_rate': 0.70,
            'treatment_success': 0.90,
            'screening_interval': 2,  # years between screenings
        }
        self.data = None
        self.results = None
        # Fixed random seeds for reproducible results
        self.random_seed = 246810
        self.numpy_seed = 13579

    def load_data(self):
        """Download the SAS dataset if needed and keep a reproducible 1% sample.

        Returns:
            True once ``self.data`` holds the sample.

        Raises:
            requests.RequestException: if the download fails or times out.
        """
        # Set random seed for reproducible sampling
        np.random.seed(self.numpy_seed)
        random.seed(self.random_seed)

        DROPBOX_URL = "https://www.dropbox.com/scl/fi/p3n3g7h3wifzs26y0ylah/matched_with_error_all_noid.sas7bdat?rlkey=il8x2ur5xf1n5ivs84rwdlyto&st=jk0vcb2f&dl=1"
        LOCAL_FILENAME = "matched_with_error_all_noid.sas7bdat"

        def download_from_dropbox(url, out_path):
            if os.path.exists(out_path):
                print("File already exists, skipping download.")
                return
            print("Downloading file from Dropbox...")
            # Stream into a temporary file and move it into place only when
            # complete; otherwise an interrupted download would leave a
            # truncated file that later runs mistake for the real one.
            partial_path = out_path + ".part"
            try:
                with requests.get(url, stream=True, timeout=60) as r:
                    r.raise_for_status()
                    with open(partial_path, "wb") as f:
                        for chunk in r.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial_path, out_path)
            finally:
                if os.path.exists(partial_path):
                    os.remove(partial_path)
            print("Download complete.")

        download_from_dropbox(DROPBOX_URL, LOCAL_FILENAME)
        full_data, meta = pyreadstat.read_sas7bdat(LOCAL_FILENAME)
        sample_size = int(len(full_data) * 0.01)
        self.data = full_data.sample(n=sample_size, random_state=self.random_seed).reset_index(drop=True)
        return True

    def simulate_screening(self, custom_params=None):
        """Simulate the screening process for each individual.

        Args:
            custom_params: Optional overrides merged on top of ``self.params``.

        Returns:
            DataFrame with one row per person and the boolean outcome columns
            listed in ``_RESULT_COLUMNS``; also stored in ``self.results``.

        Raises:
            RuntimeError: if no data has been loaded yet.
            ValueError: if ``screening_interval`` is smaller than 1 year.
        """
        if self.data is None:
            raise RuntimeError("No data loaded: call load_data() before simulate_screening().")

        # Set random seed for reproducible simulation
        random.seed(self.random_seed)
        np.random.seed(self.numpy_seed)

        params = {**self.params, **custom_params} if custom_params else self.params

        screening_interval = int(params['screening_interval'])
        if screening_interval < 1:
            # 0 would make range() fail with an opaque message and a negative
            # step would silently skip every screening round.
            raise ValueError(
                f"screening_interval must be at least 1 year, got {params['screening_interval']!r}"
            )
        # Screening starts at age 50 and is offered until age 74.
        screening_ages = range(50, 75, screening_interval)

        # Test sensitivity per health state; states not listed are undetectable.
        sensitivity = {
            2: params['sensitivity_state2'],
            8: params['sensitivity_state2'],
            3: params['sensitivity_state3'],
            9: params['sensitivity_state3'],
            4: params['sensitivity_state4'],
            5: params['sensitivity_state5'],
        }

        # Extract vstate50-vstate80 once for the whole table instead of
        # probing every column for every row.
        ages = list(range(50, 81))
        state_columns = [f'vstate{age}' for age in ages]
        state_rows = (
            self.data.reindex(columns=state_columns)
            .fillna(1)
            .astype(int)
            .to_numpy()
            .tolist()
        )

        results = [
            self._simulate_individual(dict(zip(ages, row)), params, screening_ages, sensitivity)
            for row in state_rows
        ]
        self.results = pd.DataFrame(results, columns=self._RESULT_COLUMNS)
        return self.results

    def _simulate_individual(self, vstate, params, screening_ages, sensitivity):
        """Run all screening rounds for one person and return the outcome record."""
        # Intervention history starts identical to the natural history.
        istate = dict(vstate)
        participated = detected = had_referral = intervention_success = False
        intervention_age = None

        # The share of never-participants is currently 0%, so this is always
        # False; the draw is kept so the seeded random stream is unchanged.
        never_participate = random.random() < 0

        if not never_participate:
            for screening_age in screening_ages:
                current_state = istate[screening_age]
                # Skip if already dead or in terminal state
                if current_state in (6, 7):
                    continue
                # Does the person attend this round?
                if not (random.random() <= params['participation_rate']):
                    continue
                participated = True
                # No random draw is made for undetectable states.
                detection_prob = sensitivity.get(current_state, 0)
                if not (detection_prob > 0 and random.random() <= detection_prob):
                    continue
                detected = True
                # Referral for further investigation (colonoscopy)
                if not (random.random() <= params['referral_rate']):
                    continue
                had_referral = True
                if not (random.random() <= params['treatment_success']):
                    continue
                intervention_success = True
                intervention_age = screening_age
                # Reset this and all later ages to healthy (state 1)
                for treat_age in range(screening_age, 81):
                    istate[treat_age] = 1
                # Screening stops after the first successful intervention.
                break

        # State that was actually treated (natural-history state at that age).
        detected_state = vstate[intervention_age] if intervention_success else None

        return {
            'cancer_noscreening': any(state in (4, 5, 6, 7) for state in vstate.values()),
            'cancer_screening': any(state in (4, 5, 6, 7) for state in istate.values()),
            'advca_noscreening': any(state in (5, 7) for state in vstate.values()),
            'advca_screening': any(state in (5, 7) for state in istate.values()),
            'participated': participated,
            'detected': detected,
            'had_referral': had_referral,
            'intervention_success': intervention_success,
            'adenoma_detected': detected_state in (2, 3, 8, 9),
            'advanced_adenoma_detected': detected_state in (3, 9),
            'overdiag_adenoma': detected_state in (8, 9),
            'overdiag_advanced_adenoma': detected_state == 9,
        }

    def analyze_results(self):
        """Analyze simulation results and calculate key metrics.

        Returns:
            Dict of rates, counts and relative reductions, or ``None`` when
            there are no simulation results to analyze.
        """
        df = self.results
        if df is None or df.empty:
            return None

        # Overdiagnosis rates: share of treated (advanced) adenomas that were
        # overdiagnosis states.
        adenoma_detected_count = df['adenoma_detected'].sum()
        advanced_adenoma_detected_count = df['advanced_adenoma_detected'].sum()
        overdiag_adenoma_count = df['overdiag_adenoma'].sum()
        overdiag_advanced_adenoma_count = df['overdiag_advanced_adenoma'].sum()
        overdiag_adenoma_rate = (
            overdiag_adenoma_count / adenoma_detected_count if adenoma_detected_count > 0 else 0
        )
        overdiag_advanced_adenoma_rate = (
            overdiag_advanced_adenoma_count / advanced_adenoma_detected_count
            if advanced_adenoma_detected_count > 0 else 0
        )

        stats = {
            'total_cases': len(df),
            'cancer_rate_noscreening': df['cancer_noscreening'].mean(),
            'cancer_rate_screening': df['cancer_screening'].mean(),
            'cancer_prevented_rate': (df['cancer_noscreening'] & ~df['cancer_screening']).mean(),
            'advca_rate_noscreening': df['advca_noscreening'].mean(),
            'advca_rate_screening': df['advca_screening'].mean(),
            'advca_prevented_rate': (df['advca_noscreening'] & ~df['advca_screening']).mean(),
            'participation_rate': df['participated'].mean(),
            'detection_rate': df['detected'].mean(),
            'referral_rate': df['had_referral'].mean(),
            'intervention_success_rate': df['intervention_success'].mean(),
            # Two types of overdiagnosis rates
            'overdiag_adenoma_rate': overdiag_adenoma_rate,
            'overdiag_advanced_adenoma_rate': overdiag_advanced_adenoma_rate,
            # Detailed counts
            'adenoma_detected_count': adenoma_detected_count,
            'advanced_adenoma_detected_count': advanced_adenoma_detected_count,
            'overdiag_adenoma_count': overdiag_adenoma_count,
            'overdiag_advanced_adenoma_count': overdiag_advanced_adenoma_count,
        }

        # Relative reductions; reported as 0 when there is nothing to reduce
        # so the keys are always present.
        for prefix in ('cancer', 'advca'):
            without = stats[f'{prefix}_rate_noscreening']
            with_screening = stats[f'{prefix}_rate_screening']
            if without > 0:
                stats[f'{prefix}_reduction_pct'] = (without - with_screening) / without * 100
                stats[f'{prefix}_reduction_rate'] = 1 - (with_screening / without)
            else:
                stats[f'{prefix}_reduction_pct'] = 0.0
                stats[f'{prefix}_reduction_rate'] = 0.0
        return stats
class CancerScreeningLLMInterface:
    """Answer natural-language questions by driving a ``ScreeningSimulator``.

    A question is parsed into parameter adjustments (by the LLM, with a
    keyword/regex fallback), the simulation is run with them, and the
    statistics are rendered as text.
    """

    def __init__(self, simulator: "ScreeningSimulator"):
        self.simulator = simulator
        self.llm = ChatOpenAI(temperature=0.3, model="gpt-3.5-turbo", max_tokens=1000)

    def parse_user_query(self, user_input: str) -> dict:
        """Parse user question and extract screening parameter adjustments.

        Returns:
            Dict of parameter name -> value as returned by the LLM, ``{}`` if
            its reply contains no JSON object, or the result of
            ``_simple_parse`` when the LLM call or JSON decoding fails.
        """
        prompt = f"""
        Please analyze the user's question and extract screening parameter adjustments:
        User question: {user_input}
        Parameter descriptions:
        1. participation_rate: Currently 40%
        2. referral_rate: Currently 70%
        3. treatment_success: Currently 90%
        4. sensitivity_state2/3/4/5: Currently 44%/52%/61%/70%
        5. screening_interval: Currently 2 years (between screenings)
        Return only JSON format with parameters the user wants to adjust.
        Example: {{"participation_rate": 0.60, "referral_rate": 0.90, "screening_interval": 1}}
        If no parameter adjustments, return: {{}}
        """
        try:
            response = self.llm.invoke([HumanMessage(content=prompt)])
            # The expected reply is one flat JSON object.
            json_match = re.search(r'\{[^}]*\}', response.content)
            if json_match:
                return json.loads(json_match.group())
            return {}
        except Exception:
            # Any LLM/JSON failure degrades to keyword parsing; unlike a bare
            # ``except`` this lets KeyboardInterrupt/SystemExit propagate.
            return self._simple_parse(user_input)

    def _simple_parse(self, user_input: str) -> dict:
        """Simple fallback parsing using keywords and regex.

        The first number in the text is applied to every rate keyword that is
        mentioned (values above 1 are read as percentages), so questions that
        give different numbers for several parameters are not told apart.
        Interval expressions such as "annual" or "biennial" are recognised
        even when the text contains no number.
        """
        text = user_input.lower()
        params = {}
        numbers = re.findall(r'(\d+(?:\.\d+)?)', text)
        first_value = float(numbers[0]) if numbers else None

        def mentions(*keywords):
            return any(keyword in text for keyword in keywords)

        if first_value is not None:
            fraction = first_value / 100 if first_value > 1 else first_value
            if mentions('participation', 'participate'):
                params['participation_rate'] = fraction
            if mentions('referral', 'colonoscopy', 'enhance'):
                params['referral_rate'] = fraction
            if mentions('treatment', 'success'):
                params['treatment_success'] = fraction
            if mentions('sensitivity', 'detection'):
                # Apply to all sensitivity parameters
                for state in (2, 3, 4, 5):
                    params[f'sensitivity_state{state}'] = fraction

        if mentions('interval', 'frequency', 'yearly', 'annual', 'biennial'):
            # Handle common expressions first; they need no number.
            if mentions('annual', 'yearly', 'every year'):
                params['screening_interval'] = 1
            elif mentions('biennial', 'every 2 years', 'two years'):
                params['screening_interval'] = 2
            elif first_value is not None:
                params['screening_interval'] = int(first_value)
        return params

    def generate_response(self, user_input: str, scenario_params: dict, stats: dict) -> str:
        """Generate the answer text based on analysis results.

        Args:
            user_input: The original question (currently unused).
            scenario_params: Parsed parameter adjustments; empty for a plain
                effectiveness question.
            stats: Statistics of the simulation run with ``scenario_params``.

        Returns:
            An effectiveness summary when there are no adjustments, otherwise
            a baseline-vs-adjusted comparison; if building the comparison
            fails, the output of ``_simple_response``.
        """
        # For basic effectiveness questions without parameter changes
        if not scenario_params:
            return (
                f"Based on the analysis results, colorectal cancer screening reduces cancer incidence "
                f"from {stats['cancer_rate_noscreening']:.2%} to {stats['cancer_rate_screening']:.2%} "
                f"({stats.get('cancer_reduction_pct', 0):.1f}% reduction) and advanced cancer "
                f"from {stats['advca_rate_noscreening']:.2%} to {stats['advca_rate_screening']:.2%} "
                f"({stats.get('advca_reduction_pct', 0):.1f}% reduction). "
                f"The cancer prevention effect is {stats['cancer_prevented_rate']:.2%} and the "
                f"advanced cancer prevention rate is {stats['advca_prevented_rate']:.2%}.\n"
                "If you have specific parameter adjustments in mind, please provide them for a "
                "detailed comparison of the screening effectiveness with the adjusted parameters."
            )

        # For questions with parameter adjustments, provide comparison
        try:
            # Run the baseline for comparison, then put the scenario results
            # back so the simulator keeps describing the scenario asked about.
            scenario_results = self.simulator.results
            try:
                self.simulator.simulate_screening()
                baseline_stats = self.simulator.analyze_results()
            finally:
                self.simulator.results = scenario_results

            cancer_shift = (stats['cancer_rate_screening'] - baseline_stats['cancer_rate_screening']) * 100
            advca_shift = (stats['advca_rate_screening'] - baseline_stats['advca_rate_screening']) * 100
            prevented_shift = (stats['cancer_prevented_rate'] - baseline_stats['cancer_prevented_rate']) * 100
            lines = [
                f"With the parameter adjustment of {scenario_params}:",
                "**Cancer Incidence:**",
                f"- Baseline: {baseline_stats['cancer_rate_noscreening']:.2%} → {baseline_stats['cancer_rate_screening']:.2%} ({baseline_stats.get('cancer_reduction_pct', 0):.1f}% reduction)",
                f"- Adjusted: {stats['cancer_rate_noscreening']:.2%} → {stats['cancer_rate_screening']:.2%} ({stats.get('cancer_reduction_pct', 0):.1f}% reduction)",
                f"- Change in cancer rate: {cancer_shift:+.2f} percentage points",
                "**Advanced Cancer:**",
                f"- Baseline: {baseline_stats['advca_rate_noscreening']:.2%} → {baseline_stats['advca_rate_screening']:.2%} ({baseline_stats.get('advca_reduction_pct', 0):.1f}% reduction)",
                f"- Adjusted: {stats['advca_rate_noscreening']:.2%} → {stats['advca_rate_screening']:.2%} ({stats.get('advca_reduction_pct', 0):.1f}% reduction)",
                f"- Change in advanced cancer rate: {advca_shift:+.2f} percentage points",
                "**Key Changes:**",
                f"- Cancer prevention effect: {baseline_stats['cancer_prevented_rate']:.2%} → {stats['cancer_prevented_rate']:.2%} ({prevented_shift:+.2f} pp)",
                "",
            ]
            return "\n".join(lines)
        except Exception:
            return self._simple_response(scenario_params, stats)

    def _simple_response(self, scenario_params: dict, stats: dict) -> str:
        """Simple fallback response built from ``stats`` alone.

        Reads the keys produced by ``ScreeningSimulator.analyze_results``
        (``cancer_rate_noscreening``, ``advca_rate_screening``, ...).

        Raises:
            KeyError: if a required statistic is missing from ``stats``.
        """
        lines = [
            "",
            f"📊 Analysis Results (Based on {stats['total_cases']:,} real cases):",
            "🎯 Cancer Incidence:",
            f"- Natural history: {stats['cancer_rate_noscreening']:.2%}",
            f"- With screening: {stats['cancer_rate_screening']:.2%}",
            f"- Prevention effect: {stats['cancer_prevented_rate']:.2%}",
            "🎯 Advanced Cancer Incidence:",
            f"- Natural history: {stats['advca_rate_noscreening']:.2%}",
            f"- With screening: {stats['advca_rate_screening']:.2%}",
            f"- Prevention effect: {stats['advca_prevented_rate']:.2%}",
            "🔬 Screening Process:",
            f"- Participation rate: {stats['participation_rate']:.2%}",
            f"- Successful intervention rate: {stats['intervention_success_rate']:.2%}",
            "",
        ]
        response = "\n".join(lines)

        if scenario_params:
            response += "\n🔧 Parameter Adjustments:\n"
            for param, value in scenario_params.items():
                if param == 'screening_interval':
                    # An interval is a number of years, not a rate.
                    response += f" - {param}: {value} years\n"
                elif isinstance(value, (int, float)):
                    response += f" - {param}: {value:.1%}\n"
                else:
                    # Keep the fallback from raising on non-numeric values.
                    response += f" - {param}: {value}\n"
        return response

    def answer_question(self, user_input: str) -> str:
        """Main method to answer user questions.

        Parses the question, runs the simulation with the parsed parameters
        and returns the generated answer text.
        """
        scenario_params = self.parse_user_query(user_input)
        # Debug: Show what parameters were parsed
        if scenario_params:
            print(f"🔧 Parsed parameters: {scenario_params}")
        self.simulator.simulate_screening(scenario_params)
        stats = self.simulator.analyze_results()
        if stats is None:
            return "Analysis failed, please try again."
        return self.generate_response(user_input, scenario_params, stats)
| import gradio as gr | |
# Module-level singletons: the simulator (with its loaded data sample) and the
# LLM interface are built once at import time so they persist between calls
# coming from the Gradio UI.
simulator = ScreeningSimulator()
simulator.load_data()
llm_interface = CancerScreeningLLMInterface(simulator=simulator)
def answer_question_gradio(question):
    """Gradio callback: answer ``question`` through the shared LLM interface.

    Args:
        question: Text typed by the user; may be empty or ``None``.

    Returns:
        The generated answer, a prompt to enter a question when the input is
        blank, or an error message if answering raised an exception.
    """
    # Guard against None as well as blank text before calling .strip().
    if not question or not question.strip():
        return "Please enter a question."
    try:
        return llm_interface.answer_question(question)
    except Exception as e:
        # Surface the failure in the UI instead of crashing the callback.
        return f"❌ Error: {e}"
# Create the Gradio Interface: one multi-line text box in, one out, both wired
# to answer_question_gradio.
iface = gr.Interface(
    fn=answer_question_gradio,
    inputs=gr.Textbox(lines=5, label="🙋‍♂️ Please enter your question"),
    outputs=gr.Textbox(lines=5, label="🤖 AI Analyst"),
    title="🔬 Colorectal Cancer Screening AI Analysis System",
    description="Ask a question about the effectiveness of CRC screening in Taiwan—for example, regarding screening intervals, attendance rates, referral rates, or treatment success rates.",
)

# Launch the web UI only when executed as a script.
if __name__ == "__main__":
    iface.launch(share=True)