""" BERT Extractive Summarization Module ==================================== Implements extractive summarization using IndoBERT/mBERT for meeting minutes. """ from __future__ import annotations import re from dataclasses import dataclass, field from typing import Any, Dict, List, Optional import numpy as np def _collapse_repeated_phrases_global(text: str, max_ngram: int = 6, min_repeats: int = 2) -> str: """Module-level helper to collapse repeated n-gram phrases. Iteratively collapses repeated adjacent n-gram phrases into a single occurrence. """ if not text or min_repeats < 2: return text pattern = re.compile(r"(\b(?:\w+\s+){0,%d}\w+\b)(?:\s+\1){%d,}" % (max_ngram - 1, min_repeats - 1), flags=re.IGNORECASE) prev = None out = text while prev != out: prev = out out = pattern.sub(r"\1", out) return out from src.transcriber import TranscriptSegment @dataclass class SummarizationConfig: """Configuration for summarization""" # Method: 'extractive' (BERT embeddings) or 'abstractive' (seq2seq model) method: str = "extractive" # Models # Use a cached/available model for reliability in offline environments sentence_model_id: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" abstractive_model_id: str = "google/mt5-base" # Extractive settings (increase to capture more key points) num_sentences: int = 7 min_sentence_length: int = 6 max_sentence_length: int = 300 # Abstractive settings max_input_chars: int = 1000 max_summary_length: int = 128 min_summary_length: int = 30 # Light abstractive refinement step (run on condensed extractive overview) do_abstractive_refinement: bool = True abstractive_refine_max_len: int = 80 # Generate a comprehensive executive overview (long, covering entire meeting) comprehensive_overview: bool = True comprehensive_max_length: int = 512 # Post-processing options polish_overview: bool = True semantic_dedup_threshold: float = 0.75 # Scoring weights position_weight: float = 0.15 length_weight: float = 0.10 similarity_weight: float = 
0.75 # Keywords for detection decision_keywords: List[str] = field( default_factory=lambda: [ "diputuskan", "disepakati", "kesimpulan", "keputusan", "jadi", "maka", "sepakat", "setuju", "final", "kesepakatan", "disimpulkan", "ditetapkan", "disetujui", "putus", ] ) action_keywords: List[str] = field( default_factory=lambda: [ "akan", "harus", "perlu", "tolong", "mohon", "harap", "deadline", "target", "tugas", "tanggung jawab", "action item", "follow up", "tindak lanjut", "dikerjakan", "selesaikan", "lakukan", "siapkan", "minggu depan", "besok", "segera", "bikin", "buat", ] ) # Device device: str = "cpu" @dataclass class MeetingSummary: """Structured meeting summary""" overview: str key_points: List[str] decisions: List[str] action_items: List[Dict[str, str]] topics: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return { "overview": self.overview, "key_points": self.key_points, "decisions": self.decisions, "action_items": self.action_items, "topics": self.topics, "keywords": getattr(self, "keywords", []), } def __str__(self) -> str: """String representation""" lines = [] lines.append("=== RINGKASAN RAPAT ===\n") lines.append(f"Overview:\n{self.overview}\n") if self.key_points: lines.append("Poin-Poin Penting:") for i, point in enumerate(self.key_points, 1): lines.append(f" {i}. {point}") lines.append("") if self.decisions: lines.append("Keputusan:") for i, decision in enumerate(self.decisions, 1): lines.append(f" {i}. {decision}") lines.append("") if self.action_items: lines.append("Action Items:") for i, item in enumerate(self.action_items, 1): owner = item.get("owner", "TBD") task = item.get("task", "") due = item.get("due", "") if due: lines.append(f" {i}. [{owner}] {task} (Due: {due})") else: lines.append(f" {i}. 
[{owner}] {task}") if self.topics: lines.append("") lines.append("Topik:") lines.append(", ".join(self.topics)) return "\n".join(lines) def to_json(self) -> str: """Return a JSON string for machine-readable outputs.""" import json return json.dumps(self.to_dict(), ensure_ascii=False, indent=2) def to_yaml(self) -> str: """Return a YAML string (requires PyYAML).""" try: import yaml return yaml.safe_dump(self.to_dict(), allow_unicode=True) except Exception: # Fallback to JSON if YAML not available return self.to_json() class AbstractiveSummarizer: """Abstractive summarizer using HuggingFace transformers pipeline (mt5/mbart/etc).""" def __init__(self, config: Optional[SummarizationConfig] = None): self.config = config or SummarizationConfig() self._pipeline = None def _load_model(self): if self._pipeline is None: try: from transformers import pipeline device = 0 if self.config.device.startswith("cuda") else -1 print(f"[Summarizer] Loading abstractive model: {self.config.abstractive_model_id}") self._pipeline = pipeline( "summarization", model=self.config.abstractive_model_id, tokenizer=self.config.abstractive_model_id, device=device, truncation=True, ) print("[Summarizer] Abstractive model loaded successfully") except Exception as e: print(f"[Summarizer] Warning: abstractive model load failed: {e}") self._pipeline = None def _chunk_text(self, text: str) -> List[str]: max_chars = int(self.config.max_input_chars) if len(text) <= max_chars: return [text] chunks = [] start = 0 while start < len(text): end = min(len(text), start + max_chars) # try to cut at sentence boundary cut = text.rfind(".", start, end) if cut <= start: cut = end chunk = text[start:cut].strip() if chunk: # prevent repeating identical chunks chunk = self._collapse_repeated_phrases(chunk) chunks.append(chunk) start = cut return chunks def _clean_abstractive_output(self, overview: str, full_text: str) -> (str, List[str]): """Clean artifacts from abstractive model output and produce fallback key points. 
Returns (overview_clean, key_points) """ overview_clean = self._clean_abstractive_text(overview) # If abstract output is still noisy (placeholders remain or too few alpha tokens), fallback to extractive if "= self.config.num_sentences: break return overview_clean, key_points def _clean_abstractive_text(self, text: str) -> str: """Lightweight cleaning of abstractive text outputs (remove placeholders, collapse punctuation). Kept as a separate method for unit testing/backwards compatibility with older tests. Also collapses repeated trivial tokens and reduces punctuation runs. """ t = re.sub(r"", "", text) t = re.sub(r"\)\s*", "", t) # collapse repeated short filler words sequences e.g. "Jadi contohnya Jadi contohnya ..." t = self._collapse_repeated_phrases(t) t = re.sub(r"\s*[\.]{2,}\s*", ". ", t) t = re.sub(r"[!?]{2,}", ".", t) t = re.sub(r"\s+", " ", t).strip() # Remove leading/trailing hyphens and stray punctuation t = re.sub(r"^[-\s]+|[-\s]+$", "", t) if not re.search(r"[.!?]$", t): t = t + "." 
return t def _generate_keywords(self, text: str, top_k: int = 8) -> List[str]: """Generate simple keywords by frequency (fallback).""" toks = re.findall(r"\b[a-zA-Z]{4,}\b", text.lower()) freq = {} stop = {"yang","dan","ini","itu","untuk","dengan","juga","sudah","ada","kita","saya","kamu"} for w in toks: if w in stop: continue freq[w] = freq.get(w, 0) + 1 sorted_words = sorted(freq.items(), key=lambda x: x[1], reverse=True) return [w for w, _ in sorted_words[:top_k]] def _collapse_repeated_phrases(self, text: str, max_ngram: int = 6, min_repeats: int = 2) -> str: """Delegates to module-level collapse helper""" return _collapse_repeated_phrases_global(text, max_ngram=max_ngram, min_repeats=min_repeats) def _semantic_deduplicate(self, items: List[str], threshold: Optional[float] = None) -> List[str]: """Delegate to AbstractiveSummarizer's semantic dedupe for compatibility.""" return AbstractiveSummarizer(self.config)._semantic_deduplicate(items, threshold) def _semantic_dedup_action_items(self, actions: List[Dict[str, str]], threshold: Optional[float] = None) -> List[Dict[str, str]]: """Delegate to AbstractiveSummarizer's action-item dedupe for compatibility.""" return AbstractiveSummarizer(self.config)._semantic_dedup_action_items(actions, threshold) def _parse_structured_output(self, raw: str, defaults: Dict[str, Any]) -> (str, List[str]): """Try to parse YAML/JSON or simple structured text into (overview, keywords). 
If parsing fails, return (cleaned_raw, fallback_keywords) """ cleaned = raw.strip() # Try YAML first (if available) try: import yaml parsed = yaml.safe_load(cleaned) if isinstance(parsed, dict): ov = parsed.get("overview", "") kws = parsed.get("keywords", None) if kws is None: kws = self._generate_keywords(ov or " ".join(defaults.get("key_points", []))) return (ov.strip() if isinstance(ov, str) else "", kws) except Exception: pass # Try JSON try: import json parsed = json.loads(cleaned) if isinstance(parsed, dict): ov = parsed.get("overview", "") kws = parsed.get("keywords", None) if kws is None: kws = self._generate_keywords(ov or " ".join(defaults.get("key_points", []))) return (ov.strip() if isinstance(ov, str) else "", kws) except Exception: pass # Simple heuristic: look for header 'overview:' or 'Ringkasan:' in text m = re.search(r"(?im)^(overview|ringkasan)\s*:\s*(.*)$", cleaned) if m: ov = m.group(2).strip() kws = self._generate_keywords(ov or " ".join(defaults.get("key_points", []))) return ov, kws # If nothing recognized, return fallback cleaned text and keywords return cleaned, self._generate_keywords(cleaned or " ".join(defaults.get("key_points", []))) def _sanitize_for_prompt(self, text: str) -> str: """Sanitize text before injecting into the prompt: remove model placeholders, URLs/domains/emails, common web-article boilerplate (closing lines like "Semoga bermanfaat"), and collapse repeats.""" if not text: return text t = re.sub(r"", "", text) # remove emails t = re.sub(r"\b\S+@\S+\.\S+\b", " ", t) # remove domain-like tokens (e.g., Eksekutif.com.co.id) t = re.sub(r"\b\S+\.(?:com|co\.id|info|id|net|org)(?:\.[a-z]{2,})*\b", " ", t, flags=re.IGNORECASE) # remove common article/web boilerplate short phrases that often appear as closings t = re.sub(r"(?i)\b(semoga artikel ini bermanfaat(?: bagi anda semua)?|semoga bermanfaat|terima kasih(?: atas masukannya| juga)?)\b[.!\s,]*", " ", t) t = re.sub(r"\s+", " ", t).strip() t = 
_collapse_repeated_phrases_global(t) return t def _is_repetitive_text(self, text: str, max_run: int = 6) -> bool: """Detect highly repetitive model outputs (including repeated n-gram phrases). Returns True if repetition patterns exceed thresholds. """ if not text: return False # check placeholder presence quickly if re.search(r"", text): return True # Tokenize tokens = re.findall(r"\w+", text.lower()) if not tokens: return False # Check simple token runs run = 1 last = tokens[0] for tok in tokens[1:]: if tok == last: run += 1 if run >= max_run: return True else: last = tok run = 1 # Check n-gram repeated phrase runs for n=1..4 max_ngram = 4 n_tokens = len(tokens) for n in range(1, max_ngram + 1): i = 0 while i + 2 * n <= n_tokens: # compare tokens[i:i+n] with subsequent repeated occurrences pattern = tokens[i:i + n] run = 1 j = i + n while j + n <= n_tokens and tokens[j:j + n] == pattern: run += 1 j += n if run >= max_run: return True i += 1 # fallback regex for single-token repetition if re.search(r"(\b\w+\b)(?:\s+\1\b){%d,}" % (max_run - 1), text.lower()): return True return False def _contains_domain_noise(self, text: str) -> bool: """Detect domain-like or short web boilerplate noise (e.g., 'Eksekutif.com', 'Semoga artikel ini bermanfaat'). Returns True if common domain patterns or boilerplate phrases are found. 
""" if not text: return False if re.search(r"\b\S+\.(?:com|co\.id|info|id|net|org)(?:\.[a-z]{2,})*\b", text, flags=re.IGNORECASE): return True if re.search(r"(?i)\b(semoga artikel ini bermanfaat(?: bagi anda semua)?|semoga bermanfaat|terima kasih)\b", text): return True return False def _normalize_overview_text(self, text: str) -> str: """Normalize overview into a readable paragraph or keep structured lists tidy.""" if not text: return text t = text.strip() # collapse repeated fragments first t = _collapse_repeated_phrases_global(t) # If text contains list markers or section headers, tidy spacing and return if "\n-" in t or "Poin-Poin Penting" in t or "Keputusan" in t or "Action Items" in t: # normalize newlines and strip extra spaces t = re.sub(r"\n\s+", "\n", t) t = re.sub(r"\n{2,}", "\n\n", t) return t.strip() # Otherwise make a single paragraph and deduplicate near-duplicate fragments # split by common separators (newline, bullet, or hyphen sequences) if " - " in t: parts = [p.strip(" -" ) for p in re.split(r"\s*-\s*", t) if p.strip()] else: parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", t) if p.strip()] seen = set() uniq = [] for p in parts: norm = re.sub(r"[^a-z0-9 ]", "", p.lower()) norm = re.sub(r"\s+", " ", norm).strip() if not norm: continue if norm in seen: continue seen.add(norm) uniq.append(p.strip(" -.")) para = " ".join(uniq) para = re.sub(r"\s+", " ", para).strip() # Remove any leftover emails/domains or short web boilerplate that slipped through para = re.sub(r"\b\S+@\S+\.\S+\b", " ", para) para = re.sub(r"\b\S+\.(?:com|co\.id|info|id|net|org)(?:\.[a-z]{2,})*\b", " ", para, flags=re.IGNORECASE) para = re.sub(r"(?i)\b(semoga artikel ini bermanfaat(?: bagi anda semua)?|semoga bermanfaat|terima kasih(?: atas masukannya| juga)?)\b[.!\s,]*", " ", para) para = re.sub(r"\s+", " ", para).strip() if para and not re.search(r"[.!?]$", para): para = para + "." 
if para: para = para[0].upper() + para[1:] return para def _polish_overview(self, overview: str, full_text: str) -> str: """Polish overview into an executive, coherent paragraph using abstractive model (if available). Falls back to normalization and deduplication if model not available. """ if not overview: return overview # Basic normalization first overview = _collapse_repeated_phrases_global(overview) overview = self._normalize_overview_text(overview) # If model available and config allows, ask for paraphrase/expansion if getattr(self.config, "polish_overview", True): try: self._load_model() if self._pipeline is not None: prompt = ( "Paraphrase dan perluas teks berikut menjadi paragraf eksekutif yang jelas, ringkas, dan mudah dibaca. " "Jangan sertakan header." "\n\nTeks:\n" + overview ) out = self._pipeline( prompt, max_length=min(getattr(self.config, "comprehensive_max_length", 512), 350), min_length=40, truncation=True, do_sample=False, ) if isinstance(out, list) and out: candidate = out[0].get("summary_text", "").strip() candidate = self._clean_abstractive_text(candidate) candidate = _collapse_repeated_phrases_global(candidate) candidate = self._normalize_overview_text(candidate) return candidate except Exception: pass return overview def _semantic_deduplicate(self, items: List[str], threshold: Optional[float] = None) -> List[str]: """Deduplicate similar items using sentence-transformer embeddings + cosine similarity. Returns the first occurrence for each semantic group. 
""" if not items: return [] thr = threshold if threshold is not None else getattr(self.config, "semantic_dedup_threshold", 0.75) # try embeddings try: embs = self._compute_embeddings(items) if embs is not None: from sklearn.metrics.pairwise import cosine_similarity sim = cosine_similarity(embs) n = len(items) taken = set() result = [] for i in range(n): if i in taken: continue result.append(items[i]) for j in range(i + 1, n): if sim[i, j] >= thr: taken.add(j) # If embeddings didn't merge anything useful, fallback to token-jaccard grouping if len(result) == len(items) and len(items) > 1: # token Jaccard token_sets = [set(re.findall(r"\w+", it.lower())) for it in items] taken2 = set() result2 = [] for i in range(len(items)): if i in taken2: continue result2.append(items[i]) for j in range(i + 1, len(items)): if j in taken2: continue si = token_sets[i] sj = token_sets[j] if not si or not sj: continue jacc = len(si & sj) / float(len(si | sj)) if jacc >= 0.45: taken2.add(j) return result2 return result else: raise ValueError("No embeddings") except Exception: # fallback to token-jaccard grouping first (robust when embeddings aren't available) try: token_sets = [set(re.findall(r"\w+", it.lower())) for it in items] taken = set() res = [] for i in range(len(items)): if i in taken: continue res.append(items[i]) si = token_sets[i] for j in range(i + 1, len(items)): if j in taken: continue sj = token_sets[j] if not si or not sj: continue jacc = len(si & sj) / float(len(si | sj)) if jacc >= 0.45: taken.add(j) return res except Exception: # final fallback to naive textual deduplication seen = set() res = [] for it in items: low = re.sub(r"\s+", " ", it.lower()).strip() if low in seen: continue seen.add(low) res.append(it) return res def _semantic_dedup_action_items(self, actions: List[Dict[str, str]], threshold: Optional[float] = None) -> List[Dict[str, str]]: """Deduplicate action items by task text; merge owners when necessary.""" if not actions: return [] tasks = 
[a.get("task", "") for a in actions] groups = self._semantic_deduplicate(tasks, threshold=threshold) # groups contains first representative tasks; now build merged items merged = [] for rep in groups: owners = [] timestamps = [] dues = set() for a in actions: if a.get("task", "") == rep or (rep and rep in a.get("task", "")): if a.get("owner") and a.get("owner") not in owners: owners.append(a.get("owner")) if a.get("timestamp"): timestamps.append(a.get("timestamp")) if a.get("due"): dues.add(a.get("due")) owner_str = " / ".join(owners) if owners else "TBD" merged.append({ "owner": owner_str, "task": rep, "timestamp": timestamps[0] if timestamps else "", "due": ", ".join(sorted(list(dues))) if dues else "", }) return merged def generate_comprehensive_summary(self, full_text: str, key_points: List[str], decisions: List[str], action_items: List[Dict[str, str]], topics: List[str]) -> (str, List[str]): """Generate a comprehensive executive summary covering the meeting. Uses the abstractive pipeline with a guided prompt built from extracted components. Attempts to request YAML-structured output for reliable parsing; falls back to rule-based assembly. 
Returns (overview_text, keywords) """ # Build a structured prompt that requests YAML output for safe parsing prompt_parts = [ "Anda adalah asisten yang menulis ringkasan rapat yang komprehensif dan terstruktur.", "Output harus dalam format YAML dengan kunci: overview, key_points (list), decisions (list), action_items (list of {owner, task, due}), keywords (list).", "Berikan overview naratif yang jelas, serta daftar poin penting, keputusan, dan tindak lanjut.", "Topik yang dibahas:", ", ".join(topics) if topics else "-", "Poin-poin penting:\n" + "\n".join([f"- {p}" for p in key_points]) if key_points else "", "Keputusan:\n" + "\n".join([f"- {d}" for d in decisions]) if decisions else "", "Tindak lanjut (Action Items):\n" + "\n".join([f"- [{a.get('owner','TBD')}] {a.get('task','')}" for a in action_items]) if action_items else "", "Tuliskan field 'overview' minimal 80 kata sebagai paragraf naratif yang merangkum seluruh rapat dengan jelas.", "Mohon hasilkan YAML yang valid." ] prompt = "\n\n".join([p for p in prompt_parts if p]) # Sanitize inputs to avoid placeholder tokens and repeated garbage key_points = [self._sanitize_for_prompt(k) for k in key_points if k and k.strip()] decisions = [self._sanitize_for_prompt(d) for d in decisions if d and d.strip()] for a in action_items: a['task'] = self._sanitize_for_prompt(a.get('task','')) # Deduplicate before sending to model try: key_points = self._semantic_deduplicate(key_points) decisions = self._semantic_deduplicate(decisions) except Exception: key_points = list(dict.fromkeys(key_points)) decisions = list(dict.fromkeys(decisions)) # Use pipeline if available try: self._load_model() if self._pipeline is not None: # Try up to 2 attempts: first deterministic, second sampled if repetition/shortness detected attempts = 2 for attempt in range(attempts): gen_kwargs = dict( max_length=getattr(self.config, "comprehensive_max_length", 512), min_length=max(80, int(getattr(self.config, "comprehensive_max_length", 512) * 0.12)), 
truncation=True, do_sample=False, no_repeat_ngram_size=4, repetition_penalty=1.3, ) if attempt == 1: # more creative generation if deterministic attempt failed gen_kwargs.update({"do_sample": True, "temperature": 0.7, "top_p": 0.9}) out = self._pipeline(prompt, **gen_kwargs) text = out[0].get("summary_text", "").strip() # collapse repeated fragments, then clean text = self._collapse_repeated_phrases(text) cleaned = self._clean_abstractive_text(text) # Quick heuristic checks (repetition, too short, or domain-like web boilerplate -> retry) if self._is_repetitive_text(cleaned) or len(cleaned.split()) < 20 or self._contains_domain_noise(cleaned): # try again (next attempt) with sampling if attempt + 1 < attempts: continue # Attempt to parse structured YAML/JSON overview, keywords = self._parse_structured_output(cleaned, { "key_points": key_points, "decisions": decisions, "action_items": action_items, }) # Final normalization / optional polish overview = self._normalize_overview_text(overview) if getattr(self.config, "polish_overview", True): overview = self._polish_overview(overview, full_text) # Validate overview quality: non-empty, not too short, not repetitive if overview and len(overview.split()) >= 10 and not self._is_repetitive_text(overview): return overview, keywords else: # Try next attempt if available, otherwise break to fallback if attempt + 1 < attempts: continue else: break except Exception: pass # Fallback rule-based assembly: construct a narrative paragraph summarizing meeting, # rather than repeating the list headers. Use polishing to turn it into an executive paragraph. 
def _format_action_items(ai_list): pairs = [] for a in ai_list: owner = a.get('owner', 'TBD') task = a.get('task', '').strip() if task: pairs.append(f"{owner} akan {task.rstrip('.')}.") return " ".join(pairs) def _join_points(pts): # join key points into a sentence if not pts: return "" # take up to 4 points to avoid overly long lists pts_sample = pts[:4] return "; ".join([p.rstrip('.') for p in pts_sample]) + "" narrative_parts = [] if topics: narrative_parts.append("Topik utama yang dibahas meliputi: " + ", ".join(topics) + ".") if key_points: narrative_parts.append("Beberapa poin penting termasuk: " + _join_points(key_points) + ".") if decisions: narrative_parts.append("Keputusan utama yang dicapai termasuk: " + ", ".join([d.rstrip('.') for d in decisions]) + ".") if action_items: narrative_parts.append("Tindak lanjut yang disepakati di antaranya: " + _format_action_items(action_items)) assembled = " ".join([p for p in narrative_parts if p]).strip() # Normalize and then optionally polish into a smooth executive paragraph assembled = self._normalize_overview_text(assembled) if getattr(self.config, "polish_overview", True): assembled = self._polish_overview(assembled, full_text) keywords = self._generate_keywords(assembled, top_k=8) return assembled, keywords def summarize(self, transcript_segments: List[TranscriptSegment]) -> MeetingSummary: self._load_model() full_text = " ".join([seg.text for seg in transcript_segments if seg.text]) if not full_text.strip(): return MeetingSummary( overview="Tidak ada konten yang dapat diringkas.", key_points=[], decisions=[], action_items=[], ) # Clean up common disfluencies/politeness tokens and ASR annotations full_text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", full_text) full_text = re.sub( r"\b(oke|ya|oke,|baik|sekarang|sekarang kita|nah|jadi|oke\.|jadi\.)\b", "", full_text, flags=re.IGNORECASE, ) full_text = re.sub(r"\s+", " ", full_text).strip() # Chunk and summarize if self._pipeline is None: # fallback: return first 
few sentences sentences = BERTSummarizer(self.config)._split_sentences(full_text) overview = " ".join(sentences[: min(3, len(sentences))]) else: chunks = self._chunk_text(full_text) partial_summaries = [] for chunk in chunks: try: out = self._pipeline( chunk, max_length=self.config.max_summary_length, min_length=self.config.min_summary_length, truncation=True, do_sample=False, ) partial_summaries.append(out[0]["summary_text"].strip()) except Exception as e: print(f"[Summarizer] chunk summarization failed: {e}") continue # If multiple partial summaries, join and optionally summarize again combined = " ".join(partial_summaries) if len(combined) > self.config.max_input_chars and self._pipeline: try: out = self._pipeline( combined, max_length=self.config.max_summary_length, min_length=self.config.min_summary_length, truncation=True, do_sample=False, ) overview = out[0]["summary_text"].strip() except Exception: overview = combined else: overview = combined # Clean abstractive overview and produce robust key points (use helper) overview, key_points = self._clean_abstractive_output(overview, full_text) # Extract decisions and actions via keywords sentences = BERTSummarizer(self.config)._split_sentences(full_text) decisions = BERTSummarizer(self.config)._extract_decisions(sentences) action_items = BERTSummarizer(self.config)._extract_action_items(transcript_segments) topics = BERTSummarizer(self.config)._extract_topics(full_text) # Optionally produce a comprehensive overview (uses abstractive pipeline) if getattr(self.config, "comprehensive_overview", False): try: comp_overview, keywords = self.generate_comprehensive_summary(full_text, key_points, decisions, action_items, topics) overview = comp_overview except Exception: keywords = [] ms = MeetingSummary( overview=overview, key_points=key_points, decisions=decisions, action_items=action_items, topics=topics, ) if 'keywords' in locals(): setattr(ms, 'keywords', keywords) return ms class BERTSummarizer: """ Extractive 
Summarization using BERT sentence embeddings. Selects most important sentences based on semantic similarity to document centroid and other features. Attributes: config: SummarizationConfig object Example: >>> summarizer = BERTSummarizer() >>> summary = summarizer.summarize(transcript_segments) >>> print(summary.overview) >>> print(summary.decisions) """ def __init__(self, config: Optional[SummarizationConfig] = None): """ Initialize BERTSummarizer. Args: config: SummarizationConfig object """ self.config = config or SummarizationConfig() self._model = None def _load_model(self): """Lazy load sentence transformer model""" if self._model is None: try: from sentence_transformers import SentenceTransformer print(f"[Summarizer] Loading model: {self.config.sentence_model_id}") self._model = SentenceTransformer(self.config.sentence_model_id) print("[Summarizer] Model loaded successfully") except Exception as e: print(f"[Summarizer] Warning: Could not load model: {e}") print("[Summarizer] Using fallback mode") self._model = "FALLBACK" def _semantic_deduplicate(self, items: List[str], threshold: Optional[float] = None) -> List[str]: """Delegate to AbstractiveSummarizer semantic dedup for compatibility.""" return AbstractiveSummarizer(self.config)._semantic_deduplicate(items, threshold) def _semantic_dedup_action_items(self, actions: List[Dict[str, str]], threshold: Optional[float] = None) -> List[Dict[str, str]]: """Delegate to AbstractiveSummarizer action-item dedup for compatibility.""" return AbstractiveSummarizer(self.config)._semantic_dedup_action_items(actions, threshold) def _collapse_repeated_phrases(self, text: str, max_ngram: int = 6, min_repeats: int = 2) -> str: """Delegates to module-level collapse helper for compatibility.""" return _collapse_repeated_phrases_global(text, max_ngram=max_ngram, min_repeats=min_repeats) def summarize(self, transcript_segments: List[TranscriptSegment]) -> MeetingSummary: """ Generate meeting summary from transcript. 
Args: transcript_segments: List of transcript segments with speaker info Returns: MeetingSummary with overview, key points, decisions, and action items """ # If configuration prefers abstractive summarization, delegate to AbstractiveSummarizer if getattr(self.config, "method", "extractive") == "abstractive": try: return AbstractiveSummarizer(self.config).summarize(transcript_segments) except Exception as e: print( f"[Summarizer] Abstractive summarization failed, falling back to extractive: {e}" ) self._load_model() # Combine all text full_text = " ".join([seg.text for seg in transcript_segments if seg.text]) # Clean up disfluencies and annotations commonly appearing in ASR output full_text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", full_text) full_text = re.sub(r"\s+", " ", full_text).strip() if not full_text.strip(): return MeetingSummary( overview="Tidak ada konten yang dapat diringkas.", key_points=[], decisions=[], action_items=[], ) # Get sentence-level metadata by merging speaker turns sent_meta = self._get_sentences_with_meta(transcript_segments) if not sent_meta: return MeetingSummary( overview="Tidak ada kalimat yang dapat diidentifikasi.", key_points=[], decisions=[], action_items=[], ) sentences = [s["text"] for s in sent_meta] # Compute embeddings and select a diverse set of representative sentences via MMR embeddings = self._compute_embeddings(sentences) num_select = min(max(5, self.config.num_sentences + 2), len(sentences)) if embeddings is not None: selected_idx = self._mmr_selection(sentences, embeddings, k=num_select) key_sentences = [sentences[i] for i in selected_idx] else: # fallback: use earlier scoring key_sentences = self._extract_key_sentences(sentences) # Generate a multi-sentence overview with some ordering and cleaning overview = self._generate_overview(key_sentences[:3]) # Optionally perform a light abstractive refinement on the extractive overview if getattr(self.config, "do_abstractive_refinement", False): try: abs_sum = 
AbstractiveSummarizer(self.config) abs_sum._load_model() if abs_sum._pipeline is not None and overview: out = abs_sum._pipeline( overview, max_length=getattr(self.config, "abstractive_refine_max_len", 80), min_length=30, truncation=True, do_sample=False, ) # Expect a single summary text if isinstance(out, list) and out: raw_overview = out[0].get("summary_text", overview).strip() # Use AbstractiveSummarizer's cleaning & fallback logic overview_cleaned, _ = abs_sum._clean_abstractive_output(raw_overview, full_text) overview = overview_cleaned except Exception: # Fail silently and use extractive overview pass # Build richer key points: include speaker attribution and short cleaned sentences key_points = [] for i in selected_idx if embeddings is not None else list(range(len(key_sentences))): s = sentences[i] sp = sent_meta[i]["speaker_id"] # Short clean s_clean = re.sub(r"\s+", " ", s).strip() key_points.append(f"{s_clean} (oleh {sp})") # Extract decisions using expanded context (look for decision keywords and enumerations) decisions = [] seen_decisions = set() for i, s in enumerate(sentences): s_clean = re.sub(r"\s+", " ", s).strip() s_lower = s_clean.lower() if any(kw in s_lower for kw in self.config.decision_keywords) or re.match( r"^(pertama|kedua|ketiga|keempat|kelima)\b", s_lower ): context = self._expand_context_for_sentence(sent_meta, i, window=1) dec_text = re.sub(r"\[.*?\]", "", context) dec_text = re.sub(r"\s+", " ", dec_text).strip() # Truncate to a reasonable length (35 words) and remove trailing punctuation words = dec_text.split() dec_text = " ".join(words[:35]).rstrip(" ,.;:") if len(dec_text.split()) < 3: continue if dec_text and dec_text not in seen_decisions: decisions.append(dec_text) seen_decisions.add(dec_text) # If no decisions found, try to extract from key_sentences if not decisions: for ks in key_sentences: if any(kw in ks.lower() for kw in self.config.decision_keywords): if ks not in seen_decisions: decisions.append(ks) seen_decisions.add(ks) 
# Apply semantic deduplication to decisions try: decisions = self._semantic_deduplicate(decisions) except Exception: pass # Extract action items at sentence level with speaker inference action_items = [] seen_tasks = set() action_kw_re = re.compile( r"\b(" + "|".join([re.escape(k) for k in self.config.action_keywords]) + r")\b", flags=re.IGNORECASE, ) # verbs that indicate an actionable commitment (used to validate generic keyword matches) action_verbs_re = re.compile(r"\b(akan|harus|siapkan|bikin|buat|selesaikan|dikerjakan|tolong|mohon|harap)\b", flags=re.IGNORECASE) for i, s in enumerate(sentences): text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", s).strip() if not text: continue # explicit commit patterns commit_re = re.compile( r"\b(aku|saya|kami|kita|kamu)\b.*\b(bertanggung jawab|akan|saya akan|aku akan|aku akan membuat|kamu tolong|tolong|siapkan|bikin|harus|selesaikan|dikerjakan)\b", flags=re.IGNORECASE, ) owner = None task = None if commit_re.search(text): owner = sent_meta[i]["speaker_id"] # try to isolate the actionable clause task = re.sub( r"^.*?\b(bertanggung jawab|akan|saya akan|aku akan|kamu tolong|tolong|siapkan|bikin|harus|selesaikan|dikerjakan)\b", "", text, flags=re.IGNORECASE, ) task = task.strip(" .,:;-") if not task: task = text elif action_kw_re.search(text): # Validate generic matches for actionability using helper if not self._is_actionable_text(text): continue owner = sent_meta[i]["speaker_id"] task = text if task: # Normalize task text task = re.sub( r"^\s*(aku|saya|kami|kita|kamu)\b[:,\s]*", "", task, flags=re.IGNORECASE ).strip() task = re.sub(r"\s+", " ", task).strip(" .,:;-") if len(task.split()) < 3: continue filler_short = {"setuju", "oke", "ya", "nah", "betul"} if task.lower() in filler_short: continue key = task.lower()[:120] if key in seen_tasks: continue seen_tasks.add(key) action_items.append( { "owner": owner or "TBD", "task": task, "timestamp": f"{sent_meta[i]['start']:.1f}s", "due": "", } ) # Fall back to segment-level 
action extraction if none found if not action_items: action_items = self._extract_action_items(transcript_segments) # Apply semantic deduplication to action items (merge owners when possible) try: action_items = self._semantic_dedup_action_items(action_items) except Exception: pass # Extract topics (frequency-based) from cleaned full_text topics = self._extract_topics(full_text) # Optionally produce a comprehensive overview (may use abstractive pipeline) if getattr(self.config, "comprehensive_overview", False): try: abs_s = AbstractiveSummarizer(self.config) comp_overview, keywords = abs_s.generate_comprehensive_summary(full_text, key_points, decisions, action_items, topics) overview = comp_overview except Exception: keywords = [] # Return comprehensive MeetingSummary ms = MeetingSummary( overview=overview, key_points=key_points, decisions=decisions, action_items=action_items, topics=topics, ) if 'keywords' in locals(): setattr(ms, 'keywords', keywords) return ms def _split_sentences(self, text: str) -> List[str]: """Split text into sentences""" # Indonesian sentence splitting # Handle common abbreviations text = re.sub(r"([Dd]r|[Pp]rof|[Bb]pk|[Ii]bu|[Ss]dr|[Nn]o|[Hh]al)\.", r"\1", text) # Split on sentence-ending punctuation sentences = re.split(r"[.!?]+\s*", text) # Restore periods in abbreviations sentences = [s.replace("", ".") for s in sentences] # Clean and filter cleaned = [] for s in sentences: s = s.strip() # Filter by length if len(s) < self.config.min_sentence_length: continue if len(s) > self.config.max_sentence_length: # Truncate very long sentences s = s[: self.config.max_sentence_length] + "..." # Collapse trivial repeated fragments inside sentence s = self._collapse_repeated_phrases(s) cleaned.append(s) return cleaned def _merge_speaker_turns(self, segments: List[TranscriptSegment]) -> List[Dict[str, Any]]: """Merge consecutive segments by the same speaker into 'turns' to provide more context. 
        Returns a list of dicts: {speaker_id, start, end, text, indices}
        """
        turns: List[Dict[str, Any]] = []
        for i, seg in enumerate(segments):
            # Skip empty/whitespace-only segments
            if not seg.text or not seg.text.strip():
                continue
            # Clean common ASR artifacts and leading fillers
            text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", seg.text)
            text = re.sub(
                r"^\s*(oke|ya|nah|oke,|baik|sekarang|jadi)\b[\s,:-]*", "", text, flags=re.IGNORECASE
            )
            text = re.sub(r"\s+", " ", text).strip()
            if not text:
                continue
            if turns and turns[-1]["speaker_id"] == seg.speaker_id:
                # Same speaker as previous turn: extend it in place
                turns[-1]["end"] = seg.end
                turns[-1]["text"] += " " + text
                turns[-1]["indices"].append(i)
            else:
                # New speaker: start a fresh turn
                turns.append(
                    {
                        "speaker_id": seg.speaker_id,
                        "start": seg.start,
                        "end": seg.end,
                        "text": text,
                        "indices": [i],
                    }
                )
        return turns

    def _get_sentences_with_meta(self, segments: List[TranscriptSegment]) -> List[Dict[str, Any]]:
        """Split merged speaker turns into sentences and keep metadata.

        Each returned dict carries the sentence text plus the owning turn's
        speaker id, start/end times, segment indices, and the sentence's
        position within the turn.
        """
        turns = self._merge_speaker_turns(segments)
        sent_meta: List[Dict[str, Any]] = []
        for t in turns:
            sents = self._split_sentences(t["text"])
            for j, s in enumerate(sents):
                # NOTE: start/end are the whole turn's bounds, not per-sentence
                sent_meta.append(
                    {
                        "text": s,
                        "speaker_id": t["speaker_id"],
                        "start": t["start"],
                        "end": t["end"],
                        "turn_indices": t["indices"],
                        "sent_idx_in_turn": j,
                    }
                )
        return sent_meta

    def _compute_embeddings(self, sentences: List[str]):
        """Compute sentence embeddings using sentence-transformers (lazy load).

        Returns the encoder's output for *sentences*, or None when the list
        is empty or the model cannot be loaded (error is printed).
        """
        if not sentences:
            return None
        try:
            # Imported lazily so the module works without sentence-transformers
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer(self.config.sentence_model_id)
            embs = model.encode(sentences, show_progress_bar=False)
            return embs
        except Exception as e:
            print(f"[Summarizer] Embedding model error: {e}")
            return None

    def _mmr_selection(
        self, sentences: List[str], embeddings, k: int = 5, lambda_param: float = 0.6
    ) -> List[int]:
        """Maximal Marginal Relevance (MMR) selection for diversity and coverage.
        Returns list of selected sentence indices in original order.
        """
        import numpy as _np
        # Trivial case: nothing to trade off, keep the first min(len, k) indices
        if embeddings is None or len(sentences) <= k:
            return list(range(min(len(sentences), k)))
        centroid = _np.mean(embeddings, axis=0)
        # similarity to centroid (cosine; epsilon guards zero-norm centroid)
        sim_to_centroid = _np.dot(embeddings, centroid) / (
            _np.linalg.norm(embeddings, axis=1) * (_np.linalg.norm(centroid) + 1e-8)
        )
        selected = []
        candidate_indices = list(range(len(sentences)))
        # pick the top similarity as first
        first = int(_np.argmax(sim_to_centroid))
        selected.append(first)
        candidate_indices.remove(first)
        while len(selected) < k and candidate_indices:
            mmr_scores = []
            for idx in candidate_indices:
                # Max similarity to any already-selected sentence (redundancy term)
                sim_to_sel = max(
                    [
                        _np.dot(embeddings[idx], embeddings[s])
                        / (_np.linalg.norm(embeddings[idx]) * _np.linalg.norm(embeddings[s]) + 1e-8)
                        for s in selected
                    ]
                )
                # MMR: relevance to centroid minus redundancy, weighted by lambda
                score = lambda_param * sim_to_centroid[idx] - (1 - lambda_param) * sim_to_sel
                mmr_scores.append((idx, score))
            idx_best, _ = max(mmr_scores, key=lambda x: x[1])
            selected.append(idx_best)
            candidate_indices.remove(idx_best)
        # return in original order
        selected_sorted = sorted(selected)
        return selected_sorted

    def _expand_context_for_sentence(
        self, sent_meta: List[Dict[str, Any]], idx: int, window: int = 1
    ) -> str:
        """Return concatenated sentence with neighboring contextual sentences for better decision/action extraction."""
        start = max(0, idx - window)
        end = min(len(sent_meta), idx + window + 1)
        return " ".join([s["text"] for s in sent_meta[start:end]])

    def _infer_owner_for_action(self, seg_index: int, sent_meta: List[Dict[str, Any]]) -> str:
        """Infer owner for an action by looking at the sentence speaker and recent explicit mentions."""
        # Prefer sentence speaker
        if 0 <= seg_index < len(sent_meta):
            return sent_meta[seg_index]["speaker_id"]
        return "TBD"

    def _extract_key_sentences(self, sentences: List[str]) -> List[str]:
        """Extract most important sentences using BERT embeddings.

        Scores each sentence against the document centroid via
        ``_calculate_sentence_score`` and returns the top-k in original
        order.  Falls back to the leading sentences when the model is in
        fallback mode, the input is short, or encoding fails.
        """
        if not sentences:
            return []
        # Fallback mode: simple heuristics
        if self._model == "FALLBACK" or len(sentences) <= self.config.num_sentences:
            return sentences[: self.config.num_sentences]
        try:
            # Get sentence embeddings
            embeddings = self._model.encode(sentences, show_progress_bar=False)
            # Calculate document centroid
            centroid = np.mean(embeddings, axis=0)
            # Calculate importance scores for each sentence
            scores = []
            for i, (sent, emb) in enumerate(zip(sentences, embeddings)):
                score = self._calculate_sentence_score(
                    sentence=sent,
                    embedding=emb,
                    centroid=centroid,
                    position=i,
                    total_sentences=len(sentences),
                )
                scores.append((i, score, sent))
            # Sort by score
            scores.sort(key=lambda x: x[1], reverse=True)
            # Get top-k sentences (maintain original order)
            top_indices = sorted([s[0] for s in scores[: self.config.num_sentences]])
            return [sentences[i] for i in top_indices]
        except Exception as e:
            print(f"[Summarizer] Embedding extraction failed: {e}")
            return sentences[: self.config.num_sentences]

    def _calculate_sentence_score(
        self,
        sentence: str,
        embedding: np.ndarray,
        centroid: np.ndarray,
        position: int,
        total_sentences: int,
    ) -> float:
        """Calculate importance score for a sentence.

        Combines centroid cosine similarity, a U-shaped position prior,
        a length prior, and a capped keyword bonus, weighted by the config.
        """
        # 1. Cosine similarity to centroid
        similarity = np.dot(embedding, centroid) / (
            np.linalg.norm(embedding) * np.linalg.norm(centroid) + 1e-8
        )
        # 2. Position score (favor beginning and end)
        if total_sentences > 1:
            normalized_pos = position / (total_sentences - 1)
            # U-shaped curve: high at start and end
            position_score = 1.0 - 0.6 * np.sin(np.pi * normalized_pos)
        else:
            position_score = 1.0
        # 3. Length score (favor medium-length sentences)
        word_count = len(sentence.split())
        optimal_length = 20
        length_score = 1.0 - min(abs(word_count - optimal_length) / 30, 1.0)
        # 4. Keyword bonus
        keyword_score = 0.0
        sentence_lower = sentence.lower()
        for kw in self.config.decision_keywords + self.config.action_keywords:
            if kw in sentence_lower:
                keyword_score += 0.1
        keyword_score = min(keyword_score, 0.3)  # Cap bonus
        # Combined score
        score = (
            self.config.similarity_weight * similarity
            + self.config.position_weight * position_score
            + self.config.length_weight * length_score
            + keyword_score
        )
        return score

    def _generate_overview(self, key_sentences: List[str]) -> str:
        """Generate overview from key sentences (joins the top 2-3)."""
        if not key_sentences:
            return "Tidak ada ringkasan yang dapat dibuat."
        # Use top 2-3 sentences for overview
        overview_sentences = key_sentences[: min(3, len(key_sentences))]
        overview = " ".join(overview_sentences)
        # Clean up
        overview = re.sub(r"\s+", " ", overview).strip()
        return overview

    def _extract_decisions(self, sentences: List[str]) -> List[str]:
        """Extract decision-related sentences and synthesize enumerated decisions.

        This method collects sentence-level decision mentions, attempts to
        synthesize clauses from enumerated statements (e.g., "Pertama...,
        Kedua..."), and performs semantic deduplication to avoid
        repeated/near-duplicate items.  Returns at most 7 decisions.
        """
        raw = []
        for sent in sentences:
            sent_lower = sent.lower()
            # Check for decision keywords
            if any(kw in sent_lower for kw in self.config.decision_keywords):
                # Clean the sentence
                clean_sent = re.sub(r"\s+", " ", sent).strip()
                if clean_sent and clean_sent not in raw:
                    raw.append(clean_sent)
        # Try to synthesize enumerated decisions from sentences
        synthesized = self._synthesize_enumerated_decisions(sentences)
        all_decisions = raw + synthesized
        # Deduplicate semantically (Jaccard over tokens)
        deduped = self._deduplicate_strings(all_decisions)
        # Limit number of decisions returned
        return deduped[:7]

    def _synthesize_enumerated_decisions(self, sentences: List[str]) -> List[str]:
        """Extract clauses following enumerations like 'Pertama..., Kedua...' and return list.
        Handles both ordinal words (pertama, kedua, ...)
        and numbered lists (1., 2.) by splitting and returning non-trivial clauses.
        """
        synth: List[str] = []
        enum_words_re = re.compile(r"\b(pertama|kedua|ketiga|keempat|kelima)\b", flags=re.IGNORECASE)
        for s in sentences:
            s_clean = s.strip()
            if enum_words_re.search(s_clean.lower()):
                # Split by Indonesian ordinal words
                parts = re.split(r"\bpertama\b|\bkedua\b|\bketiga\b|\bkeempat\b|\bkelima\b", s_clean, flags=re.IGNORECASE)
                for p in parts:
                    # Keep only non-trivial clauses (>= 3 words), no duplicates
                    p = p.strip(" .,:;\n-–—")
                    if len(p.split()) >= 3 and p not in synth:
                        synth.append(p)
            # Also handle simple numbered enumerations like '1. ... 2. ...'
            if re.search(r"\d+\.\s*", s_clean):
                parts = re.split(r"\d+\.\s*", s_clean)
                for p in parts:
                    p = p.strip(" .,:;\n-–—")
                    if len(p.split()) >= 3 and p not in synth:
                        synth.append(p)
        return synth

    def _normalize_text_for_dedup(self, text: str) -> str:
        """Normalize text for lightweight semantic deduplication.

        Lowercases and strips everything but ASCII alphanumerics and single
        spaces.  NOTE(review): non-ASCII letters are removed entirely by
        the [^a-z0-9\\s] class — presumably acceptable for this corpus.
        """
        t = text.lower()
        # remove punctuation, keep alphanumerics and spaces
        t = re.sub(r"[^a-z0-9\s]+", "", t)
        t = re.sub(r"\s+", " ", t).strip()
        return t

    def _deduplicate_strings(self, items: List[str], threshold: float = 0.5) -> List[str]:
        """Deduplicate items using token Jaccard similarity threshold.

        An item is dropped when its token set overlaps any previously kept
        item's token set with Jaccard >= *threshold*.  Order is preserved.
        """
        kept: List[str] = []
        norms: List[str] = []   # normalized forms of kept items, parallel to `kept`
        for it in items:
            n = self._normalize_text_for_dedup(it)
            if not n:
                continue
            toks1 = set(n.split())
            is_dup = False
            for other in norms:
                toks2 = set(other.split())
                if not toks1 or not toks2:
                    continue
                inter = len(toks1 & toks2)
                union = len(toks1 | toks2)
                if union > 0 and (inter / union) >= threshold:
                    is_dup = True
                    break
            if not is_dup:
                kept.append(it)
                norms.append(n)
        return kept

    def _extract_action_items(self, segments: List[TranscriptSegment]) -> List[Dict[str, str]]:
        """Extract action items with speaker attribution (improved heuristics)
        Heuristics:
        - Detect explicit commitments like "aku akan", "saya bertanggung jawab", "kamu siapkan" and assign owner
        - Fallback to keyword-based detection
        - Normalize duplicate tasks and detect simple due-date mentions like
        "minggu depan", "besok"
        - Try to infer explicit owner names mentioned in the clause
        """
        action_items: List[Dict[str, str]] = []
        seen_tasks = set()
        # Try to use AdvancedNLPExtractor (NER + dependency parse) for higher-quality extraction
        try:
            from src.nlp_utils import AdvancedNLPExtractor
            extractor = AdvancedNLPExtractor()
            sent_meta = self._get_sentences_with_meta(segments)
            nlp_actions = extractor.extract_actions_from_sentences(sent_meta)
            for item in nlp_actions:
                # Dedup on first 120 chars of the lowercased task text
                task_key = item.get("task", "").lower()[:120]
                if task_key in seen_tasks:
                    continue
                seen_tasks.add(task_key)
                action_items.append(
                    {
                        "owner": item.get("owner", "TBD"),
                        "task": item.get("task", "").strip(),
                        "timestamp": f"{sent_meta[item.get('sentence_idx', 0)]['start']:.1f}s",
                        "due": self._detect_due_from_text(item.get("task", "")),
                    }
                )
        except Exception:
            # Optional dependency missing or extraction failed; heuristics below still run
            extractor = None
        commit_re = re.compile(
            r"\b(aku|saya|kami|kita|kamu)\b.*\b(bertanggung jawab|akan|saya akan|aku akan|aku akan membuat|kamu tolong|tolong|siapkan|bikin|harus|selesaikan|dikerjakan)\b",
            flags=re.IGNORECASE,
        )
        # Actionable verbs/phrases to validate generic keyword matches
        # NOTE(review): compiled but not referenced below — validation goes
        # through _is_actionable_text; candidate for removal.
        _action_verbs_re = re.compile(r"\b(akan|harus|siapkan|bikin|buat|selesaikan|dikerjakan|tolong|mohon|harap)\b", flags=re.IGNORECASE)
        for seg in segments:
            if not seg.text:
                continue
            # Strip ASR artifacts before matching
            text = re.sub(r"\[OVERLAP\]|\[NOISE\]|<.*?>", "", seg.text).strip()
            text_lower = text.lower()
            # 1) explicit commitment patterns
            if commit_re.search(text_lower):
                # Try to extract short actionable clause
                task = re.sub(
                    r"^.*?(bertanggung jawab|akan|membuat|siapkan|tolong|saya akan|aku akan|kamu tolong)\b",
                    "",
                    text,
                    flags=re.IGNORECASE,
                )
                task = task.strip(" .,:;-")
                if not task:
                    # fallback to whole segment
                    task = text
                # Try to detect explicit owner name within the clause (e.g., "Budi akan ...")
                owner = self._extract_name_as_owner(text) or seg.speaker_id
                task_key = task.lower()[:120]
                if task_key not in seen_tasks:
                    seen_tasks.add(task_key)
                    action_items.append(
                        {
                            "owner": owner,
                            "task": task,
                            "timestamp":
f"{seg.start:.1f}s",
                            "due": self._detect_due_from_text(task),
                        }
                    )
                continue
            # 2) keyword-based detection
            if any(kw in text_lower for kw in self.config.action_keywords):
                # Validate that the segment is actionable (has verbs like 'akan'/'perlu' or explicit name)
                if not self._is_actionable_text(text):
                    continue
                owner = self._extract_name_as_owner(text) or seg.speaker_id
                task = text.strip()
                task_key = task.lower()[:120]
                if task_key in seen_tasks:
                    continue
                seen_tasks.add(task_key)
                action_items.append(
                    {
                        "owner": owner,
                        "task": task,
                        "timestamp": f"{seg.start:.1f}s",
                        "due": self._detect_due_from_text(task),
                    }
                )
        # Post-process: deduplicate semantically and filter tiny filler tasks
        processed: List[Dict[str, str]] = []
        seen_norms = set()
        # Filter out filler / non-actionable phrases (e.g., meeting start/thanks)
        filler_patterns = [
            r"\bkita mulai rapat",
            r"\bitu yang mau kita bahas",
            r"\bterima kasih",
            r"\bok(e|ey)?\b",
            r"\bsip\b",
            r"\bcukup(kan)? sampai",
            r"\btidak ada( yang)?\b",
            r"\biya\b",
            r"\bsetuju\b",
        ]
        filler_re = re.compile("|".join(filler_patterns), flags=re.IGNORECASE)
        for it in action_items:
            task_text = it.get("task", "")
            # Skip common non-actionable conversational lines
            if filler_re.search(task_text):
                continue
            # Ensure the sentence is actionable (has a commitment verb or explicit owner/name)
            if not self._is_actionable_text(task_text):
                continue
            norm = self._normalize_text_for_dedup(task_text)[:200]
            # skip if too short
            if len(task_text.split()) < 3:
                continue
            if norm in seen_norms:
                continue
            seen_norms.add(norm)
            processed.append(it)
        # Limit number of action items
        return processed[:15]

    def _detect_due_from_text(self, text: str) -> str:
        """Detect simple due-date hints from text and return a short normalized due string.

        Returns "" when no hint is found.  Checks run in order: besok,
        urgency words, "minggu depan", numeric "<n> minggu", spelled-out
        "dua minggu", then "deadline <token>".
        """
        t = text.lower()
        if "besok" in t:
            return "besok"
        if "segera" in t or "secepat" in t or "sekarang" in t:
            return "segera"
        if "minggu depan" in t:
            return "1 minggu"
        m = re.search(r"(\d+)\s*minggu", t)
        if m:
            return f"{m.group(1)} minggu"
        # NOTE(review): the '2 minggu' literal here is unreachable — the
        # numeric regex above already matches it; only 'dua minggu' hits.
        if "2 minggu" in t or "dua minggu" in t:
            return "2 minggu"
        if "deadline" in t:
            # try to capture a following date/token
            m2 = re.search(r"deadline\s*[:\-\s]*([\w\-\./]+)", t)
            return m2.group(1) if m2 else "TBD"
        return ""

    def _extract_name_as_owner(self, text: str) -> Optional[str]:
        """Return a candidate owner name if a capitalized proper name is explicitly present in the clause.
        Simple heuristic: look for capitalized words (not at sentence start if it's a pronoun) followed by 'akan' or similar.
        """
        m = re.search(r"\b([A-Z][a-z]{2,})\b(?=\s+akan|\s+siapkan|\s+tolong|\s+bisa|\s+bertanggung)", text)
        if m:
            return m.group(1)
        return None

    def _is_actionable_text(self, text: str) -> bool:
        """Return True if text contains indicators of an actionable commitment.
        Indicators:
        - Commitment verbs (akan, harus, perlu, siapkan, dll.)
        - Explicit owner mention (capitalized name)
        - Time indicators / deadlines (besok, minggu depan, deadline)
        """
        t = text or ""
        tl = t.lower()
        if re.search(r"\b(akan|harus|siapkan|bikin|buat|selesaikan|dikerjakan|tolong|mohon|harap|perlu)\b", tl):
            return True
        # Only consider capitalized names as indicators if followed by an action verb
        if re.search(r"\b([A-Z][a-z]{2,})\b(?=\s+(akan|siapkan|tolong|mohon|harus|selesaikan|buat|bikin))", t):
            return True
        if any(k in tl for k in ("deadline", "minggu depan", "besok")):
            return True
        return False

    def _extract_topics(self, text: str, num_topics: int = 5) -> List[str]:
        """Extract main topics from text using simple frequency analysis"""
        # Simple word frequency approach
        # Remove common Indonesian stopwords
        stopwords = {
            "yang", "dan", "di", "ke", "dari", "ini", "itu", "dengan", "untuk", "pada",
            "adalah", "dalam", "tidak", "akan", "sudah", "juga", "saya", "kita", "kami",
            "mereka", "ada", "bisa", "atau", "seperti", "jadi", "kalau", "karena", "tapi",
            "ya", "apa", "bagaimana", "kenapa", "siapa", "kapan", "dimana", "nya", "kan",
            "dong", "sih", "kok", "deh", "loh", "lah",
        }
        # Tokenize and count (ASCII words of 4+ letters only)
        words = re.findall(r"\b[a-zA-Z]{4,}\b",
text.lower())
        word_counts = {}
        for word in words:
            if word not in stopwords:
                word_counts[word] = word_counts.get(word, 0) + 1
        # Sort by frequency
        sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
        # Return top topics
        return [word for word, count in sorted_words[:num_topics]]

    def summarize_by_speaker(self, segments: List[TranscriptSegment]) -> Dict[str, str]:
        """Generate per-speaker summary.

        Groups segment texts by speaker_id, then summarizes each speaker's
        combined text with the top-2 key sentences.
        """
        # Group segments by speaker
        speaker_texts = {}
        for seg in segments:
            if seg.speaker_id not in speaker_texts:
                speaker_texts[seg.speaker_id] = []
            speaker_texts[seg.speaker_id].append(seg.text)
        # Summarize each speaker's contribution
        speaker_summaries = {}
        for speaker_id, texts in speaker_texts.items():
            full_text = " ".join(texts)
            sentences = self._split_sentences(full_text)
            if sentences:
                # Get top 2 sentences for each speaker
                key_sentences = self._extract_key_sentences(sentences)[:2]
                speaker_summaries[speaker_id] = " ".join(key_sentences)
            else:
                speaker_summaries[speaker_id] = "Tidak ada kontribusi yang dapat diringkas."
        return speaker_summaries