| """Database query layer for TWL Concordancer.""" |
|
|
| import sqlite3 |
| from pathlib import Path |
| from typing import Iterable |
|
|
| import regex |
|
|
| DEFAULT_DB = Path(__file__).parent / "twl_concordancer.db" |
|
|
|
|
def get_conn(db_path=None):
    """Open a SQLite connection configured for the concordancer.

    Falls back to DEFAULT_DB when no path is given.  Rows are returned
    as sqlite3.Row so callers can index columns by name.
    """
    target = DEFAULT_DB if db_path is None else db_path
    conn = sqlite3.connect(str(target))
    conn.row_factory = sqlite3.Row
    # Wait up to 5s on a locked database instead of failing immediately.
    conn.execute("PRAGMA busy_timeout=5000")
    try:
        conn.execute("PRAGMA journal_mode=WAL")
    except sqlite3.OperationalError:
        # WAL is best-effort; some environments reject it — keep default mode.
        pass
    return conn
|
|
|
|
def _expand_category_options(categories: Iterable[str]):
    """Collect the given categories plus every ancestor prefix.

    A hierarchical category like "a>b>c" also yields "a>" and "a>b>",
    so prefix (LIKE) filters can target whole sub-trees.  Blank entries
    are skipped; the result is sorted with repealed laws last.
    """
    options = set()
    for raw in categories:
        name = (raw or "").strip()
        if not name:
            continue
        options.add(name)
        segments = [seg.strip() for seg in name.split(">") if seg.strip()]
        # range(1, len) is empty for single-segment names: no prefixes added.
        for cut in range(1, len(segments)):
            options.add(">".join(segments[:cut]) + ">")
    return sorted(options, key=_category_sort_key)
|
|
|
|
| def _category_sort_key(category: str): |
| category = (category or "").strip() |
| is_repealed = category.startswith("廢止法規>") |
| return (1 if is_repealed else 0, category) |
|
|
|
|
def _build_order_by(lang):
    """Return an ORDER BY clause for sentence search results.

    Rows that also have text in the other language (complete aligned
    pairs) sort first, then by alignment_score ascending, then by id
    for a stable order.

    lang: "zh", "en", or anything else (treated as "both").
    """
    # Chinese search: prefer rows that also carry English text.
    if lang == "zh":
        return """
            ORDER BY
            CASE
                WHEN s.en_text IS NOT NULL AND trim(s.en_text) != '' THEN 0
                ELSE 1
            END,
            s.alignment_score,
            s.id
        """
    # English search: prefer rows that also carry Chinese text.
    if lang == "en":
        return """
            ORDER BY
            CASE
                WHEN s.zh_text IS NOT NULL AND trim(s.zh_text) != '' THEN 0
                ELSE 1
            END,
            s.alignment_score,
            s.id
        """
    # Both-language search: prefer rows complete on each side.
    return """
        ORDER BY
        CASE
            WHEN s.zh_text IS NOT NULL AND trim(s.zh_text) != ''
                AND s.en_text IS NOT NULL AND trim(s.en_text) != '' THEN 0
            ELSE 1
        END,
        s.alignment_score,
        s.id
    """
|
|
|
|
def search_sentences(
    conn,
    query,
    use_regex=False,
    case_sensitive=False,
    law_id=None,
    category=None,
    article_no=None,
    max_score=None,
    lang="both",
    limit=100,
    offset=0,
):
    """Search aligned sentences, dispatching to LIKE or regex matching.

    All filter arguments are optional; returns (rows, total) where rows
    is a page of result dicts and total is the unpaged match count.
    """
    backend = _search_regex if use_regex else _search_like
    return backend(
        conn,
        query,
        case_sensitive,
        law_id,
        category,
        article_no,
        max_score,
        lang,
        limit,
        offset,
    )
|
|
|
|
def _search_like(
    conn, query, case_sensitive, law_id, category, article_no, max_score, lang, limit, offset
):
    """Substring search over aligned sentences.

    Chinese text is matched with LIKE; English text with instr() so case
    sensitivity can be controlled.  Returns (rows, total).
    """
    needle = query.strip()
    if not needle:
        return [], 0

    clauses = []
    args = []

    # Language-specific match predicate.
    if lang == "zh":
        clauses.append("s.zh_text LIKE ?")
        args.append(f"%{needle}%")
    elif lang == "en":
        if case_sensitive:
            clauses.append("instr(s.en_text, ?) > 0")
            args.append(needle)
        else:
            clauses.append("instr(lower(s.en_text), lower(?)) > 0")
            args.append(needle)
    elif case_sensitive:
        clauses.append("(s.zh_text LIKE ? OR instr(s.en_text, ?) > 0)")
        args.extend([f"%{needle}%", needle])
    else:
        clauses.append("(s.zh_text LIKE ? OR instr(lower(s.en_text), lower(?)) > 0)")
        args.extend([f"%{needle}%", needle])

    # Optional filters.
    if max_score is not None:
        clauses.append("s.alignment_score <= ?")
        args.append(max_score)
    if law_id:
        clauses.append("l.law_id = ?")
        args.append(law_id)
    if category:
        # A trailing ">" marks a hierarchy prefix: match the whole sub-tree.
        if category.endswith(">"):
            clauses.append("l.category LIKE ?")
            args.append(f"{category}%")
        else:
            clauses.append("l.category = ?")
            args.append(category)
    if article_no:
        article_pat = f"%{article_no}%"
        clauses.append("(a.article_no_zh LIKE ? OR a.article_no_en LIKE ?)")
        args.extend([article_pat, article_pat])

    where_clause = " AND ".join(clauses)

    count_sql = f"""
        SELECT count(*) FROM sentences s
        JOIN laws l ON s.law_id = l.id
        JOIN articles a ON s.article_id = a.id
        WHERE {where_clause}
    """

    order_by = _build_order_by(lang)

    data_sql = f"""
        SELECT s.id, s.zh_text, s.en_text, s.alignment_score,
               l.law_id, l.zh_name, l.en_name, l.type,
               a.article_no_zh, a.article_no_en, a.article_type,
               s.zh_sentence_idx, s.en_sentence_idx
        FROM sentences s
        JOIN laws l ON s.law_id = l.id
        JOIN articles a ON s.article_id = a.id
        WHERE {where_clause}
        {order_by}
        LIMIT ? OFFSET ?
    """

    total = conn.execute(count_sql, args).fetchone()[0]
    rows = [dict(r) for r in conn.execute(data_sql, args + [limit, offset]).fetchall()]
    return rows, total
|
|
|
|
def _search_regex(
    conn,
    pattern,
    case_sensitive,
    law_id,
    category,
    article_no,
    max_score,
    lang,
    limit,
    offset,
):
    """Regex search over aligned sentences via a custom SQL REGEXP function.

    Invalid user patterns are treated as "no results" rather than raising.
    Returns (rows, total) like _search_like.
    """
    flags = regex.V1 if case_sensitive else regex.V1 | regex.IGNORECASE
    try:
        compiled = regex.compile(pattern, flags=flags)
    except regex.error:
        # Malformed user pattern: report an empty result set.
        return [], 0

    where = "1=1"
    params = []

    if lang == "zh":
        where += " AND s.zh_text REGEXP ?"
        params.append(pattern)
    elif lang == "en":
        where += " AND s.en_text REGEXP ?"
        params.append(pattern)
    else:
        where += " AND (s.zh_text REGEXP ? OR s.en_text REGEXP ?)"
        params.extend([pattern, pattern])

    if max_score is not None:
        where += " AND s.alignment_score <= ?"
        params.append(max_score)
    if law_id:
        where += " AND l.law_id = ?"
        params.append(law_id)
    if category:
        # A trailing ">" marks a hierarchy prefix: match the whole sub-tree.
        if category.endswith(">"):
            where += " AND l.category LIKE ?"
            params.append(f"{category}%")
        else:
            where += " AND l.category = ?"
            params.append(category)
    if article_no:
        where += " AND (a.article_no_zh LIKE ? OR a.article_no_en LIKE ?)"
        pat = f"%{article_no}%"
        params.extend([pat, pat])

    count_sql = f"""
        SELECT count(*) FROM sentences s
        JOIN laws l ON s.law_id = l.id
        JOIN articles a ON s.article_id = a.id
        WHERE {where}
    """

    order_by = _build_order_by(lang)

    data_sql = f"""
        SELECT s.id, s.zh_text, s.en_text, s.alignment_score,
               l.law_id, l.zh_name, l.en_name, l.type,
               a.article_no_zh, a.article_no_en, a.article_type,
               s.zh_sentence_idx, s.en_sentence_idx
        FROM sentences s
        JOIN laws l ON s.law_id = l.id
        JOIN articles a ON s.article_id = a.id
        WHERE {where}
        {order_by}
        LIMIT ? OFFSET ?
    """
    data_params = params + [limit, offset]

    # Reuse the already-compiled pattern instead of re-resolving the
    # pattern/flags for every row SQLite feeds to REGEXP.  The SQL above
    # always binds the same pattern text, so the first argument is ignored.
    conn.create_function(
        "REGEXP",
        2,
        lambda _pat, txt: bool(compiled.search(txt)) if txt else False,
    )

    cur = conn.execute(count_sql, params)
    total = cur.fetchone()[0]

    cur = conn.execute(data_sql, data_params)
    rows = [dict(r) for r in cur.fetchall()]

    return rows, total
|
|
|
|
def get_paragraph(conn, sentence_id):
    """Return the full paragraph containing the given sentence.

    The result dict carries paragraph/article metadata and the
    paragraph's sentences ordered by Chinese sentence index, or None
    when the sentence id is unknown.
    """
    rows = conn.execute(
        """
        SELECT s.id, s.zh_text, s.en_text, s.alignment_score, s.zh_sentence_idx, s.en_sentence_idx,
               p.paragraph_index, a.article_no_zh, a.article_no_en
        FROM sentences s
        JOIN paragraphs p ON s.paragraph_id = p.id
        JOIN articles a ON s.article_id = a.id
        WHERE s.paragraph_id = (SELECT paragraph_id FROM sentences WHERE id = ?)
        ORDER BY s.zh_sentence_idx
        """,
        (sentence_id,),
    ).fetchall()
    sentences = [dict(row) for row in rows]
    if not sentences:
        return None
    first = sentences[0]
    return {
        "paragraph_index": first["paragraph_index"],
        "article_no_zh": first["article_no_zh"],
        "article_no_en": first["article_no_en"],
        "sentences": sentences,
    }
|
|
|
|
def get_article(conn, sentence_id):
    """Return the whole article containing the given sentence.

    Sentences are grouped into paragraphs ordered by paragraph index,
    each sentence dict carrying text, score and index metadata.
    Returns None when the sentence id is unknown.
    """
    cur = conn.execute(
        """
        SELECT s.id, s.zh_text, s.en_text, s.alignment_score, s.zh_sentence_idx, s.en_sentence_idx,
               p.paragraph_index, p.id as paragraph_id,
               a.article_no_zh, a.article_no_en, a.article_type
        FROM sentences s
        JOIN paragraphs p ON s.paragraph_id = p.id
        JOIN articles a ON s.article_id = a.id
        WHERE s.article_id = (SELECT article_id FROM sentences WHERE id = ?)
        ORDER BY p.paragraph_index, s.zh_sentence_idx
        """,
        (sentence_id,),
    )
    records = [dict(rec) for rec in cur.fetchall()]
    if not records:
        return None

    # Group sentence records under their paragraph, preserving order.
    grouped = {}
    for rec in records:
        bucket = grouped.setdefault(
            rec["paragraph_index"],
            {
                "paragraph_index": rec["paragraph_index"],
                "paragraph_id": rec["paragraph_id"],
                "sentences": [],
            },
        )
        bucket["sentences"].append(
            {
                "id": rec["id"],
                "zh_text": rec["zh_text"],
                "en_text": rec["en_text"],
                "alignment_score": rec["alignment_score"],
                "zh_sentence_idx": rec["zh_sentence_idx"],
                "en_sentence_idx": rec["en_sentence_idx"],
            }
        )

    head = records[0]
    return {
        "article_no_zh": head["article_no_zh"],
        "article_no_en": head["article_no_en"],
        "article_type": head["article_type"],
        "paragraphs": [grouped[idx] for idx in sorted(grouped)],
    }
|
|
|
|
def list_laws(conn, law_type=None, category=None):
    """List laws ordered by law_id, optionally filtered by type/category.

    A category ending in ">" is treated as a hierarchy prefix filter.
    """
    filters = []
    values = []
    if law_type:
        filters.append("type = ?")
        values.append(law_type)
    if category:
        if category.endswith(">"):
            filters.append("category LIKE ?")
            values.append(f"{category}%")
        else:
            filters.append("category = ?")
            values.append(category)

    where_clause = " AND ".join(filters) if filters else "1=1"
    cur = conn.execute(
        f"SELECT law_id, type, zh_name, en_name, category FROM laws WHERE {where_clause} ORDER BY law_id",
        values,
    )
    return [dict(r) for r in cur.fetchall()]
|
|
|
|
def list_categories(conn, law_type=None):
    """Return distinct non-empty law categories plus ancestor prefixes.

    Optionally restricted to one law type; sorted with repealed laws last.
    """
    if law_type:
        where = "WHERE type = ? AND category IS NOT NULL AND category != ''"
        params = [law_type]
    else:
        where = "WHERE category IS NOT NULL AND category != ''"
        params = []
    found = conn.execute(
        f"SELECT DISTINCT category FROM laws {where} ORDER BY category",
        params,
    ).fetchall()
    return _expand_category_options(row["category"] for row in found)
|
|
|
|
def get_law_articles(conn, law_id):
    """Return article metadata for one law, in document order.

    *law_id* is the external law identifier, not the laws table rowid.
    """
    sql = """
        SELECT article_no_zh, article_no_en, article_type, article_index
        FROM articles
        WHERE law_id = (SELECT id FROM laws WHERE law_id = ?)
        ORDER BY article_index
        """
    return [dict(row) for row in conn.execute(sql, (law_id,)).fetchall()]
|
|
|
|
def get_law_full_text(conn, law_id):
    """Return every sentence of a law in reading order.

    Ordered by article index, then paragraph index, then Chinese
    sentence index.  *law_id* is the external law identifier.
    """
    sql = """
        SELECT s.zh_text, s.en_text, s.alignment_score,
               a.article_no_zh, a.article_no_en, a.article_type, a.article_index,
               p.paragraph_index
        FROM sentences s
        JOIN paragraphs p ON s.paragraph_id = p.id
        JOIN articles a ON s.article_id = a.id
        JOIN laws l ON s.law_id = l.id
        WHERE l.law_id = ?
        ORDER BY a.article_index, p.paragraph_index, s.zh_sentence_idx
        """
    return [dict(row) for row in conn.execute(sql, (law_id,)).fetchall()]
|
|