""" POS and dependency parsing backend module. Handles multilingual part-of-speech tagging and dependency parsing. """ import spacy import pandas as pd from typing import Dict, List, Tuple, Optional, Union, Any from pathlib import Path import logging import tempfile import base64 from io import BytesIO import zipfile from .base_analyzer import BaseAnalyzer # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class POSParser(BaseAnalyzer): """ Main class for POS tagging and dependency parsing. Handles multilingual analysis and visualization. Inherits from BaseAnalyzer for consistent SpaCy model management. """ def __init__(self, language: str = "en", model_size: str = "trf", gpu_device: Optional[int] = None): """ Initialize parser with specified language and model. Args: language (str): Language code ('en' for English, 'ja' for Japanese) model_size (str): SpaCy model size ('trf' or 'md') gpu_device (int, optional): GPU device ID to use (None for auto-detect, -1 for CPU only) """ super().__init__(language, model_size, gpu_device) def analyze_text(self, text: str) -> Dict: """ Analyze text and return POS tagging and dependency parsing results. Args: text: Input text to analyze Returns: Dictionary containing analysis results """ # Process text using base class method doc = self.process_document(text) # Extract token information token_data = [] for token in doc: # Skip spaces but include punctuation for complete analysis if not token.is_space: token_info = { 'Token': token.text, 'Lemma': token.lemma_, 'POS': token.pos_, 'Tag': token.tag_, 'Dependency': token.dep_, 'Named Entity': token.ent_type_ if token.ent_type_ else '-' } token_data.append(token_info) # Create DataFrame df = pd.DataFrame(token_data) # Prepare sentence-level analysis for visualization sentences = [] for sent in doc.sents: # Limit to 30 words per sentence as per specification sent_tokens = [token for token in sent if not token.is_space] if len(sent_tokens) > 30: sent_tokens = sent_tokens[:30] sentence_info = { 'text': sent.text, 'tokens': sent_tokens, 'length': len(sent_tokens) } sentences.append(sentence_info) results = { 'token_analysis': df, 'sentences': sentences, 'statistics': { 'total_tokens': len(token_data), 'total_sentences': len(sentences), 'unique_pos_tags': len(df['POS'].unique()), 'unique_dependencies': len(df['Dependency'].unique()), 'named_entities': len([t for t in token_data if t['Named Entity'] != '-']) } } return results def generate_displacy_html(self, text: str, style: str = "dep") -> List[str]: """ Generate DisplaCy visualization HTML for sentences. Args: text: Input text to visualize style: Visualization style ('dep' for dependency, 'ent' for entities) Returns: List of HTML strings, one per sentence """ # Process text using base class method doc = self.process_document(text) html_outputs = [] for sent in doc.sents: # Limit to 30 words per sentence sent_tokens = [token for token in sent if not token.is_space] if len(sent_tokens) > 30: # Create a truncated span truncated_doc = self.nlp(sent.text) truncated_tokens = [token for token in truncated_doc if not token.is_space][:30] # Reconstruct text from first 30 tokens truncated_text = "" for token in truncated_tokens: truncated_text += token.text_with_ws truncated_doc = self.nlp(truncated_text) sent_to_visualize = list(truncated_doc.sents)[0] else: sent_to_visualize = sent try: # Generate HTML using displaCy html = spacy.displacy.render( sent_to_visualize, style=style, options={ "fine_grained": True, "add_lemma": True, "collapse_punct": False, "compact": True, "bg": "#F5F9FA", "color": "#000000", "font": "Arial", } ) html_outputs.append(html) except Exception as e: logger.error(f"Error generating displaCy visualization: {e}") html_outputs.append(f"

Error generating visualization for sentence: {sent.text[:100]}...

") return html_outputs def analyze_batch(self, file_paths: List[str], progress_callback=None) -> bytes: """ Analyze multiple text files and return results as a ZIP file. Args: file_paths: List of paths to text files progress_callback: Optional callback for progress updates Returns: ZIP file bytes containing TSV results """ # Create temporary directory for results with tempfile.TemporaryDirectory() as temp_dir: result_files = [] for i, file_path in enumerate(file_paths): try: # Read file with open(file_path, 'r', encoding='utf-8') as f: text = f.read() # Analyze text results = self.analyze_text(text) # Save as TSV filename = Path(file_path).stem + '.tsv' output_path = Path(temp_dir) / filename results['token_analysis'].to_csv( output_path, sep='\t', index=False, encoding='utf-8' ) result_files.append(output_path) if progress_callback: progress_callback(i + 1, len(file_paths)) except Exception as e: logger.error(f"Error processing file {file_path}: {e}") # Create error file error_filename = Path(file_path).stem + '_ERROR.txt' error_path = Path(temp_dir) / error_filename with open(error_path, 'w', encoding='utf-8') as f: f.write(f"Error processing {file_path}: {e}") result_files.append(error_path) if progress_callback: progress_callback(i + 1, len(file_paths)) # Create ZIP file zip_buffer = BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for file_path in result_files: zip_file.write(file_path, file_path.name) zip_buffer.seek(0) return zip_buffer.getvalue() # Construction Extraction Methods (from session-12.ipynb) def extract_by_simple_dependency(self, result_dictionary: dict, token, dep_rel: str, index_name: str): """Extract token when it has specific dependency relation.""" if token.dep_ == dep_rel: self._update_results(index_name, result_dictionary) def extract_by_pos(self, result_dictionary: dict, token, pos: str, index_name: str): """Extract token when it has specific POS tag.""" if token.pos_ == pos: self._update_results(index_name, result_dictionary) def extract_by_tag(self, result_dictionary: dict, token, tag: str, index_name: str): """Extract token when it has specific fine-grained tag.""" if token.tag_ == tag: self._update_results(index_name, result_dictionary) def extract_by_dependency_and_head_pos(self, result_dictionary: dict, token, dep_rel: str, head_pos: str, index_name: str): """Extract token when it has specific dependency relation AND its head has specific POS.""" if token.dep_ == dep_rel and token.head.pos_ == head_pos: self._update_results(index_name, result_dictionary) def extract_by_dependency_and_child_features(self, result_dictionary: dict, token, dep_rel: str, child_lemma: str, child_pos: str, index_name: str): """Extract token when it has specific dependency AND has a child with specific lemma and POS.""" if token.dep_ == dep_rel: for child in token.children: if child.lemma_ == child_lemma and child.pos_ == child_pos: self._update_results(index_name, result_dictionary) break # Found one match, don't count multiple times def _update_results(self, index_name: str, result_dictionary: dict): """Helper method to update results dictionary.""" if index_name in result_dictionary: result_dictionary[index_name] += 1 else: result_dictionary[index_name] = 1 def run_construction_extraction(self, text: str, rule_list: List[Dict[str, Any]]) -> Dict[str, Any]: """ Execute multiple extraction rules on the text. Args: text: Input text to analyze rule_list: List of rule dictionaries, each containing 'function' and 'params' Returns: Dictionary containing extraction results and diagnostic information """ # Process text using base class method doc = self.process_document(text) # Initialize results extraction_results = {} sentence_diagnostics = [] # Process each sentence for sent_idx, sent in enumerate(doc.sents): sent_results = {} matched_tokens_info = [] # Apply all rules to each token in the sentence for token in sent: for rule in rule_list: function_name = rule['function'] params = rule['params'] # Store count before applying rule before_count = sent_results.get(params['index_name'], 0) # Apply the extraction rule self._apply_extraction_rule(sent_results, token, function_name, params) # Check if this token matched after_count = sent_results.get(params['index_name'], 0) if after_count > before_count: matched_tokens_info.append({ 'token': token.text, 'lemma': token.lemma_, 'pos': token.pos_, 'tag': token.tag_, 'dep': token.dep_, 'head': token.head.text, 'head_pos': token.head.pos_, 'children': [child.text for child in token.children], 'sentence_position': token.i, 'rule_matched': params['index_name'] }) # Add sentence diagnostic information sentence_diagnostics.append({ 'sentence_idx': sent_idx, 'sentence_text': sent.text, 'results': sent_results.copy(), 'matched_tokens': matched_tokens_info, 'total_tokens': len([t for t in sent if not t.is_space]) }) # Merge results into global extraction results for key, value in sent_results.items(): extraction_results[key] = extraction_results.get(key, 0) + value # Create summary DataFrame summary_data = [] for rule_name, count in extraction_results.items(): summary_data.append({ 'Rule Name': rule_name, 'Total Matches': count, 'Matches per Sentence': round(count / len(list(doc.sents)), 2) if len(list(doc.sents)) > 0 else 0 }) summary_df = pd.DataFrame(summary_data) return { 'extraction_results': extraction_results, 'sentence_diagnostics': sentence_diagnostics, 'summary_dataframe': summary_df, 'statistics': { 'total_sentences': len(sentence_diagnostics), 'total_rules_applied': len(rule_list), 'total_matches': sum(extraction_results.values()) } } def _apply_extraction_rule(self, result_dict: dict, token, function_name: str, params: dict): """Apply the specified extraction rule to a token.""" if function_name == 'extract_by_simple_dependency': self.extract_by_simple_dependency(result_dict, token, params['dep_rel'], params['index_name']) elif function_name == 'extract_by_pos': self.extract_by_pos(result_dict, token, params['pos'], params['index_name']) elif function_name == 'extract_by_tag': self.extract_by_tag(result_dict, token, params['tag'], params['index_name']) elif function_name == 'extract_by_dependency_and_head_pos': self.extract_by_dependency_and_head_pos(result_dict, token, params['dep_rel'], params['head_pos'], params['index_name']) elif function_name == 'extract_by_dependency_and_child_features': self.extract_by_dependency_and_child_features(result_dict, token, params['dep_rel'], params['child_lemma'], params['child_pos'], params['index_name']) def test_rule(self, sentence: str, function_name: str, params: dict, show_visual: bool = True) -> Dict[str, Any]: """ Test a single extraction rule on a sentence and show the results with visual parsing. Args: sentence: Input sentence to test function_name: Name of the extraction function to test params: Parameters for the extraction function show_visual: Whether to generate visual parsing information Returns: Dictionary with matched token information and visual data """ # Parse the sentence doc = self.nlp(sentence) # Create temporary results dictionary test_results = {} # Store matched tokens for detailed output matched_tokens = [] matched_indices = [] # Track which token positions matched # Test the rule on each token for token in doc: before_count = test_results.get(params.get('index_name', 'test'), 0) self._apply_extraction_rule(test_results, token, function_name, params) after_count = test_results.get(params.get('index_name', 'test'), 0) if after_count > before_count: matched_tokens.append({ 'token': token.text, 'lemma': token.lemma_, 'pos': token.pos_, 'tag': token.tag_, 'dep': token.dep_, 'head': token.head.text, 'head_pos': token.head.pos_, 'children': [child.text for child in token.children], 'sentence_position': token.i }) matched_indices.append(token.i) # Generate visual information if requested visual_info = None if show_visual: try: # Generate HTML with highlighted matched tokens options = { "compact": True, "color": {str(i): "#ff6b6b" for i in matched_indices} } visual_info = spacy.displacy.render(doc, style="dep", options=options) except Exception as e: logger.error(f"Error generating visual parsing: {e}") visual_info = None return { 'matched_tokens': matched_tokens, 'total_matches': len(matched_tokens), 'sentence': sentence, 'rule_info': {'function': function_name, 'params': params}, 'visual_html': visual_info, 'sentence_structure': self._get_sentence_structure(doc) } def _get_sentence_structure(self, doc) -> List[Dict[str, Any]]: """Get detailed token analysis for sentence structure display.""" structure = [] for token in doc: structure.append({ 'position': token.i, 'token': token.text, 'lemma': token.lemma_, 'pos': token.pos_, 'tag': token.tag_, 'dep': token.dep_, 'head': token.head.text, 'children': [child.text for child in token.children] }) return structure def generate_construction_visual(self, text: str, rule_list: List[Dict[str, Any]]) -> str: """ Generate displaCy visualization with enhanced highlighting for matched tokens and their relationships. Args: text: Input text to visualize rule_list: List of rules to apply Returns: HTML string with dependency visualization including custom styling """ doc = self.nlp(text) # Collect all matched token indices with their rule names all_matches = {} # {token_index: [rule_names]} rule_colors = {} # {rule_name: color} # Enhanced color palette with better contrast colors = [ "#FF4444", # Bright red "#44AA44", # Green "#4488FF", # Blue "#FF8844", # Orange "#AA44AA", # Purple "#44AAAA", # Teal "#FFAA44", # Golden "#AA4444", # Dark red ] for rule_idx, rule in enumerate(rule_list): function_name = rule['function'] params = rule['params'] rule_name = params['index_name'] temp_results = {} # Assign color to this rule rule_colors[rule_name] = colors[rule_idx % len(colors)] for token in doc: before = temp_results.get(params['index_name'], 0) self._apply_extraction_rule(temp_results, token, function_name, params) after = temp_results.get(params['index_name'], 0) if after > before: if token.i not in all_matches: all_matches[token.i] = [] all_matches[token.i].append(rule_name) # Create enhanced displaCy options options = { "compact": True, "bg": "#F8F9FA", # Light background "color": {}, "font": "Arial, sans-serif", "distance": 120, # More space between tokens "arrow_stroke": 2, "arrow_width": 10, "collapse_punct": False, "fine_grained": False } # Assign colors to matched tokens with enhanced styling for token_idx, rule_names in all_matches.items(): # Use the first rule's color primary_rule = rule_names[0] options["color"][str(token_idx)] = rule_colors[primary_rule] try: # Generate base HTML html = spacy.displacy.render(doc, style="dep", options=options) # Add custom CSS for enhanced styling enhanced_html = self._add_enhanced_styling(html, all_matches, rule_colors, rule_list) return enhanced_html except Exception as e: logger.error(f"Error generating construction visual: {e}") return f"

Error generating visualization: {e}

" def _add_enhanced_styling(self, base_html: str, all_matches: dict, rule_colors: dict, rule_list: List[Dict[str, Any]]) -> str: """ Add enhanced CSS styling to the displaCy HTML for better visualization. Args: base_html: Base HTML from displaCy all_matches: Dictionary of matched token indices and rule names rule_colors: Dictionary mapping rule names to colors rule_list: List of extraction rules Returns: Enhanced HTML with custom styling """ # Create legend HTML legend_items = [] for rule in rule_list: rule_name = rule['params']['index_name'] color = rule_colors.get(rule_name, "#888888") legend_items.append(f"""
{rule_name}
""") legend_html = f"""
📖 Rule Legend:
{''.join(legend_items)}
""" # Enhanced CSS for better visualization enhanced_css = """ """ # Add JavaScript to enhance interactivity enhanced_js = f""" """ # Combine everything enhanced_html = f"""
{legend_html} {enhanced_css} {base_html} {enhanced_js}
""" return enhanced_html @staticmethod def get_available_extraction_functions() -> Dict[str, Dict[str, Any]]: """Get mapping of available extraction functions and their required parameters.""" return { 'extract_by_simple_dependency': { 'params': ['dep_rel', 'index_name'], 'description': 'Extract tokens by dependency relation only' }, 'extract_by_pos': { 'params': ['pos', 'index_name'], 'description': 'Extract tokens by POS tag' }, 'extract_by_tag': { 'params': ['tag', 'index_name'], 'description': 'Extract tokens by fine-grained POS tag' }, 'extract_by_dependency_and_head_pos': { 'params': ['dep_rel', 'head_pos', 'index_name'], 'description': 'Extract tokens by dependency relation AND head POS' }, 'extract_by_dependency_and_child_features': { 'params': ['dep_rel', 'child_lemma', 'child_pos', 'index_name'], 'description': 'Extract tokens by dependency AND child features' } }