Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from datasets import load_dataset | |
| import json | |
| from typing import Optional, Dict, List | |
| import warnings | |
# Suppress every Python warning for the whole process.
warnings.filterwarnings("ignore")

# Global data cache: one slot per language code. An empty dict in a slot
# means the datasets for that language have not been loaded yet.
data_cache: Dict[str, Dict] = {lang_code: {} for lang_code in ("fr", "en")}
def load_datasets_for_language(lang: str) -> Dict:
    """Load (and cache) all tables for a specific language.

    Each JSON file is loaded independently, so a failure on one file only
    leaves that table empty instead of blanking every table.

    Args:
        lang: Language code used as the cache key. ``"fr"`` selects the French
            dataset repository; any other value selects the English one.

    Returns:
        Dict with the keys ``"attacks"``, ``"tools"``, ``"rules"``,
        ``"killchains"`` and ``"qa"``, each mapped to a ``pandas.DataFrame``
        (empty when the corresponding file could not be loaded).
    """
    if data_cache[lang]:
        return data_cache[lang]

    dataset_name = "AYI-NEDJIMI/ad-attacks-fr" if lang == "fr" else "AYI-NEDJIMI/ad-attacks-en"
    data_files = {
        "attacks": "attacks.json",
        "tools": "tools.json",
        "rules": "detection_rules.json",
        "killchains": "killchains.json",
        "qa": "qa_dataset.json",
    }

    frames = {}
    for table, data_file in data_files.items():
        try:
            dataset = load_dataset(dataset_name, data_files=data_file, split="train")
            frames[table] = pd.DataFrame(dataset)
        except Exception as e:
            # Best effort: the UI must still start when a file is missing or
            # the hub is unreachable, so fall back to an empty table.
            print(f"Error loading dataset for {lang} ({data_file}): {e}")
            frames[table] = pd.DataFrame()

    # The result is cached even when some loads failed, so later calls do not
    # retry on every UI event.
    data_cache[lang] = frames
    return data_cache[lang]
def convert_list_to_string(val) -> str:
    """Convert a cell value to a readable string for display.

    Args:
        val: Any cell value.

    Returns:
        * list / tuple: items joined with ``", "``.
        * dict: pretty-printed JSON (non-ASCII characters kept as-is).
        * ``None`` or float NaN: empty string.
        * anything else: ``str(val)``. Falsy values such as ``0`` or
          ``False`` are kept rather than being turned into an empty string.
    """
    if isinstance(val, (list, tuple)):
        return ", ".join(str(item) for item in val)
    if isinstance(val, dict):
        return json.dumps(val, ensure_ascii=False, indent=2)
    # NaN is the only float that is not equal to itself.
    if val is None or (isinstance(val, float) and val != val):
        return ""
    return str(val)
def prepare_attacks_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a display-ready version of the attacks table.

    An empty frame is handed back unchanged. Otherwise a copy is returned in
    which the ``mitre_technique_ids``, ``tools`` and ``command_examples``
    columns (those that exist) are passed through ``convert_list_to_string``.
    """
    if df.empty:
        return df

    display_df = df.copy()
    candidates = ("mitre_technique_ids", "tools", "command_examples")
    present = [name for name in candidates if name in display_df.columns]
    for name in present:
        display_df[name] = display_df[name].apply(convert_list_to_string)
    return display_df
def prepare_tools_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a display-ready version of the tools table.

    An empty frame is handed back unchanged. Otherwise a copy is returned in
    which the ``attacks_related`` and ``platforms`` columns (those that exist)
    are passed through ``convert_list_to_string``.
    """
    if df.empty:
        return df

    display_df = df.copy()
    for name in ("attacks_related", "platforms"):
        if name not in display_df.columns:
            continue
        display_df[name] = display_df[name].apply(convert_list_to_string)
    return display_df
def prepare_rules_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a display-ready version of the detection rules table.

    An empty frame is handed back unchanged. Otherwise a copy is returned in
    which the ``event_ids`` and ``attacks_related`` columns (those that exist)
    are passed through ``convert_list_to_string``.
    """
    if df.empty:
        return df

    display_df = df.copy()
    for name in ("event_ids", "attacks_related"):
        if name not in display_df.columns:
            continue
        display_df[name] = display_df[name].apply(convert_list_to_string)
    return display_df
def prepare_qa_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a display-ready version of the Q&A table.

    An empty frame is handed back unchanged. Otherwise a copy is returned in
    which the ``keywords`` column, if present, is passed through
    ``convert_list_to_string``.
    """
    if df.empty:
        return df

    display_df = df.copy()
    if "keywords" not in display_df.columns:
        return display_df
    display_df["keywords"] = display_df["keywords"].apply(convert_list_to_string)
    return display_df
def filter_dataframe(df: pd.DataFrame, search_text: str, filter_col: Optional[str] = None, filter_value: Optional[str] = None) -> pd.DataFrame:
    """Filter a dataframe by free-text search and an optional exact-match filter.

    Args:
        df: Table to filter. An empty frame is returned unchanged.
        search_text: Text to look for in any column, case-insensitively.
            It is matched literally (not as a regular expression), so
            characters such as ``(`` or ``[`` are safe. ``None`` or
            whitespace-only text disables the search; surrounding whitespace
            is ignored.
        filter_col: Column for the exact-match filter. Ignored when the
            column does not exist.
        filter_value: Value the column must equal. ``None``, ``""`` and
            ``"All"`` disable the filter. The comparison is done on the
            string form of the cells, so string labels also match
            non-string columns.

    Returns:
        A filtered copy of ``df`` (or ``df`` itself when it is empty).
    """
    if df.empty:
        return df

    result = df.copy()

    needle = (search_text or "").strip()
    if needle:
        as_text = result.astype(str)
        mask = as_text.apply(
            lambda column: column.str.contains(needle, case=False, regex=False)
        ).any(axis=1)
        result = result[mask]

    if filter_col and filter_value and filter_value != "All":
        if filter_col in result.columns:
            result = result[result[filter_col].astype(str) == str(filter_value)]

    return result
def get_unique_values(df: pd.DataFrame, column: str) -> List[str]:
    """Build the dropdown choices for a column.

    Args:
        df: Source table.
        column: Column whose distinct values become choices.

    Returns:
        ``"All"`` followed by the distinct values as sorted strings. Missing
        values (``None`` / NaN) are skipped, and values are deduplicated
        after conversion to ``str``, which also makes unhashable cells such
        as lists safe. When the table is empty or the column does not exist,
        ``["All"]`` is returned so that a dropdown whose selected value is
        ``"All"`` always has that value among its choices.
    """
    if df.empty or column not in df.columns:
        return ["All"]
    labels = {str(value) for value in df[column].dropna()}
    return ["All"] + sorted(labels)
def create_attacks_tab(lang_data: Dict) -> tuple:
    """Create attacks tab content.

    Args:
        lang_data: Mapping of table name to DataFrame; ``"attacks"`` is read.

    Returns:
        ``(table, category_choices, severity_choices)``. Both choice lists
        always contain at least ``"All"``, because the dropdowns fed from
        them are reset to that value. With no data the table is an empty
        DataFrame, the same type as in the populated case.
    """
    df = lang_data["attacks"]
    if df.empty:
        return pd.DataFrame(), ["All"], ["All"]

    # get_unique_values already copes with a missing column; the `or` keeps
    # "All" selectable even if it reports no choices at all.
    categories = get_unique_values(df, "category") or ["All"]
    severities = get_unique_values(df, "severity") or ["All"]
    return prepare_attacks_df(df), categories, severities
def create_tools_tab(lang_data: Dict) -> tuple:
    """Create tools tab content.

    Args:
        lang_data: Mapping of table name to DataFrame; ``"tools"`` is read.

    Returns:
        ``(table, category_choices)``. The choice list always contains at
        least ``"All"``, because the dropdown fed from it is reset to that
        value. With no data the table is an empty DataFrame.
    """
    df = lang_data["tools"]
    if df.empty:
        return pd.DataFrame(), ["All"]

    categories = get_unique_values(df, "category") or ["All"]
    return prepare_tools_df(df), categories
def create_rules_tab(lang_data: Dict) -> tuple:
    """Create detection rules tab content.

    Args:
        lang_data: Mapping of table name to DataFrame; ``"rules"`` is read.

    Returns:
        ``(table, log_source_choices)``. The choice list always contains at
        least ``"All"``, because the dropdown fed from it is reset to that
        value. With no data the table is an empty DataFrame.
    """
    df = lang_data["rules"]
    if df.empty:
        return pd.DataFrame(), ["All"]

    log_sources = get_unique_values(df, "log_source") or ["All"]
    return prepare_rules_df(df), log_sources
def create_qa_tab(lang_data: Dict) -> tuple:
    """Create Q&A tab content.

    Args:
        lang_data: Mapping of table name to DataFrame; ``"qa"`` is read.

    Returns:
        ``(table, category_choices, difficulty_choices)``. Both choice lists
        always contain at least ``"All"``, because the dropdowns fed from
        them are reset to that value. With no data the table is an empty
        DataFrame.
    """
    df = lang_data["qa"]
    if df.empty:
        return pd.DataFrame(), ["All"], ["All"]

    categories = get_unique_values(df, "category") or ["All"]
    difficulties = get_unique_values(df, "difficulty") or ["All"]
    return prepare_qa_df(df), categories, difficulties
def create_statistics(lang_data: Dict, lang: str) -> tuple:
    """Build the charts and the summary line for the statistics tab.

    Args:
        lang_data: Mapping of table name to DataFrame; only ``"attacks"`` is
            read.
        lang: ``"en"`` selects the English titles and labels; any other value
            selects the French ones.

    Returns:
        ``(category_figure, severity_figure, tools_figure, summary_text)``.
        When the attacks table is empty, one placeholder figure fills all
        three figure slots. A chart whose source column is missing (or, for
        tools, yields no entries) is replaced by a figure carrying a short
        message.
    """
    attacks = lang_data["attacks"]

    def message_figure(message):
        # Blank figure that only carries a text annotation.
        fig = go.Figure()
        fig.add_annotation(text=message)
        return fig

    def count_table(values, label):
        # Frequency table with the columns [label, "count"].
        return values.value_counts().reset_index().set_axis([label, "count"], axis=1)

    if attacks.empty:
        placeholder = message_figure("No data available")
        return placeholder, placeholder, placeholder, "No statistics available"

    english = lang == "en"

    # Attacks per category
    if "category" not in attacks.columns:
        fig_category = message_figure("Category data not available")
    else:
        if english:
            category_title = "Attacks per Category"
            category_labels = {"category": "Category", "count": "Count"}
        else:
            category_title = "Attaques par Catégorie"
            category_labels = {"category": "Catégorie", "count": "Nombre"}
        fig_category = px.bar(
            count_table(attacks["category"], "category"),
            x="category",
            y="count",
            title=category_title,
            labels=category_labels,
        )

    # Severity distribution
    if "severity" not in attacks.columns:
        fig_severity = message_figure("Severity data not available")
    else:
        fig_severity = px.pie(
            count_table(attacks["severity"], "severity"),
            names="severity",
            values="count",
            title="Severity Distribution" if english else "Distribution de Sévérité",
        )

    # Tools usage: flatten the per-attack tool lists; non-list cells are skipped.
    tool_names = []
    if "tools" in attacks.columns:
        tool_names = [
            tool
            for entry in attacks["tools"]
            if isinstance(entry, list)
            for tool in entry
        ]

    if not tool_names:
        fig_tools = message_figure("Tools data not available")
    else:
        if english:
            tools_title = "Most Used Tools (Top 10)"
            tools_labels = {"tool": "Tool", "count": "Count"}
        else:
            tools_title = "Outils les Plus Utilisés (Top 10)"
            tools_labels = {"tool": "Outil", "count": "Nombre"}
        top_tools = count_table(pd.Series(tool_names), "tool").head(10)
        fig_tools = px.bar(
            top_tools,
            x="tool",
            y="count",
            title=tools_title,
            labels=tools_labels,
        )

    if english:
        stats_text = f"Total Attacks: {len(attacks)}"
    else:
        stats_text = f"Attaques Totales: {len(attacks)}"

    return fig_category, fig_severity, fig_tools, stats_text
def update_on_language_change(language: str):
    """Rebuild every language-dependent output for the given language code.

    Returns a 14-item tuple in this order: attacks table, attack category
    dropdown, attack severity dropdown, tools table, tools category dropdown,
    rules table, log source dropdown, Q&A table, Q&A category dropdown,
    Q&A difficulty dropdown, category chart, severity chart, tools chart,
    statistics text. Every dropdown is reset to "All".
    """
    def reset_dropdown(choices):
        # Dropdown update carrying new choices, with "All" selected.
        return gr.Dropdown(choices=choices, value="All")

    tables = load_datasets_for_language(language)

    attacks_view, attack_categories, attack_severities = create_attacks_tab(tables)
    tools_view, tool_categories = create_tools_tab(tables)
    rules_view, rule_log_sources = create_rules_tab(tables)
    qa_view, qa_categories, qa_difficulties = create_qa_tab(tables)
    category_chart, severity_chart, tools_chart, summary = create_statistics(tables, language)

    return (
        attacks_view,
        reset_dropdown(attack_categories),
        reset_dropdown(attack_severities),
        tools_view,
        reset_dropdown(tool_categories),
        rules_view,
        reset_dropdown(rule_log_sources),
        qa_view,
        reset_dropdown(qa_categories),
        reset_dropdown(qa_difficulties),
        category_chart,
        severity_chart,
        tools_chart,
        summary,
    )
# Load initial data
initial_lang = "en"
initial_data = load_datasets_for_language(initial_lang)


def _language_code(language: str) -> str:
    """Map the language radio label to a dataset language code ("fr" or "en")."""
    return "fr" if language == "Français" else "en"


def _initial_choices(table: str, column: str) -> List[str]:
    """Return dropdown choices for a column of the initial data.

    Falls back to ["All"] so that the dropdown's initial value "All" is
    always one of its choices, even when the table or column is missing.
    """
    return get_unique_values(initial_data[table], column) or ["All"]


# Create Gradio app
# NOTE(review): the nesting of rows/tabs below was reconstructed from a
# flattened listing; confirm that tables sit below (not inside) the filter rows.
with gr.Blocks(title="AD Attack Explorer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🏰 AD Attack Explorer")
    gr.Markdown("Interactive exploration of Active Directory attacks, tools, detection rules, kill chains, and Q&A datasets")

    with gr.Row():
        language = gr.Radio(
            choices=["English", "Français"],
            value="English",
            label="Language / Langue",
            scale=1
        )

    # Create tabs
    with gr.Tabs():
        # Attacks Tab
        with gr.TabItem("Attacks / Attaques"):
            with gr.Row():
                search_attacks = gr.Textbox(
                    label="Search / Rechercher",
                    placeholder="Search attacks...",
                    scale=2
                )
            with gr.Row():
                filter_category = gr.Dropdown(
                    choices=_initial_choices("attacks", "category"),
                    value="All",
                    label="Category / Catégorie",
                    scale=1
                )
                filter_severity = gr.Dropdown(
                    choices=_initial_choices("attacks", "severity"),
                    value="All",
                    label="Severity / Sévérité",
                    scale=1
                )
            attacks_table = gr.Dataframe(
                value=prepare_attacks_df(initial_data["attacks"]),
                interactive=False,
                scale=2
            )

        # Tools Tab
        with gr.TabItem("Tools / Outils"):
            with gr.Row():
                search_tools = gr.Textbox(
                    label="Search / Rechercher",
                    placeholder="Search tools...",
                    scale=2
                )
            with gr.Row():
                filter_tools_cat = gr.Dropdown(
                    choices=_initial_choices("tools", "category"),
                    value="All",
                    label="Category / Catégorie",
                    scale=1
                )
            tools_table = gr.Dataframe(
                value=prepare_tools_df(initial_data["tools"]),
                interactive=False,
                scale=2
            )

        # Detection Rules Tab
        with gr.TabItem("Detection Rules / Règles Détection"):
            with gr.Row():
                search_rules = gr.Textbox(
                    label="Search / Rechercher",
                    placeholder="Search rules...",
                    scale=2
                )
            with gr.Row():
                filter_rules_log = gr.Dropdown(
                    choices=_initial_choices("rules", "log_source"),
                    value="All",
                    label="Log Source",
                    scale=1
                )
            rules_table = gr.Dataframe(
                value=prepare_rules_df(initial_data["rules"]),
                interactive=False,
                scale=2
            )

        # Kill Chains Tab
        with gr.TabItem("Kill Chains"):
            with gr.Row():
                search_killchains = gr.Textbox(
                    label="Search / Rechercher",
                    placeholder="Search kill chains...",
                    scale=2
                )
            killchains_table = gr.Dataframe(
                value=initial_data["killchains"],
                interactive=False,
                scale=2
            )

        # Q&A Tab
        with gr.TabItem("Q&A"):
            with gr.Row():
                search_qa = gr.Textbox(
                    label="Search / Rechercher",
                    placeholder="Search questions...",
                    scale=2
                )
            with gr.Row():
                filter_qa_cat = gr.Dropdown(
                    choices=_initial_choices("qa", "category"),
                    value="All",
                    label="Category / Catégorie",
                    scale=1
                )
                filter_qa_diff = gr.Dropdown(
                    choices=_initial_choices("qa", "difficulty"),
                    value="All",
                    label="Difficulty / Difficulté",
                    scale=1
                )
            qa_table = gr.Dataframe(
                value=prepare_qa_df(initial_data["qa"]),
                interactive=False,
                scale=2
            )

        # Statistics Tab
        with gr.TabItem("Statistics / Statistiques"):
            gr.Markdown("### Attack Analytics")
            # Plain computation (no components are created), so it is done
            # before the layout rows are opened.
            fig_cat, fig_sev, fig_tools, stats_text = create_statistics(initial_data, initial_lang)
            with gr.Row():
                with gr.Column():
                    stats_info = gr.Markdown(stats_text)
            with gr.Row():
                chart_category = gr.Plot(value=fig_cat, scale=1)
                chart_severity = gr.Plot(value=fig_sev, scale=1)
            with gr.Row():
                chart_tools = gr.Plot(value=fig_tools, scale=2)

    # Footer
    gr.HTML("""
    <div style='text-align:center; padding:20px; color:#666; margin-top:20px;'>
    <p>Created by <a href='https://www.ayinedjimi-consultants.fr' target='_blank'>Ayi NEDJIMI</a> - Senior Offensive Cybersecurity & AI Consultant</p>
    <p><a href='https://www.linkedin.com/in/ayi-nedjimi' target='_blank'>LinkedIn</a> | <a href='https://github.com/ayinedjimi' target='_blank'>GitHub</a> | <a href='https://x.com/AyiNEDJIMI' target='_blank'>Twitter/X</a></p>
    </div>
    """)

    # Display handlers: each one re-reads the cached tables for the selected
    # language and re-applies the current search text and filters.
    # filter_dataframe itself ignores a filter value of "All" or None.
    def update_attacks_display(search, category, severity, language):
        """Return the attacks table filtered by search text, category and severity."""
        df = load_datasets_for_language(_language_code(language))["attacks"]
        df = filter_dataframe(df, search or "", "category", category)
        df = filter_dataframe(df, "", "severity", severity)
        return prepare_attacks_df(df)

    def update_tools_display(search, category, language):
        """Return the tools table filtered by search text and category."""
        df = load_datasets_for_language(_language_code(language))["tools"]
        df = filter_dataframe(df, search or "", "category", category)
        return prepare_tools_df(df)

    def update_rules_display(search, log_source, language):
        """Return the detection rules table filtered by search text and log source."""
        df = load_datasets_for_language(_language_code(language))["rules"]
        df = filter_dataframe(df, search or "", "log_source", log_source)
        return prepare_rules_df(df)

    def update_killchains_display(search, language):
        """Return the kill chains table filtered by search text."""
        df = load_datasets_for_language(_language_code(language))["killchains"]
        return filter_dataframe(df, search or "")

    def update_qa_display(search, category, difficulty, language):
        """Return the Q&A table filtered by search text, category and difficulty."""
        df = load_datasets_for_language(_language_code(language))["qa"]
        df = filter_dataframe(df, search or "", "category", category)
        df = filter_dataframe(df, "", "difficulty", difficulty)
        return prepare_qa_df(df)

    # Language change handler
    def on_language_change(language: str, killchain_search: str = ""):
        """Refresh every language-dependent component.

        update_on_language_change does not return the kill chains table, so
        it is appended here; otherwise that tab would keep showing the
        previous language's rows.
        """
        refreshed = update_on_language_change(_language_code(language))
        return (*refreshed, update_killchains_display(killchain_search, language))

    # Connect event handlers
    language.change(
        on_language_change,
        inputs=[language, search_killchains],
        outputs=[
            attacks_table,
            filter_category,
            filter_severity,
            tools_table,
            filter_tools_cat,
            rules_table,
            filter_rules_log,
            qa_table,
            filter_qa_cat,
            filter_qa_diff,
            chart_category,
            chart_severity,
            chart_tools,
            stats_info,
            killchains_table,
        ]
    )

    # Any change to a tab's search box or dropdowns re-runs that tab's handler
    # with the full set of that tab's inputs.
    attacks_inputs = [search_attacks, filter_category, filter_severity, language]
    for trigger in (search_attacks, filter_category, filter_severity):
        trigger.change(update_attacks_display, inputs=attacks_inputs, outputs=[attacks_table])

    tools_inputs = [search_tools, filter_tools_cat, language]
    for trigger in (search_tools, filter_tools_cat):
        trigger.change(update_tools_display, inputs=tools_inputs, outputs=[tools_table])

    rules_inputs = [search_rules, filter_rules_log, language]
    for trigger in (search_rules, filter_rules_log):
        trigger.change(update_rules_display, inputs=rules_inputs, outputs=[rules_table])

    search_killchains.change(
        update_killchains_display,
        inputs=[search_killchains, language],
        outputs=[killchains_table]
    )

    qa_inputs = [search_qa, filter_qa_cat, filter_qa_diff, language]
    for trigger in (search_qa, filter_qa_cat, filter_qa_diff):
        trigger.change(update_qa_display, inputs=qa_inputs, outputs=[qa_table])

if __name__ == "__main__":
    demo.launch()