File size: 4,549 Bytes
f73aa58
 
d13b751
 
f73aa58
a19c579
f73aa58
a19c579
 
 
 
f73aa58
 
a19c579
f73aa58
a19c579
f73aa58
a19c579
f73aa58
 
a19c579
 
 
 
 
 
f73aa58
 
 
 
 
a19c579
 
f73aa58
a19c579
f73aa58
 
 
 
 
 
 
a19c579
 
f73aa58
 
 
 
 
a19c579
 
 
f73aa58
a19c579
 
f73aa58
 
 
 
 
a19c579
 
f73aa58
 
 
 
 
 
 
 
 
 
a19c579
 
f73aa58
 
 
 
 
 
a19c579
f73aa58
 
 
 
 
 
 
 
 
 
 
 
a19c579
f73aa58
 
a19c579
 
f73aa58
a19c579
f73aa58
 
 
 
a19c579
f73aa58
 
a19c579
f73aa58
a19c579
f73aa58
 
a19c579
 
f73aa58
 
 
 
 
 
a19c579
 
f73aa58
 
a19c579
f73aa58
 
 
 
a19c579
 
 
 
 
 
 
f73aa58
 
a19c579
 
f73aa58
a19c579
f73aa58
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import re
from typing import Dict, List, Tuple, Any
# NOTE: absolute import instead of a relative one (the module is loaded directly from the modules directory)
from llm import LLMClient

# ----------------- Regex helpers -----------------
# List item: a "-", "*", "•", "・", "1." or "1)" marker followed by whitespace.
# Group 1 is the item text.
LIST_BULLET = re.compile(r"^(?:[-*•・]|\d+\.|\d+\))\s+(.*)")
# "key: value" line. The separator may be an ASCII colon or the full-width
# colon U+FF1A (written as an escape so it survives re-encoding of this file).
# Group 1 is the key, group 2 the value.
KEYVAL_LINE = re.compile(r"^\s*([^:\uFF1A]+?)\s*[:\uFF1A]\s*([^\n]+?)\s*$")
# "label: number" line with the same separators; group 2 is a signed
# integer or decimal literal.
LABEL_NUM = re.compile(r"^\s*([^:\uFF1A]+?)\s*[:\uFF1A]\s*([+-]?\d+(?:\.\d+)?)\s*$")
# Heading line: one or more "#", or a "1." / "1)" marker. Group 2 is the title.
HEADER = re.compile(r"^(#+|\d+\.|\d+\))\s*(.+)$")


def naive_section_split(text: str, target_chars: int = 1200) -> List[Tuple[str, str]]:
    """Split *text* into ``(title, content)`` sections.

    A line that matches ``HEADER`` starts a new section whose title is the
    heading text. Independently of headings, a section is closed as soon as
    its accumulated line length exceeds *target_chars*; the section that
    follows gets an auto-numbered title.

    Args:
        text: Raw document text.
        target_chars: Soft size limit per section, measured as the sum of the
            line lengths (newlines excluded).

    Returns:
        The sections in document order. Sections whose content is empty after
        stripping are skipped. If nothing remains, a single fallback section
        holding the unmodified *text* is returned.
    """
    sections: List[Tuple[str, str]] = []
    cur_title = "セクション"
    cur_buf: List[str] = []
    cur_len = 0  # running character count of cur_buf, avoids re-summing per line

    def flush() -> None:
        """Emit the buffered lines as a section (if non-empty) and reset."""
        nonlocal cur_buf, cur_len
        content = "\n".join(cur_buf).strip()
        # Blank-only buffers would otherwise become sections with empty content.
        if content:
            sections.append((cur_title, content))
        cur_buf = []
        cur_len = 0

    for ln in text.splitlines():
        m = HEADER.match(ln.strip())
        if m:
            # The heading line itself is not part of any section body.
            flush()
            cur_title = m.group(2).strip()
            continue
        cur_buf.append(ln)
        cur_len += len(ln)
        if cur_len > target_chars:
            flush()
            cur_title = f"セクション{len(sections)+1}"
    flush()

    # Fallback single section
    if not sections:
        sections = [("本文", text)]
    return sections


def extract_bullets(section_text: str, max_items: int = 8) -> List[str]:
    """Return up to *max_items* bullet strings for a section.

    Lines that match ``LIST_BULLET`` are used when present. Otherwise the text
    is split into sentences and those between 8 and 120 characters long are
    used as bullets.

    Args:
        section_text: Body text of one section.
        max_items: Maximum number of bullets to return; values <= 0 yield
            an empty list.

    Returns:
        The bullet texts in document order.
    """
    if max_items <= 0:
        return []

    marker_hits = (LIST_BULLET.match(line.strip()) for line in section_text.splitlines())
    bullets: List[str] = [hit.group(1).strip() for hit in marker_hits if hit]
    if bullets:
        return bullets[:max_items]

    # Heuristic sentence split. A "." between two digits is a decimal point
    # (e.g. "3.5"), not a sentence end, so it must not split. Full-width
    # "!" (U+FF01) and "?" (U+FF1F) end sentences as well.
    sentence_end = r"(?:[。!?\uFF01\uFF1F]|(?<!\d)\.|\.(?!\d))\s*"
    for sentence in re.split(sentence_end, section_text):
        sentence = sentence.strip()
        if 8 <= len(sentence) <= 120:
            bullets.append(sentence)
            if len(bullets) >= max_items:
                break
    return bullets


def extract_keyval_table(section_text: str) -> List[Tuple[str, str]]:
    """Collect ``(key, value)`` pairs from lines that match ``KEYVAL_LINE``.

    Both parts are stripped; a pair is kept only when key and value are both
    non-empty. Pairs are returned in the order the lines appear.
    """
    hits = (KEYVAL_LINE.match(row) for row in section_text.splitlines())
    stripped = (
        (hit.group(1).strip(), hit.group(2).strip())
        for hit in hits
        if hit
    )
    return [(key, value) for key, value in stripped if key and value]


def extract_chart_data(section_text: str, top_k: int = 10) -> List[Tuple[str, float]]:
    """Extract ``(label, value)`` points from lines that match ``LABEL_NUM``.

    When a label occurs more than once, the value of its last occurrence is
    kept. The result is ordered by absolute value, largest first, and cut to
    the first *top_k* entries.
    """
    latest_by_label = {}
    for row in section_text.splitlines():
        hit = LABEL_NUM.match(row)
        if hit is None:
            continue
        name = hit.group(1).strip()
        try:
            number = float(hit.group(2))
        except ValueError:
            continue
        # Re-assigning an existing key keeps its slot but updates the value.
        latest_by_label[name] = number

    def magnitude(point):
        return abs(point[1])

    ranked = sorted(latest_by_label.items(), key=magnitude, reverse=True)
    return ranked[:top_k]


def process_text(text: str,
                 use_inference_api: bool,
                 summarizer_model: str,
                 generator_model: str,
                 want_summary: bool,
                 want_tables: bool,
                 want_charts: bool,
                 max_summary_words: int = 200) -> Dict[str, Any]:
    """Run the full extraction pipeline over *text*.

    Args:
        text: Raw document text.
        use_inference_api: Passed through to ``LLMClient``.
        summarizer_model: Model name handed to ``LLMClient.summarize``.
        generator_model: Accepted for interface compatibility; not used by
            this function.
        want_summary: When true, request a summary from the LLM client.
        want_tables: When true, collect key/value tables per section.
        want_charts: When true, collect label/number series per section.
        max_summary_words: Word limit passed to the summarizer.

    Returns:
        A dict with the keys ``"summary"`` (summarizer result or ``None``),
        ``"sections"`` (list of ``(title, text)``), ``"bullets"`` (section
        index -> list of bullet strings), ``"tables"`` and ``"charts"``
        (lists of dicts, one per section that produced data).
    """
    # 1) Executive summary
    summary = None
    if want_summary:
        # Create the client only when it is actually used, so purely
        # rule-based runs do not depend on the LLM backend at all.
        client = LLMClient(use_inference_api=use_inference_api)
        summary = client.summarize(text, model=summarizer_model, max_words=max_summary_words)

    # 2) Sections (rule-based; reliable on CPU)
    sections = naive_section_split(text)

    # 3) Per-section bullets / tables / charts
    bullets_by_section: Dict[int, List[str]] = {}
    tables: List[Dict[str, Any]] = []
    charts: List[Dict[str, Any]] = []

    for idx, (title, body) in enumerate(sections):
        bullets_by_section[idx] = extract_bullets(body)

        if want_tables:
            kv = extract_keyval_table(body)
            if kv:
                tables.append({
                    "title": f"{title} — 表",
                    "pairs": kv
                })

        if want_charts:
            series = extract_chart_data(body)
            if series:
                charts.append({
                    "title": f"{title} — チャート",
                    "series": series
                })

    return {
        "summary": summary,
        "sections": sections,  # list of (title, text)
        "bullets": bullets_by_section,
        "tables": tables,
        "charts": charts,
    }