import re
import time
from datetime import datetime
from typing import Any, Callable

import requests

# --- Constants & Configuration ---
USER_AGENT = "HuggingFaceNSrecruit/1.0.0"
API_URL = "https://www.nationstates.net/cgi-bin/api.cgi"
# Seconds to wait between telegrams, per telegram type (base interval + 2s margin).
RATE_LIMITS = {"Recruitment": 180 + 2, "Non-Recruitment": 30 + 2}
# Upper bound (seconds) for a single HTTP request so a stalled connection
# cannot hang the telegramming thread forever.
REQUEST_TIMEOUT = 30
# How often (seconds) long waits re-check the campaign status.
STATUS_POLL_INTERVAL = 1

_XML_TAG_RE = re.compile(r"<[^>]+>")


def log_message(level: str, message: str) -> str:
    """Formats a log message with a timestamp."""
    return f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [{level}] {message}"


def _retry_after_seconds(headers, default: int = 30) -> int:
    """Read the retry delay from rate-limit response headers.

    Falls back to ``default`` when the header is absent or not an integer.
    """
    raw = headers.get("X-Retry-After") or headers.get("Retry-After")
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


def ns_api_request(params, user_agent_override=USER_AGENT):
    """Makes a request to the NationStates API with error handling.

    Args:
        params: Query parameters sent to ``API_URL``.
        user_agent_override: Value for the ``User-Agent`` header.

    Returns:
        The response body as text.

    Raises:
        ConnectionError: On any network failure or non-success HTTP status.
            A 429 status produces a message containing the suggested wait.
    """
    headers = {"User-Agent": user_agent_override}
    # A short sleep before every API call to be safe with the rate limit.
    time.sleep(0.7)
    try:
        response = requests.get(
            API_URL, params=params, headers=headers, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Compare against None explicitly: a Response with an error status
        # is falsy, so a plain truthiness test would misclassify it.
        failed = getattr(e, "response", None)
        if failed is None:
            raise ConnectionError(f"Network Error: {e}") from e
        if failed.status_code == 429:
            retry_after = _retry_after_seconds(failed.headers)
            raise ConnectionError(
                f"API Rate Limit Hit! Wait {retry_after}s."
            ) from e
        raise ConnectionError(
            f"API Error: Status {failed.status_code} - {failed.text}"
        ) from e
    return response.text


def _extract_names(xml: str, separator: str) -> set:
    """Strip XML markup from ``xml`` and split the remaining text into names.

    Empty entries and surrounding whitespace are discarded.
    """
    text = _XML_TAG_RE.sub("", xml)
    return {name.strip() for name in text.split(separator) if name.strip()}


class TRLParser:
    """
    A robust, recursive-descent parser for Template Recipient Language,
    modeled after the official JavaScript implementation.

    Parsing produces nested dicts:
      * group:     {"type": "group", "commands": [command, ...]}
      * command:   {"type": "command", "action": "add"|"remove"|"limit",
                    "body": group | primitive}
      * primitive: {"type": "primitive", "category": str, "args": [str, ...]}

    Every ``_parse_*`` method returns ``(node, remaining_text)``.
    """

    def _consume_whitespace(self, text):
        """Return ``text`` without leading whitespace."""
        return text.lstrip()

    def _parse_primitive(self, text):
        """Parse ``category[arg, arg, ...]`` from the start of ``text``.

        Raises:
            ValueError: If the category name or the bracketed argument list
                is missing or unterminated.
        """
        text = self._consume_whitespace(text)

        # Match category
        category_match = re.match(r'^[a-zA-Z_]+', text)
        if not category_match:
            raise ValueError(
                "Expected a primitive category (e.g., 'nations', 'regions')"
            )
        category = category_match.group(0)
        text = self._consume_whitespace(text[len(category):])

        # Match arguments within brackets
        if not text.startswith('['):
            raise ValueError("Expected '[' after primitive category")

        # Scan after the opening bracket, tracking nesting so that bracket
        # pairs inside the arguments (e.g. regex character classes) do not
        # end the argument list early.
        scan_text = text[1:]
        bracket_depth = 1
        end_bracket_index = -1
        for i, char in enumerate(scan_text):
            if char == '[':
                bracket_depth += 1
            elif char == ']':
                bracket_depth -= 1
                if bracket_depth == 0:
                    end_bracket_index = i
                    break
        if end_bracket_index == -1:
            raise ValueError("Unmatched '[' in primitive arguments")

        args_str = scan_text[:end_bracket_index]
        remaining = scan_text[end_bracket_index + 1:]

        # For regex, we don't split by comma. For others, we do.
        if category == "name_regex":
            args = [args_str]
        else:
            args = [arg.strip() for arg in args_str.split(',')]

        primitive = {"type": "primitive", "category": category, "args": args}
        return primitive, remaining

    def _parse_command(self, text):
        """Parse one ``[+|-|/] body ;`` command from the start of ``text``.

        A missing action character means "add".

        Raises:
            ValueError: If ``text`` is empty or the command is not
                terminated by ``;``.
        """
        text = self._consume_whitespace(text)
        if not text:
            raise ValueError("Expected a command but reached end of input")

        # Match action
        actions = {"+": "add", "-": "remove", "/": "limit"}
        if text[0] in actions:
            action = actions[text[0]]
            text = text[1:]
        else:
            action = "add"  # Default action
        text = self._consume_whitespace(text)

        # Match body (either a group or a primitive)
        if text.startswith('('):
            body, text = self._parse_group(text)
        else:
            body, text = self._parse_primitive(text)

        text = self._consume_whitespace(text)
        if not text.startswith(';'):
            raise ValueError("Expected ';' to terminate a command")
        text = text[1:]

        command = {"type": "command", "action": action, "body": body}
        return command, text

    def _parse_group(self, text):
        """Parse a parenthesised sequence of commands.

        Raises:
            ValueError: If the opening or closing parenthesis is missing.
        """
        text = self._consume_whitespace(text)
        if not text.startswith('('):
            raise ValueError("Expected '(' to start a group")
        text = text[1:]  # Consume '('

        commands = []
        while True:
            text = self._consume_whitespace(text)
            if not text:
                raise ValueError("Unmatched '(' in group definition")
            if text.startswith(')'):
                break
            command, text = self._parse_command(text)
            commands.append(command)

        text = text[1:]  # Consume ')'
        group = {"type": "group", "commands": commands}
        return group, text

    def parse(self, trl_string):
        """Parses a raw TRL string into a structured command tree.

        ``#`` starts a comment that runs to the end of the line.

        Raises:
            ValueError: On any syntax error or trailing unparsed text.
        """
        # Pre-processing: drop comments.
        cleaned_string = re.sub(r'#.*$', '', trl_string, flags=re.MULTILINE)

        # Wrap the entire string in an implicit group for the parser
        group_to_parse = f"({cleaned_string})"
        parsed_group, remaining_text = self._parse_group(group_to_parse)

        if self._consume_whitespace(remaining_text):
            raise ValueError(
                f"Unexpected trailing characters in TRL string: {remaining_text}"
            )
        return parsed_group

    # --- EVALUATION LOGIC ---

    def _evaluate_primitive(self, primitive, current_nations):
        """Resolve one primitive to a set of nation names.

        Args:
            primitive: A primitive node produced by ``parse``.
            current_nations: Nations accumulated so far in the enclosing
                group; only ``name_regex`` reads it.

        Returns:
            A set of nation names; empty for unknown categories.

        Raises:
            ValueError: If a ``name_regex`` pattern does not compile.
            ConnectionError: Propagated from ``ns_api_request``.
        """
        category = primitive["category"]
        raw_args = primitive["args"]
        # Blank arguments (e.g. from "nations[]" or a trailing comma) carry
        # no information and would otherwise become an empty nation name.
        args = [arg for arg in raw_args if arg.strip()]

        if category == "nations":
            return {arg.lower().replace(' ', '_') for arg in args}

        if category == "regions":
            # One request per region. The response is expected to carry the
            # residents as a colon-separated list wrapped in XML markup
            # (per the NationStates API docs -- TODO confirm against a live
            # response).
            residents = set()
            for region in args:
                xml = ns_api_request({"q": "nations", "region": region})
                residents |= _extract_names(xml, ":")
            return residents

        if category == "wa":
            # NOTE(review): the shard names below are kept from the original
            # code; the public API docs describe WA queries as
            # "wa=1&q=members" / "wa=1&q=delegates" -- verify these requests.
            requested = {arg.lower() for arg in args}
            found = set()
            if "members" in requested:
                found |= _extract_names(
                    ns_api_request({"q": "wamembers"}), ","
                )
            if "delegates" in requested:
                found |= _extract_names(
                    ns_api_request({"q": "wadelegates"}), ","
                )
            return found

        if category in ("new", "refounded"):
            limit = int(args[0]) if args and args[0].isdigit() else 50
            happenings_filter = "founding" if category == "new" else "refounding"
            xml = ns_api_request(
                {"q": "happenings", "filter": happenings_filter, "limit": limit}
            )
            # Nation names appear as @@name@@ inside happenings text.
            return set(re.findall(r'@@([^@]+)@@', xml))

        if category == "name_regex":
            # Uses the unfiltered argument: an empty pattern is valid and
            # matches every nation.
            try:
                pattern = re.compile(raw_args[0].strip())
            except re.error as e:
                raise ValueError(
                    f"Invalid Regular Expression '{raw_args[0]}': {e}"
                ) from e
            return {nation for nation in current_nations if pattern.search(nation)}

        return set()

    def _evaluate_group(self, group, initial_nations=None):
        """Apply a group's commands in order and return the resulting set.

        Nested groups are evaluated on their own, starting from an empty
        set, and then combined using the enclosing command's action.
        """
        # Copy so the caller's set is never mutated.
        nations = set(initial_nations) if initial_nations is not None else set()

        for command in group["commands"]:
            action = command["action"]
            body = command["body"]

            if body["type"] == "group":
                target_nations = self._evaluate_group(body)
            else:  # Primitive
                target_nations = self._evaluate_primitive(
                    body, current_nations=nations
                )

            if action == 'add':
                nations.update(target_nations)
            elif action == 'remove':
                nations.difference_update(target_nations)
            elif action == 'limit':
                nations.intersection_update(target_nations)

        return nations

    def evaluate(self, trl_string):
        """Parses and then evaluates a TRL string.

        Returns an empty set for a blank string.
        """
        if not trl_string.strip():
            return set()
        parsed_structure = self.parse(trl_string)
        return self._evaluate_group(parsed_structure)


def _sleep_unless_stopped(seconds, get_status_callback):
    """Sleep up to ``seconds``, waking early when the status is 'stopped'.

    Returns True if the full wait elapsed, False if a stop was requested.
    """
    deadline = time.monotonic() + seconds
    while True:
        if get_status_callback() == 'stopped':
            return False
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return True
        time.sleep(min(STATUS_POLL_INTERVAL, remaining))


def telegram_engine(
    settings: dict[str, Any],
    log_callback: Callable[[str], None],
    get_status_callback: Callable[[], str],
    add_to_history_callback: Callable[[str], None]
):
    """Main telegramming loop. Runs inside a thread.

    Args:
        settings: Campaign configuration. Keys read here: 'tg_type', 'trl',
            'continuous', 'dry_run', 'client_key', 'tg_id', 'secret_key'.
        log_callback: Receives every formatted log line.
        get_status_callback: Returns the current campaign status; the values
            'paused' and 'stopped' are acted upon.
        add_to_history_callback: Called with each nation after it has been
            processed.
    """
    log_callback(log_message("INFO", "Telegramming thread started."))
    rate_limit = RATE_LIMITS.get(settings['tg_type'], RATE_LIMITS["Recruitment"])
    parser = TRLParser()

    while get_status_callback() != 'stopped':
        try:
            log_callback(log_message("INFO", "Evaluating TRL to get recipients..."))
            target_nations = list(parser.evaluate(settings['trl']))
            log_callback(log_message(
                "INFO", f"Found {len(target_nations)} potential recipients."
            ))

            if not target_nations:
                if settings['continuous']:
                    log_callback(log_message(
                        "WARN", "No recipients found. Waiting 30s to re-evaluate..."
                    ))
                    _sleep_unless_stopped(30, get_status_callback)
                    continue
                log_callback(log_message(
                    "INFO", "Campaign finished: No recipients found."
                ))
                break

            for nation in target_nations:
                # --- Graceful Pause/Stop Check ---
                if get_status_callback() == 'paused':
                    # Log once per pause instead of once per poll.
                    log_callback(log_message("INFO", "Campaign is paused..."))
                    while get_status_callback() == 'paused':
                        time.sleep(STATUS_POLL_INTERVAL)
                if get_status_callback() == 'stopped':
                    log_callback(log_message(
                        "INFO", "Stop signal received, halting."
                    ))
                    return

                # The history check is done by the main thread now to prevent
                # race conditions but we could add a check here if needed.
                log_callback(log_message("INFO", f"Preparing to telegram {nation}."))

                if settings['dry_run']:
                    log_callback(log_message(
                        "INFO", f"[DRY RUN] Would have sent telegram to {nation}."
                    ))
                else:
                    try:
                        params = {
                            "a": "sendTG",
                            "client": settings['client_key'],
                            "tgid": settings['tg_id'],
                            "key": settings['secret_key'],
                            "to": nation
                        }
                        response = ns_api_request(params)
                        # Case-insensitive: the success reply must be
                        # recognised whether it is "queued" or "Queued".
                        if "queued" in response.lower():
                            log_callback(log_message(
                                "SUCCESS", f"Queued telegram to {nation}."
                            ))
                        else:
                            log_callback(log_message(
                                "ERROR",
                                f"Failed for {nation}. API Response: {response.strip()}"
                            ))
                    except Exception as e:
                        # Best effort: one failed recipient must not abort
                        # the rest of the list.
                        log_callback(log_message(
                            "ERROR", f"Exception while telegramming {nation}: {e}"
                        ))

                # NOTE(review): nesting reconstructed from a flattened source;
                # as written, every processed nation is recorded, including
                # dry runs and failures -- confirm this is intended.
                add_to_history_callback(nation)
                log_callback(log_message("INFO", f"Waiting {rate_limit} seconds..."))
                # Interruptible, so a stop request is honoured promptly; the
                # check at the top of the next iteration handles it.
                _sleep_unless_stopped(rate_limit, get_status_callback)

            if not settings['continuous']:
                log_callback(log_message(
                    "INFO", "Finished recipient list. Continuous mode is OFF."
                ))
                break
            log_callback(log_message(
                "INFO",
                "Finished list. Continuous mode is ON. Re-evaluating in 30s..."
            ))
            _sleep_unless_stopped(30, get_status_callback)

        except Exception as e:
            # Top-level boundary of the worker thread: report and either
            # retry (continuous mode) or give up.
            log_callback(log_message("CRIT", f"A critical error occurred: {e}"))
            if settings['continuous']:
                log_callback(log_message(
                    "CRIT", "Waiting 60 seconds before retrying..."
                ))
                _sleep_unless_stopped(60, get_status_callback)
            else:
                log_callback(log_message(
                    "CRIT", "Aborting due to critical error."
                ))
                break

    log_callback(log_message("INFO", "Telegramming thread has finished."))