Spaces:
Runtime error
Runtime error
| import time | |
| from pprint import pprint | |
| from mastodon import Mastodon, MastodonNotFoundError # Mastodon.py | |
| list_of_servers = [ | |
| 'hostux.social', | |
| 'social.linux.pizza', | |
| 'nerdculture.de', | |
| 'toot.wales', | |
| 'kolektiva.social', | |
| 'noc.social', | |
| 'seo.chat', | |
| 'ioc.exchange', | |
| 'glasgow.social', | |
| 'mindly.social', | |
| 'mstdn.party', | |
| 'universeodon.com', | |
| 'learningdisability.social', | |
| 'ravenation.club', | |
| 'home.social', | |
| 'mastodon.ie', | |
| 'techhub.social', | |
| 'mastodon.scot', | |
| 'sfba.social', | |
| 'mastodon.sdf.org', | |
| 'mastodon.lol', | |
| 'mstdn.social', | |
| 'mas.to', | |
| 'newsie.social'] | |
| # A HTML blob of text used to define the 'login in with' button | |
| auth_button_text = '''<a href="{}"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'"> | |
| <i class=""></i> Login with Mastodon! 🐘 | |
| </button></a><br>''' | |
| def client_log_in(provided_server = None): | |
| ''' | |
| Generates an authenticated Mastodon client for the admin user. Used for retireving the target website URL | |
| ''' | |
| if provided_server == None: | |
| global slider_choice | |
| server = slider_choice | |
| else: | |
| server = provided_server | |
| if server != None: | |
| id = "WatchTower Ivory | Mastodon" | |
| secrets = json.loads(os.getenv('secrets_json')) | |
| client_id = secrets[server]["client_id"] | |
| client_secret = secrets[server]["client_secret"] | |
| password = secrets[server]["password"] | |
| username = secrets[server]["email"] | |
| mastodon = Mastodon( | |
| client_id=client_id,client_secret=client_secret, | |
| api_base_url='https://{}'.format(server) | |
| ) | |
| access_token = mastodon.log_in( | |
| username=username, | |
| scopes=["write:blocks", "write:mutes","read:search","read:accounts"], | |
| password=password, | |
| redirect_uri="https://user1342-ivory.hf.space/" | |
| ) | |
| else: | |
| mastodon = None | |
| return mastodon | |
| def get_auth_url(mastodon,provided_server = None): | |
| ''' | |
| Retrieves a URL for the user to visit to auth them with WatchTower. | |
| :param mastodon: A admin masterdon instance. | |
| :return: The target URL. | |
| ''' | |
| if provided_server == None: | |
| global slider_choice | |
| server = slider_choice | |
| else: | |
| server = provided_server | |
| secrets = json.loads(os.getenv('secrets_json')) | |
| client_id = secrets[server]["client_id"] | |
| client_secret = secrets[server]["client_secret"] | |
| return mastodon.auth_request_url(client_id=client_id, scopes=["write:blocks", "write:mutes","read:search","read:accounts"], | |
| redirect_uris="https://user1342-ivory.hf.space/") | |
| def login_from_code(code): | |
| ''' | |
| Used to create a masterdon client instance based on an authenticated user code (retrieved from the URL). | |
| :param code: The code which will authenticate the user/ WatchTower. | |
| :return: A masterdon client instance signed in as the user. | |
| ''' | |
| global slider_choice | |
| server = slider_choice | |
| if server != None: | |
| secrets = json.loads(os.getenv('secrets_json')) | |
| client_id = secrets[server]["client_id"] | |
| client_secret = secrets[server]["client_secret"] | |
| mastodon = Mastodon( | |
| client_id=client_id, client_secret=client_secret, | |
| api_base_url='https://{}'.format(server) | |
| ) | |
| mastodon.log_in(code=code, | |
| scopes=["write:blocks", "write:mutes","read:search","read:accounts"], | |
| redirect_uri="https://user1342-ivory.hf.space/") | |
| else: | |
| mastodon = None | |
| return mastodon | |
| # !/usr/bin/env python | |
| # coding: utf-8 | |
| html_data = ''' | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1"> | |
| <link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css"> | |
| <link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Poppins"> | |
| <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css"> | |
| <style> | |
| body,h1,h2,h3,h4,h5 {font-family: "Poppins", sans-serif} | |
| body {font-size: 16px;} | |
| img {margin-bottom: -8px;} | |
| .mySlides {display: none;} | |
| </style> | |
| </head> | |
| <body class="w3-content w3-black" style="max-width:1500px;"> | |
| <!-- The App Section --> | |
| <div class="w3-padding-large w3-white"> | |
| <div class="w3-row-padding-large"> | |
| <div class="w3-col"> | |
| <h1 class="w3-jumbo"><b>WatchTower 🐘🚧</b></h1> | |
| <h1 class="w3-xxxlarge w3-text-purple"><b>Remove Unfavorable Messages From Your Mastodon Feed </b></h1> | |
| <p><span class="w3-xlarge">Scroll down to use WatchTower Ivory. ⬇ </span> WatchTower is a tool that identifies hate speech, misinformation, and extremist content and blocks/ mutes it from your feed. WatchTower Ivory is the second iteration of WatchTower, designed specifically for Mastodon. WatchTower blocks content based on it's current database, so make sure to come back regularly to ensure you're up to date! We use a queue system, which means <b>you may need to wait your turn to run WatchTower</b> - however, once you've clicked run, you can close the tab as WatchTower will continue in the background. WatchTower is simple to use: first scroll down the page, choose your Mastodon server, then click 'sign in with Mastodon', after this you'll be taken to the Mastodon server website and asked to verify yourself, after this you'll be taken back here, then simply scroll down to the bottom of the page and click run!</p> | |
| <a href="https://www.watchtower.cartographer.one/"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'"> | |
| <i class=""></i> Find Out More! 💬 | |
| </button></a> | |
| <a href="https://ko-fi.com/jamesstevenson"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'"> | |
| <i class=""></i> Support The Creator! ❤ | |
| </button></a> | |
| <a href="https://infosec.exchange/@JamesStevenson"><button class="w3-button w3-light-grey w3-padding-large w3-section " onclick="document.getElementById('download').style.display='block'"> | |
| <i class=""></i> Follow Us! 🐦 | |
| </button></a> | |
| </div> | |
| </div> | |
| </div> | |
| <script> | |
| // Slideshow | |
| var slideIndex = 1; | |
| showDivs(slideIndex); | |
| function plusDivs(n) { | |
| showDivs(slideIndex += n); | |
| } | |
| function showDivs(n) { | |
| var i; | |
| var x = document.getElementsByClassName("mySlides"); | |
| if (n > x.length) {slideIndex = 1} | |
| if (n < 1) {slideIndex = x.length} | |
| for (i = 0; i < x.length; i++) { | |
| x[i].style.display = "none"; | |
| } | |
| x[slideIndex-1].style.display = "block"; | |
| } | |
| </script> | |
| <br> | |
| <br> | |
| <br> | |
| </body> | |
| </html> | |
| ''' | |
| # Imports | |
| import json | |
| import os | |
| import gradio as gr | |
| # Setup the gradio block and add some generic CSS | |
| block = gr.Blocks( | |
| css=".container { max-width: 800px; margin: auto; } h1 { margin: 0px; padding: 5px 0; line-height: 50px; font-size: 60pt; }.close-heading {margin: 0px; padding: 0px;} .close-heading p { margin: 0px; padding: 0px;}", | |
| title="WatchTower") | |
| # Chat history variable used for the chatbot prompt on the 'getting started' page. | |
| chat_history = [] | |
| def get_client_from_tokens(code): | |
| ''' | |
| This function is used for generating a masterdon client object based on the code URL paramiters | |
| :param code | |
| :return: A Masterdon client object | |
| ''' | |
| return login_from_code(code) | |
| def block_user(user_id, user, reason): | |
| finished = False | |
| blocked = True | |
| attempts = 0 | |
| while not finished: | |
| try: | |
| user ="alcinnz@floss.social" | |
| user_dict = client.account_search(user) | |
| pprint(user_dict) | |
| client.account_block(user_dict[0]["id"]) | |
| print("Blocked {} for {}".format(user, reason)) | |
| except MastodonNotFoundError as e: | |
| if "Record not found" in str(e): | |
| print("Record not found...") | |
| return False | |
| print("Blocked {}, for {}".format(user, reason)) | |
| return blocked | |
| def block_users(client, threshold, dataset): | |
| ''' | |
| Used for blocking a series of users based on the threshold and datasets provided. Here the users folder is used. | |
| :param client: | |
| :param threshold: | |
| :param dataset: | |
| :return: The number of blocked users. | |
| ''' | |
| num_users_blocked = 0 | |
| for filename in os.listdir("users"): | |
| filename = os.path.join("users", filename) | |
| print("File {} open".format(filename)) | |
| user_file = open(filename, "r") | |
| users = json.load(user_file) | |
| for user in users: | |
| print("Reviewing user {}".format(user)) | |
| if "Violent" in dataset: | |
| # Due to the low number of posts used to aggregate the initial dataset and unreliability of this model, | |
| # the weight has been lowered to represent this. | |
| if user["violence-threshold"]/2 >= threshold: | |
| user_id = str(user["acct"]) | |
| if block_user(user_id, user, "Violent"): | |
| num_users_blocked = num_users_blocked + 1 | |
| continue | |
| if "Hate Speech" in dataset: | |
| if user["toxicity-threshold"] >= threshold: | |
| user_id = str(user["acct"]) | |
| if block_user(user_id, user, "Hate Speech"): | |
| num_users_blocked = num_users_blocked + 1 | |
| continue | |
| return num_users_blocked | |
| def chat(selected_option=None, radio_score=None, url_params=None): | |
| ''' | |
| This function is used to initialise blocking users once the user has authenticated with Mastodon. | |
| :param selected_option: | |
| :param radio_score: | |
| :param url_params: | |
| :return: the chatbot history is returned (including information on blocked accounts). | |
| ''' | |
| global client | |
| global chat_history | |
| history = [] | |
| # app id | |
| if "code" in url_params and client is None: | |
| client = get_client_from_tokens(url_params["code"]) | |
| if radio_score != None and selected_option != None: | |
| if client != None: | |
| # Extract the list to a string representation | |
| if type(selected_option) is list: | |
| block_type = "" | |
| for b_type in selected_option: | |
| block_type = block_type + " + " + b_type.capitalize() | |
| block_type = "'" + block_type[3:] + "'" | |
| else: | |
| block_type = selected_option | |
| # Display to user, set options | |
| history.append( | |
| ["Model tuned to a '{}%' threshold and is using the {} dataset.".format(radio_score, | |
| block_type.capitalize()), | |
| "{} Account blocking initialised".format(block_type.capitalize())]) | |
| num_users_blocked = block_users(client, radio_score, selected_option) | |
| history.append( | |
| ["Blocked {} user account(s).".format(num_users_blocked), "Thank you for using Watchtower."]) | |
| elif radio_score != None or selected_option != None: | |
| chat_history.append(["Initialisation error!", "Please tune the model by using the above options"]) | |
| history = chat_history + history | |
| chatbot.value = history | |
| chatbot.update(value=history) | |
| client = None | |
| return history | |
| def infer(prompt): | |
| pass | |
| have_initialised = False | |
| client = None | |
| name = None | |
| def button_pressed(slider_value, url_params): | |
| # print(url_params) | |
| return [None, chat(radio.value, slider_value, url_params)] | |
| # The website that the user will visit to authenticate WatchTower. | |
| target_website = None | |
| def update_target_website(): | |
| ''' | |
| Updates the URL used to authenticate WatchTower with Mastodon. | |
| #TODO this function is full of old code and can be optimised. | |
| :return: | |
| ''' | |
| global have_initialised | |
| global chatbot | |
| global chat_history | |
| global client | |
| global name | |
| client = None | |
| name = "no username" | |
| chat_history = [ | |
| ["Welcome to Watchtower.".format(name), "Log in via Mastodon and configure your blocking options above."]] | |
| chatbot.value = chat_history | |
| chatbot.update(value=chat_history) | |
| mastodon_auth_button.value = auth_button_text.format( | |
| get_target_website()) | |
| mastodon_auth_button.update( | |
| value=auth_button_text.format( | |
| get_target_website())) | |
| return auth_button_text.format(get_target_website(list_of_servers[0])) | |
| # The below is a JS blob used to retrieve the URL params. | |
| # Thanks to here: https://discuss.huggingface.co/t/hugging-face-and-gradio-url-paramiters/21110/2 | |
| get_window_url_params = """ | |
| function(text_input, url_params) { | |
| console.log(text_input, url_params); | |
| const params = new URLSearchParams(window.location.search); | |
| url_params = Object.fromEntries(params); | |
| return [text_input, url_params]; | |
| } | |
| """ | |
| slider_choice = list_of_servers[0] | |
| def update_server(choice): | |
| print("In change server") | |
| global slider_choice | |
| slider_choice = choice | |
| get_target_website() | |
| return auth_button_text.format(get_target_website()) | |
| def get_chatbot_text(): | |
| return [('Welcome to Watchtower.', 'Log in via Mastodon and configure your blocking options above.')] | |
| def get_target_website(provided_server = None): | |
| ''' | |
| A wrapper function used for retrieving the URL a user will use to authenticate WatchTower with Mastodon. | |
| :return: auth url | |
| ''' | |
| mastodon = client_log_in(provided_server) | |
| return get_auth_url(mastodon, provided_server) | |
| # The Gradio HTML component used for the 'sign in with Mastodon' button | |
| # The main chunk of code that uses Gradio blocks to create the UI | |
| html_button = None | |
| dropdown = None | |
| with block: | |
| gr.HTML(''' | |
| <meta name="viewport" content="width=device-width, initial-scale=1"> | |
| <link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css"> | |
| ''') | |
| # todo check if user signed in | |
| user_message = "Log in via Mastodon and configure your blocking options above." | |
| chat_history.append(["WelcoMe to Watchtower.", user_message]) | |
| gr.HTML(value=html_data) | |
| with gr.Group(): | |
| with gr.Row().style(equal_height=True): | |
| with gr.Box(): | |
| # gr.Label(value="WatchTower", visible=True, interactive=False) | |
| url_params = gr.JSON({}, visible=False, label="URL Params").style( | |
| ) | |
| text_input = gr.Text(label="Input", visible=False).style() | |
| text_output = gr.Text(label="Output", visible=False).style() | |
| with gr.Row().style(): | |
| dropdown = gr.Dropdown(choices=list_of_servers, label="Server", value=list_of_servers[0]) | |
| html_button = mastodon_auth_button = gr.HTML( | |
| value=auth_button_text.format( | |
| get_target_website())).style( | |
| ) | |
| with gr.Row().style(equal_height=True): | |
| radio = gr.CheckboxGroup(value=["Violent", "Hate Speech"], | |
| choices=["Violent", "Hate Speech", "Misinformation"], | |
| interactive=False, label="Behaviour To Block").style() | |
| slider = gr.Slider(value=35, interactive=True, label="Threshold Confidence Tolerance").style() | |
| chatbot = gr.Chatbot(label="Watchtower Output", value=[('Welcome to Watchtower.', | |
| 'Log in via Mastodon and configure your blocking options above.')]).style( | |
| color_map=["grey", "purple"]) | |
| btn = gr.Button("Run WatchTower").style(full_width=True).style() | |
| dropdown.change(fn=update_server, inputs=[dropdown], outputs=html_button) | |
| btn.click(fn=button_pressed, inputs=[slider, url_params], | |
| outputs=[text_output, chatbot], _js=get_window_url_params) | |
| gr.Markdown( | |
| """___ | |
| <p style='text-align: center'> | |
| Created by <a href="https://JamesStevenson.me" target="_blank"</a> James Stevenson | |
| <br/> | |
| </p>""" | |
| ) | |
| # Setup callback for when page loads (used to set a new Mastodon auth target webspage) | |
| block.__enter__() | |
| block.set_event_trigger( | |
| event_name="load", fn=update_target_website, inputs=None, outputs=[html_button], no_target=True | |
| ) | |
| block.set_event_trigger( | |
| event_name="load", fn=get_chatbot_text, inputs=None, outputs=[chatbot], no_target=True | |
| ) | |
| block.attach_load_events() | |
| # Launcg the page | |
| block.launch(enable_queue=True) | |