Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import hashlib | |
| import re | |
| from typing import List, Dict, Optional | |
| from .chunk_schema import CodeChunk, ChunkAST, ChunkSpan, ChunkHierarchy | |
def _hash_id(text: str, prefix: str) -> str:
    """
    Build a deterministic identifier of the form ``<prefix>_<digest>``.

    ``<digest>`` is the first 8 hexadecimal characters of the SHA-256 hash
    of ``text`` encoded as UTF-8, so identical text always yields the same
    ID for a given prefix.

    History: this helper previously used SHA-1; it was switched to SHA-256
    to be consistent with repo_chunker.py and id_utils.py.
    """
    encoded = text.encode("utf-8")
    short_digest = hashlib.sha256(encoded).hexdigest()[:8]
    return "{}_{}".format(prefix, short_digest)
# Line-level signals that a fenced block holds formatted text, not code.
# Compiled once at import time instead of on every line of every block.
_FORMATTED_TEXT_RES = (
    # Divider lines wrapping a title: "==== Title ====" / "---- Title ----"
    re.compile(r'^=+\s*[A-Za-z\s]+\s*=+$'),
    re.compile(r'^-+\s*[A-Za-z\s]+\s*-+$'),
    # Pure separator lines
    re.compile(r'^[=_-]{20,}$'),
    # Common natural-language words (any letter case)
    re.compile(
        r'\b(the|and|that|this|with|for|are|is|was|were|have|has|had)\b',
        re.IGNORECASE,
    ),
    # Sentence boundary: punctuation followed by a CAPITAL letter.
    # Deliberately case-sensitive; with IGNORECASE it would also match
    # things like "a ? b" or "foo. bar".
    re.compile(r'[.!?]\s+[A-Z]'),
    # Message-like prefixes such as "Human Message:" / "Tool Output ="
    re.compile(
        r'^\s*(Human|AI|Tool|System|User|Assistant)\s+(Message|Response|Input|Output)?\s*[:=-]',
        re.IGNORECASE,
    ),
    # Capitalised labels: "Reasoning:", "Acting:", "NOTE:".
    # Deliberately case-sensitive on the first letter so that lowercase
    # "key: value" config lines and "else:" are not mistaken for labels.
    re.compile(r'^\s*[A-Z][A-Za-z]+\s*:'),
)

# Line-level signals that a fenced block holds actual code.
_FENCED_CODE_LINE_RES = (
    re.compile(r'^\s*(def|class|import|from|async|await|return|if|for|while|try|except|with)\b'),
    re.compile(r'^\s*@\w+'),
    re.compile(r'^\s*\w+\s*=\s*.+'),
    re.compile(r'^\s*\w+\(.+\)'),
    re.compile(r'^\s*print\(.+\)'),
    re.compile(r'^\s*\{.*\}'),  # JSON/dict
    re.compile(r'^\s*\[.*\]'),  # List
)


def _is_actual_code(text: str) -> bool:
    """
    Decide whether the body of a fenced block is code or formatted text.

    Each non-blank line is checked against two pattern sets: one for
    code-like lines and one for formatted-text lines (dividers, prose
    words, sentence boundaries, message/label prefixes).

    Args:
        text: Content found between the opening and closing fence.

    Returns:
        ``False`` only when more than one line looks like formatted text
        and no line looks like code; ``True`` otherwise, so fenced blocks
        are treated as code by default.
    """
    formatted_line_count = 0

    for raw_line in text.strip().split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        # A single code-like line is enough: the block can only be rejected
        # when there are no code lines at all.
        if any(pattern.search(line) for pattern in _FENCED_CODE_LINE_RES):
            return True

        if any(pattern.search(line) for pattern in _FORMATTED_TEXT_RES):
            formatted_line_count += 1

    # No code-like line was seen; reject only with 2+ formatted lines.
    return formatted_line_count <= 1
# Per-line patterns suggesting an unfenced line is code.
_UNFENCED_CODE_LINE_RES = (
    # Python keywords at line start
    re.compile(r'^\s*(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|from\s+\w+\s+import)'),
    # Function calls or assignments
    re.compile(r'^\s*\w+\s*=\s*.+|^\s*\w+\s*\(.+\)'),
    # Control structures
    re.compile(r'^\s*(if|for|while|with|try|except|finally|async|await)\s+'),
    # Decorators
    re.compile(r'^\s*@\w+'),
    # Return statements
    re.compile(r'^\s*return\b'),
    # Print statements
    re.compile(r'^\s*print\('),
    # Indented blocks (common in Python); needs the line's leading
    # whitespace to be intact to be able to match.
    re.compile(r'^\s{4,}\S'),
)

# Per-line patterns suggesting an unfenced line is prose.
_UNFENCED_PROSE_RES = (
    # Common English words in prose
    re.compile(
        r'\b(the|and|that|this|with|for|are|is|was|were|have|has|had)\b',
        re.IGNORECASE,
    ),
    # Sentence ending followed by a CAPITAL letter (case-sensitive on
    # purpose, otherwise "a ? b" or "x. y" would count as prose).
    re.compile(r'[.!?]\s+[A-Z]'),
    # Articles
    re.compile(r'\b(a|an|the)\s+\w+', re.IGNORECASE),
)

# Patterns treated as unambiguous code when they open the text.
_UNFENCED_CODE_ONLY_RES = (
    re.compile(r'^\s*from langchain\.'),
    re.compile(r'^\s*import langchain'),
    re.compile(r'^\s*@tool\b'),  # Decorator
    re.compile(r'^\s*agent = create_agent\('),
    re.compile(r'^\s*result = agent\.invoke\('),
)


def _looks_like_code_block(lines: List[str]) -> bool:
    """
    Heuristic to recover code blocks when Markdown fences are missing
    (common after HTML -> MD conversion).

    Args:
        lines: Lines of a text buffer, in document order.

    Returns:
        ``True`` when the buffer looks like code. Buffers that are empty
        or shorter than 50 characters (after stripping) are never code.
        Otherwise the buffer is code when any of these hold:

        * at least 2 lines look like code and the number of prose-looking
          lines is at most half the number of code-looking lines;
        * the buffer has at most 3 non-blank lines, at least one looks
          like code and none look like prose;
        * the text starts with one of a few known code-only patterns.
    """
    if not lines:
        return False

    joined = "\n".join(lines)
    text = joined.strip()

    # Too short? Probably not code
    if len(text) < 50:
        return False

    code_line_count = 0
    prose_line_count = 0
    total_non_empty_lines = 0

    # Iterate the un-stripped text so leading indentation survives; the
    # indentation pattern could never match on stripped lines.
    for raw_line in joined.split('\n'):
        stripped = raw_line.strip()
        if not stripped:
            continue
        total_non_empty_lines += 1

        # Only trailing whitespace is removed for the code check.
        candidate = raw_line.rstrip()
        if any(pattern.search(candidate) for pattern in _UNFENCED_CODE_LINE_RES):
            code_line_count += 1

        # Short lines are never counted as prose.
        if len(stripped) > 20 and any(
            pattern.search(stripped) for pattern in _UNFENCED_PROSE_RES
        ):
            prose_line_count += 1

    # Need strong evidence for code: several code lines, little prose.
    if code_line_count >= 2 and prose_line_count <= code_line_count // 2:
        return True

    # Special case: single strong code line in short text
    if total_non_empty_lines <= 3 and code_line_count >= 1 and prose_line_count == 0:
        return True

    # NOTE(review): these are searched against the whole text without
    # re.MULTILINE, so "^" anchors only at the very start of the text.
    # Confirm whether per-line matching was intended.
    return any(pattern.search(text) for pattern in _UNFENCED_CODE_ONLY_RES)
def _looks_like_executable_code(text: str) -> bool:
    """
    Estimate whether a code snippet contains runnable Python statements.

    The text must first pass ``_is_actual_code``. After that, each line
    that is not blank, a ``#`` comment, or the start of a triple-quoted
    string is tested against a set of Python syntax patterns (function
    and class definitions, imports, ``return``, ``print(`` and simple
    assignments).

    Returns:
        ``True`` when at least two lines match, or when at least one line
        matches and the snippet has no more than three non-blank lines.
    """
    # Formatted text that merely sat inside a fence is never executable.
    if not _is_actual_code(text):
        return False

    syntax_markers = (
        r'\bdef\s+\w+\s*\([^)]*\)\s*:',
        r'\bclass\s+\w+\s*\(?[^:]*\)?\s*:',
        r'^\s*from\s+\w+\s+import\s+\w+',
        r'^\s*import\s+\w+',
        r'\breturn\b',
        r'\bprint\(',
        r'^\s*\w+\s*=\s*[^=\n]+$',  # Variable assignment
    )

    all_lines = text.split('\n')
    hits = 0
    for raw in all_lines:
        candidate = raw.strip()
        if not candidate:
            continue
        if candidate.startswith('#') or candidate.startswith('"""'):
            continue
        for marker in syntax_markers:
            if re.search(marker, candidate):
                hits += 1
                break

    # Two matching lines are sufficient on their own.
    if hits >= 2:
        return True

    # Otherwise a single match only counts in a very short snippet.
    non_blank = sum(1 for raw in all_lines if raw.strip())
    return hits >= 1 and non_blank <= 3
def chunk_document(
    raw_text: str,
    source_name: str,
    source_url: Optional[str] = None,
) -> List[Dict]:
    """
    Chunk documentation text containing headings, prose, and code examples.

    Design goals:
    - Preserve document hierarchy
    - Separate prose vs code
    - Recover code even if Markdown fences are lost
    - Deterministic chunk IDs

    The text is scanned line by line. Headings of level 2-6 (``##`` to
    ``######``) outside a fence close the current text buffer and update
    the heading stack. A line starting with three backticks (after
    stripping) toggles fenced-block mode. Every other line is collected
    into the fenced-code buffer or the text buffer.

    Args:
        raw_text: Full document text.
        source_name: Stored as ``source_name`` on every chunk.
        source_url: Stored as ``source_url`` on every chunk.

    Returns:
        A list of chunk dicts in document order. Each has the keys
        ``chunk_id``, ``source``, ``source_name``, ``source_url``,
        ``language``, ``chunk_type`` (``"code"`` or ``"text"``),
        ``content``, ``chunk_index`` and ``metadata``. ``metadata`` holds
        the current heading, its level, the heading path, 1-based
        ``line_start``/``line_end``, and one of ``inferred_block``,
        ``fenced_block`` + ``looks_executable``, or ``was_fenced_block``
        depending on how the chunk was produced.
    """
    chunks: List[Dict] = []
    heading_stack: List[str] = []
    current_heading: Optional[str] = None
    current_heading_level: Optional[int] = None

    buffer: List[str] = []
    code_lines: List[str] = []
    code_block = False
    code_language: Optional[str] = None
    chunk_index = 0

    heading_re = re.compile(r"^(#{2,6})\s+(.*)")
    # Whole-word match. A plain substring test would treat "listdir",
    # "format" or "print_list" as prose and veto nearly every snippet.
    # NOTE(review): several of these words are also Python keywords
    # ("for", "with", "is", "and"), so snippets using them are still kept
    # as text; confirm whether that is intended.
    prose_word_re = re.compile(
        r"\b(?:the|and|that|this|with|for|are|is|was)\b", re.IGNORECASE
    )

    def heading_path() -> Optional[str]:
        return " > ".join(heading_stack) if heading_stack else None

    def heading_metadata() -> Dict:
        # Heading context shared by every chunk's metadata.
        return {
            "heading": current_heading,
            "heading_level": current_heading_level,
            "heading_path": heading_path(),
        }

    def emit(id_prefix: str, language: str, chunk_type: str,
             content: str, metadata: Dict) -> None:
        # Append one chunk and advance the running chunk index.
        nonlocal chunk_index
        chunks.append(
            {
                "chunk_id": _hash_id(content, id_prefix),
                "source": "documentation",
                "source_name": source_name,
                "source_url": source_url,
                "language": language,
                "chunk_type": chunk_type,
                "content": content,
                "chunk_index": chunk_index,
                "metadata": metadata,
            }
        )
        chunk_index += 1

    def flush_text(start_line: int, end_line: int) -> None:
        # Turn the pending text buffer into a chunk (inferred code or text).
        nonlocal buffer
        if not buffer:
            return
        text = "\n".join(buffer).strip()
        buffer = []
        if not text:
            return

        # Recover unfenced code blocks, but only when it is very clearly
        # code: the heuristic must agree and no prose word may appear.
        if (
            _looks_like_code_block(text.splitlines())
            and len(text) > 30
            and not prose_word_re.search(text)
        ):
            emit(
                "doc_code", "python", "code", text,
                {
                    **heading_metadata(),
                    "line_start": start_line,
                    "line_end": end_line,
                    "inferred_block": True,
                },
            )
            return

        # Default to text
        emit(
            "doc_text", "markdown", "text", text,
            {
                **heading_metadata(),
                "line_start": start_line,
                "line_end": end_line,
            },
        )

    def flush_code(start_line: int, end_line: int) -> None:
        # Turn the pending fenced block into a code chunk, or a text chunk
        # when its content is formatted text rather than code.
        nonlocal code_lines, code_language
        if not code_lines:
            return
        code = "\n".join(code_lines)
        code_lines = []

        if _is_actual_code(code):
            emit(
                "doc_code", code_language or "unknown", "code", code,
                {
                    **heading_metadata(),
                    "fenced_block": True,
                    "line_start": start_line,
                    "line_end": end_line,
                    "looks_executable": _looks_like_executable_code(code),
                },
            )
        else:
            emit(
                "doc_text", "markdown", "text", code,
                {
                    **heading_metadata(),
                    "line_start": start_line,
                    "line_end": end_line,
                    "was_fenced_block": True,  # was in ``` but isn't code
                },
            )
        code_language = None

    buffer_start_line = 0
    code_start_line = 0
    line_cursor = 0  # stays 0 for empty input so the final flushes are safe

    for line_cursor, line in enumerate(raw_text.splitlines(), start=1):
        # ---- Heading detection (ignored inside fenced blocks) ----
        heading = heading_re.match(line)
        if heading and not code_block:
            flush_text(buffer_start_line, line_cursor - 1)
            level = len(heading.group(1))
            title = heading.group(2).strip()

            # Level 2 is the top of the stack: drop this level and anything
            # deeper, then push the new title.
            del heading_stack[level - 2:]
            heading_stack.append(title)
            current_heading = title
            current_heading_level = level
            buffer_start_line = line_cursor
            continue

        # ---- Code fence detection ----
        stripped = line.strip()
        if stripped.startswith("```"):
            if code_block:
                # Closing fence: content ended on the previous line.
                code_block = False
                flush_code(code_start_line, line_cursor - 1)
                buffer_start_line = line_cursor + 1
            else:
                # Opening fence: whatever follows the backticks is the language.
                flush_text(buffer_start_line, line_cursor - 1)
                code_block = True
                code_language = stripped.replace("```", "").strip() or None
                code_start_line = line_cursor + 1
            continue

        if code_block:
            code_lines.append(line)
        else:
            if not buffer:
                buffer_start_line = line_cursor
            buffer.append(line)

    # Flush whatever is pending, including an unterminated fenced block.
    flush_text(buffer_start_line, line_cursor)
    flush_code(code_start_line, line_cursor)
    return chunks
def wrap_doc_chunks(doc_chunks: List[dict]) -> List[CodeChunk]:
    """
    Adapter that turns doc-chunker dicts into ``CodeChunk`` objects.

    Every input dict becomes one ``CodeChunk`` with
    ``chunk_type="documentation"``. The dict's ``source_name`` is used as
    the file path, ``content`` as the code, and its ``metadata`` supplies
    the heading (AST name), heading path (AST parent) and line span.
    ``language`` falls back to ``"markdown"`` when absent.

    The parsing logic of the doc chunker itself is not touched here.
    """

    def to_code_chunk(doc: dict) -> CodeChunk:
        # Missing metadata is treated as an empty mapping.
        meta = doc.get("metadata", {})
        ast_info = ChunkAST(
            symbol_type="documentation",
            name=meta.get("heading"),
            parent=meta.get("heading_path"),
        )
        line_span = ChunkSpan(
            start_line=meta.get("line_start"),
            end_line=meta.get("line_end"),
        )
        placement = ChunkHierarchy(
            is_primary=True,
            is_extracted=True,
        )
        return CodeChunk(
            chunk_id=doc["chunk_id"],
            file_path=doc["source_name"],
            language=doc.get("language", "markdown"),
            chunk_type="documentation",
            code=doc["content"],
            ast=ast_info,
            span=line_span,
            hierarchy=placement,
            metadata=meta,
        )

    return [to_code_chunk(doc) for doc in doc_chunks]