Spaces:
Paused
Paused
File size: 1,817 Bytes
4717db6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
import sys
import os
from pyngrok import ngrok
import uvicorn
import asyncio
from .events import Events
from .constants import Constants
# This module is a script entry point; it exports nothing for star-imports.
__all__ = []


def _open_ngrok_tunnels(port):
    """Open the ngrok HTTP(S) and SSH tunnels and return their public URLs.

    Reads the optional ``NGROK_TOKEN`` environment variable and, when it is
    set to a non-empty value, registers it with ngrok before connecting.

    Args:
        port: Local port that the HTTP tunnel (``bind_tls=True``) forwards to.

    Returns:
        A ``(http_url, ssh_url)`` tuple with the public URL of each tunnel.
    """
    ngrok_token = os.environ.get("NGROK_TOKEN")
    if not ngrok_token:
        print(
            "NGROK auth token is not found in the environment. Ngrok will timeout after a few hours."
        )
    else:
        # Never echo the token itself: stdout usually ends up in logs.
        print("NGROK auth token found in the environment.")
        ngrok.set_auth_token(ngrok_token)

    http_tunnel = ngrok.connect(port, bind_tls=True)
    # Second tunnel forwards TCP to local port 22 (SSH).
    ssh_tunnel = ngrok.connect(22, "tcp")
    return http_tunnel.public_url, ssh_tunnel.public_url


def main():
    """Resolve the public host URL, set the Telegram webhook, start the server.

    The single optional command-line argument selects the running mode:

    * ``ngrok`` -- open ngrok tunnels, use the HTTP tunnel URL as the host
      URL and send both tunnel URLs to ``Constants.BROADCAST_CHAT_ID``.
    * ``self_signed`` -- use the public IP as the host URL and set
      ``Events.SELF_SIGNED``.
    * anything else / no argument -- use the public IP as the host URL.

    Exits with an error message when ``Events.TOKEN`` is ``None`` and with
    status 1 when ``Events.set_telegram_webhook_url()`` reports failure.
    """
    if Events.TOKEN is None:
        sys.exit("No TELEGRAM_TOKEN found in the environment, exiting now.")

    port = Events.PORT

    # asyncio.get_event_loop() is deprecated when no loop is running (and an
    # error on recent Python versions), so create and install one explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # The option is honoured only when exactly one argument is given.
    running_option = sys.argv[1] if len(sys.argv) == 2 else None

    if running_option == "ngrok":
        public_url, ssh_url = _open_ngrok_tunnels(port)
        Events.HOST_URL = public_url
        loop.run_until_complete(
            Events.send_a_message_to_user(
                Constants.BROADCAST_CHAT_ID, f"ssh: {ssh_url}, http:{public_url}"
            )
        )
    else:
        public_url = loop.run_until_complete(Events.get_public_ip())
        Events.HOST_URL = f"https://{public_url}"
        if running_option == "self_signed":
            Events.SELF_SIGNED = True

    print(Events.HOST_URL)

    success = loop.run_until_complete(Events.set_telegram_webhook_url())
    if not success:
        print("Fail, closing the app.")
        # Report the failure to the caller instead of exiting with status 0.
        sys.exit(1)

    uvicorn.run(
        "src.listener:app",
        host="0.0.0.0",
        port=port,
        reload=True,
        log_level="info",
    )


if __name__ == "__main__":
    main()
|