| import requests |
| import os |
| import time |
|
|
| TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") |
| HF_API_URL = os.getenv("HF_API_URL") |
|
|
| BASE_URL = f"https://api.telegram.org/bot{TOKEN}" |
| STATE_FILE = ".github/state/last_id.txt" |
|
|
|
|
| def load_last_id(): |
| try: |
| with open(STATE_FILE, "r") as f: |
| return int(f.read().strip()) |
| except: |
| return 0 |
|
|
|
|
| def save_last_id(last_id): |
| with open(STATE_FILE, "w") as f: |
| f.write(str(last_id)) |
|
|
|
|
| def get_updates(): |
| url = f"{BASE_URL}/getUpdates" |
| return requests.get(url, params={"timeout": 10}).json() |
|
|
|
|
| def send_message(chat_id, text): |
| requests.post(f"{BASE_URL}/sendMessage", json={ |
| "chat_id": chat_id, |
| "text": text[:4000] |
| }) |
|
|
|
|
| def main(): |
| print("π Checking Telegram...") |
|
|
| last_id = load_last_id() |
| print(f"π Last processed ID: {last_id}") |
|
|
| data = get_updates() |
| updates = data.get("result", []) |
|
|
| if not updates: |
| print("π No messages") |
| return |
|
|
| current_time = int(time.time()) |
| new_last_id = last_id |
|
|
| for update in updates: |
| update_id = update["update_id"] |
|
|
| |
| if update_id <= last_id: |
| continue |
|
|
| message = update.get("message", {}) |
| chat_id = message.get("chat", {}).get("id") |
| text = message.get("text", "") |
| message_time = message.get("date", 0) |
|
|
| |
| if current_time - message_time > 180: |
| continue |
|
|
| if not text: |
| continue |
|
|
| print(f"π© User: {text}") |
|
|
| try: |
| response = requests.post( |
| HF_API_URL, |
| json={ |
| "message": text, |
| "chat_id": str(chat_id) |
| }, |
| timeout=180 |
| ) |
|
|
| if response.status_code == 200: |
| reply = response.json().get("reply", "β οΈ Empty reply") |
| else: |
| print("β HF error:", response.text) |
| reply = "β οΈ AI error" |
|
|
| except Exception as e: |
| print("β Request failed:", str(e)) |
| reply = "β οΈ Request failed" |
|
|
| print(f"π€ Reply: {reply}") |
| send_message(chat_id, reply) |
|
|
| |
| new_last_id = max(new_last_id, update_id) |
|
|
| |
| if new_last_id != last_id: |
| save_last_id(new_last_id) |
| print(f"β
Updated last_id: {new_last_id}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |