# Bohaska
# Update issues DB and GA resolutions
# fc61079
# filename: encode_issues_components_and_sparse.py
import os
import re
import json
import numpy as np
from FlagEmbedding import BGEM3FlagModel
# Path to the locally downloaded BGE-M3 model weights.
MODEL_PATH = '../../../../Downloads/bge-m3'
# Directory where the final .npy / .json artifacts are written.
OUTPUT_DIR = '../../'
# Per-source-file embedding caches live here so unchanged files are not re-embedded.
CACHE_DIR = './.issue_embeddings_cache'
# Set True to bypass all caches and re-embed every file.
RE_EMBED_ALL = False
# Files edited since the last run; their caches are ignored and rebuilt.
CHANGED_FILES = ["1720 TO 1729.txt","1730 TO 1739.txt","1740 TO 1749.txt","1700 TO 1719.txt"]
# Delimiter separating individual issue blocks inside a megalist .txt file.
ISSUE_SPLIT_MARKER = "[hr][/hr]"
BB_TAG_RE = re.compile(r'\[(?:\/)?[^\]]+\]') # strips BBCode tags
def strip_bbcode(s: str) -> str:
    """Return *s* with every BBCode tag (``[b]``, ``[/color]``, ``[anchor=5]``...) removed."""
    # Working on tag-free text keeps header and description detection robust.
    return re.sub(r'\[(?:\/)?[^\]]+\]', '', s)
def get_issue_files(directory="."):
    """Return paths of '<start> TO <end>.txt' files in *directory*, ordered by start number.

    Prints an error and returns an empty list when the directory is missing.
    """
    pattern = re.compile(r'(\d+) TO (\d+)\.txt')
    if not os.path.isdir(directory):
        print(f"Error: Directory '{directory}' not found.")
        return []
    numbered = []
    for name in os.listdir(directory):
        if not name.endswith('.txt'):
            continue
        m = pattern.match(name)
        if m is None:
            continue
        # Sort key is the numeric range start, not the lexical filename.
        numbered.append((int(m.group(1)), name))
    ordered = sorted(numbered, key=lambda pair: pair[0])
    return [os.path.join(directory, name) for _, name in ordered]
def ensure_dirs(dirs):
    """Create each directory in *dirs* (with parents); already-existing ones are fine."""
    for path in dirs:
        os.makedirs(path, exist_ok=True)
def _split_raw_issues(raw_text):
    """Split a megalist file's text on the issue divider, dropping empty chunks."""
    stripped = (chunk.strip() for chunk in raw_text.split(ISSUE_SPLIT_MARKER))
    return [chunk for chunk in stripped if chunk]
def _extract_title(issue_block):
for line in issue_block.splitlines():
line = line.strip()
if line:
return line
return "Untitled Issue"
def find_header_index(header: str, lines):
    """Return the index of the line whose BBCode-stripped, trimmed text equals
    *header* case-insensitively, or -1 when no line matches."""
    target = header.lower()
    for idx, raw_line in enumerate(lines):
        if strip_bbcode(raw_line).strip().lower() == target:
            return idx
    return -1
def is_placeholder_issue(issue_block):
    """Return True when the issue block is only a 'TBD' stub with no real content."""
    content = [ln.strip() for ln in issue_block.splitlines() if ln.strip()]
    # Case 1: a lone line mentioning TBD -- nothing to parse.
    if len(content) == 1 and 'TBD' in content[0]:
        return True

    def _is_tbd_title(ln):
        # An anchor/title line that is itself marked TBD.
        return ln.startswith('[b][anchor=') and 'TBD' in ln

    # Case 2: every non-empty line is a TBD anchor/title line.
    if content and all(_is_tbd_title(ln) for ln in content):
        return True
    return False
def _parse_issue_strict(issue_block: str, global_issue_index: int):
    """Extract ``(description_text, option_lines)`` from a single issue block.

    Finds the 'The Issue' and 'The Debate' header lines; the first non-empty
    BBCode-stripped line between them becomes the description, and every
    non-empty line after 'The Debate' is returned as an option line.

    Raises ValueError when a header is missing, the headers are out of
    order, or no usable description line exists.
    """
    lines = issue_block.splitlines()
    i_issue = find_header_index("The Issue", lines)
    i_debate = find_header_index("The Debate", lines)
    if i_issue == -1 or i_debate == -1 or i_debate <= i_issue:
        print(f"Parse error: missing 'The Issue' or 'The Debate' in issue #{global_issue_index}")
        raise ValueError(f"Parse error in issue #{global_issue_index}")
    between = lines[i_issue + 1:i_debate]
    cleaned = [strip_bbcode(l).strip() for l in between]
    non_empty = [c for c in cleaned if c]
    if non_empty:
        # NOTE: when several non-empty lines exist only the first is kept.
        # The original multi-line "expected 1" error branch was unreachable
        # (the guard tested `>= 1`) and has been removed; behavior unchanged.
        desc_text = re.sub(r"\[\[color=.*?].*\[\/color\]\]", '', non_empty[0]).strip()
    else:
        # Every stripped line was empty; fall back to the first raw line
        # that contains anything at all.
        first_raw = next((l for l in between if l.strip()), None)
        if not first_raw:
            print(f"Parse error: issue #{global_issue_index} has no usable description lines")
            raise ValueError(f"Parse error in issue #{global_issue_index}")
        desc_text = strip_bbcode(first_raw).strip()
    option_lines = [l.strip() for l in lines[i_debate + 1:] if l.strip()]
    return desc_text, option_lines
# Root URL of the NationStates issue megathread; pagination is appended via '&start='.
BASE = "https://forum.nationstates.net/viewtopic.php?f=13&t=88"
def compute_start_from_anchor(anchor: int) -> int:
    """Return the forum URL 'start' offset for an integer anchor (issue number).

    The offset grows by 25 for every 500 anchors, with the first boundary at
    420: anchors [0..419] map to 0, [420..919] to 25, [920..1419] to 50, etc.
    """
    anchor = int(anchor)
    # Anything before the first boundary sits on page offset 0.
    if anchor < 420:
        return 0
    bucket = int(((anchor - 420) / 500) + 1)
    return 25 * max(bucket, 0)
def craft_issue_url(anchor: int) -> str:
    """Build the paginated megathread URL that jumps straight to *anchor*."""
    offset = compute_start_from_anchor(anchor)
    # First page needs no '&start=' parameter.
    if not offset:
        return f"{BASE}#{anchor}"
    return f"{BASE}&start={offset}#{anchor}"
# Matches the numeric id inside a BBCode anchor tag, e.g. [anchor=1379].
ANCHOR_RE = re.compile(r"\[anchor=(\d+)\]")
def extract_anchor(issue_title_line: str):
    """
    From a title like:
    [b][anchor=1379]#1379[/anchor]: [color=#CE532A][i]MADness:[/i][/color] A View to a Thrill ...
    returns 1379 as int, or None if not found.
    """
    m = ANCHOR_RE.search(issue_title_line)
    # Bug fix: previously returned the digits as a *string*, contradicting the
    # docstring. Callers only interpolate the value or pass it through int(),
    # so returning an int is backward-compatible.
    return int(m.group(1)) if m else None
def format_issue_title_markdown(issue_block: str) -> str:
    """
    Return a markdown display string such as:
        "#1379: [MADness: A View to a Thrill](...#1379)"
    Keeps chain/fancy formatting wording in the visible title (BBCode stripped)
    and builds the correct paginated URL using the anchor number.

    Bug fix: the anchor-less fallback used to return a ``(title, url)`` tuple
    while the success path returned a bare string (and the annotation claimed
    ``tuple[str, str]``). The caller stores the result directly in a titles
    dict that is JSON-dumped, so both paths now consistently return a string.
    """
    # First non-empty line should be the title line.
    title_line = next((ln.strip() for ln in issue_block.splitlines() if ln.strip()), "")
    anchor = extract_anchor(title_line)
    # Visible title is whatever follows '[/anchor]:' when present.
    title_part = title_line.split('[/anchor]:', 1)[-1].strip() if '[/anchor]:' in title_line else title_line
    # Strip BBCode for the display text while preserving the wording itself.
    title_text = re.sub(r"\[/?[^\]]+\]", "", title_part).strip()
    if anchor is None:
        # No anchor found: link the plain title to the thread root.
        return f"[{title_text or 'Untitled Issue'}]({BASE})"
    url = craft_issue_url(anchor)
    return f"#{anchor}: [{title_text}]({url})"
def encode_issues_components_and_sparse():
    """Embed NationStates issue texts with BGE-M3 and write artifacts to OUTPUT_DIR.

    Two artifact sets are produced:
      * dense per-component vectors (one per description / option line) plus a
        metadata JSON mapping each vector back to its issue and component, and
      * sparse lexical-weight dicts for each whole issue block.

    Per-source-file caches under CACHE_DIR are reused unless the file appears
    in CHANGED_FILES or RE_EMBED_ALL is set. Aborts (re-raises) on any issue
    that fails strict parsing.
    """
    print("Initializing BGEM3FlagModel...")
    try:
        model = BGEM3FlagModel(MODEL_PATH, use_fp16=True)
        print("Model loaded.")
    except Exception:
        print(f"Error loading model from {MODEL_PATH}: failed to initialize")
        return
    issues_input_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                    'NationStates-Issue-Megathread/002 - Issue Megalist (MAIN)')
    issue_files = get_issue_files(issues_input_dir)
    if not issue_files:
        print(f"No issue files found in '{issues_input_dir}'.")
        return
    cache_dense_dir = os.path.join(CACHE_DIR, 'dense_components')
    cache_sparse_dir = os.path.join(CACHE_DIR, 'sparse_issues')
    ensure_dirs([cache_dense_dir, cache_sparse_dir])
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # --- Parse every file into component texts (dense) and issue texts (sparse) ---
    perfile_component_texts = []   # per file: description + option strings
    perfile_component_meta = []    # parallel metadata for each component
    perfile_issue_texts = []       # per file: whole issue blocks
    titles_dict = {}               # global issue index (str) -> markdown title
    global_issue_index_offset = 0
    print(f"Parsing and preparing issue blocks from {len(issue_files)} files...")
    for i, filepath in enumerate(issue_files):
        filename = os.path.basename(filepath)
        # Bug fix: progress messages previously printed the literal text
        # "(unknown)" although `filename` was computed; interpolate it.
        print(f"  [{i+1}/{len(issue_files)}] Parsing file: {filename}")
        with open(filepath, 'r', encoding='utf-8') as f:
            raw = f.read()
        issue_blocks = _split_raw_issues(raw)
        file_components_texts = []
        file_components_meta = []
        file_issue_texts = []
        for local_issue_idx, issue_block in enumerate(issue_blocks):
            if is_placeholder_issue(issue_block):
                continue  # Skip placeholder/empty issues
            this_issue_global_idx = global_issue_index_offset + local_issue_idx
            titles_dict[str(this_issue_global_idx)] = format_issue_title_markdown(issue_block)
            try:
                desc_text, option_texts = _parse_issue_strict(issue_block, this_issue_global_idx)
            except Exception:
                print(f"Aborting due to parse error in issue #{this_issue_global_idx}")
                raise
            # Dense: description and options as separate components.
            file_components_texts.append(desc_text)
            file_components_meta.append({
                "issue_index": this_issue_global_idx,
                "component_type": "desc",
                "option_index": None
            })
            for opt_idx, opt_text in enumerate(option_texts, start=1):
                file_components_texts.append(opt_text)
                file_components_meta.append({
                    "issue_index": this_issue_global_idx,
                    "component_type": "option",
                    "option_index": opt_idx
                })
            # Sparse: whole issue block (not chunked).
            file_issue_texts.append(issue_block)
        perfile_component_texts.append(file_components_texts)
        perfile_component_meta.append(file_components_meta)
        perfile_issue_texts.append(file_issue_texts)
        # Offsets count placeholders too, keeping indices aligned with block order.
        global_issue_index_offset += len(issue_blocks)

    # --- Dense embedding for components, cached per source file ---
    print("\nStarting dense (semantic) embedding for components...")
    all_dense_chunks = []
    all_meta = []
    for i, filepath in enumerate(issue_files):
        filename = os.path.basename(filepath)
        base_name = os.path.splitext(filename)[0]
        file_cache_dense_path = os.path.join(cache_dense_dir, f"{base_name}.npy")
        texts = perfile_component_texts[i]
        metas = perfile_component_meta[i]
        if not texts:
            print(f"  [Dense] Skipping file {filename} (no components to embed).")
            continue
        is_cached = os.path.exists(file_cache_dense_path)
        if not RE_EMBED_ALL and filename not in CHANGED_FILES and is_cached:
            print(f"  [Dense] Loading cached embeddings for {filename} ({len(texts)} components).")
            dense_vecs = np.load(file_cache_dense_path)
        else:
            print(f"  [Dense] Embedding {len(texts)} components from {filename}...")
            embeddings = model.encode(
                texts,
                batch_size=12,
                max_length=8192,
                return_dense=True,
                return_sparse=False,  # Only dense for components
                return_colbert_vecs=False
            )
            dense_vecs = embeddings['dense_vecs']
            np.save(file_cache_dense_path, dense_vecs)
            print(f"  [Dense] Saved cache for {filename} ({dense_vecs.shape[0]} components).")
        all_dense_chunks.append(dense_vecs)
        all_meta.extend(metas)
    if not all_dense_chunks:
        print("No component embeddings produced.")
        return
    final_dense = np.vstack(all_dense_chunks)
    dense_out = os.path.join(OUTPUT_DIR, 'ns_issue_components_semantic_bge-m3.npy')
    meta_out = os.path.join(OUTPUT_DIR, 'ns_issue_components_meta.json')
    titles_out = os.path.join(OUTPUT_DIR, 'issue_titles_components.json')
    np.save(dense_out, final_dense)
    with open(meta_out, 'w', encoding='utf-8') as f:
        json.dump(all_meta, f, ensure_ascii=False)
    with open(titles_out, 'w', encoding='utf-8') as f:
        # Only titles for non-placeholder issues
        json.dump(titles_dict, f, ensure_ascii=False)
    print(f"\nDense embedding complete. Saved:")
    print(f"  Dense: {dense_out} shape={final_dense.shape}")
    print(f"  Meta: {meta_out} items={len(all_meta)}")
    print(f"  Titles: {titles_out} issues={len(titles_dict)}")

    # --- Sparse embedding for whole issues, cached per source file ---
    print("\nStarting sparse (loose) embedding for whole issues (per file)...")
    sparse_out = os.path.join(OUTPUT_DIR, 'ns_issues_loose_bge-m3.npy')
    titles_sparse_out = os.path.join(OUTPUT_DIR, 'issue_titles.json')
    all_sparse_chunks = []
    for i, filepath in enumerate(issue_files):
        filename = os.path.basename(filepath)
        base_name = os.path.splitext(filename)[0]
        file_cache_sparse_path = os.path.join(cache_sparse_dir, f"{base_name}.npy")
        issue_texts = perfile_issue_texts[i]
        if not issue_texts:
            print(f"  [Sparse] Skipping file {filename} (no issues to embed).")
            continue
        is_cached = os.path.exists(file_cache_sparse_path)
        if not RE_EMBED_ALL and filename not in CHANGED_FILES and is_cached:
            print(f"  [Sparse] Loading cached sparse embeddings for {filename} ({len(issue_texts)} issues).")
            # allow_pickle is required: lexical weights are stored as object dicts.
            sparse_dicts = np.load(file_cache_sparse_path, allow_pickle=True).tolist()
        else:
            print(f"  [Sparse] Embedding {len(issue_texts)} issues from {filename}...")
            embeddings = model.encode(
                issue_texts,
                batch_size=12,
                max_length=8192,
                return_dense=False,
                return_sparse=True,
                return_colbert_vecs=False
            )
            sparse_dicts = embeddings['lexical_weights']
            np.save(file_cache_sparse_path, np.array(sparse_dicts, dtype=object), allow_pickle=True)
            print(f"  [Sparse] Saved cache for {filename} ({len(sparse_dicts)} issues).")
        all_sparse_chunks.extend(sparse_dicts)
    np.save(sparse_out, np.array(all_sparse_chunks, dtype=object), allow_pickle=True)
    # Flatten all titles for sparse
    with open(titles_sparse_out, 'w', encoding='utf-8') as f:
        json.dump(titles_dict, f, ensure_ascii=False)
    print(f"\nSparse embedding complete. Saved:")
    print(f"  Sparse: {sparse_out} count={len(all_sparse_chunks)}")
    print(f"  Titles (sparse): {titles_sparse_out} issues={len(titles_dict)}")
    print("Embedding generation (components dense, issues sparse, strict) complete!")
if __name__ == "__main__":
    # Script entry point: run the full parse-and-embed pipeline.
    encode_issues_components_and_sparse()