Spaces:
Sleeping
Sleeping
| """ | |
| function_calling.py — OpenFDA API tool functions for PharmaGuide. | |
| Each function queries a specific OpenFDA endpoint and returns clean, | |
| plain-text results ready to pass to the model or display in the UI. | |
| API facts: | |
| Base URL : https://api.fda.gov/ | |
| Auth : No key needed for basic use (1 000 req/day unauthenticated) | |
| Set OPENFDA_API_KEY env var to raise the daily limit to 120 000 req/day | |
| Docs : https://open.fda.gov/apis/ | |
| All functions follow the same contract: | |
| - Accept a drug name string (case-insensitive) | |
| - Return a plain string on success | |
| - Return an empty string "" on any error (caller handles gracefully) | |
| - Never raise exceptions to the caller | |
| """ | |
| import os | |
| import re | |
| import time | |
| from typing import Optional | |
| import requests | |
# ── Config ───────────────────────────────────────────────────────────────────

# OpenFDA endpoints used by this module: drug labels and adverse-event reports.
_BASE_URL = "https://api.fda.gov/drug/label.json"
_EVENT_URL = "https://api.fda.gov/drug/event.json"

# Optional: register a free key at https://open.fda.gov/apis/authentication/
# and set it as an env var. Without it the limit is 1 000 req/day.
# Read once at import time; an unset variable yields "".
_API_KEY = os.getenv("OPENFDA_API_KEY", "")

# Simple in-process cache: {cache_key: (timestamp, result)}
# Entries are considered fresh for one hour — FDA labels don't change intra-day.
_CACHE_TTL = 60 * 60  # seconds
_CACHE: dict = {}
# ── Keyword lists for lifestyle warning extraction ───────────────────────────
# Maps a lifestyle category to the substrings that flag a sentence as relevant.
# Every keyword MUST be lower-case: sentences are lower-cased before the
# substring test, so an upper-case keyword (the former "UV") can never match.
_LIFESTYLE_KEYWORDS = {
    "alcohol": ["alcohol", "drinking", "drink", "wine", "beer", "liquor", "ethanol"],
    "grapefruit": ["grapefruit", "citrus juice", "pomelo"],
    "food": [
        "take with food", "take with a meal", "on an empty stomach",
        "without food", "before eating", "after eating", "with meals",
    ],
    "dairy": ["dairy", "milk", "antacid", "calcium", "iron", "mineral"],
    "sun": [
        "sun", "sunlight", "uv", "photosensitivity", "sunscreen",
        "ultraviolet", "sunburn",
    ],
    "driving": [
        "driving", "operate machinery", "drowsy", "drowsiness",
        "sedation", "dizzy", "dizziness", "alertness", "operate vehicle",
    ],
    "exercise": [
        "exercise", "heat", "dehydration", "dehydrated", "hot weather",
        "strenuous activity", "sweating", "exertion",
    ],
}
| # ── Helpers ────────────────────────────────────────────────────────────────── | |
def _get(url: str, params: dict) -> Optional[dict]:
    """
    GET *url* with *params*, using the module cache and one retry on HTTP 429.

    Args:
        url:    Endpoint URL.
        params: Query parameters. The dict is copied, never mutated; the API
                key (when configured) is added to the copy only.

    Returns:
        The decoded JSON body on HTTP 200, otherwise None. Never raises.

    Caching:
        Successful responses and HTTP 404 responses are cached for
        _CACHE_TTL seconds. Other failures (429 after the retry, 5xx,
        network errors, bad JSON) are NOT cached, so a transient outage or
        rate limit does not blank out a drug for the whole TTL.
    """
    # Copy so that adding the API key does not leak into the caller's dict.
    query = dict(params)
    if _API_KEY:
        query["api_key"] = _API_KEY

    cache_key = url + str(sorted(query.items()))
    now = time.time()

    cached = _CACHE.get(cache_key)
    if cached is not None:
        ts, result = cached
        if now - ts < _CACHE_TTL:
            return result
        # Stale entry: drop it so the cache does not keep expired payloads.
        _CACHE.pop(cache_key, None)

    try:
        resp = requests.get(url, params=query, timeout=10)
        if resp.status_code == 429:
            # Rate limited: back off briefly and try exactly once more.
            time.sleep(2)
            resp = requests.get(url, params=query, timeout=10)

        if resp.status_code == 404:
            # A definitive "nothing found" answer is safe to remember.
            _CACHE[cache_key] = (now, None)
            return None
        if resp.status_code != 200:
            # Possibly transient (429 / 5xx / ...): report failure, don't cache.
            return None

        data = resp.json()
    except Exception:
        # Deliberately broad: the module contract is to never raise to callers.
        return None

    _CACHE[cache_key] = (now, data)
    return data
def _search_query(drug_name: str) -> str:
    """
    Build the broad OpenFDA search expression for a drug name.

    The expression looks for the name as a quoted phrase in either the
    brand-name or the generic-name field. The two clauses are separated by a
    single space, which openFDA's query syntax treats as OR.

    The value is returned UNENCODED: it is sent through
    ``requests.get(params=...)``, which percent-encodes it exactly once.
    Pre-encoding here would double-encode the name (a space would reach the
    API as the literal text ``%20``) and a literal ``+`` would be sent as
    ``%2B`` rather than as a separator.

    Args:
        drug_name: Drug name as typed by the user.

    Returns:
        The search expression string.
    """
    # Escape backslashes and double quotes so the name cannot terminate the
    # quoted phrase early.
    phrase = drug_name.strip().replace("\\", "\\\\").replace('"', '\\"')
    return f'openfda.brand_name:"{phrase}" openfda.generic_name:"{phrase}"'
def _search_query_exact(drug_name: str) -> str:
    """
    Build an OpenFDA search expression matching the exact generic name.

    Used to avoid combination-drug labels (e.g. searching 'metformin' should
    not return ZITUVIMET/JANUMET). This function only builds the expression;
    any fallback to a broader query is the caller's responsibility.

    The value is returned UNENCODED because it is sent through
    ``requests.get(params=...)``, which percent-encodes it once; encoding
    here as well would corrupt names containing spaces or punctuation.

    Args:
        drug_name: Drug name as typed by the user.

    Returns:
        The search expression string.
    """
    # Escape backslashes and double quotes so the name stays one quoted phrase.
    phrase = drug_name.strip().replace("\\", "\\\\").replace('"', '\\"')
    return f'openfda.generic_name.exact:"{phrase}"'
def _get_with_fallback(url: str, drug_name: str, extra_params: dict = None) -> Optional[dict]:
    """
    Query *url* for *drug_name*, preferring an exact generic-name match.

    The exact generic-name query runs first. If it yields a response with a
    non-empty "results" list, that response is returned. Otherwise the broad
    brand+generic query is issued and its response is returned as-is (it may
    be None or have no results). This keeps combination-drug labels
    (e.g. ZITUVIMET) out of the way when the user just asked about metformin.

    Args:
        url:          Endpoint to query.
        drug_name:    Drug name to search for.
        extra_params: Optional additional query parameters; they override the
                      default ``limit`` of 1 but never the ``search`` value.
    """
    shared = {"limit": 1, **(extra_params or {})}

    exact_response = _get(url, {**shared, "search": _search_query_exact(drug_name)})
    if exact_response and exact_response.get("results"):
        return exact_response

    return _get(url, {**shared, "search": _search_query(drug_name)})
def _first_result(data: Optional[dict]) -> Optional[dict]:
    """Return the first entry of ``data["results"]``, or None when there is none."""
    if not data:
        return None
    hits = data.get("results", [])
    if hits:
        return hits[0]
    return None
def _extract_field(record: Optional[dict], *field_names: str) -> str:
    """
    Return the text of the first non-empty field among *field_names*.

    Fields are tried in the order given. When the stored value is a list,
    only its first element is used; any other value is used directly. The
    value is converted to ``str``, runs of whitespace are collapsed to single
    spaces, the ends are trimmed, and the result is cut to 2000 characters.

    Returns "" when *record* is empty/None or no listed field has a value.
    """
    if not record:
        return ""

    candidates = (record.get(name) for name in field_names)
    value = next((found for found in candidates if found), None)
    if value is None:
        return ""

    first = value[0] if isinstance(value, list) else value
    # Collapse excessive whitespace from FDA's raw text
    collapsed = re.sub(r"\s+", " ", str(first))
    return collapsed.strip()[:2000]
| # ── Public tool functions ──────────────────────────────────────────────────── | |
def get_drug_label(drug_name: str) -> dict:
    """
    Fetch the drug label record from OpenFDA for a given drug name.

    Returns a dict with keys: warnings, drug_interactions, dosage,
    geriatric_use, indications, adverse_reactions.
    All values are plain strings (≤ 2000 chars each), or "" if not found.
    """
    record = _first_result(_get_with_fallback(_BASE_URL, drug_name))

    # Output key -> label fields to try, in priority order.
    layout = (
        ("warnings", ("warnings", "warnings_and_cautions")),
        ("drug_interactions", ("drug_interactions",)),
        ("dosage", ("dosage_and_administration",)),
        ("geriatric_use", ("geriatric_use",)),
        ("indications", ("indications_and_usage",)),
        ("adverse_reactions", ("adverse_reactions",)),
    )
    return {key: _extract_field(record, *sources) for key, sources in layout}
def check_drug_interactions(drug_name: str) -> str:
    """
    Return the drug-interaction section of a drug's FDA label.

    The label is looked up by name and the text of its ``drug_interactions``
    field is returned as a plain string, or "" when the label or the field
    is not available. Suitable for direct display or passing to the model.
    """
    response = _get_with_fallback(_BASE_URL, drug_name)
    label = _first_result(response)
    interactions = _extract_field(label, "drug_interactions")
    return interactions
def get_adverse_events(drug_name: str, limit: int = 8) -> str:
    """
    Get the most commonly reported adverse events for a drug.

    Queries the drug event endpoint with a ``count`` on the reaction term
    field and returns the reported terms, lower-cased and comma-separated.

    Args:
        drug_name: Drug name to query.
        limit:     Number of top reactions to request (default 8).

    Returns:
        A string such as "nausea, headache", or "" when the request fails or
        there are no usable results. Never raises.
    """
    # The search value must be passed UNENCODED: requests percent-encodes
    # query parameters itself, so pre-quoting would double-encode the name.
    # Backslashes and quotes are escaped so the name stays one quoted phrase.
    phrase = drug_name.strip().replace("\\", "\\\\").replace('"', '\\"')
    params = {
        "search": f'patient.drug.medicinalproduct:"{phrase}"',
        "count": "patient.reaction.reactionmeddrapt.exact",
        "limit": limit,
    }

    data = _get(_EVENT_URL, params)
    results = data.get("results") if data else None
    if not results:
        return ""

    # Each result is expected to look like {"term": "NAUSEA", "count": 12345}.
    # Entries without a usable "term" are skipped rather than raising, to
    # honour the module's "never raise" contract.
    terms = []
    for entry in results:
        term = entry.get("term") if isinstance(entry, dict) else None
        if term:
            terms.append(str(term).lower().replace("_", " "))
    return ", ".join(terms)
def get_geriatric_warnings(drug_name: str) -> str:
    """
    Extract geriatric-specific information from a drug's FDA label.

    Returns the ``geriatric_use`` section text when present. Otherwise the
    warnings section is scanned and up to three sentences mentioning
    age-related terms are returned, joined with ". " and ending with ".".
    Returns "" when nothing relevant is found.
    """
    record = _first_result(_get_with_fallback(_BASE_URL, drug_name))

    geriatric = _extract_field(record, "geriatric_use")
    if geriatric:
        return geriatric

    # Fallback: scan warnings for age-related content
    warnings = _extract_field(record, "warnings", "warnings_and_cautions")
    if not warnings:
        return ""

    # Word-boundary matching: a plain substring test for "aged" also fires on
    # "managed", "damaged", "packaged", etc. The trailing boundary is applied
    # to "aged" only, so plural or derived forms of the other terms still hit.
    age_pattern = re.compile(
        r"\b(?:older adult|elderly|geriatric|65 years|aged\b)", re.IGNORECASE
    )

    # Split only where terminal punctuation is followed by whitespace, so
    # decimals and section references ("2.5 mg", "( 5.1 )") stay intact.
    # The terminator is then trimmed because sentences are re-joined below.
    sentences = (
        piece.strip().rstrip(".!?").strip()
        for piece in re.split(r"(?<=[.!?])\s+", warnings)
    )
    relevant = [s for s in sentences if s and age_pattern.search(s)]
    return ". ".join(relevant[:3]) + "." if relevant else ""
# Patterns for FDA label formatting artifacts.
_SECTION_HEADER_RE = re.compile(r"^\s*\d+(?:\.\d+)?\s+[A-Z][A-Z\s]+")  # "7 DRUG INTERACTIONS"
_PAREN_REF_RE = re.compile(r"\(\s*\d+(?:\.\d+)?\s*\)")  # "( 5.1 )"
_LEADING_NUM_RE = re.compile(r"^\s*\d+(?:\.\d+)?\s+")  # "2 DOSAGE..."


def _clean_lifestyle_sentence(sentence: str) -> str:
    """
    Tidy one sentence of FDA label text for display.

    Steps, in order:
      1. A short (five words or fewer) numbered upper-case heading such as
         "7 DRUG INTERACTIONS" is rejected.
      2. Parenthesised section references like "( 5.1 )" are removed.
      3. A leading section number is removed.
      4. Whitespace is collapsed and the ends trimmed.
      5. The result is rejected if shorter than 20 characters, longer than
         180 characters, or entirely upper-case.

    Returns the cleaned sentence, or "" when it was rejected.
    """
    starts_like_header = _SECTION_HEADER_RE.match(sentence) is not None
    if starts_like_header and len(sentence.split()) <= 5:
        return ""

    without_refs = _PAREN_REF_RE.sub("", sentence)
    without_number = _LEADING_NUM_RE.sub("", without_refs)
    cleaned = re.sub(r"\s+", " ", without_number).strip()

    # Too short to be informative, or too long to be a single clean thought
    # (likely mid-paragraph text split at a bad boundary).
    if not 20 <= len(cleaned) <= 180:
        return ""
    # Still looks like a heading.
    if cleaned.isupper():
        return ""
    return cleaned
def get_lifestyle_warnings(drug_list: list[str]) -> dict:
    """
    Extract food, alcohol, and lifestyle interaction warnings for a list of drugs.

    For each drug, the warnings, drug_interactions and
    dosage_and_administration label fields are split into sentences, and
    sentences containing a lifestyle keyword are collected per category.

    Args:
        drug_list: List of drug name strings.

    Returns:
        Dict keyed by drug name. Each value is a dict keyed by lifestyle
        category ("alcohol", "grapefruit", "food", "dairy", "sun", "driving",
        "exercise"), with a list of at most two cleaned sentence strings.
        Categories with no matches are omitted from the inner dict, and drugs
        with no matches at all are omitted from the outer dict.

    Example return value:
        {
            "metformin": {
                "alcohol": ["Avoid alcohol while taking metformin..."],
                "food": ["Take metformin with a meal..."],
            },
            "atorvastatin": {
                "grapefruit": ["Avoid grapefruit juice..."],
            },
        }
    """
    result = {}
    for drug_name in drug_list:
        record = _first_result(_get_with_fallback(_BASE_URL, drug_name))

        # All text fields that might mention lifestyle interactions.
        field_texts = [
            _extract_field(record, "warnings", "warnings_and_cautions"),
            _extract_field(record, "drug_interactions"),
            _extract_field(record, "dosage_and_administration"),
        ]

        # Each field is split on its own so a field that was cut off
        # mid-sentence is not fused with the start of the next one.
        # The split requires whitespace after the terminator, which keeps
        # decimals and section references such as "( 5.1 )" in one piece so
        # the cleaning step can strip them. Pairs are (original, lower-cased).
        sentences = []
        for text in field_texts:
            if not text:
                continue
            for piece in re.split(r"(?<=[.!?])\s+", text):
                piece = piece.strip().rstrip(".!?").strip()
                if len(piece) > 15:
                    sentences.append((piece, piece.lower()))
        if not sentences:
            continue

        drug_warnings: dict[str, list[str]] = {}
        for category, keywords in _LIFESTYLE_KEYWORDS.items():
            # Sentences are compared lower-cased, so keywords must be too.
            needles = [kw.lower() for kw in keywords]
            hits: list[str] = []
            for sentence, sentence_lower in sentences:
                if not any(kw in sentence_lower for kw in needles):
                    continue
                clean = _clean_lifestyle_sentence(sentence)
                if clean and clean not in hits:
                    hits.append(clean)
                    if len(hits) == 2:  # max 2 sentences per category
                        break
            if hits:
                drug_warnings[category] = hits

        if drug_warnings:
            result[drug_name] = drug_warnings
    return result
| # ── Convenience: fetch all data for a drug list in one call ────────────────── | |
def fetch_all_drug_data(drug_list: list[str]) -> dict:
    """
    Gather label data, lifestyle warnings and adverse events for each drug.

    Lifestyle warnings are fetched once for the whole list; label data and
    adverse events are fetched per drug.

    Returns a dict keyed by drug name, each value containing:
        label    : dict from get_drug_label()
        lifestyle: this drug's entry from get_lifestyle_warnings(), or {}
        events   : str from get_adverse_events()
    Suitable for passing to prompts.build_fda_context_prompt().
    """
    lifestyle_by_drug = get_lifestyle_warnings(drug_list)
    return {
        drug: {
            "label": get_drug_label(drug),
            "lifestyle": lifestyle_by_drug.get(drug, {}),
            "events": get_adverse_events(drug),
        }
        for drug in drug_list
    }