|
|
|
|
|
|
| """
|
| Telegram Verification API Client.
|
| Communicates with the HuggingFace telegram DB server
|
| at TELEGRAM_API_URL to verify codes and fetch user data.
|
| """
|
|
|
| from config import TELEGRAM_API_URL, TELEGRAM_API_SECRET, TELEGRAM_API_TIMEOUT
|
| from http_pool import telegram_api_session
|
|
|
|
|
| def _headers():
|
| return {
|
| "X-API-Key": TELEGRAM_API_SECRET,
|
| "Content-Type": "application/json",
|
| }
|
|
|
|
|
| def verify_code_on_remote(code):
|
| """
|
| Search all users on the remote telegram DB for a matching code.
|
|
|
| Returns:
|
| (user_id, user_data, error)
|
| - user_id: telegram user ID string
|
| - user_data: dict with username, first_name, profile_photo, etc.
|
| - error: error string or None
|
| """
|
| if not code or not code.strip():
|
| return None, None, "الرمز فارغ"
|
|
|
| code = code.strip()
|
|
|
| try:
|
|
|
| resp = telegram_api_session.get(
|
| f"{TELEGRAM_API_URL}/api/users",
|
| headers=_headers(),
|
| params={"limit": 1000},
|
| timeout=TELEGRAM_API_TIMEOUT
|
| )
|
|
|
| if resp.status_code == 401:
|
| return None, None, "خطأ في مفتاح API للتحقق"
|
|
|
| resp.raise_for_status()
|
| data = resp.json()
|
|
|
| if not data.get("ok"):
|
| return None, None, "فشل الاتصال بخادم التحقق"
|
|
|
| users = data.get("users", [])
|
|
|
|
|
| for user in users:
|
| user_code = user.get("code", "")
|
| if user_code == code and user_code != "DONE":
|
| user_id = str(user.get("user_id", ""))
|
| return user_id, user, None
|
|
|
| return None, None, "رمز التحقق غير صحيح أو تم استخدامه"
|
|
|
| except Exception as e:
|
| print(f" ❌ Telegram API verify error: {e}")
|
| return None, None, f"خطأ في الاتصال بخادم التحقق: {str(e)}"
|
|
|
|
|
| def mark_code_done_on_remote(user_id):
|
| """
|
| Mark a user's code as DONE and status as verified on the remote DB.
|
|
|
| Returns:
|
| (success, error)
|
| """
|
| if not user_id:
|
| return False, "No user_id"
|
|
|
| try:
|
| resp = telegram_api_session.put(
|
| f"{TELEGRAM_API_URL}/api/user/{user_id}",
|
| headers=_headers(),
|
| json={
|
| "code": "DONE",
|
| "status": "verified"
|
| },
|
| timeout=TELEGRAM_API_TIMEOUT
|
| )
|
|
|
| if resp.status_code == 401:
|
| return False, "خطأ في مفتاح API"
|
|
|
| resp.raise_for_status()
|
| data = resp.json()
|
|
|
| if data.get("ok"):
|
| print(f" ✅ Marked user {user_id} as DONE on remote DB")
|
| return True, None
|
| else:
|
| return False, data.get("error", "Unknown error")
|
|
|
| except Exception as e:
|
| print(f" ❌ Telegram API mark_done error: {e}")
|
| return False, str(e)
|
|
|
|
|
| def get_remote_user(user_id):
|
| """
|
| Fetch a single user's data from the remote telegram DB.
|
| Useful for getting profile_photo later.
|
|
|
| Returns:
|
| (user_data, error)
|
| """
|
| if not user_id:
|
| return None, "No user_id"
|
|
|
| try:
|
| resp = telegram_api_session.get(
|
| f"{TELEGRAM_API_URL}/api/user/{user_id}",
|
| headers=_headers(),
|
| timeout=TELEGRAM_API_TIMEOUT
|
| )
|
|
|
| if resp.status_code == 404:
|
| return None, "User not found on remote DB"
|
|
|
| if resp.status_code == 401:
|
| return None, "خطأ في مفتاح API"
|
|
|
| resp.raise_for_status()
|
| data = resp.json()
|
|
|
| if data.get("ok"):
|
| return data.get("user"), None
|
| else:
|
| return None, data.get("error", "Unknown error")
|
|
|
| except Exception as e:
|
| print(f" ❌ Telegram API get_user error: {e}")
|
| return None, str(e)
|
|
|