""" Complete Hugging Face Chemical Analyzer with MolScribe Integration Production-ready system with Google Vision, Claude analysis, and N8N HTTP integration """ import gradio as gr import json import requests import os import io import re import tempfile import base64 import concurrent.futures import time import threading from typing import Dict, Optional, List, Tuple, Union, Any from PIL import Image, ImageDraw, ImageEnhance, ImageFilter import numpy as np import pandas as pd from datetime import datetime import logging from functools import wraps import traceback import hashlib # Enhanced logging setup for Hugging Face logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class DependencyManager: """Manages optional dependencies with fallback support for Hugging Face""" def __init__(self): self.available_deps = {} self.fallback_options = {} self.detect_all_dependencies() def detect_all_dependencies(self): """Detect all optional dependencies with HF-compatible fallbacks""" # PDF Processing with dual support self.available_deps['pdf_processing'] = False try: import fitz # PyMuPDF self.available_deps['pymupdf'] = True self.available_deps['pdf_processing'] = True logger.info("✅ PyMuPDF available") except ImportError: try: import PyPDF2 self.available_deps['pypdf2'] = True self.available_deps['pdf_processing'] = True logger.info("✅ PyPDF2 available as fallback") except ImportError: try: import pdfplumber self.available_deps['pdfplumber'] = True self.available_deps['pdf_processing'] = True logger.info("✅ pdfplumber available as fallback") except ImportError: logger.warning("⚠️ No PDF processing libraries available") # Google APIs try: from googleapiclient.discovery import build from google.oauth2.service_account import Credentials self.available_deps['google_drive'] = True logger.info("✅ Google Drive API available") except ImportError: self.available_deps['google_drive'] = False logger.warning("⚠️ Google Drive API not available") try: from google.cloud import vision self.available_deps['google_vision'] = True logger.info("✅ Google Vision API available") except ImportError: self.available_deps['google_vision'] = False logger.warning("⚠️ Google Vision API not available") # Chemical analysis tools try: from rdkit import Chem from rdkit.Chem import Descriptors, Crippen, rdMolDescriptors self.available_deps['rdkit'] = True logger.info("✅ RDKit available") except ImportError: self.available_deps['rdkit'] = False logger.warning("⚠️ RDKit not available") # MolScribe try: import molscribe self.available_deps['molscribe'] = True logger.info("✅ MolScribe available") except ImportError: self.available_deps['molscribe'] = False logger.warning("⚠️ MolScribe not available") # Essential dependencies try: from PIL import Image self.available_deps['pillow'] = True except ImportError: self.available_deps['pillow'] = False logger.error("❌ Pillow required") try: import requests self.available_deps['requests'] = True except ImportError: self.available_deps['requests'] = False logger.error("❌ Requests required") def is_available(self, dependency: str) -> bool: return self.available_deps.get(dependency, False) def get_status(self) -> Dict: return self.available_deps.copy() # Initialize dependency manager deps = DependencyManager() class HuggingFaceConfig: """Configuration manager for Hugging Face deployment""" def __init__(self): self.setup_environment() def setup_environment(self): """Setup environment variables and secrets""" # Hugging Face Spaces secrets self.claude_api_key = os.getenv('CLAUDE_API_KEY', '') self.google_credentials = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON', '') self.n8n_webhook_secret = os.getenv('N8N_WEBHOOK_SECRET', 'default_secret') # Setup Google credentials if provided if self.google_credentials: try: credentials_data = json.loads(self.google_credentials) with open('/tmp/google_credentials.json', 'w') as f: json.dump(credentials_data, f) os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/tmp/google_credentials.json' logger.info("✅ Google credentials configured from HF secrets") except Exception as e: logger.warning(f"⚠️ Google credentials setup failed: {e}") def validate_api_key(self, api_key: str) -> bool: """Validate Claude API key format""" return api_key and api_key.startswith('sk-ant-') and len(api_key) > 20 config = HuggingFaceConfig() class GoogleVisionImageFilter: """Google Vision API integration with HF-compatible error handling""" def __init__(self, debug_mode: bool = True): self.available = deps.is_available('google_vision') self.debug_mode = debug_mode self.debug_log = [] self.client = None self.setup_vision_client() def log_debug(self, message: str): timestamp = datetime.now().strftime("%H:%M:%S") formatted_message = f"[{timestamp}] {message}" if self.debug_mode: logger.info(f"[VISION] {formatted_message}") self.debug_log.append(formatted_message) def setup_vision_client(self): """Initialize Google Vision client for HF""" if not self.available: self.log_debug("❌ Google Vision API not available") return try: from google.cloud import vision # Check for credentials creds_path = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '/tmp/google_credentials.json') if os.path.exists(creds_path): self.client = vision.ImageAnnotatorClient() self.log_debug("✅ Google Vision client initialized") else: self.log_debug(f"⚠️ Vision credentials not found") self.available = False except Exception as e: self.log_debug(f"❌ Vision client setup failed: {e}") self.available = False def analyze_image_content(self, image: Image.Image) -> Dict: """Analyze image to determine if it contains chemical structures""" if not self.available or not self.client: # Fallback: assume chemical structure if Vision unavailable return { 'is_chemical_structure': True, 'confidence': 0.5, 'content_type': 'unknown', 'error': 'Google Vision not available - processing anyway' } try: # Convert PIL image to bytes img_byte_arr = io.BytesIO() if image.mode != 'RGB': image = image.convert('RGB') image.save(img_byte_arr, format='PNG') img_byte_arr = img_byte_arr.getvalue() from google.cloud import vision vision_image = vision.Image(content=img_byte_arr) # Perform multiple detection types text_response = self.client.text_detection(image=vision_image) texts = text_response.text_annotations objects_response = self.client.object_localization(image=vision_image) objects = objects_response.localized_object_annotations label_response = self.client.label_detection(image=vision_image) labels = label_response.label_annotations # Analyze results analysis = self._analyze_vision_results(texts, objects, labels, image.size) self.log_debug(f"Vision analysis: {analysis['content_type']} (confidence: {analysis['confidence']:.2f})") return analysis except Exception as e: self.log_debug(f"❌ Vision analysis failed: {e}") return { 'is_chemical_structure': True, # Default to processing 'confidence': 0.5, 'content_type': 'unknown', 'error': str(e) } def _analyze_vision_results(self, texts, objects, labels, image_size) -> Dict: """Analyze Vision API results to classify content""" width, height = image_size image_area = width * height text_score = 0.0 structure_score = 0.0 # Analyze text detection for chemical vs table content if texts: total_text_area = 0 chemical_keywords = 0 table_indicators = 0 for text in texts[1:]: # Skip first (full text) vertices = text.bounding_poly.vertices if len(vertices) >= 4: text_width = abs(vertices[2].x - vertices[0].x) text_height = abs(vertices[2].y - vertices[0].y) total_text_area += text_width * text_height text_content = text.description.lower() # Chemical structure indicators if any(chem in text_content for chem in [ 'scheme', 'figure', 'compound', 'synthesis', 'reaction', 'mol', 'structure', 'formula' ]): chemical_keywords += 1 # Table/data indicators if any(table in text_content for table in [ 'table', 'yield', '%', 'mp', 'melting', 'nmr', 'ir', 'ms', 'data', 'result', 'analysis' ]): table_indicators += 1 text_density = total_text_area / image_area if image_area > 0 else 0 # High text density suggests tables/data if text_density > 0.3: text_score += 0.4 if table_indicators > chemical_keywords and table_indicators > 2: text_score += 0.3 # Analyze object detection for obj in objects: obj_name = obj.name.lower() if any(diagram_term in obj_name for diagram_term in [ 'diagram', 'chart', 'figure', 'drawing', 'illustration' ]): structure_score += 0.2 # Analyze labels for label in labels: label_name = label.description.lower() confidence = label.score # Chemical structure indicators if any(chem_label in label_name for chem_label in [ 'diagram', 'drawing', 'line art', 'figure', 'illustration', 'chemistry', 'molecule', 'formula' ]): structure_score += confidence * 0.3 # Text/table indicators if any(text_label in label_name for text_label in [ 'text', 'document', 'table', 'data', 'spreadsheet' ]): text_score += confidence * 0.2 # Image aspect ratio analysis aspect_ratio = width / height if height > 0 else 1 if 0.3 <= aspect_ratio <= 3.0: structure_score += 0.1 if aspect_ratio > 4.0 or aspect_ratio < 0.25: text_score += 0.2 # Final classification if structure_score > text_score: is_chemical = True confidence = min(structure_score, 0.9) content_type = 'chemical_structure' else: is_chemical = False confidence = min(text_score, 0.9) content_type = 'text_or_table' confidence = max(confidence, 0.1) return { 'is_chemical_structure': is_chemical, 'confidence': confidence, 'content_type': content_type, 'text_score': text_score, 'structure_score': structure_score, 'analysis_details': { 'text_density': locals().get('text_density', 0), 'chemical_keywords': locals().get('chemical_keywords', 0), 'table_indicators': locals().get('table_indicators', 0), 'aspect_ratio': aspect_ratio } } class MolScribeAnalyzer: """MolScribe-based chemical structure recognition optimized for HF""" def __init__(self, debug_mode: bool = True): self.available = deps.is_available('molscribe') self.debug_mode = debug_mode self.debug_log = [] self.stats = { 'total_predictions': 0, 'successful_predictions': 0, 'failed_predictions': 0, 'avg_processing_time': 0.0 } self.setup_molscribe() def log_debug(self, message: str): timestamp = datetime.now().strftime("%H:%M:%S") formatted_message = f"[{timestamp}] {message}" if self.debug_mode: logger.info(f"[MOLSCRIBE] {formatted_message}") self.debug_log.append(formatted_message) def setup_molscribe(self): """Initialize and test MolScribe""" if not self.available: self.log_debug("❌ MolScribe not available") return try: import molscribe self.log_debug("✅ MolScribe imported successfully") # Test with simple image test_image = Image.new('RGB', (200, 200), 'white') draw = ImageDraw.Draw(test_image) # Draw a simple benzene ring for testing center_x, center_y = 100, 100 radius = 50 # Draw hexagon points = [] for i in range(6): angle = i * 60 * np.pi / 180 x = center_x + radius * np.cos(angle) y = center_y + radius * np.sin(angle) points.append((x, y)) for i in range(6): start = points[i] end = points[(i + 1) % 6] draw.line([start, end], fill='black', width=3) self.log_debug("Testing MolScribe with benzene structure...") test_result = self._molscribe_predict(test_image, timeout_seconds=10) if test_result.get('success'): self.log_debug("✅ MolScribe test successful") else: self.log_debug(f"⚠️ MolScribe test returned: {test_result}") except Exception as e: self.log_debug(f"❌ MolScribe setup failed: {e}") self.available = False def recognize_structure_with_timeout(self, image: Image.Image, timeout_seconds: int = 30) -> Dict: """Recognize structure with HF-compatible timeout""" self.stats['total_predictions'] += 1 self.log_debug(f"Starting MolScribe recognition (attempt #{self.stats['total_predictions']})") if not self.available: self.stats['failed_predictions'] += 1 return { 'success': False, 'error': 'MolScribe not available. Install with: pip install MolScribe', 'method': 'MolScribe' } try: # Preprocess image for optimal recognition processed_image = self._preprocess_image(image) start_time = time.time() # Use concurrent.futures for timeout control with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(self._molscribe_predict, processed_image, timeout_seconds) try: result = future.result(timeout=timeout_seconds) except concurrent.futures.TimeoutError: self.stats['failed_predictions'] += 1 self.log_debug(f"❌ MolScribe timeout after {timeout_seconds} seconds") return { 'success': False, 'error': f'MolScribe processing timeout ({timeout_seconds}s)', 'method': 'MolScribe' } processing_time = time.time() - start_time if result.get('success'): self.stats['successful_predictions'] += 1 result['processing_time'] = processing_time self.log_debug(f"✅ Structure recognized in {processing_time:.2f}s") else: self.stats['failed_predictions'] += 1 self.log_debug(f"❌ Recognition failed: {result.get('error', 'Unknown error')}") return result except Exception as e: self.stats['failed_predictions'] += 1 error_msg = f'MolScribe recognition failed: {str(e)}' self.log_debug(f"❌ {error_msg}") return { 'success': False, 'error': error_msg, 'method': 'MolScribe' } def _molscribe_predict(self, image: Image.Image, timeout_seconds: int) -> Dict: """Core MolScribe prediction with error handling""" try: import molscribe from molscribe import MolScribe from huggingface_hub import hf_hub_download # Save image to temporary file with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file: if image.mode != 'RGB': image = image.convert('RGB') image.save(tmp_file.name, 'PNG') temp_path = tmp_file.name try: # Download model if needed ckpt_path = hf_hub_download('yujieq/MolScribe', 'swin_base_char_aux_1m.pth') model = MolScribe(ckpt_path) # Call MolScribe smiles_result = model.predict_image(temp_path) # Clean up temp file if os.path.exists(temp_path): os.remove(temp_path) # Validate result if smiles_result and isinstance(smiles_result, str) and len(smiles_result.strip()) > 0: smiles = smiles_result.strip() # Check for garbage SMILES if self._is_garbage_smiles(smiles): return { 'success': False, 'error': 'MolScribe produced garbage SMILES (likely not a chemical structure)', 'raw_smiles': smiles[:100] + '...' if len(smiles) > 100 else smiles } # Validate with RDKit if available is_valid = True validation_error = None canonical_smiles = smiles if deps.is_available('rdkit'): try: from rdkit import Chem mol = Chem.MolFromSmiles(smiles) if mol is not None: canonical_smiles = Chem.MolToSmiles(mol) is_valid = True else: is_valid = False validation_error = "Invalid SMILES structure" except Exception as e: is_valid = False validation_error = str(e) return { 'success': True, 'smiles': smiles, 'canonical_smiles': canonical_smiles, 'is_valid': is_valid, 'validation_error': validation_error, 'method': 'MolScribe' } else: return { 'success': False, 'error': 'No structure detected by MolScribe', 'method': 'MolScribe' } finally: if os.path.exists(temp_path): try: os.remove(temp_path) except: pass except Exception as e: return { 'success': False, 'error': f'MolScribe prediction failed: {str(e)}', 'method': 'MolScribe' } def _preprocess_image(self, image: Image.Image) -> Image.Image: """Enhanced image preprocessing for MolScribe""" try: if image.mode != 'RGB': image = image.convert('RGB') # Enhance contrast and sharpness enhancer = ImageEnhance.Contrast(image) image = enhancer.enhance(1.3) enhancer = ImageEnhance.Sharpness(image) image = enhancer.enhance(1.2) # Resize intelligently for MolScribe width, height = image.size min_size = 200 max_size = 1024 if width < min_size or height < min_size: scale_factor = min_size / min(width, height) new_width = int(width * scale_factor) new_height = int(height * scale_factor) image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) elif width > max_size or height > max_size: scale_factor = max_size / max(width, height) new_width = int(width * scale_factor) new_height = int(height * scale_factor) image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) return image except Exception as e: logger.warning(f"Image preprocessing failed: {e}") return image def _is_garbage_smiles(self, smiles: str) -> bool: """Check if SMILES is garbage (repetitive patterns, too long, etc.)""" # Check for excessively long SMILES if len(smiles) > 1000: return True # Check for repetitive patterns if any(pattern in smiles for pattern in [ 'CC#CC#CC#CC#', 'CCCCCCCCCCCCCCCCCCCC', '111111111', '222222222', '333333333' ]): return True # Check for too many dots (disconnected fragments) if smiles.count('.') > 10: return True # Check for obvious errors if smiles.count('(') != smiles.count(')'): return True if smiles.count('[') != smiles.count(']'): return True return False class ClaudeDocumentAnalyzer: """Claude-powered document analysis for chemical literature""" def __init__(self, debug_mode: bool = True): self.debug_mode = debug_mode self.debug_log = [] self.analysis_cache = {} def log_debug(self, message: str): timestamp = datetime.now().strftime("%H:%M:%S") formatted_message = f"[{timestamp}] {message}" if self.debug_mode: logger.info(f"[CLAUDE] {formatted_message}") self.debug_log.append(formatted_message) def analyze_document_structure(self, text_content: str, api_key: str) -> Dict: """Analyze document using Claude to identify chemical content locations""" self.log_debug("Starting Claude document structure analysis...") if not config.validate_api_key(api_key): return { 'error': 'Valid Claude API key required (sk-ant-...)', 'regions_to_process': [], 'document_type': 'unknown' } try: # Create cache key cache_key = hashlib.md5(text_content[:1000].encode()).hexdigest() if cache_key in self.analysis_cache: self.log_debug("Using cached analysis") return self.analysis_cache[cache_key] # Create structured prompt analysis_prompt = self._create_analysis_prompt(text_content) headers = { 'Content-Type': 'application/json', 'x-api-key': api_key.strip(), 'anthropic-version': '2023-06-01' } payload = { 'model': 'claude-3-5-sonnet-20241022', 'max_tokens': 2000, 'messages': [{'role': 'user', 'content': analysis_prompt}] } self.log_debug("Sending document to Claude for analysis...") response = requests.post( 'https://api.anthropic.com/v1/messages', headers=headers, json=payload, timeout=60 ) if response.status_code == 200: result = response.json() analysis_text = result['content'][0]['text'] # Parse Claude's structured response parsed_analysis = self._parse_claude_response(analysis_text) # Cache the result self.analysis_cache[cache_key] = { 'success': True, 'document_type': parsed_analysis['document_type'], 'structure_locations': parsed_analysis['structure_locations'], 'data_tables': parsed_analysis['data_tables'], 'chemical_entities': parsed_analysis['chemical_entities'], 'processing_priority': parsed_analysis['processing_priority'], 'claude_analysis': analysis_text, 'tokens_used': result.get('usage', {}).get('output_tokens', 'unknown') } self.log_debug(f"Claude identified {len(parsed_analysis['structure_locations'])} potential structure locations") return self.analysis_cache[cache_key] else: error_msg = f'Claude API error {response.status_code}: {response.text[:200]}' self.log_debug(f"Claude API failed: {error_msg}") return { 'error': error_msg, 'regions_to_process': [], 'document_type': 'unknown' } except Exception as e: error_msg = f'Claude analysis failed: {str(e)}' self.log_debug(error_msg) return { 'error': error_msg, 'regions_to_process': [], 'document_type': 'unknown' } def _create_analysis_prompt(self, text_content: str) -> str: """Create structured prompt for Claude document analysis""" prompt = f"""Analyze this chemistry research document and provide a structured analysis to guide automated chemical structure recognition. Document Text: {text_content[:6000]} Please provide a JSON response with the following structure: {{ "document_type": "research_paper" | "review" | "patent" | "thesis" | "other", "structure_locations": [ {{ "type": "reaction_scheme" | "individual_structure" | "mechanism", "keywords": ["scheme", "figure", "compound"], "page_likely": 1, "description": "Brief description of what structure is expected", "priority": "high" | "medium" | "low" }} ], "data_tables": [ {{ "type": "yields" | "properties" | "spectral_data" | "references", "keywords": ["table", "yield", "melting point"], "should_skip": true, "page_likely": 2 }} ], "chemical_entities": {{ "main_compounds": ["compound names found"], "reagents": ["reagent names"], "solvents": ["solvent names"], "catalysts": ["catalyst names"] }}, "processing_priority": [ "Focus on Scheme 1 - main reaction", "Look for individual product structures", "Skip data tables and references" ] }} Focus on identifying: 1. Actual chemical structure diagrams vs data tables 2. Reaction schemes vs individual compounds 3. Main synthetic routes vs supporting data 4. What should be processed vs what should be skipped Respond only with valid JSON - no additional text.""" return prompt def _parse_claude_response(self, response_text: str) -> Dict: """Parse Claude's JSON response with fallback parsing""" try: # Try to extract JSON from response json_start = response_text.find('{') json_end = response_text.rfind('}') + 1 if json_start >= 0 and json_end > json_start: json_str = response_text[json_start:json_end] parsed = json.loads(json_str) # Validate required fields required_fields = ['document_type', 'structure_locations', 'data_tables', 'chemical_entities'] if all(field in parsed for field in required_fields): return parsed # Fallback parsing if JSON fails self.log_debug("JSON parsing failed, using fallback text analysis") return self._fallback_text_analysis(response_text) except Exception as e: self.log_debug(f"Response parsing failed: {e}, using fallback") return self._fallback_text_analysis(response_text) def _fallback_text_analysis(self, text: str) -> Dict: """Fallback analysis when JSON parsing fails""" return { 'document_type': 'research_paper', 'structure_locations': [ { 'type': 'reaction_scheme', 'keywords': ['scheme', 'synthesis'], 'page_likely': 1, 'description': 'Main reaction scheme', 'priority': 'high' } ], 'data_tables': [ { 'type': 'yields', 'keywords': ['table', 'yield', 'melting'], 'should_skip': True, 'page_likely': 2 } ], 'chemical_entities': { 'main_compounds': [], 'reagents': [], 'solvents': [], 'catalysts': [] }, 'processing_priority': ['Focus on chemical structure diagrams', 'Skip data tables'] } class GoogleDriveManager: """Google Drive integration for Hugging Face with service_account.json""" def __init__(self): self.service = None self.setup_service() def setup_service(self): """Initialize Google Drive service using service_account.json""" if not deps.is_available('google_drive'): logger.warning("Google Drive libraries not available") return try: from googleapiclient.discovery import build from google.oauth2.service_account import Credentials # Look for service_account.json in multiple locations possible_paths = [ 'service_account.json', '/app/service_account.json', '/tmp/service_account.json', os.path.join(os.getcwd(), 'service_account.json') ] creds_file = None for path in possible_paths: if os.path.exists(path): creds_file = path break if creds_file: credentials = Credentials.from_service_account_file( creds_file, scopes=['https://www.googleapis.com/auth/drive.readonly'] ) self.service = build('drive', 'v3', credentials=credentials) logger.info(f"Google Drive service initialized using {creds_file}") else: # Try from environment variable (HF Spaces secret) if config.google_credentials: credentials = Credentials.from_service_account_info( json.loads(config.google_credentials), scopes=['https://www.googleapis.com/auth/drive.readonly'] ) self.service = build('drive', 'v3', credentials=credentials) logger.info("Google Drive service initialized from HF secret") else: logger.warning("service_account.json not found in any expected location") except Exception as e: logger.error(f"Google Drive setup failed: {e}") def download_file(self, file_id: str) -> Dict: """Download file from Google Drive""" if not self.service: return {'error': 'Google Drive service not available'} try: # Get file metadata file_info = self.service.files().get(fileId=file_id).execute() # Download file content from googleapiclient.http import MediaIoBaseDownload request = self.service.files().get_media(fileId=file_id) file_content = io.BytesIO() downloader = MediaIoBaseDownload(file_content, request) done = False while done is False: status, done = downloader.next_chunk() file_content.seek(0) logger.info(f"Downloaded file: {file_info['name']} ({len(file_content.getvalue())} bytes)") return { 'success': True, 'file_name': file_info['name'], 'file_size': len(file_content.getvalue()), 'content': file_content.getvalue(), 'metadata': file_info } except Exception as e: error_msg = f'Download failed: {str(e)}' logger.error(error_msg) return {'error': error_msg} class PDFProcessor: """PDF processor with dual support (PyMuPDF/PyPDF2/pdfplumber)""" def __init__(self, vision_filter, debug_mode: bool = True): self.vision_filter = vision_filter self.debug_mode = debug_mode self.debug_log = [] self.available_processors = self._detect_pdf_processors() def _detect_pdf_processors(self): """Detect available PDF processing libraries""" processors = [] if deps.is_available('pymupdf'): processors.append('pymupdf') if deps.is_available('pypdf2'): processors.append('pypdf2') if deps.is_available('pdfplumber'): processors.append('pdfplumber') logger.info(f"Available PDF processors: {processors}") return processors def log_debug(self, message: str): if self.debug_mode: logger.info(f"[PDF] {message}") self.debug_log.append(message) def extract_content_with_vision_filtering(self, pdf_bytes: bytes, claude_analysis: Dict = None) -> Dict: """Extract content using best available PDF processor + Vision filtering""" if not self.available_processors: return {'error': 'No PDF processing libraries available'} self.debug_log = [] self.log_debug("Starting Vision-enhanced PDF content extraction") # Try processors in order of preference for processor in ['pymupdf', 'pypdf2', 'pdfplumber']: if processor in self.available_processors: try: if processor == 'pymupdf': return self._extract_with_pymupdf(pdf_bytes, claude_analysis) elif processor == 'pypdf2': return self._extract_with_pypdf2(pdf_bytes, claude_analysis) elif processor == 'pdfplumber': return self._extract_with_pdfplumber(pdf_bytes, claude_analysis) except Exception as e: self.log_debug(f"{processor} failed: {e}, trying next processor") continue return {'error': 'All PDF processors failed'} def _extract_with_pymupdf(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict: """Extract using PyMuPDF (preferred method)""" import fitz doc = fitz.open(stream=pdf_bytes, filetype="pdf") self.log_debug(f"Opened PDF with PyMuPDF: {len(doc)} pages") results = { 'text_content': '', 'images': [], 'page_count': len(doc), 'extraction_stats': {}, 'debug_info': [], 'processor_used': 'pymupdf' } all_text = [] total_images = 0 vision_filtered = 0 for page_num in range(min(len(doc), 10)): # Limit to 10 pages for HF page = doc[page_num] self.log_debug(f"Processing page {page_num + 1}") # Extract text page_text = page.get_text() all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}") # Find image regions regions = self._identify_regions_pymupdf(page, page_text, page_num, claude_analysis) for region_idx, region in enumerate(regions[:3]): # Limit to 3 per page try: # Render region mat = fitz.Matrix(2.0, 2.0) # 2x zoom clip = fitz.Rect(region['bbox']) if clip.width > 50 and clip.height > 50: pix = page.get_pixmap(matrix=mat, clip=clip) img_data = pix.tobytes("png") pil_image = Image.open(io.BytesIO(img_data)) # Vision API filtering if self.vision_filter.available: vision_analysis = self.vision_filter.analyze_image_content(pil_image) if not vision_analysis['is_chemical_structure']: vision_filtered += 1 self.log_debug(f"Vision filtered out region {region_idx}: {vision_analysis['content_type']}") continue results['images'].append({ 'page': page_num + 1, 'index': region_idx, 'size': pil_image.size, 'image': pil_image, 'filename': f"page_{page_num+1}_region_{region_idx+1}.png", 'type': region.get('type', 'structure'), 'description': region.get('description', 'Chemical structure'), 'vision_analysis': vision_analysis if self.vision_filter.available else None }) total_images += 1 pix = None # Free memory except Exception as e: self.log_debug(f"Error processing region {region_idx}: {e}") results['text_content'] = '\n'.join(all_text) results['extraction_stats'] = { 'pages_processed': len(doc), 'total_regions': total_images + vision_filtered, 'vision_approved': total_images, 'vision_filtered': vision_filtered, 'text_length': len(results['text_content']) } results['debug_info'] = self.debug_log.copy() doc.close() return results def _extract_with_pypdf2(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict: """Extract using PyPDF2 (fallback method)""" import PyPDF2 pdf_file = io.BytesIO(pdf_bytes) pdf_reader = PyPDF2.PdfReader(pdf_file) self.log_debug(f"Opened PDF with PyPDF2: {len(pdf_reader.pages)} pages") # Extract text only (no images with PyPDF2) all_text = [] for page_num, page in enumerate(pdf_reader.pages[:10]): page_text = page.extract_text() all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}") return { 'text_content': '\n'.join(all_text), 'images': [], # PyPDF2 doesn't extract images well 'page_count': len(pdf_reader.pages), 'extraction_stats': { 'pages_processed': len(pdf_reader.pages), 'text_length': len('\n'.join(all_text)), 'images_extracted': 0 }, 'processor_used': 'pypdf2', 'note': 'PyPDF2 used - text only, no image extraction' } def _extract_with_pdfplumber(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict: """Extract using pdfplumber (alternative fallback)""" import pdfplumber with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf: self.log_debug(f"Opened PDF with pdfplumber: {len(pdf.pages)} pages") all_text = [] for page_num, page in enumerate(pdf.pages[:10]): page_text = page.extract_text() or "" all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}") return { 'text_content': '\n'.join(all_text), 'images': [], # pdfplumber focuses on text 'page_count': len(pdf.pages), 'extraction_stats': { 'pages_processed': len(pdf.pages), 'text_length': len('\n'.join(all_text)), 'images_extracted': 0 }, 'processor_used': 'pdfplumber', 'note': 'pdfplumber used - text extraction only' } def _identify_regions_pymupdf(self, page, page_text: str, page_num: int, claude_analysis: Dict) -> List[Dict]: """Identify regions using PyMuPDF""" regions = [] try: # Get text blocks blocks = page.get_text("dict") for block in blocks.get("blocks", []): if "lines" in block: block_text = "" bbox = block.get("bbox", [0, 0, 0, 0]) for line in block["lines"]: for span in line.get("spans", []): block_text += span.get("text", "") + " " block_text_lower = block_text.lower() # Look for chemical structure indicators chemical_keywords = ['scheme', 'figure', 'compound', 'structure', 'reaction'] if any(keyword in block_text_lower for keyword in chemical_keywords): # Expand bbox for potential structure expanded_bbox = [ max(0, bbox[0] - 100), max(0, bbox[1] - 150), min(page.rect.width, bbox[2] + 100), min(page.rect.height, bbox[3] + 300) ] regions.append({ 'bbox': expanded_bbox, 'text': block_text.strip(), 'type': 'structure', 'description': 'Potential chemical structure', 'confidence': 0.7 }) except Exception as e: self.log_debug(f"Error identifying regions: {e}") # Add center region if no specific regions found if not regions: page_width = page.rect.width page_height = page.rect.height regions.append({ 'bbox': [page_width * 0.1, page_height * 0.2, page_width * 0.9, page_height * 0.7], 'text': 'center_focus', 'type': 'structure', 'description': 'Center focus area', 'confidence': 0.5 }) return regions class RDKitAnalyzer: """RDKit molecular analysis with HF compatibility""" def __init__(self): self.available = deps.is_available('rdkit') def analyze_molecule(self, smiles: str) -> Dict: """Comprehensive molecular analysis using RDKit""" if not self.available: return {'error': 'RDKit not available'} try: from rdkit import Chem from rdkit.Chem import Descriptors, Crippen, rdMolDescriptors mol = Chem.MolFromSmiles(smiles) if not mol: return {'error': f'Invalid SMILES: {smiles}'} properties = { 'smiles_input': smiles, 'canonical_smiles': Chem.MolToSmiles(mol), 'molecular_formula': rdMolDescriptors.CalcMolFormula(mol), 'molecular_weight': round(Descriptors.MolWt(mol), 2), 'exact_mass': round(Descriptors.ExactMolWt(mol), 4), 'logp': round(Crippen.MolLogP(mol), 2), 'tpsa': round(Descriptors.TPSA(mol), 2), 'hbd': Descriptors.NumHDonors(mol), 'hba': Descriptors.NumHAcceptors(mol), 'heavy_atoms': Descriptors.HeavyAtomCount(mol), 'rotatable_bonds': Descriptors.NumRotatableBonds(mol), 'aromatic_rings': Descriptors.NumAromaticRings(mol), 'ring_count': Descriptors.RingCount(mol), 'formal_charge': Chem.rdmolops.GetFormalCharge(mol), } # Drug-likeness assessment mw = Descriptors.MolWt(mol) logp = Crippen.MolLogP(mol) hbd = Descriptors.NumHDonors(mol) hba = Descriptors.NumHAcceptors(mol) lipinski_violations = sum([mw > 500, logp > 5, hbd > 5, hba > 10]) properties.update({ 'lipinski_violations': lipinski_violations, 'lipinski_compliant': lipinski_violations <= 1, 'drug_likeness': 'High' if lipinski_violations <= 1 else 'Medium' if lipinski_violations <= 2 else 'Low' }) return properties except Exception as e: return {'error': f'RDKit analysis failed: {str(e)}'} class N8NAPIHandler: """N8N HTTP node integration for automated workflows""" def __init__(self): self.processing_status = {} self.request_counter = 0 def validate_request(self, request_data: Dict) -> Tuple[bool, str]: """Validate N8N HTTP request""" try: # Check for required fields if 'file_id' not in request_data: return False, "Missing required field: file_id" if 'claude_api_key' not in request_data: return False, "Missing required field: claude_api_key" # Validate Claude API key format if not config.validate_api_key(request_data['claude_api_key']): return False, "Invalid Claude API key format" return True, "Valid request" except Exception as e: return False, f"Request validation error: {str(e)}" def process_n8n_request(self, request_data: Dict) -> Dict: """Process N8N HTTP request and return JSON response""" try: # Generate request ID self.request_counter += 1 request_id = f"n8n_req_{self.request_counter}_{int(time.time())}" # Validate request is_valid, validation_message = self.validate_request(request_data) if not is_valid: return { 'success': False, 'request_id': request_id, 'error': validation_message, 'timestamp': datetime.now().isoformat() } file_id = request_data['file_id'] claude_api_key = request_data['claude_api_key'] doc_name = request_data.get('doc_name', 'N8N Document') workflow_id = request_data.get('workflow_id', request_id) # Set processing status self.processing_status[request_id] = { 'status': 'processing', 'start_time': datetime.now().isoformat(), 'file_id': file_id, 'doc_name': doc_name, 'workflow_id': workflow_id } # Run the complete pipeline result = self._run_complete_pipeline(file_id, claude_api_key, doc_name, request_id) # Update status self.processing_status[request_id].update({ 'status': 'completed' if result.get('status') == 'success' else 'failed', 'end_time': datetime.now().isoformat(), 'result': result }) # Return N8N-friendly response if result.get('status') == 'success': return { 'success': True, 'request_id': request_id, 'workflow_id': workflow_id, 'data': result, 'processing_time': self._calculate_processing_time(request_id), 'timestamp': datetime.now().isoformat() } else: return { 'success': False, 'request_id': request_id, 'workflow_id': workflow_id, 'error': result.get('error', 'Unknown processing error'), 'timestamp': datetime.now().isoformat() } except Exception as e: return { 'success': False, 'request_id': request_id if 'request_id' in locals() else 'unknown', 'error': f'Processing failed: {str(e)}', 'timestamp': datetime.now().isoformat() } def _calculate_processing_time(self, request_id: str) -> str: """Calculate processing time for a request""" try: status = self.processing_status.get(request_id, {}) start_time = status.get('start_time') end_time = status.get('end_time') if start_time and end_time: start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00') if start_time.endswith('Z') else start_time) end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00') if end_time.endswith('Z') else end_time) duration = (end_dt - start_dt).total_seconds() return f"{duration:.2f} seconds" return "Unknown" except: return "Unknown" def _run_complete_pipeline(self, file_id: str, claude_api_key: str, doc_name: str, request_id: str) -> Dict: """Run the complete analysis pipeline""" try: # Step 1: Download from Google Drive download_result = gdrive.download_file(file_id) if 'error' in download_result: return { 'status': 'error', 'step': 'download', 'error': f'Download failed: {download_result["error"]}' } # Step 2: Extract text for Claude analysis pdf_content = pdf_processor.extract_content_with_vision_filtering(download_result['content']) if 'error' in pdf_content: return { 'status': 'error', 'step': 'text_extraction', 'error': f'Text extraction failed: {pdf_content["error"]}' } # Step 3: Claude document analysis claude_analysis = claude_analyzer.analyze_document_structure( pdf_content['text_content'], claude_api_key.strip() ) # Step 4: Vision-enhanced PDF processing vision_enhanced_content = pdf_processor.extract_content_with_vision_filtering( download_result['content'], claude_analysis ) # Step 5: MolScribe analysis on Vision-approved images molscribe_results = [] if vision_enhanced_content['images']: for img_data in vision_enhanced_content['images'][:5]: # Limit to 5 images molscribe_result = molscribe_analyzer.recognize_structure_with_timeout( img_data['image'], timeout_seconds=30 ) molscribe_results.append({ 'image_info': { 'page': img_data['page'], 'size': img_data['size'], 'type': img_data.get('type', 'structure'), 'description': img_data.get('description', 'Chemical structure'), 'vision_approved': True }, 'result': molscribe_result }) # Step 6: RDKit analysis rdkit_results = [] for molscribe_result in molscribe_results: if molscribe_result['result'].get('success') and molscribe_result['result'].get('smiles'): smiles = molscribe_result['result']['smiles'] rdkit_analysis = rdkit_analyzer.analyze_molecule(smiles) rdkit_results.append({ 'source': 'molscribe_vision_approved', 'smiles': smiles, 'analysis': rdkit_analysis, 'image_info': molscribe_result['image_info'] }) # Compile results result = { 'status': 'success', 'request_id': request_id, 'file_info': { 'file_id': file_id, 'doc_name': doc_name, 'file_size_kb': round(download_result['file_size'] / 1024, 1) }, 'claude_analysis': { 'success': claude_analysis.get('success', False), 'document_type': claude_analysis.get('document_type', 'unknown'), 'structure_locations_identified': len(claude_analysis.get('structure_locations', [])), 'processing_guidance': claude_analysis.get('processing_priority', []) }, 'vision_enhanced_analysis': { 'pages': vision_enhanced_content['page_count'], 'vision_approved_regions': len(vision_enhanced_content['images']), 'processor_used': vision_enhanced_content.get('processor_used', 'unknown') }, 'molscribe_analysis': { 'images_processed': len(molscribe_results), 'structures_recognized': len([r for r in molscribe_results if r['result'].get('success')]), 'results': molscribe_results }, 'rdkit_analysis': { 'molecules_analyzed': len(rdkit_results), 'results': rdkit_results }, 'processing_summary': { 'total_structures_found': len([r for r in molscribe_results if r['result'].get('success')]), 'valid_molecules': len([r for r in rdkit_results if 'error' not in r['analysis']]), 'pipeline_components': ['Google Drive', 'Claude Analysis', 'Google Vision', 'MolScribe', 'RDKit'] } } return result except Exception as e: return { 'status': 'error', 'error': f'Pipeline processing failed: {str(e)}', 'request_id': request_id } # Initialize all components gdrive = GoogleDriveManager() claude_analyzer = ClaudeDocumentAnalyzer(debug_mode=True) vision_filter = GoogleVisionImageFilter(debug_mode=True) pdf_processor = PDFProcessor(vision_filter, debug_mode=True) rdkit_analyzer = RDKitAnalyzer() molscribe_analyzer = MolScribeAnalyzer(debug_mode=True) n8n_handler = N8NAPIHandler() # Core Functions for Gradio Interface def test_system_status(): """Test all system components""" status = deps.get_status() return { 'dependency_status': status, 'google_drive_service': 'connected' if gdrive.service else 'not connected', 'google_vision_client': 'connected' if vision_filter.available else 'not connected', 'pdf_processors': pdf_processor.available_processors, 'molscribe_ready': molscribe_analyzer.available, 'rdkit_ready': rdkit_analyzer.available, 'claude_analyzer_ready': True, 'n8n_api_ready': True, 'system_type': 'Hugging Face Chemical Analyzer with MolScribe', 'key_features': [ 'Google Drive integration with service_account.json', 'Claude document structure analysis', 'Google Vision image/text filtering', 'MolScribe structure recognition', 'RDKit molecular analysis', 'N8N HTTP node integration', 'Dual dependency support' ] } def analyze_single_image_molscribe(image: Image.Image) -> str: """Analyze single image with Vision + MolScribe""" if image is None: return json.dumps({'status': 'error', 'error': 'No image provided'}, indent=2) # Vision analysis first vision_analysis = vision_filter.analyze_image_content(image) result = { 'vision_analysis': vision_analysis, 'molscribe_processed': False, 'molscribe_result': None } # Process with MolScribe if Vision approves if vision_analysis['is_chemical_structure']: molscribe_result = molscribe_analyzer.recognize_structure_with_timeout(image, timeout_seconds=30) result['molscribe_processed'] = True result['molscribe_result'] = molscribe_result # Add RDKit analysis if successful if molscribe_result.get('success') and molscribe_result.get('smiles'): rdkit_analysis = rdkit_analyzer.analyze_molecule(molscribe_result['smiles']) result['rdkit_analysis'] = rdkit_analysis else: result['message'] = f"Vision API classified this as '{vision_analysis['content_type']}' - skipping MolScribe processing" return json.dumps({ 'status': 'success', 'analysis_result': result, 'molscribe_stats': molscribe_analyzer.stats }, indent=2) def process_complete_pipeline(file_id: str, claude_api_key: str) -> str: """Process complete pipeline for N8N or direct use""" if not file_id: return json.dumps({'status': 'error', 'error': 'File ID is required'}, indent=2) if not config.validate_api_key(claude_api_key): return json.dumps({'status': 'error', 'error': 'Valid Claude API key required'}, indent=2) try: # Create request data in N8N format request_data = { 'file_id': file_id, 'claude_api_key': claude_api_key, 'doc_name': 'Direct Pipeline Request', 'workflow_id': f'direct_{int(time.time())}' } result = n8n_handler.process_n8n_request(request_data) return json.dumps(result, indent=2) except Exception as e: return json.dumps({ 'success': False, 'error': f'Pipeline processing failed: {str(e)}' }, indent=2) def process_pdf_direct(pdf_file, claude_api_key: str) -> str: """Process PDF file directly uploaded""" if pdf_file is None: return json.dumps({'status': 'error', 'error': 'No PDF file provided'}, indent=2) if not config.validate_api_key(claude_api_key): return json.dumps({'status': 'error', 'error': 'Valid Claude API key required'}, indent=2) try: # Read PDF bytes pdf_bytes = pdf_file.read() # Extract text for Claude analysis pdf_content = pdf_processor.extract_content_with_vision_filtering(pdf_bytes) if 'error' in pdf_content: return json.dumps({ 'status': 'error', 'step': 'text_extraction', 'error': f'Text extraction failed: {pdf_content["error"]}' }, indent=2) # Claude document analysis claude_analysis = claude_analyzer.analyze_document_structure( pdf_content['text_content'], claude_api_key.strip() ) # Vision-enhanced processing vision_enhanced_content = pdf_processor.extract_content_with_vision_filtering( pdf_bytes, claude_analysis ) # MolScribe analysis molscribe_results = [] if vision_enhanced_content['images']: for img_data in vision_enhanced_content['images'][:3]: # Limit for demo molscribe_result = molscribe_analyzer.recognize_structure_with_timeout( img_data['image'], timeout_seconds=20 ) molscribe_results.append({ 'image_info': { 'page': img_data['page'], 'size': img_data['size'], 'description': img_data.get('description', 'Chemical structure') }, 'result': molscribe_result }) # RDKit analysis rdkit_results = [] for molscribe_result in molscribe_results: if molscribe_result['result'].get('success') and molscribe_result['result'].get('smiles'): smiles = molscribe_result['result']['smiles'] rdkit_analysis = rdkit_analyzer.analyze_molecule(smiles) rdkit_results.append({ 'smiles': smiles, 'analysis': rdkit_analysis }) result = { 'status': 'success', 'file_name': pdf_file.name, 'claude_analysis': { 'success': claude_analysis.get('success', False), 'document_type': claude_analysis.get('document_type', 'unknown') }, 'vision_analysis': { 'pages': vision_enhanced_content['page_count'], 'vision_approved_regions': len(vision_enhanced_content['images']), 'processor_used': vision_enhanced_content.get('processor_used', 'unknown') }, 'molscribe_analysis': { 'images_processed': len(molscribe_results), 'structures_recognized': len([r for r in molscribe_results if r['result'].get('success')]), 'results': molscribe_results }, 'rdkit_analysis': { 'molecules_analyzed': len(rdkit_results), 'results': rdkit_results } } return json.dumps(result, indent=2) except Exception as e: return json.dumps({ 'status': 'error', 'error': f'PDF processing failed: {str(e)}' }, indent=2) def test_n8n_integration(file_id: str, claude_api_key: str, workflow_id: str = None) -> str: """Test N8N HTTP integration""" if not file_id: return json.dumps({ 'success': False, 'error': 'File ID is required for testing' }, indent=2) if not config.validate_api_key(claude_api_key): return json.dumps({ 'success': False, 'error': 'Valid Claude API key required' }, indent=2) # Create N8N-style request n8n_request = { 'file_id': file_id, 'claude_api_key': claude_api_key, 'doc_name': 'N8N Test Document', 'workflow_id': workflow_id or f'test_workflow_{int(time.time())}' } # Process through N8N handler result = n8n_handler.process_n8n_request(n8n_request) return json.dumps({ 'n8n_integration_test': True, 'request_sent': n8n_request, 'response_received': result, 'test_status': 'success' if result.get('success') else 'failed' }, indent=2) # Create Gradio Interface with gr.Blocks(title="Hugging Face Chemical Analyzer", theme=gr.themes.Soft()) as app: gr.HTML("""

🧪 Hugging Face Chemical Analyzer with MolScribe

Claude Analysis + Google Vision + MolScribe + RDKit + N8N HTTP Integration

Production Ready: N8N HTTP nodes, Google Drive, Dual dependency support

""") with gr.Tabs(): # System Status Tab with gr.TabItem("🔧 System Status"): gr.HTML("""

System Component Status

Check all dependencies and integrations...

""") status_btn = gr.Button("🔬 Check System Status", variant="primary") status_output = gr.Code(label="System Status", language="json") status_btn.click( lambda: json.dumps(test_system_status(), indent=2), outputs=[status_output] ) # Single Image Analysis with gr.TabItem("👁️ Vision + MolScribe Analysis"): gr.HTML("""

Single Image Analysis

Upload an image: Vision API checks → MolScribe processes → RDKit analyzes

""") with gr.Row(): with gr.Column(): image_input = gr.Image(type="pil", label="Chemical Structure Image") image_btn = gr.Button("🔍 Analyze with MolScribe", variant="primary") with gr.Column(): image_output = gr.Code(label="Analysis Results", language="json") image_btn.click( analyze_single_image_molscribe, inputs=[image_input], outputs=[image_output] ) # Google Drive Pipeline with gr.TabItem("🚀 Google Drive Pipeline"): gr.HTML("""

Complete Google Drive Pipeline

Full pipeline: Download → Claude analyzes → Vision filters → MolScribe processes → RDKit analyzes

""") with gr.Row(): with gr.Column(): drive_file_id = gr.Textbox( label="Google Drive File ID", placeholder="Enter Google Drive file ID", info="Make sure file is shared or accessible by service account" ) drive_claude_key = gr.Textbox( label="Claude API Key", type="password", placeholder="sk-ant-...", value=config.claude_api_key ) drive_btn = gr.Button("🚀 Run Complete Pipeline", variant="primary") with gr.Column(): drive_output = gr.Code(label="Pipeline Results", language="json") drive_btn.click( process_complete_pipeline, inputs=[drive_file_id, drive_claude_key], outputs=[drive_output] ) # Direct PDF Upload with gr.TabItem("📄 Direct PDF Analysis"): gr.HTML("""

Direct PDF Upload & Analysis

Upload PDF directly for immediate processing

""") with gr.Row(): with gr.Column(): pdf_input = gr.File(label="PDF File", file_types=[".pdf"]) pdf_claude_key = gr.Textbox( label="Claude API Key", type="password", placeholder="sk-ant-...", value=config.claude_api_key ) pdf_btn = gr.Button("📑 Analyze PDF", variant="primary") with gr.Column(): pdf_output = gr.Code(label="PDF Analysis Results", language="json") pdf_btn.click( process_pdf_direct, inputs=[pdf_input, pdf_claude_key], outputs=[pdf_output] ) # N8N HTTP Integration Testing with gr.TabItem("🔗 N8N HTTP Integration"): gr.HTML("""

N8N HTTP Node Integration

Test N8N HTTP integration and get API documentation

""") with gr.Row(): with gr.Column(): n8n_file_id = gr.Textbox( label="Google Drive File ID", placeholder="Test file ID for N8N integration" ) n8n_claude_key = gr.Textbox( label="Claude API Key", type="password", placeholder="sk-ant-...", value=config.claude_api_key ) n8n_workflow_id = gr.Textbox( label="Workflow ID (Optional)", placeholder="test_workflow_123" ) n8n_test_btn = gr.Button("🧪 Test N8N Integration", variant="primary") with gr.Column(): n8n_output = gr.Code(label="N8N Integration Test Results", language="json") n8n_test_btn.click( test_n8n_integration, inputs=[n8n_file_id, n8n_claude_key, n8n_workflow_id], outputs=[n8n_output] ) gr.HTML("""

N8N HTTP Node Configuration:

Method: POST
URL: https://your-hf-space.hf.space/api/analyze
Content-Type: application/json
Body (JSON):
{
  "file_id": "{{ $json.file_id }}",
  "claude_api_key": "{{ $json.claude_api_key }}",
  "doc_name": "{{ $json.doc_name }}",
  "workflow_id": "{{ $json.workflow_id }}"
}

Available API Endpoints:

N8N Workflow Example:

  1. HTTP Request Node: POST to /api/analyze with document data
  2. Wait Node (Optional): Brief pause for processing
  3. Code Node: Extract results from response.data
  4. Switch Node: Route based on success/failure
  5. Further Processing: Use extracted SMILES, molecular data, etc.
""") # Status footer gr.HTML(f"""

Google Drive: {'✅ Connected' if gdrive.service else '⚠️ Needs service_account.json'}

Google Vision: {'✅ Connected' if vision_filter.available else '⚠️ Needs credentials'}

PDF Processing: {', '.join(pdf_processor.available_processors) if pdf_processor.available_processors else '❌ No processors'}

MolScribe: {'✅ Available' if molscribe_analyzer.available else '❌ Not Available'}

RDKit: {'✅ Available' if rdkit_analyzer.available else '❌ Not Available'}

N8N HTTP API: ✅ Ready for HTTP nodes

Deployment: ✅ Hugging Face Spaces compatible with N8N integration

""") if __name__ == "__main__": # For Hugging Face Spaces - just run Gradio # N8N can call the Gradio API directly app.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )