File size: 4,279 Bytes
4b62d23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
IP and location logging service with Redis gating.
"""

import ipaddress
import logging
import os
from typing import Optional, Dict, Any

import requests

from .redis_service import get_redis_service
from .mongodb_service import get_mongodb_service

logger = logging.getLogger(__name__)


class IPLocationService:
    """
    Service for logging IP and location data.

    Looks up location details for a client IP through the IPinfo HTTP API and
    records them as a SESSION_METADATA event through the MongoDB service.
    The Redis service's ``should_log_ip`` gate (called with a 300 second TTL)
    decides whether a given device is logged at all, which also limits how
    often the IPinfo API is called.
    """

    # TTL handed to the Redis gate for each device (5 minutes).
    GATE_TTL_SECONDS = 300
    # Upper bound, in seconds, on how long one IPinfo request may block.
    REQUEST_TIMEOUT_SECONDS = 5

    def __init__(self):
        """
        Resolve the Redis/MongoDB service handles and read the IPinfo token.

        The token comes from the ``IPINFO_TOKEN`` environment variable. When
        it is missing a warning is logged and location lookups return None.
        """
        self.redis_service = get_redis_service()
        self.mongodb_service = get_mongodb_service()
        self.ipinfo_token = os.getenv("IPINFO_TOKEN")

        if not self.ipinfo_token:
            logger.warning("IPINFO_TOKEN not set. IP location logging disabled.")

    @staticmethod
    def _normalize_ip(ip_address: Any) -> Optional[str]:
        """
        Validate a client-supplied address before it is placed in a URL.

        Args:
            ip_address: Candidate address, normally a string.

        Returns:
            The whitespace-stripped address if it is a plain IPv4 or IPv6
            literal, otherwise None.
        """
        candidate = str(ip_address).strip()

        # A zone id ("fe80::1%eth0") may carry arbitrary text after the '%';
        # such a value is never safe to splice into a URL path.
        if "%" in candidate:
            return None

        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            return None
        return candidate

    def _fetch_ip_location(self, ip_address: str) -> Optional[Dict[str, Any]]:
        """
        Fetch IP location from IPinfo API.

        Args:
            ip_address: IP address to lookup

        Returns:
            Location data (ip, city, region, country, loc, org, timezone;
            each value may be None), or None when no token is configured,
            the address is not a valid IP literal, the API answers with a
            non-200 status, or the request fails.
        """
        if not self.ipinfo_token:
            return None

        # The address comes from the client. Without this check an empty or
        # path-like value would be sent to a different IPinfo endpoint and
        # the answer would be stored as if it described the client.
        lookup_ip = self._normalize_ip(ip_address)
        if lookup_ip is None:
            logger.warning(f"Skipping IP lookup for invalid address {ip_address!r}")
            return None

        try:
            url = f"https://ipinfo.io/{lookup_ip}"
            headers = {"Authorization": f"Bearer {self.ipinfo_token}"}

            response = requests.get(
                url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS
            )

            if response.status_code != 200:
                logger.warning(f"IPinfo API returned status {response.status_code}")
                return None

            data = response.json()

            # Extract relevant fields
            return {
                "ip": data.get("ip"),
                "city": data.get("city"),
                "region": data.get("region"),
                "country": data.get("country"),
                "loc": data.get("loc"),  # latitude,longitude
                "org": data.get("org"),  # ISP/organization
                "timezone": data.get("timezone"),
            }

        except Exception as e:
            # Deliberately broad: the lookup is best-effort and must never
            # break the caller's request handling.
            logger.error(f"Failed to fetch IP location: {str(e)}")
            return None

    def log_session_metadata(
        self,
        device_id: str,
        ip_address: str,
        user_id: Optional[str] = None,
        user_agent: Optional[str] = None
    ) -> bool:
        """
        Log session metadata with IP and location (gated by Redis).

        The Redis gate is consulted first; when it declines, nothing is
        fetched or written. Otherwise the location is looked up (best
        effort) and a SESSION_METADATA event is sent to the MongoDB service.

        Args:
            device_id: Device identifier
            ip_address: Client IP address
            user_id: Optional user identifier
            user_agent: Optional user agent string

        Returns:
            False if the gate skipped this device; otherwise the result of
            the MongoDB ``log_event`` call.
        """
        # Check Redis gate (5 minute TTL)
        should_log = self.redis_service.should_log_ip(
            device_id, ttl_seconds=self.GATE_TTL_SECONDS
        )

        if not should_log:
            logger.debug(f"IP logging skipped for device {device_id} (within TTL window)")
            return False

        # Fetch IP location
        location_data = self._fetch_ip_location(ip_address)

        # Prepare metadata
        metadata = {
            "ip": ip_address,
            "user_agent": user_agent
        }

        # Add location data if available
        if location_data:
            metadata.update(location_data)
            # The location dict always has an "ip" key, which is None when
            # the API response omitted it; never let that erase the client
            # address we were given.
            if not metadata.get("ip"):
                metadata["ip"] = ip_address

        # Log SESSION_METADATA event to MongoDB
        success = self.mongodb_service.log_event(
            event_type="SESSION_METADATA",
            device_id=device_id,
            user_id=user_id,
            metadata=metadata
        )

        if success:
            logger.info(f"Session metadata logged for device {device_id}")
        else:
            logger.warning(f"Failed to log session metadata for device {device_id}")

        return success


# Lazily created, module-wide service instance (None until first requested)
_ip_location_service = None

def get_ip_location_service() -> IPLocationService:
    """
    Return the shared IPLocationService, building it on first use.

    The instance is stored in the module-level ``_ip_location_service``
    variable, so later calls hand back the same object.
    """
    global _ip_location_service
    if _ip_location_service is not None:
        return _ip_location_service

    _ip_location_service = IPLocationService()
    return _ip_location_service