""" brief.py — Threat brief output schema and HTML renderer. """ import json import re from dataclasses import dataclass, field, asdict from typing import List, Optional # --------------------------------------------------------------------------- # Data schema # --------------------------------------------------------------------------- @dataclass class NewsItem: title: str = "" source: str = "" published: str = "" summary: str = "" url: str = "" notable: bool = False def to_dict(self) -> dict: return asdict(self) @dataclass class ThreatBrief: region: str = "" country: str = "" assessment_date: str = "" severity: str = "" confidence: str = "" primary_actors: List[str] = field(default_factory=list) event_types: List[str] = field(default_factory=list) key_locations: List[str] = field(default_factory=list) fatalities_reported: Optional[int] = None country_summary: str = "" narrative_summary: str = "" risk_analysis: str = "" key_findings: List[str] = field(default_factory=list) indicators_of_escalation: List[str] = field(default_factory=list) recommended_watch_items: List[str] = field(default_factory=list) source_types_used: List[str] = field(default_factory=list) recent_events: List[str] = field(default_factory=list) notable_news: List[NewsItem] = field(default_factory=list) # Travel advisory travel_advisory_level: str = "" travel_advisory_level_text: str = "" travel_advisory_indicators: List[str] = field(default_factory=list) travel_advisory_date: str = "" travel_advisory_url: str = "" # Embassy / consulate passport_country: str = "" embassy_name: str = "" embassy_address: str = "" embassy_phone: str = "" embassy_emergency_phone: str = "" embassy_website: str = "" embassy_notes: str = "" # Airspace airspace_status: str = "" airspace_restrictions: List[str] = field(default_factory=list) no_fly_zones: List[str] = field(default_factory=list) air_defense_activity: List[str] = field(default_factory=list) aviation_notes: str = "" def to_dict(self) -> dict: d = asdict(self) d["notable_news"] = [n.to_dict() for n in self.notable_news] return d # --------------------------------------------------------------------------- # Severity / confidence / advisory colors # --------------------------------------------------------------------------- SEVERITY_COLORS = { "Critical": "#C0392B", "High": "#E67E22", "Medium": "#F39C12", "Low": "#27AE60", "Minimal": "#7F8C8D", } CONFIDENCE_COLORS = { "High": "#27AE60", "Medium": "#F39C12", "Low": "#E74C3C", } ADVISORY_LEVEL_COLORS = { "1": "#27AE60", "2": "#F39C12", "3": "#E67E22", "4": "#C0392B", } ADVISORY_LEVEL_LABELS = { "1": "Exercise Normal Precautions", "2": "Exercise Increased Caution", "3": "Reconsider Travel", "4": "Do Not Travel", } AIRSPACE_STATUS_COLORS = { "Open": "#27AE60", "Restricted": "#E67E22", "Partially Restricted": "#F39C12", "Closed": "#C0392B", "Unknown": "#7F8C8D", } # --------------------------------------------------------------------------- # Prompt schema # --------------------------------------------------------------------------- BRIEF_PROMPT_SCHEMA = """ You are an OSINT intelligence analyst. Based on the collected source data above, produce a threat brief as a JSON object with EXACTLY these fields: { "region": "", "country": "", "assessment_date": "", "severity": "", "confidence": "", "primary_actors": ["", ""], "event_types": ["", ""], "key_locations": ["", ""], "fatalities_reported": , "country_summary": "<3-5 sentence factual background on the country: geography, population, political system, recent history, and why it is geopolitically significant>", "narrative_summary": "<2-4 sentence analytical narrative synthesizing all sources>", "risk_analysis": "<3-5 sentence focused risk assessment covering: (1) primary threat vectors and actors, (2) likelihood of near-term escalation, (3) risk to civilians, travelers, and humanitarian operations, (4) any compounding factors such as economic collapse, displacement, or regional spillover>", "key_findings": ["", "", ""], "indicators_of_escalation": ["", ""], "recommended_watch_items": ["", ""], "source_types_used": ["ACLED", "RSS", "State Dept Travel Advisory", "AIRSPACE"], "recent_events": [ "", "" ], "travel_advisory": { "level": "<1|2|3|4 — just the number, or 'Unknown'>", "level_text": "", "indicators": ["", ""], "date_updated": "", "url": "" }, "embassy": { "name": "", "address": "", "phone": "
", "emergency_phone": "", "website": "", "notes": "<1-2 sentences: nearest consulate if no embassy, appointment requirements, or emergency procedures>" }, "airspace_status": "", "airspace_restrictions": ["", ""], "no_fly_zones": ["", ""], "air_defense_activity": ["", ""], "aviation_notes": "<1-2 sentence summary of overall airspace picture and commercial aviation impact>", "notable_news": [ { "title": "
", "source": "", "published": "", "summary": "<1-2 sentence analytical summary explaining why this article matters>", "url": "
", "notable": true } ] } For embassy: provide the PASSPORT_COUNTRY's embassy or nearest consulate in the destination country. If no embassy exists there, name the nearest one in a neighbouring country and explain in notes. Use your best knowledge of official embassy details; always include the website so the traveller can verify. For recent_events: list each significant ACLED event as a single line in the format "DATE | LOCATION — description, N fatalities". Include up to 10 events ordered most-recent first. If ACLED returned no data, use an empty array []. For airspace fields: use data from the AIRSPACE tool output (EASA CZIBs, SIGMETs, NOTAMs). If no airspace data was retrieved, set airspace_status to "Unknown" and use empty arrays. Be concise and factual — cite specific restrictions, zones, or incidents where available. For notable_news: include ALL relevant articles from the RSS results, up to 10. Write the summary field in your own analytical words — explain WHY the article matters to the threat picture, not just what it says. If no RSS articles were retrieved, use an empty array []. Return ONLY the JSON object. No preamble, no markdown fences. """ # --------------------------------------------------------------------------- # Parser # --------------------------------------------------------------------------- def parse_brief_from_llm(raw_text: str) -> ThreatBrief: cleaned = re.sub(r"```json|```", "", raw_text).strip() match = re.search(r"\{.*\}", cleaned, re.DOTALL) if not match: return _fallback_brief(raw_text) try: data = json.loads(match.group()) news_items = [] for n in data.get("notable_news", []): news_items.append(NewsItem( title = n.get("title", ""), source = n.get("source", ""), published= n.get("published", ""), summary = n.get("summary", ""), url = n.get("url", ""), notable = n.get("notable", True), )) ta = data.get("travel_advisory", {}) em = data.get("embassy", {}) return ThreatBrief( region = data.get("region", ""), country = data.get("country", ""), assessment_date = data.get("assessment_date", ""), severity = data.get("severity", "Unknown"), confidence = data.get("confidence", "Low"), primary_actors = data.get("primary_actors", []), event_types = data.get("event_types", []), key_locations = data.get("key_locations", []), fatalities_reported = data.get("fatalities_reported"), country_summary = data.get("country_summary", ""), narrative_summary = data.get("narrative_summary", raw_text), risk_analysis = data.get("risk_analysis", ""), key_findings = data.get("key_findings", []), indicators_of_escalation = data.get("indicators_of_escalation", []), recommended_watch_items = data.get("recommended_watch_items", []), source_types_used = data.get("source_types_used", []), recent_events = data.get("recent_events", []), notable_news = news_items, travel_advisory_level = str(ta.get("level", "")), travel_advisory_level_text= ta.get("level_text", ""), travel_advisory_indicators= ta.get("indicators", []), travel_advisory_date = ta.get("date_updated", ""), travel_advisory_url = ta.get("url", ""), embassy_name = em.get("name", ""), embassy_address = em.get("address", ""), embassy_phone = em.get("phone", ""), embassy_emergency_phone = em.get("emergency_phone", ""), embassy_website = em.get("website", ""), embassy_notes = em.get("notes", ""), airspace_status = data.get("airspace_status", "Unknown"), airspace_restrictions = data.get("airspace_restrictions", []), no_fly_zones = data.get("no_fly_zones", []), air_defense_activity = data.get("air_defense_activity", []), aviation_notes = data.get("aviation_notes", ""), ) except json.JSONDecodeError: return _fallback_brief(raw_text) def _fallback_brief(raw_text: str) -> ThreatBrief: return ThreatBrief( narrative_summary = raw_text, severity = "Unknown", confidence = "Low", key_findings = ["Structured parsing failed — see narrative summary."], ) # --------------------------------------------------------------------------- # HTML sub-renderers # --------------------------------------------------------------------------- def _render_travel_advisory(brief: ThreatBrief) -> str: level = brief.travel_advisory_level.strip() if not level or level == "Unknown": return """

US State Department travel advisory not available for this country.

""" color = ADVISORY_LEVEL_COLORS.get(level, "#999") label = ADVISORY_LEVEL_LABELS.get(level, brief.travel_advisory_level_text or f"Level {level}") indicators_html = ( "  ·  ".join( f'{i}' for i in brief.travel_advisory_indicators ) if brief.travel_advisory_indicators else "None listed" ) link_html = ( f'  Full advisory →' if brief.travel_advisory_url else "" ) date_html = ( f'Updated: {brief.travel_advisory_date}' if brief.travel_advisory_date else "" ) return f"""

🗺️ US State Dept Travel Advisory Level {level}: {label} {link_html}

Risk categories:  {indicators_html}
{date_html}
""" def _render_embassy(brief: ThreatBrief) -> str: if not brief.passport_country: return "" if not brief.embassy_name: return f"""

Embassy information for {brief.passport_country} passport holders not available.

""" website_html = ( f'{brief.embassy_website}' if brief.embassy_website else "Not available" ) rows = [] if brief.embassy_address: rows.append(f"Address" f"{brief.embassy_address}") if brief.embassy_phone: rows.append(f"Main phone" f"{brief.embassy_phone}") if brief.embassy_emergency_phone: rows.append(f"Emergency line" f"{brief.embassy_emergency_phone}") rows.append(f"Website" f"{website_html}") notes_html = ( f'

{brief.embassy_notes}

' if brief.embassy_notes else "" ) return f"""

🏛️ {brief.passport_country} Embassy / Consulate in {brief.country}

{brief.embassy_name}
{"".join(rows)}
{notes_html}

Verify contact details before travel — embassy information can change.

""" # --------------------------------------------------------------------------- # HTML Renderer # --------------------------------------------------------------------------- def render_brief_html(brief: ThreatBrief) -> str: sev_color = SEVERITY_COLORS.get(brief.severity, "#999") conf_color = CONFIDENCE_COLORS.get(brief.confidence, "#999") def badge(label: str, color: str) -> str: return ( f'{label}' ) def bullet_list(items: list) -> str: if not items: return "
  • None identified
  • " return "".join(f"
  • {i}
  • " for i in items if i and i != "N/A") fatalities_str = ( str(brief.fatalities_reported) if brief.fatalities_reported is not None else "Unknown" ) sources_str = ", ".join(brief.source_types_used) if brief.source_types_used else "N/A" # Recent conflict events if brief.recent_events: event_rows = "".join( f"{e}" for e in brief.recent_events ) events_section = f"""

    ⚔️ Recent Conflict Events {len(brief.recent_events)} events · ACLED

    {event_rows}
    """ else: events_section = """

    ⚔️ Recent Conflict Events

    No ACLED conflict events retrieved. Check ACLED credentials or try a different date range.

    """ # News cards cards = [] for item in brief.notable_news: link_html = ( f'Read full article →' if item.url else "" ) notable_badge = ( 'NOTABLE' if item.notable else "" ) cards.append(f"""
    {item.source} {item.published[:25] if item.published else ""} {notable_badge}
    {item.title}
    {item.summary}
    {link_html}
    """) if cards: news_section = f"""

    📰 Recent News {len(brief.notable_news)} articles

    {"".join(cards)}
    """ else: news_section = """

    📰 Recent News

    No news articles were retrieved. Try adding more RSS sources or broadening the search.

    """ # Airspace section as_color = AIRSPACE_STATUS_COLORS.get(brief.airspace_status, "#7F8C8D") has_airspace = ( brief.airspace_restrictions or brief.no_fly_zones or brief.air_defense_activity or brief.aviation_notes or (brief.airspace_status and brief.airspace_status not in ("", "Unknown")) ) if has_airspace: airspace_section = f"""

    ✈️ Airspace & Aviation Status {badge(brief.airspace_status or 'Unknown', as_color)}

    {f'

    {brief.aviation_notes}

    ' if brief.aviation_notes else ""}

    🚫 No-Fly Zones

      {bullet_list(brief.no_fly_zones)}

    ⚠️ Active Restrictions / NOTAMs

      {bullet_list(brief.airspace_restrictions)}

    🛡️ Air Defense Activity

      {bullet_list(brief.air_defense_activity)}
    """ else: airspace_section = """

    ✈️ No airspace restriction data retrieved for this country.

    """ html = f"""

    🛡️ OSINT Threat Brief

    {brief.country or 'Unknown'}  |  {brief.region}  |  {brief.assessment_date}

    Severity:  {badge(brief.severity, sev_color)} Confidence:  {badge(brief.confidence, conf_color)} Sources: {sources_str} Fatalities: {fatalities_str}

    🌍 Country Background

    {brief.country_summary or 'Not available'}

    {_render_travel_advisory(brief)} {_render_embassy(brief)}

    📋 Analytical Summary

    {brief.narrative_summary or 'Not available'}

    🔴 Risk Assessment

    {brief.risk_analysis or 'Risk assessment not available.'}

    🔍 Key Findings

      {bullet_list(brief.key_findings)}

    ⚠️ Escalation Indicators

      {bullet_list(brief.indicators_of_escalation)}

    🎭 Primary Actors

      {bullet_list(brief.primary_actors)}

    📍 Key Locations

      {bullet_list(brief.key_locations)}
    {airspace_section} {events_section} {news_section}

    📡 Recommended Watch Items

      {bullet_list(brief.recommended_watch_items)}
    """ return html