Spaces:
Sleeping
Sleeping
| """ | |
| Advanced Adverse Drug Reaction (ADR) Analysis Tools | |
| This module provides comprehensive pharmacovigilance capabilities including: | |
| - Enhanced FAERS database searches with filtering | |
| - Naranjo probability scale calculator | |
| - Disproportionality analysis (PRR, ROR, IC) | |
| - Case similarity analysis | |
| - Temporal pattern analysis | |
| """ | |
| import requests | |
| import re | |
| import math | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from collections import defaultdict, Counter | |
| from caching import with_caching | |
| from utils import with_error_handling, make_api_request | |
| logger = logging.getLogger(__name__) | |
def enhanced_faers_search(
    drug_name: str,
    adverse_event: str = None,
    age_range: str = None,
    gender: str = None,
    serious_only: bool = False,
    limit: int = 100
) -> Dict[str, Any]:
    """
    Enhanced FAERS search with filtering capabilities for pharmacovigilance analysis.

    The drug, reaction, seriousness and gender filters are sent to the openFDA
    drug event endpoint. The age filter is applied locally, record by record,
    through ``extract_case_details``.

    Args:
        drug_name: Drug name to search for
        adverse_event: Specific adverse event/reaction to filter by (optional)
        age_range: Age range filter like "18-65" or ">65" (optional)
        gender: Gender filter "1" (male) or "2" (female) (optional); the
            integers 1 and 2 are accepted as well, any other value is ignored
        serious_only: If True, only return serious adverse events
        limit: Maximum number of results (default 100, clamped to 1..1000)

    Returns:
        Dict with keys ``cases``, ``total_found``, ``filtered_count``,
        ``query_info`` and ``summary_statistics``. When the API answers 404
        the same keys are returned with empty values plus a ``message``.

    Raises:
        ValueError: If ``drug_name`` is empty or only whitespace.
        requests.exceptions.RequestException: If no response is obtained or
            the status code is neither 200 nor 404.
    """
    if not drug_name or not drug_name.strip():
        raise ValueError("Drug name cannot be empty")

    def _phrase(value: str) -> str:
        # Escape backslashes and double quotes so user text cannot close the
        # quoted phrase early and alter the rest of the query.
        escaped = value.strip().replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    # Accept 1/2 given as int as well as str; anything else means "no filter".
    gender_code = str(gender).strip() if gender is not None else None

    # Build search query
    search_parts = [f"patient.drug.medicinalproduct:{_phrase(drug_name)}"]
    if adverse_event and adverse_event.strip():
        search_parts.append(f"patient.reaction.reactionmeddrapt:{_phrase(adverse_event)}")
    if serious_only:
        search_parts.append('serious:"1"')
    if gender_code in ("1", "2"):
        search_parts.append(f'patient.patientsex:"{gender_code}"')

    query_params = {
        "search": " AND ".join(search_parts),
        "limit": min(max(1, limit), 1000)
    }
    query_info = {
        "drug": drug_name,
        "adverse_event": adverse_event,
        "filters_applied": {
            "age_range": age_range,
            "gender": gender,
            "serious_only": serious_only
        }
    }

    response = make_api_request("https://api.fda.gov/drug/event.json", query_params, timeout=15)
    # Compare with None explicitly: a requests.Response is falsy for any
    # non-2xx status, and the 404 case must still reach the branch below.
    if response is None:
        raise requests.exceptions.RequestException("Enhanced FAERS search failed: no response")

    if response.status_code == 404:
        # Same shape as the success payload so callers can index it blindly.
        return {
            "cases": [],
            "total_found": 0,
            "filtered_count": 0,
            "query_info": query_info,
            "summary_statistics": {},
            "message": "No matching cases found"
        }
    if response.status_code != 200:
        raise requests.exceptions.RequestException(
            f"Enhanced FAERS search failed: {response.status_code}"
        )

    data = response.json()
    cases = []
    for rec in data.get("results", []):
        case = extract_case_details(rec, age_range)
        if case:  # None means the record was rejected by the age filter
            cases.append(case)

    return {
        "cases": cases,
        "total_found": data.get("meta", {}).get("results", {}).get("total", 0),
        "filtered_count": len(cases),
        "query_info": query_info,
        "summary_statistics": calculate_case_statistics(cases)
    }
def _faers_flag_is_set(value: Any) -> bool:
    """Return True only when a FAERS yes/no field carries the code "1"."""
    # openFDA encodes `serious` as "1" (serious) / "2" (not serious) and the
    # seriousness* fields as "1" when applicable, so truthiness of the integer
    # is the wrong test ("2" would count as serious).
    return value is not None and str(value).strip() == "1"


def _faers_patient_age_years(patient: Dict) -> Any:
    """Return the patient's age in years, or None when no age is reported."""
    legacy = patient.get("patientage")
    if legacy not in (None, ""):
        return legacy  # key used by the original code; returned untouched

    onset_age = patient.get("patientonsetage")
    if onset_age in (None, ""):
        return None

    # openFDA unit codes: 800 decade, 801 year, 802 month, 803 week,
    # 804 day, 805 hour. A missing unit is treated as years.
    unit = str(patient.get("patientonsetageunit") or "801").strip()
    years_per_unit = {
        "800": 10.0,
        "802": 1 / 12,
        "803": 7 / 365.25,
        "804": 1 / 365.25,
        "805": 1 / (24 * 365.25),
    }.get(unit)
    if years_per_unit is None:
        return onset_age  # already in years (or unknown unit): keep raw value
    try:
        return round(float(onset_age) * years_per_unit, 2)
    except (TypeError, ValueError):
        return onset_age


def extract_case_details(rec: Dict, age_range: str = None) -> Optional[Dict]:
    """
    Extract and structure case details from FAERS record.

    Args:
        rec: One element of the openFDA drug event ``results`` list.
        age_range: Optional age filter understood by ``passes_age_filter``.

    Returns:
        A dict with ``safety_report_id``, ``receive_date``, ``patient``,
        ``drugs``, ``reactions``, ``seriousness``, ``reporter_qualification``
        and ``country``; or None when an age filter is given, the record has a
        usable age, and that age fails the filter. Records without an age, or
        with an age/filter that cannot be parsed, are kept.
    """
    patient = rec.get("patient") or {}

    # Extract patient demographics
    age_group = patient.get("patientagegroup")
    age_years = _faers_patient_age_years(patient)
    gender = patient.get("patientsex")

    # Apply age filter if specified
    if age_range and age_years not in (None, ""):
        try:
            if not passes_age_filter(float(age_years), age_range):
                return None
        except (ValueError, TypeError):
            # Best effort: an unparseable age or filter does not drop the case.
            pass

    # Extract drug information
    drugs = [
        {
            "name": drug.get("medicinalproduct", ""),
            "characterization": drug.get("drugcharacterization"),  # 1=suspect, 2=concomitant, 3=interacting
            "indication": drug.get("drugindication", ""),
            "start_date": drug.get("drugstartdate", ""),
            "end_date": drug.get("drugenddate", ""),
            "dosage": drug.get("drugdosagetext", ""),
            "route": drug.get("drugadministrationroute", "")
        }
        for drug in patient.get("drug") or []
    ]

    # Extract reactions
    reactions = [
        {
            "term": reaction.get("reactionmeddrapt", ""),
            # 1=recovered, 2=recovering, 3=not recovered,
            # 4=recovered with sequelae, 5=fatal, 6=unknown
            "outcome": reaction.get("reactionoutcome")
        }
        for reaction in patient.get("reaction") or []
    ]

    # Extract seriousness criteria
    seriousness = {
        "serious": _faers_flag_is_set(rec.get("serious")),
        "death": _faers_flag_is_set(rec.get("seriousnessdeath")),
        "life_threatening": _faers_flag_is_set(rec.get("seriousnesslifethreatening")),
        "hospitalization": _faers_flag_is_set(rec.get("seriousnesshospitalization")),
        "disability": _faers_flag_is_set(rec.get("seriousnessdisabling")),
        "congenital_anomaly": _faers_flag_is_set(rec.get("seriousnesscongenitalanomali")),
        "other_serious": _faers_flag_is_set(rec.get("seriousnessother"))
    }

    return {
        "safety_report_id": rec.get("safetyreportid"),
        "receive_date": rec.get("receivedate"),
        "patient": {
            "age": age_years,
            "age_group": age_group,
            "gender": gender,  # 1=male, 2=female
            "weight": patient.get("patientweight")
        },
        "drugs": drugs,
        "reactions": reactions,
        "seriousness": seriousness,
        "reporter_qualification": (rec.get("primarysource") or {}).get("qualification"),  # 1=physician, 2=pharmacist, etc.
        "country": rec.get("occurcountry")
    }
def passes_age_filter(age: float, age_range: str) -> bool:
    """
    Check if age passes the specified filter.

    Supported filter forms (surrounding whitespace is ignored):
        ">=N", "<=N", ">N", "<N"  - comparison against a threshold
        "A-B"                     - inclusive range
    Any other text is treated as "no filter" and passes.

    Args:
        age: Age to test.
        age_range: Filter expression.

    Returns:
        True when the age satisfies the filter or the filter is unrecognised.

    Raises:
        ValueError: If a threshold or range bound is not a number.
    """
    spec = age_range.strip()

    # The two-character operators must be tested first: ">=65" also starts
    # with ">", and float("=65") would raise.
    if spec.startswith(">="):
        return age >= float(spec[2:])
    if spec.startswith("<="):
        return age <= float(spec[2:])
    if spec.startswith(">"):
        return age > float(spec[1:])
    if spec.startswith("<"):
        return age < float(spec[1:])
    if "-" in spec:
        lower, upper = map(float, spec.split("-", 1))
        return lower <= age <= upper
    return True
def calculate_case_statistics(cases: List[Dict]) -> Dict[str, Any]:
    """
    Calculate summary statistics from case data.

    Args:
        cases: Case dicts shaped like the output of ``extract_case_details``
            (keys read here: ``patient.age``, ``patient.gender``,
            ``seriousness.serious``, ``seriousness.death``,
            ``reporter_qualification`` and ``reactions[].term``).

    Returns:
        An empty dict for an empty input; otherwise counts and percentages of
        serious and fatal cases, age mean/median/range, gender distribution,
        the ten most frequent reaction terms and the reporter-type counts.
    """
    if not cases:
        return {}

    case_total = len(cases)

    # Demographics. Ages that are missing or not numeric are skipped rather
    # than aborting the whole summary.
    ages = []
    for case in cases:
        raw_age = case["patient"]["age"]
        if raw_age in (None, ""):
            continue
        try:
            ages.append(float(raw_age))
        except (TypeError, ValueError):
            continue
    ages.sort()
    genders = [case["patient"]["gender"] for case in cases if case["patient"]["gender"]]

    if ages:
        middle = len(ages) // 2
        # True median: average the two central values for an even count.
        median_age = ages[middle] if len(ages) % 2 else (ages[middle - 1] + ages[middle]) / 2
        age_stats = {
            "mean": round(sum(ages) / len(ages), 1),
            "median": median_age,
            "range": [ages[0], ages[-1]]
        }
    else:
        age_stats = {"mean": None, "median": None, "range": None}

    # Outcomes
    serious_cases = sum(1 for case in cases if case["seriousness"]["serious"])
    fatal_cases = sum(1 for case in cases if case["seriousness"]["death"])

    # Reporter types
    reporter_types = [
        case["reporter_qualification"] for case in cases if case["reporter_qualification"]
    ]

    # Most common reactions
    reaction_counts = Counter(
        reaction["term"] for case in cases for reaction in case["reactions"]
    )

    return {
        "total_cases": case_total,
        "serious_cases": serious_cases,
        "serious_percentage": round(serious_cases / case_total * 100, 1),
        "fatal_cases": fatal_cases,
        "fatal_percentage": round(fatal_cases / case_total * 100, 1),
        "demographics": {
            "age_stats": age_stats,
            "gender_distribution": dict(Counter(genders))
        },
        "top_reactions": dict(reaction_counts.most_common(10)),
        "reporter_types": dict(Counter(reporter_types))
    }
def calculate_naranjo_score(
    adverse_reaction_after_drug: str,  # "yes", "no", "unknown"
    reaction_improved_after_stopping: str,  # "yes", "no", "unknown"
    reaction_reappeared_after_readministration: str,  # "yes", "no", "unknown"
    alternative_causes_exist: str,  # "yes", "no", "unknown"
    reaction_when_placebo_given: str,  # "yes", "no", "unknown"
    drug_detected_in_blood: str,  # "yes", "no", "unknown"
    reaction_worse_with_higher_dose: str,  # "yes", "no", "unknown"
    similar_reaction_to_drug_before: str,  # "yes", "no", "unknown"
    adverse_event_confirmed_objectively: str,  # "yes", "no", "unknown"
    reaction_appeared_after_suspected_drug_given: str  # "yes", "no", "unknown"
) -> Dict[str, Any]:
    """
    Calculate Naranjo Adverse Drug Reaction Probability Scale.

    Each of the ten answers is mapped to points, the points are summed, and
    the sum is bucketed into Definite / Probable / Possible / Doubtful.

    Note on parameter mapping: ``adverse_reaction_after_drug`` answers the
    first question ("previous conclusive reports"), while
    ``reaction_appeared_after_suspected_drug_given`` answers the second
    (temporal relationship). Answers are case-insensitive and surrounding
    whitespace is ignored.

    Args:
        All parameters should be "yes", "no", or "unknown"

    Returns:
        Dict with ``total_score``, ``category``, ``probability``,
        ``interpretation``, a per-question ``question_breakdown`` and static
        ``scale_info``.

    Raises:
        ValueError: If any answer is not "yes", "no" or "unknown".
    """

    def points(yes: int, no: int) -> Dict[str, int]:
        # "unknown" never contributes to the score.
        return {"yes": yes, "no": no, "unknown": 0}

    # (question text, supplied answer, points per answer) in scale order
    items = [
        ("Are there previous conclusive reports on this reaction?",
         adverse_reaction_after_drug, points(1, 0)),
        ("Did the adverse event appear after the suspected drug was administered?",
         reaction_appeared_after_suspected_drug_given, points(2, -1)),
        ("Did the adverse reaction improve when the drug was discontinued or a specific antagonist was administered?",
         reaction_improved_after_stopping, points(1, 0)),
        ("Did the adverse reaction reappear when the drug was readministered?",
         reaction_reappeared_after_readministration, points(2, -1)),
        ("Are there alternative causes (other than the drug) that could on their own have caused the reaction?",
         alternative_causes_exist, points(-1, 2)),
        ("Did the reaction reappear when a placebo was given?",
         reaction_when_placebo_given, points(-1, 1)),
        ("Was the drug detected in blood (or other fluids) in concentrations known to be toxic?",
         drug_detected_in_blood, points(1, 0)),
        ("Was the reaction more severe when the dose was increased or less severe when the dose was decreased?",
         reaction_worse_with_higher_dose, points(1, 0)),
        ("Did the patient have a similar reaction to the same or similar drugs in any previous exposure?",
         similar_reaction_to_drug_before, points(1, 0)),
        ("Was the adverse event confirmed by any objective evidence?",
         adverse_event_confirmed_objectively, points(1, 0)),
    ]

    breakdown = []
    running_total = 0
    for text, supplied, table in items:
        normalized = supplied.lower().strip()
        if normalized not in table:
            raise ValueError(f"Invalid answer '{normalized}'. Must be 'yes', 'no', or 'unknown'")
        earned = table[normalized]
        running_total += earned
        breakdown.append({"question": text, "answer": normalized, "points": earned})

    # Score bands, highest first; anything below 1 falls through to Doubtful.
    bands = (
        (9, "Definite", "≥95%", "The adverse reaction is definitely related to the drug."),
        (5, "Probable", "75-95%", "The adverse reaction is probably related to the drug."),
        (1, "Possible", "25-75%", "The adverse reaction is possibly related to the drug."),
    )
    category = "Doubtful"
    probability = "<25%"
    interpretation = "The adverse reaction is doubtfully related to the drug."
    for floor, band_name, band_probability, band_text in bands:
        if running_total >= floor:
            category = band_name
            probability = band_probability
            interpretation = band_text
            break

    return {
        "total_score": running_total,
        "category": category,
        "probability": probability,
        "interpretation": interpretation,
        "question_breakdown": breakdown,
        "scale_info": {
            "name": "Naranjo Adverse Drug Reaction Probability Scale",
            "reference": "Naranjo CA, et al. Clin Pharmacol Ther. 1981;30(2):239-245",
            "scoring": {
                "Definite": "≥9 points",
                "Probable": "5-8 points",
                "Possible": "1-4 points",
                "Doubtful": "≤0 points"
            }
        }
    }
def _fetch_faers_report_total(base_url: str, search: str) -> Optional[int]:
    """Return ``meta.results.total`` for a search, or None when the request fails."""
    response = make_api_request(base_url, {"search": search, "limit": 1}, timeout=10)
    if response and response.status_code == 200:
        return response.json().get("meta", {}).get("results", {}).get("total", 0)
    return None


def disproportionality_analysis(
    drug_name: str,
    adverse_event: str,
    background_limit: int = 10000
) -> Dict[str, Any]:
    """
    Perform disproportionality analysis to detect potential drug-adverse event signals.

    Builds a 2x2 contingency table from three openFDA count queries plus a
    fixed estimate of the database size, then delegates to
    ``calculate_disproportionality_measures`` for PRR, ROR and IC.

    Args:
        drug_name: Drug of interest
        adverse_event: Adverse event of interest
        background_limit: Currently unused; kept so existing callers that pass
            it keep working.

    Returns:
        Dict with PRR, ROR, IC values and statistical significance, plus the
        contingency table, ``data_sources`` (which states for each count
        whether it came from the API or from a fallback estimate) and
        ``data_notes``. When no drug/event reports are found, or any error
        occurs, a short dict with ``signal_detected: False`` and
        ``case_count: 0`` is returned instead; errors are logged, not raised.
    """
    direct_query = "FAERS API direct query"
    fallback_estimate = "Fallback estimate (API query failed)"
    faers_size_estimate = 15000000  # Approximate FAERS database size

    try:
        base_url = "https://api.fda.gov/drug/event.json"
        drug_term = f'patient.drug.medicinalproduct:"{drug_name}"'
        event_term = f'patient.reaction.reactionmeddrapt:"{adverse_event}"'

        # Get cases for drug + adverse event (a); a failed request counts as 0
        a = _fetch_faers_report_total(base_url, f"{drug_term} AND {event_term}") or 0
        if a == 0:
            return {
                "drug": drug_name,
                "adverse_event": adverse_event,
                "message": "No cases found for this drug-adverse event combination",
                "signal_detected": False,
                "case_count": 0
            }

        # Get total cases for drug (a + b)
        total_drug_cases = _fetch_faers_report_total(base_url, drug_term)
        if total_drug_cases is not None:
            b = max(total_drug_cases - a, 1)  # Ensure b is at least 1
            drug_source = direct_query
        else:
            b = max(a * 5, 10)  # Conservative estimate
            drug_source = fallback_estimate

        # Get total cases for adverse event (a + c)
        total_ae_cases = _fetch_faers_report_total(base_url, event_term)
        if total_ae_cases is not None:
            c = max(total_ae_cases - a, 1)  # Avoid zero
            event_source = direct_query
        else:
            c = max(a * 10, 100)  # Conservative estimate
            event_source = fallback_estimate

        # Estimate total background cases (d) from the assumed database size
        d = max(faers_size_estimate - a - b - c, 1000)

        # Calculate disproportionality measures
        results = calculate_disproportionality_measures(a, b, c, d)

        # Add metadata
        results.update({
            "drug": drug_name,
            "adverse_event": adverse_event,
            "contingency_table": {
                "drug_ae": a,
                "drug_other_ae": b,
                "other_drug_ae": c,
                "other_drug_other_ae": d,
                "total": a + b + c + d
            },
            "data_sources": {
                "drug_ae_cases": direct_query,
                "total_drug_cases": drug_source,
                "total_ae_cases": event_source,
                "background_estimate": "Statistical approximation"
            },
            "data_notes": [
                "This analysis uses FAERS data which has inherent limitations",
                "Results should be interpreted by qualified pharmacovigilance professionals",
                "Background estimates are approximations due to API limitations",
                "Consider confounding factors and reporting biases"
            ]
        })
        return results
    except Exception as e:
        # Boundary handler: report the failure in the payload instead of raising.
        logger.exception("Error in disproportionality analysis: %s", e)
        return {
            "drug": drug_name,
            "adverse_event": adverse_event,
            "error": str(e),
            "message": "Analysis failed due to data access issues",
            "signal_detected": False,
            "case_count": 0
        }
def calculate_disproportionality_measures(a: int, b: int, c: int, d: int) -> Dict[str, Any]:
    """
    Calculate PRR, ROR, and IC with confidence intervals.

    2x2 contingency table:
                         AE of Interest    Other AEs
        Drug of Interest       a               b
        Other Drugs            c               d

    A measure that is undefined for the given table (a zero cell in a
    denominator or inside a logarithm) is reported as 0 with a [0, 0]
    interval instead of raising.

    Args:
        a: Reports with the drug and the event.
        b: Reports with the drug and other events.
        c: Reports with other drugs and the event.
        d: Reports with other drugs and other events.

    Returns:
        Dict with one sub-dict per measure (value, 95% CI, signal flag,
        interpretation) plus ``overall_signal_detected``, ``case_count`` and
        ``signal_strength``.
    """
    z_95 = 1.96
    drug_reports = a + b
    other_reports = c + d
    all_reports = drug_reports + other_reports

    # Proportional Reporting Ratio (PRR) with 95% CI (log transformation).
    # Needs a > 0 for the logarithm and c > 0 for the denominator.
    prr = prr_ci_lower = prr_ci_upper = 0
    if a > 0 and c > 0:
        prr = (a / drug_reports) / (c / other_reports)
        variance = 1 / a + 1 / c - 1 / drug_reports - 1 / other_reports
        se_log_prr = math.sqrt(max(variance, 0.0))  # clamp float round-off below zero
        log_prr = math.log(prr)
        prr_ci_lower = math.exp(log_prr - z_95 * se_log_prr)
        prr_ci_upper = math.exp(log_prr + z_95 * se_log_prr)

    # Reporting Odds Ratio (ROR) with 95% CI
    ror = (a * d) / (b * c) if b > 0 and c > 0 else 0
    ror_ci_lower = ror_ci_upper = 0
    if a > 0 and b > 0 and c > 0 and d > 0:
        log_ror = math.log(ror)
        se_log_ror = math.sqrt(1 / a + 1 / b + 1 / c + 1 / d)
        ror_ci_lower = math.exp(log_ror - z_95 * se_log_ror)
        ror_ci_upper = math.exp(log_ror + z_95 * se_log_ror)

    # Information Component (IC) with simplified 95% CI
    expected = (drug_reports * (a + c)) / all_reports if all_reports > 0 else 0
    ic = math.log2(a / expected) if expected > 0 and a > 0 else 0
    ic_ci_lower = ic_ci_upper = 0
    if a > 0:
        ic_se = 1 / (math.log(2) * math.sqrt(a))
        ic_ci_lower = ic - z_95 * ic_se
        ic_ci_upper = ic + z_95 * ic_se

    # Signal detection criteria
    enough_cases = a >= 3
    prr_signal = prr >= 2.0 and prr_ci_lower > 1.0 and enough_cases
    ror_signal = ror >= 2.0 and ror_ci_lower > 1.0 and enough_cases
    ic_signal = ic_ci_lower > 0 and enough_cases
    signal_detected = prr_signal or ror_signal or ic_signal

    if prr_signal and ror_signal and ic_signal:
        signal_strength = "Strong"
    elif signal_detected:
        signal_strength = "Moderate"
    else:
        signal_strength = "Weak/None"

    return {
        "proportional_reporting_ratio": {
            "value": round(prr, 3),
            "confidence_interval_95": [round(prr_ci_lower, 3), round(prr_ci_upper, 3)],
            "signal_detected": prr_signal,
            "interpretation": "PRR ≥2 with lower CI >1 suggests potential signal" if prr_signal else "No signal detected by PRR criteria"
        },
        "reporting_odds_ratio": {
            "value": round(ror, 3),
            "confidence_interval_95": [round(ror_ci_lower, 3), round(ror_ci_upper, 3)],
            "signal_detected": ror_signal,
            "interpretation": "ROR ≥2 with lower CI >1 suggests potential signal" if ror_signal else "No signal detected by ROR criteria"
        },
        "information_component": {
            "value": round(ic, 3),
            "confidence_interval_95": [round(ic_ci_lower, 3), round(ic_ci_upper, 3)],
            "signal_detected": ic_signal,
            "interpretation": "IC lower CI >0 suggests potential signal" if ic_signal else "No signal detected by IC criteria"
        },
        "overall_signal_detected": signal_detected,
        "case_count": a,
        "signal_strength": signal_strength
    }
def find_similar_cases(
    reference_case_id: str,
    similarity_threshold: float = 0.7,
    limit: int = 50
) -> Dict[str, Any]:
    """
    Find cases similar to a reference case based on patient characteristics,
    drugs, and adverse events.

    Candidate cases are fetched with ``enhanced_faers_search`` for the first
    drug of the reference case, scored with ``calculate_case_similarity`` and
    kept when the score reaches ``similarity_threshold``.

    Args:
        reference_case_id: FAERS safety report ID to use as reference
        similarity_threshold: Minimum similarity score (0-1)
        limit: Maximum number of similar cases to return

    Returns:
        Dict with the reference drugs/reactions, the top ``limit`` similar
        cases sorted by descending score, the total number of cases above the
        threshold, and the most common shared drugs/reactions across all of
        them.

    Raises:
        ValueError: If the reference case cannot be fetched or lists no drugs.
    """
    # Imported here as in the original code (presumably to avoid an import
    # cycle with drug_data_endpoints - confirm before hoisting).
    from drug_data_endpoints import fetch_event_details

    # First, get the reference case details
    try:
        ref_case = fetch_event_details(reference_case_id)
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise ValueError(f"Could not fetch reference case {reference_case_id}: {e}") from e

    # NOTE(review): assumes ref_case["drugs"] / ["reactions"] are lists of
    # plain strings - verify against fetch_event_details.
    ref_drugs = [drug.lower() for drug in ref_case["drugs"]]
    ref_reactions = [reaction.lower() for reaction in ref_case["reactions"]]
    if not ref_drugs:
        raise ValueError("Reference case has no drug information")
    ref_patient = ref_case.get("full_record", {}).get("patient", {})

    # Search for cases with similar drugs; over-fetch so filtering has room
    candidates = enhanced_faers_search(
        drug_name=ref_drugs[0],
        limit=min(limit * 3, 500)
    )

    reference_id = str(reference_case_id)
    similar_cases = []
    for case in candidates["cases"]:
        # Skip the reference case itself (compare as text so an id supplied
        # as int still matches the string id in the record)
        if str(case["safety_report_id"]) == reference_id:
            continue

        case_drugs = [drug["name"].lower() for drug in case["drugs"] if drug["name"]]
        case_reactions = [
            reaction["term"].lower() for reaction in case["reactions"] if reaction["term"]
        ]

        similarity_score = calculate_case_similarity(
            ref_drugs, ref_reactions,
            case_drugs, case_reactions,
            ref_patient,
            case.get("patient", {})
        )
        if similarity_score < similarity_threshold:
            continue

        similar_cases.append({
            "case": case,
            "similarity_score": similarity_score,
            "similarity_factors": get_similarity_factors(
                ref_drugs, ref_reactions, case_drugs, case_reactions
            )
        })

    # Sort by similarity score, best first
    similar_cases.sort(key=lambda entry: entry["similarity_score"], reverse=True)

    return {
        "reference_case_id": reference_case_id,
        "reference_drugs": ref_drugs,
        "reference_reactions": ref_reactions,
        "similar_cases": similar_cases[:limit],
        "total_similar_found": len(similar_cases),
        "similarity_threshold": similarity_threshold,
        "analysis_summary": {
            "most_common_shared_drugs": get_most_common_shared_elements(
                [entry["similarity_factors"]["shared_drugs"] for entry in similar_cases]
            ),
            "most_common_shared_reactions": get_most_common_shared_elements(
                [entry["similarity_factors"]["shared_reactions"] for entry in similar_cases]
            )
        }
    }
def calculate_case_similarity(
    ref_drugs: List[str], ref_reactions: List[str],
    case_drugs: List[str], case_reactions: List[str],
    ref_patient: Dict, case_patient: Dict
) -> float:
    """
    Calculate similarity score between two cases.

    The score is ``0.5 * drug Jaccard + 0.4 * reaction Jaccard +
    0.1 * patient similarity``, rounded to three decimals. Patient similarity
    averages an age term (1 minus the age gap over 50 years, floored at 0)
    and a gender term (1 when equal, 0 otherwise), using only the terms for
    which both patients have data.

    Either patient dict may use the raw openFDA keys (``patientonsetage`` with
    ``patientonsetageunit``, ``patientsex``, or the older ``patientage``) or
    the normalised keys ``age`` / ``gender``.

    Args:
        ref_drugs: Drug names of the reference case.
        ref_reactions: Reaction terms of the reference case.
        case_drugs: Drug names of the candidate case.
        case_reactions: Reaction terms of the candidate case.
        ref_patient: Patient data of the reference case.
        case_patient: Patient data of the candidate case.

    Returns:
        Similarity between 0 and 1.
    """

    def jaccard(left: List[str], right: List[str]) -> float:
        left_set, right_set = set(left), set(right)
        combined = left_set | right_set
        return len(left_set & right_set) / len(combined) if combined else 0

    def age_of(patient: Dict) -> Any:
        for key in ("age", "patientage"):
            value = patient.get(key)
            if value not in (None, ""):
                return value
        onset_age = patient.get("patientonsetage")
        unit = str(patient.get("patientonsetageunit") or "801").strip()
        # Only ages expressed in years (openFDA unit 801, or no unit) are
        # comparable here; other units are skipped rather than misread.
        if onset_age not in (None, "") and unit == "801":
            return onset_age
        return None

    def gender_of(patient: Dict) -> Any:
        for key in ("gender", "patientsex"):
            value = patient.get(key)
            if value not in (None, ""):
                return value
        return None

    ref_patient = ref_patient or {}
    case_patient = case_patient or {}

    drug_similarity = jaccard(ref_drugs, case_drugs)
    reaction_similarity = jaccard(ref_reactions, case_reactions)

    # Patient similarity (age and gender)
    patient_points = 0
    terms_scored = 0

    ref_age, case_age = age_of(ref_patient), age_of(case_patient)
    if ref_age is not None and case_age is not None:
        try:
            age_gap = abs(float(ref_age) - float(case_age))
        except (ValueError, TypeError):
            pass  # non-numeric age: leave the age term out of the average
        else:
            patient_points += max(0, 1 - age_gap / 50)  # Normalize by 50 years
            terms_scored += 1

    ref_gender, case_gender = gender_of(ref_patient), gender_of(case_patient)
    if ref_gender is not None and case_gender is not None:
        # Compare as text so "1" and 1 are treated as the same code.
        if str(ref_gender).strip() == str(case_gender).strip():
            patient_points += 1
        terms_scored += 1

    patient_similarity = patient_points / terms_scored if terms_scored else 0

    # Weighted overall similarity
    # Drugs and reactions are most important, patient characteristics less so
    overall_similarity = (
        0.5 * drug_similarity +
        0.4 * reaction_similarity +
        0.1 * patient_similarity
    )
    return round(overall_similarity, 3)
def get_similarity_factors(
    ref_drugs: List[str], ref_reactions: List[str],
    case_drugs: List[str], case_reactions: List[str]
) -> Dict[str, List[str]]:
    """
    Get the specific shared elements between cases.

    Args:
        ref_drugs: Drug names of the reference case.
        ref_reactions: Reaction terms of the reference case.
        case_drugs: Drug names of the compared case.
        case_reactions: Reaction terms of the compared case.

    Returns:
        Dict with the shared drugs/reactions and the drugs/reactions unique to
        each side. Every list is de-duplicated and sorted, so the output does
        not depend on set iteration order.
    """
    # Build each set once instead of once per output key.
    ref_drug_set, case_drug_set = set(ref_drugs), set(case_drugs)
    ref_reaction_set, case_reaction_set = set(ref_reactions), set(case_reactions)

    return {
        "shared_drugs": sorted(ref_drug_set & case_drug_set),
        "shared_reactions": sorted(ref_reaction_set & case_reaction_set),
        "unique_to_reference_drugs": sorted(ref_drug_set - case_drug_set),
        "unique_to_case_drugs": sorted(case_drug_set - ref_drug_set),
        "unique_to_reference_reactions": sorted(ref_reaction_set - case_reaction_set),
        "unique_to_case_reactions": sorted(case_reaction_set - ref_reaction_set)
    }
def get_most_common_shared_elements(element_lists: List[List[str]]) -> Dict[str, int]:
    """
    Get the most commonly shared elements across multiple cases.

    Args:
        element_lists: One list of elements per case.

    Returns:
        Mapping of element to occurrence count for at most the ten most
        frequent elements, most frequent first.
    """
    tally = Counter()
    for group in element_lists:
        tally.update(group)
    top_ten = tally.most_common(10)
    return dict(top_ten)
def _parse_faers_date(value: Any) -> Optional[datetime]:
    """Parse a FAERS ``YYYYMMDD`` date, returning None when missing or malformed."""
    if not value:
        return None
    try:
        return datetime.strptime(value, "%Y%m%d")
    except (ValueError, TypeError):
        return None


def temporal_analysis(
    drug_name: str,
    adverse_event: str = None,
    limit: int = 500
) -> Dict[str, Any]:
    """
    Analyze temporal patterns of adverse events for a drug.

    Time to onset is approximated as the number of days between the drug's
    start date and the report's receive date (the receive date is a proxy for
    reaction onset). Only intervals of 0-365 days are kept, and each case
    contributes at most one interval - the one measured from its earliest
    qualifying start date - so ``cases_with_temporal_data`` is a case count.
    Reporting trends use the receive date of every case that has one,
    whether or not a start date is available.

    Args:
        drug_name: Drug to analyze
        adverse_event: Specific adverse event (optional)
        limit: Maximum cases to analyze

    Returns:
        Dict with temporal patterns and time-to-onset analysis, or a short
        dict with a ``message`` when the search returns no cases.
    """
    # Get cases with temporal information
    cases = enhanced_faers_search(
        drug_name=drug_name,
        adverse_event=adverse_event,
        limit=limit
    )["cases"]
    if not cases:
        return {
            "drug": drug_name,
            "adverse_event": adverse_event,
            "message": "No cases found for temporal analysis"
        }

    target_drug = drug_name.strip().lower()
    onset_times = []
    reporting_dates = []

    for case in cases:
        receive_date = _parse_faers_date(case.get("receive_date"))
        if receive_date is None:
            continue
        reporting_dates.append(receive_date)

        # Intervals from every dated entry of the drug of interest
        plausible = []
        for drug in case["drugs"]:
            if (drug["name"] or "").strip().lower() != target_drug:
                continue
            start_date = _parse_faers_date(drug["start_date"])
            if start_date is None:
                continue
            days = (receive_date - start_date).days
            if 0 <= days <= 365:  # Filter reasonable onset times
                plausible.append(days)
        if plausible:
            onset_times.append(max(plausible))  # earliest start => longest interval

    # Calculate temporal statistics
    temporal_stats = {}
    if onset_times:
        onset_times.sort()
        count = len(onset_times)
        middle = count // 2
        # True median: average the two central values for an even count.
        median_days = (
            onset_times[middle] if count % 2
            else (onset_times[middle - 1] + onset_times[middle]) / 2
        )
        temporal_stats["time_to_onset"] = {
            "median_days": median_days,
            "mean_days": round(sum(onset_times) / count, 1),
            "range_days": [onset_times[0], onset_times[-1]],
            "percentiles": {
                "25th": onset_times[count // 4],
                "75th": onset_times[3 * count // 4],
                "90th": onset_times[9 * count // 10] if count >= 10 else onset_times[-1]
            },
            "distribution": categorize_onset_times(onset_times)
        }

    if reporting_dates:
        # Analyze reporting trends over time
        reporting_dates.sort()
        temporal_stats["reporting_trends"] = analyze_reporting_trends(reporting_dates)

    return {
        "drug": drug_name,
        "adverse_event": adverse_event,
        "total_cases_analyzed": len(cases),
        "cases_with_temporal_data": len(onset_times),
        "temporal_analysis": temporal_stats,
        "interpretation": interpret_temporal_patterns(temporal_stats)
    }
def categorize_onset_times(onset_times: List[int]) -> Dict[str, int]:
    """
    Categorize onset times into clinically relevant periods.

    Args:
        onset_times: Onset times in days.

    Returns:
        Count per period. Each value lands in the first period whose upper
        bound (1, 7, 28, 90, 365 days, inclusive) it does not exceed; values
        above 365 are not counted.
    """
    # (inclusive upper bound in days, bucket key), ascending
    periods = (
        (1, "immediate_0_1_day"),
        (7, "acute_1_7_days"),
        (28, "subacute_1_4_weeks"),
        (90, "delayed_1_3_months"),
        (365, "late_3_12_months"),
    )
    tally = {label: 0 for _, label in periods}
    for days in onset_times:
        for ceiling, label in periods:
            if days <= ceiling:
                tally[label] += 1
                break
    return tally
def analyze_reporting_trends(reporting_dates: List[datetime]) -> Dict[str, Any]:
    """
    Analyze trends in adverse event reporting over time.

    Reports are counted per calendar year. With at least three distinct
    years, the average of the earliest years is compared with the average of
    the latest years; the two windows never overlap (up to three years each,
    fewer when there are under six distinct years). A recent average more
    than 20% above the early one is "increasing", more than 20% below is
    "decreasing", otherwise "stable". Years with no reports are not part of
    either window.

    Args:
        reporting_dates: Report dates; may be empty.

    Returns:
        Dict with ``yearly_counts``, ``date_range`` ([first year, last year]
        or None when empty), ``trend`` and ``peak_year`` (None when empty).
    """
    # Group by year (insertion order follows the input order)
    year_counts = Counter(date.year for date in reporting_dates)
    if not year_counts:
        return {
            "yearly_counts": {},
            "date_range": None,
            "trend": "insufficient_data",
            "peak_year": None
        }

    years = sorted(year_counts)
    if len(years) >= 3:
        # Keep the windows disjoint; with three overlapping years both
        # averages would be identical and the trend could only be "stable".
        window = min(3, len(years) // 2)
        early_avg = sum(year_counts[year] for year in years[:window]) / window
        recent_avg = sum(year_counts[year] for year in years[-window:]) / window
        if recent_avg > early_avg * 1.2:
            trend = "increasing"
        elif recent_avg < early_avg * 0.8:
            trend = "decreasing"
        else:
            trend = "stable"
    else:
        trend = "insufficient_data"

    return {
        "yearly_counts": dict(year_counts),
        "date_range": [years[0], years[-1]],
        "trend": trend,
        "peak_year": max(year_counts, key=year_counts.get)
    }
def interpret_temporal_patterns(temporal_stats: Dict) -> List[str]:
    """
    Provide clinical interpretation of temporal patterns.

    Args:
        temporal_stats: Dict that may contain ``time_to_onset`` (with
            ``median_days`` and optionally ``distribution``) and
            ``reporting_trends`` (with ``trend``).

    Returns:
        List of interpretation sentences; a single "insufficient data"
        sentence when nothing else applies.
    """
    # (inclusive upper bound of the median in days, sentence), ascending
    onset_messages = (
        (1, "Immediate onset pattern suggests Type A (dose-dependent) reaction or acute hypersensitivity"),
        (7, "Acute onset pattern typical of many drug allergies and dose-related effects"),
        (28, "Subacute onset may suggest immune-mediated or cumulative toxicity"),
        (90, "Delayed onset pattern may indicate idiosyncratic reactions or chronic toxicity"),
    )
    trend_messages = {
        "increasing": "Increasing reporting trend may indicate growing awareness or emerging safety signal",
        "decreasing": "Decreasing reporting trend may suggest improved safety monitoring or reduced use",
    }

    notes = []

    if "time_to_onset" in temporal_stats:
        onset = temporal_stats["time_to_onset"]
        median_days = onset["median_days"]

        for ceiling, sentence in onset_messages:
            if median_days <= ceiling:
                notes.append(sentence)
                break
        else:
            notes.append("Late onset suggests possible chronic effects or delayed hypersensitivity")

        # Share of immediate reactions among all categorized onsets
        buckets = onset.get("distribution", {})
        categorized = sum(buckets.values())
        if categorized > 0:
            immediate_pct = buckets.get("immediate_0_1_day", 0) / categorized * 100
            if immediate_pct > 50:
                notes.append(f"High proportion ({immediate_pct:.1f}%) of immediate reactions suggests acute mechanism")

    if "reporting_trends" in temporal_stats:
        sentence = trend_messages.get(temporal_stats["reporting_trends"]["trend"])
        if sentence is not None:
            notes.append(sentence)

    if not notes:
        notes.append("Insufficient temporal data for meaningful interpretation")
    return notes