| from __future__ import annotations |
|
|
| import hashlib |
| import re |
| from typing import List, Dict, Optional |
| from .chunk_schema import CodeChunk, ChunkAST, ChunkSpan, ChunkHierarchy |
|
|
| def _hash_id(text: str, prefix: str) -> str: |
| """ |
| Generate deterministic ID using SHA256 (standardized). |
| |
| Previously used SHA1, now standardized to SHA256 for consistency |
| with repo_chunker.py and id_utils.py. |
| """ |
| |
| h = hashlib.sha256(text.encode("utf-8")).hexdigest()[:8] |
| return f"{prefix}_{h}" |
|
|
|
|
| def _is_actual_code(text: str) -> bool: |
| """ |
| Check if text inside a fenced block is actual executable code |
| or just formatted text. |
| """ |
| text = text.strip() |
| |
| |
| formatted_text_patterns = [ |
| |
| r'^=+\s*[A-Za-z\s]+\s*=+$', |
| r'^-+\s*[A-Za-z\s]+\s*-+$', |
| |
| r'^[=_-]{20,}$', |
| |
| r'\b(the|and|that|this|with|for|are|is|was|were|have|has|had)\b', |
| r'[.!?]\s+[A-Z]', |
| |
| r'^\s*(Human|AI|Tool|System|User|Assistant)\s+(Message|Response|Input|Output)?\s*[:=-]', |
| r'^\s*[A-Z][a-z]+\s*:', |
| ] |
| |
| |
| lines = text.split('\n') |
| formatted_line_count = 0 |
| code_line_count = 0 |
| |
| |
| code_patterns = [ |
| r'^\s*(def|class|import|from|async|await|return|if|for|while|try|except|with)\b', |
| r'^\s*@\w+', |
| r'^\s*\w+\s*=\s*.+', |
| r'^\s*\w+\(.+\)', |
| r'^\s*print\(.+\)', |
| r'^\s*\{.*\}', |
| r'^\s*\[.*\]', |
| ] |
| |
| for line in lines: |
| line = line.strip() |
| if not line: |
| continue |
| |
| |
| is_formatted = any(re.search(pattern, line, re.IGNORECASE) for pattern in formatted_text_patterns) |
| |
| |
| is_code = any(re.search(pattern, line) for pattern in code_patterns) |
| |
| if is_formatted: |
| formatted_line_count += 1 |
| if is_code: |
| code_line_count += 1 |
| |
| |
| if formatted_line_count > 1 and code_line_count == 0: |
| return False |
| |
| |
| return True |
|
|
|
|
| def _looks_like_code_block(lines: List[str]) -> bool: |
| """ |
| Heuristic to recover code blocks when Markdown fences are missing |
| (common after HTML → MD conversion). |
| """ |
| if not lines: |
| return False |
| |
| |
| joined = "\n".join(lines) |
| text = joined.strip() |
| |
| |
| if len(text) < 50: |
| return False |
| |
| |
| code_patterns = [ |
| |
| r'^\s*(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|from\s+\w+\s+import)', |
| |
| r'^\s*\w+\s*=\s*.+|^\s*\w+\s*\(.+\)', |
| |
| r'^\s*(if|for|while|with|try|except|finally|async|await)\s+', |
| |
| r'^\s*@\w+', |
| |
| r'^\s*return\b', |
| |
| r'^\s*print\(', |
| |
| r'^\s{4,}\S', |
| ] |
| |
| |
| prose_indicators = [ |
| |
| r'\b(the|and|that|this|with|for|are|is|was|were|have|has|had)\b', |
| |
| r'[.!?]\s+[A-Z]', |
| |
| r'\b(a|an|the)\s+\w+', |
| ] |
| |
| lines_list = text.split('\n') |
| code_line_count = 0 |
| prose_line_count = 0 |
| |
| for line in lines_list: |
| line = line.strip() |
| if not line: |
| continue |
| |
| |
| is_code = any(re.search(pattern, line) for pattern in code_patterns) |
| |
| |
| is_prose = len(line) > 20 and any(re.search(pattern, line, re.IGNORECASE) for pattern in prose_indicators) |
| |
| if is_code: |
| code_line_count += 1 |
| if is_prose: |
| prose_line_count += 1 |
| |
| |
| total_non_empty_lines = len([l for l in lines_list if l.strip()]) |
| |
| |
| if code_line_count >= 2 and prose_line_count <= code_line_count // 2: |
| return True |
| |
| |
| if total_non_empty_lines <= 3 and code_line_count >= 1 and prose_line_count == 0: |
| return True |
| |
| |
| code_only_patterns = [ |
| r'^\s*from langchain\.', |
| r'^\s*import langchain', |
| r'^\s*@tool\b', |
| r'^\s*agent = create_agent\(', |
| r'^\s*result = agent\.invoke\(', |
| ] |
| |
| if any(re.search(pattern, text) for pattern in code_only_patterns): |
| return True |
| |
| return False |
|
|
|
|
# Per-line signals that a line is an executable Python statement.
_EXECUTABLE_LINE_PATTERNS = (
    re.compile(r'\bdef\s+\w+\s*\([^)]*\)\s*:'),
    re.compile(r'\bclass\s+\w+\s*\(?[^:]*\)?\s*:'),
    # The module part accepts dotted and relative names, and the imported
    # target may be a name, "*" or an opening parenthesis.
    re.compile(r'^\s*from\s+[\w.]+\s+import\s+\S'),
    re.compile(r'^\s*import\s+\w+'),
    re.compile(r'\breturn\b'),
    re.compile(r'\bprint\('),
    # Simple assignment with no further '=' on the line.
    re.compile(r'^\s*\w+\s*=\s*[^=\n]+$'),
)


def _looks_like_executable_code(text: str) -> bool:
    """Estimate whether ``text`` contains statements that could be executed.

    The text must first pass ``_is_actual_code``. Then each non-blank line
    that does not start with ``#`` or a triple double quote is checked
    against a set of statement patterns.

    Args:
        text: Code candidate, typically the content of a fenced block.

    Returns:
        ``True`` when at least two lines match, or when at least one line
        matches and the text has at most three non-blank lines (comment
        lines count toward that total). ``False`` otherwise.
    """
    if not _is_actual_code(text):
        return False

    non_empty_lines = 0
    executable_lines = 0

    for raw_line in text.split('\n'):
        line = raw_line.strip()
        if not line:
            continue
        non_empty_lines += 1

        # Comment lines and lines opening with a triple double quote are
        # never counted as executable.
        if line.startswith(('#', '"""')):
            continue

        if any(pattern.search(line) for pattern in _EXECUTABLE_LINE_PATTERNS):
            executable_lines += 1

    if executable_lines >= 2:
        return True
    # Tiny snippets only need a single recognisable statement.
    return executable_lines >= 1 and non_empty_lines <= 3
|
|
|
|
def chunk_document(
    raw_text: str,
    source_name: str,
    source_url: Optional[str] = None,
) -> List[Dict]:
    """Split documentation text into heading-aware text and code chunks.

    Design goals:
    - Preserve document hierarchy
    - Separate prose vs code
    - Recover code even if Markdown fences are lost
    - Deterministic chunk IDs

    The text is scanned line by line:

    * Lines starting with two to six ``#`` characters followed by whitespace
      are headings. They end the current text chunk and update the heading
      stack; a single ``#`` is not treated as a heading and stays in the
      text. Headings are not recognised inside a fenced block.
    * A line whose stripped form starts with three backticks toggles a
      fenced block. The text after the backticks on the opening line is
      recorded as the language.
    * All other lines accumulate into either the fenced block or the text
      buffer.

    Args:
        raw_text: Full document text.
        source_name: Stored as ``source_name`` on every chunk.
        source_url: Stored as ``source_url`` on every chunk.

    Returns:
        A list of dicts in document order, each with the keys ``chunk_id``,
        ``source``, ``source_name``, ``source_url``, ``language``,
        ``chunk_type`` (``"code"`` or ``"text"``), ``content``,
        ``chunk_index`` and ``metadata``. ``metadata`` holds the heading
        context and 1-based ``line_start`` / ``line_end``, plus one of
        ``inferred_block``, ``fenced_block`` (with ``looks_executable``) or
        ``was_fenced_block`` where applicable.
    """
    chunks: List[Dict] = []

    # Heading context shared by every chunk emitted below it.
    heading_stack: List[str] = []
    current_heading: Optional[str] = None
    current_heading_level: Optional[int] = None

    # Pending prose lines and pending fenced-block lines.
    buffer: List[str] = []
    code_block = False
    code_language: Optional[str] = None
    code_lines: List[str] = []

    chunk_index = 0
    line_cursor = 0

    heading_re = re.compile(r"^(#{2,6})\s+(.*)")
    # Whole-word match. A plain substring test would find "is" in "list" or
    # "and" in "pandas" and reject nearly every recovered code block.
    # NOTE(review): "and", "with", "for" and "is" are also Python keywords,
    # so code using them is still kept as text; the word list is unchanged.
    prose_word_re = re.compile(
        r"\b(?:the|and|that|this|with|for|are|is|was)\b", re.IGNORECASE
    )

    def heading_path() -> Optional[str]:
        return " > ".join(heading_stack) if heading_stack else None

    def heading_metadata() -> Dict:
        # Common leading metadata keys, in the order every chunk uses.
        return {
            "heading": current_heading,
            "heading_level": current_heading_level,
            "heading_path": heading_path(),
        }

    def emit(content: str, id_prefix: str, language: str,
             chunk_type: str, metadata: Dict) -> None:
        # Append one chunk and advance the running chunk index.
        nonlocal chunk_index
        chunks.append(
            {
                "chunk_id": _hash_id(content, id_prefix),
                "source": "documentation",
                "source_name": source_name,
                "source_url": source_url,
                "language": language,
                "chunk_type": chunk_type,
                "content": content,
                "chunk_index": chunk_index,
                "metadata": metadata,
            }
        )
        chunk_index += 1

    def flush_text(start_line: int, end_line: int) -> None:
        # Emit the buffered prose as one chunk and clear the buffer.
        nonlocal buffer
        if not buffer:
            return

        text = "\n".join(buffer).strip()
        buffer = []
        if not text:
            return

        metadata = heading_metadata()
        metadata["line_start"] = start_line
        metadata["line_end"] = end_line

        # Fence recovery: the block must look like code and contain none of
        # the common English words as whole words.
        inferred_code = (
            len(text) > 30
            and _looks_like_code_block(text.splitlines())
            and prose_word_re.search(text) is None
        )
        if inferred_code:
            metadata["inferred_block"] = True
            emit(text, "doc_code", "python", "code", metadata)
        else:
            emit(text, "doc_text", "markdown", "text", metadata)

    def flush_code(start_line: int, end_line: int) -> None:
        # Emit the collected fenced block and reset the fence state.
        nonlocal code_lines, code_language
        if not code_lines:
            return

        code = "\n".join(code_lines)
        code_lines = []

        metadata = heading_metadata()
        if _is_actual_code(code):
            metadata["fenced_block"] = True
            metadata["line_start"] = start_line
            metadata["line_end"] = end_line
            metadata["looks_executable"] = _looks_like_executable_code(code)
            emit(code, "doc_code", code_language or "unknown", "code", metadata)
        else:
            # Fenced, but the content reads as formatted text: keep it as a
            # text chunk and remember that it came from a fence.
            metadata["line_start"] = start_line
            metadata["line_end"] = end_line
            metadata["was_fenced_block"] = True
            emit(code, "doc_text", "markdown", "text", metadata)

        code_language = None

    buffer_start_line = 0
    code_start_line = 0

    for i, line in enumerate(raw_text.splitlines()):
        line_cursor = i + 1  # 1-based line number of the current line

        # Headings are only recognised outside fenced blocks.
        heading_match = None if code_block else heading_re.match(line)
        if heading_match:
            flush_text(buffer_start_line, line_cursor - 1)

            level = len(heading_match.group(1))
            title = heading_match.group(2).strip()

            # "##" is the top of the stack: drop everything at this depth or
            # deeper, then push the new title.
            heading_stack[:] = heading_stack[: level - 2]
            heading_stack.append(title)

            current_heading = title
            current_heading_level = level
            buffer_start_line = line_cursor
            continue

        stripped = line.strip()
        if stripped.startswith("```"):
            if not code_block:
                # Opening fence: close the prose before it.
                flush_text(buffer_start_line, line_cursor - 1)
                code_block = True
                code_language = stripped.replace("```", "").strip() or None
                code_start_line = line_cursor + 1
            else:
                # Closing fence.
                code_block = False
                flush_code(code_start_line, line_cursor - 1)
                buffer_start_line = line_cursor + 1
            continue

        if code_block:
            code_lines.append(line)
        else:
            if not buffer:
                buffer_start_line = line_cursor
            buffer.append(line)

    # Flush whatever is still pending, including an unterminated fence.
    flush_text(buffer_start_line, line_cursor)
    flush_code(code_start_line, line_cursor)

    return chunks
|
|
|
|
def wrap_doc_chunks(doc_chunks: List[dict]) -> List[CodeChunk]:
    """Adapt chunk dicts into ``CodeChunk`` objects of type "documentation".

    This is a pure adapter over the dicts it is given; it does not re-parse
    or alter the chunk content.

    Args:
        doc_chunks: Dicts with at least ``chunk_id``, ``source_name`` and
            ``content``. ``language`` defaults to ``"markdown"`` and
            ``metadata`` to an empty dict when absent.

    Returns:
        One ``CodeChunk`` per input dict, in the same order.
    """

    def _to_code_chunk(entry: dict) -> CodeChunk:
        # Look fields up in the same order as the CodeChunk arguments so a
        # malformed entry fails on the same key as it always has.
        chunk_id = entry["chunk_id"]
        file_path = entry["source_name"]
        language = entry.get("language", "markdown")
        content = entry["content"]
        meta = entry.get("metadata", {})

        ast_info = ChunkAST(
            symbol_type="documentation",
            name=meta.get("heading"),
            parent=meta.get("heading_path"),
        )
        span_info = ChunkSpan(
            start_line=meta.get("line_start"),
            end_line=meta.get("line_end"),
        )
        hierarchy_info = ChunkHierarchy(
            is_primary=True,
            is_extracted=True,
        )
        return CodeChunk(
            chunk_id=chunk_id,
            file_path=file_path,
            language=language,
            chunk_type="documentation",
            code=content,
            ast=ast_info,
            span=span_info,
            hierarchy=hierarchy_info,
            metadata=meta,
        )

    return [_to_code_chunk(entry) for entry in doc_chunks]