app / auth /telegram_api.py
Dooratre's picture
Upload 216 files
78c0e6a verified
# auth/telegram_api.py
# NEW FILE
"""
Telegram Verification API Client.
Communicates with the HuggingFace telegram DB server
at TELEGRAM_API_URL to verify codes and fetch user data.
"""
from config import TELEGRAM_API_URL, TELEGRAM_API_SECRET, TELEGRAM_API_TIMEOUT
from http_pool import telegram_api_session
def _headers():
return {
"X-API-Key": TELEGRAM_API_SECRET,
"Content-Type": "application/json",
}
def verify_code_on_remote(code):
"""
Search all users on the remote telegram DB for a matching code.
Returns:
(user_id, user_data, error)
- user_id: telegram user ID string
- user_data: dict with username, first_name, profile_photo, etc.
- error: error string or None
"""
if not code or not code.strip():
return None, None, "الرمز فارغ"
code = code.strip()
try:
# Fetch all users from remote DB
resp = telegram_api_session.get(
f"{TELEGRAM_API_URL}/api/users",
headers=_headers(),
params={"limit": 1000},
timeout=TELEGRAM_API_TIMEOUT
)
if resp.status_code == 401:
return None, None, "خطأ في مفتاح API للتحقق"
resp.raise_for_status()
data = resp.json()
if not data.get("ok"):
return None, None, "فشل الاتصال بخادم التحقق"
users = data.get("users", [])
# Search for matching code that is NOT "DONE"
for user in users:
user_code = user.get("code", "")
if user_code == code and user_code != "DONE":
user_id = str(user.get("user_id", ""))
return user_id, user, None
return None, None, "رمز التحقق غير صحيح أو تم استخدامه"
except Exception as e:
print(f" ❌ Telegram API verify error: {e}")
return None, None, f"خطأ في الاتصال بخادم التحقق: {str(e)}"
def mark_code_done_on_remote(user_id):
"""
Mark a user's code as DONE and status as verified on the remote DB.
Returns:
(success, error)
"""
if not user_id:
return False, "No user_id"
try:
resp = telegram_api_session.put(
f"{TELEGRAM_API_URL}/api/user/{user_id}",
headers=_headers(),
json={
"code": "DONE",
"status": "verified"
},
timeout=TELEGRAM_API_TIMEOUT
)
if resp.status_code == 401:
return False, "خطأ في مفتاح API"
resp.raise_for_status()
data = resp.json()
if data.get("ok"):
print(f" ✅ Marked user {user_id} as DONE on remote DB")
return True, None
else:
return False, data.get("error", "Unknown error")
except Exception as e:
print(f" ❌ Telegram API mark_done error: {e}")
return False, str(e)
def get_remote_user(user_id):
"""
Fetch a single user's data from the remote telegram DB.
Useful for getting profile_photo later.
Returns:
(user_data, error)
"""
if not user_id:
return None, "No user_id"
try:
resp = telegram_api_session.get(
f"{TELEGRAM_API_URL}/api/user/{user_id}",
headers=_headers(),
timeout=TELEGRAM_API_TIMEOUT
)
if resp.status_code == 404:
return None, "User not found on remote DB"
if resp.status_code == 401:
return None, "خطأ في مفتاح API"
resp.raise_for_status()
data = resp.json()
if data.get("ok"):
return data.get("user"), None
else:
return None, data.get("error", "Unknown error")
except Exception as e:
print(f" ❌ Telegram API get_user error: {e}")
return None, str(e)