""" usage_logging.py ---------------- Purpose: This module implements privacy-preserving telemetry for the AI Recruiting Agent Hugging Face Space. Its sole purpose is to measure anonymous usage and adoption metrics in order to: - Understand how the tool is being used - Improve reliability and performance - Gauge sense of real-world adoption - Support research and evaluation of responsible AI practices Privacy Principles: This module is explicitly designed to minimize data collection and avoid storing any personally identifiable information (PII). It DOES NOT collect or store: - Raw IP addresses - User names or Hugging Face account IDs - Resume contents or job descriptions - Emails, phone numbers, or file names - Full user-agent strings or device fingerprints - Any demographic attributes about users It ONLY records: - Approximate country and city (derived from IP, not stored) - UTC timestamp of the event - Space URL - High-level event type (e.g., "app_open") - Non-identifying, aggregate metadata (e.g., counts, booleans, latencies) All usage logs are: - Anonymized - Append-only - Persisted in a public Hugging Face Dataset repository (https://huggingface.co/datasets/19arjun89/ai_recruiting_agent_usage) - Versioned via immutable commit history for auditability Ethical Safeguards: - Logging failures never break application functionality - No raw identifiers are persisted at any time - All telemetry is optional and best-effort - The system is intended for transparency and improvement, not for surveillance or profiling Transparency: A public-facing usage reporting Space will be provided to allow independent verification of aggregate adoption metrics. Author: Arjun Singh Last Updated: 2026-01-27 """ import os import json from datetime import datetime import requests import gradio as gr from huggingface_hub import HfApi, hf_hub_url import ipaddress import pycountry from io import BytesIO import uuid SPACE_URL = "https://huggingface.co/spaces/19arjun89/AI_Recruiting_Agent" USAGE_DATASET_REPO = "19arjun89/ai_recruiting_agent_usage" USAGE_EVENTS_DIR = "usage/events" def _hf_api(): token = os.environ.get("HF_TOKEN") if not token: return None return HfApi(token=token) def _download_text_if_exists(repo_id: str, path_in_repo: str) -> str: try: url = hf_hub_url( repo_id=repo_id, filename=path_in_repo, repo_type="dataset" ) r = requests.get(url, timeout=5) if r.status_code == 200: return r.text except Exception: pass return "" def _is_public_ip(ip: str) -> bool: try: obj = ipaddress.ip_address(ip) return not (obj.is_private or obj.is_loopback or obj.is_reserved or obj.is_multicast or obj.is_link_local) except Exception: return False def _get_client_ip(request: gr.Request) -> str: if request: xff = request.headers.get("x-forwarded-for") if xff: for part in xff.split(","): ip = part.strip() if _is_public_ip(ip): return ip if request.client: host = request.client.host return host if _is_public_ip(host) else "" return "" def _country_lookup(ip: str) -> tuple[str, str]: token = os.environ.get("IPINFO_TOKEN") if not token: return ("", "") try: url = f"https://ipinfo.io/{ip}/json?token={token}" r = requests.get(url, timeout=4) if r.status_code != 200: return ("", "") data = r.json() # Some plans: country="US" # Some plans: country_code="US" and country="United States" cc = (data.get("country_code") or data.get("country") or "").strip().upper() name = (data.get("country") or "").strip() # If name is actually a code like "US", expand it if len(name) == 2 and name.upper() == cc: name = _expand_country_code(cc) # If name is missing but cc exists, expand if not name and cc: name = _expand_country_code(cc) return (cc, name) except Exception: return ("", "") def append_visit_to_dataset( country: str, city: str, event_type: str = "usage_start", country_source: str = "unknown", country_code: str = "", **extra_fields ): api = _hf_api() if not api: return event = { "ts_utc": datetime.utcnow().isoformat() + "Z", "space_url": SPACE_URL, "event": event_type, "country": country or "Unknown", "country_code": (country_code or "").strip().upper(), "country_source": country_source or "unknown", "city": city or "", } if extra_fields: # Prevent JSON nulls event.update({k: v for k, v in extra_fields.items() if v is not None}) # Unique file path per event (prevents collisions) ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S%f") uid = uuid.uuid4().hex[:8] path_in_repo = f"{USAGE_EVENTS_DIR}/{ts}_{uid}.json" try: api.upload_file( repo_id=USAGE_DATASET_REPO, repo_type="dataset", path_in_repo=path_in_repo, path_or_fileobj=BytesIO(json.dumps(event).encode("utf-8")), commit_message=f"log {event_type}", ) except Exception: pass def record_visit(request: gr.Request): # 1) Header hint country_hint = _country_from_headers(request) if _is_valid_country_code(country_hint): append_visit_to_dataset( country=_expand_country_code(country_hint), city="", event_type="usage_start", country_source="header", country_code=country_hint.strip().upper(), ) return # 2) IP-based lookup ip = _get_client_ip(request) if ip: cc, name = _country_lookup(ip) if _is_valid_country_code(cc): append_visit_to_dataset( country=name or _expand_country_code(cc), city="", event_type="usage_start", country_source="ipinfo", country_code=cc, ) else: append_visit_to_dataset( country="Unknown", city="", event_type="usage_start", country_source="ipinfo_unknown", country_code="", ) return # 3) Nothing usable append_visit_to_dataset( country="Unknown", city="", event_type="usage_start", country_source="none", country_code="", ) def _country_from_headers(request: gr.Request) -> str: if not request: return "" return ( request.headers.get("cf-ipcountry") or request.headers.get("x-country") or request.headers.get("x-geo-country") or "" ).strip().upper() def _is_valid_country_code(code: str) -> bool: if not code: return False code = code.strip().upper() # Common "unknown" markers from CDNs / proxies if code in {"XX", "ZZ", "UNKNOWN", "NA", "N/A", "NONE", "-"}: return False # ISO2 should be exactly 2 letters return len(code) == 2 and code.isalpha() def _expand_country_code(code: str) -> str: if not code or len(code) != 2: return "Unknown" try: country = pycountry.countries.get(alpha_2=code.upper()) return country.name if country else "Unknown" except Exception: return "Unknown"