Spaces:
Runtime error
Runtime error
Initial release: CyberSec-API gateway with REST endpoints for 3 cybersecurity models
ecbf601 verified | """ | |
| CyberSec-API: REST API Gateway for Cybersecurity AI Models | |
| =========================================================== | |
| Provides unified API access to three specialized cybersecurity models: | |
| - ISO27001-Expert (1.5B) - ISO 27001 compliance guidance | |
| - RGPD-Expert (1.5B) - GDPR/RGPD data protection | |
| - CyberSec-Assistant (3B) - General cybersecurity operations | |
| """ | |
| import os | |
| import json | |
| import time | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| # --------------------------------------------------------------------------- | |
| # Configuration | |
| # --------------------------------------------------------------------------- | |
# Registry of the downstream models exposed through this gateway.
# Keys are the public model names accepted by the API's `model_name` parameter;
# "id" is the Hugging Face repo id passed to the Inference API.
MODELS = {
    "ISO27001-Expert": {
        "id": "AYI-NEDJIMI/ISO27001-Expert-1.5B",
        "description": "Specialized in ISO 27001 standards, ISMS implementation, risk assessment, and compliance auditing.",
        "parameters": "1.5B",
        "specialty": "ISO 27001 Compliance",
    },
    "RGPD-Expert": {
        "id": "AYI-NEDJIMI/RGPD-Expert-1.5B",
        "description": "Specialized in GDPR/RGPD regulations, data protection, privacy impact assessments, and DPO guidance.",
        "parameters": "1.5B",
        "specialty": "GDPR/RGPD Data Protection",
    },
    "CyberSec-Assistant": {
        "id": "AYI-NEDJIMI/CyberSec-Assistant-3B",
        "description": "General-purpose cybersecurity assistant for incident response, threat analysis, vulnerability management, and security operations.",
        "parameters": "3B",
        "specialty": "General Cybersecurity",
    },
}
# Ordered list of valid model names (insertion order of MODELS).
MODEL_NAMES = list(MODELS.keys())
# System prompts per model
SYSTEM_PROMPTS = {
    "ISO27001-Expert": (
        "You are ISO27001-Expert, an AI assistant specialized in ISO 27001 information security management systems. "
        "Provide accurate, professional guidance on ISMS implementation, risk assessment, control selection, "
        "audit preparation, and compliance requirements. Reference specific ISO 27001 clauses and Annex A controls when relevant."
    ),
    "RGPD-Expert": (
        "You are RGPD-Expert, an AI assistant specialized in GDPR (General Data Protection Regulation) / RGPD. "
        "Provide accurate guidance on data protection principles, lawful bases for processing, data subject rights, "
        "DPIA procedures, breach notification requirements, and DPO responsibilities. Reference specific GDPR articles when relevant."
    ),
    "CyberSec-Assistant": (
        "You are CyberSec-Assistant, a general-purpose cybersecurity AI assistant. "
        "Provide expert guidance on incident response, threat intelligence, vulnerability management, "
        "penetration testing, SOC operations, network security, and security architecture. "
        "Be practical and actionable in your recommendations."
    ),
}
# Inference client
# `client` stays None when the HF_TOKEN secret is missing; callers treat that
# as a degraded/unavailable state rather than raising at import time.
HF_TOKEN = os.getenv("HF_TOKEN", "")
client = InferenceClient(token=HF_TOKEN) if HF_TOKEN else None
# Rate limiting state
# Sliding-window limiter: timestamps of recent requests, pruned on each check.
_request_log: list[float] = []
RATE_LIMIT_WINDOW = 60  # seconds
RATE_LIMIT_MAX = 30  # requests per window
| # --------------------------------------------------------------------------- | |
| # Core functions | |
| # --------------------------------------------------------------------------- | |
def _check_rate_limit() -> bool:
    """Record the current request and report whether it fits the rate window.

    Prunes timestamps older than RATE_LIMIT_WINDOW from the shared log, then
    admits the request (appending its timestamp) unless the window is full.
    """
    now = time.time()
    # Keep only entries still inside the sliding window.
    fresh = [stamp for stamp in _request_log if now - stamp < RATE_LIMIT_WINDOW]
    _request_log[:] = fresh
    if len(fresh) >= RATE_LIMIT_MAX:
        return False
    _request_log.append(now)
    return True
def _query_model(message: str, model_name: str, max_tokens: int = 512) -> str:
    """Send a prompt to the specified model via the HF Inference API.

    Args:
        message: User prompt to forward to the model.
        model_name: Key into MODELS identifying the target model.
        max_tokens: Generation cap passed to the inference call.

    Returns:
        The model's reply, or an "[Error] ..." string on any failure
        (missing token, unknown model, rate limit, upstream error).
    """
    # Guard clauses: configuration, model validity, then rate limiting.
    if not client:
        return "[Error] HF_TOKEN is not configured. The API is unavailable."
    if model_name not in MODELS:
        return f"[Error] Unknown model '{model_name}'. Available: {', '.join(MODEL_NAMES)}"
    if not _check_rate_limit():
        return "[Error] Rate limit exceeded. Please wait before sending more requests."
    target_id = MODELS[model_name]["id"]
    persona = SYSTEM_PROMPTS[model_name]
    try:
        conversation = [
            {"role": "system", "content": persona},
            {"role": "user", "content": message},
        ]
        reply = client.chat_completion(
            model=target_id,
            messages=conversation,
            max_tokens=max_tokens,
            temperature=0.7,
        )
        return reply.choices[0].message.content
    except Exception as primary_err:
        detail = str(primary_err)
        # Some endpoints lack a chat interface; retry with plain text generation.
        if "not supported" in detail.lower() or "chat" in detail.lower():
            try:
                flat_prompt = f"### System:\n{persona}\n\n### User:\n{message}\n\n### Assistant:\n"
                completion = client.text_generation(
                    prompt=flat_prompt,
                    model=target_id,
                    max_new_tokens=max_tokens,
                    temperature=0.7,
                    do_sample=True,
                )
                return completion
            except Exception as fallback_err:
                return f"[Error] Model query failed: {fallback_err}"
        return f"[Error] Model query failed: {primary_err}"
| # --------------------------------------------------------------------------- | |
| # API endpoint functions (exposed via Gradio) | |
| # --------------------------------------------------------------------------- | |
def chat(message: str, model_name: str) -> str:
    """Send a message to a specific cybersecurity model and get a response.

    Args:
        message: The question or prompt to send to the model.
        model_name: One of 'ISO27001-Expert', 'RGPD-Expert', or 'CyberSec-Assistant'.

    Returns:
        The model's response text, or an "[Error] ..." string.
    """
    cleaned = (message or "").strip()
    if not cleaned:
        return "[Error] Message cannot be empty."
    return _query_model(cleaned, model_name)
def compare(message: str) -> str:
    """Send a message to all 3 models and compare their responses side by side.

    Args:
        message: The question or prompt to send to all models.

    Returns:
        JSON string with responses from each model (keyed by model name).
    """
    text = (message or "").strip()
    if not text:
        return json.dumps({"error": "Message cannot be empty."}, indent=2)
    # Query every registered model in declaration order.
    comparison = {
        name: {
            "model_id": MODELS[name]["id"],
            "specialty": MODELS[name]["specialty"],
            "response": _query_model(text, name),
        }
        for name in MODEL_NAMES
    }
    return json.dumps(comparison, indent=2, ensure_ascii=False)
def list_models() -> str:
    """List all available cybersecurity models and their details.

    Returns:
        JSON string with model information and a total count.
    """
    catalog = [
        {
            "name": name,
            "model_id": details["id"],
            "description": details["description"],
            "parameters": details["parameters"],
            "specialty": details["specialty"],
            "endpoint": f"/api/chat with model_name='{name}'",
        }
        for name, details in MODELS.items()
    ]
    return json.dumps({"models": catalog, "count": len(catalog)}, indent=2)
def health_check() -> str:
    """Check the health status of the API and its dependencies.

    Returns:
        JSON string with health status information: overall status
        ("healthy" when the inference client exists, else "degraded"),
        UTC timestamp, version, and current rate-limit usage.
    """
    now = time.time()
    # Count only requests still inside the sliding window (read-only view).
    in_window = [stamp for stamp in _request_log if now - stamp < RATE_LIMIT_WINDOW]
    report = {
        "status": "healthy" if client else "degraded",
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "version": "1.0.0",
        "hf_token_configured": bool(HF_TOKEN),
        "models_available": MODEL_NAMES,
        "rate_limit": {
            "window_seconds": RATE_LIMIT_WINDOW,
            "max_requests": RATE_LIMIT_MAX,
            "current_usage": len(in_window),
        },
    }
    return json.dumps(report, indent=2)
| # --------------------------------------------------------------------------- | |
| # Tab content builders | |
| # --------------------------------------------------------------------------- | |
# Markdown source for the "API Documentation" tab (rendered via gr.Markdown).
API_DOCS_MD = """
# CyberSec-API Documentation
A REST API gateway providing unified access to three specialized cybersecurity AI models hosted on Hugging Face.
---
## Available Models
| Model | Specialty | Parameters | Model ID |
|-------|-----------|------------|----------|
| **ISO27001-Expert** | ISO 27001 compliance, ISMS, risk assessment | 1.5B | `AYI-NEDJIMI/ISO27001-Expert-1.5B` |
| **RGPD-Expert** | GDPR/RGPD, data protection, privacy | 1.5B | `AYI-NEDJIMI/RGPD-Expert-1.5B` |
| **CyberSec-Assistant** | Incident response, threat analysis, SOC | 3B | `AYI-NEDJIMI/CyberSec-Assistant-3B` |
---
## Endpoints
### POST `/api/chat`
Send a message to a specific cybersecurity model.
**Parameters:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `message` | string | Yes | The question or prompt |
| `model_name` | string | Yes | One of: `ISO27001-Expert`, `RGPD-Expert`, `CyberSec-Assistant` |
**Response:** Plain text response from the model.
---
### POST `/api/compare`
Send the same message to all 3 models and compare their responses.
**Parameters:**
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `message` | string | Yes | The question or prompt |
**Response:** JSON object with each model's response.
---
### GET `/api/models`
List all available models and their details.
**Parameters:** None
**Response:** JSON object with model information.
---
### GET `/api/health`
Health check endpoint for monitoring.
**Parameters:** None
**Response:** JSON object with API status, version, and rate limit info.
---
## Rate Limits
| Limit | Value |
|-------|-------|
| Requests per minute | 30 |
| Max tokens per request | 512 |
| Concurrent requests | 5 |
---
## Code Examples
### Python (using `gradio_client`)
```python
from gradio_client import Client
# Connect to the API
client = Client("AYI-NEDJIMI/CyberSec-API")
# Chat with a specific model
result = client.predict(
    message="What are the key requirements of ISO 27001 Clause 6?",
    model_name="ISO27001-Expert",
    api_name="/chat"
)
print(result)
# Compare all models
result = client.predict(
    message="How should we handle a data breach?",
    api_name="/compare"
)
print(result)
# List available models
models = client.predict(api_name="/models")
print(models)
# Health check
status = client.predict(api_name="/health")
print(status)
```
### Python (using `requests`)
```python
import requests
SPACE_URL = "https://ayi-nedjimi-cybersec-api.hf.space"
# Chat endpoint
response = requests.post(
    f"{SPACE_URL}/api/chat",
    json={
        "data": [
            "What controls does ISO 27001 Annex A recommend for access management?",
            "ISO27001-Expert"
        ]
    }
)
print(response.json()["data"][0])
# Compare endpoint
response = requests.post(
    f"{SPACE_URL}/api/compare",
    json={
        "data": ["How do you perform a risk assessment?"]
    }
)
print(response.json()["data"][0])
```
### cURL
```bash
# Chat with a model
curl -X POST "https://ayi-nedjimi-cybersec-api.hf.space/api/chat" \\
  -H "Content-Type: application/json" \\
  -d '{"data": ["What is ISO 27001?", "ISO27001-Expert"]}'
# Compare all models
curl -X POST "https://ayi-nedjimi-cybersec-api.hf.space/api/compare" \\
  -H "Content-Type: application/json" \\
  -d '{"data": ["Explain the principle of least privilege"]}'
# List models
curl -X POST "https://ayi-nedjimi-cybersec-api.hf.space/api/models" \\
  -H "Content-Type: application/json" \\
  -d '{"data": []}'
# Health check
curl -X POST "https://ayi-nedjimi-cybersec-api.hf.space/api/health" \\
  -H "Content-Type: application/json" \\
  -d '{"data": []}'
```
### JavaScript
```javascript
import { Client } from "@gradio/client";
const client = await Client.connect("AYI-NEDJIMI/CyberSec-API");
// Chat with a model
const chatResult = await client.predict("/chat", {
  message: "What are GDPR data subject rights?",
  model_name: "RGPD-Expert",
});
console.log(chatResult.data[0]);
// Compare all models
const compareResult = await client.predict("/compare", {
  message: "How to respond to a ransomware attack?",
});
console.log(JSON.parse(compareResult.data[0]));
// List models
const models = await client.predict("/models", {});
console.log(JSON.parse(models.data[0]));
```
---
## Authentication
This API is publicly accessible. No authentication token is required to call the endpoints.
The API uses an internal HF token (configured as a Space secret) to communicate with the
Hugging Face Inference API on your behalf.
---
## Error Handling
All endpoints return error messages in a consistent format:
| Error | Description |
|-------|-------------|
| `[Error] Message cannot be empty.` | The message parameter was empty or missing |
| `[Error] Unknown model '...'` | Invalid model_name provided |
| `[Error] Rate limit exceeded.` | Too many requests -- wait and retry |
| `[Error] Model query failed: ...` | Upstream inference error |
"""
# Markdown source for the "Integration Guide" tab (rendered via gr.Markdown).
# Fix: the webhook_relay.py example used json.dumps() without importing json,
# so the documented example would crash with NameError; `import json` added.
INTEGRATION_GUIDE_MD = """
# Integration Guide
Integrate CyberSec-API into your security infrastructure, automation pipelines, and communication tools.
---
## 1. SIEM Integration
### Splunk Integration
Create a custom Splunk alert action that queries CyberSec-API for incident analysis:
```python
# splunk_cybersec_action.py
# Place in $SPLUNK_HOME/etc/apps/your_app/bin/
import sys
import json
import requests
CYBERSEC_API = "https://ayi-nedjimi-cybersec-api.hf.space"
def analyze_alert(alert_data):
    \"\"\"Send Splunk alert data to CyberSec-Assistant for analysis.\"\"\"
    prompt = f\"\"\"Analyze this security alert and provide:
1. Severity assessment
2. Recommended immediate actions
3. Investigation steps
Alert Data:
{json.dumps(alert_data, indent=2)}
\"\"\"
    response = requests.post(
        f"{CYBERSEC_API}/api/chat",
        json={"data": [prompt, "CyberSec-Assistant"]},
        timeout=60
    )
    return response.json()["data"][0]
if __name__ == "__main__":
    # Read alert payload from Splunk
    alert_payload = json.loads(sys.stdin.read())
    analysis = analyze_alert(alert_payload)
    print(analysis)
```
**Splunk `alert_actions.conf`:**
```ini
[cybersec_analyze]
label = CyberSec AI Analysis
description = Analyze security alerts using CyberSec-API
command = python3 $SPLUNK_HOME/etc/apps/cybersec/bin/splunk_cybersec_action.py
is_custom = 1
```
### Microsoft Sentinel Integration
Use an Azure Logic App or Function to call CyberSec-API from Sentinel playbooks:
```python
# azure_function/cybersec_sentinel/__init__.py
import json
import logging
import requests
import azure.functions as func
CYBERSEC_API = "https://ayi-nedjimi-cybersec-api.hf.space"
def main(req: func.HttpRequest) -> func.HttpResponse:
    \"\"\"Azure Function triggered by Sentinel incident.\"\"\"
    incident = req.get_json()
    prompt = f\"\"\"Analyze this Microsoft Sentinel security incident:
Title: {incident.get('title', 'N/A')}
Severity: {incident.get('severity', 'N/A')}
Description: {incident.get('description', 'N/A')}
Entities: {json.dumps(incident.get('entities', []))}
Provide: severity validation, recommended response actions, and investigation queries.
\"\"\"
    # Check if it is compliance-related
    model = "CyberSec-Assistant"
    title_lower = incident.get("title", "").lower()
    if "gdpr" in title_lower or "data protection" in title_lower:
        model = "RGPD-Expert"
    elif "compliance" in title_lower or "audit" in title_lower:
        model = "ISO27001-Expert"
    response = requests.post(
        f"{CYBERSEC_API}/api/chat",
        json={"data": [prompt, model]},
        timeout=60
    )
    return func.HttpResponse(
        json.dumps({"analysis": response.json()["data"][0], "model_used": model}),
        mimetype="application/json"
    )
```
---
## 2. Chat Bot Integration
### Slack Bot
```python
# slack_cybersec_bot.py
import os
import json
import requests
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
CYBERSEC_API = "https://ayi-nedjimi-cybersec-api.hf.space"
app = App(token=os.environ["SLACK_BOT_TOKEN"])
MODEL_MAP = {
    "iso": "ISO27001-Expert",
    "gdpr": "RGPD-Expert",
    "rgpd": "RGPD-Expert",
    "sec": "CyberSec-Assistant",
    "cyber": "CyberSec-Assistant",
}
def detect_model(text):
    \"\"\"Auto-detect the best model based on keywords.\"\"\"
    text_lower = text.lower()
    for keyword, model in MODEL_MAP.items():
        if keyword in text_lower:
            return model
    return "CyberSec-Assistant"  # default
@app.message("!ask")
def handle_ask(message, say):
    \"\"\"Handle '!ask <question>' messages.\"\"\"
    query = message["text"].replace("!ask", "").strip()
    if not query:
        say("Usage: `!ask <your cybersecurity question>`")
        return
    model = detect_model(query)
    say(f"Asking *{model}*... :hourglass:")
    response = requests.post(
        f"{CYBERSEC_API}/api/chat",
        json={"data": [query, model]},
        timeout=60
    )
    answer = response.json()["data"][0]
    say(f"*{model}:*\\n{answer}")
@app.message("!compare")
def handle_compare(message, say):
    \"\"\"Handle '!compare <question>' to get all 3 model responses.\"\"\"
    query = message["text"].replace("!compare", "").strip()
    if not query:
        say("Usage: `!compare <your cybersecurity question>`")
        return
    say("Comparing all 3 models... :hourglass:")
    response = requests.post(
        f"{CYBERSEC_API}/api/compare",
        json={"data": [query]},
        timeout=120
    )
    results = json.loads(response.json()["data"][0])
    for model_name, data in results.items():
        say(f"*{model_name}* ({data['specialty']}):\\n{data['response']}")
if __name__ == "__main__":
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    handler.start()
```
### Discord Bot
```python
# discord_cybersec_bot.py
import os
import json
import discord
import requests
from discord.ext import commands
CYBERSEC_API = "https://ayi-nedjimi-cybersec-api.hf.space"
bot = commands.Bot(command_prefix="!", intents=discord.Intents.default())
@bot.command(name="ask")
async def ask(ctx, model: str = "CyberSec-Assistant", *, question: str):
    \"\"\"Ask a cybersecurity question. Usage: !ask [model] <question>\"\"\"
    valid_models = ["ISO27001-Expert", "RGPD-Expert", "CyberSec-Assistant"]
    if model not in valid_models:
        question = f"{model} {question}"
        model = "CyberSec-Assistant"
    await ctx.send(f"Querying **{model}**...")
    response = requests.post(
        f"{CYBERSEC_API}/api/chat",
        json={"data": [question, model]},
        timeout=60
    )
    answer = response.json()["data"][0]
    # Discord has a 2000 char limit
    if len(answer) > 1900:
        for i in range(0, len(answer), 1900):
            await ctx.send(answer[i:i+1900])
    else:
        await ctx.send(f"**{model}:**\\n{answer}")
bot.run(os.environ["DISCORD_TOKEN"])
```
---
## 3. CI/CD Pipeline Integration
### GitHub Actions
```yaml
# .github/workflows/security-review.yml
name: AI Security Review
on:
  pull_request:
    paths:
      - '**.py'
      - '**.js'
      - '**.yml'
      - 'Dockerfile'
jobs:
  security-review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Get changed files
        id: changed
        run: |
          FILES=$(git diff --name-only ${{ github.event.pull_request.base.sha }} HEAD)
          echo "files=$FILES" >> $GITHUB_OUTPUT
      - name: AI Security Review
        run: |
          pip install requests
          python - <<'SCRIPT'
          import requests, os, json
          API = "https://ayi-nedjimi-cybersec-api.hf.space"
          files = "${{ steps.changed.outputs.files }}".split("\\n")
          prompt = f\"\"\"Review these changed files for security vulnerabilities,
          hardcoded secrets, and compliance issues:
          Changed files: {', '.join(files)}
          Provide a security assessment with:
          1. Critical issues found
          2. Recommendations
          3. Compliance notes (ISO 27001 / GDPR if applicable)
          \"\"\"
          resp = requests.post(
              f"{API}/api/compare",
              json={"data": [prompt]},
              timeout=120
          )
          results = json.loads(resp.json()["data"][0])
          for model, data in results.items():
              print(f"\\n{'='*60}")
              print(f"Model: {model} ({data['specialty']})")
              print(f"{'='*60}")
              print(data["response"])
          SCRIPT
```
### GitLab CI
```yaml
# .gitlab-ci.yml
security-ai-scan:
  stage: test
  image: python:3.11-slim
  script:
    - pip install requests
    - |
      python3 -c "
      import requests, json
      API = 'https://ayi-nedjimi-cybersec-api.hf.space'
      resp = requests.post(
          f'{API}/api/chat',
          json={'data': [
              'Review this CI/CD pipeline for security best practices and suggest improvements.',
              'CyberSec-Assistant'
          ]},
          timeout=60
      )
      print(resp.json()['data'][0])
      "
  only:
    changes:
      - .gitlab-ci.yml
      - Dockerfile
      - docker-compose*.yml
```
---
## 4. Python SDK Example
Create a reusable Python SDK wrapper for clean integration:
```python
# cybersec_sdk.py
\"\"\"CyberSec-API Python SDK\"\"\"
import json
from typing import Optional
from gradio_client import Client
class CyberSecAPI:
    \"\"\"Client for the CyberSec-API gateway.\"\"\"
    MODELS = ["ISO27001-Expert", "RGPD-Expert", "CyberSec-Assistant"]
    def __init__(self, space_id: str = "AYI-NEDJIMI/CyberSec-API"):
        self.client = Client(space_id)
    def chat(self, message: str, model: str = "CyberSec-Assistant") -> str:
        \"\"\"Send a question to a specific model.\"\"\"
        if model not in self.MODELS:
            raise ValueError(f"Unknown model '{model}'. Choose from: {self.MODELS}")
        return self.client.predict(
            message=message,
            model_name=model,
            api_name="/chat"
        )
    def compare(self, message: str) -> dict:
        \"\"\"Get responses from all 3 models for comparison.\"\"\"
        result = self.client.predict(message=message, api_name="/compare")
        return json.loads(result)
    def models(self) -> dict:
        \"\"\"List available models.\"\"\"
        result = self.client.predict(api_name="/models")
        return json.loads(result)
    def health(self) -> dict:
        \"\"\"Check API health status.\"\"\"
        result = self.client.predict(api_name="/health")
        return json.loads(result)
    def ask_iso27001(self, question: str) -> str:
        \"\"\"Shortcut to query the ISO 27001 expert.\"\"\"
        return self.chat(question, model="ISO27001-Expert")
    def ask_rgpd(self, question: str) -> str:
        \"\"\"Shortcut to query the RGPD/GDPR expert.\"\"\"
        return self.chat(question, model="RGPD-Expert")
    def ask_cybersec(self, question: str) -> str:
        \"\"\"Shortcut to query the general cybersecurity assistant.\"\"\"
        return self.chat(question, model="CyberSec-Assistant")
# Usage example
if __name__ == "__main__":
    api = CyberSecAPI()
    # Check health
    print("Health:", api.health())
    # Ask a question
    answer = api.ask_iso27001("What are the mandatory documents for ISO 27001 certification?")
    print("Answer:", answer)
    # Compare models
    comparison = api.compare("What is the best approach to incident response?")
    for model, data in comparison.items():
        print(f"\\n{model}: {data['response'][:200]}...")
```
---
## 5. Webhook Integration
For event-driven architectures, set up a webhook relay:
```python
# webhook_relay.py
import json
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
CYBERSEC_API = "https://ayi-nedjimi-cybersec-api.hf.space"
@app.route("/webhook/security-alert", methods=["POST"])
def security_alert_webhook():
    \"\"\"Receive security alerts and auto-analyze with CyberSec-API.\"\"\"
    alert = request.json
    prompt = f"Analyze this security alert: {json.dumps(alert)}"
    response = requests.post(
        f"{CYBERSEC_API}/api/chat",
        json={"data": [prompt, "CyberSec-Assistant"]},
        timeout=60
    )
    return jsonify({
        "alert_id": alert.get("id"),
        "ai_analysis": response.json()["data"][0]
    })
```
"""
| # --------------------------------------------------------------------------- | |
| # CSS | |
| # --------------------------------------------------------------------------- | |
# Custom stylesheet injected into the Gradio app (dark theme accents,
# header banner gradient, and hiding the default Gradio footer).
CUSTOM_CSS = """
.api-docs {
    max-width: 900px;
    margin: 0 auto;
}
.model-card {
    border: 1px solid #374151;
    border-radius: 8px;
    padding: 16px;
    margin: 8px 0;
    background: #1a1a2e;
}
.header-banner {
    background: linear-gradient(135deg, #0f0f23 0%, #1a1a3e 50%, #2d1b4e 100%);
    padding: 24px;
    border-radius: 12px;
    margin-bottom: 16px;
    border: 1px solid #333;
    text-align: center;
}
.status-badge {
    display: inline-block;
    padding: 4px 12px;
    border-radius: 12px;
    font-size: 0.85em;
    font-weight: 600;
}
.status-healthy { background: #064e3b; color: #6ee7b7; }
.status-degraded { background: #78350f; color: #fcd34d; }
footer { display: none !important; }
"""
| # --------------------------------------------------------------------------- | |
| # Gradio UI | |
| # --------------------------------------------------------------------------- | |
# Build the Gradio app: three tabs (docs, interactive tester, integration
# guide). The .click() wirings carry api_name=... so each handler is also
# exposed as a named REST endpoint (/api/chat, /api/compare, ...).
with gr.Blocks(
    title="CyberSec-API",
    css=CUSTOM_CSS,
    theme=gr.themes.Base(
        primary_hue="blue",
        secondary_hue="gray",
        neutral_hue="gray",
    ),
) as demo:
    # Header
    gr.HTML("""
    <div class="header-banner">
        <h1 style="margin:0; font-size:2em; color:#60a5fa;">CyberSec-API</h1>
        <p style="margin:4px 0 0; color:#9ca3af; font-size:1.1em;">
            REST API Gateway for Cybersecurity AI Models
        </p>
        <p style="margin:8px 0 0; color:#6b7280; font-size:0.9em;">
            ISO 27001 • GDPR/RGPD • General Cybersecurity
        </p>
    </div>
    """)
    with gr.Tabs():
        # ===== Tab 1: API Documentation =====
        with gr.Tab("API Documentation", id="docs"):
            gr.Markdown(API_DOCS_MD, elem_classes=["api-docs"])
        # ===== Tab 2: Try It =====
        with gr.Tab("Try It", id="try-it"):
            gr.Markdown("## Interactive API Tester")
            gr.Markdown("Select a model, type your cybersecurity question, and get a response.")
            with gr.Row():
                with gr.Column(scale=2):
                    model_selector = gr.Dropdown(
                        choices=MODEL_NAMES,
                        value="CyberSec-Assistant",
                        label="Select Model",
                        info="Choose which cybersecurity expert to query",
                    )
                    user_input = gr.Textbox(
                        label="Your Question",
                        placeholder="e.g., What are the key steps for implementing an ISMS according to ISO 27001?",
                        lines=4,
                    )
                    with gr.Row():
                        submit_btn = gr.Button("Submit", variant="primary", scale=2)
                        clear_btn = gr.Button("Clear", variant="secondary", scale=1)
                with gr.Column(scale=3):
                    response_output = gr.Textbox(
                        label="Model Response",
                        lines=16,
                        interactive=False,
                        show_copy_button=True,
                    )
            gr.Markdown("---")
            gr.Markdown("### Quick Examples")
            gr.Examples(
                examples=[
                    ["What are the mandatory documents required for ISO 27001 certification?", "ISO27001-Expert"],
                    ["Explain the GDPR right to data portability under Article 20.", "RGPD-Expert"],
                    ["How should a SOC team respond to a ransomware incident?", "CyberSec-Assistant"],
                    ["What is the difference between ISO 27001 and ISO 27002?", "ISO27001-Expert"],
                    ["What are the lawful bases for processing personal data under GDPR?", "RGPD-Expert"],
                    ["Explain the MITRE ATT&CK framework and its use in threat hunting.", "CyberSec-Assistant"],
                ],
                inputs=[user_input, model_selector],
                label="Click an example to populate the form",
            )
            # Compare section
            gr.Markdown("---")
            gr.Markdown("### Compare All Models")
            gr.Markdown("Send the same question to all 3 models and see how each expert responds.")
            compare_input = gr.Textbox(
                label="Question for All Models",
                placeholder="e.g., How do you perform a security risk assessment?",
                lines=2,
            )
            compare_btn = gr.Button("Compare All Models", variant="primary")
            compare_output = gr.Textbox(
                label="Comparison Results (JSON)",
                lines=20,
                interactive=False,
                show_copy_button=True,
            )
            # Status section
            gr.Markdown("---")
            gr.Markdown("### API Status")
            with gr.Row():
                models_btn = gr.Button("List Models", variant="secondary")
                health_btn = gr.Button("Health Check", variant="secondary")
            status_output = gr.Textbox(
                label="Status Output",
                lines=10,
                interactive=False,
                show_copy_button=True,
            )
            # Wire up events with api_name for clean API URLs
            submit_btn.click(
                fn=chat,
                inputs=[user_input, model_selector],
                outputs=response_output,
                api_name="chat",
            )
            clear_btn.click(
                fn=lambda: ("", ""),
                inputs=None,
                outputs=[user_input, response_output],
                api_name=False,  # UI-only reset; deliberately not exposed as an endpoint
            )
            compare_btn.click(
                fn=compare,
                inputs=compare_input,
                outputs=compare_output,
                api_name="compare",
            )
            models_btn.click(
                fn=list_models,
                inputs=None,
                outputs=status_output,
                api_name="models",
            )
            health_btn.click(
                fn=health_check,
                inputs=None,
                outputs=status_output,
                api_name="health",
            )
        # ===== Tab 3: Integration Guide =====
        with gr.Tab("Integration Guide", id="integration"):
            gr.Markdown(INTEGRATION_GUIDE_MD, elem_classes=["api-docs"])
    # Footer
    gr.Markdown(
        "<center style='color:#6b7280; margin-top:16px;'>"
        "CyberSec-API v1.0.0 | "
        "<a href='https://huggingface.co/AYI-NEDJIMI' target='_blank'>AYI-NEDJIMI</a> | "
        "Powered by Hugging Face Inference API"
        "</center>"
    )
| # --------------------------------------------------------------------------- | |
| # Launch | |
| # --------------------------------------------------------------------------- | |
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",  # bind all interfaces so the Space proxy can reach the app
        server_port=7860,       # standard HF Spaces port
        show_api=True,          # keep the auto-generated API docs page enabled
    )