File size: 13,422 Bytes
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9dfe3e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84a71ec
731a1c4
 
 
 
 
 
 
 
 
 
 
9dfe3e2
731a1c4
9dfe3e2
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9dfe3e2
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9dfe3e2
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84a71ec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9dfe3e2
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84a71ec
731a1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
"""
function_calling.py — OpenFDA API tool functions for PharmaGuide.

Each function queries a specific OpenFDA endpoint and returns clean,
plain-text results ready to pass to the model or display in the UI.

API facts:
  Base URL  : https://api.fda.gov/
  Auth      : No key needed for basic use (1 000 req/day unauthenticated)
              Set OPENFDA_API_KEY env var to raise the limit to 120 000 req/day
  Docs      : https://open.fda.gov/apis/

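Unless a function's own docstring says otherwise (some take a list of
names or return a dict), the public functions follow the same contract: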
  - Accept a drug name string (case-insensitive)
  - Return a plain string on success
  - Return an empty string "" on any error (caller handles gracefully)
  - Never raise exceptions to the caller
"""

import os
import re
import time
from typing import Optional
import requests

# ── Config ───────────────────────────────────────────────────────────────────

# OpenFDA endpoints queried by this module: drug labels and adverse-event reports.
_BASE_URL  = "https://api.fda.gov/drug/label.json"
_EVENT_URL = "https://api.fda.gov/drug/event.json"

# Optional: register a free key at https://open.fda.gov/apis/authentication/
# and set it as an env var.  Without it the limit is 1 000 req/day.
# Read once at import time; an empty string means "send no key".
_API_KEY = os.environ.get("OPENFDA_API_KEY", "")

# Simple in-process cache: {cache_key: (timestamp, result)}
# TTL = 3 600 s (1 hour) — FDA labels don't change intra-day.
# NOTE(review): nothing in the visible code evicts entries; stale ones are
# only ignored once older than the TTL, so the dict grows with distinct queries.
_CACHE: dict = {}
_CACHE_TTL   = 3_600  # seconds

# ── Keyword lists for lifestyle warning extraction ───────────────────────────

# Maps a lifestyle category name to the phrases that signal it in label text.
# The category names become the keys of the per-drug dicts returned by
# get_lifestyle_warnings(); the phrases are only used for matching.
# NOTE(review): "UV" is the only entry that is not lower-case; whether it can
# match depends on how the consumer normalises case — verify.
_LIFESTYLE_KEYWORDS = {
    "alcohol":   ["alcohol", "drinking", "drink", "wine", "beer", "liquor", "ethanol"],
    "grapefruit":["grapefruit", "citrus juice", "pomelo"],
    "food":      ["take with food", "take with a meal", "on an empty stomach",
                  "without food", "before eating", "after eating", "with meals"],
    "dairy":     ["dairy", "milk", "antacid", "calcium", "iron", "mineral"],
    "sun":       ["sun", "sunlight", "UV", "photosensitivity", "sunscreen",
                  "ultraviolet", "sunburn"],
    "driving":   ["driving", "operate machinery", "drowsy", "drowsiness",
                  "sedation", "dizzy", "dizziness", "alertness", "operate vehicle"],
    "exercise":  ["exercise", "heat", "dehydration", "dehydrated", "hot weather",
                  "strenuous activity", "sweating", "exertion"],
}

# ── Helpers ──────────────────────────────────────────────────────────────────

def _get(url: str, params: dict) -> Optional[dict]:
    """
    GET *url* with *params* and return the decoded JSON body, or None.

    Results are memoised in the module-level ``_CACHE`` for ``_CACHE_TTL``
    seconds.  A 429 (rate limited) response is retried once after a short
    pause.  Only stable outcomes are cached: a 200 payload, or the 404 that
    OpenFDA returns when a search matches nothing.  Transient failures
    (a 429 that survives the retry, 5xx, network errors, unparsable bodies)
    are NOT cached, so the next call can try again instead of being served
    a stale ``None`` for an hour.

    Args:
        url:    Full endpoint URL.
        params: Query parameters.  The dict is not modified.

    Returns:
        The parsed JSON dict on success, otherwise None.  Never raises.
    """
    # Key the cache on the caller's parameters only, so the API key is not
    # kept around inside cache keys.
    cache_key = url + str(sorted(params.items()))
    now = time.time()
    cached = _CACHE.get(cache_key)
    if cached is not None and now - cached[0] < _CACHE_TTL:
        return cached[1]

    # Work on a copy: adding the key must not leak into the caller's dict.
    query = dict(params)
    if _API_KEY:
        query["api_key"] = _API_KEY

    try:
        resp = requests.get(url, params=query, timeout=10)
        if resp.status_code == 429:
            time.sleep(2)
            resp = requests.get(url, params=query, timeout=10)

        if resp.status_code == 200:
            data = resp.json()
        elif resp.status_code == 404:
            # "No matches found" — a definitive answer, safe to remember.
            data = None
        else:
            # Rate limiting / server trouble: report failure but do not cache it.
            return None
    except Exception:
        # Deliberate catch-all: this is the module's "never raise" boundary
        # (timeouts, connection errors, invalid JSON, ...).
        return None

    _CACHE[cache_key] = (now, data)
    return data


def _search_query(drug_name: str) -> str:
    """Build an OpenFDA search query that checks brand AND generic name fields."""
    safe = requests.utils.quote(drug_name)
    return (
        f'openfda.brand_name:"{safe}"+openfda.generic_name:"{safe}"'
    )


def _search_query_exact(drug_name: str) -> str:
    """
    Prefer an exact generic name match to avoid combination drug brand names
    (e.g. searching 'metformin' should not return ZITUVIMET/JANUMET).
    Falls back to the broad query if the exact match returns nothing.
    """
    safe = requests.utils.quote(drug_name)
    return f'openfda.generic_name.exact:"{safe}"'


def _get_with_fallback(url: str, drug_name: str, extra_params: dict = None) -> Optional[dict]:
    """
    Query *url* for *drug_name*, narrow search first, broad search second.

    The exact generic-name query is tried first so that a plain ingredient
    name (e.g. metformin) does not surface a combination product's label
    (e.g. ZITUVIMET).  Only when that yields no results is the broad
    brand+generic query issued, and its response is returned as-is.

    Args:
        url:          Endpoint to query.
        drug_name:    Drug name to search for.
        extra_params: Optional additional query parameters; a ``search``
                      entry in it is overridden.

    Returns:
        Whatever ``_get`` returns for the query that was finally used.
    """
    # Shared parameters; "limit" may be overridden by the caller's extras.
    base = {"limit": 1, **(extra_params or {})}

    exact_response = _get(url, {**base, "search": _search_query_exact(drug_name)})
    if exact_response and exact_response.get("results"):
        return exact_response

    return _get(url, {**base, "search": _search_query(drug_name)})


def _first_result(data: Optional[dict]) -> Optional[dict]:
    """Return the first result record from an OpenFDA response, or None."""
    if not data:
        return None
    results = data.get("results", [])
    return results[0] if results else None


def _extract_field(record: Optional[dict], *field_names: str) -> str:
    """
    Extract the first non-empty value from one of the given field names in
    an OpenFDA label record.  FDA stores fields as lists of strings.
    Returns a plain string with whitespace normalised, or "".
    """
    if not record:
        return ""
    for field in field_names:
        val = record.get(field)
        if val:
            raw = val[0] if isinstance(val, list) else val
            # Collapse excessive whitespace from FDA's raw text
            return re.sub(r"\s+", " ", str(raw)).strip()[:2000]
    return ""


# ── Public tool functions ────────────────────────────────────────────────────

def get_drug_label(drug_name: str) -> dict:
    """
    Look up a drug's FDA label and return its main sections.

    Args:
        drug_name: Drug name to search for.

    Returns:
        A dict with the keys warnings, drug_interactions, dosage,
        geriatric_use, indications and adverse_reactions.  Every value is a
        plain string (≤ 2000 chars), or "" when the section was not found.
    """
    record = _first_result(_get_with_fallback(_BASE_URL, drug_name))

    # Output key -> label fields to try, most preferred first.
    sections = (
        ("warnings", ("warnings", "warnings_and_cautions")),
        ("drug_interactions", ("drug_interactions",)),
        ("dosage", ("dosage_and_administration",)),
        ("geriatric_use", ("geriatric_use",)),
        ("indications", ("indications_and_usage",)),
        ("adverse_reactions", ("adverse_reactions",)),
    )
    return {key: _extract_field(record, *fields) for key, fields in sections}


def check_drug_interactions(drug_name: str) -> str:
    """
    Return the drug-interactions section of a drug's FDA label.

    Args:
        drug_name: Drug name to search for.

    Returns:
        The section text as a plain string, or "" when it is unavailable.
    """
    response = _get_with_fallback(_BASE_URL, drug_name)
    return _extract_field(_first_result(response), "drug_interactions")


def get_adverse_events(drug_name: str, limit: int = 8) -> str:
    """
    Get the most commonly reported adverse events for a drug from FDA FAERS.

    Queries the /drug/event endpoint with a ``count`` on the reaction term
    and returns the top terms, lower-cased, as a comma-separated string.

    Args:
        drug_name: Drug name to query.
        limit:     Number of top reactions to return (default 8).

    Returns:
        E.g. "nausea, headache, fatigue", or "" when nothing is available.
        Malformed result entries are skipped rather than raising.
    """
    # The name goes in as a quoted phrase.  It is not percent-encoded here:
    # the HTTP client encodes parameter values itself, and encoding twice
    # would corrupt multi-word names.
    phrase = drug_name.strip().replace("\\", "\\\\").replace('"', '\\"')
    params = {
        "search": f'patient.drug.medicinalproduct:"{phrase}"',
        "count":  "patient.reaction.reactionmeddrapt.exact",
        "limit":  limit,
    }
    data = _get(_EVENT_URL, params)
    results = data.get("results") if data else None
    if not results:
        return ""

    # Each result: {"term": "NAUSEA", "count": 12345}
    terms = []
    for entry in results:
        term = entry.get("term") if isinstance(entry, dict) else None
        if isinstance(term, str) and term:
            terms.append(term.lower().replace("_", " "))
    return ", ".join(terms)


# Age-related terms, matched as whole words so that e.g. "aged" does not
# fire on "managed" or "damaged".
_AGE_KEYWORD_RE = re.compile(
    r"\b(?:older adults?|elderly|geriatrics?|65 years|aged)\b", re.IGNORECASE
)


def get_geriatric_warnings(drug_name: str) -> str:
    """
    Extract geriatric-specific information from a drug's FDA label.

    Returns the geriatric_use section when the label has one.  Otherwise the
    warnings section is scanned for sentences mentioning age-related terms
    and up to three of them are returned, joined into one string.

    Args:
        drug_name: Drug name to search for.

    Returns:
        The geriatric text, or "" when nothing relevant is found.
    """
    record = _first_result(_get_with_fallback(_BASE_URL, drug_name))

    geriatric = _extract_field(record, "geriatric_use")
    if geriatric:
        return geriatric

    # Fallback: scan warnings for age-related content
    warnings = _extract_field(record, "warnings", "warnings_and_cautions")
    if not warnings:
        return ""

    # Split only where the punctuation ends a sentence (followed by space or
    # end of text), so decimals such as "2.5 mg" stay intact.
    sentences = [s.strip() for s in re.split(r"[.!?](?:\s+|$)", warnings) if s.strip()]
    relevant = [s for s in sentences if _AGE_KEYWORD_RE.search(s)]
    return ". ".join(relevant[:3]) + "." if relevant else ""


_SECTION_HEADER_RE = re.compile(r"^\s*\d+(?:\.\d+)?\s+[A-Z][A-Z\s]+")   # "7 DRUG INTERACTIONS"
_PAREN_REF_RE      = re.compile(r"\(\s*\d+(?:\.\d+)?\s*\)")              # "( 5.1 )"
_LEADING_NUM_RE    = re.compile(r"^\s*\d+(?:\.\d+)?\s+")                 # "2 DOSAGE..."


def _clean_lifestyle_sentence(sentence: str) -> str:
    """
    Strip FDA formatting artifacts from a single sentence before display.
    Returns "" if the sentence is just a section header with no useful content.
    """
    # Reject pure section headers like "7 DRUG INTERACTIONS"
    if _SECTION_HEADER_RE.match(sentence) and len(sentence.split()) <= 5:
        return ""

    # Strip inline section references like "( 5.1 )" or "( 2 )"
    sentence = _PAREN_REF_RE.sub("", sentence)

    # Strip leading section numbers like "2 DOSAGE AND ADMINISTRATION"
    sentence = _LEADING_NUM_RE.sub("", sentence)

    # Collapse whitespace and truncate
    sentence = re.sub(r"\s+", " ", sentence).strip()

    # Reject if too short after cleaning or still looks like a header (all caps)
    if len(sentence) < 20 or sentence.isupper():
        return ""

    # Drop sentences that are too long to be a single clean thought
    # (likely mid-paragraph FDA text split at a bad boundary)
    if len(sentence) > 180:
        return ""

    return sentence


def get_lifestyle_warnings(drug_list: list[str]) -> dict:
    """
    Extract food, alcohol, and lifestyle interaction warnings for a list of drugs.

    For each drug, the warnings, drug_interactions and dosage sections of its
    FDA label are split into sentences; sentences mentioning a lifestyle
    keyword are cleaned up and grouped by category.

    Keywords are matched case-insensitively and only at the start of a word,
    so "UV" matches "UV light" while "iron" no longer matches "environment".
    Inflected forms still match ("drink" -> "drinks").

    Args:
        drug_list: List of drug name strings.  None is treated as empty.

    Returns:
        Dict keyed by drug name.  Each value is a dict keyed by lifestyle
        category ("alcohol", "grapefruit", "food", "dairy", "sun", "driving",
        "exercise"), with a list of at most two sentence strings as values.
        Categories with no matches, and drugs with no matches at all, are
        omitted.

    Example return value:
        {
            "metformin": {
                "alcohol": ["Avoid alcohol while taking metformin..."],
                "food":    ["Take metformin with a meal..."],
            },
            "atorvastatin": {
                "grapefruit": ["Avoid grapefruit juice..."],
            },
        }
    """
    max_per_category = 2

    # One pattern per category, built once for the whole call.
    category_patterns = {
        category: re.compile(
            r"\b(?:" + "|".join(re.escape(kw) for kw in keywords) + ")",
            re.IGNORECASE,
        )
        for category, keywords in _LIFESTYLE_KEYWORDS.items()
        if keywords  # an empty alternation would match every sentence
    }

    result = {}

    for drug_name in drug_list or []:
        record = _first_result(_get_with_fallback(_BASE_URL, drug_name))

        # Combine all text fields that might mention lifestyle interactions
        raw_text = " ".join(filter(None, [
            _extract_field(record, "warnings", "warnings_and_cautions"),
            _extract_field(record, "drug_interactions"),
            _extract_field(record, "dosage_and_administration"),
        ]))
        if not raw_text:
            continue

        # Split at sentence-ending punctuation only (followed by whitespace or
        # end of text).  Splitting on every "." would cut decimals and section
        # references such as "( 5.1 )" in half before they can be cleaned up.
        sentences = [
            s.strip()
            for s in re.split(r"[.!?](?:\s+|$)|\n", raw_text)
            if len(s.strip()) > 15
        ]

        drug_warnings: dict[str, list[str]] = {}
        for category, pattern in category_patterns.items():
            hits: list[str] = []
            for sentence in sentences:
                if not pattern.search(sentence):
                    continue
                clean = _clean_lifestyle_sentence(sentence)
                if clean and clean not in hits:
                    hits.append(clean)
                    if len(hits) == max_per_category:
                        break  # enough for this category
            if hits:
                drug_warnings[category] = hits

        if drug_warnings:
            result[drug_name] = drug_warnings

    return result


# ── Convenience: fetch all data for a drug list in one call ─────────────────

def fetch_all_drug_data(drug_list: list[str]) -> dict:
    """
    Gather label sections, lifestyle warnings and adverse events per drug.

    Lifestyle warnings are fetched for the whole list up front; label and
    event data are then looked up drug by drug.

    Returns a dict keyed by drug name, each value containing:
        label    : dict from get_drug_label()
        lifestyle: this drug's entry from get_lifestyle_warnings(), or {}
        events   : str from get_adverse_events()

    Suitable for passing to prompts.build_fda_context_prompt().
    """
    lifestyle_by_drug = get_lifestyle_warnings(drug_list)
    return {
        drug: {
            "label":     get_drug_label(drug),
            "lifestyle": lifestyle_by_drug.get(drug, {}),
            "events":    get_adverse_events(drug),
        }
        for drug in drug_list
    }