Spaces:
Running
Running
| import os | |
| import re | |
| import csv | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime | |
| from typing import Optional, List | |
| import pytz | |
| from langchain.schema import Document, HumanMessage, SystemMessage | |
| from langchain.tools import tool | |
| from .retrievers import hybrid_search, vector_search, bm25_search | |
| from .validation import validate_medical_answer | |
| from .github_storage import get_github_storage | |
| from .context_enrichment import enrich_retrieved_documents | |
| from .config import logger | |
| from langchain_openai import ChatOpenAI | |
# Canonical provider codes that provider names and aliases are normalised to.
CANONICAL_PROVIDERS = {"Manus", "ASCO", "NCCN", "ESMO", "NICE"}

# Module-level context kept so that validation can look at the most recent
# interaction.
_last_question = None  # Stores the tool query
_last_user_question = None  # Stores the original user question
_last_documents = None
_last_answer = None

# Shared worker pool for the tools: one worker per CPU, clamped to the range
# 2..8 (4 CPUs are assumed when os.cpu_count() cannot tell).
TOOL_MAX_WORKERS = min(8, max(2, (os.cpu_count() or 4)))
_tool_executor = ThreadPoolExecutor(max_workers=TOOL_MAX_WORKERS)
def store_user_question(user_question: str):
    """Remember the user's original question so validation can refer to it.

    The value is written to the module-level ``_last_user_question`` slot,
    replacing whatever was stored before.
    """
    globals()["_last_user_question"] = user_question
def _get_llm_safe(temperature: float = 0.0, model: str = "gpt-4o"):
    """Build a ChatOpenAI client, or return None when construction fails.

    Args:
        temperature: Sampling temperature passed to the client.
        model: Model name passed to the client.

    Returns:
        A ChatOpenAI instance (max_tokens=512, request_timeout=30), or None if
        creating it raised any exception.
    """
    client = None
    try:
        # Per the original note, ChatOpenAI picks up OPENAI_API_KEY from the
        # environment, the same way validation.py relies on it.
        client = ChatOpenAI(
            model=model,
            temperature=temperature,
            max_tokens=512,
            request_timeout=30,
        )
    except Exception:
        # Best effort: callers treat None as "LLM unavailable".
        client = None
    return client
def _is_side_effect_report_llm(user_input: str) -> Optional[bool]:
    """Ask the LLM whether the text is an adverse drug reaction report.

    Only the first 1500 characters of ``user_input`` are sent.

    Returns:
        True or False when the reply starts with "yes" or "no" respectively;
        None when no LLM client is available, the call fails, or the reply is
        anything else.
    """
    classifier = _get_llm_safe()
    if not classifier:
        return None

    try:
        prompt = [
            SystemMessage(content=(
                "You are a medical triage classifier. Decide if the user's text is a report of an adverse drug reaction (side effect) about a medication.\n"
                "Criteria: mentions a medication/drug and symptoms or adverse effects experienced by a patient.\n"
                "Respond with exactly one token: yes or no."
            )),
            HumanMessage(content=user_input[:1500]),
        ]
        reply = classifier.invoke(prompt)
        verdict = (reply.content or "").strip().lower()
    except Exception:
        return None

    # "yes" is tested before "no", exactly as in the reply-parsing order.
    for prefix, decision in (("yes", True), ("no", False)):
        if verdict.startswith(prefix):
            return decision
    return None
# Map lowercase variants and full names to canonical provider codes.
# Every canonical code must also be listed here in lowercase, because
# _normalize_provider_from_text detects providers exclusively via this table.
_PROVIDER_ALIASES = {
    # NCCN
    "nccn": "NCCN",
    "national comprehensive cancer network": "NCCN",
    "nccn guidelines": "NCCN",
    # ESMO
    "esmo": "ESMO",
    "european society for medical oncology": "ESMO",
    "esmo guidelines": "ESMO",
    # ASCO
    "asco": "ASCO",
    "american society of clinical oncology": "ASCO",
    "asco guidelines": "ASCO",
    # NICE
    "nice": "NICE",
    "national institute for health and care excellence": "NICE",
    "nice guidelines": "NICE",
    # Manus (custom provider)
    "manus": "Manus",
    "by manus": "Manus",
}

# Single alternation over all aliases, longest first, anchored so an alias
# only matches as a whole word/phrase ("manuscript" must not match "manus",
# "fiasco" must not match "asco").
_PROVIDER_ALIAS_PATTERN = re.compile(
    r"(?<!\w)(?:"
    + "|".join(
        re.escape(alias)
        for alias in sorted(_PROVIDER_ALIASES, key=len, reverse=True)
    )
    + r")(?!\w)"
)


def _normalize_provider_from_text(text: str) -> Optional[str]:
    """Return the canonical provider mentioned in ``text``, if any.

    Matching is case-insensitive and whole-word. When several providers are
    mentioned, the one that appears first in the text wins, so the result is
    deterministic (it no longer depends on set iteration order).

    Note: the common English word "nice" still maps to NICE, as before.

    Args:
        text: Free text such as a user query or a provider argument.

    Returns:
        The canonical provider code, or None when ``text`` is empty or no
        known provider is mentioned.
    """
    if not text:
        return None
    hit = _PROVIDER_ALIAS_PATTERN.search(text.lower())
    if hit is None:
        return None
    # The text was lower-cased and aliases are lowercase, so the matched
    # substring is itself a key of the alias table.
    return _PROVIDER_ALIASES[hit.group(0)]
def _normalize_provider(provider: Optional[str], query: str) -> Optional[str]:
    """Resolve a canonical provider from an explicit argument or the query.

    An explicit ``provider`` is tried first: as an exact (case-insensitive)
    canonical code, then as a known alias, then as free text containing a
    provider mention. If none of those succeed, or no provider was given,
    the provider is inferred from ``query``.

    Returns:
        The canonical provider code, or None if nothing could be resolved.
    """
    if provider:
        wanted = provider.strip().lower()

        # Exact canonical code, ignoring case.
        exact = next(
            (code for code in CANONICAL_PROVIDERS if code.lower() == wanted),
            None,
        )
        if exact is not None:
            return exact

        # Known alias or full name.
        if wanted in _PROVIDER_ALIASES:
            return _PROVIDER_ALIASES[wanted]

        # Provider embedded in a phrase such as "according to NCCN guidelines".
        embedded = _normalize_provider_from_text(provider)
        if embedded:
            return embedded

    # Last resort: look for a provider mention in the query itself.
    return _normalize_provider_from_text(query)
def clear_text(text: str, max_chars: int = 1200) -> str:
    """Reduce token bloat by removing heavy markdown and collapsing whitespace.

    Steps, in order:
    - Normalize newlines to "\\n"
    - Remove images ``![alt](url)`` entirely
    - Convert links ``[title](url)`` -> ``title (url)``
    - Remove markdown header markers at line starts
    - Strip code fences/backticks and emphasis characters (``*`` and ``_``
      are removed everywhere, including inside URLs and identifiers)
    - Collapse trailing spaces, runs of blank lines and runs of spaces
    - Trim, then truncate to ``max_chars`` and append " ..."

    Args:
        text: Raw document text; falsy input yields "".
        max_chars: Maximum length before truncation; a falsy value disables
            truncation.

    Returns:
        The cleaned (and possibly truncated) text.
    """
    if not text:
        return ""
    t = text
    # Normalize newlines
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    # Images: drop entirely. This has to run before the link rewrite,
    # otherwise the "[alt](url)" tail of an image is rewritten as a link and
    # the image pattern can no longer match, leaving "!alt (url)" behind.
    t = re.sub(r"!\[[^\]]*\]\([^)]*\)", "", t)
    # Links: keep title and URL
    t = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"\1 (\2)", t)
    # Remove header markers (optionally preceded by quote markers) at line starts
    t = re.sub(r"(?m)^[>\s]*#{1,6}\s*", "", t)
    # Remove backticks/code fences and emphasis
    t = t.replace("```", "").replace("`", "")
    t = t.replace("**", "").replace("*", "").replace("_", "")
    # Collapse spaces before newlines
    t = re.sub(r"[ \t]+\n", "\n", t)
    # Collapse multiple newlines and spaces
    t = re.sub(r"\n{3,}", "\n\n", t)
    t = re.sub(r"[ \t]{2,}", " ", t)
    # Trim and truncate
    t = t.strip()
    if max_chars and len(t) > max_chars:
        t = t[:max_chars].rstrip() + " ..."
    return t
def _format_docs_with_citations(docs: List[Document]) -> str:
    """Render documents as numbered, citation-prefixed text blocks.

    Each block carries provider, disease, source and page taken from the
    document metadata (with "unknown" / "?" placeholders), a "[CONTEXT PAGE]"
    marker for context-enrichment pages, and the cleaned page text.

    Returns:
        The blocks joined by blank lines, or "No results." for empty input.
    """
    if not docs:
        return "No results."

    def _render(position, doc):
        # Metadata may be None; treat that as "no metadata".
        meta = doc.metadata or {}
        body = clear_text(doc.page_content)
        header = (
            f"Result {position}:\n"
            f"Provider: {meta.get('provider', 'unknown')} | "
            f"Disease: {meta.get('disease', 'unknown')} | "
            f"Source: {meta.get('source', 'unknown')} | "
            f"Page: {meta.get('page_number', '?')}"
        )
        # Flag pages that were pulled in only as surrounding context.
        marker = " [CONTEXT PAGE]" if meta.get("context_enrichment", False) else ""
        return f"{header}{marker}\nText:\n{body}\n"

    return "\n\n".join(
        _render(position, doc) for position, doc in enumerate(docs, start=1)
    )
def medical_guidelines_knowledge_tool(query: str, provider: Optional[str] = None) -> str:
    """
    Retrieve comprehensive medical guideline knowledge with enriched context.
    Includes surrounding pages (before/after) for top results to provide complete clinical context.
    If provider is provided (e.g., "NCCN", "ASCO", "ESMO", "NICE"), results will be filtered by metadata provider.
    Returns detailed text with full metadata and contextual information for expert analysis.
    """
    # NOTE: the docstring above is kept verbatim; `tool` is imported by this
    # module, so the text may be surfaced as the tool description.
    global _last_question, _last_documents

    # Reset the validation context up front so that a failed retrieval cannot
    # leave the previous query's documents paired with the new question.
    _last_question = query
    _last_documents = []
    try:
        # Normalize provider name from either explicit arg or query text
        normalized_provider = _normalize_provider(provider, query)

        # Hybrid search with the retriever's own defaults (the original note
        # cites DEFAULT_K_VECTOR=10, DEFAULT_K_BM25=5 in core/retrievers.py).
        docs = hybrid_search(query=query, provider=normalized_provider)

        # Enrich top documents with surrounding pages for richer context
        enriched_docs = enrich_retrieved_documents(
            documents=docs,
            pages_before=2,  # Include 2 pages before for fuller context
            pages_after=2,   # Include 2 pages after for fuller context
            max_enriched=8   # Enrich top 8 most relevant documents
        )

        # Snapshot every document for validation and count context pages in
        # the same pass. Metadata may be None (the citation formatter guards
        # against that too), so fall back to an empty dict.
        snapshots = []
        context_pages_count = 0
        for doc in enriched_docs:
            meta = doc.metadata or {}
            if meta.get("context_enrichment", False):
                context_pages_count += 1
            snapshots.append({
                "doc_id": getattr(doc, 'id', None),
                "source": meta.get("source", "unknown"),
                "provider": meta.get("provider", "unknown"),
                "page_number": meta.get("page_number", "unknown"),
                "disease": meta.get("disease", "unknown"),
                "context_enrichment": meta.get("context_enrichment", False),
                "enriched": meta.get("enriched", False),
                "pages_included": meta.get("pages_included", []),
                "primary_page": meta.get("primary_page"),
                "context_pages_before": meta.get("context_pages_before"),
                "context_pages_after": meta.get("context_pages_after"),
                "content": doc.page_content
            })
        logger.info(f"Retrieved {len(docs)} documents, added {context_pages_count} context pages")

        # Publish the snapshot only once it is complete.
        _last_documents = snapshots
        return _format_docs_with_citations(enriched_docs)
    except Exception as e:
        logger.error(f"Retrieval error: {str(e)}")
        return f"Retrieval error: {str(e)}"
def compare_providers_tool(query: str, provider_a: str, provider_b: str) -> str:
    """
    Compare guideline answers between two providers (e.g., provider_a="NCCN", provider_b="ESMO").
    Retrieves provider-filtered results independently, then returns a structured text block suited for comparison.
    Output includes citations (source file, page number, provider, disease) for each side.
    """
    # NOTE: the docstring above is kept verbatim; `tool` is imported by this
    # module, so the text may be surfaced as the tool description.
    try:
        # Fall back to the raw argument when a provider cannot be normalised.
        canon_a, canon_b = [
            _normalize_provider(name, query) or name
            for name in (provider_a, provider_b)
        ]

        # Both searches are submitted before either result is awaited.
        search_jobs = [
            _tool_executor.submit(hybrid_search, query, canon, 5, 5)
            for canon in (canon_a, canon_b)
        ]
        a_docs, b_docs = [job.result() for job in search_jobs]

        format_jobs = [
            _tool_executor.submit(_format_docs_with_citations, found)
            for found in (a_docs, b_docs)
        ]
        a_text, b_text = [job.result() for job in format_jobs]

        divider = '-' * 40
        return (
            f"Comparison for query: {query}\n\n"
            f"Provider A: {canon_a}\n{divider}\n{a_text}\n\n"
            f"Provider B: {canon_b}\n{divider}\n{b_text}\n"
        )
    except Exception as e:
        return f"Comparison retrieval error: {str(e)}"
def get_current_datetime_tool() -> str:
    """
    Returns the current date, time, and day of the week for Egypt (Africa/Cairo).
    This is the only reliable source for date and time information. Use this tool
    whenever a user asks about 'today', 'now', or any other time-sensitive query.
    The output is always in English and in standard 12-hour format.
    """
    # NOTE: the docstring above is kept verbatim; `tool` is imported by this
    # module, so the text may be surfaced as the tool description.
    try:
        cairo_now = datetime.now(pytz.timezone('Africa/Cairo'))

        # Hard-coded English names so the output never depends on the system
        # locale. weekday() is 0 for Monday; month is 1-based.
        weekday_names = (
            "Monday", "Tuesday", "Wednesday", "Thursday",
            "Friday", "Saturday", "Sunday",
        )
        month_names = (
            "January", "February", "March", "April", "May", "June",
            "July", "August", "September", "October", "November", "December",
        )
        day_name = weekday_names[cairo_now.weekday()]
        month_name = month_names[cairo_now.month - 1]

        # 12-hour clock: 0 -> 12 AM, 12 -> 12 PM, 13..23 -> 1..11 PM.
        hour_12 = cairo_now.hour % 12 or 12
        period = "AM" if cairo_now.hour < 12 else "PM"
        time_str = f"{hour_12:02d}:{cairo_now.minute:02d} {period}"

        return (
            f"Current date and time in Egypt: {day_name}, {month_name} "
            f"{cairo_now.day}, {cairo_now.year} at {time_str}"
        )
    except Exception as e:
        return f"Error getting current datetime: {str(e)}"
def side_effect_recording_tool(user_input: str) -> str:
    """
    Detects when a doctor reports or mentions discovering a side effect related to a drug.
    First asks for missing critical information (drug name, side effects) and optional details
    (patient_age, patient_gender, dosage, duration, severity). If user cannot provide optional
    information, saves the report with NaN values for unknown data.
    This tool should be used when the input contains:
    - Reports of adverse drug reactions or side effects
    - Patient experiencing unexpected symptoms after medication
    - Drug-related complications or adverse events
    - Medical professionals reporting medication issues
    Args:
    user_input (str): The doctor's input describing the side effect or adverse reaction
    Returns:
    str: Interactive form for collecting missing information or confirmation of data recording
    """
    # NOTE: the docstring above is kept verbatim; `tool` is imported by this
    # module, so the text may be surfaced as the tool description.
    try:
        # Keyword fallback used when the LLM classifier is unavailable or unsure.
        side_effect_keywords = [
            'side effect', 'adverse reaction', 'adverse event', 'drug reaction',
            'medication reaction', 'allergic reaction', 'complication', 'toxicity',
            'intolerance', 'hypersensitivity', 'contraindication', 'withdrawal',
            'overdose', 'poisoning', 'drug-induced', 'medication-induced',
            'experienced after taking', 'developed after', 'caused by medication',
            'drug-related', 'medication-related', 'pharmaceutical reaction',
            'kidney problems', 'liver problems', 'heart problems', 'breathing problems',
            'skin problems', 'stomach problems', 'nausea', 'vomiting', 'diarrhea',
            'headache', 'dizziness', 'fatigue', 'weakness', 'rash', 'swelling',
            'pain', 'fever', 'cough', 'infection', 'bleeding', 'bruising',
            'has these', 'has serious', 'causes', 'resulted in', 'led to',
            'problems with', 'issues with', 'complications from'
        ]
        input_lower = user_input.lower().strip()

        # Special commands first. These return without needing a
        # classification, so the LLM classifier is not called for them.
        if input_lower in ['save report', 'save', 'submit report', 'submit']:
            # NOTE(review): there is no session state, so the data is
            # extracted from the command text itself and will typically be
            # all 'NaN' — confirm this is the intended behavior.
            extracted_data = _extract_side_effect_data(user_input)
            return _save_side_effect_report(extracted_data)
        if input_lower in ['cancel', 'cancel report', 'abort']:
            return "**Side Effect Report Cancelled**\n\nThe adverse drug reaction report has been cancelled and no data was saved."

        # Follow-up with additional details, or the user saying they cannot
        # provide them. Without session state this is treated as a new report.
        if _is_followup_response(user_input):
            extracted_data = _extract_side_effect_data(user_input)
            return _process_followup_response(user_input, extracted_data)

        # Only now pay for the LLM call. A definite "yes" wins; a definite
        # "no" vetoes; otherwise (None) the keyword match decides.
        llm_decision = _is_side_effect_report_llm(user_input)
        keyword_detected = any(keyword in input_lower for keyword in side_effect_keywords)
        contains_side_effect = (llm_decision is True) or (llm_decision is not False and keyword_detected)
        if not contains_side_effect:
            return "This input does not appear to contain a side effect report. If you are reporting an adverse drug reaction, please include specific details about the medication and symptoms."

        extracted_data = _extract_side_effect_data(user_input)

        # Critical information (drug name and side effects) is requested first.
        missing_critical = _identify_missing_information(extracted_data)
        if missing_critical:
            return _generate_information_request(extracted_data, missing_critical)

        # Then optional details, if any are still missing.
        missing_optional = _identify_missing_optional_information(extracted_data)
        if missing_optional:
            return _generate_optional_information_request(extracted_data, missing_optional)

        # Everything is present: save the report.
        return _save_side_effect_report(extracted_data)
    except Exception as e:
        return f"Error processing side effect report: {str(e)}. Please ensure your report includes drug name and symptoms."
def _is_followup_response(user_input: str) -> bool:
    """Check if the input appears to be a follow-up response with additional information.

    The input counts as a follow-up when it contains any of the phrase
    indicators below (case-insensitive substring match), or a dosage unit
    ("mg"/"ml") that is not part of a longer word. Typographic apostrophes
    are normalised so that "don’t know" is recognised like "don't know".

    Args:
        user_input: The raw user text; empty or None yields False.

    Returns:
        True if the text looks like a follow-up, otherwise False.
    """
    phrase_indicators = (
        'patient age:', 'age:', 'gender:', 'dosage:', 'dose:', 'duration:',
        'severity:', 'outcome:', 'additional:', 'reporter:', 'notes:',
        'male', 'female', 'years old', 'tablets', 'capsules',
        'mild', 'moderate', 'severe', 'recovered', 'ongoing', 'hospitalized',
        # Indicators for when the user can't provide info
        "can't provide", "cannot provide", "don't have", "do not have",
        "not available", "unavailable", "missing", "no information",
        "just save", "save them", "save it", "save anyway", "will not provide",
        "won't provide", "don't know", "unknown", "not sure",
    )
    input_lower = (user_input or "").lower().replace("\u2019", "'")
    if any(indicator in input_lower for indicator in phrase_indicators):
        return True
    # Two-letter units are matched only when not embedded in a word, so
    # "10mg" and "5 ml" count but "firmly" or "Amgen" do not.
    return re.search(r"(?<![a-z])(?:mg|ml)(?![a-z])", input_lower) is not None
def _process_followup_response(user_input: str, base_data: dict) -> str:
    """Fold a follow-up reply into the report data and save or ask again.

    If the reply signals that no more details can be given, ``base_data`` is
    saved as it is. Otherwise the reply is parsed, non-empty values override
    those in ``base_data``, and the merged report is saved — unless the drug
    name or the side effects are still missing, in which case a request for
    them is returned instead.
    """
    decline_phrases = (
        "can't provide", "cannot provide", "don't have", "do not have",
        "not available", "unknown", "unavailable", "missing", "no information",
        "will not provide", "won't provide", "save anyway", "just save",
    )
    lowered = user_input.lower()
    for phrase in decline_phrases:
        if phrase in lowered:
            # The user cannot add anything: save what we already have.
            return _save_side_effect_report(base_data)

    def _is_blank(candidate) -> bool:
        return not candidate or candidate == 'NaN' or not str(candidate).strip()

    # Parse the reply and let its real values take priority over the base.
    followup_data = _extract_side_effect_data(user_input)
    merged_data = dict(base_data)
    merged_data.update(
        {name: found for name, found in followup_data.items() if not _is_blank(found)}
    )

    # Only the drug name and the side effects are truly critical.
    still_missing = [
        name for name in ('drug_name', 'side_effects')
        if _is_blank(merged_data.get(name, ''))
    ]
    if still_missing:
        return _generate_information_request(
            merged_data,
            [(name, name.replace('_', ' ').title()) for name in still_missing],
        )

    # Any follow-up, complete or partial, ends with an automatic save.
    return _save_side_effect_report(merged_data)
def _identify_missing_information(extracted_data: dict) -> list:
    """Identify which critical information is missing from the extracted data.

    A field counts as missing when it is absent, falsy, the literal 'NaN', or
    blank once converted to text. Values are coerced with ``str()`` before
    stripping so that non-string values do not raise.

    Args:
        extracted_data: Report fields keyed by name.

    Returns:
        A list of ``(field, display_name)`` tuples for the missing critical
        fields, in the order drug name, side effects.
    """
    # Only truly critical fields - drug name and side effects
    critical_fields = {
        'drug_name': 'Drug/Medication Name',
        'side_effects': 'Side Effects/Symptoms'
    }
    missing = []
    for field, display_name in critical_fields.items():
        value = extracted_data.get(field, '')
        if not value or value == 'NaN' or not str(value).strip():
            missing.append((field, display_name))
    return missing
def _identify_missing_optional_information(extracted_data: dict) -> list:
    """Identify which optional information is missing from the extracted data.

    A field counts as missing when it is absent, falsy, the literal 'NaN', or
    blank once converted to text. Values are coerced with ``str()`` before
    stripping so that non-string values (for example a numeric age) do not
    raise.

    Args:
        extracted_data: Report fields keyed by name.

    Returns:
        A list of ``(field, display_name)`` tuples for the missing optional
        fields, in the order age, gender, dosage, duration, severity.
    """
    # Optional fields that we should ask for
    optional_fields = {
        'patient_age': 'Patient Age',
        'patient_gender': 'Patient Gender',
        'dosage': 'Medication Dosage',
        'duration': 'Treatment Duration',
        'severity': 'Severity Level'
    }
    missing = []
    for field, display_name in optional_fields.items():
        value = extracted_data.get(field, '')
        if not value or value == 'NaN' or not str(value).strip():
            missing.append((field, display_name))
    return missing
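# NOTE(review): a stale note here said to remove "this function" because optional information is no longer requested; side_effect_recording_tool still requests optional details, so confirm before deleting anything.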
def _generate_information_request(extracted_data: dict, missing_info: list) -> str:
    """Build the prompt asking for the missing critical report fields.

    Only 'drug_name' and 'side_effects' are ever requested; other entries of
    ``missing_info`` are ignored. If neither of them is listed, the report is
    saved right away and the save confirmation is returned instead.

    Args:
        extracted_data: Report fields gathered so far.
        missing_info: ``(field, display_name)`` tuples of missing fields.
    """
    requested = [
        field for field, _ in missing_info
        if field in ['drug_name', 'side_effects']
    ]
    if not requested:
        # Nothing critical is missing, so the report can be stored as is.
        return _save_side_effect_report(extracted_data)

    pieces = ["**Adverse Drug Reaction Report**\n\n"]
    if 'drug_name' in requested:
        pieces.append("Please specify the **medication/drug name** involved in this adverse reaction.\n\n")
    if 'side_effects' in requested:
        pieces.append("Please describe the **side effects or symptoms** experienced.\n\n")
    pieces.append("**Note**: All other details (age, gender, dosage, etc.) are optional. If you cannot provide them, I'll save the report with the available information.")
    return "".join(pieces).strip()
def _generate_optional_information_request(extracted_data: dict, missing_optional: list) -> str:
    """Build the prompt asking for optional details that enrich the report.

    The message first echoes the recorded drug and side effects (when present
    and not 'NaN'), then lists one example line per missing optional field,
    and ends with a note on how to skip the request.

    Args:
        extracted_data: Report fields gathered so far.
        missing_optional: ``(field, display_name)`` tuples of missing fields;
            fields without a known example line are skipped.
    """
    field_prompts = {
        'patient_age': "- **Patient Age:** (e.g., 45 years old)\n",
        'patient_gender': "- **Patient Gender:** (Male/Female)\n",
        'dosage': "- **Dosage:** (e.g., 10mg daily, 2 tablets)\n",
        'duration': "- **Duration:** (e.g., 3 months, 2 weeks)\n",
        'severity': "- **Severity:** (Mild/Moderate/Severe)\n",
    }

    # Show what we already have
    pieces = ["**Adverse Drug Reaction Report**\n\n", "**Recorded Information:**\n"]
    drug = extracted_data.get('drug_name')
    if drug and drug != 'NaN':
        pieces.append(f"- **Drug:** {drug}\n")
    effects = extracted_data.get('side_effects')
    if effects and effects != 'NaN':
        pieces.append(f"- **Side Effects:** {effects}\n")

    pieces.append("\n**Additional Information (Optional):**\n")
    pieces.append("To enhance this report, please provide any of the following details if available:\n\n")
    pieces.extend(field_prompts.get(field, "") for field, _ in missing_optional)
    pieces.append("\n**Note:** If you don't have this information or cannot provide it, just reply with \"I don't have that information\" or \"save anyway\" and I'll save the report with the available data.")
    return "".join(pieces).strip()
def _save_side_effect_report(extracted_data: dict) -> str:
    """Save the side effect report and return a confirmation message.

    The report is first offered to the GitHub storage backend. If that
    reports failure or raises, the report is appended to
    ``side_effects_reports.csv`` in the current working directory instead, so
    an upload problem does not lose the report.

    Args:
        extracted_data: Report fields keyed by name. It is not modified; a
            normalised copy limited to the CSV columns is what gets stored.

    Returns:
        A confirmation message on success, or an
        "Error saving side effect report: ..." message if saving failed.
    """
    try:
        fieldnames = [
            'timestamp', 'drug_name', 'side_effects', 'patient_age',
            'patient_gender', 'dosage', 'duration', 'severity',
            'outcome', 'additional_details', 'reporter_info', 'raw_input'
        ]

        # Normalised row: exactly the CSV columns, every value a stripped
        # string, 'NaN' for anything empty. Restricting to the known columns
        # keeps csv.DictWriter from rejecting unexpected keys.
        row = {}
        for field in fieldnames:
            value = extracted_data.get(field, '')
            text = str(value).strip() if value else ''
            row[field] = text or 'NaN'

        # Save to GitHub repository; an exception counts as a failed upload
        # so that the local fallback still runs.
        try:
            success = get_github_storage().save_side_effects_report(row)
        except Exception as upload_error:
            logger.warning(f"GitHub upload raised, falling back to local CSV: {upload_error}")
            success = False

        if not success:
            csv_filename = "side_effects_reports.csv"
            csv_path = os.path.join(os.getcwd(), csv_filename)
            # A missing or empty file still needs its header row.
            needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
            with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                if needs_header:
                    writer.writeheader()
                writer.writerow(row)
            storage_location = "locally to side_effects_reports.csv (GitHub upload failed)"
        else:
            storage_location = "to GitHub cloud repository"

        # The report id is the timestamp with separators removed.
        report_id = row['timestamp'].replace(':', '').replace('-', '').replace(' ', '_')

        # Summarise the fields that actually carry information.
        info_fields = {
            'drug_name': 'Drug/Medication',
            'side_effects': 'Side Effects',
            'patient_age': 'Patient Age',
            'patient_gender': 'Patient Gender',
            'dosage': 'Dosage',
            'duration': 'Duration',
            'severity': 'Severity',
            'outcome': 'Outcome'
        }
        provided_info = [
            f"- **{display_name}:** {row[field]}"
            for field, display_name in info_fields.items()
            if row[field] != 'NaN'
        ]

        # NOTE(review): the line layout below follows the visible source;
        # confirm against the original file whether blank lines were intended
        # between sections.
        confirmation_lines = [
            "**✅ Adverse Drug Reaction Report Saved**",
            f"**Report ID:** {report_id}",
            "**Documented Information:**",
            "\n".join(provided_info) if provided_info else '- Basic side effect report recorded',
            f"**Pharmacovigilance Status:** Report successfully saved {storage_location} for regulatory review.",
            "**Clinical Recommendations:**",
            "- Monitor patient for symptom progression",
            "- Consider dose adjustment or alternative therapy if appropriate",
            "- Document in patient medical record",
            "- Report serious reactions to pharmacovigilance authorities",
            "How can I assist you further with clinical guidance for this case?",
        ]
        return "\n".join(confirmation_lines).strip()
    except Exception as e:
        return f"Error saving side effect report: {str(e)}"
def _extract_side_effect_data_with_llm(user_input: str) -> dict:
    """
    Extract structured data from side effect report text using LLM-based extraction.

    The LLM is asked for a JSON object. The regex extractor is used instead
    when no LLM client is available, the call fails, or the reply is not a
    JSON object (a Markdown code fence around the JSON is tolerated).

    Args:
        user_input (str): Raw input text containing side effect report
    Returns:
        dict: Structured data extracted from the input. All report fields are
        present; unknown ones hold 'NaN'. 'timestamp' is the current
        Africa/Cairo time and 'raw_input' the first 500 characters of the input.
    """
    import json

    # Get current timestamp
    egypt_tz = pytz.timezone('Africa/Cairo')
    current_time = datetime.now(egypt_tz).strftime('%Y-%m-%d %H:%M:%S')

    # Initialize extracted data with defaults
    extracted_data = {
        'timestamp': current_time,
        'drug_name': 'NaN',
        'side_effects': 'NaN',
        'patient_age': 'NaN',
        'patient_gender': 'NaN',
        'dosage': 'NaN',
        'duration': 'NaN',
        'severity': 'NaN',
        'outcome': 'NaN',
        'additional_details': 'NaN',
        'reporter_info': 'NaN',
        'raw_input': user_input[:500]
    }

    extracted_json = None
    llm = _get_llm_safe()
    if llm:
        try:
            system = SystemMessage(content=(
                "Extract medical side effect information. Return ONLY a JSON object with these exact fields: "
                "drug_name, side_effects, patient_age, patient_gender, dosage, duration, severity, outcome. "
                "If missing/unclear, use 'NaN'."
            ))
            human = HumanMessage(content=user_input[:2000])
            response = llm.invoke([system, human])
            text = (response.content or "").strip()

            # Models often wrap JSON in a ```json ... ``` fence; unwrap it so
            # a well-formed answer is not discarded.
            fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.DOTALL | re.IGNORECASE)
            if fenced:
                text = fenced.group(1)

            parsed = json.loads(text)
            # Only a JSON object can be merged field by field; lists, strings
            # or numbers fall through to the regex extractor.
            if isinstance(parsed, dict):
                extracted_json = parsed
        except Exception:
            # Invalid JSON or a failed LLM call: use the regex extractor.
            extracted_json = None

    if extracted_json is None:
        extracted_json = _extract_with_improved_regex(user_input)

    # Copy over known fields that carry a real value
    for key, value in extracted_json.items():
        if key in extracted_data and value and str(value).strip() and str(value).strip().lower() != 'nan':
            extracted_data[key] = str(value).strip()
    return extracted_data
def _extract_with_improved_regex(user_input: str) -> dict:
    """
    Improved regex-based extraction with better duration handling.

    Heuristically pulls report fields out of free text. Keywords and units
    are matched as whole words so that they do not fire inside other words
    (for example "age" inside "dosage", "on" inside "reaction", "light"
    inside "lightheaded").

    Args:
        user_input: Raw report text; empty or None yields all-'NaN' fields.

    Returns:
        dict with keys drug_name, side_effects, patient_age, patient_gender,
        dosage, duration, severity, outcome; fields that could not be found
        hold 'NaN'.
    """
    extracted = {
        'drug_name': 'NaN',
        'side_effects': 'NaN',
        'patient_age': 'NaN',
        'patient_gender': 'NaN',
        'dosage': 'NaN',
        'duration': 'NaN',
        'severity': 'NaN',
        'outcome': 'NaN'
    }
    text = user_input or ""
    input_lower = text.lower()

    def _first_group(pattern):
        """Return the stripped first capture of the first match, or None."""
        found = re.search(pattern, text, re.IGNORECASE)
        return found.group(1).strip() if found else None

    # Extract drug names: known names first, then contextual patterns.
    drug_patterns = [
        r'\b(afinitor|cisplatin|afatinib|imatinib|dasatinib|nilotinib|bosutinib|ponatinib|bevacizumab|cetuximab|trastuzumab)\b',
        r'(?:found that|that)\s+([A-Za-z]{4,20})\s+(?:has|have)',
        r'([A-Za-z]{4,20})\s+(?:has|have)\s+(?:these\s+)?(?:side\s+effects?|adverse\s+effects?)',
        r'\b(?:taking|prescribed|given|on)\s+([A-Za-z][A-Za-z0-9\s\-]{2,20}?)(?:\s+(?:mg|mcg|g|ml)|\s+for|\.|,)',
        r'(?:side effects?|adverse effects?)\s+(?:of|from)\s+([A-Za-z][A-Za-z0-9\s\-]{2,20}?)(?:\s|,|\.|;)'
    ]
    non_drug_words = ('that', 'these', 'those', 'found', 'have')
    for pattern in drug_patterns:
        drug_name = _first_group(pattern)
        # A rejected candidate does not stop the search; later patterns get a turn.
        if drug_name and len(drug_name) > 2 and drug_name.lower() not in non_drug_words:
            extracted['drug_name'] = drug_name
            break

    # Extract side effects
    symptom_patterns = [
        r'(?:side effects?|symptoms?|adverse effects?)\s*[:\-]?\s*([^.!?]+?)(?:\.|!|\?|patient|$)',
        r'(?:has|have)\s+(?:these\s+)?(?:side effects?[:\s]+)?([A-Za-z][^.!?]*?)(?:\.|!|\?|patient|$)',
        r'(?:experienced|developed|suffered|had)\s+([^.!?]+?)(?:\.|!|\?|after|following|$)'
    ]
    for pattern in symptom_patterns:
        symptoms = _first_group(pattern)
        if symptoms and len(symptoms) > 3:
            extracted['side_effects'] = symptoms
            break

    # Extract patient age. The \b anchors keep "age" from matching inside
    # "dosage", which used to turn "dosage: 50 mg" into a patient age of 50.
    age_patterns = [
        r"\bpatient'?s?\s+age\s*[:\-]?\s*(\d{1,3})",
        r'\bage\s*[:\-]?\s*(\d{1,3})',
        r'(?<!\d)(\d{1,3})\s*(?:years?\s+old|y/?o)\b',
        r'\baged\s+(\d{1,3})'
    ]
    for pattern in age_patterns:
        age_text = _first_group(pattern)
        if age_text is not None and 0 <= int(age_text) <= 120:
            extracted['patient_age'] = str(int(age_text))
            break

    # Extract patient gender (male terms are checked first, as before)
    if re.search(r'\b(?:male|man|boy|gentleman|he|his|him)\b', input_lower):
        extracted['patient_gender'] = 'Male'
    elif re.search(r'\b(?:female|woman|girl|lady|she|her)\b', input_lower):
        extracted['patient_gender'] = 'Female'

    # Extract dosage. The trailing \b stops a unit from matching the start of
    # a longer word ("3 grams" is not "3 g").
    dosage_patterns = [
        r'(?:medication\s+)?dosage\s*[:\-]?\s*([\d\.]+\s*(?:mg|mcg|g|ml|units?|tablets?|capsules?))\b',
        r'dosage\s*[:\-]?\s*([\d\.]+\s*ml)\b',
        r'(\d+(?:\.\d+)?\s*(?:mg|mcg|g|ml|units?|tablets?|capsules?))\b'
    ]
    for pattern in dosage_patterns:
        dosage = _first_group(pattern)
        if dosage:
            extracted['dosage'] = dosage
            break

    # Extract duration
    duration_patterns = [
        r'treatment\s+duration\s*[:\-]?\s*(\d+\s*(?:days?|weeks?|months?|years?))',
        r'duration\s*[:\-]?\s*(\d+\s*(?:days?|weeks?|months?|years?))',
        r'(?:for|over|during)\s+(\d+\s*(?:days?|weeks?|months?|years?))',
        r'(\d+\s*(?:days?|weeks?|months?|years?))\s+(?:of\s+)?(?:treatment|therapy)',
        r'(?:lasted|continuing for|ongoing for)\s+(\d+\s*(?:days?|weeks?|months?|years?))'
    ]
    for pattern in duration_patterns:
        duration = _first_group(pattern)
        if duration:
            extracted['duration'] = duration
            break

    def _first_label(keyword_table):
        """Return the first label whose keywords occur as whole words."""
        for label, keywords in keyword_table.items():
            alternation = "|".join(re.escape(keyword) for keyword in keywords)
            if re.search(rf"\b(?:{alternation})\b", input_lower):
                return label.capitalize()
        return None

    # Extract severity (checked in the order mild, moderate, severe)
    severity = _first_label({
        'mild': ['mild', 'slight', 'minor', 'light'],
        'moderate': ['moderate', 'medium', 'noticeable'],
        'severe': ['severe', 'serious', 'major', 'significant', 'intense', 'extreme']
    })
    if severity:
        extracted['severity'] = severity

    # Extract outcome
    outcome = _first_label({
        'recovered': ['recovered', 'resolved', 'better', 'improved'],
        'ongoing': ['ongoing', 'continuing', 'persistent', 'current status: ongoing'],
        'worsened': ['worsened', 'deteriorated', 'worse'],
        'hospitalized': ['hospitalized', 'admitted', 'emergency']
    })
    if outcome:
        extracted['outcome'] = outcome

    return extracted
def _extract_side_effect_data(user_input: str) -> dict:
    """
    Turn a free-text side effect report into structured fields.

    This is the single entry point used by the reporting flow; the work is
    delegated to the LLM-based extractor.

    Args:
        user_input (str): Raw report text.
    Returns:
        dict: The structured data produced by the LLM-based extractor.
    """
    structured = _extract_side_effect_data_with_llm(user_input)
    return structured