import datetime
import re
import sys
import time
import tweepy
from Pinpoint.ConfigManager import ConfigManager
class Twitter:
    """
    Twitter aggregator class.

    Wraps an authenticated Tweepy API client and offers helpers that turn a
    user's timeline into serialised post dictionaries and that report simple
    account statistics.
    """

    # Authenticated Tweepy API client; assigned in __init__.
    tweepy_api = None

    def __init__(self):
        """
        Constructor.

        Reads the Twitter credentials returned by
        ``ConfigManager.getTwitterConfig()`` and builds the Tweepy client from them.
        """
        twitter_config = ConfigManager.getTwitterConfig()
        consumer_key = twitter_config["consumer_key"]
        consumer_secret = twitter_config["consumer_secret"]
        access_token = twitter_config["access_token"]
        access_token_secret = twitter_config["access_token_secret"]

        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        self.tweepy_api = tweepy.API(auth)

    def get_tweet(self, tweet_info, attempts=1):
        """
        Returns a list of up to two serialised tweets for the provided status.

        A quote tweet is treated as two separate tweets (the quoted tweet and the
        tweet quoting it); any other status yields at most one entry.

        :param tweet_info: a Tweepy status object. ``full_text`` is read, so the
            status must have been fetched with ``tweet_mode='extended'``.
        :param attempts: 1-based attempt counter, incremented on every retry after
            a rate-limit error. More than five attempts returns an empty list.
        :return: a list of up to two dicts built by ``Serializer.createPostDict``.
        """
        # NOTE(review): Serializer is not imported in the visible part of this
        # file - confirm it is in scope at module level.

        # If we've received several errors in a row then it's probably not going to fix itself.
        if attempts > 5:
            return []

        list_of_tweets = []
        try:
            retweets = tweet_info.retweet_count
            likes = tweet_info.favorite_count
            date = tweet_info.created_at.timestamp()
            tweet = None
            # Defaulting here guarantees `source` is always bound below.
            source = "self"

            # A retweet is recognised by the presence of `retweeted_status`.
            # The `retweeted` flag only says whether the *authenticating* account
            # retweeted the status, so it must not be used for this check.
            if hasattr(tweet_info, "retweeted_status"):
                try:
                    original = tweet_info.retweeted_status
                    original_text = original.full_text
                    original_retweets = original.retweet_count
                    original_likes = original.favorite_count
                    # Same value the "RT @name:" prefix of the retweet text carries,
                    # without spending an extra API call to re-fetch the status.
                    original_author = original.user.screen_name
                except AttributeError as e:
                    # Incomplete payload: the retweet is skipped (tweet stays None).
                    print(e)
                else:
                    tweet = original_text
                    retweets = original_retweets
                    likes = original_likes
                    source = original_author
            else:
                # Gets full tweet and leaves the author as "self"
                tweet = tweet_info.full_text

            # For quote tweets we take the quoted tweet and the parent tweet as two separate tweets.
            if tweet_info.is_quote_status:
                try:
                    quoted_tweet_info = self.tweepy_api.get_status(id=tweet_info.quoted_status_id,
                                                                   tweet_mode='extended')
                    list_of_tweets.append(
                        Serializer.createPostDict(date=quoted_tweet_info.created_at.timestamp(),
                                                  post_text=quoted_tweet_info.full_text,
                                                  likes=quoted_tweet_info.favorite_count,
                                                  comments='',
                                                  shares=quoted_tweet_info.retweet_count,
                                                  source=quoted_tweet_info.user.name))
                except tweepy.RateLimitError:
                    # Let the outer handler wait and retry the whole tweet.
                    raise
                except (AttributeError, tweepy.TweepError) as e:
                    # The quoted tweet may be deleted or protected; keep the parent tweet regardless.
                    print("Tweepy Twitter api error. On attempt {} \n {}".format(attempts, e))

            if tweet is not None:
                list_of_tweets.append(
                    Serializer.createPostDict(date=date, post_text=tweet, likes=likes, comments='',
                                              shares=retweets, source=source))

        except tweepy.RateLimitError as e:
            print("Tweepy Twitter api rate limit reached. On attempt {} \n {}".format(attempts, e))
            time.sleep(300)
            return self.get_tweet(tweet_info, attempts + 1)  # if error, try again.
        except tweepy.TweepError as e:
            print("Tweepy Twitter api error. On attempt {} \n {}".format(attempts, e))

        return list_of_tweets

    def get_posts(self, username, attempts=1):
        """
        Loops through all tweets for the provided user.

        :param username: the Twitter user name; spaces and one leading "@" are removed.
        :param attempts: attempt counter; a value above 3 returns an empty list
            without contacting the API.
        :return: a list of serialised tweets, or an empty list if the timeline
            could not be read.
        """
        # If a participant has entered their username with spaces in error this will format it.
        username = username.replace(" ", "")

        # Checks attempts. If exceeded return empty list.
        if attempts > 3:
            return []

        # If an @ symbol has been added to the string then it's removed.
        if username.startswith("@"):
            username = username[1:]

        list_of_tweets = []
        try:
            timeline = tweepy.Cursor(self.tweepy_api.user_timeline, id=username, tweet_mode='extended')
            for tweet_info in timeline.items():
                # get_tweet can return two tweets (a quoted tweet and the normal tweet),
                # so its result is merged into the list instead of appended as one item.
                list_of_tweets.extend(self.get_tweet(tweet_info))
        except tweepy.TweepError as e:
            print("Tweepy Twitter api error on user {}. On Attempt {} .\n {}".format(username, attempts, e))
            time.sleep(300)
            # Unlikely to be an error that can be fixed by waiting, so no retry is
            # made and the partially collected tweets are discarded.
            return []

        return list_of_tweets

    def get_user(self, user_name):
        """
        Gets a Tweepy user object for a given user name.

        :param user_name: a string representation of a Twitter username
        :return: a Tweepy user object, None if the lookup failed
        """
        try:
            return self.tweepy_api.get_user(user_name)
        except tweepy.TweepError:
            # Best-effort lookup: any API failure is reported as "no user".
            return None

    def is_valid_user(self, user_name):
        """
        Checks that a user exists and is not suspended.

        :param user_name: a string representation of a Twitter username
        :return: None if the lookup failed or the user is suspended, the user object otherwise
        """
        user = self.get_user(user_name)
        # A user object without a `suspended` attribute is treated as not suspended.
        if user is not None and getattr(user, "suspended", False):
            return None
        return user

    def get_user_post_frequency(self, user_name):
        """
        A utility function used to retrieve a user's post frequency.

        :param user_name: a string representation of a Twitter username
        :return: number of posts divided by the account age in whole days; an
            account younger than one day is counted as one day old.
        """
        user = self.tweepy_api.get_user(user_name)
        created_at_time = user.created_at

        if created_at_time.tzinfo is None:
            # Twitter reports creation times in UTC, so a naive value is compared
            # against naive UTC "now" rather than local time.
            current_date = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        else:
            current_date = datetime.datetime.now(created_at_time.tzinfo)

        # Clamp to one day so brand-new accounts do not divide by zero.
        elapsed_days = max((current_date - created_at_time).days, 1)
        return user.statuses_count / elapsed_days

    def get_follower_following_frequency(self, user_name):
        """
        A utility function used to retrieve a user's following/follower ratio.

        :param user_name: a string representation of a Twitter username
        :return: accounts followed divided by followers; a user with no followers
            is treated as having one so the ratio stays finite.
        """
        user = self.tweepy_api.get_user(user_name)
        followers_count = user.followers_count
        following_count = user.friends_count
        # Clamp the denominator so users without followers do not divide by zero.
        return following_count / max(followers_count, 1)
|