File size: 11,609 Bytes
2a94fea 2762b8b 2a94fea 5f880f3 a1fbb09 5f880f3 2a94fea a1fbb09 5f880f3 eb25def 5f880f3 2a94fea 5f880f3 2a94fea 5f880f3 2a94fea eb25def 5f880f3 eb25def 5f880f3 2a94fea 5f880f3 2a94fea 5f880f3 2a94fea |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
import time
import re
import requests
from typing import Callable, Any
from datetime import datetime
# --- Constants & Configuration ---
# Default value for the User-Agent header sent with API requests.
USER_AGENT = "HuggingFaceNSrecruit/1.0.0"
# URL that every API request in this module is sent to.
API_URL = "https://www.nationstates.net/cgi-bin/api.cgi"
# Seconds to wait after each telegram, keyed by telegram type.
# NOTE(review): values are presumably the API minimum (180 / 30) plus a
# 2-second safety margin - confirm against the API rules.
RATE_LIMITS = {
    "Recruitment": 182,
    "Non-Recruitment": 32,
}
def log_message(level, message):
    """Return ``message`` prefixed with the current time and ``level``.

    The result has the form ``YYYY-MM-DD HH:MM:SS [level] message``; the
    timestamp comes from ``datetime.now()``.
    """
    stamp = format(datetime.now(), '%Y-%m-%d %H:%M:%S')
    return "{} [{}] {}".format(stamp, level, message)
def ns_api_request(params, user_agent_override=USER_AGENT, timeout=30):
    """Send a GET request to ``API_URL`` and return the response body as text.

    Args:
        params: Query parameters passed to ``requests.get``.
        user_agent_override: Value for the ``User-Agent`` header.
        timeout: Seconds ``requests`` may wait on the server before giving up,
            so a stalled connection cannot block the caller forever.

    Returns:
        The response text of a successful (non-error status) request.

    Raises:
        ConnectionError: On a rate-limit response (HTTP 429), any other HTTP
            error status, or a network-level failure (including timeouts).
            The original ``requests`` exception is attached as ``__cause__``.
    """
    headers = {"User-Agent": user_agent_override}
    # A short pause before every API call to be safe with the rate limit.
    time.sleep(0.7)
    try:
        response = requests.get(API_URL, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Compare with None explicitly: a Response with an error status is falsy.
        failed = getattr(e, 'response', None)
        if failed is None:
            raise ConnectionError(f"Network Error: {e}") from e
        if failed.status_code == 429:
            # NOTE(review): the standard "Retry-After" header is checked as a
            # fallback for "X-Retry-After" - confirm which one the API sends.
            raw_delay = failed.headers.get("X-Retry-After") or failed.headers.get("Retry-After")
            try:
                retry_after = int(raw_delay)
            except (TypeError, ValueError):
                # Header missing or not a plain integer: use the old default.
                retry_after = 30
            raise ConnectionError(f"API Rate Limit Hit! Wait {retry_after}s.") from e
        raise ConnectionError(f"API Error: Status {failed.status_code} - {failed.text}") from e
    return response.text
class TRLParser:
    """
    Recursive-descent parser and evaluator for Template Recipient Language.

    A TRL string is a sequence of commands, each terminated by ';'.  A command
    is an optional action character ('+' add, '-' remove, '/' limit; default
    add) followed by either a primitive ``category[arg, ...]`` or a
    parenthesised group of further commands.  ``parse`` turns the text into a
    tree of plain dicts; ``evaluate`` parses and then resolves that tree to a
    set of nation names, calling ``ns_api_request`` for categories that need
    remote data.
    """

    @staticmethod
    def _split_tag_list(xml, tag):
        """Return the names listed inside the first ``<tag>...</tag>`` element.

        The element content is split on ',' and ':' and empty items are
        dropped.  Returns an empty set when the element is absent.
        """
        match = re.search(rf'<{tag}>(.*?)</{tag}>', xml, flags=re.DOTALL)
        if not match:
            return set()
        pieces = (piece.strip() for piece in re.split(r'[,:]', match.group(1)))
        return {piece for piece in pieces if piece}

    def _consume_whitespace(self, text):
        """Return ``text`` without leading whitespace."""
        return text.lstrip()

    def _parse_primitive(self, text):
        """Parse ``category[args]`` from the start of ``text``.

        Returns:
            A ``(primitive, rest)`` tuple where ``primitive`` is a dict with
            keys ``type``, ``category`` and ``args`` and ``rest`` is the text
            following the closing bracket.

        Raises:
            ValueError: If the category name or the brackets are malformed.
        """
        text = self._consume_whitespace(text)
        # Match category
        category_match = re.match(r'^[a-zA-Z_]+', text)
        if not category_match:
            raise ValueError("Expected a primitive category (e.g., 'nations', 'regions')")
        category = category_match.group(0)
        text = text[len(category):]
        text = self._consume_whitespace(text)
        # Match arguments within brackets
        if not text.startswith('['):
            raise ValueError("Expected '[' after primitive category")
        # Start scanning after the initial opening bracket.  Depth counting
        # lets arguments (e.g. regex character classes) contain nested [...].
        scan_text = text[1:]
        bracket_depth = 1
        end_bracket_index = -1
        for i, char in enumerate(scan_text):
            if char == '[':
                bracket_depth += 1
            elif char == ']':
                bracket_depth -= 1
                if bracket_depth == 0:
                    end_bracket_index = i
                    break
        if end_bracket_index == -1:
            raise ValueError("Unmatched '[' in primitive arguments")
        # The arguments are the content inside the matching brackets
        args_str = scan_text[:end_bracket_index]
        # The rest of the text to parse is what's after the closing bracket
        text = scan_text[end_bracket_index + 1:]
        # For regex, we don't split by comma. For others, we do.
        if category == "name_regex":
            args = [args_str]
        else:
            # Drop empty items so "nations[]" or a trailing comma does not
            # yield an empty-string argument.
            stripped = (arg.strip() for arg in args_str.split(','))
            args = [arg for arg in stripped if arg]
        primitive = {"type": "primitive", "category": category, "args": args}
        return primitive, text

    def _parse_command(self, text):
        """Parse one ``[action] body ;`` command from the start of ``text``.

        Returns:
            A ``(command, rest)`` tuple; ``command`` is a dict with keys
            ``type``, ``action`` ('add', 'remove' or 'limit') and ``body``.

        Raises:
            ValueError: If the text is empty, the body is malformed, or the
                terminating ';' is missing.
        """
        text = self._consume_whitespace(text)
        if not text:
            # Guard the text[0] lookup below against an IndexError.
            raise ValueError("Expected a command")
        # Match action
        action_char = text[0]
        if action_char in "+-/":
            action = {"+": "add", "-": "remove", "/": "limit"}[action_char]
            text = text[1:]
        else:
            action = "add"  # Default action
        text = self._consume_whitespace(text)
        # Match body (either a group or a primitive)
        if text.startswith('('):
            body, text = self._parse_group(text)
        else:
            body, text = self._parse_primitive(text)
        text = self._consume_whitespace(text)
        if not text.startswith(';'):
            raise ValueError("Expected ';' to terminate a command")
        text = text[1:]
        command = {"type": "command", "action": action, "body": body}
        return command, text

    def _parse_group(self, text):
        """Parse a parenthesised list of commands from the start of ``text``.

        Returns:
            A ``(group, rest)`` tuple; ``group`` is a dict with keys ``type``
            and ``commands``.

        Raises:
            ValueError: If the opening or closing parenthesis is missing.
        """
        text = self._consume_whitespace(text)
        if not text.startswith('('):
            raise ValueError("Expected '(' to start a group")
        text = text[1:]  # Consume '('
        commands = []
        while True:
            text = self._consume_whitespace(text)
            if not text:
                raise ValueError("Unmatched '(' in group definition")
            if text.startswith(')'):
                break
            command, text = self._parse_command(text)
            commands.append(command)
        text = text[1:]  # Consume ')'
        group = {"type": "group", "commands": commands}
        return group, text

    def parse(self, trl_string):
        """Parse a raw TRL string into a structured command tree.

        Everything from a '#' to the end of its line is removed first (this
        also applies to a '#' inside a ``name_regex`` argument).

        Raises:
            ValueError: On any syntax error or trailing unparsed text.
        """
        # Pre-processing
        cleaned_string = re.sub(r'#.*$', '', trl_string, flags=re.MULTILINE)
        # Wrap the entire string in an implicit group for the parser
        group_to_parse = f"({cleaned_string})"
        parsed_group, remaining_text = self._parse_group(group_to_parse)
        if self._consume_whitespace(remaining_text):
            raise ValueError(f"Unexpected trailing characters in TRL string: {remaining_text}")
        return parsed_group

    # --- EVALUATION LOGIC ---
    def _evaluate_primitive(self, primitive, current_nations):
        """Resolve one primitive to a set of nation names.

        Args:
            primitive: Dict produced by ``_parse_primitive``.
            current_nations: Nations collected so far in the enclosing group;
                only ``name_regex`` reads it.

        Returns:
            A set of nation names; empty for an unknown category.

        Raises:
            ValueError: If a ``name_regex`` pattern does not compile.
        """
        category = primitive["category"]
        args = primitive["args"]
        if category == "nations":
            return {arg.lower().replace(' ', '_') for arg in args if arg}
        if category == "regions":
            # One request per region: the API's region parameter names a
            # single region, and its "nations" shard returns a
            # colon-separated list inside <NATIONS>.
            found = set()
            for region in args:
                xml = ns_api_request({"region": region, "q": "nations"})
                found |= self._split_tag_list(xml, "NATIONS")
            return found
        if category == "wa":
            # WA data is served by the World Assembly API (wa=<council id>);
            # only the list inside the tag is wanted, not the XML around it.
            if "members" in args:
                xml = ns_api_request({"wa": "1", "q": "members"})
                return self._split_tag_list(xml, "MEMBERS")
            if "delegates" in args:
                xml = ns_api_request({"wa": "1", "q": "delegates"})
                return self._split_tag_list(xml, "DELEGATES")
        if category in ("new", "refounded"):
            limit = int(args[0]) if args and args[0].isdecimal() else 50
            # NOTE(review): "founding" is the happenings filter that carries
            # both founding and refounding events - confirm against API docs.
            xml = ns_api_request({"q": "happenings", "filter": "founding", "limit": limit})
            if category == "refounded":
                # Keep only refounding events; because the limit applies to
                # the fetched events, fewer than ``limit`` names may remain.
                return set(re.findall(r'@@([^@]+)@@ was refounded', xml))
            return set(re.findall(r'@@([^@]+)@@', xml))
        if category == "name_regex":
            try:
                pattern = re.compile(args[0].strip())
            except re.error as e:
                raise ValueError(f"Invalid Regular Expression '{args[0]}': {e}") from e
            return {nation for nation in current_nations if pattern.search(nation)}
        return set()

    def _evaluate_group(self, group, initial_nations=None):
        """Apply a group's commands in order and return the resulting set.

        ``initial_nations``, when given, is used as the starting set and is
        modified in place.  Nested groups are evaluated starting from an
        empty set before their result is combined with the current one.
        """
        nations = initial_nations if initial_nations is not None else set()
        for command in group["commands"]:
            action = command["action"]
            body = command["body"]
            if body["type"] == "group":
                target_nations = self._evaluate_group(body)
            else:  # Primitive
                target_nations = self._evaluate_primitive(body, current_nations=nations)
            if action == 'add':
                nations.update(target_nations)
            elif action == 'remove':
                nations.difference_update(target_nations)
            elif action == 'limit':
                nations.intersection_update(target_nations)
        return nations

    def evaluate(self, trl_string):
        """Parse and then evaluate a TRL string; blank input gives an empty set."""
        if not trl_string.strip():
            return set()
        parsed_structure = self.parse(trl_string)
        return self._evaluate_group(parsed_structure)
def telegram_engine(
    settings: dict[str, Any],
    log_callback: Callable[[str], None],
    get_status_callback: Callable[[], str],
    add_to_history_callback: Callable[[str], None]
) -> None:
    """Main telegramming loop. Runs inside a thread.

    Repeatedly evaluates the TRL in ``settings['trl']`` and, for every nation
    found, sends (or, in dry-run mode, only logs) a telegram, then waits for
    the rate limit selected by ``settings['tg_type']``.

    Args:
        settings: Campaign configuration. Keys read here: ``tg_type``,
            ``trl``, ``continuous``, ``dry_run``, ``client_key``, ``tg_id``
            and ``secret_key``.
        log_callback: Receives every formatted log line.
        get_status_callback: Returns the current campaign status; the values
            ``'paused'`` and ``'stopped'`` are acted upon.
        add_to_history_callback: Called with each nation after it has been
            processed, whether or not the send succeeded.
    """
    log_callback(log_message("INFO", "Telegramming thread started."))
    rate_limit = RATE_LIMITS.get(settings['tg_type'], 182)
    parser = TRLParser()
    while get_status_callback() != 'stopped':
        try:
            log_callback(log_message("INFO", "Evaluating TRL to get recipients..."))
            target_nations = list(parser.evaluate(settings['trl']))
            log_callback(log_message("INFO", f"Found {len(target_nations)} potential recipients."))
            if not target_nations:
                if settings['continuous']:
                    log_callback(log_message("WARN", "No recipients found. Waiting 30s to re-evaluate..."))
                    time.sleep(30)
                    continue
                else:
                    log_callback(log_message("INFO", "Campaign finished: No recipients found."))
                    break
            for nation in target_nations:
                # --- Graceful Pause/Stop Check ---
                while get_status_callback() == 'paused':
                    log_callback(log_message("INFO", "Campaign is paused..."))
                    time.sleep(5)
                if get_status_callback() == 'stopped':
                    log_callback(log_message("INFO", "Stop signal received, halting."))
                    return
                # The history check is done by the main thread now to prevent race conditions
                # but we could add a check here if needed.
                log_callback(log_message("INFO", f"Preparing to telegram {nation}."))
                if settings['dry_run']:
                    log_callback(log_message("INFO", f"[DRY RUN] Would have sent telegram to {nation}."))
                else:
                    try:
                        params = {
                            "a": "sendTG", "client": settings['client_key'],
                            "tgid": settings['tg_id'], "key": settings['secret_key'], "to": nation
                        }
                        response = ns_api_request(params)
                        # Case-insensitive so a lowercase "queued" reply is
                        # not reported as a failure.
                        if "queued" in response.lower():
                            log_callback(log_message("SUCCESS", f"Queued telegram to {nation}."))
                        else:
                            log_callback(log_message("ERROR", f"Failed for {nation}. API Response: {response.strip()}"))
                    except Exception as e:
                        # Best effort: one failed send must not end the campaign.
                        log_callback(log_message("ERROR", f"Exception while telegramming {nation}: {e}"))
                add_to_history_callback(nation)
                log_callback(log_message("INFO", f"Waiting {rate_limit} seconds..."))
                time.sleep(rate_limit)
            if not settings['continuous']:
                log_callback(log_message("INFO", "Finished recipient list. Continuous mode is OFF."))
                break
            else:
                log_callback(log_message("INFO", "Finished list. Continuous mode is ON. Re-evaluating in 30s..."))
                time.sleep(30)
        except Exception as e:
            # Thread-level boundary: log the failure, then retry or abort.
            log_callback(log_message("CRIT", f"A critical error occurred: {e}"))
            if settings['continuous']:
                log_callback(log_message("CRIT", "Waiting 60 seconds before retrying..."))
                time.sleep(60)
            else:
                log_callback(log_message("CRIT", "Aborting due to critical error."))
                break
    log_callback(log_message("INFO", "Telegramming thread has finished."))