| """ |
| Synthetic Query Generator |
| - 双模型(Generator + Judge) |
| - 多轮对话生成 |
| - LLM 评分(1–10) |
| - LLM 改写(rewrite) |
| - embedding 去重(bge-m3) |
| - 语言一致性过滤 |
| - 条文痕迹过滤 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import re |
| import json |
| import time |
| import random |
| import logging |
| import argparse |
| from pathlib import Path |
| from typing import List, Dict, Any, Optional |
|
|
| import pandas as pd |
| from tqdm import tqdm |
| import torch |
|
|
| import numpy as np |
| from sentence_transformers import SentenceTransformer |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| from legalrag.config import AppConfig |
| from legalrag.llm.client import LLMClient |
|
|
|
|
def set_seed(seed: int = 42) -> None:
    """Seed Python's ``random`` module and NumPy's global RNG with ``seed``."""
    for seeder in (random.seed, np.random.seed):
        seeder(seed)
|
|
|
|
def setup_logger() -> logging.Logger:
    """Call ``logging.basicConfig`` (INFO level, timestamped format) and
    return the logger named after this module."""
    log_options = {
        "level": logging.INFO,
        "format": "[%(asctime)s] %(levelname)s - %(message)s",
        "datefmt": "%Y-%m-%d %H:%M:%S",
    }
    logging.basicConfig(**log_options)
    module_logger = logging.getLogger(__name__)
    return module_logger
|
|
|
|
| |
| |
| |
|
|
# Citation residue that strip_citation_markers() deletes from text:
#   * bare article numbers "第…条" written with Chinese or ASCII numerals,
#   * "本法第…条", optionally followed by any run of the characters 第/款/项,
#   * the stock phrases 依据本法规定 / 根据本法规定 / 依照本法规定.
ARTICLE_PATTERN = re.compile(
    r"(第[一二三四五六七八九十百千零〇两0-9]+条)|"
    r"(本法第[一二三四五六七八九十百千零〇两0-9]+条[第款项]*)|"
    r"(依据本法规定)|"
    r"(根据本法规定)|"
    r"(依照本法规定)",
    re.UNICODE,
)


# Ideographic (full-width) space, U+3000; replaced by an ASCII space during cleaning.
FULLWIDTH_SPACE = "\u3000"
|
|
|
|
def strip_citation_markers(text: str) -> str:
    """Remove article numbers and "本法规定"-style citation residue from ``text``.

    Full-width spaces become ASCII spaces, every ``ARTICLE_PATTERN`` match is
    deleted, and runs of whitespace are collapsed to a single space before the
    result is trimmed. Falsy input yields an empty string.
    """
    if text:
        without_citations = ARTICLE_PATTERN.sub("", text.replace(FULLWIDTH_SPACE, " "))
        return re.sub(r"\s+", " ", without_citations).strip()
    return ""
|
|
|
|
def is_chinese_char(ch: str) -> bool:
    """Return True when ``ch`` compares within the range U+4E00..U+9FFF."""
    range_start, range_end = "\u4e00", "\u9fff"
    return not (ch < range_start or ch > range_end)
|
|
|
|
def detect_language_simple(text: str) -> str:
    """Guess the language from the share of Chinese characters in ``text``.

    Returns ``"unknown"`` for falsy input, ``"zh"`` when more than 30% of the
    characters satisfy ``is_chinese_char``, and ``"en"`` otherwise.
    """
    if not text:
        return "unknown"
    zh_chars = len([ch for ch in text if is_chinese_char(ch)])
    zh_share = zh_chars / max(len(text), 1)
    if zh_share > 0.3:
        return "zh"
    return "en"
|
|
|
|
def looks_like_article(text: str) -> bool:
    """Return True when ``text`` reads like statute wording rather than a question.

    A text is treated as article-like when any of the following holds:
      * it is empty / falsy;
      * it contains no question mark (ASCII ``?`` or full-width U+FF1F) and,
        once trimmed, ends with a typical clause ending (的/时/为/者/之/等, a
        half- or full-width semicolon, ``。`` or ``.``);
      * it still contains an article number of the form "第…条".
    """
    if not text:
        return True

    # Escapes are used for the full-width forms so they cannot be silently
    # folded into their ASCII look-alikes by text normalisation.
    question_marks = ("?", "\uff1f")
    clause_endings = ("的", "时", "为", "者", "之", "等", ";", "\uff1b", "。", ".")

    if not any(mark in text for mark in question_marks):
        if text.strip().endswith(clause_endings):
            return True
    if re.search(r"第[一二三四五六七八九十百千零〇两0-9]+条", text):
        return True
    return False
|
|
|
|
def is_question_like(text: str) -> bool:
    """Return True when ``text`` looks like a question.

    A text qualifies when it contains a question mark (ASCII ``?`` or the
    full-width U+FF1F used in Chinese text), or when, after trimming, it starts
    with a common Chinese interrogative or an English question word
    (case-insensitive). Falsy input is never a question.
    """
    if not text:
        return False
    # "\uff1f" is the full-width question mark; written as an escape so it
    # cannot be folded into the ASCII "?" by text normalisation.
    if "?" in text or "\uff1f" in text:
        return True

    stripped = text.strip()
    if stripped.startswith(("如何", "什么", "哪些", "是否", "能否", "在什么情况下")):
        return True
    if re.match(r"^(How|What|When|Why|Which|Can|Could|Should|Is|Are|Do|Does)\b", stripped, re.I):
        return True
    return False
|
|
|
|
def has_mixed_lang(text: str) -> bool:
    """Return True when ``text`` contains both a Chinese character and a Latin letter a-z."""
    if not text:
        return False
    contains_zh = False
    contains_en = False
    for ch in text:
        if is_chinese_char(ch):
            contains_zh = True
        if "a" <= ch.lower() <= "z":
            contains_en = True
        if contains_zh and contains_en:
            return True
    return False
|
|
|
|
|
|
| |
| |
| |
|
|
# Phrases that point back at earlier context ("上述", "本案", "在这种情况下", ...).
# has_deictic_reference() reports a hit when any of them occurs in a query.
DEICTIC_PATTERN = re.compile(
    r"(这种|这样的|该等|上述|前述|本案|该案|此种|该种|其中|对此|在此情况下|在这种情况下|该情况下|这种情况下|该情形|该行为)",
    re.UNICODE,
)


# Generic "how should ... / in principle ..." phrasings. is_overly_abstract()
# rejects a query matching one of these when it contains no fact anchor.
ABSTRACT_PATTERN = re.compile(
    r"(如何合理|一般而言|原则上|通常情况下|应当如何|法律如何规定|如何理解|如何适用|如何认定|如何判断)",
    re.UNICODE,
)


# Chinese keywords used as "fact anchors": has_min_fact_anchor() requires at
# least one keyword from this list or from EN_FACT_ANCHORS.
ZH_FACT_ANCHORS = [
    "合同", "违约", "履行", "解除", "终止", "效力", "赔偿", "违约金", "定金", "退款",
    "交付", "付款", "交货", "租赁", "买卖", "借款", "担保", "保证", "抵押", "质押",
    "订立", "签订", "签署", "协商", "通知", "催告", "解除权", "撤销", "无效", "可撤销",
]


# English fact anchors; callers compare them against the lower-cased query.
EN_FACT_ANCHORS = [
    "contract", "breach", "performance", "terminate", "termination", "liability", "damages",
    "penalty", "deposit", "refund", "deliver", "delivery", "payment", "lease", "sale", "loan",
    "guarantee", "mortgage", "void", "rescission",
]
|
|
|
|
def has_deictic_reference(text: str) -> bool:
    """Return True when ``text`` contains a phrase matched by ``DEICTIC_PATTERN``."""
    return bool(text) and DEICTIC_PATTERN.search(text) is not None
|
|
|
|
def is_overly_abstract(text: str) -> bool:
    """Return True when ``text`` is too generic to be a grounded question.

    Falsy input counts as abstract. Otherwise a text is abstract when it
    matches ``ABSTRACT_PATTERN`` without containing any Chinese or English
    fact anchor, or when it mentions both "权利" and "义务" together with
    "如何" or "应该" but never mentions "合同".
    """
    if not text:
        return True

    if ABSTRACT_PATTERN.search(text):
        lowered = text.lower()
        has_zh_anchor = any(anchor in text for anchor in ZH_FACT_ANCHORS)
        has_en_anchor = any(anchor in lowered for anchor in EN_FACT_ANCHORS)
        if not (has_zh_anchor or has_en_anchor):
            return True

    mentions_rights_and_duties = "权利" in text and "义务" in text
    asks_how_or_should = "如何" in text or "应该" in text
    if mentions_rights_and_duties and asks_how_or_should and "合同" not in text:
        return True
    return False
|
|
|
|
def has_min_fact_anchor(text: str) -> bool:
    """Return True when the trimmed ``text`` contains at least one keyword from
    ``ZH_FACT_ANCHORS`` or, after lower-casing, from ``EN_FACT_ANCHORS``."""
    if not text:
        return False
    trimmed = text.strip()
    return any(anchor in trimmed for anchor in ZH_FACT_ANCHORS) or any(
        anchor in trimmed.lower() for anchor in EN_FACT_ANCHORS
    )
|
|
|
|
def clean_and_filter_query(
    q: str,
    min_len: int = 6,
    max_len: int = 200,
) -> Optional[str]:
    """Normalise one candidate query and return it only if it passes every filter.

    The query is trimmed, whitespace-collapsed and stripped of citation
    markers. ``None`` is returned when the input is falsy, when the cleaned
    length falls outside ``[min_len, max_len]``, or when the text looks like
    statute wording, is not question-like, contains a deictic reference, is
    overly abstract, or lacks a fact anchor.
    """
    if not q:
        return None

    normalized = strip_citation_markers(re.sub(r"\s+", " ", q.strip()))

    if not (min_len <= len(normalized) <= max_len):
        return None

    # Checks run in the same order as a chain of early returns would; the
    # first failing check short-circuits the rest.
    rejected = (
        looks_like_article(normalized)
        or not is_question_like(normalized)
        or has_deictic_reference(normalized)
        or is_overly_abstract(normalized)
        or not has_min_fact_anchor(normalized)
    )
    return None if rejected else normalized
|
|
|
|
| |
| |
| |
|
|
def init_generator_llm(cfg: AppConfig, logger: logging.Logger) -> LLMClient:
    """Build the generator LLM client from ``cfg`` and log its provider and model.

    Provider selection is delegated entirely to ``LLMClient.from_config``.
    NOTE(review): the intended order was described as local Qwen when a GPU is
    present, else OpenAI when an API key is set, else a degraded fallback;
    none of that logic is visible here, so confirm it against ``LLMClient``.
    """
    generator = LLMClient.from_config(cfg)
    logger.info(f"[Generator] provider={generator.provider}, model={generator.model_name}")
    return generator
|
|
|
|
def init_judge_llm(cfg: AppConfig, logger: logging.Logger) -> LLMClient:
    """Build the judge (evaluation) LLM client.

    Selection order:
      1. If the environment variable named by ``cfg.llm.api_key_env`` holds a
         non-blank value, build the client with
         ``LLMClient.from_config_with_key`` using that key.
      2. Otherwise, if CUDA is available, set the environment variable named by
         ``cfg.llm.qwen_model_env`` to ``Qwen/Qwen2.5-7B-Instruct`` and build
         the client from ``cfg``. The assignment is process-wide and replaces
         any value already set.
      3. Otherwise build the client from ``cfg`` unchanged.
    """
    api_key = os.getenv(cfg.llm.api_key_env, "").strip()
    if api_key:
        logger.info("[Judge] Using OpenAI for evaluation")
        return LLMClient.from_config_with_key(cfg, openai_key=api_key)

    if torch.cuda.is_available():
        logger.info("[Judge] Using Qwen-local (7B/14B) for evaluation")
        os.environ[cfg.llm.qwen_model_env] = "Qwen/Qwen2.5-7B-Instruct"
    else:
        logger.info("[Judge] Fallback: using same provider as Generator")

    # Both remaining branches end in the same config-driven constructor.
    return LLMClient.from_config(cfg)
|
|
|
|
| |
| |
| |
|
|
# System message passed to llm_chat() by every LLM call in this module
# (question generation, dialog simulation, scoring and rewriting).
SYSTEM_PROMPT_EN = (
    "You are a helpful assistant that generates natural, high-quality legal questions "
    "for retrieval evaluation.\n"
    "Follow the user's instructions strictly.\n"
    "Do not copy or restate the legal article.\n"
    "Do not include article numbers or citations.\n"
    "Produce natural questions that real users would ask.\n"
    "Self-check each question: is it natural, are connectors used appropriately, and is any rephrasing needed.\n"
    "Avoid clunky connector phrases and keep each question natural.\n"
    "One question per line."
)
|
|
| |
|
|
def build_single_turn_prompt(text: str, law_name: str, article_no: str, role: str, num_q: int, lang: str) -> str:
    """Build the user prompt asking for ``num_q`` single-turn questions.

    The Chinese template is used when ``lang == "zh"``; every other value
    selects the English template. The article metadata and text are embedded
    verbatim and the result is stripped of surrounding whitespace.
    """
    if lang != "zh":
        english_prompt = f"""
You are a {role} reading a legal provision. Based on this article, generate {num_q} natural questions you might genuinely ask.

Requirements:
- Questions must sound like real user questions, not restatements of the article.
- Do NOT include article numbers or citations.
- Do NOT copy sentences from the article.
- Self-check each question for naturalness and connector usage; rewrite if needed.
- Avoid clunky connector phrases; split into two questions if a single sentence becomes awkward.
- One question per line.
- Language: English.

Law name: {law_name}
Article number: {article_no}
Article text:
{text}
"""
        return english_prompt.strip()

    chinese_prompt = f"""
你是一名{role},正在阅读一条法律条文。请根据这条条文,提出 {num_q} 个自然的问题。

要求:
- 问题必须是你“真实可能会问”的,而不是复述条文。
- 不要出现条文编号(如“第几条”)、“本法规定”等字样。
- 不要直接照抄条文原句。
- 生成后先自检:是否自然?连接词是否用得当?如不自然请改写。
- 每个问题单独一行。
- 语言:中文。

法律名称:{law_name}
条文编号:{article_no}
条文内容:
{text}
"""
    return chinese_prompt.strip()
|
|
|
|
| |
|
|
# User-prompt template for simulating a five-turn user/lawyer dialog.
# "{article}" is filled in with str.format(); the "User:" lines of the reply
# are what generate_multi_turn_dialog() extracts.
MULTI_TURN_PROMPT = """
Simulate a realistic multi-turn conversation between a user and a lawyer about the following legal article.

Rules:
- 5 turns total: User → Lawyer → User → Lawyer → User
- User questions must be natural, realistic, and based on the article.
- Do NOT copy or restate the article.
- Do NOT include article numbers or citations.
- Self-check each user question for naturalness and connector usage; rewrite if needed.
- Avoid clunky connector phrases; split into two questions if a single sentence becomes awkward.
- Keep each turn short and natural.
- Output format:
User: ...
Lawyer: ...
User: ...
Lawyer: ...
User: ...

Article:
{article}
"""
|
|
|
|
| |
|
|
# User-prompt template asking the judge model for a 1-10 quality score.
# "{query}" is filled in with str.format(); judge_score() parses the first
# number found in the reply.
JUDGE_SCORE_PROMPT = """
You are a strict evaluator. Score the following question from 1 to 10.

Scoring rules:
- 10: Extremely natural, realistic, and useful legal question.
- 8–9: High quality, natural, meaningful.
- 6–7: Acceptable but slightly unnatural or vague.
- 4–5: Low quality, unnatural, or unclear.
- 1–3: Very poor, meaningless, or copied from the article.

Return ONLY a number (1–10), nothing else.

Question:
{query}
"""
|
|
|
|
| |
|
|
# User-prompt template asking the judge model to rewrite a question so that it
# is answerable without outside context. "{query}" is filled in with
# str.format(); judge_rewrite() re-runs the quality filters on the reply.
REWRITE_PROMPT = """
Rewrite the following legal question so that it can be answered WITHOUT any external context.

Rules:
- Remove vague references such as "this", "such", "in this case", and Chinese deictic phrases like "上述/前述/这种/这样的/该情况".
- If needed, restate the minimal factual situation explicitly, but do NOT invent new facts.
- Do NOT make it more abstract or philosophical.
- Do NOT add article numbers, citations, or legal sources.
- Output ONLY the rewritten question (one line).

Question:
{query}

Rewrite:
"""
|
|
|
|
|
|
| |
| |
| |
|
|
def llm_chat(llm: LLMClient, system_prompt: str, user_prompt: str) -> str:
    """Send one system + user message pair through ``llm.chat()``.

    A string reply is returned stripped of surrounding whitespace; any other
    reply type is converted with ``str()``.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    reply = llm.chat(messages=conversation)
    if isinstance(reply, str):
        return reply.strip()
    return str(reply)
|
|
|
|
| |
| |
| |
|
|
def generate_multi_turn_dialog(llm: LLMClient, article_text: str) -> List[str]:
    """Simulate a five-turn dialog about ``article_text`` and collect the user's questions.

    Only reply lines starting with ``user:`` (case-insensitive) are considered.
    The text after the first colon is passed through ``clean_and_filter_query``
    and kept when it survives the filters.
    """
    transcript = llm_chat(llm, SYSTEM_PROMPT_EN, MULTI_TURN_PROMPT.format(article=article_text))

    user_questions: List[str] = []
    for raw_line in transcript.split("\n"):
        turn = raw_line.strip()
        if not turn or not turn.lower().startswith("user:"):
            continue
        # The prefix check guarantees a colon, so the tail is the utterance.
        _, _, utterance = turn.partition(":")
        accepted = clean_and_filter_query(utterance.strip())
        if accepted:
            user_questions.append(accepted)
    return user_questions
|
|
|
|
| |
| |
| |
|
|
def generate_single_turn_questions(
    llm: LLMClient,
    text: str,
    law_name: str,
    article_no: str,
    role: str,
    num_q: int,
    lang: str,
) -> List[str]:
    """Ask ``llm`` for ``num_q`` questions about one article and return the usable ones.

    Each non-blank reply line has a leading "N." numbering and a leading
    "-" / "•" bullet removed, then goes through ``clean_and_filter_query``.
    Lines rejected by the filters are dropped.
    """
    reply = llm_chat(
        llm,
        SYSTEM_PROMPT_EN,
        build_single_turn_prompt(text, law_name, article_no, role, num_q, lang),
    )

    questions = []
    for raw_line in reply.split("\n"):
        candidate = raw_line.strip()
        if not candidate:
            continue
        candidate = re.sub(r"^[0-9]+\.\s*", "", candidate)
        candidate = re.sub(r"^[-•]\s*", "", candidate)
        accepted = clean_and_filter_query(candidate)
        if accepted:
            questions.append(accepted)
    return questions
|
|
|
|
| |
| |
| |
|
|
def judge_score(judge_llm: LLMClient, query: str) -> float:
    """Ask the judge model to rate ``query`` and return the score as a float.

    The first integer or decimal number in the reply is taken as the score and
    clamped to the range 1.0-10.0. When the reply contains no number, or the
    number cannot be converted, 0.0 is returned.
    """
    prompt = JUDGE_SCORE_PROMPT.format(query=query)
    raw = llm_chat(judge_llm, SYSTEM_PROMPT_EN, prompt)

    match = re.search(r"([0-9]+(\.[0-9]+)?)", raw)
    if match is None:
        return 0.0
    try:
        score = float(match.group(1))
    except ValueError:
        # Only a conversion failure is swallowed; a bare ``except`` would also
        # hide KeyboardInterrupt and genuine programming errors.
        return 0.0
    return max(1.0, min(10.0, score))
|
|
|
|
| |
| |
| |
|
|
|
|
def judge_rewrite(judge_llm: LLMClient, query: str) -> Optional[str]:
    """Have the judge model rewrite ``query`` into a standalone question.

    Returns the cleaned rewrite, or ``None`` when the rewrite is rejected by
    ``clean_and_filter_query`` or still has a deictic reference, is overly
    abstract, or lacks a fact anchor.
    """
    reply = llm_chat(judge_llm, SYSTEM_PROMPT_EN, REWRITE_PROMPT.format(query=query))
    rewritten = clean_and_filter_query(reply)
    if not rewritten:
        return None

    still_flawed = (
        has_deictic_reference(rewritten)
        or is_overly_abstract(rewritten)
        or not has_min_fact_anchor(rewritten)
    )
    return None if still_flawed else rewritten
|
|
|
|
|
|
| |
| |
| |
|
|
def deduplicate_by_embedding(queries, model, threshold=0.85):
    """Greedily drop near-duplicate queries by embedding similarity.

    ``queries`` is a list of dicts with a ``"query"`` key. Their texts are
    encoded with ``model.encode(..., normalize_embeddings=True)``. Items are
    visited in order, and one is kept only when its dot product with every
    previously kept item is below ``threshold``. Returns the kept dicts in
    their original order; an empty input yields an empty list.
    """
    if not queries:
        return []

    vectors = model.encode([item["query"] for item in queries], normalize_embeddings=True)

    kept_indices = []
    for idx in range(len(queries)):
        is_duplicate = any(
            float(np.dot(vectors[kept], vectors[idx])) >= threshold
            for kept in kept_indices
        )
        if not is_duplicate:
            kept_indices.append(idx)

    return [queries[idx] for idx in kept_indices]
|
|
|
|
| |
| |
| |
|
|
def generate_queries_for_article(
    generator_llm: LLMClient,
    judge_llm: LLMClient,
    row: pd.Series,
    per_article: int = 5,
    zh_ratio: float = 0.7,
    logger: Optional[logging.Logger] = None,
) -> List[Dict[str, Any]]:
    """Generate, score and filter synthetic queries for one article row.

    Steps:
      1. Read ``text`` from ``row`` and strip citation markers; return ``[]``
         when the text is blank or shorter than 10 characters after cleaning.
      2. For each role (user, lawyer, judge, inhouse) pick Chinese with
         probability ``zh_ratio`` (English otherwise) and request
         ``per_article`` single-turn questions (recorded as round 1).
      3. Add the user questions of one simulated dialog (recorded as round 2).
      4. Score every candidate with the judge. A candidate scoring below 7.0
         is rewritten once and re-scored; it is dropped when the rewrite fails.
         Only candidates whose final score is at least 7.0 are returned.

    ``logger`` is accepted but not used by this function.
    """
    raw_text = str(row.get("text", "")).strip()
    if not raw_text:
        return []

    article_text = strip_citation_markers(raw_text)
    if len(article_text) < 10:
        return []

    law_name = str(row.get("law_name", ""))
    article_no = str(row.get("article_no", ""))
    article_id = row.get("article_id", None)

    def make_record(question: str, role: str, round_no: int) -> Dict[str, Any]:
        # Key order is preserved so downstream column order stays the same.
        return {
            "query": question,
            "lang": detect_language_simple(question),
            "role": role,
            "law_name": law_name,
            "article_no": article_no,
            "article_id": article_id,
            "round": round_no,
            "rewritten": False,
            "score": None,
        }

    candidates: List[Dict[str, Any]] = []

    for role in ("user", "lawyer", "judge", "inhouse"):
        lang = "zh" if random.random() < zh_ratio else "en"
        single_turn = generate_single_turn_questions(
            generator_llm, article_text, law_name, article_no, role, per_article, lang
        )
        candidates.extend(make_record(question, role, 1) for question in single_turn)

    dialog_questions = generate_multi_turn_dialog(generator_llm, article_text)
    candidates.extend(make_record(question, "user", 2) for question in dialog_questions)

    accepted: List[Dict[str, Any]] = []
    for record in candidates:
        record["score"] = judge_score(judge_llm, record["query"])

        if record["score"] < 7.0:
            replacement = judge_rewrite(judge_llm, record["query"])
            if not replacement:
                continue
            record["query"] = replacement
            record["rewritten"] = True
            record["score"] = judge_score(judge_llm, replacement)

        if record["score"] >= 7.0:
            accepted.append(record)

    return accepted
|
|
|
|
| |
| |
| |
|
|
def build_ground_truth_queries(
    df_chunks: pd.DataFrame,
    per_article: int,
    max_articles: Optional[int],
    total_queries: Optional[int],
    logger: logging.Logger,
    zh_ratio: float = 0.7,
    seed: int = 0,
    generator_llm=None,
    judge_llm=None,
    embedding_model=None
) -> pd.DataFrame:
    """Build the synthetic query set for a sample of article chunks.

    Seeds the RNGs, loads the app config, and creates whichever of
    ``generator_llm``, ``judge_llm`` and ``embedding_model`` were not supplied.
    Up to ``max_articles`` rows (all rows when ``None``) are sampled from
    ``df_chunks`` with ``random_state=seed``. Queries are generated row by row,
    stopping early once ``total_queries`` candidates have been collected.

    The collected queries are de-duplicated by embedding similarity
    (threshold 0.85) and down-sampled to ``total_queries`` when more remain.
    Returns an empty DataFrame when nothing was generated.
    """
    set_seed(seed)

    cfg = AppConfig.load(None)

    if generator_llm is None:
        generator_llm = init_generator_llm(cfg, logger)
    if judge_llm is None:
        judge_llm = init_judge_llm(cfg, logger)
    if embedding_model is None:
        embedding_model = SentenceTransformer(cfg.retrieval.embedding_model)

    article_cap = len(df_chunks) if max_articles is None else max_articles
    sampled = df_chunks.sample(
        n=min(article_cap, len(df_chunks)),
        random_state=seed,
    ).reset_index(drop=True)

    collected: List[Dict[str, Any]] = []

    progress = tqdm(sampled.itertuples(index=False), total=len(sampled), desc="Generating queries")
    for record in progress:
        article_queries = generate_queries_for_article(
            generator_llm=generator_llm,
            judge_llm=judge_llm,
            row=pd.Series(record._asdict()),
            per_article=per_article,
            zh_ratio=zh_ratio,
            logger=logger,
        )
        collected.extend(article_queries)

        # Stop as soon as the raw (pre-dedup) target has been reached.
        if total_queries is not None and len(collected) >= total_queries:
            break

    if not collected:
        return pd.DataFrame()

    unique_rows = deduplicate_by_embedding(collected, embedding_model, threshold=0.85)
    result = pd.DataFrame(unique_rows)

    if total_queries is not None and len(result) > total_queries:
        result = result.sample(n=total_queries, random_state=seed).reset_index(drop=True)

    return result
|
|
|
|
def main():
    """Command-line entry point: generate synthetic queries and write them as JSONL.

    Parses the CLI options, loads the article chunks from ``--input`` (or
    ``cfg.paths.law_jsonl`` when omitted), generates queries with
    ``build_ground_truth_queries`` and writes one JSON object per line to
    ``--output``. When ``--max-articles`` is not given, at most 500 articles
    are sampled.

    Raises:
        FileNotFoundError: if the input JSONL file does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Synthetic Query Generator for LegalRAG (2026 Edition)"
    )

    parser.add_argument(
        "--input",
        type=str,
        default=None,
        help="Path to processed law JSONL. Default: cfg.paths.law_jsonl",
    )

    parser.add_argument(
        "--per-article",
        type=int,
        default=5,
        help="Approximate number of queries generated per article.",
    )

    parser.add_argument(
        "--max-articles",
        type=int,
        default=None,
        help="Max number of articles to sample.",
    )

    parser.add_argument(
        "--total-queries",
        type=int,
        default=None,
        help="Target total number of synthetic queries.",
    )

    parser.add_argument(
        "--output",
        type=str,
        default="data/eval/law_qa.jsonl",
        help="Output JSONL file path.",
    )

    parser.add_argument(
        "--seed",
        type=int,
        default=42,
        help="Random seed for reproducibility.",
    )

    parser.add_argument(
        "--zh-ratio",
        type=float,
        default=0.7,
        help="Probability of generating Chinese queries (0~1).",
    )

    args = parser.parse_args()

    logger = setup_logger()
    set_seed(args.seed)

    logger.info("Starting Synthetic Query Generation (Level 1 + 2 + 3)...")
    logger.info(f"Seed={args.seed}, zh_ratio={args.zh_ratio}")

    cfg = AppConfig.load(None)
    input_path = Path(args.input) if args.input else Path(cfg.paths.law_jsonl)

    if not input_path.exists():
        raise FileNotFoundError(f"{input_path} not found; run preprocess first.")

    logger.info(f"Loading corpus from: {input_path}")
    # Read under a context manager so the file handle is closed even if a
    # line fails to parse.
    with input_path.open("r", encoding="utf-8") as src:
        chunks = [json.loads(line) for line in src if line.strip()]
    df_chunks = pd.DataFrame(chunks)
    logger.info(f"Loaded chunks: {len(df_chunks)}")

    if args.max_articles is None:
        # Cap the default run size at 500 articles.
        args.max_articles = min(500, len(df_chunks))

    logger.info(
        f"Generation plan: per_article={args.per_article}, "
        f"max_articles={args.max_articles}, total_queries={args.total_queries}"
    )

    df_queries = build_ground_truth_queries(
        df_chunks=df_chunks,
        per_article=args.per_article,
        max_articles=args.max_articles,
        total_queries=args.total_queries,
        logger=logger,
        zh_ratio=args.zh_ratio,
        seed=args.seed
    )

    logger.info(f"Generated queries: {len(df_queries)}")

    if df_queries.empty:
        logger.warning("No queries generated. Check LLM config / API key / filters.")
        return

    logger.info("Sample queries:")
    try:
        logger.info(df_queries.sample(min(5, len(df_queries)), random_state=args.seed))
    except Exception:
        # Previewing is best-effort; fall back to the first rows.
        logger.info(df_queries.head(5))

    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    with out_path.open("w", encoding="utf-8") as f:
        for _, row in df_queries.iterrows():
            f.write(json.dumps(row.to_dict(), ensure_ascii=False) + "\n")

    logger.info(f"Synthetic queries saved to: {out_path}")
    logger.info("Done.")
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|