# NOTE: Hugging Face Space page residue ("Spaces: Sleeping" status banner)
# removed — it was not part of the program source.
# Standard library
import os
import io
import csv
import time
import threading
# Third-party
import requests
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple
from collections import defaultdict
import gradio as gr
from huggingface_hub import HfApi
# -------------------------
# Config
# -------------------------
DATASET_REPO_ID = os.environ.get("DATASET_REPO_ID")  # Space secret; None if unset (unlike the two below)
HF_TOKEN = os.environ["HF_TOKEN"]  # Space secret; raises KeyError at import time if missing
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]  # Space secret; raises KeyError at import time if missing

# Paths of the CSV files inside the dataset repo.
SENT_PATH = "sent.csv"
REQUEST_LOG_PATH = "request_log.csv"

# Maximum keys per order = quantity * KEYS_PER_QUANTITY
KEYS_PER_QUANTITY = 10

# Reuse ephemeral keys if created within this time window (in seconds)
KEY_REUSE_WINDOW_SECONDS = 40 * 60  # 40 minutes

api = HfApi(token=HF_TOKEN)
# Serializes the read-modify-write cycle on sent.csv across concurrent requests.
_lock = threading.Lock()

# Simple cache to reduce Hub reads
_cache = {
    "ts": 0.0,    # time.time() of the last successful load
    "sent": None, # type: ignore  # parsed sent.csv dict, or None before the first load
}
CACHE_TTL_SEC = 10.0

# Request tracking to identify suspicious patterns
_request_tracker = defaultdict(list)  # {ip: [timestamps]}
_request_log_buffer = []  # Buffer logs before writing to avoid too many uploads
| # ------------------------- | |
| # Helpers | |
| # ------------------------- | |
| def _utc_now_iso() -> str: | |
| return datetime.now(timezone.utc).replace(microsecond=0).isoformat() | |
def _download_csv(repo_id: str, path_in_repo: str) -> str:
    """Fetch one file from the dataset repo and return its text content.

    hf_hub_download returns a local cache path; we read that file as UTF-8.
    """
    local_path = api.hf_hub_download(
        repo_id=repo_id,
        filename=path_in_repo,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    with open(local_path, "r", encoding="utf-8") as fh:
        return fh.read()
def _upload_csv(repo_id: str, path_in_repo: str, content: str, commit_message: str) -> None:
    """Commit UTF-8 text as a file in the dataset repo."""
    payload = io.BytesIO(content.encode("utf-8"))
    api.upload_file(
        path_or_fileobj=payload,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=commit_message,
        token=HF_TOKEN,
    )
| def _parse_sent(csv_text: str) -> dict: | |
| """Parse sent.csv and return a dict keyed by order_number""" | |
| sent = {} | |
| reader = csv.DictReader(io.StringIO(csv_text)) | |
| for r in reader: | |
| order_num = r["order_number"].strip() | |
| if order_num: | |
| keys_sent_str = r.get("keys_sent", "").strip() | |
| quantity_keys_sent_str = r.get("quantity_keys_sent", "").strip() | |
| last_key_sent_str = r.get("last_key_sent", "").strip() | |
| last_key_created_at_str = r.get("last_key_created_at", "").strip() | |
| sent[order_num] = { | |
| "order_number": order_num, | |
| "quantity": int(r.get("quantity", "0")), | |
| "keys_sent": keys_sent_str if keys_sent_str else "", | |
| "quantity_keys_sent": int(quantity_keys_sent_str) if quantity_keys_sent_str else 0, | |
| "last_key_sent": last_key_sent_str if last_key_sent_str else "", | |
| "last_key_created_at": last_key_created_at_str if last_key_created_at_str else "", | |
| } | |
| return sent | |
| def _serialize_sent(sent_dict: dict) -> str: | |
| """Serialize sent dict back to CSV format""" | |
| out = io.StringIO() | |
| fieldnames = ["order_number", "quantity", "keys_sent", "quantity_keys_sent", "last_key_sent", "last_key_created_at"] | |
| w = csv.DictWriter(out, fieldnames=fieldnames) | |
| w.writeheader() | |
| for order_num in sorted(sent_dict.keys()): | |
| entry = sent_dict[order_num] | |
| w.writerow({ | |
| "order_number": entry["order_number"], | |
| "quantity": entry["quantity"], | |
| "keys_sent": entry.get("keys_sent", ""), | |
| "quantity_keys_sent": entry.get("quantity_keys_sent", 0), | |
| "last_key_sent": entry.get("last_key_sent", ""), | |
| "last_key_created_at": entry.get("last_key_created_at", ""), | |
| }) | |
| return out.getvalue() | |
def load_state(force: bool = False):
    """Return the parsed sent.csv state, refreshing from the Hub when stale.

    force=True bypasses the TTL cache and always re-downloads.
    """
    now = time.time()
    cache_fresh = _cache["sent"] is not None and (now - _cache["ts"]) < CACHE_TTL_SEC
    if cache_fresh and not force:
        return _cache["sent"]
    parsed = _parse_sent(_download_csv(DATASET_REPO_ID, SENT_PATH))
    _cache["ts"] = now
    _cache["sent"] = parsed
    return parsed
def _log_request(
    order_number: str,
    ip_address: str,
    user_agent: str,
    success: bool,
    message: str,
    suspicious_flags: list,
):
    """Buffer one request record for security monitoring.

    The buffer is flushed to the Hub once it reaches 10 entries, or
    immediately whenever any suspicious flag is present.
    """
    _request_log_buffer.append({
        "timestamp": _utc_now_iso(),
        "ip_address": ip_address,
        "user_agent": user_agent,
        "order_number": order_number,
        "success": success,
        "message": message,
        "suspicious_flags": "|".join(suspicious_flags) if suspicious_flags else "",
    })
    # Flush buffer to Hub every 10 requests or if suspicious activity detected
    if suspicious_flags or len(_request_log_buffer) >= 10:
        _flush_request_log()
def _flush_request_log():
    """Write buffered request-log entries to the dataset repo.

    Best-effort: on any failure the buffer is kept, so entries are retried
    on the next flush instead of being lost.
    """
    if not _request_log_buffer:
        return
    try:
        # Download existing log; if it doesn't exist yet, start with a header.
        try:
            log_csv = _download_csv(DATASET_REPO_ID, REQUEST_LOG_PATH)
        except Exception:
            # If log doesn't exist yet, create header
            log_csv = "timestamp,ip_address,user_agent,order_number,success,message,suspicious_flags\n"
        # Bug fix: ensure the existing log ends with a newline; otherwise the
        # first appended row would be glued onto the last existing line.
        if log_csv and not log_csv.endswith("\n"):
            log_csv += "\n"
        # Append new entries
        out = io.StringIO()
        out.write(log_csv)
        fieldnames = ["timestamp", "ip_address", "user_agent", "order_number", "success", "message", "suspicious_flags"]
        writer = csv.DictWriter(out, fieldnames=fieldnames)
        for entry in _request_log_buffer:
            writer.writerow(entry)
        # Upload the combined log in a single commit.
        _upload_csv(
            DATASET_REPO_ID,
            REQUEST_LOG_PATH,
            out.getvalue(),
            commit_message=f"Request log update at {_utc_now_iso()}",
        )
        # Clear only after a successful upload so failed entries are retried.
        _request_log_buffer.clear()
    except Exception as e:
        print(f"Warning: Failed to flush request log: {e}")
def _check_suspicious_activity(ip_address: str, order_number: str) -> list:
    """Record this request and return flags for suspicious rate patterns.

    Maintains a rolling one-hour window of request timestamps per IP.
    (order_number is accepted for interface symmetry but not used by any
    heuristic here.)
    """
    flags = []
    now = time.time()
    # Record the request, then drop anything older than one hour.
    history = _request_tracker[ip_address]
    history.append(now)
    history = [ts for ts in history if now - ts < 3600]
    _request_tracker[ip_address] = history

    def _count_within(window_sec: float) -> int:
        # How many retained requests fall inside the given window.
        return sum(1 for ts in history if now - ts < window_sec)

    if len(history) > 20:
        flags.append("HIGH_FREQUENCY")  # more than 20 requests in the last hour
    if _count_within(60) > 5:
        flags.append("RAPID_REQUESTS")  # more than 5 requests in the last minute
    if _count_within(300) > 10:
        flags.append("BURST_PATTERN")  # more than 10 requests in 5 minutes
    return flags
| # ------------------------- | |
| # Core API | |
| # ------------------------- | |
def claim_c_key(
    order_number: str,
    request: Optional[gr.Request] = None,
) -> Tuple[str, str]:
    """
    Returns a key for a given order number if the order is valid
    and hasn't exceeded the allowed key limit (quantity * KEYS_PER_QUANTITY).
    Returns (key, status_message)

    On any failure the key slot of the tuple is the empty string and the
    message explains the reason. `request` is injected by Gradio and is
    used only for IP / user-agent logging.
    """
    # Extract request metadata (falls back to "unknown" outside Gradio).
    ip_address = "unknown"
    user_agent = "unknown"
    if request:
        ip_address = request.client.host if request.client else "unknown"
        user_agent = request.headers.get("user-agent", "unknown")
    order_number = order_number.strip()
    # Check for suspicious activity patterns.
    # NOTE(review): the flags are only logged — nothing here blocks a
    # flagged request; confirm whether enforcement is intended.
    suspicious_flags = _check_suspicious_activity(ip_address, order_number)
    if not order_number:
        _log_request(order_number, ip_address, user_agent, False, "Empty order number", suspicious_flags)
        return "", "Please provide an order number."
    # Everything below mutates shared state (cache + sent.csv), so it runs
    # under the module lock to serialize concurrent claims.
    with _lock:
        # force=True: always re-read sent.csv so concurrent claims see the
        # latest counters.
        sent = load_state(force=True)
        # Check if order exists in sent.csv
        if order_number not in sent:
            msg = f"Order number {order_number} not found."
            _log_request(order_number, ip_address, user_agent, False, "Order not found", suspicious_flags)
            return "", msg
        quantity = sent[order_number]["quantity"]
        # Check how many keys have been sent for this order
        keys_already_sent = sent[order_number]["quantity_keys_sent"]
        # Calculate maximum allowed keys
        max_keys = quantity * KEYS_PER_QUANTITY
        # Check if limit has been reached
        if keys_already_sent >= max_keys:
            msg = f"Key limit reached for order {order_number} ({keys_already_sent}/{max_keys} keys sent)."
            _log_request(order_number, ip_address, user_agent, False, "Key limit reached", suspicious_flags)
            return "", msg
        # Check if we can reuse a recently created key (reuse does NOT
        # increment the counter — only newly created keys count).
        last_key_sent = sent[order_number].get("last_key_sent", "")
        last_key_created_at = sent[order_number].get("last_key_created_at", "")
        if last_key_sent and last_key_created_at:
            try:
                # Parse the last creation timestamp. _utc_now_iso() writes
                # an aware "+00:00" timestamp; a naive value here would make
                # the subtraction raise TypeError, which is caught below.
                last_created_time = datetime.fromisoformat(last_key_created_at)
                now = datetime.now(timezone.utc)
                time_since_creation = (now - last_created_time).total_seconds()
                # If the last key was created within the reuse window, reuse it
                if time_since_creation < KEY_REUSE_WINDOW_SECONDS:
                    msg = f"Reused recent key. ({keys_already_sent}/{max_keys} keys sent for this order)"
                    _log_request(order_number, ip_address, user_agent, True, "Key reused", suspicious_flags)
                    return last_key_sent, msg
            except (ValueError, TypeError):
                # If there's an error parsing the timestamp, create a new key
                pass
        # Create a new ephemeral key via the OpenAI Realtime client-secrets API.
        try:
            response = requests.post(
                "https://api.openai.com/v1/realtime/client_secrets",
                headers={
                    "Authorization": f"Bearer {OPENAI_API_KEY}",
                    "Content-Type": "application/json"
                },
                json={
                    "expires_after": {
                        "anchor": "created_at",
                        "seconds": 3600  # 1 hour
                    },
                    "session": {
                        "type": "realtime",
                        "model": "gpt-realtime",
                        "audio": {
                            "output": {"voice": "alloy"}
                        }
                    }
                }
            )
            response.raise_for_status()
            ephemeral_data = response.json()
            # "value" holds the ephemeral secret in the API response.
            ephemeral_key = ephemeral_data["value"]
        except Exception as e:
            msg = f"Failed to create ephemeral key: {str(e)}"
            _log_request(order_number, ip_address, user_agent, False, "API error", suspicious_flags)
            return "", msg
        # Update sent.csv - store the ephemeral key and timestamp.
        # keys_sent is a comma-separated history of every key issued.
        existing_keys = sent[order_number]["keys_sent"]
        if existing_keys:
            sent[order_number]["keys_sent"] = existing_keys + "," + ephemeral_key
        else:
            sent[order_number]["keys_sent"] = ephemeral_key
        sent[order_number]["quantity_keys_sent"] += 1
        sent[order_number]["last_key_sent"] = ephemeral_key
        sent[order_number]["last_key_created_at"] = _utc_now_iso()
        # Save changes back to the dataset repo.
        updated_sent_csv = _serialize_sent(sent)
        _upload_csv(
            DATASET_REPO_ID,
            SENT_PATH,
            updated_sent_csv,
            commit_message=f"Updated sent tracking at {_utc_now_iso()}",
        )
        # refresh cache immediately (zero TTL forces a re-download next read)
        _cache["ts"] = 0.0
        keys_sent_count = sent[order_number]["quantity_keys_sent"]
        msg = f"New ephemeral key sent successfully. ({keys_sent_count}/{max_keys} keys sent for this order)"
        _log_request(order_number, ip_address, user_agent, True, "Key created", suspicious_flags)
        return ephemeral_key, msg
# -------------------------
# UI
# -------------------------
with gr.Blocks(title="API") as demo:
    gr.Markdown("## Ephemeral Key Service")
    order_input = gr.Textbox(label="Order Number", placeholder="Enter order number (e.g., 2724-1857)")
    btn_c = gr.Button("Request Key", variant="primary", size="sm")
    out_c = gr.Textbox(label="Response", interactive=False, show_label=False)
    status_c = gr.Textbox(label="", interactive=False, show_label=False)
    # Gradio injects the gr.Request argument of claim_c_key automatically;
    # only the order number is wired as an explicit input.
    btn_c.click(
        fn=claim_c_key,
        inputs=[order_input],
        outputs=[out_c, status_c],
        api_name="claim_c_key",
    )
# Enable the request queue, then start the server (blocking call).
demo.queue()
demo.launch()