NSrecruit / telegram_engine.py
Bohaska
fix parsing of nested brackets
a1fbb09
import time
import re
import requests
from typing import Callable, Any
from datetime import datetime
# --- Constants & Configuration ---
USER_AGENT = "HuggingFaceNSrecruit/1.0.0"
API_URL = "https://www.nationstates.net/cgi-bin/api.cgi"
RATE_LIMITS = {"Recruitment": 180 + 2, "Non-Recruitment": 30 + 2}
def log_message(level, message):
"""Formats a log message with a timestamp."""
return f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [{level}] {message}"
def ns_api_request(params, user_agent_override=USER_AGENT):
"""Makes a request to the NationStates API with error handling."""
headers = {"User-Agent": user_agent_override}
try:
# A short sleep for all non-telegram API calls to be safe
time.sleep(0.7)
response = requests.get(API_URL, params=params, headers=headers)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
if hasattr(e, 'response') and e.response is not None:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("X-Retry-After", 30))
raise ConnectionError(f"API Rate Limit Hit! Wait {retry_after}s.")
raise ConnectionError(f"API Error: Status {e.response.status_code} - {e.response.text}")
raise ConnectionError(f"Network Error: {e}")
class TRLParser:
"""
A robust, recursive-descent parser for Template Recipient Language,
modeled after the official JavaScript implementation.
"""
def _consume_whitespace(self, text):
return text.lstrip()
def _parse_primitive(self, text):
text = self._consume_whitespace(text)
# Match category
category_match = re.match(r'^[a-zA-Z_]+', text)
if not category_match:
raise ValueError("Expected a primitive category (e.g., 'nations', 'regions')")
category = category_match.group(0)
text = text[len(category):]
text = self._consume_whitespace(text)
# Match arguments within brackets
if not text.startswith('['):
raise ValueError("Expected '[' after primitive category")
# Start scanning after the initial opening bracket
scan_text = text[1:]
bracket_depth = 1
end_bracket_index = -1
for i, char in enumerate(scan_text):
if char == '[':
bracket_depth += 1
elif char == ']':
bracket_depth -= 1
if bracket_depth == 0:
end_bracket_index = i
break
if end_bracket_index == -1:
raise ValueError("Unmatched '[' in primitive arguments")
# The arguments are the content inside the matching brackets
args_str = scan_text[:end_bracket_index]
# The rest of the text to parse is what's after the closing bracket
text = scan_text[end_bracket_index + 1:]
# For regex, we don't split by comma. For others, we do.
if category == "name_regex":
args = [args_str]
else:
args = [arg.strip() for arg in args_str.split(',')]
primitive = {"type": "primitive", "category": category, "args": args}
return primitive, text
def _parse_command(self, text):
text = self._consume_whitespace(text)
# Match action
action_char = text[0]
if action_char in "+-/":
action = {"+": "add", "-": "remove", "/": "limit"}[action_char]
text = text[1:]
else:
action = "add" # Default action
text = self._consume_whitespace(text)
# Match body (either a group or a primitive)
if text.startswith('('):
body, text = self._parse_group(text)
else:
body, text = self._parse_primitive(text)
text = self._consume_whitespace(text)
if not text.startswith(';'):
raise ValueError("Expected ';' to terminate a command")
text = text[1:]
command = {"type": "command", "action": action, "body": body}
return command, text
def _parse_group(self, text):
text = self._consume_whitespace(text)
if not text.startswith('('):
raise ValueError("Expected '(' to start a group")
text = text[1:] # Consume '('
commands = []
while True:
text = self._consume_whitespace(text)
if not text:
raise ValueError("Unmatched '(' in group definition")
if text.startswith(')'):
break
command, text = self._parse_command(text)
commands.append(command)
text = text[1:] # Consume ')'
group = {"type": "group", "commands": commands}
return group, text
def parse(self, trl_string):
"""Parses a raw TRL string into a structured command tree."""
# Pre-processing
cleaned_string = re.sub(r'#.*$', '', trl_string, flags=re.MULTILINE)
# Wrap the entire string in an implicit group for the parser
group_to_parse = f"({cleaned_string})"
parsed_group, remaining_text = self._parse_group(group_to_parse)
if self._consume_whitespace(remaining_text):
raise ValueError(f"Unexpected trailing characters in TRL string: {remaining_text}")
return parsed_group
# --- EVALUATION LOGIC ---
def _evaluate_primitive(self, primitive, current_nations):
category = primitive["category"]
args = primitive["args"]
args_str = ",".join(args) # Re-join for the old API fetcher
if category == "nations":
return {arg.lower().replace(' ', '_') for arg in args}
if category == "regions":
xml = ns_api_request({"q": "nations", "region": args_str})
return set(re.findall(r'<NATION id="([^"]+)">', xml))
if category == "wa":
if "members" in args:
xml = ns_api_request({"q": "wamembers"})
return set(xml.split(','))
if "delegates" in args:
xml = ns_api_request({"q": "wadelegates"})
return set(xml.split(','))
if category in ("new", "refounded"):
limit = int(args[0]) if args and args[0].isdigit() else 50
happenings_filter = "founding" if category == "new" else "refounding"
xml = ns_api_request({"q": "happenings", "filter": happenings_filter, "limit": limit})
return set(re.findall(r'@@([^@]+)@@', xml))
if category == "name_regex":
try:
pattern = re.compile(args[0].strip())
return {nation for nation in current_nations if pattern.search(nation)}
except re.error as e:
raise ValueError(f"Invalid Regular Expression '{args[0]}': {e}")
return set()
def _evaluate_group(self, group, initial_nations=None):
nations = initial_nations if initial_nations is not None else set()
for command in group["commands"]:
action = command["action"]
body = command["body"]
if body["type"] == "group":
target_nations = self._evaluate_group(body)
else: # Primitive
target_nations = self._evaluate_primitive(body, current_nations=nations)
if action == 'add':
nations.update(target_nations)
elif action == 'remove':
nations.difference_update(target_nations)
elif action == 'limit':
nations.intersection_update(target_nations)
return nations
def evaluate(self, trl_string):
"""Parses and then evaluates a TRL string."""
if not trl_string.strip():
return set()
parsed_structure = self.parse(trl_string)
return self._evaluate_group(parsed_structure)
def telegram_engine(
settings: dict[str, Any],
log_callback: Callable[[str], None],
get_status_callback: Callable[[], str],
add_to_history_callback: Callable[[str], None]
):
"""Main telegramming loop. Runs inside a thread."""
log_callback(log_message("INFO", "Telegramming thread started."))
rate_limit = RATE_LIMITS.get(settings['tg_type'], 182)
parser = TRLParser()
while get_status_callback() != 'stopped':
try:
log_callback(log_message("INFO", "Evaluating TRL to get recipients..."))
target_nations = list(parser.evaluate(settings['trl']))
log_callback(log_message("INFO", f"Found {len(target_nations)} potential recipients."))
if not target_nations:
if settings['continuous']:
log_callback(log_message("WARN", "No recipients found. Waiting 30s to re-evaluate..."))
time.sleep(30)
continue
else:
log_callback(log_message("INFO", "Campaign finished: No recipients found."))
break
for nation in target_nations:
# --- Graceful Pause/Stop Check ---
while get_status_callback() == 'paused':
log_callback(log_message("INFO", "Campaign is paused..."))
time.sleep(5)
if get_status_callback() == 'stopped':
log_callback(log_message("INFO", "Stop signal received, halting."))
return
# The history check is done by the main thread now to prevent race conditions
# but we could add a check here if needed.
log_callback(log_message("INFO", f"Preparing to telegram {nation}."))
if settings['dry_run']:
log_callback(log_message("INFO", f"[DRY RUN] Would have sent telegram to {nation}."))
else:
try:
params = {
"a": "sendTG", "client": settings['client_key'],
"tgid": settings['tg_id'], "key": settings['secret_key'], "to": nation
}
response = ns_api_request(params)
if "Queued" in response:
log_callback(log_message("SUCCESS", f"Queued telegram to {nation}."))
else:
log_callback(log_message("ERROR", f"Failed for {nation}. API Response: {response.strip()}"))
except Exception as e:
log_callback(log_message("ERROR", f"Exception while telegramming {nation}: {e}"))
add_to_history_callback(nation)
log_callback(log_message("INFO", f"Waiting {rate_limit} seconds..."))
time.sleep(rate_limit)
if not settings['continuous']:
log_callback(log_message("INFO", "Finished recipient list. Continuous mode is OFF."))
break
else:
log_callback(log_message("INFO", "Finished list. Continuous mode is ON. Re-evaluating in 30s..."))
time.sleep(30)
except Exception as e:
log_callback(log_message("CRIT", f"A critical error occurred: {e}"))
if settings['continuous']:
log_callback(log_message("CRIT", "Waiting 60 seconds before retrying..."))
time.sleep(60)
else:
log_callback(log_message("CRIT", "Aborting due to critical error."))
break
log_callback(log_message("INFO", "Telegramming thread has finished."))