Spaces:
Runtime error
Runtime error
| # This file wraps around Pinpoint to provide simple prediction functionality. | |
| import csv | |
| import time | |
| import uuid | |
| from pprint import pprint | |
| import Pinpoint.FeatureExtraction | |
| from Pinpoint.RandomForest import * | |
class predictor():
    """Simple prediction wrapper around Pinpoint.

    Holds a Pinpoint ``random_forest`` model and a Pinpoint
    ``feature_extraction`` instance, and exposes :meth:`predict` to classify a
    single piece of text.
    """

    def __init__(self):
        """Create the model and the feature extractor used for predictions.

        The psychological and behavioural feature groups are switched off on
        the model, and ``train_model`` is pointed at
        ``far-right-radical-language.model``.
        """
        self.model = random_forest()
        self.model.PSYCHOLOGICAL_SIGNALS_ENABLED = False  # Needs LIWC markup
        self.model.BEHAVIOURAL_FEATURES_ENABLED = False
        # NOTE(review): with features_file=None and force_new_dataset=False this
        # presumably loads the existing model file rather than retraining --
        # confirm against Pinpoint.RandomForest.
        self.model.train_model(features_file=None, force_new_dataset=False,
                               model_location=r"far-right-radical-language.model")
        self.dict_of_users_all = {}
        self.feature_extractor = Pinpoint.FeatureExtraction.feature_extraction(
            violent_words_dataset_location="swears",
            baseline_training_dataset_location="LIWC2015 Results (Storm_Front_Posts).csv")

    def predict(self, string_to_predict=None, username="unknown"):
        """Call Pinpoint to predict whether a piece of text is extremist.

        The text is written to a uniquely named CSV in the current working
        directory, run through the feature extractor, dumped to JSON, loaded
        back as a feature frame and passed to the model. All intermediate files
        are removed before returning, whatever the outcome.

        :param string_to_predict: the text to classify; must not be ``None``.
        :param username: username recorded alongside the message.
        :return: boolean ``True`` if the model flags the text, else ``False``.
            ``False`` is also returned when the feature extractor or the model
            raises ``FileNotFoundError``.
        :raises ValueError: if ``string_to_predict`` is ``None``.
        """
        # Imported locally so this method does not depend on the wildcard
        # import from Pinpoint.RandomForest happening to expose these modules.
        import json
        import os

        if string_to_predict is None:
            raise ValueError("No prediction material given...")

        # Two time-based UUIDs joined together give every call its own file names.
        extended_prediction_uuid = str(uuid.uuid1()) + "-" + str(uuid.uuid1())
        messages_csv = "{}-all-messages.csv".format(extended_prediction_uuid)
        features_json = "./{}-messages.json".format(extended_prediction_uuid)
        # NOTE(review): this file is never written here; presumably it is a
        # by-product of get_features_as_df -- confirm.
        features_json_csv = "{}-messages.json.csv".format(extended_prediction_uuid)

        self.model.model_folder = "{}-output".format(extended_prediction_uuid)
        self.feature_extractor.MESSAGE_TMP_CACHE_LOCATION = "{}-message-cache".format(extended_prediction_uuid)
        print("Starting prediction for {}".format(extended_prediction_uuid))

        users_posts = [{"username": "{}".format(username),
                        "timestamp": "tmp",
                        "message": "{}".format(string_to_predict)}]

        is_extremist = False
        try:
            # No header row is written, matching the original file layout.
            with open(messages_csv, 'w', encoding='utf8', newline='') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=["username", "timestamp", "message"])
                writer.writerows(users_posts)

            try:
                self.feature_extractor._get_standard_tweets(messages_csv)
            except FileNotFoundError:
                return False

            with open(features_json, 'w') as outfile:
                json.dump(self.feature_extractor.completed_tweet_user_features, outfile, indent=4)

            rows = self.model.get_features_as_df(features_json, True)
            rows.pop("is_extremist")  # drop the label column before predicting

            try:
                is_extremist = self.model.model.predict([rows.loc[0]])
            except FileNotFoundError as e:
                is_extremist = False
                print("Message cache error, next - {}".format(e))

            print("Ending prediction for {}".format(extended_prediction_uuid))
        finally:
            # Best-effort cleanup on every exit path (normal return, early
            # return, or exception) so temporary files never accumulate.
            for leftover in (messages_csv, features_json_csv, features_json):
                try:
                    os.remove(leftover)
                except FileNotFoundError:
                    pass

        # `== True` is kept deliberately: the model result is presumably a
        # one-element array, so the comparison (not plain truthiness) is what
        # decides the outcome, exactly as before.
        return bool(is_extremist == True)  # noqa: E712