File size: 4,326 Bytes
ad16b40
 
 
 
 
 
 
 
 
 
 
 
 
 
3c8491b
ad16b40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c8491b
 
ad16b40
 
 
 
 
 
3c8491b
ad16b40
 
 
3c8491b
 
 
 
 
 
 
 
 
ad16b40
3c8491b
 
 
 
 
 
 
 
ad16b40
 
3c8491b
 
 
 
ad16b40
 
 
3c8491b
ad16b40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c8491b
ad16b40
 
 
3c8491b
 
 
 
ad16b40
 
 
 
 
 
 
 
 
 
3c8491b
ad16b40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
In-memory cache for forensic analysis results.

Why caching?
- Image forensics is CPU-intensive (FFT, hashing, EXIF)
- Same file uploaded twice = wasted computation
- SHA-256 hash = unique file fingerprint

Privacy note:
- Cache stores results only, never file bytes
- Results contain no personal data
- Cache cleared on server restart (no persistence)
"""
import hashlib
import threading
from datetime import datetime, timedelta
from typing import Dict, Optional, Any, Union

from backend.core.logger import setup_logger

# Module-level logger, named after this module for filtering in log output.
logger = setup_logger(__name__)

# Max cached results (prevents memory abuse)
MAX_CACHE_SIZE = 500

# Cache TTL: results expire after 1 hour
CACHE_TTL_MINUTES = 60


class ForensicsCache:
    """
    Thread-safe in-memory cache for forensic results.

    Key: SHA-256 hash of file bytes
    Value: forensic report + timestamp

    OPTIMIZATION: Accepts pre-computed hash to avoid duplicate hashing.

    BUG FIX: the previous version *claimed* thread safety but held no lock;
    the TTL check-then-delete in get() and the evict-then-insert in set()
    were racy under concurrent requests. All cache mutations now happen
    under a single threading.Lock.
    """

    def __init__(self):
        # Maps SHA-256 hex digest -> {"report": dict, "cached_at": datetime}
        self._cache: Dict[str, Dict[str, Any]] = {}
        # Guards every read-modify-write sequence on self._cache.
        self._lock = threading.Lock()
        logger.info("Forensics cache initialized")

    def _compute_key(self, file_identifier: Union[bytes, str]) -> str:
        """
        Compute SHA-256 hash as cache key.
        Same file = same hash = cache hit.

        OPTIMIZATION: If a string (pre-computed hash) is provided,
        use it directly to avoid redundant hashing.

        Args:
            file_identifier: Either raw file bytes OR pre-computed SHA-256 hash

        Returns:
            SHA-256 hash string
        """
        if isinstance(file_identifier, str):
            # Already a hash - use directly (OPTIMIZATION)
            return file_identifier
        # Compute hash from bytes
        return hashlib.sha256(file_identifier).hexdigest()

    def get(self, file_identifier: Union[bytes, str]) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached result if available and not expired.

        Args:
            file_identifier: Either raw file bytes OR pre-computed SHA-256 hash

        Returns:
            Cached report dict (shallow copy, with "cache_info" metadata
            added) or None if miss/expired
        """
        # Hash OUTSIDE the lock: hashing large files is the expensive part
        # and needs no shared state.
        key = self._compute_key(file_identifier)

        with self._lock:
            # Single lookup instead of `in` + `[]` (avoids a double hash probe).
            entry = self._cache.get(key)
            if entry is None:
                logger.info("Cache MISS: %s...", key[:16])
                return None

            # Check TTL expiry
            age = datetime.now() - entry["cached_at"]
            if age > timedelta(minutes=CACHE_TTL_MINUTES):
                del self._cache[key]
                logger.info("Cache EXPIRED: %s...", key[:16])
                return None

            # BUG FIX: timedelta.seconds drops the days component;
            # total_seconds() is the true age. (Same value here while
            # TTL < 1 day, but correct regardless of configuration.)
            age_seconds = int(age.total_seconds())

            logger.info(
                "Cache HIT: %s... (age=%ds, cache_size=%d)",
                key[:16], age_seconds, len(self._cache)
            )

            # Shallow copy so the caller's "cache_info" key never leaks
            # back into the stored report.
            result = entry["report"].copy()
            result["cache_info"] = {
                "cached": True,
                "age_seconds": age_seconds,
                "cache_hit": True
            }
            return result

    def set(self, file_identifier: Union[bytes, str], report: Dict[str, Any]) -> None:
        """
        Store forensic report in cache.
        Evicts oldest entry if cache is full.

        Args:
            file_identifier: Either raw file bytes OR pre-computed SHA-256 hash
            report: Forensic analysis report to cache
        """
        key = self._compute_key(file_identifier)

        with self._lock:
            # BUG FIX: compute the key BEFORE deciding to evict. The old code
            # evicted first, so refreshing an already-cached key at capacity
            # needlessly dropped an unrelated entry.
            if key not in self._cache and len(self._cache) >= MAX_CACHE_SIZE:
                oldest_key = min(
                    self._cache,
                    key=lambda k: self._cache[k]["cached_at"]
                )
                del self._cache[oldest_key]
                logger.info("Cache EVICT: %s...", oldest_key[:16])

            self._cache[key] = {
                "report": report,
                "cached_at": datetime.now()
            }

            logger.info(
                "Cache SET: %s... (cache_size=%d)",
                key[:16], len(self._cache)
            )

    def size(self) -> int:
        """Return current number of cached entries."""
        with self._lock:
            return len(self._cache)

    def clear(self) -> None:
        """Clear all cached entries."""
        with self._lock:
            count = len(self._cache)
            self._cache.clear()
        logger.info("Cache CLEARED: %d entries removed", count)


# Singleton instance - shared across all requests. Python caches imported
# modules, so every importer of this module sees this same cache object;
# it is created once at import time and lives for the process lifetime
# (consistent with the "no persistence" note in the module docstring).
forensics_cache = ForensicsCache()