# Notulen_Otomatis/src/summarizer.py
# (HuggingFace upload-page header removed: "Yermia's picture / Upload 13 files / fda93d9 verified")
"""
BERT Extractive Summarization Module
====================================
Implements extractive summarization using IndoBERT/mBERT for meeting minutes.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import numpy as np
def _collapse_repeated_phrases_global(text: str, max_ngram: int = 6, min_repeats: int = 2) -> str:
"""Module-level helper to collapse repeated n-gram phrases.
Iteratively collapses repeated adjacent n-gram phrases into a single occurrence.
"""
if not text or min_repeats < 2:
return text
pattern = re.compile(r"(\b(?:\w+\s+){0,%d}\w+\b)(?:\s+\1){%d,}" % (max_ngram - 1, min_repeats - 1), flags=re.IGNORECASE)
prev = None
out = text
while prev != out:
prev = out
out = pattern.sub(r"\1", out)
return out
from src.transcriber import TranscriptSegment
@dataclass
class SummarizationConfig:
    """Configuration for summarization (both extractive and abstractive paths)."""

    # Method: 'extractive' (BERT embeddings) or 'abstractive' (seq2seq model)
    method: str = "extractive"
    # Models
    # Use a cached/available model for reliability in offline environments
    sentence_model_id: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    abstractive_model_id: str = "google/mt5-base"
    # Extractive settings (increase num_sentences to capture more key points)
    num_sentences: int = 7
    min_sentence_length: int = 6
    max_sentence_length: int = 300
    # Abstractive settings (per-chunk input size and summary length bounds)
    max_input_chars: int = 1000
    max_summary_length: int = 128
    min_summary_length: int = 30
    # Light abstractive refinement step (run on condensed extractive overview)
    do_abstractive_refinement: bool = True
    abstractive_refine_max_len: int = 80
    # Generate a comprehensive executive overview (long, covering entire meeting)
    comprehensive_overview: bool = True
    comprehensive_max_length: int = 512
    # Post-processing options
    polish_overview: bool = True
    # cosine-similarity threshold at/above which two items count as duplicates
    semantic_dedup_threshold: float = 0.75
    # Scoring weights for extractive sentence ranking
    position_weight: float = 0.15
    length_weight: float = 0.10
    similarity_weight: float = 0.75
    # Indonesian keywords used to flag sentences as decisions
    decision_keywords: List[str] = field(
        default_factory=lambda: [
            "diputuskan",
            "disepakati",
            "kesimpulan",
            "keputusan",
            "jadi",
            "maka",
            "sepakat",
            "setuju",
            "final",
            "kesepakatan",
            "disimpulkan",
            "ditetapkan",
            "disetujui",
            "putus",
        ]
    )
    # Indonesian keywords used to flag sentences as action items / follow-ups
    action_keywords: List[str] = field(
        default_factory=lambda: [
            "akan",
            "harus",
            "perlu",
            "tolong",
            "mohon",
            "harap",
            "deadline",
            "target",
            "tugas",
            "tanggung jawab",
            "action item",
            "follow up",
            "tindak lanjut",
            "dikerjakan",
            "selesaikan",
            "lakukan",
            "siapkan",
            "minggu depan",
            "besok",
            "segera",
            "bikin",
            "buat",
        ]
    )
    # Device for model inference: "cpu" or a string starting with "cuda"
    device: str = "cpu"
@dataclass
class MeetingSummary:
    """Structured meeting summary.

    Attributes:
        overview: Narrative executive summary of the meeting.
        key_points: Important points extracted from the transcript.
        decisions: Sentences identified as decisions.
        action_items: Dicts with "owner", "task" and optional "due"/"timestamp".
        topics: Detected discussion topics.
        keywords: Salient keywords. Previously attached dynamically via
            setattr; declared as a defaulted field so it is always present
            (the constructor stays backward compatible).
    """
    overview: str
    key_points: List[str]
    decisions: List[str]
    action_items: List[Dict[str, str]]
    topics: List[str] = field(default_factory=list)
    keywords: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain, JSON/YAML-serializable dictionary."""
        return {
            "overview": self.overview,
            "key_points": self.key_points,
            "decisions": self.decisions,
            "action_items": self.action_items,
            "topics": self.topics,
            # getattr keeps compatibility with instances created before the
            # keywords field existed (e.g. via __new__ + setattr).
            "keywords": getattr(self, "keywords", []),
        }

    def __str__(self) -> str:
        """Human-readable (Indonesian) rendering of the summary."""
        lines = []
        lines.append("=== RINGKASAN RAPAT ===\n")
        lines.append(f"Overview:\n{self.overview}\n")
        if self.key_points:
            lines.append("Poin-Poin Penting:")
            for i, point in enumerate(self.key_points, 1):
                lines.append(f" {i}. {point}")
            lines.append("")
        if self.decisions:
            lines.append("Keputusan:")
            for i, decision in enumerate(self.decisions, 1):
                lines.append(f" {i}. {decision}")
            lines.append("")
        if self.action_items:
            lines.append("Action Items:")
            for i, item in enumerate(self.action_items, 1):
                owner = item.get("owner", "TBD")
                task = item.get("task", "")
                due = item.get("due", "")
                if due:
                    lines.append(f" {i}. [{owner}] {task} (Due: {due})")
                else:
                    lines.append(f" {i}. [{owner}] {task}")
        if self.topics:
            lines.append("")
            lines.append("Topik:")
            lines.append(", ".join(self.topics))
        return "\n".join(lines)

    def to_json(self) -> str:
        """Return a JSON string for machine-readable outputs."""
        import json
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

    def to_yaml(self) -> str:
        """Return a YAML string (requires PyYAML); falls back to JSON."""
        try:
            import yaml
            return yaml.safe_dump(self.to_dict(), allow_unicode=True)
        except Exception:
            # Fallback to JSON if YAML not available
            return self.to_json()
class AbstractiveSummarizer:
    """Abstractive summarizer using HuggingFace transformers pipeline (mt5/mbart/etc).

    The underlying pipeline is created lazily on first use; when the model
    cannot be loaded, methods degrade to extractive fallbacks.
    """

    def __init__(self, config: Optional[SummarizationConfig] = None):
        # Shared settings; library defaults when not provided.
        self.config = config or SummarizationConfig()
        # HF summarization pipeline, built by _load_model(); None = unavailable.
        self._pipeline = None
def _load_model(self):
    """Lazily construct the HF summarization pipeline.

    On any failure (missing transformers, download error, ...) the
    pipeline stays None so callers fall back to extractive behavior.
    """
    if self._pipeline is not None:
        return
    try:
        from transformers import pipeline
        model_id = self.config.abstractive_model_id
        device_idx = 0 if self.config.device.startswith("cuda") else -1
        print(f"[Summarizer] Loading abstractive model: {model_id}")
        self._pipeline = pipeline(
            "summarization",
            model=model_id,
            tokenizer=model_id,
            device=device_idx,
            truncation=True,
        )
        print("[Summarizer] Abstractive model loaded successfully")
    except Exception as e:
        print(f"[Summarizer] Warning: abstractive model load failed: {e}")
        self._pipeline = None
def _chunk_text(self, text: str) -> List[str]:
max_chars = int(self.config.max_input_chars)
if len(text) <= max_chars:
return [text]
chunks = []
start = 0
while start < len(text):
end = min(len(text), start + max_chars)
# try to cut at sentence boundary
cut = text.rfind(".", start, end)
if cut <= start:
cut = end
chunk = text[start:cut].strip()
if chunk:
# prevent repeating identical chunks
chunk = self._collapse_repeated_phrases(chunk)
chunks.append(chunk)
start = cut
return chunks
def _clean_abstractive_output(self, overview: str, full_text: str) -> (str, List[str]):
    """Clean artifacts from abstractive model output and produce fallback key points.

    Args:
        overview: Raw abstractive model output.
        full_text: Full transcript text, used for the extractive fallback.

    Returns:
        (overview_clean, key_points)
    """
    overview_clean = self._clean_abstractive_text(overview)
    # Noisy output (placeholders remain, fewer than 10 alpha tokens, or a
    # word repeated 3+ times in a row): fall back to extractive sentences.
    if "<extra_id" in overview or len(re.findall(r"[a-zA-Z]{2,}", overview_clean)) < 10 or re.search(r"\b(\w+)(?:\s+\1){2,}", overview_clean.lower()):
        sentences = BERTSummarizer(self.config)._split_sentences(full_text)
        key_points = [s for s in sentences[: self.config.num_sentences]]
        overview_clean = " ".join(key_points[:3])
        return overview_clean, key_points
    # Otherwise split the cleaned overview into sentence-like parts and keep
    # the first num_sentences unique parts with at least 3 words each.
    parts = [s.strip() for s in re.split(r"\.|!|\?", overview_clean) if s.strip()]
    seen_kp = set()
    key_points: List[str] = []
    for p in parts:
        # strip punctuation and squeeze whitespace for dedup comparison
        p_clean = re.sub(r"[^\w\s]", "", p) if p else p
        p_clean = re.sub(r"\s+", " ", p_clean).strip()
        if len(p_clean.split()) < 3:
            continue
        low = p_clean.lower()
        if low in seen_kp:
            continue
        seen_kp.add(low)
        key_points.append(p_clean)
        if len(key_points) >= self.config.num_sentences:
            break
    return overview_clean, key_points
def _clean_abstractive_text(self, text: str) -> str:
    """Lightweight cleaning of abstractive text output.

    Removes <extra_id_N> placeholders (including a ')' directly preceding
    one), collapses repeated phrases, reduces punctuation runs, trims stray
    hyphens/whitespace, and guarantees terminal punctuation.

    Kept as a separate method for unit testing / backwards compatibility.
    """
    # Strip ') <extra_id_N>' FIRST — the previous order removed all bare
    # placeholders before this pattern ran, so it could never match and the
    # stray ')' survived.
    t = re.sub(r"\)\s*<extra_id_\d+>", "", text)
    t = re.sub(r"<extra_id_\d+>", "", t)
    # collapse repeated short filler sequences e.g. "Jadi contohnya Jadi contohnya"
    t = self._collapse_repeated_phrases(t)
    t = re.sub(r"\s*[\.]{2,}\s*", ". ", t)
    t = re.sub(r"[!?]{2,}", ".", t)
    t = re.sub(r"\s+", " ", t).strip()
    # Remove leading/trailing hyphens and stray whitespace
    t = re.sub(r"^[-\s]+|[-\s]+$", "", t)
    if not re.search(r"[.!?]$", t):
        t = t + "."
    return t
def _generate_keywords(self, text: str, top_k: int = 8) -> List[str]:
"""Generate simple keywords by frequency (fallback)."""
toks = re.findall(r"\b[a-zA-Z]{4,}\b", text.lower())
freq = {}
stop = {"yang","dan","ini","itu","untuk","dengan","juga","sudah","ada","kita","saya","kamu"}
for w in toks:
if w in stop:
continue
freq[w] = freq.get(w, 0) + 1
sorted_words = sorted(freq.items(), key=lambda x: x[1], reverse=True)
return [w for w, _ in sorted_words[:top_k]]
def _collapse_repeated_phrases(self, text: str, max_ngram: int = 6, min_repeats: int = 2) -> str:
    """Collapse adjacent repeated n-gram phrases.

    Delegates to the module-level helper so all classes share one
    implementation.
    """
    return _collapse_repeated_phrases_global(text, max_ngram=max_ngram, min_repeats=min_repeats)

# NOTE: two duplicate definitions of _semantic_deduplicate and
# _semantic_dedup_action_items were removed from this spot. They were dead
# code — Python keeps only the later definitions of the same names, which
# appear further down in this class — and, had they ever been active, their
# delegation to a fresh AbstractiveSummarizer instance's method of the same
# name would have recursed infinitely.
def _parse_structured_output(self, raw: str, defaults: Dict[str, Any]) -> (str, List[str]):
"""Try to parse YAML/JSON or simple structured text into (overview, keywords).
If parsing fails, return (cleaned_raw, fallback_keywords)
"""
cleaned = raw.strip()
# Try YAML first (if available)
try:
import yaml
parsed = yaml.safe_load(cleaned)
if isinstance(parsed, dict):
ov = parsed.get("overview", "")
kws = parsed.get("keywords", None)
if kws is None:
kws = self._generate_keywords(ov or " ".join(defaults.get("key_points", [])))
return (ov.strip() if isinstance(ov, str) else "", kws)
except Exception:
pass
# Try JSON
try:
import json
parsed = json.loads(cleaned)
if isinstance(parsed, dict):
ov = parsed.get("overview", "")
kws = parsed.get("keywords", None)
if kws is None:
kws = self._generate_keywords(ov or " ".join(defaults.get("key_points", [])))
return (ov.strip() if isinstance(ov, str) else "", kws)
except Exception:
pass
# Simple heuristic: look for header 'overview:' or 'Ringkasan:' in text
m = re.search(r"(?im)^(overview|ringkasan)\s*:\s*(.*)$", cleaned)
if m:
ov = m.group(2).strip()
kws = self._generate_keywords(ov or " ".join(defaults.get("key_points", [])))
return ov, kws
# If nothing recognized, return fallback cleaned text and keywords
return cleaned, self._generate_keywords(cleaned or " ".join(defaults.get("key_points", [])))
def _sanitize_for_prompt(self, text: str) -> str:
    """Sanitize text before it is embedded in a model prompt.

    Strips model placeholders, e-mail addresses, domain-like tokens
    (e.g. 'Eksekutif.com.co.id'), and common web-article closing
    boilerplate ("Semoga bermanfaat", ...), then collapses adjacent
    repeated phrases.
    """
    if not text:
        return text
    cleaned = re.sub(r"<extra_id_\d+>", "", text)
    # drop e-mail addresses
    cleaned = re.sub(r"\b\S+@\S+\.\S+\b", " ", cleaned)
    # drop domain-like tokens
    cleaned = re.sub(r"\b\S+\.(?:com|co\.id|info|id|net|org)(?:\.[a-z]{2,})*\b", " ", cleaned, flags=re.IGNORECASE)
    # drop stock article/web closings that often leak into transcripts
    cleaned = re.sub(r"(?i)\b(semoga artikel ini bermanfaat(?: bagi anda semua)?|semoga bermanfaat|terima kasih(?: atas masukannya| juga)?)\b[.!\s,]*", " ", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return _collapse_repeated_phrases_global(cleaned)
def _is_repetitive_text(self, text: str, max_run: int = 6) -> bool:
"""Detect highly repetitive model outputs (including repeated n-gram phrases).
Returns True if repetition patterns exceed thresholds.
"""
if not text:
return False
# check placeholder presence quickly
if re.search(r"<extra_id_\d+>", text):
return True
# Tokenize
tokens = re.findall(r"\w+", text.lower())
if not tokens:
return False
# Check simple token runs
run = 1
last = tokens[0]
for tok in tokens[1:]:
if tok == last:
run += 1
if run >= max_run:
return True
else:
last = tok
run = 1
# Check n-gram repeated phrase runs for n=1..4
max_ngram = 4
n_tokens = len(tokens)
for n in range(1, max_ngram + 1):
i = 0
while i + 2 * n <= n_tokens:
# compare tokens[i:i+n] with subsequent repeated occurrences
pattern = tokens[i:i + n]
run = 1
j = i + n
while j + n <= n_tokens and tokens[j:j + n] == pattern:
run += 1
j += n
if run >= max_run:
return True
i += 1
# fallback regex for single-token repetition
if re.search(r"(\b\w+\b)(?:\s+\1\b){%d,}" % (max_run - 1), text.lower()):
return True
return False
def _contains_domain_noise(self, text: str) -> bool:
"""Detect domain-like or short web boilerplate noise (e.g., 'Eksekutif.com', 'Semoga artikel ini bermanfaat').
Returns True if common domain patterns or boilerplate phrases are found.
"""
if not text:
return False
if re.search(r"\b\S+\.(?:com|co\.id|info|id|net|org)(?:\.[a-z]{2,})*\b", text, flags=re.IGNORECASE):
return True
if re.search(r"(?i)\b(semoga artikel ini bermanfaat(?: bagi anda semua)?|semoga bermanfaat|terima kasih)\b", text):
return True
return False
def _normalize_overview_text(self, text: str) -> str:
    """Normalize an overview into a readable paragraph, or tidy structured lists.

    Structured text (bullet lists / known section headers) only has its
    spacing normalized. Free text is split into fragments, deduplicated,
    scrubbed of e-mails/domains/web boilerplate, and re-joined as a single
    capitalized paragraph with terminal punctuation.
    """
    if not text:
        return text
    t = text.strip()
    # collapse repeated fragments first
    t = _collapse_repeated_phrases_global(t)
    # If text contains list markers or section headers, tidy spacing and return
    if "\n-" in t or "Poin-Poin Penting" in t or "Keputusan" in t or "Action Items" in t:
        # normalize newlines and strip extra spaces
        t = re.sub(r"\n\s+", "\n", t)
        t = re.sub(r"\n{2,}", "\n\n", t)
        return t.strip()
    # Otherwise make a single paragraph and deduplicate near-duplicate fragments:
    # split on " - " separators when present, else on sentence boundaries.
    if " - " in t:
        parts = [p.strip(" -" ) for p in re.split(r"\s*-\s*", t) if p.strip()]
    else:
        parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", t) if p.strip()]
    seen = set()
    uniq = []
    for p in parts:
        # lowercase alphanumeric form used purely for duplicate detection
        norm = re.sub(r"[^a-z0-9 ]", "", p.lower())
        norm = re.sub(r"\s+", " ", norm).strip()
        if not norm:
            continue
        if norm in seen:
            continue
        seen.add(norm)
        uniq.append(p.strip(" -."))
    para = " ".join(uniq)
    para = re.sub(r"\s+", " ", para).strip()
    # Remove any leftover emails/domains or short web boilerplate that slipped through
    para = re.sub(r"\b\S+@\S+\.\S+\b", " ", para)
    para = re.sub(r"\b\S+\.(?:com|co\.id|info|id|net|org)(?:\.[a-z]{2,})*\b", " ", para, flags=re.IGNORECASE)
    para = re.sub(r"(?i)\b(semoga artikel ini bermanfaat(?: bagi anda semua)?|semoga bermanfaat|terima kasih(?: atas masukannya| juga)?)\b[.!\s,]*", " ", para)
    para = re.sub(r"\s+", " ", para).strip()
    # guarantee terminal punctuation and a leading capital letter
    if para and not re.search(r"[.!?]$", para):
        para = para + "."
    if para:
        para = para[0].upper() + para[1:]
    return para
def _polish_overview(self, overview: str, full_text: str) -> str:
    """Polish overview into an executive, coherent paragraph.

    Normalizes/deduplicates first; then, when config.polish_overview is set
    and the abstractive pipeline is available, asks the model to paraphrase
    and expand the text. Falls back to the normalized text on any failure.

    Args:
        overview: Draft overview text.
        full_text: Full transcript (accepted for API symmetry; not used in
            the paraphrase prompt here).
    """
    if not overview:
        return overview
    # Basic normalization first
    overview = _collapse_repeated_phrases_global(overview)
    overview = self._normalize_overview_text(overview)
    # If model available and config allows, ask for paraphrase/expansion
    if getattr(self.config, "polish_overview", True):
        try:
            self._load_model()
            if self._pipeline is not None:
                prompt = (
                    "Paraphrase dan perluas teks berikut menjadi paragraf eksekutif yang jelas, ringkas, dan mudah dibaca. "
                    "Jangan sertakan header."
                    "\n\nTeks:\n" + overview
                )
                out = self._pipeline(
                    prompt,
                    # cap generation length at 350 regardless of config
                    max_length=min(getattr(self.config, "comprehensive_max_length", 512), 350),
                    min_length=40,
                    truncation=True,
                    do_sample=False,
                )
                if isinstance(out, list) and out:
                    candidate = out[0].get("summary_text", "").strip()
                    candidate = self._clean_abstractive_text(candidate)
                    candidate = _collapse_repeated_phrases_global(candidate)
                    candidate = self._normalize_overview_text(candidate)
                    return candidate
        except Exception:
            # best-effort: keep the normalized overview on any model failure
            pass
    return overview
def _semantic_deduplicate(self, items: List[str], threshold: Optional[float] = None) -> List[str]:
    """Deduplicate similar items, keeping the first occurrence of each group.

    Strategy: sentence-embedding cosine similarity when available; if
    embeddings merge nothing (or are unavailable), token-Jaccard grouping
    at a fixed 0.45 threshold; finally exact lowercase-text dedup.

    NOTE(review): this method calls self._compute_embeddings, which is not
    defined on AbstractiveSummarizer anywhere in this file — the call raises
    AttributeError, which the broad `except` swallows, so in practice the
    Jaccard fallback always runs here. Verify against the rest of the file.
    """
    if not items:
        return []
    # per-call threshold overrides the configured one
    thr = threshold if threshold is not None else getattr(self.config, "semantic_dedup_threshold", 0.75)
    # try embeddings
    try:
        embs = self._compute_embeddings(items)
        if embs is not None:
            from sklearn.metrics.pairwise import cosine_similarity
            sim = cosine_similarity(embs)
            n = len(items)
            taken = set()
            result = []
            for i in range(n):
                if i in taken:
                    continue
                result.append(items[i])
                # absorb every later item similar enough to items[i]
                for j in range(i + 1, n):
                    if sim[i, j] >= thr:
                        taken.add(j)
            # If embeddings didn't merge anything useful, fallback to token-jaccard grouping
            if len(result) == len(items) and len(items) > 1:
                # token Jaccard
                token_sets = [set(re.findall(r"\w+", it.lower())) for it in items]
                taken2 = set()
                result2 = []
                for i in range(len(items)):
                    if i in taken2:
                        continue
                    result2.append(items[i])
                    for j in range(i + 1, len(items)):
                        if j in taken2:
                            continue
                        si = token_sets[i]
                        sj = token_sets[j]
                        if not si or not sj:
                            continue
                        jacc = len(si & sj) / float(len(si | sj))
                        if jacc >= 0.45:
                            taken2.add(j)
                return result2
            return result
        else:
            raise ValueError("No embeddings")
    except Exception:
        # fallback to token-jaccard grouping first (robust when embeddings aren't available)
        try:
            token_sets = [set(re.findall(r"\w+", it.lower())) for it in items]
            taken = set()
            res = []
            for i in range(len(items)):
                if i in taken:
                    continue
                res.append(items[i])
                si = token_sets[i]
                for j in range(i + 1, len(items)):
                    if j in taken:
                        continue
                    sj = token_sets[j]
                    if not si or not sj:
                        continue
                    jacc = len(si & sj) / float(len(si | sj))
                    if jacc >= 0.45:
                        taken.add(j)
            return res
        except Exception:
            # final fallback to naive textual deduplication
            seen = set()
            res = []
            for it in items:
                low = re.sub(r"\s+", " ", it.lower()).strip()
                if low in seen:
                    continue
                seen.add(low)
                res.append(it)
            return res
def _semantic_dedup_action_items(self, actions: List[Dict[str, str]], threshold: Optional[float] = None) -> List[Dict[str, str]]:
"""Deduplicate action items by task text; merge owners when necessary."""
if not actions:
return []
tasks = [a.get("task", "") for a in actions]
groups = self._semantic_deduplicate(tasks, threshold=threshold)
# groups contains first representative tasks; now build merged items
merged = []
for rep in groups:
owners = []
timestamps = []
dues = set()
for a in actions:
if a.get("task", "") == rep or (rep and rep in a.get("task", "")):
if a.get("owner") and a.get("owner") not in owners:
owners.append(a.get("owner"))
if a.get("timestamp"):
timestamps.append(a.get("timestamp"))
if a.get("due"):
dues.add(a.get("due"))
owner_str = " / ".join(owners) if owners else "TBD"
merged.append({
"owner": owner_str,
"task": rep,
"timestamp": timestamps[0] if timestamps else "",
"due": ", ".join(sorted(list(dues))) if dues else "",
})
return merged
def generate_comprehensive_summary(self, full_text: str, key_points: List[str], decisions: List[str], action_items: List[Dict[str, str]], topics: List[str]) -> (str, List[str]):
    """Generate a comprehensive executive summary covering the meeting.

    Sanitizes and deduplicates the extracted components FIRST, then builds
    a guided prompt requesting YAML-structured output from the abstractive
    pipeline (with repetition/noise checks and one sampled retry). Falls
    back to a rule-based narrative assembly when the model is unavailable
    or its output fails the quality checks.

    Returns:
        (overview_text, keywords)
    """
    # Sanitize inputs BEFORE building the prompt. (Bug fix: the previous
    # version assembled the prompt first, so the sanitized/deduplicated
    # lists never reached the model.)
    key_points = [self._sanitize_for_prompt(k) for k in key_points if k and k.strip()]
    decisions = [self._sanitize_for_prompt(d) for d in decisions if d and d.strip()]
    for a in action_items:
        a['task'] = self._sanitize_for_prompt(a.get('task', ''))
    # Deduplicate before sending to the model
    try:
        key_points = self._semantic_deduplicate(key_points)
        decisions = self._semantic_deduplicate(decisions)
    except Exception:
        # order-preserving exact dedup as a safety net
        key_points = list(dict.fromkeys(key_points))
        decisions = list(dict.fromkeys(decisions))
    # Build a structured prompt that requests YAML output for safe parsing
    prompt_parts = [
        "Anda adalah asisten yang menulis ringkasan rapat yang komprehensif dan terstruktur.",
        "Output harus dalam format YAML dengan kunci: overview, key_points (list), decisions (list), action_items (list of {owner, task, due}), keywords (list).",
        "Berikan overview naratif yang jelas, serta daftar poin penting, keputusan, dan tindak lanjut.",
        "Topik yang dibahas:",
        ", ".join(topics) if topics else "-",
        "Poin-poin penting:\n" + "\n".join([f"- {p}" for p in key_points]) if key_points else "",
        "Keputusan:\n" + "\n".join([f"- {d}" for d in decisions]) if decisions else "",
        "Tindak lanjut (Action Items):\n" + "\n".join([f"- [{a.get('owner','TBD')}] {a.get('task','')}" for a in action_items]) if action_items else "",
        "Tuliskan field 'overview' minimal 80 kata sebagai paragraf naratif yang merangkum seluruh rapat dengan jelas.",
        "Mohon hasilkan YAML yang valid."
    ]
    prompt = "\n\n".join([p for p in prompt_parts if p])
    # Use the abstractive pipeline when available
    try:
        self._load_model()
        if self._pipeline is not None:
            # Up to 2 attempts: deterministic first, sampled retry on failure
            attempts = 2
            for attempt in range(attempts):
                gen_kwargs = dict(
                    max_length=getattr(self.config, "comprehensive_max_length", 512),
                    min_length=max(80, int(getattr(self.config, "comprehensive_max_length", 512) * 0.12)),
                    truncation=True,
                    do_sample=False,
                    no_repeat_ngram_size=4,
                    repetition_penalty=1.3,
                )
                if attempt == 1:
                    # more creative generation if the deterministic attempt failed
                    gen_kwargs.update({"do_sample": True, "temperature": 0.7, "top_p": 0.9})
                out = self._pipeline(prompt, **gen_kwargs)
                text = out[0].get("summary_text", "").strip()
                # collapse repeated fragments, then clean
                text = self._collapse_repeated_phrases(text)
                cleaned = self._clean_abstractive_text(text)
                # Quality heuristics: repetitive, too short, or web boilerplate -> retry
                if self._is_repetitive_text(cleaned) or len(cleaned.split()) < 20 or self._contains_domain_noise(cleaned):
                    if attempt + 1 < attempts:
                        continue
                # Attempt to parse structured YAML/JSON
                overview, keywords = self._parse_structured_output(cleaned, {
                    "key_points": key_points,
                    "decisions": decisions,
                    "action_items": action_items,
                })
                # Final normalization / optional polish
                overview = self._normalize_overview_text(overview)
                if getattr(self.config, "polish_overview", True):
                    overview = self._polish_overview(overview, full_text)
                # Accept only a non-empty, reasonably long, non-repetitive overview
                if overview and len(overview.split()) >= 10 and not self._is_repetitive_text(overview):
                    return overview, keywords
                if attempt + 1 < attempts:
                    continue
                break
    except Exception:
        pass
    # Fallback rule-based assembly: construct a narrative paragraph summarizing
    # the meeting, then polish it into an executive paragraph.
    def _format_action_items(ai_list):
        # render each item as an "Owner akan task." sentence
        sentences = []
        for a in ai_list:
            owner = a.get('owner', 'TBD')
            task = a.get('task', '').strip()
            if task:
                sentences.append(f"{owner} akan {task.rstrip('.')}.")
        return " ".join(sentences)

    def _join_points(pts):
        # join up to 4 key points to avoid an overly long clause
        if not pts:
            return ""
        return "; ".join([p.rstrip('.') for p in pts[:4]])

    narrative_parts = []
    if topics:
        narrative_parts.append("Topik utama yang dibahas meliputi: " + ", ".join(topics) + ".")
    if key_points:
        narrative_parts.append("Beberapa poin penting termasuk: " + _join_points(key_points) + ".")
    if decisions:
        narrative_parts.append("Keputusan utama yang dicapai termasuk: " + ", ".join([d.rstrip('.') for d in decisions]) + ".")
    if action_items:
        narrative_parts.append("Tindak lanjut yang disepakati di antaranya: " + _format_action_items(action_items))
    assembled = " ".join([p for p in narrative_parts if p]).strip()
    # Normalize and then optionally polish into a smooth executive paragraph
    assembled = self._normalize_overview_text(assembled)
    if getattr(self.config, "polish_overview", True):
        assembled = self._polish_overview(assembled, full_text)
    keywords = self._generate_keywords(assembled, top_k=8)
    return assembled, keywords
def summarize(self, transcript_segments: List[TranscriptSegment]) -> MeetingSummary:
    """Produce a MeetingSummary from transcript segments, abstractively.

    Joins segment texts, strips ASR annotations and Indonesian filler
    words, summarizes chunk-by-chunk through the HF pipeline (extractive
    first-sentences fallback when the model is unavailable), then augments
    the result with decisions, action items, topics, and — when enabled —
    a comprehensive overview with keywords.
    """
    self._load_model()
    full_text = " ".join([seg.text for seg in transcript_segments if seg.text])
    if not full_text.strip():
        # nothing to summarize
        return MeetingSummary(
            overview="Tidak ada konten yang dapat diringkas.",
            key_points=[],
            decisions=[],
            action_items=[],
        )
    # Clean up common disfluencies/politeness tokens and ASR annotations
    full_text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", full_text)
    full_text = re.sub(
        r"\b(oke|ya|oke,|baik|sekarang|sekarang kita|nah|jadi|oke\.|jadi\.)\b",
        "",
        full_text,
        flags=re.IGNORECASE,
    )
    full_text = re.sub(r"\s+", " ", full_text).strip()
    # Chunk and summarize
    if self._pipeline is None:
        # fallback: use the first few sentences as the overview
        sentences = BERTSummarizer(self.config)._split_sentences(full_text)
        overview = " ".join(sentences[: min(3, len(sentences))])
    else:
        chunks = self._chunk_text(full_text)
        partial_summaries = []
        for chunk in chunks:
            try:
                out = self._pipeline(
                    chunk,
                    max_length=self.config.max_summary_length,
                    min_length=self.config.min_summary_length,
                    truncation=True,
                    do_sample=False,
                )
                partial_summaries.append(out[0]["summary_text"].strip())
            except Exception as e:
                # best-effort: skip chunks the model rejects
                print(f"[Summarizer] chunk summarization failed: {e}")
                continue
        # If multiple partial summaries, join and optionally summarize again
        combined = " ".join(partial_summaries)
        if len(combined) > self.config.max_input_chars and self._pipeline:
            try:
                out = self._pipeline(
                    combined,
                    max_length=self.config.max_summary_length,
                    min_length=self.config.min_summary_length,
                    truncation=True,
                    do_sample=False,
                )
                overview = out[0]["summary_text"].strip()
            except Exception:
                overview = combined
        else:
            overview = combined
    # Clean abstractive overview and produce robust key points (use helper)
    overview, key_points = self._clean_abstractive_output(overview, full_text)
    # Extract decisions, actions and topics via keyword heuristics
    sentences = BERTSummarizer(self.config)._split_sentences(full_text)
    decisions = BERTSummarizer(self.config)._extract_decisions(sentences)
    action_items = BERTSummarizer(self.config)._extract_action_items(transcript_segments)
    topics = BERTSummarizer(self.config)._extract_topics(full_text)
    # Optionally produce a comprehensive overview (uses abstractive pipeline)
    if getattr(self.config, "comprehensive_overview", False):
        try:
            comp_overview, keywords = self.generate_comprehensive_summary(full_text, key_points, decisions, action_items, topics)
            overview = comp_overview
        except Exception:
            keywords = []
    ms = MeetingSummary(
        overview=overview,
        key_points=key_points,
        decisions=decisions,
        action_items=action_items,
        topics=topics,
    )
    # keywords is only bound when the comprehensive path ran; attach dynamically
    if 'keywords' in locals():
        setattr(ms, 'keywords', keywords)
    return ms
class BERTSummarizer:
    """
    Extractive summarization using BERT sentence embeddings.

    Selects the most important sentences based on semantic similarity
    to the document centroid and other features.

    Attributes:
        config: SummarizationConfig object

    Example:
        >>> summarizer = BERTSummarizer()
        >>> summary = summarizer.summarize(transcript_segments)
        >>> print(summary.overview)
        >>> print(summary.decisions)
    """

    def __init__(self, config: Optional[SummarizationConfig] = None):
        """
        Initialize BERTSummarizer.

        Args:
            config: SummarizationConfig object; defaults are used when None.
        """
        self.config = config or SummarizationConfig()
        # sentence-transformer encoder, loaded lazily by _load_model();
        # set to the sentinel string "FALLBACK" when loading fails.
        self._model = None
def _load_model(self):
    """Lazily load the sentence-transformer encoder.

    On any failure, self._model is set to the sentinel string "FALLBACK"
    so downstream code can switch to non-embedding heuristics.
    """
    if self._model is not None:
        return
    try:
        from sentence_transformers import SentenceTransformer
        print(f"[Summarizer] Loading model: {self.config.sentence_model_id}")
        self._model = SentenceTransformer(self.config.sentence_model_id)
        print("[Summarizer] Model loaded successfully")
    except Exception as e:
        print(f"[Summarizer] Warning: Could not load model: {e}")
        print("[Summarizer] Using fallback mode")
        self._model = "FALLBACK"
def _semantic_deduplicate(self, items: List[str], threshold: Optional[float] = None) -> List[str]:
    """Delegate semantic dedup to AbstractiveSummarizer (shared implementation)."""
    helper = AbstractiveSummarizer(self.config)
    return helper._semantic_deduplicate(items, threshold)

def _semantic_dedup_action_items(self, actions: List[Dict[str, str]], threshold: Optional[float] = None) -> List[Dict[str, str]]:
    """Delegate action-item dedup to AbstractiveSummarizer (shared implementation)."""
    helper = AbstractiveSummarizer(self.config)
    return helper._semantic_dedup_action_items(actions, threshold)

def _collapse_repeated_phrases(self, text: str, max_ngram: int = 6, min_repeats: int = 2) -> str:
    """Delegate to the module-level phrase-collapse helper."""
    return _collapse_repeated_phrases_global(text, max_ngram=max_ngram, min_repeats=min_repeats)
def summarize(self, transcript_segments: List[TranscriptSegment]) -> MeetingSummary:
"""
Generate meeting summary from transcript.
Args:
transcript_segments: List of transcript segments with speaker info
Returns:
MeetingSummary with overview, key points, decisions, and action items
"""
# If configuration prefers abstractive summarization, delegate to AbstractiveSummarizer
if getattr(self.config, "method", "extractive") == "abstractive":
try:
return AbstractiveSummarizer(self.config).summarize(transcript_segments)
except Exception as e:
print(
f"[Summarizer] Abstractive summarization failed, falling back to extractive: {e}"
)
self._load_model()
# Combine all text
full_text = " ".join([seg.text for seg in transcript_segments if seg.text])
# Clean up disfluencies and annotations commonly appearing in ASR output
full_text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", full_text)
full_text = re.sub(r"\s+", " ", full_text).strip()
if not full_text.strip():
return MeetingSummary(
overview="Tidak ada konten yang dapat diringkas.",
key_points=[],
decisions=[],
action_items=[],
)
# Get sentence-level metadata by merging speaker turns
sent_meta = self._get_sentences_with_meta(transcript_segments)
if not sent_meta:
return MeetingSummary(
overview="Tidak ada kalimat yang dapat diidentifikasi.",
key_points=[],
decisions=[],
action_items=[],
)
sentences = [s["text"] for s in sent_meta]
# Compute embeddings and select a diverse set of representative sentences via MMR
embeddings = self._compute_embeddings(sentences)
num_select = min(max(5, self.config.num_sentences + 2), len(sentences))
if embeddings is not None:
selected_idx = self._mmr_selection(sentences, embeddings, k=num_select)
key_sentences = [sentences[i] for i in selected_idx]
else:
# fallback: use earlier scoring
key_sentences = self._extract_key_sentences(sentences)
# Generate a multi-sentence overview with some ordering and cleaning
overview = self._generate_overview(key_sentences[:3])
# Optionally perform a light abstractive refinement on the extractive overview
if getattr(self.config, "do_abstractive_refinement", False):
try:
abs_sum = AbstractiveSummarizer(self.config)
abs_sum._load_model()
if abs_sum._pipeline is not None and overview:
out = abs_sum._pipeline(
overview,
max_length=getattr(self.config, "abstractive_refine_max_len", 80),
min_length=30,
truncation=True,
do_sample=False,
)
# Expect a single summary text
if isinstance(out, list) and out:
raw_overview = out[0].get("summary_text", overview).strip()
# Use AbstractiveSummarizer's cleaning & fallback logic
overview_cleaned, _ = abs_sum._clean_abstractive_output(raw_overview, full_text)
overview = overview_cleaned
except Exception:
# Fail silently and use extractive overview
pass
# Build richer key points: include speaker attribution and short cleaned sentences
key_points = []
for i in selected_idx if embeddings is not None else list(range(len(key_sentences))):
s = sentences[i]
sp = sent_meta[i]["speaker_id"]
# Short clean
s_clean = re.sub(r"\s+", " ", s).strip()
key_points.append(f"{s_clean} (oleh {sp})")
# Extract decisions using expanded context (look for decision keywords and enumerations)
decisions = []
seen_decisions = set()
for i, s in enumerate(sentences):
s_clean = re.sub(r"\s+", " ", s).strip()
s_lower = s_clean.lower()
if any(kw in s_lower for kw in self.config.decision_keywords) or re.match(
r"^(pertama|kedua|ketiga|keempat|kelima)\b", s_lower
):
context = self._expand_context_for_sentence(sent_meta, i, window=1)
dec_text = re.sub(r"\[.*?\]", "", context)
dec_text = re.sub(r"\s+", " ", dec_text).strip()
# Truncate to a reasonable length (35 words) and remove trailing punctuation
words = dec_text.split()
dec_text = " ".join(words[:35]).rstrip(" ,.;:")
if len(dec_text.split()) < 3:
continue
if dec_text and dec_text not in seen_decisions:
decisions.append(dec_text)
seen_decisions.add(dec_text)
# If no decisions found, try to extract from key_sentences
if not decisions:
for ks in key_sentences:
if any(kw in ks.lower() for kw in self.config.decision_keywords):
if ks not in seen_decisions:
decisions.append(ks)
seen_decisions.add(ks)
# Apply semantic deduplication to decisions
try:
decisions = self._semantic_deduplicate(decisions)
except Exception:
pass
# Extract action items at sentence level with speaker inference
action_items = []
seen_tasks = set()
action_kw_re = re.compile(
r"\b(" + "|".join([re.escape(k) for k in self.config.action_keywords]) + r")\b",
flags=re.IGNORECASE,
)
# verbs that indicate an actionable commitment (used to validate generic keyword matches)
action_verbs_re = re.compile(r"\b(akan|harus|siapkan|bikin|buat|selesaikan|dikerjakan|tolong|mohon|harap)\b", flags=re.IGNORECASE)
for i, s in enumerate(sentences):
text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", s).strip()
if not text:
continue
# explicit commit patterns
commit_re = re.compile(
r"\b(aku|saya|kami|kita|kamu)\b.*\b(bertanggung jawab|akan|saya akan|aku akan|aku akan membuat|kamu tolong|tolong|siapkan|bikin|harus|selesaikan|dikerjakan)\b",
flags=re.IGNORECASE,
)
owner = None
task = None
if commit_re.search(text):
owner = sent_meta[i]["speaker_id"]
# try to isolate the actionable clause
task = re.sub(
r"^.*?\b(bertanggung jawab|akan|saya akan|aku akan|kamu tolong|tolong|siapkan|bikin|harus|selesaikan|dikerjakan)\b",
"",
text,
flags=re.IGNORECASE,
)
task = task.strip(" .,:;-")
if not task:
task = text
elif action_kw_re.search(text):
# Validate generic matches for actionability using helper
if not self._is_actionable_text(text):
continue
owner = sent_meta[i]["speaker_id"]
task = text
if task:
# Normalize task text
task = re.sub(
r"^\s*(aku|saya|kami|kita|kamu)\b[:,\s]*", "", task, flags=re.IGNORECASE
).strip()
task = re.sub(r"\s+", " ", task).strip(" .,:;-")
if len(task.split()) < 3:
continue
filler_short = {"setuju", "oke", "ya", "nah", "betul"}
if task.lower() in filler_short:
continue
key = task.lower()[:120]
if key in seen_tasks:
continue
seen_tasks.add(key)
action_items.append(
{
"owner": owner or "TBD",
"task": task,
"timestamp": f"{sent_meta[i]['start']:.1f}s",
"due": "",
}
)
# Fall back to segment-level action extraction if none found
if not action_items:
action_items = self._extract_action_items(transcript_segments)
# Apply semantic deduplication to action items (merge owners when possible)
try:
action_items = self._semantic_dedup_action_items(action_items)
except Exception:
pass
# Extract topics (frequency-based) from cleaned full_text
topics = self._extract_topics(full_text)
# Optionally produce a comprehensive overview (may use abstractive pipeline)
if getattr(self.config, "comprehensive_overview", False):
try:
abs_s = AbstractiveSummarizer(self.config)
comp_overview, keywords = abs_s.generate_comprehensive_summary(full_text, key_points, decisions, action_items, topics)
overview = comp_overview
except Exception:
keywords = []
# Return comprehensive MeetingSummary
ms = MeetingSummary(
overview=overview,
key_points=key_points,
decisions=decisions,
action_items=action_items,
topics=topics,
)
if 'keywords' in locals():
setattr(ms, 'keywords', keywords)
return ms
def _split_sentences(self, text: str) -> List[str]:
"""Split text into sentences"""
# Indonesian sentence splitting
# Handle common abbreviations
text = re.sub(r"([Dd]r|[Pp]rof|[Bb]pk|[Ii]bu|[Ss]dr|[Nn]o|[Hh]al)\.", r"\1<PERIOD>", text)
# Split on sentence-ending punctuation
sentences = re.split(r"[.!?]+\s*", text)
# Restore periods in abbreviations
sentences = [s.replace("<PERIOD>", ".") for s in sentences]
# Clean and filter
cleaned = []
for s in sentences:
s = s.strip()
# Filter by length
if len(s) < self.config.min_sentence_length:
continue
if len(s) > self.config.max_sentence_length:
# Truncate very long sentences
s = s[: self.config.max_sentence_length] + "..."
# Collapse trivial repeated fragments inside sentence
s = self._collapse_repeated_phrases(s)
cleaned.append(s)
return cleaned
def _merge_speaker_turns(self, segments: List[TranscriptSegment]) -> List[Dict[str, Any]]:
"""Merge consecutive segments by the same speaker into 'turns' to provide more context.
Returns a list of dicts: {speaker_id, start, end, text, indices}
"""
turns: List[Dict[str, Any]] = []
for i, seg in enumerate(segments):
if not seg.text or not seg.text.strip():
continue
# Clean common ASR artifacts and leading fillers
text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", seg.text)
text = re.sub(
r"^\s*(oke|ya|nah|oke,|baik|sekarang|jadi)\b[\s,:-]*", "", text, flags=re.IGNORECASE
)
text = re.sub(r"\s+", " ", text).strip()
if not text:
continue
if turns and turns[-1]["speaker_id"] == seg.speaker_id:
turns[-1]["end"] = seg.end
turns[-1]["text"] += " " + text
turns[-1]["indices"].append(i)
else:
turns.append(
{
"speaker_id": seg.speaker_id,
"start": seg.start,
"end": seg.end,
"text": text,
"indices": [i],
}
)
return turns
def _get_sentences_with_meta(self, segments: List[TranscriptSegment]) -> List[Dict[str, Any]]:
"""Split merged speaker turns into sentences and keep metadata."""
turns = self._merge_speaker_turns(segments)
sent_meta: List[Dict[str, Any]] = []
for t in turns:
sents = self._split_sentences(t["text"])
for j, s in enumerate(sents):
sent_meta.append(
{
"text": s,
"speaker_id": t["speaker_id"],
"start": t["start"],
"end": t["end"],
"turn_indices": t["indices"],
"sent_idx_in_turn": j,
}
)
return sent_meta
def _compute_embeddings(self, sentences: List[str]):
"""Compute sentence embeddings using sentence-transformers (lazy load)."""
if not sentences:
return None
try:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer(self.config.sentence_model_id)
embs = model.encode(sentences, show_progress_bar=False)
return embs
except Exception as e:
print(f"[Summarizer] Embedding model error: {e}")
return None
def _mmr_selection(
self, sentences: List[str], embeddings, k: int = 5, lambda_param: float = 0.6
) -> List[int]:
"""Maximal Marginal Relevance (MMR) selection for diversity and coverage.
Returns list of selected sentence indices in original order.
"""
import numpy as _np
if embeddings is None or len(sentences) <= k:
return list(range(min(len(sentences), k)))
centroid = _np.mean(embeddings, axis=0)
# similarity to centroid
sim_to_centroid = _np.dot(embeddings, centroid) / (
_np.linalg.norm(embeddings, axis=1) * (_np.linalg.norm(centroid) + 1e-8)
)
selected = []
candidate_indices = list(range(len(sentences)))
# pick the top similarity as first
first = int(_np.argmax(sim_to_centroid))
selected.append(first)
candidate_indices.remove(first)
while len(selected) < k and candidate_indices:
mmr_scores = []
for idx in candidate_indices:
sim_to_sel = max(
[
_np.dot(embeddings[idx], embeddings[s])
/ (_np.linalg.norm(embeddings[idx]) * _np.linalg.norm(embeddings[s]) + 1e-8)
for s in selected
]
)
score = lambda_param * sim_to_centroid[idx] - (1 - lambda_param) * sim_to_sel
mmr_scores.append((idx, score))
idx_best, _ = max(mmr_scores, key=lambda x: x[1])
selected.append(idx_best)
candidate_indices.remove(idx_best)
# return in original order
selected_sorted = sorted(selected)
return selected_sorted
def _expand_context_for_sentence(
self, sent_meta: List[Dict[str, Any]], idx: int, window: int = 1
) -> str:
"""Return concatenated sentence with neighboring contextual sentences for better decision/action extraction."""
start = max(0, idx - window)
end = min(len(sent_meta), idx + window + 1)
return " ".join([s["text"] for s in sent_meta[start:end]])
def _infer_owner_for_action(self, seg_index: int, sent_meta: List[Dict[str, Any]]) -> str:
"""Infer owner for an action by looking at the sentence speaker and recent explicit mentions."""
# Prefer sentence speaker
if 0 <= seg_index < len(sent_meta):
return sent_meta[seg_index]["speaker_id"]
return "TBD"
def _extract_key_sentences(self, sentences: List[str]) -> List[str]:
"""Extract most important sentences using BERT embeddings"""
if not sentences:
return []
# Fallback mode: simple heuristics
if self._model == "FALLBACK" or len(sentences) <= self.config.num_sentences:
return sentences[: self.config.num_sentences]
try:
# Get sentence embeddings
embeddings = self._model.encode(sentences, show_progress_bar=False)
# Calculate document centroid
centroid = np.mean(embeddings, axis=0)
# Calculate importance scores for each sentence
scores = []
for i, (sent, emb) in enumerate(zip(sentences, embeddings)):
score = self._calculate_sentence_score(
sentence=sent,
embedding=emb,
centroid=centroid,
position=i,
total_sentences=len(sentences),
)
scores.append((i, score, sent))
# Sort by score
scores.sort(key=lambda x: x[1], reverse=True)
# Get top-k sentences (maintain original order)
top_indices = sorted([s[0] for s in scores[: self.config.num_sentences]])
return [sentences[i] for i in top_indices]
except Exception as e:
print(f"[Summarizer] Embedding extraction failed: {e}")
return sentences[: self.config.num_sentences]
def _calculate_sentence_score(
self,
sentence: str,
embedding: np.ndarray,
centroid: np.ndarray,
position: int,
total_sentences: int,
) -> float:
"""Calculate importance score for a sentence"""
# 1. Cosine similarity to centroid
similarity = np.dot(embedding, centroid) / (
np.linalg.norm(embedding) * np.linalg.norm(centroid) + 1e-8
)
# 2. Position score (favor beginning and end)
if total_sentences > 1:
normalized_pos = position / (total_sentences - 1)
# U-shaped curve: high at start and end
position_score = 1.0 - 0.6 * np.sin(np.pi * normalized_pos)
else:
position_score = 1.0
# 3. Length score (favor medium-length sentences)
word_count = len(sentence.split())
optimal_length = 20
length_score = 1.0 - min(abs(word_count - optimal_length) / 30, 1.0)
# 4. Keyword bonus
keyword_score = 0.0
sentence_lower = sentence.lower()
for kw in self.config.decision_keywords + self.config.action_keywords:
if kw in sentence_lower:
keyword_score += 0.1
keyword_score = min(keyword_score, 0.3) # Cap bonus
# Combined score
score = (
self.config.similarity_weight * similarity
+ self.config.position_weight * position_score
+ self.config.length_weight * length_score
+ keyword_score
)
return score
def _generate_overview(self, key_sentences: List[str]) -> str:
"""Generate overview from key sentences"""
if not key_sentences:
return "Tidak ada ringkasan yang dapat dibuat."
# Use top 2-3 sentences for overview
overview_sentences = key_sentences[: min(3, len(key_sentences))]
overview = " ".join(overview_sentences)
# Clean up
overview = re.sub(r"\s+", " ", overview).strip()
return overview
def _extract_decisions(self, sentences: List[str]) -> List[str]:
"""Extract decision-related sentences and synthesize enumerated decisions.
This method collects sentence-level decision mentions, attempts to synthesize
clauses from enumerated statements (e.g., "Pertama..., Kedua..."),
and performs semantic deduplication to avoid repeated/near-duplicate items.
"""
raw = []
for sent in sentences:
sent_lower = sent.lower()
# Check for decision keywords
if any(kw in sent_lower for kw in self.config.decision_keywords):
# Clean the sentence
clean_sent = re.sub(r"\s+", " ", sent).strip()
if clean_sent and clean_sent not in raw:
raw.append(clean_sent)
# Try to synthesize enumerated decisions from sentences
synthesized = self._synthesize_enumerated_decisions(sentences)
all_decisions = raw + synthesized
# Deduplicate semantically (Jaccard over tokens)
deduped = self._deduplicate_strings(all_decisions)
# Limit number of decisions returned
return deduped[:7]
def _synthesize_enumerated_decisions(self, sentences: List[str]) -> List[str]:
"""Extract clauses following enumerations like 'Pertama..., Kedua...' and return list.
Handles both ordinal words (pertama, kedua, ...) and numbered lists (1., 2.)
by splitting and returning non-trivial clauses.
"""
synth: List[str] = []
enum_words_re = re.compile(r"\b(pertama|kedua|ketiga|keempat|kelima)\b", flags=re.IGNORECASE)
for s in sentences:
s_clean = s.strip()
if enum_words_re.search(s_clean.lower()):
# Split by Indonesian ordinal words
parts = re.split(r"\bpertama\b|\bkedua\b|\bketiga\b|\bkeempat\b|\bkelima\b", s_clean, flags=re.IGNORECASE)
for p in parts:
p = p.strip(" .,:;\n-–—")
if len(p.split()) >= 3 and p not in synth:
synth.append(p)
# Also handle simple numbered enumerations like '1. ... 2. ...'
if re.search(r"\d+\.\s*", s_clean):
parts = re.split(r"\d+\.\s*", s_clean)
for p in parts:
p = p.strip(" .,:;\n-–—")
if len(p.split()) >= 3 and p not in synth:
synth.append(p)
return synth
def _normalize_text_for_dedup(self, text: str) -> str:
"""Normalize text for lightweight semantic deduplication."""
t = text.lower()
# remove punctuation, keep alphanumerics and spaces
t = re.sub(r"[^a-z0-9\s]+", "", t)
t = re.sub(r"\s+", " ", t).strip()
return t
def _deduplicate_strings(self, items: List[str], threshold: float = 0.5) -> List[str]:
"""Deduplicate items using token Jaccard similarity threshold."""
kept: List[str] = []
norms: List[str] = []
for it in items:
n = self._normalize_text_for_dedup(it)
if not n:
continue
toks1 = set(n.split())
is_dup = False
for other in norms:
toks2 = set(other.split())
if not toks1 or not toks2:
continue
inter = len(toks1 & toks2)
union = len(toks1 | toks2)
if union > 0 and (inter / union) >= threshold:
is_dup = True
break
if not is_dup:
kept.append(it)
norms.append(n)
return kept
def _extract_action_items(self, segments: List[TranscriptSegment]) -> List[Dict[str, str]]:
"""Extract action items with speaker attribution (improved heuristics)
Heuristics:
- Detect explicit commitments like "aku akan", "saya bertanggung jawab", "kamu siapkan" and assign owner
- Fallback to keyword-based detection
- Normalize duplicate tasks and detect simple due-date mentions like "minggu depan", "besok"
- Try to infer explicit owner names mentioned in the clause
"""
action_items: List[Dict[str, str]] = []
seen_tasks = set()
# Try to use AdvancedNLPExtractor (NER + dependency parse) for higher-quality extraction
try:
from src.nlp_utils import AdvancedNLPExtractor
extractor = AdvancedNLPExtractor()
sent_meta = self._get_sentences_with_meta(segments)
nlp_actions = extractor.extract_actions_from_sentences(sent_meta)
for item in nlp_actions:
task_key = item.get("task", "").lower()[:120]
if task_key in seen_tasks:
continue
seen_tasks.add(task_key)
action_items.append(
{
"owner": item.get("owner", "TBD"),
"task": item.get("task", "").strip(),
"timestamp": f"{sent_meta[item.get('sentence_idx', 0)]['start']:.1f}s",
"due": self._detect_due_from_text(item.get("task", "")),
}
)
except Exception:
extractor = None
commit_re = re.compile(
r"\b(aku|saya|kami|kita|kamu)\b.*\b(bertanggung jawab|akan|saya akan|aku akan|aku akan membuat|kamu tolong|tolong|siapkan|bikin|harus|selesaikan|dikerjakan)\b",
flags=re.IGNORECASE,
)
# Actionable verbs/phrases to validate generic keyword matches
_action_verbs_re = re.compile(r"\b(akan|harus|siapkan|bikin|buat|selesaikan|dikerjakan|tolong|mohon|harap)\b", flags=re.IGNORECASE)
for seg in segments:
if not seg.text:
continue
text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", seg.text).strip()
text_lower = text.lower()
# 1) explicit commitment patterns
if commit_re.search(text_lower):
# Try to extract short actionable clause
task = re.sub(
r"^.*?(bertanggung jawab|akan|membuat|siapkan|tolong|saya akan|aku akan|kamu tolong)\b",
"",
text,
flags=re.IGNORECASE,
)
task = task.strip(" .,:;-")
if not task:
# fallback to whole segment
task = text
# Try to detect explicit owner name within the clause (e.g., "Budi akan ...")
owner = self._extract_name_as_owner(text) or seg.speaker_id
task_key = task.lower()[:120]
if task_key not in seen_tasks:
seen_tasks.add(task_key)
action_items.append(
{
"owner": owner,
"task": task,
"timestamp": f"{seg.start:.1f}s",
"due": self._detect_due_from_text(task),
}
)
continue
# 2) keyword-based detection
if any(kw in text_lower for kw in self.config.action_keywords):
# Validate that the segment is actionable (has verbs like 'akan'/'perlu' or explicit name)
if not self._is_actionable_text(text):
continue
owner = self._extract_name_as_owner(text) or seg.speaker_id
task = text.strip()
task_key = task.lower()[:120]
if task_key in seen_tasks:
continue
seen_tasks.add(task_key)
action_items.append(
{
"owner": owner,
"task": task,
"timestamp": f"{seg.start:.1f}s",
"due": self._detect_due_from_text(task),
}
)
# Post-process: deduplicate semantically and filter tiny filler tasks
processed: List[Dict[str, str]] = []
seen_norms = set()
# Filter out filler / non-actionable phrases (e.g., meeting start/thanks)
filler_patterns = [
r"\bkita mulai rapat",
r"\bitu yang mau kita bahas",
r"\bterima kasih",
r"\bok(e|ey)?\b",
r"\bsip\b",
r"\bcukup(kan)? sampai",
r"\btidak ada( yang)?\b",
r"\biya\b",
r"\bsetuju\b",
]
filler_re = re.compile("|".join(filler_patterns), flags=re.IGNORECASE)
for it in action_items:
task_text = it.get("task", "")
# Skip common non-actionable conversational lines
if filler_re.search(task_text):
continue
# Ensure the sentence is actionable (has a commitment verb or explicit owner/name)
if not self._is_actionable_text(task_text):
continue
norm = self._normalize_text_for_dedup(task_text)[:200]
# skip if too short
if len(task_text.split()) < 3:
continue
if norm in seen_norms:
continue
seen_norms.add(norm)
processed.append(it)
# Limit number of action items
return processed[:15]
def _detect_due_from_text(self, text: str) -> str:
"""Detect simple due-date hints from text and return a short normalized due string."""
t = text.lower()
if "besok" in t:
return "besok"
if "segera" in t or "secepat" in t or "sekarang" in t:
return "segera"
if "minggu depan" in t:
return "1 minggu"
m = re.search(r"(\d+)\s*minggu", t)
if m:
return f"{m.group(1)} minggu"
if "2 minggu" in t or "dua minggu" in t:
return "2 minggu"
if "deadline" in t:
# try to capture a following date/token
m2 = re.search(r"deadline\s*[:\-\s]*([\w\-\./]+)", t)
return m2.group(1) if m2 else "TBD"
return ""
def _extract_name_as_owner(self, text: str) -> Optional[str]:
"""Return a candidate owner name if a capitalized proper name is explicitly present in the clause.
Simple heuristic: look for capitalized words (not at sentence start if it's a pronoun) followed by 'akan' or similar.
"""
m = re.search(r"\b([A-Z][a-z]{2,})\b(?=\s+akan|\s+siapkan|\s+tolong|\s+bisa|\s+bertanggung)", text)
if m:
return m.group(1)
return None
def _is_actionable_text(self, text: str) -> bool:
"""Return True if text contains indicators of an actionable commitment.
Indicators:
- Commitment verbs (akan, harus, perlu, siapkan, dll.)
- Explicit owner mention (capitalized name)
- Time indicators / deadlines (besok, minggu depan, deadline)
"""
t = text or ""
tl = t.lower()
if re.search(r"\b(akan|harus|siapkan|bikin|buat|selesaikan|dikerjakan|tolong|mohon|harap|perlu)\b", tl):
return True
# Only consider capitalized names as indicators if followed by an action verb
if re.search(r"\b([A-Z][a-z]{2,})\b(?=\s+(akan|siapkan|tolong|mohon|harus|selesaikan|buat|bikin))", t):
return True
if any(k in tl for k in ("deadline", "minggu depan", "besok")):
return True
return False
def _extract_topics(self, text: str, num_topics: int = 5) -> List[str]:
"""Extract main topics from text using simple frequency analysis"""
# Simple word frequency approach
# Remove common Indonesian stopwords
stopwords = {
"yang",
"dan",
"di",
"ke",
"dari",
"ini",
"itu",
"dengan",
"untuk",
"pada",
"adalah",
"dalam",
"tidak",
"akan",
"sudah",
"juga",
"saya",
"kita",
"kami",
"mereka",
"ada",
"bisa",
"atau",
"seperti",
"jadi",
"kalau",
"karena",
"tapi",
"ya",
"apa",
"bagaimana",
"kenapa",
"siapa",
"kapan",
"dimana",
"nya",
"kan",
"dong",
"sih",
"kok",
"deh",
"loh",
"lah",
}
# Tokenize and count
words = re.findall(r"\b[a-zA-Z]{4,}\b", text.lower())
word_counts = {}
for word in words:
if word not in stopwords:
word_counts[word] = word_counts.get(word, 0) + 1
# Sort by frequency
sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
# Return top topics
return [word for word, count in sorted_words[:num_topics]]
def summarize_by_speaker(self, segments: List[TranscriptSegment]) -> Dict[str, str]:
"""Generate per-speaker summary"""
# Group segments by speaker
speaker_texts = {}
for seg in segments:
if seg.speaker_id not in speaker_texts:
speaker_texts[seg.speaker_id] = []
speaker_texts[seg.speaker_id].append(seg.text)
# Summarize each speaker's contribution
speaker_summaries = {}
for speaker_id, texts in speaker_texts.items():
full_text = " ".join(texts)
sentences = self._split_sentences(full_text)
if sentences:
# Get top 2 sentences for each speaker
key_sentences = self._extract_key_sentences(sentences)[:2]
speaker_summaries[speaker_id] = " ".join(key_sentences)
else:
speaker_summaries[speaker_id] = "Tidak ada kontribusi yang dapat diringkas."
return speaker_summaries