File size: 7,350 Bytes
dfdbd3d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#  Moon-Userbot - telegram userbot
#  Copyright (C) 2020-present Moon Userbot Organization
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.

#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.

# TODO: Add ability to kill session by hash

import time
from datetime import datetime
from html import escape

from pyrogram import Client, filters
from pyrogram import ContinuePropagation
from pyrogram.errors import RPCError
from pyrogram.raw.functions.account import GetAuthorizations, ResetAuthorization
from pyrogram.raw.types import UpdateServiceNotification
from pyrogram.types import Message

from utils.db import db
from utils.misc import modules_help, prefix

# Authorization hashes read from the database once, when this module is
# imported. The trailing [] is presumably the fallback returned when nothing
# has been stored yet -- confirm against utils.db.
auth_hashes = db.get("core.sessionkiller", "auths_hashes", [])


@Client.on_message(filters.command(["sessions"], prefix) & filters.me)
async def sessions_list(client: Client, message: Message):
    """Reply with a formatted list of the account's authorizations.

    The sessions returned by ``GetAuthorizations`` are rendered as HTML and sent
    as replies holding at most five sessions each. The header line is prepended
    to the first reply only. The command message is deleted afterwards.

    Args:
        client: Client used to invoke the raw ``GetAuthorizations`` request.
        message: The ``sessions`` command message that triggered the handler.
    """
    sessions = (await client.invoke(GetAuthorizations())).authorizations
    formatted_sessions = [
        f"<b>{num}</b>. <b>{escape(session.device_model)}</b> on <code>{escape(session.platform or 'unknown platform')}</code>\n"
        f"<b>Hash:</b> <code>{session.hash}</code>\n"
        f"<b>App name:</b> <code>{escape(session.app_name)}</code> <code>v.{escape(session.app_version or 'unknown')}</code>\n"
        f"<b>Created (last activity):</b> {datetime.fromtimestamp(session.date_created).isoformat()} ({datetime.fromtimestamp(session.date_active).isoformat()})\n"
        # ip/country are escaped like the other server-provided strings so
        # they cannot break the HTML markup of the reply.
        f"<b>IP and location: </b>: <code>{escape(session.ip)}</code> (<i>{escape(session.country)}</i>)\n"
        f"<b>Official status:</b> <code>{'✅' if session.official_app else '❌️'}</code>\n"
        f"<b>2FA accepted:</b> <code>{'❌️️' if session.password_pending else '✅'}</code>\n"
        f"<b>Can accept calls / secret chats:</b> {'❌️️' if session.call_requests_disabled else '✅'} / {'❌️️' if session.encrypted_requests_disabled else '✅'}"
        for num, session in enumerate(sessions, 1)
    ]

    sessions_per_reply = 5
    header = "<b>Active sessions at your account:</b>\n\n"
    for start in range(0, len(formatted_sessions), sessions_per_reply):
        batch = formatted_sessions[start : start + sessions_per_reply]
        # The header goes on whichever reply is sent first, including the
        # case where there are fewer sessions than one full batch.
        await message.reply(header + "\n\n".join(batch))
        header = ""
    await message.delete()


@Client.on_message(filters.command(["sessionkiller", "sk"], prefix) & filters.me)
async def sessionkiller(client: Client, message: Message):
    """Show, enable or disable the sessionkiller feature.

    Without an argument the current status is shown. With an "enable"-like
    argument the hashes of all current authorizations are stored as the
    trusted list and the feature is switched on. With a "disable"-like
    argument the feature is switched off. Anything else prints the usage.

    Args:
        client: Client used to invoke the raw ``GetAuthorizations`` request.
        message: The command message; it is edited in place with the result.
    """
    # Rebound below so code reading the module-level list sees the fresh
    # hashes without a restart.
    global auth_hashes

    if len(message.command) == 1:
        if db.get("core.sessionkiller", "enabled", False):
            await message.edit(
                "<b>Sessionkiller status: enabled\n"
                f"You can disable it with <code>{prefix}sessionkiller disable</code></b>"
            )
        else:
            await message.edit(
                "<b>Sessionkiller status: disabled\n"
                f"You can enable it with <code>{prefix}sessionkiller enable</code></b>"
            )
        return

    # Accept the keyword in any letter case ("Enable", "ON", ...).
    action = message.command[1].lower()
    if action in ("enable", "on", "1", "yes", "true"):
        # Store the trusted hashes BEFORE switching the feature on: if the
        # request fails, or a login notification arrives in between, the
        # feature must not be active with an outdated list.
        authorizations = (await client.invoke(GetAuthorizations())).authorizations
        current_hashes = [auth.hash for auth in authorizations]
        db.set("core.sessionkiller", "auths_hashes", current_hashes)
        auth_hashes = current_hashes
        db.set("core.sessionkiller", "enabled", True)
        await message.edit("<b>Sessionkiller enabled!</b>")
    elif action in ("disable", "off", "0", "no", "false"):
        db.set("core.sessionkiller", "enabled", False)
        await message.edit("<b>Sessionkiller disabled!</b>")
    else:
        await message.edit(f"<b>Usage: {prefix}sessionkiller [enable|disable]</b>")


@Client.on_raw_update()
async def check_new_login(client: Client, update: UpdateServiceNotification, _, __):
    """Terminate an unexpected session when an auth service notification arrives.

    Only ``UpdateServiceNotification`` updates whose type starts with "auth"
    are handled, and only while sessionkiller is enabled. Every other update
    is passed on via ``ContinuePropagation``. The first authorization that is
    neither the current one nor in the stored list of trusted hashes is reset,
    and a report is scheduled to Saved Messages.

    Args:
        client: Client used for the raw requests and for sending the report.
        update: The raw update; may be any update type despite the annotation.
        _: Unused extra argument supplied to raw update handlers.
        __: Unused extra argument supplied to raw update handlers.

    Raises:
        ContinuePropagation: When the update is not relevant or the feature
            is disabled.
    """
    if not isinstance(update, UpdateServiceNotification) or not update.type.startswith(
        "auth"
    ):
        raise ContinuePropagation
    if not db.get("core.sessionkiller", "enabled", False):
        raise ContinuePropagation

    # Read the trusted hashes at check time rather than relying on a value
    # loaded at import, which is stale once the feature is (re-)enabled.
    known_hashes = set(db.get("core.sessionkiller", "auths_hashes", []) or [])
    authorizations = (await client.invoke(GetAuthorizations())).authorizations
    for auth in authorizations:
        if auth.current or auth.hash in known_hashes:
            continue

        # Found a new, unexpected login.
        try:
            await client.invoke(ResetAuthorization(hash=auth.hash))
        except RPCError:
            hash_status = "still valid"
            info_text = (
                "Someone tried to log in to your account. You can see this report because you "
                "turned on this feature. But I couldn't terminate attacker's session and "
                "⚠ <b>you must reset it manually</b>. You should change your 2FA password "
                "(if enabled), or set it.\n"
            )
        else:
            hash_status = "not valid anymore"
            info_text = (
                "Someone tried to log in to your account. Since you have enabled "
                "this feature, I deleted the attacker's session from your account. "
                "You should change your 2FA password (if enabled), or set it.\n"
            )
        logined_time = datetime.utcfromtimestamp(auth.date_created).strftime(
            "%d-%m-%Y %H-%M-%S UTC"
        )
        # Every string field is reported by the other client, so all of them
        # are escaped before being embedded into the HTML report.
        full_report = (
            "<b>!!! ACTION REQUIRED !!!</b>\n"
            + info_text
            + "Below is the information about the attacker that I got.\n\n"
            f"Unique authorization hash: <code>{auth.hash}</code> ({hash_status})\n"
            f"Device model: <code>{escape(auth.device_model)}</code>\n"
            f"Platform: <code>{escape(auth.platform)}</code>\n"
            f"API ID: <code>{auth.api_id}</code>\n"
            f"App name: <code>{escape(auth.app_name)}</code>\n"
            f"App version: <code>{escape(auth.app_version)}</code>\n"
            f"Logined at: <code>{logined_time}</code>\n"
            f"IP: <code>{escape(auth.ip)}</code>\n"
            f"Country: <code>{escape(auth.country)}</code>\n"
            f'Official app: <b>{"yes" if auth.official_app else "no"}</b>\n\n'
            f"<b>It is you? Type <code>{prefix}sk off</code> and try logging "
            f"in again.</b>"
        )
        # Schedule the report 15 seconds ahead so the user gets a notification
        # for it. Pyrogram 2.x (the API generation providing ``client.invoke``)
        # expects a datetime here, not an integer timestamp.
        schedule_date = datetime.fromtimestamp(time.time() + 15)
        await client.send_message("me", full_report, schedule_date=schedule_date)
        return


# Help entries for this module, stored under the "sessions" key of
# modules_help. Each key is a command usage string and each value is its
# description; the two adjacent string literals below form one description.
modules_help["sessions"] = {
    "sessionkiller [enable|disable]": "When enabled, every new session will be terminated.\n"
    "Useful for additional protection for your account",
    "sessions": "List all sessions on your account",
}