AI_Recruiting_Agent / usage_logging.py
19arjun89's picture
Update usage_logging.py
d329207 verified
"""
usage_logging.py
----------------
Purpose:
This module implements privacy-preserving telemetry for the
AI Recruiting Agent Hugging Face Space.
Its sole purpose is to measure anonymous usage and adoption
metrics in order to:
- Understand how the tool is being used
- Improve reliability and performance
- Gauge real-world adoption
- Support research and evaluation of responsible AI practices
Privacy Principles:
This module is explicitly designed to minimize data collection
and avoid storing any personally identifiable information (PII).
It DOES NOT collect or store:
- Raw IP addresses
- User names or Hugging Face account IDs
- Resume contents or job descriptions
- Emails, phone numbers, or file names
- Full user-agent strings or device fingerprints
- Any demographic attributes about users
It ONLY records:
- Approximate country (taken from a geo header or looked up from the client IP; the IP itself is not stored)
- City field (currently always left empty)
- UTC timestamp of the event
- Space URL
- High-level event type (e.g., "usage_start")
- Non-identifying, aggregate metadata (e.g., counts, booleans, latencies)
All usage logs are:
- Anonymized
- Append-only
- Persisted in a public Hugging Face Dataset repository (https://huggingface.co/datasets/19arjun89/ai_recruiting_agent_usage)
- Versioned via immutable commit history for auditability
Ethical Safeguards:
- Logging failures never break application functionality
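- No raw identifiers are persisted at any time (the client IP may be sent to the ipapi.co geolocation service for a country lookup, but it is never written to the usage log)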
- All telemetry is optional and best-effort
- The system is intended for transparency and improvement,
not for surveillance or profiling
Transparency:
A public-facing usage reporting Space will be provided to allow
independent verification of aggregate adoption metrics.
Author:
Arjun Singh
Last Updated:
2026-01-22
"""
import os
import json
from datetime import datetime
import requests
import gradio as gr
from huggingface_hub import HfApi, hf_hub_url
import ipaddress
# Public URL of the Space being measured; copied into every logged event.
SPACE_URL = "https://huggingface.co/spaces/19arjun89/AI_Recruiting_Agent"
# Hugging Face dataset repository that the usage log is uploaded to.
USAGE_DATASET_REPO = "19arjun89/ai_recruiting_agent_usage"
# Path, inside that repository, of the JSON Lines file (one event per line).
USAGE_JSONL_PATH = "usage/visits.jsonl"
def _hf_api():
    """Build a Hub client from the ``HF_TOKEN`` environment variable.

    Returns:
        An ``HfApi`` instance constructed with the token, or ``None`` when the
        variable is unset or empty (callers treat that as "telemetry off").
    """
    hub_token = os.environ.get("HF_TOKEN")
    return HfApi(token=hub_token) if hub_token else None
def _download_text_if_exists(repo_id: str, path_in_repo: str) -> str:
    """Fetch a text file from a dataset repository on the Hub.

    Args:
        repo_id: Dataset repository identifier.
        path_in_repo: Path of the file inside the repository.

    Returns:
        The response body when the server answers HTTP 200. Any other status
        code, or any exception raised while building the URL or performing
        the request, yields an empty string.
    """
    body = ""
    try:
        response = requests.get(
            hf_hub_url(
                repo_id=repo_id,
                filename=path_in_repo,
                repo_type="dataset",
            ),
            timeout=5,
        )
        if response.status_code == 200:
            body = response.text
    except Exception:
        # Best-effort: a failed download is reported as "no content".
        body = ""
    return body
def _is_public_ip(ip: str) -> bool:
    """Return True only if *ip* parses as a globally routable unicast address.

    Args:
        ip: Candidate address in textual IPv4 or IPv6 form.

    Returns:
        False for anything that does not parse as an IP address, and for
        loopback, reserved, multicast, link-local, private, and shared
        (carrier-grade NAT, 100.64.0.0/10) addresses; True otherwise.
    """
    try:
        addr = ipaddress.ip_address(ip)
    except (ValueError, TypeError):
        return False

    if addr.is_loopback or addr.is_reserved or addr.is_multicast or addr.is_link_local:
        return False

    # ``is_global`` excludes every private range and, unlike ``not is_private``,
    # also excludes the 100.64.0.0/10 shared address space, which is not
    # reachable from the public internet and cannot be geolocated.
    return bool(addr.is_global)
def _get_client_ip(request: gr.Request) -> str:
    """Pick a public client IP address from a request, or "" if there is none.

    The ``x-forwarded-for`` header is scanned left to right and the first
    entry accepted by ``_is_public_ip`` wins. If the header yields nothing,
    the host of ``request.client`` is used, but only when it is public too.

    Args:
        request: The incoming request; may be falsy.

    Returns:
        The selected IP address as text, or an empty string.
    """
    if not request:
        return ""

    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        candidates = (piece.strip() for piece in forwarded.split(","))
        first_public = next((c for c in candidates if _is_public_ip(c)), None)
        if first_public is not None:
            return first_public

    if not request.client:
        return ""

    peer = request.client.host
    if _is_public_ip(peer):
        return peer
    return ""
def _country_lookup(ip: str) -> str:
    """Resolve an IP address to a country name via the ipapi.co JSON endpoint.

    Args:
        ip: Textual IPv4 or IPv6 address.

    Returns:
        The non-empty ``country_name`` reported by the service, or
        ``"Unknown"`` when *ip* is not a valid address, the request fails,
        the status is not 200, or no usable name is returned.
    """
    if not isinstance(ip, str):
        return "Unknown"

    # Validate before building the URL: an empty or malformed value would
    # otherwise be interpolated into the request path as-is.
    try:
        addr = ipaddress.ip_address(ip.strip())
    except ValueError:
        return "Unknown"

    try:
        r = requests.get(f"https://ipapi.co/{addr}/json/", timeout=4)
        if r.status_code == 200:
            name = r.json().get("country_name")
            if isinstance(name, str) and name.strip():
                return name.strip()
    except Exception:
        # Deliberately best-effort and deliberately not logged: the exception
        # text can include the request URL, which contains the IP address.
        pass
    return "Unknown"
def _read_usage_log(repo_id: str, path_in_repo: str):
    """Read the current usage log, telling "missing" apart from "unreadable".

    Returns:
        The file text on HTTP 200, ``""`` on HTTP 404 (no log yet), and
        ``None`` for any other status or any request error.
    """
    try:
        url = hf_hub_url(
            repo_id=repo_id,
            filename=path_in_repo,
            repo_type="dataset",
        )
        r = requests.get(url, timeout=5)
    except Exception:
        return None
    if r.status_code == 200:
        return r.text
    if r.status_code == 404:
        return ""
    return None


def append_visit_to_dataset(country: str, city: str, event_type: str = "usage_start", **extra_fields):
    """Append one usage event to the JSON Lines log in the dataset repository.

    The log is downloaded, the new event is added as a final line, and the
    whole file is uploaded again. Nothing is raised to the caller: when no
    ``HF_TOKEN`` is configured, the event cannot be serialized, the current
    log cannot be read, or the upload fails, the event is silently dropped.

    NOTE: download-then-upload is not atomic, so two events written at the
    same moment can overwrite one another.

    Args:
        country: Country value to record.
        city: City value to record.
        event_type: Stored under the ``"event"`` key.
        **extra_fields: Additional key/value pairs merged into the event;
            they override the built-in keys on a name clash.
    """
    api = _hf_api()
    if not api:
        return

    # Function-scope import keeps this block independent of the file header.
    from datetime import timezone

    event = {
        # Same naive-UTC "...Z" text that datetime.utcnow() used to produce,
        # without relying on the deprecated call.
        "ts_utc": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        "space_url": SPACE_URL,
        "country": country,
        "city": city,
        "event": event_type,
    }
    if extra_fields:
        event.update(extra_fields)

    try:
        line = json.dumps(event)
    except (TypeError, ValueError):
        # A non-serializable extra field must not break the application.
        return

    existing = _read_usage_log(USAGE_DATASET_REPO, USAGE_JSONL_PATH)
    if existing is None:
        # The log could not be read (timeout, 5xx, auth error, ...). Uploading
        # now would replace the entire history with this single event, so it
        # is safer to drop the event.
        return

    prefix = existing.rstrip("\n") + "\n" if existing.strip() else ""
    new_content = prefix + line + "\n"

    try:
        api.upload_file(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            path_in_repo=USAGE_JSONL_PATH,
            path_or_fileobj=new_content.encode("utf-8"),
            commit_message="append visit log",
        )
    except Exception:
        # Telemetry is best-effort; an upload failure is ignored on purpose.
        pass
def record_visit(request: gr.Request):
    """Log one ``usage_start`` event for the given request.

    The country is resolved in this order, stopping at the first hit:

    1. A country hint found in the request headers (no external call);
       recorded with ``country_source="header"``.
    2. An IP-based lookup when a public client IP is available; recorded
       with ``country_source="ipapi"``, or ``"ipapi_unknown"`` when the
       lookup returned ``"Unknown"``.
    3. Otherwise ``"Unknown"`` with ``country_source="none"``.

    The city is always recorded as an empty string.
    """
    header_country = _country_from_headers(request)
    if header_country:
        country, source = header_country, "header"
    else:
        client_ip = _get_client_ip(request)
        if client_ip:
            country = _country_lookup(client_ip)
            source = "ipapi_unknown" if country == "Unknown" else "ipapi"
        else:
            country, source = "Unknown", "none"

    append_visit_to_dataset(
        country=country,
        city="",
        event_type="usage_start",
        country_source=source,
    )
def _country_from_headers(request: gr.Request) -> str:
    """Return a two-letter country code taken from geo headers, or "".

    The headers ``cf-ipcountry``, ``x-country`` and ``x-geo-country`` are
    checked in that order and the first acceptable value wins.

    The result is written to the usage log, and request headers can be set
    by the client, so only values that look like a two-letter alphabetic
    country code are accepted; anything else (free text, e-mail addresses,
    over-long strings) is ignored. Cloudflare's ``XX`` placeholder for
    "country unknown" is treated as no hint.

    Args:
        request: The incoming request; may be falsy.

    Returns:
        The upper-cased country code, or an empty string when no header
        carries a usable value.
    """
    if not request:
        return ""

    # These may or may not be present; harmless to check.
    for header in ("cf-ipcountry", "x-country", "x-geo-country"):
        raw = request.headers.get(header)
        if not isinstance(raw, str):
            continue
        code = raw.strip().upper()
        if len(code) == 2 and code.isascii() and code.isalpha() and code != "XX":
            return code
    return ""