# filename: encode_issues_components_and_sparse.py
import os
import re
import json

import numpy as np
from FlagEmbedding import BGEM3FlagModel

MODEL_PATH = '../../../../Downloads/bge-m3'
OUTPUT_DIR = '../../'
CACHE_DIR = './.issue_embeddings_cache'
RE_EMBED_ALL = False
CHANGED_FILES = ["1720 TO 1729.txt", "1730 TO 1739.txt", "1740 TO 1749.txt", "1700 TO 1719.txt"]
ISSUE_SPLIT_MARKER = "[hr][/hr]"

BB_TAG_RE = re.compile(r'\[(?:\/)?[^\]]+\]')  # strips BBCode tags


def strip_bbcode(s: str) -> str:
    # Stripping BBCode ensures robust header and description detection
    return BB_TAG_RE.sub('', s)


def get_issue_files(directory="."):
    issue_files = []
    file_pattern = re.compile(r'(\d+) TO (\d+)\.txt')
    if not os.path.isdir(directory):
        print(f"Error: Directory '{directory}' not found.")
        return []
    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            match = file_pattern.match(filename)
            if match:
                start_num = int(match.group(1))
                issue_files.append((start_num, filename))
    issue_files.sort(key=lambda x: x[0])
    return [os.path.join(directory, filename) for _, filename in issue_files]


def ensure_dirs(dirs):
    for d in dirs:
        os.makedirs(d, exist_ok=True)


def _split_raw_issues(raw_text):
    return [issue.strip() for issue in raw_text.split(ISSUE_SPLIT_MARKER) if issue.strip()]


def _extract_title(issue_block):
    for line in issue_block.splitlines():
        line = line.strip()
        if line:
            return line
    return "Untitled Issue"


def find_header_index(header: str, lines):
    # Strips BBCode and whitespace, compares case-insensitively
    header_lower = header.lower()
    for idx, line in enumerate(lines):
        line_clean = strip_bbcode(line).strip().lower()
        if line_clean == header_lower:
            return idx
    return -1


def is_placeholder_issue(issue_block):
    # Skips issues that are just a title line with 'TBD' and no content
    lines = [line.strip() for line in issue_block.splitlines() if line.strip()]
    if len(lines) == 1 and 'TBD' in lines[0]:
        return True
    # Also skip if all non-empty lines are anchor/title lines that contain 'TBD'
    non_title_lines = [
        l for l in lines
        if not (l.startswith('[b][anchor=') and 'TBD' in l)
    ]
    if not non_title_lines and any('TBD' in l for l in lines):
        return True
    return False


def _parse_issue_strict(issue_block: str, global_issue_index: int):
    lines = issue_block.splitlines()
    i_issue = find_header_index("The Issue", lines)
    i_debate = find_header_index("The Debate", lines)
    if i_issue == -1 or i_debate == -1 or i_debate <= i_issue:
        print(f"Parse error: missing 'The Issue' or 'The Debate' in issue #{global_issue_index}")
        raise ValueError(f"Parse error in issue #{global_issue_index}")
    between = lines[i_issue + 1:i_debate]
    cleaned = [strip_bbcode(l).strip() for l in between]
    non_empty_idx = [k for k, c in enumerate(cleaned) if c]
    if len(non_empty_idx) == 1:
        # Exactly one description line, as the strict format requires.
        # (The original `>= 1` test made the multi-line error branch unreachable.)
        desc_text = cleaned[non_empty_idx[0]]
        desc_text = re.sub(r"\[\[color=.*?].*\[\/color\]\]", '', desc_text).strip()
    elif len(non_empty_idx) == 0:
        # Fall back to the first raw non-empty line if BBCode stripping left nothing
        first_raw = None
        for l in between:
            if l.strip():
                first_raw = l
                break
        if not first_raw:
            print(f"Parse error: issue #{global_issue_index} has no usable description lines")
            raise ValueError(f"Parse error in issue #{global_issue_index}")
        desc_text = strip_bbcode(first_raw).strip()
    else:
        # More than one description line violates the strict one-line format
        offending = [between[k] for k in non_empty_idx]
        print(f"Parse error: issue #{global_issue_index} has {len(non_empty_idx)} non-empty description lines (expected 1)")
        print(f"Description lines (raw): {offending}")
        raise ValueError(f"Parse error in issue #{global_issue_index}")
    after_debate = [l.strip() for l in lines[i_debate + 1:] if l.strip()]
    option_lines = after_debate
    return desc_text, option_lines
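
# Illustrative sketch of the block shape _parse_issue_strict expects; this is a
# hypothetical sample, not actual megathread content:
#
#   [b][anchor=1379]#1379[/anchor]: Some Issue Title[/b]
#   [b]The Issue[/b]
#   A single description paragraph sits on one line here.
#   [b]The Debate[/b]
#   1. "Text of the first option..."
#   2. "Text of the second option..."
#
# Exactly one non-empty line between 'The Issue' and 'The Debate' becomes the
# description; every non-empty line after 'The Debate' becomes one option.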

BASE = "https://forum.nationstates.net/viewtopic.php?f=13&t=88"


def compute_start_from_anchor(anchor: int) -> int:
    """
    Returns the 'start' offset for the forum URL given an integer anchor (issue number).
    start increases by 25 every 500 anchors, beginning at 420.
    """
    # k is 0 for [0..419], 1 for [420..919], 2 for [920..1419], 3 for [1420..], etc.
    anchor = int(anchor)
    if anchor < 420:
        return 0
    k = (anchor - 420) // 500 + 1
    return 25 * k


def craft_issue_url(anchor: int) -> str:
    start = compute_start_from_anchor(anchor)
    if start == 0:
        return f"{BASE}#{anchor}"
    return f"{BASE}&start={start}#{anchor}"


ANCHOR_RE = re.compile(r"\[anchor=(\d+)\]")


def extract_anchor(issue_title_line: str):
    """
    From a title like:
      [b][anchor=1379]#1379[/anchor]: [color=#CE532A][i]MADness:[/i][/color] A View to a Thrill ...
    returns 1379 as an int, or None if not found.
    """
    m = ANCHOR_RE.search(issue_title_line)
    return int(m.group(1)) if m else None


def format_issue_title_markdown(issue_block: str) -> str:
    """
    Returns a display string such as:
      "#1379: [MADness: A View to a Thrill](...#1379)"
    Keeps chain/fancy formatting in the visible title (BBCode stripped),
    and builds the correct paginated URL using the anchor.
    """
    # First non-empty line should be the title line
    title_line = next((ln.strip() for ln in issue_block.splitlines() if ln.strip()), "")
    anchor = extract_anchor(title_line)
    # Extract the visible title to the right of '[/anchor]:'
    title_part = title_line.split('[/anchor]:', 1)[-1].strip() if '[/anchor]:' in title_line else title_line
    # Strip BBCode for display text while preserving the chain wording itself
    title_text = strip_bbcode(title_part).strip()
    if anchor is None:
        # Fallback: no anchor found; link the plain title to the thread base URL.
        # (Returns a single string like the main path; callers store one value per issue.)
        return f"[{title_text or 'Untitled Issue'}]({BASE})"
    url = craft_issue_url(anchor)
    return f"#{anchor}: [{title_text}]({url})"
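
# Worked pagination examples (values follow directly from the arithmetic above):
#   compute_start_from_anchor(100)  -> 0    craft_issue_url(100)  -> f"{BASE}#100"
#   compute_start_from_anchor(420)  -> 25   craft_issue_url(420)  -> f"{BASE}&start=25#420"
#   compute_start_from_anchor(1379) -> 50   craft_issue_url(1379) -> f"{BASE}&start=50#1379"
#   compute_start_from_anchor(1420) -> 75   craft_issue_url(1420) -> f"{BASE}&start=75#1420"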


def encode_issues_components_and_sparse():
    print("Initializing BGEM3FlagModel...")
    try:
        model = BGEM3FlagModel(MODEL_PATH, use_fp16=True)
        print("Model loaded.")
    except Exception as e:
        print(f"Error loading model from {MODEL_PATH}: {e}")
        return

    issues_input_dir = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'NationStates-Issue-Megathread/002 - Issue Megalist (MAIN)'
    )
    issue_files = get_issue_files(issues_input_dir)
    if not issue_files:
        print(f"No issue files found in '{issues_input_dir}'.")
        return

    cache_dense_dir = os.path.join(CACHE_DIR, 'dense_components')
    cache_sparse_dir = os.path.join(CACHE_DIR, 'sparse_issues')
    ensure_dirs([cache_dense_dir, cache_sparse_dir])
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # --- Component-level dense (semantic) ---
    perfile_component_texts = []
    perfile_component_meta = []
    global_issue_index_offset = 0

    # --- Issue-level sparse (loose) ---
    perfile_issue_texts = []
    titles_dict = {}

    print(f"Parsing and preparing issue blocks from {len(issue_files)} files...")
    for i, filepath in enumerate(issue_files):
        filename = os.path.basename(filepath)
        print(f"  [{i+1}/{len(issue_files)}] Parsing file: {filename}")
        with open(filepath, 'r', encoding='utf-8') as f:
            raw = f.read()
        issue_blocks = _split_raw_issues(raw)

        file_components_texts = []
        file_components_meta = []
        file_issue_texts = []
        file_issue_titles = []

        for local_issue_idx, issue_block in enumerate(issue_blocks):
            if is_placeholder_issue(issue_block):
                continue  # Skip placeholder/empty issues
            title_line = _extract_title(issue_block)
            this_issue_global_idx = global_issue_index_offset + local_issue_idx
            titles_dict[str(this_issue_global_idx)] = format_issue_title_markdown(issue_block)
            try:
                desc_text, option_texts = _parse_issue_strict(issue_block, this_issue_global_idx)
            except Exception:
                print(f"Aborting due to parse error in issue #{this_issue_global_idx}")
                raise
            # Dense: description and options as separate components
            file_components_texts.append(desc_text)
            file_components_meta.append({
                "issue_index": this_issue_global_idx,
                "component_type": "desc",
                "option_index": None
            })
            for opt_idx, opt_text in enumerate(option_texts, start=1):
                file_components_texts.append(opt_text)
                file_components_meta.append({
                    "issue_index": this_issue_global_idx,
                    "component_type": "option",
                    "option_index": opt_idx
                })
            # Sparse: whole issue block (not chunked)
            file_issue_texts.append(issue_block)
            file_issue_titles.append(title_line)

        perfile_component_texts.append(file_components_texts)
        perfile_component_meta.append(file_components_meta)
        perfile_issue_texts.append(file_issue_texts)
        # Placeholders still advance the global index, keeping it aligned with anchors
        global_issue_index_offset += len(issue_blocks)

    # --- Dense embedding for components ---
    print("\nStarting dense (semantic) embedding for components...")
    all_dense_chunks = []
    all_meta = []
    for i, filepath in enumerate(issue_files):
        filename = os.path.basename(filepath)
        base_name = os.path.splitext(filename)[0]
        file_cache_dense_path = os.path.join(cache_dense_dir, f"{base_name}.npy")
        texts = perfile_component_texts[i]
        metas = perfile_component_meta[i]
        if not texts:
            print(f"  [Dense] Skipping file {filename} (no components to embed).")
            continue
        is_cached = os.path.exists(file_cache_dense_path)
        if not RE_EMBED_ALL and filename not in CHANGED_FILES and is_cached:
            print(f"  [Dense] Loading cached embeddings for {filename} ({len(texts)} components).")
            dense_vecs = np.load(file_cache_dense_path)
        else:
            print(f"  [Dense] Embedding {len(texts)} components from {filename}...")
            embeddings = model.encode(
                texts,
                batch_size=12,
                max_length=8192,
                return_dense=True,
                return_sparse=False,  # Only dense for components
                return_colbert_vecs=False
            )
            dense_vecs = embeddings['dense_vecs']
            np.save(file_cache_dense_path, dense_vecs)
            print(f"  [Dense] Saved cache for {filename} ({dense_vecs.shape[0]} components).")
        all_dense_chunks.append(dense_vecs)
        all_meta.extend(metas)

    if not all_dense_chunks:
        print("No component embeddings produced.")
        return

    final_dense = np.vstack(all_dense_chunks)
    dense_out = os.path.join(OUTPUT_DIR, 'ns_issue_components_semantic_bge-m3.npy')
    meta_out = os.path.join(OUTPUT_DIR, 'ns_issue_components_meta.json')
    titles_out = os.path.join(OUTPUT_DIR, 'issue_titles_components.json')
    np.save(dense_out, final_dense)
    with open(meta_out, 'w', encoding='utf-8') as f:
        json.dump(all_meta, f, ensure_ascii=False)
    with open(titles_out, 'w', encoding='utf-8') as f:
        # Only titles for non-placeholder issues
        json.dump(titles_dict, f, ensure_ascii=False)
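
    # Row alignment note: final_dense row i corresponds to all_meta[i], so a
    # downstream search can map a dense hit back to (issue_index, component_type,
    # option_index) and then to a display title via issue_titles_components.json.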
Saved:") print(f" Dense: {dense_out} shape={final_dense.shape}") print(f" Meta: {meta_out} items={len(all_meta)}") print(f" Titles: {titles_out} issues={len(titles_dict)}") # --- Sparse embedding for whole issues, cached per file --- print("\nStarting sparse (loose) embedding for whole issues (per file)...") sparse_out = os.path.join(OUTPUT_DIR, 'ns_issues_loose_bge-m3.npy') titles_sparse_out = os.path.join(OUTPUT_DIR, 'issue_titles.json') all_sparse_chunks = [] for i, filepath in enumerate(issue_files): filename = os.path.basename(filepath) base_name = os.path.splitext(filename)[0] file_cache_sparse_path = os.path.join(cache_sparse_dir, f"{base_name}.npy") issue_texts = perfile_issue_texts[i] if not issue_texts: print(f" [Sparse] Skipping file {filename} (no issues to embed).") continue is_cached = os.path.exists(file_cache_sparse_path) if not RE_EMBED_ALL and filename not in CHANGED_FILES and is_cached: print(f" [Sparse] Loading cached sparse embeddings for {filename} ({len(issue_texts)} issues).") sparse_dicts = np.load(file_cache_sparse_path, allow_pickle=True).tolist() else: print(f" [Sparse] Embedding {len(issue_texts)} issues from {filename}...") embeddings = model.encode( issue_texts, batch_size=12, max_length=8192, return_dense=False, return_sparse=True, return_colbert_vecs=False ) sparse_dicts = embeddings['lexical_weights'] np.save(file_cache_sparse_path, np.array(sparse_dicts, dtype=object), allow_pickle=True) print(f" [Sparse] Saved cache for {filename} ({len(sparse_dicts)} issues).") all_sparse_chunks.extend(sparse_dicts) np.save(sparse_out, np.array(all_sparse_chunks, dtype=object), allow_pickle=True) # Flatten all titles for sparse with open(titles_sparse_out, 'w', encoding='utf-8') as f: json.dump(titles_dict, f, ensure_ascii=False) print(f"\nSparse embedding complete. Saved:") print(f" Sparse: {sparse_out} count={len(all_sparse_chunks)}") print(f" Titles (sparse): {titles_sparse_out} issues={len(titles_dict)}") print("Embedding generation (components dense, issues sparse, strict) complete!") if __name__ == "__main__": encode_issues_components_and_sparse()