| | import gradio as gr |
| | import pandas as pd |
| | import plotly.express as px |
| | import plotly.graph_objects as go |
| | from datasets import load_dataset |
| | import json |
| | from typing import Optional, Dict, List |
| | import warnings |
| |
|
| | warnings.filterwarnings("ignore") |
| |
|
| | |
| | data_cache = { |
| | "fr": {}, |
| | "en": {} |
| | } |
| |
|
| | def load_datasets_for_language(lang: str) -> Dict: |
| | """Load all datasets for a specific language.""" |
| | if data_cache[lang]: |
| | return data_cache[lang] |
| |
|
| | dataset_name = "AYI-NEDJIMI/ad-attacks-fr" if lang == "fr" else "AYI-NEDJIMI/ad-attacks-en" |
| |
|
| | try: |
| | attacks_dataset = load_dataset(dataset_name, data_files="attacks.json", split="train") |
| | tools_dataset = load_dataset(dataset_name, data_files="tools.json", split="train") |
| | rules_dataset = load_dataset(dataset_name, data_files="detection_rules.json", split="train") |
| | killchains_dataset = load_dataset(dataset_name, data_files="killchains.json", split="train") |
| | qa_dataset = load_dataset(dataset_name, data_files="qa_dataset.json", split="train") |
| |
|
| | data_cache[lang] = { |
| | "attacks": pd.DataFrame(attacks_dataset), |
| | "tools": pd.DataFrame(tools_dataset), |
| | "rules": pd.DataFrame(rules_dataset), |
| | "killchains": pd.DataFrame(killchains_dataset), |
| | "qa": pd.DataFrame(qa_dataset) |
| | } |
| | except Exception as e: |
| | print(f"Error loading dataset for {lang}: {e}") |
| | data_cache[lang] = { |
| | "attacks": pd.DataFrame(), |
| | "tools": pd.DataFrame(), |
| | "rules": pd.DataFrame(), |
| | "killchains": pd.DataFrame(), |
| | "qa": pd.DataFrame() |
| | } |
| |
|
| | return data_cache[lang] |
| |
|
| | def convert_list_to_string(val) -> str: |
| | """Convert list or dict to readable string for display.""" |
| | if isinstance(val, list): |
| | return ", ".join([str(v) for v in val]) |
| | elif isinstance(val, dict): |
| | return json.dumps(val, ensure_ascii=False, indent=2) |
| | return str(val) if val else "" |
| |
|
| | def prepare_attacks_df(df: pd.DataFrame) -> pd.DataFrame: |
| | """Prepare attacks dataframe for display.""" |
| | if df.empty: |
| | return df |
| | df = df.copy() |
| | for col in ["mitre_technique_ids", "tools", "command_examples"]: |
| | if col in df.columns: |
| | df[col] = df[col].apply(convert_list_to_string) |
| | return df |
| |
|
| | def prepare_tools_df(df: pd.DataFrame) -> pd.DataFrame: |
| | """Prepare tools dataframe for display.""" |
| | if df.empty: |
| | return df |
| | df = df.copy() |
| | if "attacks_related" in df.columns: |
| | df["attacks_related"] = df["attacks_related"].apply(convert_list_to_string) |
| | if "platforms" in df.columns: |
| | df["platforms"] = df["platforms"].apply(convert_list_to_string) |
| | return df |
| |
|
| | def prepare_rules_df(df: pd.DataFrame) -> pd.DataFrame: |
| | """Prepare detection rules dataframe for display.""" |
| | if df.empty: |
| | return df |
| | df = df.copy() |
| | if "event_ids" in df.columns: |
| | df["event_ids"] = df["event_ids"].apply(convert_list_to_string) |
| | if "attacks_related" in df.columns: |
| | df["attacks_related"] = df["attacks_related"].apply(convert_list_to_string) |
| | return df |
| |
|
| | def prepare_qa_df(df: pd.DataFrame) -> pd.DataFrame: |
| | """Prepare Q&A dataframe for display.""" |
| | if df.empty: |
| | return df |
| | df = df.copy() |
| | if "keywords" in df.columns: |
| | df["keywords"] = df["keywords"].apply(convert_list_to_string) |
| | return df |
| |
|
| | def filter_dataframe(df: pd.DataFrame, search_text: str, filter_col: Optional[str] = None, filter_value: Optional[str] = None) -> pd.DataFrame: |
| | """Filter dataframe by search text and optional category/filter.""" |
| | if df.empty: |
| | return df |
| |
|
| | result = df.copy() |
| |
|
| | if search_text.strip(): |
| | search_lower = search_text.lower() |
| | mask = result.astype(str).apply(lambda x: x.str.contains(search_lower, case=False)).any(axis=1) |
| | result = result[mask] |
| |
|
| | if filter_col and filter_value and filter_value != "All": |
| | if filter_col in result.columns: |
| | result = result[result[filter_col] == filter_value] |
| |
|
| | return result |
| |
|
| | def get_unique_values(df: pd.DataFrame, column: str) -> List[str]: |
| | """Get unique values from a column.""" |
| | if df.empty or column not in df.columns: |
| | return [] |
| | return ["All"] + sorted(df[column].unique().astype(str).tolist()) |
| |
|
| | def create_attacks_tab(lang_data: Dict) -> tuple: |
| | """Create attacks tab content.""" |
| | df = lang_data["attacks"] |
| |
|
| | if df.empty: |
| | return gr.DataFrame(value=pd.DataFrame()), [], "No data available" |
| |
|
| | categories = get_unique_values(df, "category") |
| | severities = get_unique_values(df, "severity") if "severity" in df.columns else [] |
| |
|
| | return prepare_attacks_df(df), categories, severities |
| |
|
| | def create_tools_tab(lang_data: Dict) -> tuple: |
| | """Create tools tab content.""" |
| | df = lang_data["tools"] |
| |
|
| | if df.empty: |
| | return gr.DataFrame(value=pd.DataFrame()), [] |
| |
|
| | categories = get_unique_values(df, "category") if "category" in df.columns else [] |
| |
|
| | return prepare_tools_df(df), categories |
| |
|
| | def create_rules_tab(lang_data: Dict) -> tuple: |
| | """Create detection rules tab content.""" |
| | df = lang_data["rules"] |
| |
|
| | if df.empty: |
| | return gr.DataFrame(value=pd.DataFrame()), [] |
| |
|
| | log_sources = get_unique_values(df, "log_source") if "log_source" in df.columns else [] |
| |
|
| | return prepare_rules_df(df), log_sources |
| |
|
| | def create_qa_tab(lang_data: Dict) -> tuple: |
| | """Create Q&A tab content.""" |
| | df = lang_data["qa"] |
| |
|
| | if df.empty: |
| | return gr.DataFrame(value=pd.DataFrame()), [], [] |
| |
|
| | categories = get_unique_values(df, "category") if "category" in df.columns else [] |
| | difficulties = get_unique_values(df, "difficulty") if "difficulty" in df.columns else [] |
| |
|
| | return prepare_qa_df(df), categories, difficulties |
| |
|
| | def create_statistics(lang_data: Dict, lang: str) -> tuple: |
| | """Create statistics visualizations.""" |
| | df_attacks = lang_data["attacks"] |
| |
|
| | if df_attacks.empty: |
| | empty_fig = go.Figure() |
| | empty_fig.add_annotation(text="No data available") |
| | return empty_fig, empty_fig, empty_fig, "No statistics available" |
| |
|
| | |
| | if "category" in df_attacks.columns: |
| | category_counts = df_attacks["category"].value_counts().reset_index() |
| | category_counts.columns = ["category", "count"] |
| | fig_category = px.bar( |
| | category_counts, |
| | x="category", |
| | y="count", |
| | title="Attacks per Category" if lang == "en" else "Attaques par Catégorie", |
| | labels={"category": "Category", "count": "Count"} if lang == "en" else {"category": "Catégorie", "count": "Nombre"} |
| | ) |
| | else: |
| | fig_category = go.Figure() |
| | fig_category.add_annotation(text="Category data not available") |
| |
|
| | |
| | if "severity" in df_attacks.columns: |
| | severity_counts = df_attacks["severity"].value_counts().reset_index() |
| | severity_counts.columns = ["severity", "count"] |
| | fig_severity = px.pie( |
| | severity_counts, |
| | names="severity", |
| | values="count", |
| | title="Severity Distribution" if lang == "en" else "Distribution de Sévérité" |
| | ) |
| | else: |
| | fig_severity = go.Figure() |
| | fig_severity.add_annotation(text="Severity data not available") |
| |
|
| | |
| | tools_list = [] |
| | if "tools" in df_attacks.columns: |
| | for tools in df_attacks["tools"]: |
| | if isinstance(tools, list): |
| | tools_list.extend(tools) |
| |
|
| | if tools_list: |
| | tools_df = pd.Series(tools_list).value_counts().reset_index() |
| | tools_df.columns = ["tool", "count"] |
| | tools_df = tools_df.head(10) |
| | fig_tools = px.bar( |
| | tools_df, |
| | x="tool", |
| | y="count", |
| | title="Most Used Tools (Top 10)" if lang == "en" else "Outils les Plus Utilisés (Top 10)", |
| | labels={"tool": "Tool", "count": "Count"} if lang == "en" else {"tool": "Outil", "count": "Nombre"} |
| | ) |
| | else: |
| | fig_tools = go.Figure() |
| | fig_tools.add_annotation(text="Tools data not available") |
| |
|
| | stats_text = f"Total Attacks: {len(df_attacks)}" if lang == "en" else f"Attaques Totales: {len(df_attacks)}" |
| |
|
| | return fig_category, fig_severity, fig_tools, stats_text |
| |
|
| | def update_on_language_change(language: str): |
| | """Update all components when language changes.""" |
| | lang_data = load_datasets_for_language(language) |
| |
|
| | attacks_df, categories, severities = create_attacks_tab(lang_data) |
| | tools_df, tools_cats = create_tools_tab(lang_data) |
| | rules_df, log_sources = create_rules_tab(lang_data) |
| | qa_df, qa_cats, qa_diffs = create_qa_tab(lang_data) |
| | fig_cat, fig_sev, fig_tools, stats_text = create_statistics(lang_data, language) |
| |
|
| | return ( |
| | attacks_df, |
| | gr.Dropdown(choices=categories, value="All"), |
| | gr.Dropdown(choices=severities, value="All"), |
| | tools_df, |
| | gr.Dropdown(choices=tools_cats, value="All"), |
| | rules_df, |
| | gr.Dropdown(choices=log_sources, value="All"), |
| | qa_df, |
| | gr.Dropdown(choices=qa_cats, value="All"), |
| | gr.Dropdown(choices=qa_diffs, value="All"), |
| | fig_cat, |
| | fig_sev, |
| | fig_tools, |
| | stats_text |
| | ) |
| |
|
| | |
| | initial_lang = "en" |
| | initial_data = load_datasets_for_language(initial_lang) |
| |
|
| | |
| | with gr.Blocks(title="AD Attack Explorer", theme=gr.themes.Soft()) as demo: |
| | gr.Markdown("# 🏰 AD Attack Explorer") |
| | gr.Markdown("Interactive exploration of Active Directory attacks, tools, detection rules, kill chains, and Q&A datasets") |
| |
|
| | with gr.Row(): |
| | language = gr.Radio( |
| | choices=["English", "Français"], |
| | value="English", |
| | label="Language / Langue", |
| | scale=1 |
| | ) |
| |
|
| | |
| | with gr.Tabs(): |
| | |
| | with gr.TabItem("Attacks / Attaques"): |
| | with gr.Row(): |
| | search_attacks = gr.Textbox( |
| | label="Search / Rechercher", |
| | placeholder="Search attacks...", |
| | scale=2 |
| | ) |
| | with gr.Row(): |
| | filter_category = gr.Dropdown( |
| | choices=get_unique_values(initial_data["attacks"], "category"), |
| | value="All", |
| | label="Category / Catégorie", |
| | scale=1 |
| | ) |
| | filter_severity = gr.Dropdown( |
| | choices=get_unique_values(initial_data["attacks"], "severity") if "severity" in initial_data["attacks"].columns else [], |
| | value="All", |
| | label="Severity / Sévérité", |
| | scale=1 |
| | ) |
| |
|
| | attacks_table = gr.Dataframe( |
| | value=prepare_attacks_df(initial_data["attacks"]), |
| | interactive=False, |
| | scale=2 |
| | ) |
| |
|
| | |
| | with gr.TabItem("Tools / Outils"): |
| | with gr.Row(): |
| | search_tools = gr.Textbox( |
| | label="Search / Rechercher", |
| | placeholder="Search tools...", |
| | scale=2 |
| | ) |
| | with gr.Row(): |
| | filter_tools_cat = gr.Dropdown( |
| | choices=get_unique_values(initial_data["tools"], "category") if "category" in initial_data["tools"].columns else [], |
| | value="All", |
| | label="Category / Catégorie", |
| | scale=1 |
| | ) |
| |
|
| | tools_table = gr.Dataframe( |
| | value=prepare_tools_df(initial_data["tools"]), |
| | interactive=False, |
| | scale=2 |
| | ) |
| |
|
| | |
| | with gr.TabItem("Detection Rules / Règles Détection"): |
| | with gr.Row(): |
| | search_rules = gr.Textbox( |
| | label="Search / Rechercher", |
| | placeholder="Search rules...", |
| | scale=2 |
| | ) |
| | with gr.Row(): |
| | filter_rules_log = gr.Dropdown( |
| | choices=get_unique_values(initial_data["rules"], "log_source") if "log_source" in initial_data["rules"].columns else [], |
| | value="All", |
| | label="Log Source", |
| | scale=1 |
| | ) |
| |
|
| | rules_table = gr.Dataframe( |
| | value=prepare_rules_df(initial_data["rules"]), |
| | interactive=False, |
| | scale=2 |
| | ) |
| |
|
| | |
| | with gr.TabItem("Kill Chains"): |
| | with gr.Row(): |
| | search_killchains = gr.Textbox( |
| | label="Search / Rechercher", |
| | placeholder="Search kill chains...", |
| | scale=2 |
| | ) |
| |
|
| | killchains_table = gr.Dataframe( |
| | value=initial_data["killchains"], |
| | interactive=False, |
| | scale=2 |
| | ) |
| |
|
| | |
| | with gr.TabItem("Q&A"): |
| | with gr.Row(): |
| | search_qa = gr.Textbox( |
| | label="Search / Rechercher", |
| | placeholder="Search questions...", |
| | scale=2 |
| | ) |
| | with gr.Row(): |
| | filter_qa_cat = gr.Dropdown( |
| | choices=get_unique_values(initial_data["qa"], "category") if "category" in initial_data["qa"].columns else [], |
| | value="All", |
| | label="Category / Catégorie", |
| | scale=1 |
| | ) |
| | filter_qa_diff = gr.Dropdown( |
| | choices=get_unique_values(initial_data["qa"], "difficulty") if "difficulty" in initial_data["qa"].columns else [], |
| | value="All", |
| | label="Difficulty / Difficulté", |
| | scale=1 |
| | ) |
| |
|
| | qa_table = gr.Dataframe( |
| | value=prepare_qa_df(initial_data["qa"]), |
| | interactive=False, |
| | scale=2 |
| | ) |
| |
|
| | |
| | with gr.TabItem("Statistics / Statistiques"): |
| | gr.Markdown("### Attack Analytics") |
| |
|
| | with gr.Row(): |
| | fig_cat, fig_sev, fig_tools, stats_text = create_statistics(initial_data, initial_lang) |
| |
|
| | with gr.Column(): |
| | stats_info = gr.Markdown(stats_text) |
| |
|
| | with gr.Row(): |
| | chart_category = gr.Plot(value=fig_cat, scale=1) |
| | chart_severity = gr.Plot(value=fig_sev, scale=1) |
| |
|
| | with gr.Row(): |
| | chart_tools = gr.Plot(value=fig_tools, scale=2) |
| |
|
| | |
| | gr.HTML(""" |
| | <div style='text-align:center; padding:20px; color:#666; margin-top:20px;'> |
| | <p>Created by <a href='https://www.ayinedjimi-consultants.fr' target='_blank'>Ayi NEDJIMI</a> - Senior Offensive Cybersecurity & AI Consultant</p> |
| | <p><a href='https://www.linkedin.com/in/ayi-nedjimi' target='_blank'>LinkedIn</a> | <a href='https://github.com/ayinedjimi' target='_blank'>GitHub</a> | <a href='https://x.com/AyiNEDJIMI' target='_blank'>Twitter/X</a></p> |
| | </div> |
| | """) |
| |
|
| | |
| | def on_language_change(language: str): |
| | lang = "fr" if language == "Français" else "en" |
| | return update_on_language_change(lang) |
| |
|
| | def update_attacks_display(search, category, severity, language): |
| | lang = "fr" if language == "Français" else "en" |
| | lang_data = load_datasets_for_language(lang) |
| | df = lang_data["attacks"].copy() |
| | df = filter_dataframe(df, search, "category" if category != "All" else None, category) |
| | df = filter_dataframe(df, "", "severity" if severity != "All" else None, severity) |
| | return prepare_attacks_df(df) |
| |
|
| | def update_tools_display(search, category, language): |
| | lang = "fr" if language == "Français" else "en" |
| | lang_data = load_datasets_for_language(lang) |
| | df = lang_data["tools"].copy() |
| | df = filter_dataframe(df, search, "category" if category != "All" else None, category) |
| | return prepare_tools_df(df) |
| |
|
| | def update_rules_display(search, log_source, language): |
| | lang = "fr" if language == "Français" else "en" |
| | lang_data = load_datasets_for_language(lang) |
| | df = lang_data["rules"].copy() |
| | df = filter_dataframe(df, search, "log_source" if log_source != "All" else None, log_source) |
| | return prepare_rules_df(df) |
| |
|
| | def update_killchains_display(search, language): |
| | lang = "fr" if language == "Français" else "en" |
| | lang_data = load_datasets_for_language(lang) |
| | df = lang_data["killchains"].copy() |
| | df = filter_dataframe(df, search) |
| | return df |
| |
|
| | def update_qa_display(search, category, difficulty, language): |
| | lang = "fr" if language == "Français" else "en" |
| | lang_data = load_datasets_for_language(lang) |
| | df = lang_data["qa"].copy() |
| | df = filter_dataframe(df, search, "category" if category != "All" else None, category) |
| | df = filter_dataframe(df, "", "difficulty" if difficulty != "All" else None, difficulty) |
| | return prepare_qa_df(df) |
| |
|
| | |
| | language.change( |
| | on_language_change, |
| | inputs=[language], |
| | outputs=[ |
| | attacks_table, |
| | filter_category, |
| | filter_severity, |
| | tools_table, |
| | filter_tools_cat, |
| | rules_table, |
| | filter_rules_log, |
| | qa_table, |
| | filter_qa_cat, |
| | filter_qa_diff, |
| | chart_category, |
| | chart_severity, |
| | chart_tools, |
| | stats_info |
| | ] |
| | ) |
| |
|
| | search_attacks.change( |
| | update_attacks_display, |
| | inputs=[search_attacks, filter_category, filter_severity, language], |
| | outputs=[attacks_table] |
| | ) |
| |
|
| | filter_category.change( |
| | update_attacks_display, |
| | inputs=[search_attacks, filter_category, filter_severity, language], |
| | outputs=[attacks_table] |
| | ) |
| |
|
| | filter_severity.change( |
| | update_attacks_display, |
| | inputs=[search_attacks, filter_category, filter_severity, language], |
| | outputs=[attacks_table] |
| | ) |
| |
|
| | search_tools.change( |
| | update_tools_display, |
| | inputs=[search_tools, filter_tools_cat, language], |
| | outputs=[tools_table] |
| | ) |
| |
|
| | filter_tools_cat.change( |
| | update_tools_display, |
| | inputs=[search_tools, filter_tools_cat, language], |
| | outputs=[tools_table] |
| | ) |
| |
|
| | search_rules.change( |
| | update_rules_display, |
| | inputs=[search_rules, filter_rules_log, language], |
| | outputs=[rules_table] |
| | ) |
| |
|
| | filter_rules_log.change( |
| | update_rules_display, |
| | inputs=[search_rules, filter_rules_log, language], |
| | outputs=[rules_table] |
| | ) |
| |
|
| | search_killchains.change( |
| | update_killchains_display, |
| | inputs=[search_killchains, language], |
| | outputs=[killchains_table] |
| | ) |
| |
|
| | search_qa.change( |
| | update_qa_display, |
| | inputs=[search_qa, filter_qa_cat, filter_qa_diff, language], |
| | outputs=[qa_table] |
| | ) |
| |
|
| | filter_qa_cat.change( |
| | update_qa_display, |
| | inputs=[search_qa, filter_qa_cat, filter_qa_diff, language], |
| | outputs=[qa_table] |
| | ) |
| |
|
| | filter_qa_diff.change( |
| | update_qa_display, |
| | inputs=[search_qa, filter_qa_cat, filter_qa_diff, language], |
| | outputs=[qa_table] |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch() |
| |
|