Spaces:
No application file
No application file
| import asyncio | |
| import contextlib | |
| import logging | |
| import re | |
| import uuid | |
| from enum import Enum | |
| from urllib.parse import parse_qs, urlparse | |
| from io import BytesIO | |
| from telethon import TelegramClient, events | |
| from telethon.tl.types import UpdateBotStopped | |
| from telethon.tl.types import UpdateBotInlineSend | |
| from telethon.tl.types import InputMediaPhotoExternal | |
| from telethon.tl.custom import Button | |
| import bot_chat | |
| import bot_config | |
| import bot_db | |
| import bot_oauth | |
| import bot_strings | |
| import bot_suggestions | |
| import bot_markdown | |
| import uvloop | |
| class State(Enum): | |
| FIRST_START = 1 | |
| AGREEMENT = 2 | |
| OAUTH = 3 | |
| DONE = 4 | |
| UNKNOWN = 5 | |
| SETTINGS = 6 | |
| CONNECT_CHAT = 7 | |
| class GdprState(Enum): | |
| STATE_PRIVACY_POLICY = 1 | |
| STATE_RETREIVE_DATA = 2 | |
| STATE_DELETE_DATA = 3 | |
| STATE_COLLECTED_INFORMATION = 4 | |
| STATE_WHY_WE_COLECT = 5 | |
| STATE_WHAT_WE_DO = 6 | |
| STATE_WHAT_WE_NOT_DO = 7 | |
| STATE_RIGHTS_TO_PROCESS = 8 | |
| STATES = {} | |
| GDPR_STATES = {} | |
| INLINE_QUERIES_TEXT = {} | |
| logging.basicConfig(level=logging.INFO) | |
| asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
| client = TelegramClient('sydney', bot_config.TELEGRAM_CLIENT_ID, | |
| bot_config.TELEGRAM_CLIENT_HASH, catch_up=True) | |
| client.parse_mode = bot_markdown.SydMarkdown() | |
| async def privacy_handler(event): | |
| if GDPR_STATES[event.sender_id] != GdprState.STATE_DELETE_DATA: | |
| buttons = [ | |
| [Button.inline('Retrieve data', 'privacy_retreive')], | |
| [Button.inline('Delete data', 'privacy_delete')], | |
| [Button.inline('Collected information', 'privacy_collected')], | |
| [Button.inline('Why we colect', 'privacy_why')], | |
| [Button.inline('What we do', 'privacy_whatdo')], | |
| [Button.inline('What we DO NOT do', 'privacy_whatno')], | |
| [Button.inline('Rights to process', 'privacy_rights')], | |
| [Button.inline('Back', 'back')] | |
| ] | |
| text = None | |
| if GDPR_STATES[event.sender_id] == GdprState.STATE_PRIVACY_POLICY: | |
| text = bot_strings.PRIVACY_STRING.format( | |
| bot_config.BOT_OWNER_USERNAME) | |
| elif GDPR_STATES[event.sender_id] == GdprState.STATE_COLLECTED_INFORMATION: | |
| text = bot_strings.PRIVACY_COLLECTED_INFORMATION_STRING | |
| elif GDPR_STATES[event.sender_id] == GdprState.STATE_WHY_WE_COLECT: | |
| text = bot_strings.PRIVACY_WHY_WE_COLLECT_STRING | |
| elif GDPR_STATES[event.sender_id] == GdprState.STATE_WHAT_WE_DO: | |
| text = bot_strings.PRIVACY_WHAT_WE_DO_STRING | |
| elif GDPR_STATES[event.sender_id] == GdprState.STATE_WHAT_WE_NOT_DO: | |
| text = bot_strings.PRIVACY_WHAT_WE_NOT_DO_STRING | |
| elif GDPR_STATES[event.sender_id] == GdprState.STATE_RIGHTS_TO_PROCESS: | |
| text = bot_strings.PRIVACY_RIGHT_TO_PROCESS_STRING | |
| if GDPR_STATES[event.sender_id] == GdprState.STATE_RETREIVE_DATA: | |
| text = bot_strings.PRIVACY_RETRIEVE_DATA_STRING | |
| data = await bot_db.retrieve_data(event.sender_id) | |
| if not data: | |
| await event.reply(bot_strings.PRIVACY_NO_DATA_STRING) | |
| else: | |
| sender = await event.get_sender() | |
| with BytesIO(str.encode(data)) as privacy_data: | |
| privacy_data.name = f'{event.sender_id}.txt' | |
| await event.reply( | |
| bot_strings.PRIVACY_RETRIEVE_DATA_STRING.format( | |
| sender.last_name, event.sender_id), | |
| file=privacy_data | |
| ) | |
| return | |
| else: | |
| buttons = [[Button.inline('yes')], [Button.inline('cancel')]] | |
| text = bot_strings.PRIVACY_DELETE_DATA_STRING | |
| await event.edit(text, buttons=buttons, link_preview=False) | |
| async def start_handler(event): | |
| if STATES[event.sender_id] == State.DONE: | |
| buttons = [ | |
| [Button.inline(text='Donate', data='donate'), | |
| Button.inline(text='Logout', data='logout')], | |
| [Button.inline(text='Source Code', data='donate'), | |
| Button.inline(text='Settings', data='settings')], | |
| ] | |
| else: | |
| buttons = [ | |
| [Button.inline(text='Continue', data='continue'), | |
| Button.inline(text='Donate', data='donate')], | |
| [Button.inline(text='Source Code', data='donate')], | |
| ] | |
| buttons.append([Button.inline('Privacy Policy', 'privacy_policy')]) | |
| if not hasattr(event, 'out'): | |
| await event.edit(bot_strings.FIRST_START_STRING, buttons=buttons) | |
| elif not event.out: | |
| await client.send_message(event.chat_id, bot_strings.FIRST_START_STRING, buttons=buttons) | |
| async def agreement_handler(event): | |
| back_button = Button.inline(text='Back', data='back') | |
| accept_button = Button.inline(text='Agree', data='agree') | |
| await event.edit(bot_strings.AGREEMENT_STRING, | |
| buttons=[[back_button, accept_button]]) | |
| async def oauth_handler(event): | |
| oauth_url = await bot_oauth.get_auth_url(bot_config.SYDNEY_CLIENT_ID) | |
| oauth_button = Button.url("Log in", url=oauth_url) | |
| await event.edit(bot_strings.OAUTH_STRING, buttons=[[oauth_button]]) | |
| async def logout_handler(event): | |
| await bot_db.remove_user(event.sender_id) | |
| await bot_chat.clear_session(event.sender_id) | |
| await event.edit("Logged out!") | |
| async def settings_hanlder(event): | |
| user = await bot_db.get_user(event.sender_id) | |
| style = bot_chat.Style(user['style']) | |
| chat = user['chat'] | |
| captions = user['captions'] | |
| replies = user['replies'] | |
| str_style = None | |
| if style == bot_chat.Style.CREATIVE: | |
| str_style = 'Creative' | |
| await bot_chat.clear_session(event.sender_id) | |
| if style == bot_chat.Style.BALANCED: | |
| str_style = 'Balanced' | |
| await bot_chat.clear_session(event.sender_id) | |
| if style == bot_chat.Style.PRECISE: | |
| str_style = 'Precise' | |
| await bot_chat.clear_session(event.sender_id) | |
| buttons = [ | |
| [Button.inline(f'Style: {str_style}', 'style'), | |
| Button.inline(f'Captions: {captions}', 'captions')], | |
| [Button.inline(f'Replies: {replies}', 'replies'), | |
| Button.inline('Remove Chat', 'rmchat') | |
| if chat | |
| else Button.inline('Connect Chat', 'conchat')], | |
| [Button.inline('Back', 'back')], | |
| ] | |
| await event.edit(bot_strings.SETTINGS_STRING, buttons=buttons) | |
| async def donate_handler(event): | |
| back_button = Button.inline(text='Back', data='back') | |
| source_button = Button.url( | |
| text='Source Code', url='https://github.com/nitanmarcel/sydney-telegram') | |
| await event.edit(bot_strings.DONATION_STRING, | |
| buttons=[[back_button, source_button]], link_preview=False) | |
| async def handle_chat_connect(event): | |
| await event.edit(bot_strings.CHAT_CONNECT_STRING, buttons=Button.inline('Back', 'back')) | |
| async def connect_chat(event): | |
| pass | |
| async def answer_builder(userId=None, chatID=None, style=None, query=None, cookies=None, can_swipe_topics=False, retry_on_timeout=False, request_id=None): | |
| try: | |
| buttons = [] | |
| await bot_chat.prepare_request(request_id) | |
| answer = await bot_chat.send_message(userId, query, cookies, bot_chat.Style(style), retry_on_disconnect=True, request_id=request_id) | |
| if isinstance(answer, bot_chat.ResponseTypeText): | |
| if answer.cards: | |
| buttons = [Button.url(card[0], card[1]) | |
| for card in answer.cards] | |
| buttons = [[buttons[i], buttons[i+1]] if i+1 < | |
| len(buttons) else [buttons[i]] for i in range(0, len(buttons), 2)] | |
| if answer.render_card: | |
| buttons.append( | |
| [Button.url('Read More', answer.render_card.url)]) | |
| if can_swipe_topics: | |
| buttons.append( | |
| [Button.inline(text='New Topic', data='newtopic')]) | |
| if isinstance(answer, bot_chat.ResponseTypeImage): | |
| return answer.images, buttons or None, answer.caption, True | |
| return answer.answer, buttons or None, query, False | |
| except bot_chat.ChatHubException as exc: | |
| if can_swipe_topics: | |
| buttons.append( | |
| [Button.inline(text='New Topic', data='newtopic')]) | |
| return str(exc), buttons or None, query, False | |
| except asyncio.TimeoutError as exc: | |
| if can_swipe_topics: | |
| buttons.append( | |
| [Button.inline(text='New Topic', data='newtopic')]) | |
| if retry_on_timeout: | |
| with contextlib.suppress(asyncio.TimeoutError): | |
| return await answer_builder(userId, chatID, style, query, cookies, can_swipe_topics, retry_on_timeout=False, request_id=request_id) | |
| return bot_strings.TIMEOUT_ERROR_STRING, buttons or None, query, False | |
| async def message_handler_private(event): | |
| global STATES | |
| message = event.text | |
| if not message: | |
| return | |
| if event.sender_id not in STATES.keys(): | |
| STATES[event.sender_id] = State.FIRST_START | |
| user = await bot_db.get_user(event.sender_id) | |
| if message.startswith("/start"): | |
| STATES[event.sender_id] = State.DONE if user and user['cookies'] else State.FIRST_START | |
| await start_handler(event) | |
| return | |
| if STATES[event.sender_id] == State.CONNECT_CHAT: | |
| try: | |
| sender = await event.get_sender() | |
| permission = await client.get_permissions(int(event.text), event.sender_id) | |
| if not permission.is_admin: | |
| await event.reply(bot_strings.CHAT_CONNECT_NOT_ADMIN_STRING) | |
| return | |
| user = await bot_db.get_user(chatID=int(event.text)) | |
| if user: | |
| await bot_db.insert_user(user['id'], cookies=user['cookies'], chat=None, style=user['style'], captions=user['captions'], replies=user['replies']) | |
| await client.send_message(int(event.text), bot_strings.CHAT_ID_CONNECTED_BROADCAST_STRING.format(sender.username or sender.first_name)) | |
| user = await bot_db.get_user(event.sender_id) | |
| await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=int(event.text), style=user['style'], captions=user['captions'], replies=user['replies']) | |
| except ValueError: | |
| await event.reply(bot_strings.INVALID_CHAT_ID_STRING) | |
| return | |
| if user and user['cookies']: | |
| async with client.action(event.chat_id, 'typing'): | |
| request_id = str(uuid.uuid4()) | |
| processing_message = await event.reply(bot_strings.PROCESSING_IN_PROGRESS, buttons=[Button.inline(bot_strings.PROCESSING_CANCEL, f'wsclose_{request_id}')]) | |
| answer, buttons, caption, is_image = await answer_builder(userId=event.sender_id, query=message, style=user['style'], | |
| cookies=user['cookies'], can_swipe_topics=True, request_id=request_id) | |
| await processing_message.delete() | |
| if not answer: | |
| return | |
| if is_image: | |
| await event.reply(caption, file=[InputMediaPhotoExternal(url=link.split('?')[0]) for link in answer], buttons=buttons) | |
| else: | |
| await event.reply(answer, buttons=buttons) | |
| return | |
| state = STATES[event.sender_id] | |
| if state == State.FIRST_START: | |
| await start_handler(event) | |
| if state == State.AGREEMENT: | |
| await agreement_handler(event) | |
| if state == State.OAUTH: | |
| parsed_auth_url = urlparse(message) | |
| params_auth_url = parse_qs(parsed_auth_url.query) | |
| if 'code' not in params_auth_url.keys(): | |
| await event.reply(bot_strings.AUTHENTIFICATION_URL_NOT_VALID_STRING) | |
| return | |
| auth_code = parse_qs(parsed_auth_url.query)['code'][0] | |
| cookies, has_sydney = await bot_oauth.auth(auth_code, bot_config.SYDNEY_CLIENT_ID) | |
| if not has_sydney: | |
| await event.reply(bot_strings.NOT_IN_WHITELST_STRING, buttons=[Button.url('Join', 'https://www.bing.com/new')]) | |
| STATES[event.sender_id] = State.FIRST_START | |
| return | |
| if not cookies: | |
| await event.reply(bot_strings.AUTHENTIFICATION_FAILED_STRING) | |
| STATES[event.sender_id] = State.FIRST_START | |
| return | |
| await event.reply(bot_strings.AUTHENTIFICATION_DONE_STRING.format(bot_config.TELEGRAM_BOT_USERNAME), | |
| buttons=[Button.inline('Stay logged in', 'keepcookies')]) | |
| STATES[event.sender_id] = State.DONE | |
| await bot_db.insert_user(event.sender_id, cookies, style=bot_chat.Style.BALANCED.value, chat=None, keep_cookies=False, captions=True, replies=True) | |
| await start_handler(event) | |
| if state == State.SETTINGS: | |
| await settings_hanlder(event) | |
| async def answer_callback_query(event): | |
| global STATES | |
| if event.sender_id not in STATES.keys(): | |
| STATES[event.sender_id] = State.FIRST_START | |
| user = await bot_db.get_user(userID=event.sender_id) | |
| if user and user['cookies']: | |
| STATES[event.sender_id] = State.DONE | |
| data = event.data.decode() | |
| if data == 'donate': | |
| await donate_handler(event) | |
| if data == 'continue': | |
| if STATES[event.sender_id] != State.DONE: | |
| STATES[event.sender_id] = State.AGREEMENT | |
| await agreement_handler(event) | |
| if data == 'agree': | |
| await oauth_handler(event) | |
| if STATES[event.sender_id] != State.DONE: | |
| STATES[event.sender_id] = State.OAUTH | |
| if data == 'back': | |
| if STATES[event.sender_id] != State.DONE: | |
| STATES[event.sender_id] = State.FIRST_START | |
| await start_handler(event) | |
| if data == 'logout': | |
| await logout_handler(event) | |
| STATES[event.sender_id] = State.FIRST_START | |
| await start_handler(event) | |
| if data == 'keepcookies': | |
| user = await bot_db.get_user(event.sender_id) | |
| save = await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=user['chat'], style=user['style'], keep_cookies=True, captions=user['captions'], replies=user['replies']) | |
| if save: | |
| message = await event.get_message() | |
| await event.edit(message.text) | |
| if data == 'settings': | |
| STATES[event.sender_id] == State.SETTINGS | |
| await settings_hanlder(event) | |
| if data == 'style': | |
| user = await bot_db.get_user(event.sender_id) | |
| style = bot_chat.Style(user['style']) | |
| if style == bot_chat.Style.CREATIVE: | |
| await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=user['chat'], style=bot_chat.Style.BALANCED.value, captions=user['captions'], replies=user['replies']) | |
| if style == bot_chat.Style.BALANCED: | |
| await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=user['chat'], style=bot_chat.Style.PRECISE.value, captions=user['captions'], replies=user['replies']) | |
| if style == bot_chat.Style.PRECISE: | |
| await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=user['chat'], style=bot_chat.Style.CREATIVE.value, captions=user['captions'], replies=user['replies']) | |
| await settings_hanlder(event) | |
| if data == 'captions': | |
| user = await bot_db.get_user(event.sender_id) | |
| await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=user['chat'], style=user['style'], captions=not user['captions'], replies=user['replies']) | |
| await settings_hanlder(event) | |
| if data == 'replies': | |
| user = await bot_db.get_user(event.sender_id) | |
| await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=user['chat'], style=user['style'], captions=user['captions'], replies=not user['replies']) | |
| await settings_hanlder(event) | |
| if data == 'conchat': | |
| STATES[event.sender_id] = State.CONNECT_CHAT | |
| await handle_chat_connect(event) | |
| if data == 'rmchat': | |
| user = await bot_db.get_user(event.sender_id) | |
| await bot_db.insert_user(event.sender_id, cookies=user['cookies'], chat=None, style=user['style'], captions=user['captions'], replies=user['replies']) | |
| await settings_hanlder(event) | |
| if data == 'newtopic': | |
| original_message = await event.get_message() | |
| if message := original_message: | |
| if bool(message.reply_to_msg_id): | |
| reply_message = await message.get_reply_message() | |
| if reply_message: | |
| message = reply_message | |
| if message.sender_id == event.sender_id: | |
| user = await bot_db.get_user(message.sender_id) | |
| if not user: | |
| user = await bot_db.get_user(chatID=message.chat_id) | |
| if user: | |
| await bot_chat.clear_session(user['id']) | |
| buttons = None | |
| if original_message.buttons and len(original_message.buttons) > 1: | |
| buttons = original_message.buttons[:-1] | |
| await event.edit(text=original_message.text, file=original_message.file, buttons=buttons) | |
| await event.answer(bot_strings.NEW_TOPIC_CREATED_STRING) | |
| return | |
| await event.answer(bot_strings.TOPIC_EXPIRES_STRING, alert=True) | |
| return | |
| if data == 'privacy_policy': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_PRIVACY_POLICY | |
| await privacy_handler(event) | |
| if data == 'privacy_retreive': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_RETREIVE_DATA | |
| await privacy_handler(event) | |
| if data == 'privacy_delete': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_DELETE_DATA | |
| await privacy_handler(event) | |
| if data == 'privacy_collected': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_COLLECTED_INFORMATION | |
| await privacy_handler(event) | |
| if data == 'privacy_why': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_WHY_WE_COLECT | |
| await privacy_handler(event) | |
| if data == 'privacy_whatdo': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_WHAT_WE_DO | |
| await privacy_handler(event) | |
| if data == 'privacy_whatno': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_WHAT_WE_NOT_DO | |
| await privacy_handler(event) | |
| if data == 'privacy_rights': | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_RIGHTS_TO_PROCESS | |
| await privacy_handler(event) | |
| if data == 'yes': | |
| if event.sender_id in GDPR_STATES.keys() and GDPR_STATES[event.sender_id] == GdprState.STATE_DELETE_DATA: | |
| result = await bot_db.remove_user(event.sender_id) | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_PRIVACY_POLICY | |
| await privacy_handler(event) | |
| if not result: | |
| await event.reply(bot_strings.PRIVACY_NO_DATA_STRING) | |
| if data == 'cancel': | |
| if event.sender_id in GDPR_STATES.keys() and GDPR_STATES[event.sender_id] == GdprState.STATE_DELETE_DATA: | |
| GDPR_STATES[event.sender_id] = GdprState.STATE_PRIVACY_POLICY | |
| await privacy_handler(event) | |
| if data.startswith('wsclose'): | |
| uid = data.split('_')[-1] | |
| original_message = await event.get_message() | |
| if message := original_message: | |
| if bool(message.reply_to_msg_id): | |
| reply_message = await message.get_reply_message() | |
| if reply_message: | |
| message = reply_message | |
| if message.sender_id == event.sender_id: | |
| user = await bot_db.get_user(message.sender_id) | |
| if not user: | |
| user = await bot_db.get_user(chatID=message.chat_id) | |
| if user: | |
| buttons = None | |
| if original_message.buttons and len(original_message.buttons) > 1: | |
| buttons = original_message.buttons[:-1] | |
| await event.edit(text=original_message.text, file=original_message.file, buttons=buttons) | |
| await bot_chat.cancel_request(uid) | |
| else: | |
| await event.answer(bot_strings.TOPIC_EXPIRES_STRING, alert=True) | |
| else: | |
| if len(data.split('_')) == 3: | |
| if int(data.split('_')[1]) == event.sender_id: | |
| await bot_chat.cancel_request(uid) | |
| else: | |
| await event.answer(bot_strings.TOPIC_EXPIRES_STRING, alert=True) | |
| else: | |
| await event.answer(bot_strings.TOPIC_EXPIRES_STRING, alert=True) | |
| await event.answer() | |
| async def answer_inline_query(event): | |
| global INLINE_QUERIES_TEXT | |
| message = event.text | |
| builder = event.builder | |
| user = await bot_db.get_user(event.sender_id) | |
| if not user: | |
| await event.answer(switch_pm=bot_strings.INLINE_NO_COOKIE_STRING, switch_pm_param='start') | |
| return | |
| if not message: | |
| if bool(await bot_chat.get_session(event.sender_id)): | |
| await event.answer([builder.article('Start new topic', text=bot_strings.NEW_TOPIC_CREATED_STRING, id=f'{uuid.uuid4()}_newtopic')]) | |
| return | |
| INLINE_QUERIES_TEXT[event.sender_id] = {} | |
| request_id = str(uuid.uuid4())[0:5] | |
| suggestions = await bot_suggestions.get_suggestions(message) | |
| articles = [builder.article(message, text=f'❓ __{message}__', buttons=[ | |
| Button.inline(bot_strings.PROCESSING_CANCEL, f'wsclose_{event.sender_id}_{request_id}')], id=request_id)] | |
| if suggestions: | |
| for suggestion in suggestions: | |
| message = suggestion['query'] | |
| if event.text != message: | |
| INLINE_QUERIES_TEXT[event.sender_id].update( | |
| {suggestion['id']: message}) | |
| articles.append(builder.article(message, text=f'❓ __{message}__', buttons=[ | |
| Button.inline(bot_strings.PROCESSING_CANCEL, f'wsclose_{request_id}')], id=f'{suggestion["id"]}_{request_id}')) | |
| await event.answer(articles) | |
| async def handle_inline_send(event): | |
| user = await bot_db.get_user(event.user_id) | |
| query = event.query | |
| if event.id.endswith('_newtopic'): | |
| await bot_chat.clear_session(event.user_id) | |
| return | |
| if event.id in INLINE_QUERIES_TEXT[event.user_id]: | |
| suggestions = INLINE_QUERIES_TEXT[event.user_id] | |
| query = suggestions[event.id.split('_')[0]] | |
| answer, buttons, caption, is_image = await answer_builder(userId=event.user_id, query=query, style=user['style'], cookies=user['cookies'], request_id=event.id.split('_')[-1]) | |
| if not answer: | |
| await client.edit_message(event.msg_id, text=bot_strings.PROCESSING_CANCELED_STRING) | |
| return | |
| if is_image: | |
| images_list = '- ' + \ | |
| '\n- '.join([link.split('?')[0] for link in answer]) | |
| await client.edit_message(event.msg_id, text=f'{caption}\n\n{images_list}') | |
| else: | |
| text = f'❓ __{caption}__\n\n{answer}' if user['captions'] else f'{answer}' | |
| if buttons: | |
| await client.edit_message( | |
| event.msg_id, text=text, buttons=buttons | |
| ) | |
| else: | |
| await client.edit_message(event.msg_id, text=text) | |
| async def handle_bot_stopped(event): | |
| if event.stopped: | |
| user = await bot_db.get_user(event.user_id) | |
| if user: | |
| await bot_db.remove_user(event.user_id) | |
| await bot_chat.clear_session(event.user_id) | |
| async def message_handler_groups(event): | |
| if event.text and event.text.split()[0] == '/id': | |
| await event.reply(f'`{event.chat_id}`') | |
| return | |
| if not event.mentioned or not event.text: | |
| return | |
| user = await bot_db.get_user(userID=None, chatID=event.chat_id) | |
| if not user: | |
| user = await bot_db.get_user(userID=event.sender_id) | |
| if event.reply_to_msg_id and not user['replies']: | |
| return | |
| async with client.action(event.chat_id, 'typing'): | |
| request_id = str(uuid.uuid4()) | |
| message = event.text.replace( | |
| f'@{bot_config.TELEGRAM_BOT_USERNAME}', '').strip() | |
| if not user: | |
| answer, buttons, caption, is_image = await answer_builder(userId=None, query=message, style=bot_chat.Style.BALANCED, cookies=None, request_id=request_id) | |
| await event.edit(f'⚠️ {answer}', buttons=[Button.url('Log in', url=f'http://t.me/{bot_config.TELEGRAM_BOT_USERNAME}?start=help')]) | |
| return | |
| processing_message = await event.reply(bot_strings.PROCESSING_IN_PROGRESS, buttons=[Button.inline(bot_strings.PROCESSING_CANCEL, f'wsclose_{request_id}')]) | |
| answer, buttons, caption, is_image = await answer_builder(userId=event.chat_id, query=message, style=user['style'], cookies=user['cookies'] if user else None, can_swipe_topics=True, request_id=request_id) | |
| await processing_message.delete() | |
| if not answer: | |
| return | |
| if is_image: | |
| await event.reply(caption, file=[InputMediaPhotoExternal(url=link.split('?')[0]) for link in answer], buttons=buttons) | |
| else: | |
| await event.reply(answer, buttons=buttons) | |
| async def main(): | |
| await bot_db.init(bot_config.POSTGRES_CONNECTION_STRING, bot_config.COOKIE_ENCRYPTION_KEY) | |
| await client.start(bot_token=bot_config.TELEGRAM_BOT_TOKEN) | |
| await client.run_until_disconnected() | |
| if __name__ == '__main__': | |
| try: | |
| client.loop.run_until_complete(main()) | |
| finally: | |
| if client.is_connected(): | |
| client.disconnect() | |