| |
|
|
| import WarClient |
| import requests |
| import re |
| from bs4 import BeautifulSoup |
| import urllib.request as urllib |
| import warnings |
| |
| import time |
| |
| warnings.filterwarnings("ignore") |
|
|
| |
| login_url = 'https://waronline.org/fora/index.php?login/login' |
| thread_url = 'https://waronline.org/fora/index.php?threads/warbot-playground.17636/' |
| post_url = "https://waronline.org/fora/index.php?threads/warbot-playground.17636/add-reply" |
|
|
| |
| username = 'WarBot' |
| password = 'naP2tion' |
|
|
| |
| session = requests.Session() |
|
|
| def fixString(S): |
| |
| S = re.sub(",+", ",", S) |
| return S |
|
|
| def compare_pages(url1, url2): |
| |
| return urllib.urlopen(url1).geturl() == urllib.urlopen(url2).geturl() |
|
|
| def remove_non_english_russian_chars(s): |
| |
| pattern = '[^0-9A-Za-zА-Яа-яЁё(),.!?"\s-]' |
| |
| return re.sub(pattern, '', s) |
| def login(username=username, password=password, thread_url=thread_url): |
| |
|
|
| |
| login_page_response = session.get(login_url) |
| soup = BeautifulSoup(login_page_response.text, 'html.parser') |
| csrf_token = soup.find('input', {'name': '_xfToken'})['value'] |
|
|
| |
| login_data = { |
| 'login': username, |
| 'password': password, |
| 'remember': '1', |
| '_xfRedirect': thread_url, |
| '_xfToken': csrf_token |
| } |
| response = session.post(login_url, data=login_data) |
|
|
| |
| if 'Invalid login' in response.text: |
| print('Login failed!') |
| exit() |
|
|
| def post(message="", thread_url=thread_url, post_url=post_url, quoted_by="",quote_text="",quote_source=""): |
| |
| |
| quote_source = quote_source.split('-')[-1] |
|
|
| if quoted_by: |
| message = f'[QUOTE="{quoted_by}, post: {quote_source}"]{quote_text}[/QUOTE]{message}' |
| |
| |
|
|
| |
| response = session.get(thread_url) |
|
|
| |
| soup = BeautifulSoup(response.text, 'html.parser') |
|
|
| |
| xf_token = soup.find('input', {'name': '_xfToken'}).get('value') |
|
|
| |
| message_data = { |
| '_xfToken': xf_token, |
| 'message': message, |
| 'attachment_hash': '', |
| 'last_date': '', |
| '_xfRequestUri': post_url, |
| '_xfWithData': '1', |
| '_xfResponseType': 'json' |
| } |
|
|
| response = session.post(post_url, data=message_data) |
|
|
| |
| if not response.ok: |
| print('Post failed!') |
| exit() |
|
|
| print('Post submitted successfully.') |
|
|
| def getMessages(thread_url=thread_url, quotedUser="", startingPage=1): |
| |
| allquotes =[] |
|
|
| page = startingPage |
| lastPage = False |
|
|
| |
| messengerName = "" |
| messageID = "" |
| quotedID = "" |
|
|
| |
| namePattern = re.compile('data-lb-caption-desc="(.*?) ·') |
| messageIDPattern = re.compile('data-lb-id="(.*?)"') |
| quotedIDPattern = re.compile('data-source="(.*?)"') |
| quotedNamePattern = re.compile('data-quote="(.*?)"') |
|
|
| while not lastPage: |
| response = requests.get(thread_url + 'page-' + str(page)) |
| if response.status_code == 200: |
|
|
| |
| html_content = response.content |
|
|
| |
| soup = BeautifulSoup(html_content, 'html.parser') |
|
|
| |
| messageData = soup.find_all('div', {'class': 'message-userContent lbContainer js-lbContainer'}) |
|
|
| for data in messageData: |
| try: |
| |
| matchName = namePattern.search(str(data)) |
| if matchName: |
| messengerName = matchName.group(1) |
|
|
| |
| matchID = quotedIDPattern.search(str(data)) |
| if matchID: |
| quotedID = matchID.group(1) |
|
|
| |
| matchID = messageIDPattern.search(str(data)) |
| if matchID: |
| messageID = matchID.group(1) |
|
|
| matchQuotedName = quotedNamePattern.search(str(data)) |
| if matchQuotedName: |
| quotedName = matchQuotedName.group(1) |
| if quotedUser and (quotedUser != quotedName): |
| continue |
|
|
| |
| blockquote = data.find('blockquote') |
| if blockquote: |
| |
| text = data.find('div', {'class': 'bbWrapper'}) |
| for bq in text.find_all('blockquote'): |
| bq.extract() |
| reply = text.get_text().replace('\n', ' ').strip() |
|
|
| allquotes.append({'reply': reply, 'messengerName': messengerName, 'messageID': messageID, 'quotedID': quotedID}) |
|
|
| except: |
| continue |
|
|
| |
| if not compare_pages(thread_url + 'page-' + str(page), thread_url + 'page-' + str(page + 1)): |
| page += 1 |
| else: |
| lastPage = True |
| else: |
| lastPage = True |
|
|
| return allquotes |
|
|
| |
| def WarOnlineBot(): |
|
|
| login(username=username, password=password, thread_url=thread_url) |
| |
|
|
| |
| allMessages = getMessages(thread_url=thread_url, quotedUser='', startingPage=1) |
|
|
| |
| messages_by_bot_IDs = [] |
|
|
| for msg in allMessages: |
| |
| if msg['messengerName'] == username: |
| messages_by_bot_IDs.append(msg['quotedID'].split(': ')[-1]) |
| |
| messages_by_bot_IDs = list(set([elem for elem in messages_by_bot_IDs if elem])) |
|
|
| |
| messagesForBot = getMessages(thread_url=thread_url, quotedUser=username, startingPage=1) |
|
|
| |
| messages_for_bot_IDs = [] |
|
|
| for msg in messagesForBot: |
| |
| messages_for_bot_IDs.append(msg['messageID'].split('-')[-1]) |
| |
| messages_for_bot_IDs = [elem for elem in messages_for_bot_IDs if elem] |
|
|
| |
| messages_for_bot_IDs = [ID for ID in messages_for_bot_IDs if ID not in messages_by_bot_IDs] |
|
|
| |
| for msg in messagesForBot: |
| if msg['messageID'].split('-')[-1] in messages_for_bot_IDs: |
|
|
| originalQuote = msg['reply'] |
| quote = remove_non_english_russian_chars(msg['reply']) |
| message = "" |
|
|
| while not message: |
| message = WarClient.getReply(message=quote) |
|
|
| |
| message = fixString(message) |
|
|
| print('Quote: ', originalQuote) |
| print('Reply: ', message) |
|
|
| login(username=username, password=password, thread_url=thread_url) |
| time.sleep(1) |
| post(message=message, thread_url=thread_url, post_url=post_url, quoted_by=msg['messengerName'], quote_text=originalQuote, quote_source=msg['messageID']) |
|
|
| time.sleep(10) |
|
|
|
|
| if __name__ == '__main__': |
| timeout = 5 |
|
|
| |
| while True: |
| WarOnlineBot() |
|
|
| timer = range(60 * timeout) |
| for t in timer: |
| time.sleep(1) |
|
|