# filename: encode_issues_components_and_sparse.py
import os
import re
import json
import numpy as np
from FlagEmbedding import BGEM3FlagModel

MODEL_PATH = '../../../../Downloads/bge-m3'
OUTPUT_DIR = '../../'
CACHE_DIR = './.issue_embeddings_cache'
RE_EMBED_ALL = False
CHANGED_FILES = ["1720 TO 1729.txt", "1730 TO 1739.txt", "1740 TO 1749.txt", "1700 TO 1719.txt"]
ISSUE_SPLIT_MARKER = "[hr][/hr]"

BB_TAG_RE = re.compile(r'\[(?:\/)?[^\]]+\]')  # strips BBCode tags

def strip_bbcode(s: str) -> str:
    # Stripping BBCode ensures robust header and description detection
    return BB_TAG_RE.sub('', s)
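
# Illustrative example: strip_bbcode("[b][anchor=1379]#1379[/anchor][/b]")
# returns "#1379" -- every bracketed tag is removed, the plain text survives.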

def get_issue_files(directory="."):
    issue_files = []
    file_pattern = re.compile(r'(\d+) TO (\d+)\.txt')
    if not os.path.isdir(directory):
        print(f"Error: Directory '{directory}' not found.")
        return []
    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            match = file_pattern.match(filename)
            if match:
                start_num = int(match.group(1))
                issue_files.append((start_num, filename))
    issue_files.sort(key=lambda x: x[0])
    return [os.path.join(directory, filename) for _, filename in issue_files]
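
# Files are sorted by their leading issue number, so e.g. "980 TO 989.txt"
# (a hypothetical name) comes before "1700 TO 1719.txt"; a plain lexicographic
# sort would misorder them.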

def ensure_dirs(dirs):
    for d in dirs:
        os.makedirs(d, exist_ok=True)

def _split_raw_issues(raw_text):
    return [issue.strip() for issue in raw_text.split(ISSUE_SPLIT_MARKER) if issue.strip()]

def _extract_title(issue_block):
    for line in issue_block.splitlines():
        line = line.strip()
        if line:
            return line
    return "Untitled Issue"

def find_header_index(header: str, lines):
    # Strips BBCode and whitespace, compares case-insensitively
    header_lower = header.lower()
    for idx, line in enumerate(lines):
        line_clean = strip_bbcode(line).strip().lower()
        if line_clean == header_lower:
            return idx
    return -1
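
# For example, find_header_index("The Issue", lines) matches a line written as
# "[b]The Issue[/b]" or "the issue", since tags are stripped and case is ignored.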

def is_placeholder_issue(issue_block):
    # Skips issues that are just a title line with 'TBD' and no content
    lines = [line.strip() for line in issue_block.splitlines() if line.strip()]
    if len(lines) == 1 and 'TBD' in lines[0]:
        return True
    # Also skip if every non-empty line is an anchor/title line containing 'TBD'
    non_title_lines = [
        l for l in lines
        if not (l.startswith('[b][anchor=') and 'TBD' in l)
    ]
    if not non_title_lines and any('TBD' in l for l in lines):
        return True
    return False
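
# Illustrative placeholder: a block whose only line is
# "[b][anchor=1745]#1745[/anchor]: TBD[/b]" (hypothetical anchor number)
# is skipped by both checks above.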

def _parse_issue_strict(issue_block: str, global_issue_index: int):
    lines = issue_block.splitlines()
    i_issue = find_header_index("The Issue", lines)
    i_debate = find_header_index("The Debate", lines)
    if i_issue == -1 or i_debate == -1 or i_debate <= i_issue:
        print(f"Parse error: missing 'The Issue' or 'The Debate' in issue #{global_issue_index}")
        raise ValueError(f"Parse error in issue #{global_issue_index}")
    between = lines[i_issue + 1:i_debate]
    cleaned = [strip_bbcode(l).strip() for l in between]
    non_empty_idx = [k for k, c in enumerate(cleaned) if c]
    if len(non_empty_idx) == 1:
        # Remove editorial notes written as [[color=...]note[/color]] from the
        # raw line first, then strip the remaining BBCode
        raw_desc = between[non_empty_idx[0]]
        raw_desc = re.sub(r"\[\[color=.*?\].*?\[\/color\]\]", '', raw_desc)
        desc_text = strip_bbcode(raw_desc).strip()
    elif len(non_empty_idx) == 0:
        # Every line between the headers was pure BBCode/whitespace;
        # fall back to the first raw line, tags and all
        first_raw = None
        for l in between:
            if l.strip():
                first_raw = l
                break
        if not first_raw:
            print(f"Parse error: issue #{global_issue_index} has no usable description lines")
            raise ValueError(f"Parse error in issue #{global_issue_index}")
        desc_text = first_raw.strip()
    else:
        offending = [between[k] for k in non_empty_idx]
        print(f"Parse error: issue #{global_issue_index} has {len(non_empty_idx)} non-empty description lines (expected 1)")
        print(f"Description lines (raw): {offending}")
        raise ValueError(f"Parse error in issue #{global_issue_index}")
    after_debate = [l.strip() for l in lines[i_debate + 1:] if l.strip()]
    option_lines = after_debate
    return desc_text, option_lines
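
# The strict parser above assumes each block is laid out roughly like this
# (sketch based on the rules it enforces; exact decoration varies):
#   [b][anchor=1379]#1379[/anchor]: ... title ...[/b]
#   [b]The Issue[/b]
#   <exactly one non-empty description line>
#   [b]The Debate[/b]
#   <option line 1>
#   <option line 2>
#   ...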

BASE = "https://forum.nationstates.net/viewtopic.php?f=13&t=88"

def compute_start_from_anchor(anchor: int) -> int:
    """
    Returns the 'start' offset for the forum URL given an integer anchor (issue number).
    start increases by 25 every 500 anchors, beginning at 420.
    """
    # k is 0 for [0..419], 1 for [420..919], 2 for [920..1419], 3 for [1420..1919], etc.
    anchor = int(anchor)
    if anchor < 420:
        return 0
    k = (anchor - 420) // 500 + 1
    return 25 * k

def craft_issue_url(anchor: int) -> str:
    start = compute_start_from_anchor(anchor)
    if start == 0:
        return f"{BASE}#{anchor}"
    return f"{BASE}&start={start}#{anchor}"

ANCHOR_RE = re.compile(r"\[anchor=(\d+)\]")

def extract_anchor(issue_title_line: str):
    """
    From a title like:
        [b][anchor=1379]#1379[/anchor]: [color=#CE532A][i]MADness:[/i][/color] A View to a Thrill ...
    returns 1379 as an int, or None if not found.
    """
    m = ANCHOR_RE.search(issue_title_line)
    return int(m.group(1)) if m else None

def format_issue_title_markdown(issue_block: str) -> str:
    """
    Returns a display string such as:
        "#1379: [MADness: A View to a Thrill](...#1379)"
    Keeps chain/fancy wording in the visible title (BBCode stripped),
    and builds the correct paginated URL using the anchor.
    """
    # First non-empty line should be the title line
    title_line = _extract_title(issue_block)
    anchor = extract_anchor(title_line)
    # Extract the visible title to the right of '[/anchor]:'
    title_part = title_line.split('[/anchor]:', 1)[-1].strip() if '[/anchor]:' in title_line else title_line
    # Strip BBCode for the display text while preserving the chain wording itself
    title_text = strip_bbcode(title_part).strip()
    if anchor is None:
        # Fallback: no anchor found; return the plain title
        return title_text or "Untitled Issue"
    url = craft_issue_url(anchor)
    return f"#{anchor}: [{title_text}]({url})"
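
# For the docstring's #1379 title, the anchor resolves to start=50, so the link
# portion becomes "...viewtopic.php?f=13&t=88&start=50#1379".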

def encode_issues_components_and_sparse():
    print("Initializing BGEM3FlagModel...")
    try:
        model = BGEM3FlagModel(MODEL_PATH, use_fp16=True)
        print("Model loaded.")
    except Exception as e:
        print(f"Error loading model from {MODEL_PATH}: {e}")
        return
    issues_input_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                    'NationStates-Issue-Megathread/002 - Issue Megalist (MAIN)')
    issue_files = get_issue_files(issues_input_dir)
    if not issue_files:
        print(f"No issue files found in '{issues_input_dir}'.")
        return
    cache_dense_dir = os.path.join(CACHE_DIR, 'dense_components')
    cache_sparse_dir = os.path.join(CACHE_DIR, 'sparse_issues')
    ensure_dirs([cache_dense_dir, cache_sparse_dir])
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    # --- Component-level dense (semantic) ---
    perfile_component_texts = []
    perfile_component_meta = []
    global_issue_index_offset = 0
    # --- Issue-level sparse (loose) ---
    perfile_issue_texts = []
    titles_dict = {}
| print(f"Parsing and preparing issue blocks from {len(issue_files)} files...") | |
| for i, filepath in enumerate(issue_files): | |
| filename = os.path.basename(filepath) | |
| print(f" [{i+1}/{len(issue_files)}] Parsing file: {filename}") | |
| with open(filepath, 'r', encoding='utf-8') as f: | |
| raw = f.read() | |
| issue_blocks = _split_raw_issues(raw) | |
| file_components_texts = [] | |
| file_components_meta = [] | |
| file_issue_texts = [] | |
| file_issue_titles = [] | |
| for local_issue_idx, issue_block in enumerate(issue_blocks): | |
| if is_placeholder_issue(issue_block): | |
| continue # Skip placeholder/empty issues | |
| title_line = _extract_title(issue_block) | |
| this_issue_global_idx = global_issue_index_offset + local_issue_idx | |
| titles_dict[str(this_issue_global_idx)] = format_issue_title_markdown(issue_block) | |
| try: | |
| desc_text, option_texts = _parse_issue_strict(issue_block, this_issue_global_idx) | |
| except Exception as e: | |
| print(f"Aborting due to parse error in issue #{this_issue_global_idx}") | |
| raise | |
| # Dense: description and options as separate components | |
| file_components_texts.append(desc_text) | |
| file_components_meta.append({ | |
| "issue_index": this_issue_global_idx, | |
| "component_type": "desc", | |
| "option_index": None | |
| }) | |
| for opt_idx, opt_text in enumerate(option_texts, start=1): | |
| file_components_texts.append(opt_text) | |
| file_components_meta.append({ | |
| "issue_index": this_issue_global_idx, | |
| "component_type": "option", | |
| "option_index": opt_idx | |
| }) | |
| # Sparse: whole issue block (not chunked) | |
| file_issue_texts.append(issue_block) | |
| file_issue_titles.append(title_line) | |
| perfile_component_texts.append(file_components_texts) | |
| perfile_component_meta.append(file_components_meta) | |
| perfile_issue_texts.append(file_issue_texts) | |
| global_issue_index_offset += len(issue_blocks) | |
    # --- Dense embedding for components ---
    print("\nStarting dense (semantic) embedding for components...")
    all_dense_chunks = []
    all_meta = []
    for i, filepath in enumerate(issue_files):
        filename = os.path.basename(filepath)
        base_name = os.path.splitext(filename)[0]
        file_cache_dense_path = os.path.join(cache_dense_dir, f"{base_name}.npy")
        texts = perfile_component_texts[i]
        metas = perfile_component_meta[i]
        if not texts:
            print(f" [Dense] Skipping file {filename} (no components to embed).")
            continue
        is_cached = os.path.exists(file_cache_dense_path)
        if not RE_EMBED_ALL and filename not in CHANGED_FILES and is_cached:
            print(f" [Dense] Loading cached embeddings for {filename} ({len(texts)} components).")
            dense_vecs = np.load(file_cache_dense_path)
        else:
            print(f" [Dense] Embedding {len(texts)} components from {filename}...")
            embeddings = model.encode(
                texts,
                batch_size=12,
                max_length=8192,
                return_dense=True,
                return_sparse=False,  # Only dense for components
                return_colbert_vecs=False
            )
            dense_vecs = embeddings['dense_vecs']
            np.save(file_cache_dense_path, dense_vecs)
            print(f" [Dense] Saved cache for {filename} ({dense_vecs.shape[0]} components).")
        all_dense_chunks.append(dense_vecs)
        all_meta.extend(metas)
    if not all_dense_chunks:
        print("No component embeddings produced.")
        return
    final_dense = np.vstack(all_dense_chunks)
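    # Note: append order is preserved, so row j of final_dense is described by
    # all_meta[j] (assuming downstream consumers index them in parallel).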
    dense_out = os.path.join(OUTPUT_DIR, 'ns_issue_components_semantic_bge-m3.npy')
    meta_out = os.path.join(OUTPUT_DIR, 'ns_issue_components_meta.json')
    titles_out = os.path.join(OUTPUT_DIR, 'issue_titles_components.json')
    np.save(dense_out, final_dense)
    with open(meta_out, 'w', encoding='utf-8') as f:
        json.dump(all_meta, f, ensure_ascii=False)
    with open(titles_out, 'w', encoding='utf-8') as f:
        # Only titles for non-placeholder issues
        json.dump(titles_dict, f, ensure_ascii=False)
    print("\nDense embedding complete. Saved:")
    print(f" Dense: {dense_out} shape={final_dense.shape}")
    print(f" Meta: {meta_out} items={len(all_meta)}")
    print(f" Titles: {titles_out} issues={len(titles_dict)}")
    # --- Sparse embedding for whole issues, cached per file ---
    print("\nStarting sparse (loose) embedding for whole issues (per file)...")
    sparse_out = os.path.join(OUTPUT_DIR, 'ns_issues_loose_bge-m3.npy')
    titles_sparse_out = os.path.join(OUTPUT_DIR, 'issue_titles.json')
    all_sparse_chunks = []
    for i, filepath in enumerate(issue_files):
        filename = os.path.basename(filepath)
        base_name = os.path.splitext(filename)[0]
        file_cache_sparse_path = os.path.join(cache_sparse_dir, f"{base_name}.npy")
        issue_texts = perfile_issue_texts[i]
        if not issue_texts:
            print(f" [Sparse] Skipping file {filename} (no issues to embed).")
            continue
        is_cached = os.path.exists(file_cache_sparse_path)
        if not RE_EMBED_ALL and filename not in CHANGED_FILES and is_cached:
            print(f" [Sparse] Loading cached sparse embeddings for {filename} ({len(issue_texts)} issues).")
            sparse_dicts = np.load(file_cache_sparse_path, allow_pickle=True).tolist()
        else:
            print(f" [Sparse] Embedding {len(issue_texts)} issues from {filename}...")
            embeddings = model.encode(
                issue_texts,
                batch_size=12,
                max_length=8192,
                return_dense=False,
                return_sparse=True,
                return_colbert_vecs=False
            )
            sparse_dicts = embeddings['lexical_weights']
            np.save(file_cache_sparse_path, np.array(sparse_dicts, dtype=object), allow_pickle=True)
            print(f" [Sparse] Saved cache for {filename} ({len(sparse_dicts)} issues).")
        all_sparse_chunks.extend(sparse_dicts)
    np.save(sparse_out, np.array(all_sparse_chunks, dtype=object), allow_pickle=True)
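    # Each saved element is BGE-M3's per-document lexical_weights mapping
    # (token id -> weight), stored as a pickled object array.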
    # Same title mapping as the dense pass, written under the sparse filename
    with open(titles_sparse_out, 'w', encoding='utf-8') as f:
        json.dump(titles_dict, f, ensure_ascii=False)
| print(f"\nSparse embedding complete. Saved:") | |
| print(f" Sparse: {sparse_out} count={len(all_sparse_chunks)}") | |
| print(f" Titles (sparse): {titles_sparse_out} issues={len(titles_dict)}") | |
| print("Embedding generation (components dense, issues sparse, strict) complete!") | |
| if __name__ == "__main__": | |
| encode_issues_components_and_sparse() | |
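
# To run (assuming a local bge-m3 download at MODEL_PATH and the
# NationStates-Issue-Megathread repo checked out beside this script):
#   python encode_issues_components_and_sparse.py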