File size: 4,655 Bytes
0355450
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""LLM provider abstraction and implementations."""

import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Optional

import httpx

from src.utils import config


class BaseLLMProvider(ABC):
    """Common interface that every LLM backend must implement."""

    @abstractmethod
    async def analyze(self, log_text: str) -> str:
        """
        Produce an LLM-generated analysis of a raw security log.

        Args:
            log_text: The raw log content to send to the model.

        Returns:
            The model's analysis as plain text.
        """
        ...


class OpenAIProvider(BaseLLMProvider):
    """OpenAI GPT provider using the Chat Completions HTTP API."""

    # Chat Completions endpoint; hoisted so it is not buried in the request call.
    _API_URL = "https://api.openai.com/v1/chat/completions"

    def __init__(self, api_key: str, model: str = "gpt-4-turbo", timeout: float = 30.0):
        """
        Args:
            api_key: OpenAI API key, sent as a Bearer token.
            model: Chat model name to request.
            timeout: Per-request timeout in seconds (was previously hard-coded to 30).
        """
        self.api_key = api_key
        self.model = model
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    async def analyze(self, log_text: str) -> str:
        """
        Analyze a security log by calling the OpenAI chat completions API.

        Args:
            log_text: Raw log text to analyze.

        Returns:
            The assistant message content of the first completion choice.

        Raises:
            Exception: Any HTTP or parsing failure is logged (with traceback)
                and re-raised unchanged for the caller to handle.
        """
        # Local import — presumably kept function-scoped to avoid an import
        # cycle with src.llm.prompts; TODO confirm before moving to top level.
        from src.llm.prompts import get_analysis_prompt

        prompt = get_analysis_prompt(log_text)

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self._API_URL,
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    json={
                        "model": self.model,
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": 0.7,
                        "max_tokens": 1000,
                    },
                    timeout=self.timeout,
                )
                response.raise_for_status()
                return response.json()["choices"][0]["message"]["content"]
        except Exception:
            # logger.exception captures the traceback; lazy %-style args avoid
            # building the message when the level is filtered out.
            self.logger.exception("OpenAI API error (model=%s)", self.model)
            raise


class LocalLLMProvider(BaseLLMProvider):
    """Local LLM provider (Ollama, LM Studio, etc.)."""

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "mistral",
        timeout: float = 60.0,
    ):
        """
        Args:
            base_url: Base URL of the local inference server (Ollama-style API).
            model: Model name to request from the server.
            timeout: Per-request timeout in seconds (was previously hard-coded to 60).
        """
        self.base_url = base_url
        self.model = model
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    async def analyze(self, log_text: str) -> str:
        """
        Analyze a security log via the local LLM's /api/generate endpoint.

        Args:
            log_text: Raw log text to analyze.

        Returns:
            The "response" field of the server's JSON reply.

        Raises:
            Exception: Any HTTP or parsing failure is logged (with traceback)
                and re-raised unchanged for the caller to handle.
        """
        # Local import — presumably kept function-scoped to avoid an import
        # cycle with src.llm.prompts; TODO confirm before moving to top level.
        from src.llm.prompts import get_analysis_prompt

        prompt = get_analysis_prompt(log_text)

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.base_url}/api/generate",
                    json={
                        "model": self.model,
                        "prompt": prompt,
                        # Request the full response in one payload, not a stream.
                        "stream": False,
                    },
                    timeout=self.timeout,
                )
                response.raise_for_status()
                return response.json()["response"]
        except Exception:
            # logger.exception captures the traceback; lazy %-style args avoid
            # building the message when the level is filtered out.
            self.logger.exception("Local LLM error (model=%s)", self.model)
            raise


class MockLLMProvider(BaseLLMProvider):
    """Mock LLM for testing and demonstrations."""

    # Keyword groups, checked in priority order against the lowercased log.
    _CRITICAL_WORDS = ("critical", "ransomware", "breach")
    _HIGH_WORDS = ("failed", "denied", "suspicious")

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def analyze(self, log_text: str) -> str:
        """Return a deterministic mock analysis for the given log text."""
        # Simulate the latency of a real network round-trip.
        await asyncio.sleep(0.5)

        # Classify the log: critical keywords win, then high-severity ones,
        # then very short logs are treated as low risk; everything else is medium.
        lowered = log_text.lower()
        if any(word in lowered for word in self._CRITICAL_WORDS):
            risk_level = "CRITICAL"
        elif any(word in lowered for word in self._HIGH_WORDS):
            risk_level = "HIGH"
        elif len(log_text) < 100:
            risk_level = "LOW"
        else:
            risk_level = "MEDIUM"

        return f"""**What Happened:**
An authentication anomaly was detected in the system logs. Multiple failed login attempts were recorded from unusual IP addresses during off-peak hours.

**Risk Level:**
{risk_level}

**Suggested Actions:**
- Review authentication logs for the affected accounts
- Check IP reputation of source addresses
- Consider implementing rate-limiting on login endpoints
- Notify affected users of suspicious activity

**Key Indicators:**
- Multiple failed authentication attempts (5+ in short timeframe)
- Unusual geographic location
- Off-peak access time
"""


def create_provider() -> BaseLLMProvider:
    """Instantiate the LLM provider selected by the application config."""
    from src.utils.config import LLMProvider

    selected = config.llm_provider
    if selected == LLMProvider.OPENAI:
        return OpenAIProvider(config.openai_api_key, config.model_name)
    if selected == LLMProvider.LOCAL:
        return LocalLLMProvider(model=config.model_name)
    # Any other value (i.e. MOCK) falls through to the mock provider.
    return MockLLMProvider()