# "Spaces: Sleeping" — Hugging Face UI status banner captured when this file
# was scraped; not part of the program.
| """ | |
| Complete Hugging Face Chemical Analyzer with MolScribe Integration | |
| Production-ready system with Google Vision, Claude analysis, and N8N HTTP integration | |
| """ | |
| import gradio as gr | |
| import json | |
| import requests | |
| import os | |
| import io | |
| import re | |
| import tempfile | |
| import base64 | |
| import concurrent.futures | |
| import time | |
| import threading | |
| from typing import Dict, Optional, List, Tuple, Union, Any | |
| from PIL import Image, ImageDraw, ImageEnhance, ImageFilter | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime | |
| import logging | |
| from functools import wraps | |
| import traceback | |
| import hashlib | |
# Enhanced logging setup for Hugging Face
# Configured once at import time; HF Spaces captures stdout/stderr, so the
# default basicConfig stream handler is sufficient.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by every class in this file.
logger = logging.getLogger(__name__)
class DependencyManager:
    """Manages optional dependencies with fallback support for Hugging Face.

    Probes every optional library exactly once (at construction time) and
    records the results in ``available_deps`` so the rest of the app can
    branch on ``deps.is_available(...)`` without re-attempting imports.
    """

    def __init__(self):
        # name -> bool map of successfully imported dependencies
        self.available_deps = {}
        # reserved for future use; kept for interface compatibility
        self.fallback_options = {}
        self.detect_all_dependencies()

    @staticmethod
    def _can_import(module_names) -> bool:
        """Return True if every module in *module_names* imports cleanly."""
        import importlib
        try:
            for name in module_names:
                importlib.import_module(name)
            return True
        except ImportError:
            return False

    def detect_all_dependencies(self):
        """Detect all optional dependencies with HF-compatible fallbacks."""
        # --- PDF processing: first importable backend wins (PyMuPDF preferred) ---
        self.available_deps['pdf_processing'] = False
        pdf_backends = [
            ('pymupdf', ['fitz'], "β PyMuPDF available"),
            ('pypdf2', ['PyPDF2'], "β PyPDF2 available as fallback"),
            ('pdfplumber', ['pdfplumber'], "β pdfplumber available as fallback"),
        ]
        for key, modules, message in pdf_backends:
            if self._can_import(modules):
                self.available_deps[key] = True
                self.available_deps['pdf_processing'] = True
                logger.info(message)
                break
        else:
            logger.warning("β οΈ No PDF processing libraries available")

        # --- Optional service / chemistry libraries: independent probes ---
        optional = [
            ('google_drive',
             ['googleapiclient.discovery', 'google.oauth2.service_account'],
             "β Google Drive API available",
             "β οΈ Google Drive API not available"),
            ('google_vision',
             ['google.cloud.vision'],
             "β Google Vision API available",
             "β οΈ Google Vision API not available"),
            ('rdkit',
             ['rdkit.Chem', 'rdkit.Chem.Descriptors', 'rdkit.Chem.Crippen',
              'rdkit.Chem.rdMolDescriptors'],
             "β RDKit available",
             "β οΈ RDKit not available"),
            ('molscribe',
             ['molscribe'],
             "β MolScribe available",
             "β οΈ MolScribe not available"),
        ]
        for key, modules, ok_msg, warn_msg in optional:
            if self._can_import(modules):
                self.available_deps[key] = True
                logger.info(ok_msg)
            else:
                self.available_deps[key] = False
                logger.warning(warn_msg)

        # --- Hard requirements: absence is logged at error level ---
        for key, modules, err_msg in [
            ('pillow', ['PIL.Image'], "β Pillow required"),
            ('requests', ['requests'], "β Requests required"),
        ]:
            if self._can_import(modules):
                self.available_deps[key] = True
            else:
                self.available_deps[key] = False
                logger.error(err_msg)

    def is_available(self, dependency: str) -> bool:
        """Return True if *dependency* was detected at startup."""
        return self.available_deps.get(dependency, False)

    def get_status(self) -> Dict:
        """Return a copy of the full dependency availability map."""
        return self.available_deps.copy()
# Initialize dependency manager
# Module-level singleton: dependencies are probed exactly once at import
# time and shared by every component below.
deps = DependencyManager()
class HuggingFaceConfig:
    """Configuration manager for Hugging Face deployment.

    Reads secrets from environment variables (HF Spaces "secrets") and, when
    Google credentials are supplied as inline JSON, materializes them to a
    temp file so Google client libraries can find them via
    GOOGLE_APPLICATION_CREDENTIALS.
    """

    def __init__(self):
        self.setup_environment()

    def setup_environment(self):
        """Setup environment variables and secrets"""
        # Hugging Face Spaces secrets
        self.claude_api_key = os.getenv('CLAUDE_API_KEY', '')
        self.google_credentials = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON', '')
        self.n8n_webhook_secret = os.getenv('N8N_WEBHOOK_SECRET', 'default_secret')
        # Setup Google credentials if provided: google-cloud clients only
        # accept a file path, so the inline JSON is written to /tmp.
        if self.google_credentials:
            try:
                credentials_data = json.loads(self.google_credentials)
                with open('/tmp/google_credentials.json', 'w') as f:
                    json.dump(credentials_data, f)
                os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/tmp/google_credentials.json'
                logger.info("β Google credentials configured from HF secrets")
            except Exception as e:
                logger.warning(f"β οΈ Google credentials setup failed: {e}")

    def validate_api_key(self, api_key: str) -> bool:
        """Validate Claude API key format (sk-ant- prefix, length > 20).

        Always returns a real bool. (The original returned the falsy string
        '' for an empty key, violating the declared ``-> bool`` contract.)
        """
        return bool(api_key and api_key.startswith('sk-ant-') and len(api_key) > 20)
| config = HuggingFaceConfig() | |
class GoogleVisionImageFilter:
    """Google Vision API integration with HF-compatible error handling.

    Classifies extracted page regions as chemical-structure-like vs
    text/table-like so only promising images reach the recognizers. Fails
    open: when Vision is unavailable or errors, every image is treated as a
    potential chemical structure.
    """

    def __init__(self, debug_mode: bool = True):
        self.available = deps.is_available('google_vision')
        self.debug_mode = debug_mode
        self.debug_log = []  # timestamped trail surfaced in the UI
        self.client = None   # vision.ImageAnnotatorClient once initialized
        self.setup_vision_client()

    def log_debug(self, message: str):
        """Timestamp *message*, mirror it to the logger in debug mode, record it."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_message = f"[{timestamp}] {message}"
        if self.debug_mode:
            logger.info(f"[VISION] {formatted_message}")
        # NOTE(review): source indentation was lost in extraction; messages
        # are recorded unconditionally so the UI log works with debug off.
        self.debug_log.append(formatted_message)

    def setup_vision_client(self):
        """Initialize Google Vision client for HF."""
        if not self.available:
            self.log_debug("β Google Vision API not available")
            return
        try:
            from google.cloud import vision
            # Check for credentials (the HF secret materializes them in /tmp)
            creds_path = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '/tmp/google_credentials.json')
            if os.path.exists(creds_path):
                self.client = vision.ImageAnnotatorClient()
                self.log_debug("β Google Vision client initialized")
            else:
                self.log_debug(f"β οΈ Vision credentials not found")
                self.available = False
        except Exception as e:
            self.log_debug(f"β Vision client setup failed: {e}")
            self.available = False

    def analyze_image_content(self, image: Image.Image) -> Dict:
        """Analyze image to determine if it contains chemical structures.

        Fail-open contract: any Vision failure yields
        ``is_chemical_structure=True`` so downstream processing still runs.
        """
        if not self.available or not self.client:
            # Fallback: assume chemical structure if Vision unavailable
            return {
                'is_chemical_structure': True,
                'confidence': 0.5,
                'content_type': 'unknown',
                'error': 'Google Vision not available - processing anyway'
            }
        try:
            # Convert PIL image to PNG bytes for the Vision API
            img_byte_arr = io.BytesIO()
            if image.mode != 'RGB':
                image = image.convert('RGB')
            image.save(img_byte_arr, format='PNG')
            img_byte_arr = img_byte_arr.getvalue()
            from google.cloud import vision
            vision_image = vision.Image(content=img_byte_arr)
            # Perform multiple detection types (three separate API calls)
            text_response = self.client.text_detection(image=vision_image)
            texts = text_response.text_annotations
            objects_response = self.client.object_localization(image=vision_image)
            objects = objects_response.localized_object_annotations
            label_response = self.client.label_detection(image=vision_image)
            labels = label_response.label_annotations
            # Analyze results
            analysis = self._analyze_vision_results(texts, objects, labels, image.size)
            self.log_debug(f"Vision analysis: {analysis['content_type']} (confidence: {analysis['confidence']:.2f})")
            return analysis
        except Exception as e:
            self.log_debug(f"β Vision analysis failed: {e}")
            return {
                'is_chemical_structure': True,  # Default to processing
                'confidence': 0.5,
                'content_type': 'unknown',
                'error': str(e)
            }

    def _analyze_vision_results(self, texts, objects, labels, image_size) -> Dict:
        """Analyze Vision API results to classify content.

        Additive heuristic scores: text density / table keywords push toward
        'text_or_table', diagram-like objects and labels push toward
        'chemical_structure'. Confidence is clamped to [0.1, 0.9].
        """
        width, height = image_size
        image_area = width * height
        text_score = 0.0
        structure_score = 0.0
        # Defined up front so the summary below never depends on which
        # branches ran (the original fetched these back through a fragile
        # locals().get(...) lookup).
        text_density = 0
        chemical_keywords = 0
        table_indicators = 0
        # Analyze text detection for chemical vs table content
        if texts:
            total_text_area = 0
            for text in texts[1:]:  # Skip first annotation (the full-page text)
                vertices = text.bounding_poly.vertices
                if len(vertices) >= 4:
                    text_width = abs(vertices[2].x - vertices[0].x)
                    text_height = abs(vertices[2].y - vertices[0].y)
                    total_text_area += text_width * text_height
                text_content = text.description.lower()
                # Chemical structure indicators
                if any(chem in text_content for chem in [
                    'scheme', 'figure', 'compound', 'synthesis', 'reaction',
                    'mol', 'structure', 'formula'
                ]):
                    chemical_keywords += 1
                # Table/data indicators
                if any(table in text_content for table in [
                    'table', 'yield', '%', 'mp', 'melting', 'nmr', 'ir', 'ms',
                    'data', 'result', 'analysis'
                ]):
                    table_indicators += 1
            text_density = total_text_area / image_area if image_area > 0 else 0
            # High text density suggests tables/data
            if text_density > 0.3:
                text_score += 0.4
            if table_indicators > chemical_keywords and table_indicators > 2:
                text_score += 0.3
        # Analyze object detection
        for obj in objects:
            obj_name = obj.name.lower()
            if any(diagram_term in obj_name for diagram_term in [
                'diagram', 'chart', 'figure', 'drawing', 'illustration'
            ]):
                structure_score += 0.2
        # Analyze labels
        for label in labels:
            label_name = label.description.lower()
            confidence = label.score
            # Chemical structure indicators
            if any(chem_label in label_name for chem_label in [
                'diagram', 'drawing', 'line art', 'figure', 'illustration',
                'chemistry', 'molecule', 'formula'
            ]):
                structure_score += confidence * 0.3
            # Text/table indicators
            if any(text_label in label_name for text_label in [
                'text', 'document', 'table', 'data', 'spreadsheet'
            ]):
                text_score += confidence * 0.2
        # Image aspect ratio analysis: near-square regions favor structures,
        # extreme strips favor text blocks.
        aspect_ratio = width / height if height > 0 else 1
        if 0.3 <= aspect_ratio <= 3.0:
            structure_score += 0.1
        if aspect_ratio > 4.0 or aspect_ratio < 0.25:
            text_score += 0.2
        # Final classification
        if structure_score > text_score:
            is_chemical = True
            confidence = min(structure_score, 0.9)
            content_type = 'chemical_structure'
        else:
            is_chemical = False
            confidence = min(text_score, 0.9)
            content_type = 'text_or_table'
        confidence = max(confidence, 0.1)
        return {
            'is_chemical_structure': is_chemical,
            'confidence': confidence,
            'content_type': content_type,
            'text_score': text_score,
            'structure_score': structure_score,
            'analysis_details': {
                'text_density': text_density,
                'chemical_keywords': chemical_keywords,
                'table_indicators': table_indicators,
                'aspect_ratio': aspect_ratio
            }
        }
class MolScribeAnalyzer:
    """MolScribe-based chemical structure recognition optimized for HF.

    Wraps MolScribe with image preprocessing, timeout control, SMILES sanity
    checks and optional RDKit validation. The model checkpoint is downloaded
    and instantiated once, then cached for all subsequent predictions (the
    original rebuilt it on every call).
    """

    def __init__(self, debug_mode: bool = True):
        self.available = deps.is_available('molscribe')
        self.debug_mode = debug_mode
        self.debug_log = []
        self._model = None  # lazily built, cached MolScribe instance
        self.stats = {
            'total_predictions': 0,
            'successful_predictions': 0,
            'failed_predictions': 0,
            'avg_processing_time': 0.0  # running mean over successful runs
        }
        self.setup_molscribe()

    def log_debug(self, message: str):
        """Timestamp *message*, mirror it to the logger in debug mode, record it."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_message = f"[{timestamp}] {message}"
        if self.debug_mode:
            logger.info(f"[MOLSCRIBE] {formatted_message}")
        self.debug_log.append(formatted_message)

    def setup_molscribe(self):
        """Initialize and test MolScribe with a synthetic benzene skeleton."""
        if not self.available:
            self.log_debug("β MolScribe not available")
            return
        try:
            import molscribe
            self.log_debug("β MolScribe imported successfully")
            # Smoke test: draw a hexagon (benzene ring skeleton) and run a
            # short-timeout prediction to confirm the pipeline works.
            test_image = Image.new('RGB', (200, 200), 'white')
            draw = ImageDraw.Draw(test_image)
            center_x, center_y = 100, 100
            radius = 50
            points = []
            for i in range(6):
                angle = i * 60 * np.pi / 180
                x = center_x + radius * np.cos(angle)
                y = center_y + radius * np.sin(angle)
                points.append((x, y))
            for i in range(6):
                start = points[i]
                end = points[(i + 1) % 6]
                draw.line([start, end], fill='black', width=3)
            self.log_debug("Testing MolScribe with benzene structure...")
            test_result = self._molscribe_predict(test_image, timeout_seconds=10)
            if test_result.get('success'):
                self.log_debug("β MolScribe test successful")
            else:
                self.log_debug(f"β οΈ MolScribe test returned: {test_result}")
        except Exception as e:
            self.log_debug(f"β MolScribe setup failed: {e}")
            self.available = False

    def recognize_structure_with_timeout(self, image: Image.Image, timeout_seconds: int = 30) -> Dict:
        """Recognize structure with HF-compatible timeout.

        The original wrapped the executor in a ``with`` block, whose
        ``__exit__`` waits for running futures — so a "timed-out" call still
        blocked until the prediction finished. Here the executor is shut
        down with ``wait=False`` so the timeout path returns immediately
        (the worker thread itself cannot be killed and may finish in the
        background).
        """
        self.stats['total_predictions'] += 1
        self.log_debug(f"Starting MolScribe recognition (attempt #{self.stats['total_predictions']})")
        if not self.available:
            self.stats['failed_predictions'] += 1
            return {
                'success': False,
                'error': 'MolScribe not available. Install with: pip install MolScribe',
                'method': 'MolScribe'
            }
        try:
            # Preprocess image for optimal recognition
            processed_image = self._preprocess_image(image)
            start_time = time.time()
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(self._molscribe_predict, processed_image, timeout_seconds)
                try:
                    result = future.result(timeout=timeout_seconds)
                except concurrent.futures.TimeoutError:
                    self.stats['failed_predictions'] += 1
                    self.log_debug(f"β MolScribe timeout after {timeout_seconds} seconds")
                    return {
                        'success': False,
                        'error': f'MolScribe processing timeout ({timeout_seconds}s)',
                        'method': 'MolScribe'
                    }
            finally:
                # Do not block on a stuck worker (see docstring).
                executor.shutdown(wait=False)
            processing_time = time.time() - start_time
            if result.get('success'):
                self.stats['successful_predictions'] += 1
                result['processing_time'] = processing_time
                # Maintain the running average the stats dict advertises
                # (the field existed in the original but was never updated).
                n = self.stats['successful_predictions']
                prev = self.stats['avg_processing_time']
                self.stats['avg_processing_time'] = prev + (processing_time - prev) / n
                self.log_debug(f"β Structure recognized in {processing_time:.2f}s")
            else:
                self.stats['failed_predictions'] += 1
                self.log_debug(f"β Recognition failed: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            self.stats['failed_predictions'] += 1
            error_msg = f'MolScribe recognition failed: {str(e)}'
            self.log_debug(f"β {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'method': 'MolScribe'
            }

    def _get_model(self):
        """Return the cached MolScribe model, downloading/building it once.

        The original re-downloaded the checkpoint and re-instantiated the
        model inside every prediction, which is extremely expensive.
        """
        if self._model is None:
            from molscribe import MolScribe
            from huggingface_hub import hf_hub_download
            ckpt_path = hf_hub_download('yujieq/MolScribe', 'swin_base_char_aux_1m.pth')
            self._model = MolScribe(ckpt_path)
        return self._model

    def _molscribe_predict(self, image: Image.Image, timeout_seconds: int) -> Dict:
        """Core MolScribe prediction with error handling.

        *timeout_seconds* is accepted for interface compatibility; the
        timeout itself is enforced by the caller.
        """
        try:
            # Save image to a temporary file (MolScribe consumes file paths)
            with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file:
                if image.mode != 'RGB':
                    image = image.convert('RGB')
                image.save(tmp_file.name, 'PNG')
                temp_path = tmp_file.name
            try:
                model = self._get_model()
                # Call MolScribe
                # NOTE(review): the result is treated as a plain SMILES
                # string here — confirm against the installed MolScribe
                # version's predict_image return type.
                smiles_result = model.predict_image(temp_path)
                # Validate result
                if smiles_result and isinstance(smiles_result, str) and len(smiles_result.strip()) > 0:
                    smiles = smiles_result.strip()
                    # Check for garbage SMILES
                    if self._is_garbage_smiles(smiles):
                        return {
                            'success': False,
                            'error': 'MolScribe produced garbage SMILES (likely not a chemical structure)',
                            'raw_smiles': smiles[:100] + '...' if len(smiles) > 100 else smiles
                        }
                    # Validate with RDKit if available
                    is_valid = True
                    validation_error = None
                    canonical_smiles = smiles
                    if deps.is_available('rdkit'):
                        try:
                            from rdkit import Chem
                            mol = Chem.MolFromSmiles(smiles)
                            if mol is not None:
                                canonical_smiles = Chem.MolToSmiles(mol)
                                is_valid = True
                            else:
                                is_valid = False
                                validation_error = "Invalid SMILES structure"
                        except Exception as e:
                            is_valid = False
                            validation_error = str(e)
                    return {
                        'success': True,
                        'smiles': smiles,
                        'canonical_smiles': canonical_smiles,
                        'is_valid': is_valid,
                        'validation_error': validation_error,
                        'method': 'MolScribe'
                    }
                else:
                    return {
                        'success': False,
                        'error': 'No structure detected by MolScribe',
                        'method': 'MolScribe'
                    }
            finally:
                # Single cleanup point (the original deleted the temp file on
                # the success path and then again here).
                if os.path.exists(temp_path):
                    try:
                        os.remove(temp_path)
                    except OSError:
                        # best effort; a leftover temp file is harmless
                        pass
        except Exception as e:
            return {
                'success': False,
                'error': f'MolScribe prediction failed: {str(e)}',
                'method': 'MolScribe'
            }

    def _preprocess_image(self, image: Image.Image) -> Image.Image:
        """Enhanced image preprocessing for MolScribe.

        Boosts contrast and sharpness, then rescales so the shorter side is
        at least 200 px and the longer side at most 1024 px. Falls back to
        the untouched image on any failure.
        """
        try:
            if image.mode != 'RGB':
                image = image.convert('RGB')
            # Enhance contrast and sharpness
            enhancer = ImageEnhance.Contrast(image)
            image = enhancer.enhance(1.3)
            enhancer = ImageEnhance.Sharpness(image)
            image = enhancer.enhance(1.2)
            # Resize intelligently for MolScribe
            width, height = image.size
            min_size = 200
            max_size = 1024
            if width < min_size or height < min_size:
                scale_factor = min_size / min(width, height)
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)
                image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            elif width > max_size or height > max_size:
                scale_factor = max_size / max(width, height)
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)
                image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            return image
        except Exception as e:
            logger.warning(f"Image preprocessing failed: {e}")
            return image

    def _is_garbage_smiles(self, smiles: str) -> bool:
        """Check if SMILES is garbage (repetitive patterns, too long, etc.)."""
        # Excessively long SMILES almost always means a mis-recognized page
        if len(smiles) > 1000:
            return True
        # Repetitive patterns typical of recognizer divergence
        if any(pattern in smiles for pattern in [
            'CC#CC#CC#CC#', 'CCCCCCCCCCCCCCCCCCCC',
            '111111111', '222222222', '333333333'
        ]):
            return True
        # Too many dots -> disconnected fragments
        if smiles.count('.') > 10:
            return True
        # Unbalanced brackets are syntactically invalid SMILES
        if smiles.count('(') != smiles.count(')'):
            return True
        if smiles.count('[') != smiles.count(']'):
            return True
        return False
class ClaudeDocumentAnalyzer:
    """Claude-powered document analysis for chemical literature.

    Sends extracted document text to the Anthropic Messages API and parses a
    structured JSON plan (which regions look like structures, which tables
    to skip). Successful analyses are cached in memory per document prefix.
    """

    def __init__(self, debug_mode: bool = True):
        self.debug_mode = debug_mode
        self.debug_log = []
        # Cache keyed by MD5 of the document's first 1000 chars; avoids
        # re-sending the same document to the API.
        self.analysis_cache = {}

    def log_debug(self, message: str):
        """Timestamp *message*, mirror it to the logger in debug mode, record it."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        formatted_message = f"[{timestamp}] {message}"
        if self.debug_mode:
            logger.info(f"[CLAUDE] {formatted_message}")
        # NOTE(review): source indentation was lost in extraction; the append
        # is kept unconditional, matching the other log_debug methods.
        self.debug_log.append(formatted_message)

    def analyze_document_structure(self, text_content: str, api_key: str) -> Dict:
        """Analyze document using Claude to identify chemical content locations.

        Returns the parsed analysis dict (with 'success': True) on success,
        or a dict with an 'error' key plus empty 'regions_to_process' on any
        failure.
        """
        self.log_debug("Starting Claude document structure analysis...")
        if not config.validate_api_key(api_key):
            return {
                'error': 'Valid Claude API key required (sk-ant-...)',
                'regions_to_process': [],
                'document_type': 'unknown'
            }
        try:
            # Create cache key
            # NOTE(review): only the first 1000 chars are hashed, so two
            # documents sharing an identical opening page would collide.
            cache_key = hashlib.md5(text_content[:1000].encode()).hexdigest()
            if cache_key in self.analysis_cache:
                self.log_debug("Using cached analysis")
                return self.analysis_cache[cache_key]
            # Create structured prompt
            analysis_prompt = self._create_analysis_prompt(text_content)
            headers = {
                'Content-Type': 'application/json',
                'x-api-key': api_key.strip(),
                'anthropic-version': '2023-06-01'
            }
            payload = {
                'model': 'claude-3-5-sonnet-20241022',
                'max_tokens': 2000,
                'messages': [{'role': 'user', 'content': analysis_prompt}]
            }
            self.log_debug("Sending document to Claude for analysis...")
            response = requests.post(
                'https://api.anthropic.com/v1/messages',
                headers=headers,
                json=payload,
                timeout=60
            )
            if response.status_code == 200:
                result = response.json()
                analysis_text = result['content'][0]['text']
                # Parse Claude's structured response
                parsed_analysis = self._parse_claude_response(analysis_text)
                # Cache the result
                self.analysis_cache[cache_key] = {
                    'success': True,
                    'document_type': parsed_analysis['document_type'],
                    'structure_locations': parsed_analysis['structure_locations'],
                    'data_tables': parsed_analysis['data_tables'],
                    'chemical_entities': parsed_analysis['chemical_entities'],
                    'processing_priority': parsed_analysis['processing_priority'],
                    'claude_analysis': analysis_text,
                    'tokens_used': result.get('usage', {}).get('output_tokens', 'unknown')
                }
                self.log_debug(f"Claude identified {len(parsed_analysis['structure_locations'])} potential structure locations")
                return self.analysis_cache[cache_key]
            else:
                error_msg = f'Claude API error {response.status_code}: {response.text[:200]}'
                self.log_debug(f"Claude API failed: {error_msg}")
                return {
                    'error': error_msg,
                    'regions_to_process': [],
                    'document_type': 'unknown'
                }
        except Exception as e:
            error_msg = f'Claude analysis failed: {str(e)}'
            self.log_debug(error_msg)
            return {
                'error': error_msg,
                'regions_to_process': [],
                'document_type': 'unknown'
            }

    def _create_analysis_prompt(self, text_content: str) -> str:
        """Create structured prompt for Claude document analysis.

        Document text is truncated to 6000 chars to bound token usage;
        literal JSON braces in the template are escaped as {{ }}.
        """
        prompt = f"""Analyze this chemistry research document and provide a structured analysis to guide automated chemical structure recognition.
Document Text:
{text_content[:6000]}
Please provide a JSON response with the following structure:
{{
"document_type": "research_paper" | "review" | "patent" | "thesis" | "other",
"structure_locations": [
{{
"type": "reaction_scheme" | "individual_structure" | "mechanism",
"keywords": ["scheme", "figure", "compound"],
"page_likely": 1,
"description": "Brief description of what structure is expected",
"priority": "high" | "medium" | "low"
}}
],
"data_tables": [
{{
"type": "yields" | "properties" | "spectral_data" | "references",
"keywords": ["table", "yield", "melting point"],
"should_skip": true,
"page_likely": 2
}}
],
"chemical_entities": {{
"main_compounds": ["compound names found"],
"reagents": ["reagent names"],
"solvents": ["solvent names"],
"catalysts": ["catalyst names"]
}},
"processing_priority": [
"Focus on Scheme 1 - main reaction",
"Look for individual product structures",
"Skip data tables and references"
]
}}
Focus on identifying:
1. Actual chemical structure diagrams vs data tables
2. Reaction schemes vs individual compounds
3. Main synthetic routes vs supporting data
4. What should be processed vs what should be skipped
Respond only with valid JSON - no additional text."""
        return prompt

    def _parse_claude_response(self, response_text: str) -> Dict:
        """Parse Claude's JSON response with fallback parsing."""
        try:
            # Try to extract the outermost JSON object from the response
            json_start = response_text.find('{')
            json_end = response_text.rfind('}') + 1
            if json_start >= 0 and json_end > json_start:
                json_str = response_text[json_start:json_end]
                parsed = json.loads(json_str)
                # Validate required fields
                required_fields = ['document_type', 'structure_locations', 'data_tables', 'chemical_entities']
                if all(field in parsed for field in required_fields):
                    return parsed
            # Fallback parsing if JSON fails or fields are missing
            self.log_debug("JSON parsing failed, using fallback text analysis")
            return self._fallback_text_analysis(response_text)
        except Exception as e:
            self.log_debug(f"Response parsing failed: {e}, using fallback")
            return self._fallback_text_analysis(response_text)

    def _fallback_text_analysis(self, text: str) -> Dict:
        """Fallback analysis when JSON parsing fails.

        Conservative defaults: look for a main reaction scheme, skip tables.
        """
        return {
            'document_type': 'research_paper',
            'structure_locations': [
                {
                    'type': 'reaction_scheme',
                    'keywords': ['scheme', 'synthesis'],
                    'page_likely': 1,
                    'description': 'Main reaction scheme',
                    'priority': 'high'
                }
            ],
            'data_tables': [
                {
                    'type': 'yields',
                    'keywords': ['table', 'yield', 'melting'],
                    'should_skip': True,
                    'page_likely': 2
                }
            ],
            'chemical_entities': {
                'main_compounds': [],
                'reagents': [],
                'solvents': [],
                'catalysts': []
            },
            'processing_priority': ['Focus on chemical structure diagrams', 'Skip data tables']
        }
class GoogleDriveManager:
    """Google Drive integration for Hugging Face with service_account.json.

    Credentials come either from a service_account.json found on disk
    (several candidate locations) or from the inline-JSON HF Spaces secret;
    access is read-only.
    """

    def __init__(self):
        self.service = None  # googleapiclient Drive v3 service, or None
        self.setup_service()

    def setup_service(self):
        """Initialize Google Drive service using service_account.json"""
        if not deps.is_available('google_drive'):
            logger.warning("Google Drive libraries not available")
            return
        try:
            from googleapiclient.discovery import build
            from google.oauth2.service_account import Credentials
            # Look for service_account.json in multiple locations
            possible_paths = [
                'service_account.json',
                '/app/service_account.json',
                '/tmp/service_account.json',
                os.path.join(os.getcwd(), 'service_account.json')
            ]
            creds_file = None
            for path in possible_paths:
                if os.path.exists(path):
                    creds_file = path
                    break
            if creds_file:
                credentials = Credentials.from_service_account_file(
                    creds_file,
                    scopes=['https://www.googleapis.com/auth/drive.readonly']
                )
                self.service = build('drive', 'v3', credentials=credentials)
                logger.info(f"Google Drive service initialized using {creds_file}")
            elif config.google_credentials:
                # Fall back to inline JSON from the HF Spaces secret
                credentials = Credentials.from_service_account_info(
                    json.loads(config.google_credentials),
                    scopes=['https://www.googleapis.com/auth/drive.readonly']
                )
                self.service = build('drive', 'v3', credentials=credentials)
                logger.info("Google Drive service initialized from HF secret")
            else:
                logger.warning("service_account.json not found in any expected location")
        except Exception as e:
            logger.error(f"Google Drive setup failed: {e}")

    def download_file(self, file_id: str) -> Dict:
        """Download file from Google Drive.

        Returns {'success': True, 'file_name', 'file_size', 'content',
        'metadata'} on success, or {'error': message} on failure.
        """
        if not self.service:
            return {'error': 'Google Drive service not available'}
        try:
            # Get file metadata (name etc.) first for logging and the result
            file_info = self.service.files().get(fileId=file_id).execute()
            # Download file content in chunks into an in-memory buffer
            from googleapiclient.http import MediaIoBaseDownload
            request = self.service.files().get_media(fileId=file_id)
            file_content = io.BytesIO()
            downloader = MediaIoBaseDownload(file_content, request)
            done = False
            while not done:  # idiomatic form of the original `while done is False`
                status, done = downloader.next_chunk()
            file_content.seek(0)
            logger.info(f"Downloaded file: {file_info['name']} ({len(file_content.getvalue())} bytes)")
            return {
                'success': True,
                'file_name': file_info['name'],
                'file_size': len(file_content.getvalue()),
                'content': file_content.getvalue(),
                'metadata': file_info
            }
        except Exception as e:
            error_msg = f'Download failed: {str(e)}'
            logger.error(error_msg)
            return {'error': error_msg}
| class PDFProcessor: | |
| """PDF processor with dual support (PyMuPDF/PyPDF2/pdfplumber)""" | |
    def __init__(self, vision_filter, debug_mode: bool = True):
        # vision_filter: GoogleVisionImageFilter used to discard
        # non-structure regions before they reach the recognizers.
        self.vision_filter = vision_filter
        self.debug_mode = debug_mode
        self.debug_log = []
        # Ordered list of importable PDF backends, preferred first.
        self.available_processors = self._detect_pdf_processors()
| def _detect_pdf_processors(self): | |
| """Detect available PDF processing libraries""" | |
| processors = [] | |
| if deps.is_available('pymupdf'): | |
| processors.append('pymupdf') | |
| if deps.is_available('pypdf2'): | |
| processors.append('pypdf2') | |
| if deps.is_available('pdfplumber'): | |
| processors.append('pdfplumber') | |
| logger.info(f"Available PDF processors: {processors}") | |
| return processors | |
    def log_debug(self, message: str):
        """Record a PDF-stage debug message; mirror it to the logger in debug mode."""
        if self.debug_mode:
            logger.info(f"[PDF] {message}")
        # NOTE(review): source indentation was lost in extraction; the append
        # is kept unconditional so debug_info is populated with debug off too.
        self.debug_log.append(message)
| def extract_content_with_vision_filtering(self, pdf_bytes: bytes, claude_analysis: Dict = None) -> Dict: | |
| """Extract content using best available PDF processor + Vision filtering""" | |
| if not self.available_processors: | |
| return {'error': 'No PDF processing libraries available'} | |
| self.debug_log = [] | |
| self.log_debug("Starting Vision-enhanced PDF content extraction") | |
| # Try processors in order of preference | |
| for processor in ['pymupdf', 'pypdf2', 'pdfplumber']: | |
| if processor in self.available_processors: | |
| try: | |
| if processor == 'pymupdf': | |
| return self._extract_with_pymupdf(pdf_bytes, claude_analysis) | |
| elif processor == 'pypdf2': | |
| return self._extract_with_pypdf2(pdf_bytes, claude_analysis) | |
| elif processor == 'pdfplumber': | |
| return self._extract_with_pdfplumber(pdf_bytes, claude_analysis) | |
| except Exception as e: | |
| self.log_debug(f"{processor} failed: {e}, trying next processor") | |
| continue | |
| return {'error': 'All PDF processors failed'} | |
def _extract_with_pymupdf(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict:
    """Extract text and Vision-approved structure images using PyMuPDF (preferred).

    Args:
        pdf_bytes: Raw PDF file contents.
        claude_analysis: Optional Claude document analysis passed through to
            region identification as a hint.

    Returns:
        Dict with text_content, images, page_count, extraction_stats,
        debug_info and processor_used keys.
    """
    import fitz
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        self.log_debug(f"Opened PDF with PyMuPDF: {len(doc)} pages")
        pages_to_process = min(len(doc), 10)  # Limit to 10 pages for HF
        results = {
            'text_content': '',
            'images': [],
            'page_count': len(doc),
            'extraction_stats': {},
            'debug_info': [],
            'processor_used': 'pymupdf'
        }
        all_text = []
        total_images = 0
        vision_filtered = 0
        for page_num in range(pages_to_process):
            page = doc[page_num]
            self.log_debug(f"Processing page {page_num + 1}")
            # Extract text
            page_text = page.get_text()
            all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}")
            # Find candidate structure regions on this page
            regions = self._identify_regions_pymupdf(page, page_text, page_num, claude_analysis)
            for region_idx, region in enumerate(regions[:3]):  # Limit to 3 per page
                try:
                    # Render region at 2x zoom for recognition quality
                    mat = fitz.Matrix(2.0, 2.0)
                    clip = fitz.Rect(region['bbox'])
                    if clip.width > 50 and clip.height > 50:
                        pix = page.get_pixmap(matrix=mat, clip=clip)
                        img_data = pix.tobytes("png")
                        pil_image = Image.open(io.BytesIO(img_data))
                        # Vision API filtering: drop regions that are not
                        # chemical structures before they reach MolScribe.
                        vision_analysis = None
                        if self.vision_filter.available:
                            vision_analysis = self.vision_filter.analyze_image_content(pil_image)
                            if not vision_analysis['is_chemical_structure']:
                                vision_filtered += 1
                                self.log_debug(f"Vision filtered out region {region_idx}: {vision_analysis['content_type']}")
                                continue
                        results['images'].append({
                            'page': page_num + 1,
                            'index': region_idx,
                            'size': pil_image.size,
                            'image': pil_image,
                            'filename': f"page_{page_num+1}_region_{region_idx+1}.png",
                            'type': region.get('type', 'structure'),
                            'description': region.get('description', 'Chemical structure'),
                            'vision_analysis': vision_analysis
                        })
                        total_images += 1
                        pix = None  # Free pixmap memory eagerly
                except Exception as e:
                    self.log_debug(f"Error processing region {region_idx}: {e}")
        results['text_content'] = '\n'.join(all_text)
        results['extraction_stats'] = {
            # Report the pages actually processed (was len(doc), which
            # over-reported for documents longer than the 10-page cap).
            'pages_processed': pages_to_process,
            'total_regions': total_images + vision_filtered,
            'vision_approved': total_images,
            'vision_filtered': vision_filtered,
            'text_length': len(results['text_content'])
        }
        results['debug_info'] = self.debug_log.copy()
        return results
    finally:
        # Always release the document, even if extraction raises
        # (previously leaked on exception).
        doc.close()
def _extract_with_pypdf2(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict:
    """Extract text using PyPDF2 (fallback method; text only, no images).

    Args:
        pdf_bytes: Raw PDF file contents.
        claude_analysis: Unused here; kept so all processors share a signature.
    """
    import PyPDF2
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
    self.log_debug(f"Opened PDF with PyPDF2: {len(pdf_reader.pages)} pages")
    # Cap at 10 pages for HF, mirroring the PyMuPDF path.
    pages_to_process = min(len(pdf_reader.pages), 10)
    all_text = []
    for page_num in range(pages_to_process):
        page_text = pdf_reader.pages[page_num].extract_text()
        all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}")
    text_content = '\n'.join(all_text)  # join once, reused below
    return {
        'text_content': text_content,
        'images': [],  # PyPDF2 doesn't extract images well
        'page_count': len(pdf_reader.pages),
        'extraction_stats': {
            # was len(pdf_reader.pages): over-reported beyond the 10-page cap
            'pages_processed': pages_to_process,
            'text_length': len(text_content),
            'images_extracted': 0
        },
        'processor_used': 'pypdf2',
        'note': 'PyPDF2 used - text only, no image extraction'
    }
def _extract_with_pdfplumber(self, pdf_bytes: bytes, claude_analysis: Dict) -> Dict:
    """Extract text using pdfplumber (alternative fallback; text only).

    Args:
        pdf_bytes: Raw PDF file contents.
        claude_analysis: Unused here; kept so all processors share a signature.
    """
    import pdfplumber
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
        self.log_debug(f"Opened PDF with pdfplumber: {len(pdf.pages)} pages")
        # Cap at 10 pages for HF, mirroring the other processors.
        pages_to_process = min(len(pdf.pages), 10)
        all_text = []
        for page_num in range(pages_to_process):
            # extract_text() may return None on empty pages
            page_text = pdf.pages[page_num].extract_text() or ""
            all_text.append(f"\n--- PAGE {page_num + 1} ---\n{page_text}")
        text_content = '\n'.join(all_text)  # join once, reused below
        return {
            'text_content': text_content,
            'images': [],  # pdfplumber focuses on text
            'page_count': len(pdf.pages),
            'extraction_stats': {
                # was len(pdf.pages): over-reported beyond the 10-page cap
                'pages_processed': pages_to_process,
                'text_length': len(text_content),
                'images_extracted': 0
            },
            'processor_used': 'pdfplumber',
            'note': 'pdfplumber used - text extraction only'
        }
def _identify_regions_pymupdf(self, page, page_text: str, page_num: int, claude_analysis: Dict) -> List[Dict]:
    """Locate likely chemical-structure regions on a PyMuPDF page.

    Text blocks whose content mentions structure-related keywords get an
    expanded bounding box (to capture the drawing near the caption). When
    nothing matches, a single center-of-page fallback region is returned.
    """
    chemical_keywords = ('scheme', 'figure', 'compound', 'structure', 'reaction')
    regions = []
    try:
        for block in page.get_text("dict").get("blocks", []):
            if "lines" not in block:
                continue
            bbox = block.get("bbox", [0, 0, 0, 0])
            # Flatten all span texts in the block into one string.
            block_text = "".join(
                span.get("text", "") + " "
                for line in block["lines"]
                for span in line.get("spans", [])
            )
            if not any(kw in block_text.lower() for kw in chemical_keywords):
                continue
            # Grow the box around the caption, clamped to page bounds.
            expanded_bbox = [
                max(0, bbox[0] - 100),
                max(0, bbox[1] - 150),
                min(page.rect.width, bbox[2] + 100),
                min(page.rect.height, bbox[3] + 300)
            ]
            regions.append({
                'bbox': expanded_bbox,
                'text': block_text.strip(),
                'type': 'structure',
                'description': 'Potential chemical structure',
                'confidence': 0.7
            })
    except Exception as e:
        self.log_debug(f"Error identifying regions: {e}")
    if not regions:
        # Nothing matched: fall back to a generous central crop.
        w = page.rect.width
        h = page.rect.height
        regions.append({
            'bbox': [w * 0.1, h * 0.2, w * 0.9, h * 0.7],
            'text': 'center_focus',
            'type': 'structure',
            'description': 'Center focus area',
            'confidence': 0.5
        })
    return regions
class RDKitAnalyzer:
    """RDKit molecular analysis with HF compatibility.

    RDKit is imported lazily inside analyze_molecule so the class can be
    instantiated even when the library is missing.
    """

    def __init__(self):
        # deps is the module-level DependencyManager singleton.
        self.available = deps.is_available('rdkit')

    def analyze_molecule(self, smiles: str) -> Dict:
        """Compute molecular properties and a Lipinski drug-likeness summary.

        Args:
            smiles: SMILES string to analyze.

        Returns:
            Dict of properties, or ``{'error': ...}`` when RDKit is
            unavailable, the SMILES is invalid, or analysis fails.
        """
        if not self.available:
            return {'error': 'RDKit not available'}
        try:
            from rdkit import Chem
            from rdkit.Chem import Descriptors, Crippen, rdMolDescriptors
            mol = Chem.MolFromSmiles(smiles)
            if mol is None:  # MolFromSmiles returns None for invalid input
                return {'error': f'Invalid SMILES: {smiles}'}
            # Compute shared descriptors once (previously recomputed for the
            # Lipinski check below).
            mw = Descriptors.MolWt(mol)
            logp = Crippen.MolLogP(mol)
            hbd = Descriptors.NumHDonors(mol)
            hba = Descriptors.NumHAcceptors(mol)
            properties = {
                'smiles_input': smiles,
                'canonical_smiles': Chem.MolToSmiles(mol),
                'molecular_formula': rdMolDescriptors.CalcMolFormula(mol),
                'molecular_weight': round(mw, 2),
                'exact_mass': round(Descriptors.ExactMolWt(mol), 4),
                'logp': round(logp, 2),
                'tpsa': round(Descriptors.TPSA(mol), 2),
                'hbd': hbd,
                'hba': hba,
                'heavy_atoms': Descriptors.HeavyAtomCount(mol),
                'rotatable_bonds': Descriptors.NumRotatableBonds(mol),
                'aromatic_rings': Descriptors.NumAromaticRings(mol),
                'ring_count': Descriptors.RingCount(mol),
                'formal_charge': Chem.rdmolops.GetFormalCharge(mol),
            }
            # Lipinski rule-of-five assessment on the unrounded values.
            lipinski_violations = sum([mw > 500, logp > 5, hbd > 5, hba > 10])
            properties.update({
                'lipinski_violations': lipinski_violations,
                'lipinski_compliant': lipinski_violations <= 1,
                'drug_likeness': 'High' if lipinski_violations <= 1 else 'Medium' if lipinski_violations <= 2 else 'Low'
            })
            return properties
        except Exception as e:
            return {'error': f'RDKit analysis failed: {str(e)}'}
class N8NAPIHandler:
    """N8N HTTP node integration for automated workflows.

    Accepts N8N-style JSON requests ({file_id, claude_api_key, ...}),
    drives the complete analysis pipeline, and returns JSON-serializable
    response dicts that N8N HTTP nodes can consume directly.
    """

    def __init__(self):
        # request_id -> status record (start/end time, result, ...).
        self.processing_status = {}
        # Monotonic counter used to build unique request ids.
        self.request_counter = 0

    def validate_request(self, request_data: Dict) -> Tuple[bool, str]:
        """Validate an incoming N8N HTTP request.

        Returns:
            ``(True, "Valid request")`` when usable, otherwise
            ``(False, reason)``.
        """
        try:
            # Both fields are mandatory for the pipeline to run.
            if 'file_id' not in request_data:
                return False, "Missing required field: file_id"
            if 'claude_api_key' not in request_data:
                return False, "Missing required field: claude_api_key"
            # Validate Claude API key format via the module-level config helper.
            if not config.validate_api_key(request_data['claude_api_key']):
                return False, "Invalid Claude API key format"
            return True, "Valid request"
        except Exception as e:
            return False, f"Request validation error: {str(e)}"

    def process_n8n_request(self, request_data: Dict) -> Dict:
        """Process an N8N HTTP request and return a JSON-friendly response dict."""
        # Assigned before the try so the except-branch can always reference it
        # (replaces the previous fragile `'request_id' in locals()` check).
        request_id = 'unknown'
        try:
            # Generate a unique request ID.
            self.request_counter += 1
            request_id = f"n8n_req_{self.request_counter}_{int(time.time())}"
            # Reject malformed requests before doing any work.
            is_valid, validation_message = self.validate_request(request_data)
            if not is_valid:
                return {
                    'success': False,
                    'request_id': request_id,
                    'error': validation_message,
                    'timestamp': datetime.now().isoformat()
                }
            file_id = request_data['file_id']
            claude_api_key = request_data['claude_api_key']
            doc_name = request_data.get('doc_name', 'N8N Document')
            workflow_id = request_data.get('workflow_id', request_id)
            # Track status so later status lookups can report progress.
            self.processing_status[request_id] = {
                'status': 'processing',
                'start_time': datetime.now().isoformat(),
                'file_id': file_id,
                'doc_name': doc_name,
                'workflow_id': workflow_id
            }
            # Run the complete pipeline.
            result = self._run_complete_pipeline(file_id, claude_api_key, doc_name, request_id)
            # Record the outcome.
            self.processing_status[request_id].update({
                'status': 'completed' if result.get('status') == 'success' else 'failed',
                'end_time': datetime.now().isoformat(),
                'result': result
            })
            # Shape the response the way N8N HTTP nodes expect.
            if result.get('status') == 'success':
                return {
                    'success': True,
                    'request_id': request_id,
                    'workflow_id': workflow_id,
                    'data': result,
                    'processing_time': self._calculate_processing_time(request_id),
                    'timestamp': datetime.now().isoformat()
                }
            return {
                'success': False,
                'request_id': request_id,
                'workflow_id': workflow_id,
                'error': result.get('error', 'Unknown processing error'),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'request_id': request_id,
                'error': f'Processing failed: {str(e)}',
                'timestamp': datetime.now().isoformat()
            }

    def _calculate_processing_time(self, request_id: str) -> str:
        """Return elapsed wall-clock time for a request as ``"X.XX seconds"``.

        Falls back to ``"Unknown"`` when the request is untracked or the
        stored timestamps cannot be parsed.
        """
        try:
            status = self.processing_status.get(request_id, {})
            start_time = status.get('start_time')
            end_time = status.get('end_time')
            if start_time and end_time:
                # Timestamps come from datetime.isoformat(); tolerate a
                # trailing 'Z' in case UTC-suffixed values are ever stored.
                start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00') if start_time.endswith('Z') else start_time)
                end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00') if end_time.endswith('Z') else end_time)
                duration = (end_dt - start_dt).total_seconds()
                return f"{duration:.2f} seconds"
            return "Unknown"
        except Exception:  # was a bare `except:` - don't swallow SystemExit/KeyboardInterrupt
            return "Unknown"

    def _run_complete_pipeline(self, file_id: str, claude_api_key: str, doc_name: str, request_id: str) -> Dict:
        """Run the complete analysis pipeline.

        Steps: Google Drive download -> text extraction -> Claude document
        analysis -> Vision-enhanced re-extraction -> MolScribe structure
        recognition -> RDKit property analysis.
        """
        try:
            # Step 1: Download from Google Drive.
            download_result = gdrive.download_file(file_id)
            if 'error' in download_result:
                return {
                    'status': 'error',
                    'step': 'download',
                    'error': f'Download failed: {download_result["error"]}'
                }
            # Step 2: First extraction pass to obtain text for Claude.
            # NOTE(review): this runs the extractor twice per document (here
            # and in step 4) - presumably to feed Claude before region hints
            # exist; consider a text-only first pass if cost matters.
            pdf_content = pdf_processor.extract_content_with_vision_filtering(download_result['content'])
            if 'error' in pdf_content:
                return {
                    'status': 'error',
                    'step': 'text_extraction',
                    'error': f'Text extraction failed: {pdf_content["error"]}'
                }
            # Step 3: Claude document structure analysis.
            claude_analysis = claude_analyzer.analyze_document_structure(
                pdf_content['text_content'],
                claude_api_key.strip()
            )
            # Step 4: Vision-enhanced PDF processing guided by Claude's hints.
            vision_enhanced_content = pdf_processor.extract_content_with_vision_filtering(
                download_result['content'],
                claude_analysis
            )
            # Step 5: MolScribe on Vision-approved images (capped at 5).
            molscribe_results = []
            for img_data in vision_enhanced_content['images'][:5]:
                molscribe_result = molscribe_analyzer.recognize_structure_with_timeout(
                    img_data['image'],
                    timeout_seconds=30
                )
                molscribe_results.append({
                    'image_info': {
                        'page': img_data['page'],
                        'size': img_data['size'],
                        'type': img_data.get('type', 'structure'),
                        'description': img_data.get('description', 'Chemical structure'),
                        'vision_approved': True
                    },
                    'result': molscribe_result
                })
            # Step 6: RDKit analysis on every successfully recognized SMILES.
            rdkit_results = []
            for molscribe_result in molscribe_results:
                if molscribe_result['result'].get('success') and molscribe_result['result'].get('smiles'):
                    smiles = molscribe_result['result']['smiles']
                    rdkit_results.append({
                        'source': 'molscribe_vision_approved',
                        'smiles': smiles,
                        'analysis': rdkit_analyzer.analyze_molecule(smiles),
                        'image_info': molscribe_result['image_info']
                    })
            # Compile the per-stage summary.
            recognized = len([r for r in molscribe_results if r['result'].get('success')])
            return {
                'status': 'success',
                'request_id': request_id,
                'file_info': {
                    'file_id': file_id,
                    'doc_name': doc_name,
                    'file_size_kb': round(download_result['file_size'] / 1024, 1)
                },
                'claude_analysis': {
                    'success': claude_analysis.get('success', False),
                    'document_type': claude_analysis.get('document_type', 'unknown'),
                    'structure_locations_identified': len(claude_analysis.get('structure_locations', [])),
                    'processing_guidance': claude_analysis.get('processing_priority', [])
                },
                'vision_enhanced_analysis': {
                    'pages': vision_enhanced_content['page_count'],
                    'vision_approved_regions': len(vision_enhanced_content['images']),
                    'processor_used': vision_enhanced_content.get('processor_used', 'unknown')
                },
                'molscribe_analysis': {
                    'images_processed': len(molscribe_results),
                    'structures_recognized': recognized,
                    'results': molscribe_results
                },
                'rdkit_analysis': {
                    'molecules_analyzed': len(rdkit_results),
                    'results': rdkit_results
                },
                'processing_summary': {
                    'total_structures_found': recognized,
                    'valid_molecules': len([r for r in rdkit_results if 'error' not in r['analysis']]),
                    'pipeline_components': ['Google Drive', 'Claude Analysis', 'Google Vision', 'MolScribe', 'RDKit']
                }
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': f'Pipeline processing failed: {str(e)}',
                'request_id': request_id
            }
# Initialize all components
# Module-level singletons shared by the Gradio callbacks and the N8N handler.
gdrive = GoogleDriveManager()
claude_analyzer = ClaudeDocumentAnalyzer(debug_mode=True)
vision_filter = GoogleVisionImageFilter(debug_mode=True)
# The Vision filter is injected so the PDF processor can gate extracted images.
pdf_processor = PDFProcessor(vision_filter, debug_mode=True)
rdkit_analyzer = RDKitAnalyzer()
molscribe_analyzer = MolScribeAnalyzer(debug_mode=True)
n8n_handler = N8NAPIHandler()
| # Core Functions for Gradio Interface | |
def test_system_status():
    """Summarize the health of every system component as a single dict."""
    feature_list = [
        'Google Drive integration with service_account.json',
        'Claude document structure analysis',
        'Google Vision image/text filtering',
        'MolScribe structure recognition',
        'RDKit molecular analysis',
        'N8N HTTP node integration',
        'Dual dependency support'
    ]
    report = {
        'dependency_status': deps.get_status(),
        'google_drive_service': 'connected' if gdrive.service else 'not connected',
        'google_vision_client': 'connected' if vision_filter.available else 'not connected',
        'pdf_processors': pdf_processor.available_processors,
        'molscribe_ready': molscribe_analyzer.available,
        'rdkit_ready': rdkit_analyzer.available,
        'claude_analyzer_ready': True,
        'n8n_api_ready': True,
        'system_type': 'Hugging Face Chemical Analyzer with MolScribe',
        'key_features': feature_list,
    }
    return report
def analyze_single_image_molscribe(image: Image.Image) -> str:
    """Analyze one image: Vision gate, then MolScribe + RDKit; return JSON text."""
    if image is None:
        return json.dumps({'status': 'error', 'error': 'No image provided'}, indent=2)
    # Vision runs first so MolScribe only sees likely chemical structures.
    vision_analysis = vision_filter.analyze_image_content(image)
    result = {
        'vision_analysis': vision_analysis,
        'molscribe_processed': False,
        'molscribe_result': None,
    }
    if not vision_analysis['is_chemical_structure']:
        # Vision rejected the image; report why and skip recognition.
        result['message'] = f"Vision API classified this as '{vision_analysis['content_type']}' - skipping MolScribe processing"
    else:
        molscribe_result = molscribe_analyzer.recognize_structure_with_timeout(image, timeout_seconds=30)
        result['molscribe_processed'] = True
        result['molscribe_result'] = molscribe_result
        # Run RDKit only when recognition produced a SMILES string.
        if molscribe_result.get('success') and molscribe_result.get('smiles'):
            result['rdkit_analysis'] = rdkit_analyzer.analyze_molecule(molscribe_result['smiles'])
    return json.dumps({
        'status': 'success',
        'analysis_result': result,
        'molscribe_stats': molscribe_analyzer.stats
    }, indent=2)
def process_complete_pipeline(file_id: str, claude_api_key: str) -> str:
    """Run the full Google Drive pipeline (for N8N or direct UI use); return JSON text."""
    if not file_id:
        return json.dumps({'status': 'error', 'error': 'File ID is required'}, indent=2)
    if not config.validate_api_key(claude_api_key):
        return json.dumps({'status': 'error', 'error': 'Valid Claude API key required'}, indent=2)
    try:
        # Reuse the N8N handler by packaging the call as an N8N-style request.
        response = n8n_handler.process_n8n_request({
            'file_id': file_id,
            'claude_api_key': claude_api_key,
            'doc_name': 'Direct Pipeline Request',
            'workflow_id': f'direct_{int(time.time())}',
        })
        return json.dumps(response, indent=2)
    except Exception as e:
        return json.dumps({
            'success': False,
            'error': f'Pipeline processing failed: {str(e)}'
        }, indent=2)
def process_pdf_direct(pdf_file, claude_api_key: str) -> str:
    """Process a directly uploaded PDF through Claude + Vision + MolScribe + RDKit.

    Args:
        pdf_file: Upload from Gradio's File component. Depending on the
            Gradio version/configuration this may be an open file-like
            object, a tempfile wrapper exposing ``.name``, or a plain path
            string -- all three are accepted.
        claude_api_key: Anthropic API key for document analysis.

    Returns:
        Pretty-printed JSON string with per-stage results or an error.
    """
    if pdf_file is None:
        return json.dumps({'status': 'error', 'error': 'No PDF file provided'}, indent=2)
    if not config.validate_api_key(claude_api_key):
        return json.dumps({'status': 'error', 'error': 'Valid Claude API key required'}, indent=2)
    try:
        # Read PDF bytes regardless of how Gradio delivered the upload.
        # The original code assumed .read(), which breaks on Gradio builds
        # that pass a filepath instead of a file object.
        if hasattr(pdf_file, 'read'):
            pdf_bytes = pdf_file.read()
            file_name = getattr(pdf_file, 'name', 'uploaded.pdf')
        else:
            file_name = pdf_file if isinstance(pdf_file, str) else getattr(pdf_file, 'name', str(pdf_file))
            with open(file_name, 'rb') as fh:
                pdf_bytes = fh.read()
        # Extract text for Claude analysis.
        pdf_content = pdf_processor.extract_content_with_vision_filtering(pdf_bytes)
        if 'error' in pdf_content:
            return json.dumps({
                'status': 'error',
                'step': 'text_extraction',
                'error': f'Text extraction failed: {pdf_content["error"]}'
            }, indent=2)
        # Claude document analysis.
        claude_analysis = claude_analyzer.analyze_document_structure(
            pdf_content['text_content'],
            claude_api_key.strip()
        )
        # Vision-enhanced processing guided by Claude's hints.
        vision_enhanced_content = pdf_processor.extract_content_with_vision_filtering(
            pdf_bytes,
            claude_analysis
        )
        # MolScribe analysis (capped at 3 images for the demo).
        molscribe_results = []
        for img_data in vision_enhanced_content['images'][:3]:
            molscribe_result = molscribe_analyzer.recognize_structure_with_timeout(
                img_data['image'],
                timeout_seconds=20
            )
            molscribe_results.append({
                'image_info': {
                    'page': img_data['page'],
                    'size': img_data['size'],
                    'description': img_data.get('description', 'Chemical structure')
                },
                'result': molscribe_result
            })
        # RDKit analysis on every successfully recognized SMILES.
        rdkit_results = []
        for molscribe_result in molscribe_results:
            if molscribe_result['result'].get('success') and molscribe_result['result'].get('smiles'):
                smiles = molscribe_result['result']['smiles']
                rdkit_results.append({
                    'smiles': smiles,
                    'analysis': rdkit_analyzer.analyze_molecule(smiles)
                })
        result = {
            'status': 'success',
            'file_name': file_name,
            'claude_analysis': {
                'success': claude_analysis.get('success', False),
                'document_type': claude_analysis.get('document_type', 'unknown')
            },
            'vision_analysis': {
                'pages': vision_enhanced_content['page_count'],
                'vision_approved_regions': len(vision_enhanced_content['images']),
                'processor_used': vision_enhanced_content.get('processor_used', 'unknown')
            },
            'molscribe_analysis': {
                'images_processed': len(molscribe_results),
                'structures_recognized': len([r for r in molscribe_results if r['result'].get('success')]),
                'results': molscribe_results
            },
            'rdkit_analysis': {
                'molecules_analyzed': len(rdkit_results),
                'results': rdkit_results
            }
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return json.dumps({
            'status': 'error',
            'error': f'PDF processing failed: {str(e)}'
        }, indent=2)
def test_n8n_integration(file_id: str, claude_api_key: str, workflow_id: str = None) -> str:
    """Exercise the N8N HTTP handler end-to-end and report the round trip as JSON."""
    if not file_id:
        return json.dumps({
            'success': False,
            'error': 'File ID is required for testing'
        }, indent=2)
    if not config.validate_api_key(claude_api_key):
        return json.dumps({
            'success': False,
            'error': 'Valid Claude API key required'
        }, indent=2)
    # Build a request shaped exactly like an N8N HTTP node payload.
    n8n_request = {
        'file_id': file_id,
        'claude_api_key': claude_api_key,
        'doc_name': 'N8N Test Document',
        'workflow_id': workflow_id or f'test_workflow_{int(time.time())}',
    }
    response = n8n_handler.process_n8n_request(n8n_request)
    # Echo both sides of the exchange so failures are easy to diagnose.
    return json.dumps({
        'n8n_integration_test': True,
        'request_sent': n8n_request,
        'response_received': response,
        'test_status': 'success' if response.get('success') else 'failed',
    }, indent=2)
# Create Gradio Interface
# Five tabs: system status, single-image analysis, Google Drive pipeline,
# direct PDF upload, and N8N HTTP integration testing, plus a status footer.
with gr.Blocks(title="Hugging Face Chemical Analyzer", theme=gr.themes.Soft()) as app:
    gr.HTML("""
    <h1 style='text-align: center; color: #2563eb;'>π§ͺ Hugging Face Chemical Analyzer with MolScribe</h1>
    <p style='text-align: center;'>Claude Analysis + Google Vision + MolScribe + RDKit + N8N HTTP Integration</p>
    <p style='text-align: center; color: #28a745;'><strong>Production Ready:</strong> N8N HTTP nodes, Google Drive, Dual dependency support</p>
    """)
    with gr.Tabs():
        # System Status Tab
        with gr.TabItem("π§ System Status"):
            gr.HTML("""
            <h3>System Component Status</h3>
            <p>Check all dependencies and integrations...</p>
            """)
            status_btn = gr.Button("π¬ Check System Status", variant="primary")
            status_output = gr.Code(label="System Status", language="json")
            # Serialize the status dict so it renders in the JSON code box.
            status_btn.click(
                lambda: json.dumps(test_system_status(), indent=2),
                outputs=[status_output]
            )
        # Single Image Analysis
        with gr.TabItem("ποΈ Vision + MolScribe Analysis"):
            gr.HTML("""
            <h3>Single Image Analysis</h3>
            <p>Upload an image: Vision API checks β MolScribe processes β RDKit analyzes</p>
            """)
            with gr.Row():
                with gr.Column():
                    image_input = gr.Image(type="pil", label="Chemical Structure Image")
                    image_btn = gr.Button("π Analyze with MolScribe", variant="primary")
                with gr.Column():
                    image_output = gr.Code(label="Analysis Results", language="json")
            image_btn.click(
                analyze_single_image_molscribe,
                inputs=[image_input],
                outputs=[image_output]
            )
        # Google Drive Pipeline
        with gr.TabItem("π Google Drive Pipeline"):
            gr.HTML("""
            <h3>Complete Google Drive Pipeline</h3>
            <p>Full pipeline: Download β Claude analyzes β Vision filters β MolScribe processes β RDKit analyzes</p>
            """)
            with gr.Row():
                with gr.Column():
                    drive_file_id = gr.Textbox(
                        label="Google Drive File ID",
                        placeholder="Enter Google Drive file ID",
                        info="Make sure file is shared or accessible by service account"
                    )
                    # Pre-filled from config when a key is configured server-side.
                    drive_claude_key = gr.Textbox(
                        label="Claude API Key",
                        type="password",
                        placeholder="sk-ant-...",
                        value=config.claude_api_key
                    )
                    drive_btn = gr.Button("π Run Complete Pipeline", variant="primary")
                with gr.Column():
                    drive_output = gr.Code(label="Pipeline Results", language="json")
            drive_btn.click(
                process_complete_pipeline,
                inputs=[drive_file_id, drive_claude_key],
                outputs=[drive_output]
            )
        # Direct PDF Upload
        with gr.TabItem("π Direct PDF Analysis"):
            gr.HTML("""
            <h3>Direct PDF Upload & Analysis</h3>
            <p>Upload PDF directly for immediate processing</p>
            """)
            with gr.Row():
                with gr.Column():
                    pdf_input = gr.File(label="PDF File", file_types=[".pdf"])
                    pdf_claude_key = gr.Textbox(
                        label="Claude API Key",
                        type="password",
                        placeholder="sk-ant-...",
                        value=config.claude_api_key
                    )
                    pdf_btn = gr.Button("π Analyze PDF", variant="primary")
                with gr.Column():
                    pdf_output = gr.Code(label="PDF Analysis Results", language="json")
            pdf_btn.click(
                process_pdf_direct,
                inputs=[pdf_input, pdf_claude_key],
                outputs=[pdf_output]
            )
        # N8N HTTP Integration Testing
        with gr.TabItem("π N8N HTTP Integration"):
            gr.HTML("""
            <h3>N8N HTTP Node Integration</h3>
            <p>Test N8N HTTP integration and get API documentation</p>
            """)
            with gr.Row():
                with gr.Column():
                    n8n_file_id = gr.Textbox(
                        label="Google Drive File ID",
                        placeholder="Test file ID for N8N integration"
                    )
                    n8n_claude_key = gr.Textbox(
                        label="Claude API Key",
                        type="password",
                        placeholder="sk-ant-...",
                        value=config.claude_api_key
                    )
                    n8n_workflow_id = gr.Textbox(
                        label="Workflow ID (Optional)",
                        placeholder="test_workflow_123"
                    )
                    n8n_test_btn = gr.Button("π§ͺ Test N8N Integration", variant="primary")
                with gr.Column():
                    n8n_output = gr.Code(label="N8N Integration Test Results", language="json")
            n8n_test_btn.click(
                test_n8n_integration,
                inputs=[n8n_file_id, n8n_claude_key, n8n_workflow_id],
                outputs=[n8n_output]
            )
            # Static documentation for wiring N8N HTTP nodes to this Space.
            # NOTE(review): the /api/* endpoints listed below are not defined
            # in this file - confirm they exist (e.g. via a mounted FastAPI app).
            gr.HTML("""
            <h4>N8N HTTP Node Configuration:</h4>
            <div style='background: #f8f9fa; padding: 15px; border-radius: 5px; margin: 10px 0;'>
            <strong>Method:</strong> POST<br>
            <strong>URL:</strong> <code>https://your-hf-space.hf.space/api/analyze</code><br>
            <strong>Content-Type:</strong> application/json<br>
            <strong>Body (JSON):</strong>
            <pre>{
  "file_id": "{{ $json.file_id }}",
  "claude_api_key": "{{ $json.claude_api_key }}",
  "doc_name": "{{ $json.doc_name }}",
  "workflow_id": "{{ $json.workflow_id }}"
}</pre>
            </div>
            <h4>Available API Endpoints:</h4>
            <ul>
            <li><strong>POST /api/analyze</strong> - Main analysis endpoint</li>
            <li><strong>GET /api/status/<request_id></strong> - Check processing status</li>
            <li><strong>GET /api/health</strong> - Health check</li>
            <li><strong>POST /api/test</strong> - Test connectivity</li>
            </ul>
            <h4>N8N Workflow Example:</h4>
            <ol>
            <li><strong>HTTP Request Node:</strong> POST to /api/analyze with document data</li>
            <li><strong>Wait Node (Optional):</strong> Brief pause for processing</li>
            <li><strong>Code Node:</strong> Extract results from response.data</li>
            <li><strong>Switch Node:</strong> Route based on success/failure</li>
            <li><strong>Further Processing:</strong> Use extracted SMILES, molecular data, etc.</li>
            </ol>
            """)
    # Status footer
    # f-string: component availability is evaluated once, at app build time.
    gr.HTML(f"""
    <div style='text-align: center; margin-top: 20px; color: #666; font-size: 0.9em;'>
    <p><strong>Google Drive:</strong> {'β Connected' if gdrive.service else 'β οΈ Needs service_account.json'}</p>
    <p><strong>Google Vision:</strong> {'β Connected' if vision_filter.available else 'β οΈ Needs credentials'}</p>
    <p><strong>PDF Processing:</strong> {', '.join(pdf_processor.available_processors) if pdf_processor.available_processors else 'β No processors'}</p>
    <p><strong>MolScribe:</strong> {'β Available' if molscribe_analyzer.available else 'β Not Available'}</p>
    <p><strong>RDKit:</strong> {'β Available' if rdkit_analyzer.available else 'β Not Available'}</p>
    <p><strong>N8N HTTP API:</strong> β Ready for HTTP nodes</p>
    <p><strong>Deployment:</strong> β Hugging Face Spaces compatible with N8N integration</p>
    </div>
    """)
if __name__ == "__main__":
    # For Hugging Face Spaces - just run Gradio
    # N8N can call the Gradio API directly
    app.launch(
        server_name="0.0.0.0",  # bind all interfaces (required inside HF containers)
        server_port=7860,  # standard HF Spaces port
        share=False,  # Spaces provides the public URL; no Gradio tunnel needed
        show_error=True  # surface tracebacks in the UI for easier debugging
    )