Spaces:
Building
Building
| """ | |
| POS and dependency parsing backend module. | |
| Handles multilingual part-of-speech tagging and dependency parsing. | |
| """ | |
| import spacy | |
| import pandas as pd | |
| from typing import Dict, List, Tuple, Optional, Union, Any | |
| from pathlib import Path | |
| import logging | |
| import tempfile | |
| import base64 | |
| from io import BytesIO | |
| import zipfile | |
| from .base_analyzer import BaseAnalyzer | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class POSParser(BaseAnalyzer):
    """
    Main class for POS tagging and dependency parsing.

    Provides token-level analysis tables, displaCy visualizations, batch
    processing of text files and rule-based construction extraction.
    The spaCy pipeline (``self.nlp``) and ``self.process_document`` are used
    throughout but not defined here; they are expected to be provided by
    BaseAnalyzer.
    """

    # Maximum number of non-space tokens kept per sentence for visualization.
    _MAX_SENTENCE_TOKENS = 30

    # Column order of the token-level analysis table.
    _TOKEN_COLUMNS = ['Token', 'Lemma', 'POS', 'Tag', 'Dependency', 'Named Entity']

    # Column order of the construction-extraction summary table.
    _SUMMARY_COLUMNS = ['Rule Name', 'Total Matches', 'Matches per Sentence']

    # Extraction method name -> names of the rule params passed to it, in the
    # positional order the method expects (after result dictionary and token).
    _RULE_SIGNATURES = {
        'extract_by_simple_dependency': ('dep_rel', 'index_name'),
        'extract_by_pos': ('pos', 'index_name'),
        'extract_by_tag': ('tag', 'index_name'),
        'extract_by_dependency_and_head_pos': ('dep_rel', 'head_pos', 'index_name'),
        'extract_by_dependency_and_child_features': (
            'dep_rel', 'child_lemma', 'child_pos', 'index_name',
        ),
    }

    def __init__(self, language: str = "en", model_size: str = "trf", gpu_device: Optional[int] = None):
        """
        Initialize parser with specified language and model.

        Args:
            language (str): Language code ('en' for English, 'ja' for Japanese)
            model_size (str): SpaCy model size ('trf' or 'md')
            gpu_device (int, optional): GPU device ID to use
                (None for auto-detect, -1 for CPU only)
        """
        super().__init__(language, model_size, gpu_device)

    def analyze_text(self, text: str) -> Dict:
        """
        Analyze text and return POS tagging and dependency parsing results.

        Args:
            text: Input text to analyze

        Returns:
            Dictionary with keys:
            - 'token_analysis': DataFrame with one row per non-space token
            - 'sentences': list of dicts ('text', 'tokens', 'length'); the
              token list is capped at 30 tokens per sentence
            - 'statistics': token/sentence counts, number of distinct POS
              tags and dependency labels, and number of entity tokens
        """
        doc = self.process_document(text)

        # Skip whitespace tokens but keep punctuation for a complete analysis.
        token_data = [
            {
                'Token': token.text,
                'Lemma': token.lemma_,
                'POS': token.pos_,
                'Tag': token.tag_,
                'Dependency': token.dep_,
                'Named Entity': token.ent_type_ or '-',
            }
            for token in doc
            if not token.is_space
        ]

        # Explicit columns keep the schema stable when no tokens were found;
        # otherwise the empty frame has no 'POS' column and the statistics
        # below raise KeyError.
        df = pd.DataFrame(token_data, columns=self._TOKEN_COLUMNS)

        # Sentence-level information used for visualization.
        sentences = []
        for sent in doc.sents:
            sent_tokens = [token for token in sent if not token.is_space]
            sent_tokens = sent_tokens[:self._MAX_SENTENCE_TOKENS]
            sentences.append({
                'text': sent.text,
                'tokens': sent_tokens,
                'length': len(sent_tokens),
            })

        return {
            'token_analysis': df,
            'sentences': sentences,
            'statistics': {
                'total_tokens': len(token_data),
                'total_sentences': len(sentences),
                'unique_pos_tags': len(df['POS'].unique()),
                'unique_dependencies': len(df['Dependency'].unique()),
                'named_entities': sum(
                    1 for row in token_data if row['Named Entity'] != '-'
                ),
            },
        }

    def generate_displacy_html(self, text: str, style: str = "dep") -> List[str]:
        """
        Generate DisplaCy visualization HTML for sentences.

        Sentences longer than 30 non-space tokens are cut to their first 30
        tokens and re-parsed before rendering. A sentence that cannot be
        rendered contributes an error paragraph instead of aborting the call.

        Args:
            text: Input text to visualize
            style: Visualization style ('dep' for dependency, 'ent' for entities)

        Returns:
            List of HTML strings, one per sentence
        """
        from html import escape

        doc = self.process_document(text)
        render_options = {
            "fine_grained": True,
            "add_lemma": True,
            "collapse_punct": False,
            "compact": True,
            "bg": "#F5F9FA",
            "color": "#000000",
            "font": "Arial",
        }

        html_outputs = []
        for sent in doc.sents:
            try:
                sent_tokens = [token for token in sent if not token.is_space]
                if len(sent_tokens) > self._MAX_SENTENCE_TOKENS:
                    # Rebuild the text of the first 30 tokens from the existing
                    # parse and run the pipeline once on the shortened text.
                    truncated_text = "".join(
                        token.text_with_ws
                        for token in sent_tokens[:self._MAX_SENTENCE_TOKENS]
                    )
                    truncated_doc = self.nlp(truncated_text)
                    sent_to_visualize = next(iter(truncated_doc.sents))
                else:
                    sent_to_visualize = sent

                # jupyter=False forces displaCy to return the markup; with
                # auto-detection it displays inline in a notebook and returns None.
                rendered = spacy.displacy.render(
                    sent_to_visualize,
                    style=style,
                    options=render_options,
                    jupyter=False,
                )
                html_outputs.append(rendered)
            except Exception as e:
                logger.error(f"Error generating displaCy visualization: {e}")
                # The sentence is user input, so escape it before embedding in HTML.
                html_outputs.append(
                    f"<p>Error generating visualization for sentence: {escape(sent.text[:100])}...</p>"
                )
        return html_outputs

    @staticmethod
    def _unique_name(stem: str, suffix: str, used_names: set) -> str:
        """Return ``stem + suffix``, adding ``_1``, ``_2``, ... if already used."""
        candidate = stem + suffix
        counter = 1
        # Compare case-insensitively so names stay distinct on filesystems
        # that ignore case.
        while candidate.casefold() in used_names:
            candidate = f"{stem}_{counter}{suffix}"
            counter += 1
        used_names.add(candidate.casefold())
        return candidate

    def analyze_batch(self, file_paths: List[str], progress_callback=None) -> bytes:
        """
        Analyze multiple text files and return results as a ZIP file.

        Each input file produces ``<stem>.tsv`` with its token analysis, or
        ``<stem>_ERROR.txt`` describing the failure. When several inputs share
        a stem, later ones get a numeric suffix so no result is overwritten.

        Args:
            file_paths: List of paths to text files (read as UTF-8)
            progress_callback: Optional callable invoked as
                ``progress_callback(files_done, total_files)`` after each file

        Returns:
            ZIP file bytes containing TSV results
        """
        total_files = len(file_paths)
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_root = Path(temp_dir)
            result_files = []
            used_names = set()

            for i, file_path in enumerate(file_paths):
                stem = Path(file_path).stem
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        text = f.read()
                    results = self.analyze_text(text)

                    output_path = temp_root / self._unique_name(stem, '.tsv', used_names)
                    results['token_analysis'].to_csv(
                        output_path,
                        sep='\t',
                        index=False,
                        encoding='utf-8',
                    )
                    result_files.append(output_path)
                except Exception as e:
                    # Best effort: one bad file must not abort the whole batch.
                    logger.error(f"Error processing file {file_path}: {e}")
                    error_path = temp_root / self._unique_name(
                        stem + '_ERROR', '.txt', used_names
                    )
                    with open(error_path, 'w', encoding='utf-8') as f:
                        f.write(f"Error processing {file_path}: {e}")
                    result_files.append(error_path)

                if progress_callback:
                    progress_callback(i + 1, total_files)

            # Build the archive while the temporary files still exist.
            zip_buffer = BytesIO()
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
                for result_path in result_files:
                    zip_file.write(result_path, result_path.name)
            return zip_buffer.getvalue()

    # Construction Extraction Methods (from session-12.ipynb)

    def extract_by_simple_dependency(self, result_dictionary: dict, token, dep_rel: str, index_name: str):
        """Count the token under index_name when its dependency relation is dep_rel."""
        if token.dep_ == dep_rel:
            self._update_results(index_name, result_dictionary)

    def extract_by_pos(self, result_dictionary: dict, token, pos: str, index_name: str):
        """Count the token under index_name when its coarse POS tag is pos."""
        if token.pos_ == pos:
            self._update_results(index_name, result_dictionary)

    def extract_by_tag(self, result_dictionary: dict, token, tag: str, index_name: str):
        """Count the token under index_name when its fine-grained tag is tag."""
        if token.tag_ == tag:
            self._update_results(index_name, result_dictionary)

    def extract_by_dependency_and_head_pos(self, result_dictionary: dict, token, dep_rel: str, head_pos: str, index_name: str):
        """Count the token when its dependency is dep_rel AND its head's POS is head_pos."""
        if token.dep_ == dep_rel and token.head.pos_ == head_pos:
            self._update_results(index_name, result_dictionary)

    def extract_by_dependency_and_child_features(self, result_dictionary: dict, token, dep_rel: str, child_lemma: str, child_pos: str, index_name: str):
        """Count the token when its dependency is dep_rel AND a child has the given lemma and POS."""
        if token.dep_ != dep_rel:
            return
        # any() stops at the first matching child, so a token counts once.
        if any(
            child.lemma_ == child_lemma and child.pos_ == child_pos
            for child in token.children
        ):
            self._update_results(index_name, result_dictionary)

    def _update_results(self, index_name: str, result_dictionary: dict):
        """Increment the counter stored under index_name, creating it at 1."""
        result_dictionary[index_name] = result_dictionary.get(index_name, 0) + 1

    @staticmethod
    def _describe_token(token) -> Dict[str, Any]:
        """Return the diagnostic fields reported for a matched token."""
        return {
            'token': token.text,
            'lemma': token.lemma_,
            'pos': token.pos_,
            'tag': token.tag_,
            'dep': token.dep_,
            'head': token.head.text,
            'head_pos': token.head.pos_,
            'children': [child.text for child in token.children],
            # token.i is the token's index within its Doc.
            'sentence_position': token.i,
        }

    def _warn_about_unknown_rules(self, rule_list: List[Dict[str, Any]]) -> None:
        """Log once per rule whose 'function' is not a known extraction method."""
        for rule in rule_list:
            function_name = rule.get('function')
            if function_name not in self._RULE_SIGNATURES:
                logger.warning(
                    "Unknown extraction function %r; the rule will never match",
                    function_name,
                )

    def run_construction_extraction(self, text: str, rule_list: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Execute multiple extraction rules on the text.

        Args:
            text: Input text to analyze
            rule_list: List of rule dictionaries, each containing 'function'
                (name of an extract_by_* method) and 'params' (its arguments,
                including 'index_name')

        Returns:
            Dictionary with keys:
            - 'extraction_results': total match count per index_name
            - 'sentence_diagnostics': per-sentence counts and matched tokens
            - 'summary_dataframe': one row per rule name with totals and
              matches per sentence
            - 'statistics': sentence count, rule count and total matches
        """
        doc = self.process_document(text)
        # Materialize once: the sentences are iterated here and counted below.
        sentences = list(doc.sents)
        sentence_count = len(sentences)

        # A misspelled function name is otherwise ignored silently.
        self._warn_about_unknown_rules(rule_list)

        extraction_results = {}
        sentence_diagnostics = []

        for sent_idx, sent in enumerate(sentences):
            sent_results = {}
            matched_tokens_info = []

            for token in sent:
                for rule in rule_list:
                    function_name = rule['function']
                    params = rule['params']
                    index_name = params['index_name']

                    # A rule matched this token iff its counter grew.
                    before_count = sent_results.get(index_name, 0)
                    self._apply_extraction_rule(sent_results, token, function_name, params)
                    if sent_results.get(index_name, 0) > before_count:
                        token_info = self._describe_token(token)
                        token_info['rule_matched'] = index_name
                        matched_tokens_info.append(token_info)

            sentence_diagnostics.append({
                'sentence_idx': sent_idx,
                'sentence_text': sent.text,
                'results': sent_results.copy(),
                'matched_tokens': matched_tokens_info,
                'total_tokens': sum(1 for t in sent if not t.is_space),
            })

            # Fold the sentence counts into the document-level totals.
            for key, value in sent_results.items():
                extraction_results[key] = extraction_results.get(key, 0) + value

        summary_data = [
            {
                'Rule Name': rule_name,
                'Total Matches': count,
                'Matches per Sentence': (
                    round(count / sentence_count, 2) if sentence_count > 0 else 0
                ),
            }
            for rule_name, count in extraction_results.items()
        ]
        # Explicit columns keep the table's schema stable when nothing matched.
        summary_df = pd.DataFrame(summary_data, columns=self._SUMMARY_COLUMNS)

        return {
            'extraction_results': extraction_results,
            'sentence_diagnostics': sentence_diagnostics,
            'summary_dataframe': summary_df,
            'statistics': {
                'total_sentences': len(sentence_diagnostics),
                'total_rules_applied': len(rule_list),
                'total_matches': sum(extraction_results.values()),
            },
        }

    def _apply_extraction_rule(self, result_dict: dict, token, function_name: str, params: dict):
        """
        Apply the specified extraction rule to a token.

        Unknown function names are ignored. A missing entry in params raises
        KeyError.
        """
        arg_names = self._RULE_SIGNATURES.get(function_name)
        if arg_names is None:
            return
        extractor = getattr(self, function_name)
        extractor(result_dict, token, *[params[name] for name in arg_names])

    def test_rule(self, sentence: str, function_name: str, params: dict, show_visual: bool = True) -> Dict[str, Any]:
        """
        Test a single extraction rule on a sentence and show the results with visual parsing.

        Args:
            sentence: Input sentence to test
            function_name: Name of the extraction function to test
            params: Parameters for the extraction function; 'index_name' is
                optional here and defaults to 'test'
            show_visual: Whether to generate visual parsing information

        Returns:
            Dictionary with the matched tokens, match count, the input
            sentence, the rule as given, the dependency HTML (None when not
            requested or when rendering failed) and the per-token structure
        """
        doc = self.nlp(sentence)

        # Apply the default to the params actually handed to the rule, not only
        # to the counter lookup; the rule itself indexes params['index_name'].
        index_name = params.get('index_name', 'test')
        effective_params = {**params, 'index_name': index_name}

        test_results = {}
        matched_tokens = []
        for token in doc:
            before_count = test_results.get(index_name, 0)
            self._apply_extraction_rule(test_results, token, function_name, effective_params)
            if test_results.get(index_name, 0) > before_count:
                matched_tokens.append(self._describe_token(token))

        visual_info = None
        if show_visual:
            try:
                # displaCy's "color" option is one text colour for the whole
                # figure, not a per-token mapping, so no mapping is passed;
                # matched positions are reported in 'matched_tokens'.
                visual_info = spacy.displacy.render(
                    doc,
                    style="dep",
                    options={"compact": True},
                    jupyter=False,  # always return the markup as a string
                )
            except Exception as e:
                logger.error(f"Error generating visual parsing: {e}")
                visual_info = None

        return {
            'matched_tokens': matched_tokens,
            'total_matches': len(matched_tokens),
            'sentence': sentence,
            'rule_info': {'function': function_name, 'params': params},
            'visual_html': visual_info,
            'sentence_structure': self._get_sentence_structure(doc),
        }

    def _get_sentence_structure(self, doc) -> List[Dict[str, Any]]:
        """Get detailed token analysis (one dict per token) for structure display."""
        return [
            {
                'position': token.i,
                'token': token.text,
                'lemma': token.lemma_,
                'pos': token.pos_,
                'tag': token.tag_,
                'dep': token.dep_,
                'head': token.head.text,
                'children': [child.text for child in token.children],
            }
            for token in doc
        ]

    def generate_construction_visual(self, text: str, rule_list: List[Dict[str, Any]]) -> str:
        """
        Generate displaCy visualization with highlighting for matched tokens.

        Each rule gets a colour from a fixed palette (reused cyclically). The
        highlighting itself is applied by the legend, CSS and script that
        _add_enhanced_styling wraps around the displaCy markup.

        Args:
            text: Input text to visualize
            rule_list: List of rules to apply

        Returns:
            HTML string with dependency visualization including custom
            styling, or an error paragraph if rendering failed
        """
        from html import escape

        doc = self.nlp(text)

        palette = [
            "#FF4444",  # Bright red
            "#44AA44",  # Green
            "#4488FF",  # Blue
            "#FF8844",  # Orange
            "#AA44AA",  # Purple
            "#44AAAA",  # Teal
            "#FFAA44",  # Golden
            "#AA4444",  # Dark red
        ]

        all_matches = {}  # {token_index: [rule_names]}
        rule_colors = {}  # {rule_name: color}

        for rule_idx, rule in enumerate(rule_list):
            function_name = rule['function']
            params = rule['params']
            rule_name = params['index_name']
            rule_colors[rule_name] = palette[rule_idx % len(palette)]

            temp_results = {}
            for token in doc:
                before = temp_results.get(rule_name, 0)
                self._apply_extraction_rule(temp_results, token, function_name, params)
                if temp_results.get(rule_name, 0) > before:
                    all_matches.setdefault(token.i, []).append(rule_name)

        # "color" is a single text colour in displaCy; per-token colours are
        # applied afterwards by the script added in _add_enhanced_styling.
        options = {
            "compact": True,
            "bg": "#F8F9FA",  # Light background
            "color": "#000000",
            "font": "Arial, sans-serif",
            "distance": 120,  # More space between tokens
            "arrow_stroke": 2,
            "arrow_width": 10,
            "collapse_punct": False,
            "fine_grained": False,
        }

        try:
            base_html = spacy.displacy.render(
                doc,
                style="dep",
                options=options,
                jupyter=False,  # always return the markup as a string
            )
            return self._add_enhanced_styling(base_html, all_matches, rule_colors, rule_list)
        except Exception as e:
            logger.error(f"Error generating construction visual: {e}")
            return f"<p>Error generating visualization: {escape(str(e))}</p>"

    def _add_enhanced_styling(self, base_html: str, all_matches: dict, rule_colors: dict, rule_list: List[Dict[str, Any]]) -> str:
        """
        Add enhanced CSS styling to the displaCy HTML for better visualization.

        Args:
            base_html: Base HTML from displaCy
            all_matches: Dictionary of matched token indices and rule names
            rule_colors: Dictionary mapping rule names to colors
            rule_list: List of extraction rules

        Returns:
            Enhanced HTML with custom styling: a container holding the rule
            legend, a <style> block, the base markup and a <script> block
        """
        import json
        from html import escape

        def to_js(value) -> str:
            # json.dumps yields valid JavaScript literals for these structures
            # (integer keys become string keys, which JS property lookup
            # accepts). "</" is escaped so a rule name cannot close the
            # surrounding <script> element early.
            return json.dumps(value).replace("</", "<\\/")

        # Legend: one coloured dot plus the (escaped) rule name per rule.
        legend_items = []
        for rule in rule_list:
            rule_name = rule['params']['index_name']
            color = rule_colors.get(rule_name, "#888888")
            legend_items.append(f"""
            <div style="display: inline-block; margin-right: 15px; margin-bottom: 5px;">
                <span style="display: inline-block; width: 12px; height: 12px; background-color: {escape(str(color))};
                border-radius: 50%; margin-right: 5px; border: 1px solid #333;"></span>
                <span style="font-size: 12px; font-weight: bold;">{escape(str(rule_name))}</span>
            </div>
            """)

        legend_html = f"""
        <div style="margin-bottom: 15px; padding: 10px; background-color: #F0F0F0; border-radius: 5px;
        border: 1px solid #DDD;">
            <div style="font-weight: bold; margin-bottom: 8px; color: #333;">📖 Rule Legend:</div>
            <div style="display: flex; flex-wrap: wrap;">
                {''.join(legend_items)}
            </div>
        </div>
        """

        enhanced_css = """
        <style>
        /* Enhanced styling for matched tokens */
        .displacy-token[data-tag] {
            transition: all 0.3s ease;
        }
        /* Make dependency arcs more prominent for matched tokens */
        .displacy svg .displacy-arc {
            stroke-width: 2px;
            transition: all 0.3s ease;
        }
        /* Enhanced token styling */
        .displacy-token {
            font-weight: bold;
            border-radius: 3px;
            padding: 2px 4px;
            margin: 1px;
        }
        /* Highlight matched tokens with background */
        .matched-token {
            background-color: rgba(255, 255, 0, 0.2) !important;
            border: 2px solid currentColor !important;
            box-shadow: 0 0 5px rgba(0, 0, 0, 0.3);
            font-weight: bold !important;
        }
        /* Arc highlighting */
        .highlighted-arc {
            stroke-width: 3px !important;
            filter: drop-shadow(0 0 2px rgba(0, 0, 0, 0.5));
        }
        /* Improve label visibility */
        .displacy-label {
            font-size: 11px;
            font-weight: bold;
            fill: #333 !important;
            text-shadow: 1px 1px 1px rgba(255, 255, 255, 0.8);
        }
        /* Container styling */
        .displacy-container {
            border: 1px solid #DDD;
            border-radius: 8px;
            padding: 20px;
            background-color: #FAFAFA;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
        }
        </style>
        """

        # Script that tags matched tokens and recolours them per rule.
        enhanced_js = f"""
        <script>
        document.addEventListener('DOMContentLoaded', function() {{
            // Add matched token classes and enhance arcs
            const matchedTokens = {to_js(list(all_matches.keys()))};
            const ruleColors = {to_js(dict(rule_colors))};
            const allMatches = {to_js(dict(all_matches))};
            // Find and style matched tokens
            matchedTokens.forEach(function(tokenIdx) {{
                const tokenElements = document.querySelectorAll('.displacy-token');
                if (tokenElements[tokenIdx]) {{
                    const tokenElement = tokenElements[tokenIdx];
                    tokenElement.classList.add('matched-token');
                    // Set border color based on rule
                    const rules = allMatches[tokenIdx];
                    if (rules && rules.length > 0) {{
                        const primaryRule = rules[0];
                        const color = ruleColors[primaryRule];
                        tokenElement.style.borderColor = color;
                        tokenElement.style.color = color;
                    }}
                }}
            }});
            // Enhance arcs connected to matched tokens
            const arcs = document.querySelectorAll('.displacy-arc');
            arcs.forEach(function(arc, arcIdx) {{
                // This is a simplified approach - you might need to adjust based on displaCy's DOM structure
                const pathElement = arc.querySelector('path');
                if (pathElement) {{
                    // Check if this arc connects to a matched token
                    matchedTokens.forEach(function(tokenIdx) {{
                        // Simple heuristic - enhance arcs that might connect to matched tokens
                        if (arcIdx <= matchedTokens.length) {{
                            pathElement.classList.add('highlighted-arc');
                            const rules = allMatches[tokenIdx];
                            if (rules && rules.length > 0) {{
                                const color = ruleColors[rules[0]];
                                pathElement.style.stroke = color;
                                pathElement.style.opacity = '0.8';
                            }}
                        }}
                    }});
                }}
            }});
        }});
        </script>
        """

        # Combine everything
        enhanced_html = f"""
        <div class="displacy-container">
        {legend_html}
        {enhanced_css}
        {base_html}
        {enhanced_js}
        </div>
        """
        return enhanced_html
def get_available_extraction_functions() -> Dict[str, Dict[str, Any]]:
    """
    Describe the extraction functions that rules may reference.

    Returns:
        Mapping from function name to a dict with 'params' (the required
        parameter names, always ending in 'index_name') and 'description'
        (a one-line summary).
    """
    # (function name, rule-specific parameters, description)
    catalog = (
        ('extract_by_simple_dependency',
         ['dep_rel'],
         'Extract tokens by dependency relation only'),
        ('extract_by_pos',
         ['pos'],
         'Extract tokens by POS tag'),
        ('extract_by_tag',
         ['tag'],
         'Extract tokens by fine-grained POS tag'),
        ('extract_by_dependency_and_head_pos',
         ['dep_rel', 'head_pos'],
         'Extract tokens by dependency relation AND head POS'),
        ('extract_by_dependency_and_child_features',
         ['dep_rel', 'child_lemma', 'child_pos'],
         'Extract tokens by dependency AND child features'),
    )
    # Every function takes 'index_name' as its final parameter.
    return {
        name: {'params': [*specific, 'index_name'], 'description': summary}
        for name, specific, summary in catalog
    }