#!/usr/bin/env python3 """ Hugging Face Gradio App for RDF Validation with MCP Server and Anthropic AI This app serves both as a web interface and can expose MCP server functionality. Deploy this on Hugging Face Spaces with your Anthropic API key. """ import gradio as gr import os import json import sys import asyncio import logging import re import hashlib import threading import time from collections import OrderedDict from typing import Any, Dict, List, Optional # Add current directory to path sys.path.append(os.path.dirname(os.path.abspath(__file__))) # Import our validation logic try: from validator import validate_rdf VALIDATOR_AVAILABLE = True # Test that the function is callable if not callable(validate_rdf): print("⚠️ Warning: validate_rdf is not callable") VALIDATOR_AVAILABLE = False else: print("✅ Validator module loaded successfully") except ImportError as e: VALIDATOR_AVAILABLE = False print(f"⚠️ Warning: validator.py not found or has import errors: {e}") print("Some features may be limited.") except Exception as e: VALIDATOR_AVAILABLE = False print(f"⚠️ Warning: Error loading validator: {e}") # Optional: Check if OpenAI and requests are available try: from openai import OpenAI OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False print("💡 Install 'openai' package for AI-powered corrections: pip install openai") try: import requests HF_INFERENCE_AVAILABLE = True except ImportError: HF_INFERENCE_AVAILABLE = False print("💡 Install 'requests' package for AI-powered corrections: pip install requests") # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Configuration - Your specific Hugging Face Inference Endpoint (hardcoded) HF_API_KEY = os.getenv('HF_API_KEY', '') # Hugging Face API key from Secret HF_ENDPOINT_URL = "https://evxgv66ksxjlfrts.us-east-1.aws.endpoints.huggingface.cloud/v1/" HF_MODEL = "lmstudio-community/Llama-3.3-70B-Instruct-GGUF" # Correct model name for your endpoint # AI Correction Configuration MAX_CORRECTION_ATTEMPTS = 2 # Reduced for speed (rapid fix handles most cases) ENABLE_VALIDATION_LOOP = True # Enable validation loop by default # MCP4BibFrame Documentation API Configuration MCP4BIBFRAME_DOCS_URL = "https://jimfhahn-mcp4bibframe-docs.hf.space/api/mcp" MCP4BIBFRAME_DOCS_ENABLED = True # Set to False to disable doc integration # Cache BibFrame documentation responses to avoid repeated network calls BIBFRAME_DOCS_CACHE: Dict[str, tuple[Any, float]] = {} BIBFRAME_DOCS_CACHE_TTL = 3600 # seconds # Cache successful correction outputs to accelerate repeated error patterns FIX_CACHE: OrderedDict[str, str] = OrderedDict() FIX_CACHE_MAX_SIZE = 100 def _make_fix_cache_key(validation_results: str, rdf_content: str, template: str) -> str: """Generate a deterministic cache key for correction attempts.""" hasher = hashlib.sha256() hasher.update(template.strip().encode("utf-8")) hasher.update(b"\x1f") hasher.update(validation_results.strip().encode("utf-8", errors="ignore")) hasher.update(b"\x1f") hasher.update(rdf_content.strip().encode("utf-8", errors="ignore")) return hasher.hexdigest() def _get_cached_correction(cache_key: str, steps_log: Optional[List[str]] = None) -> Optional[str]: """Retrieve a cached correction, updating its recency ordering.""" cached = FIX_CACHE.get(cache_key) if cached is not None: FIX_CACHE.move_to_end(cache_key) if steps_log is not None: steps_log.append("Using cached correction for repeated validation errors") return cached def _store_correction_in_cache(cache_key: str, corrected_rdf: str, steps_log: Optional[List[str]] = None) -> None: """Store a correction in the cache and evict the oldest entry if needed.""" if not corrected_rdf: return FIX_CACHE[cache_key] = corrected_rdf FIX_CACHE.move_to_end(cache_key) if len(FIX_CACHE) > FIX_CACHE_MAX_SIZE: removed_key, _ = FIX_CACHE.popitem(last=False) if steps_log is not None: steps_log.append("Cache full; evicted oldest correction entry") elif steps_log is not None: steps_log.append("Cached correction for future reuse") # Cache successful correction outputs to accelerate repeated error patterns FIX_CACHE: OrderedDict[str, str] = OrderedDict() FIX_CACHE_MAX_SIZE = 100 def rapid_fix_missing_properties(rdf_content: str, validation_results: str, template: str, steps_log: Optional[List[str]] = None) -> Optional[str]: """Ultra-fast fix for simple missing property errors - no AI needed.""" import re # Quick pattern match for missing properties missing = re.findall(r"Less than \d+ values on.*->bf:(\w+)", validation_results) if not missing: if steps_log: steps_log.append("❌ Rapid fix: No missing properties detected in validation results") return None if steps_log: steps_log.append(f"🔍 Rapid fix detected {len(missing)} missing properties: {', '.join(set(missing))}") # Pre-compiled property templates (no API calls) INSTANT_FIXES = { "title": 'Untitled', "language": 'Englisheng', "content": 'texttxt', "adminMetadata": ''' new n 2024-01-01 Library of Congress Library of Congress ''', "assigner": ''' Library of Congress ''' } # Find insertion point work_match = re.search(r'(]*>)(.*?)()', rdf_content, re.DOTALL) instance_match = re.search(r'(]*>)(.*?)()', rdf_content, re.DOTALL) if not work_match and not instance_match: if steps_log: steps_log.append("❌ Rapid fix: No bf:Work or bf:Instance found in RDF") return None match = work_match or instance_match target_type = "Work" if work_match else "Instance" opening_tag = match.group(1) content = match.group(2) closing_tag = match.group(3) if steps_log: steps_log.append(f"📍 Rapid fix target: bf:{target_type}") has_admin = "" in content or "" in content steps_log.append(f"🔍 Current state: AdminMetadata {'EXISTS' if has_admin else 'MISSING'}") # Build fixes fixes = [] assigner_fixed = False for prop in missing[:10]: # Limit to 10 properties prop_lower = prop.lower() # Special handling for assigner within AdminMetadata if prop_lower == "assigner": if steps_log: steps_log.append("🔧 Processing missing 'assigner' property...") # Look for existing AdminMetadata blocks that need assigner admin_pattern = re.compile(r'(]*>)(.*?)()', re.DOTALL) def add_assigner(match): nonlocal assigner_fixed admin_open = match.group(1) admin_content = match.group(2) admin_close = match.group(3) # Skip if already has assigner if ']*>\s*<[^>]+\s+rdf:about="([^"]+)"', admin_content) if agent_match: agent_uri = agent_match.group(1) # Build assigner element if agent_uri: assigner_element = f' ' else: # Use default Library of Congress assigner_element = ''' Library of Congress ''' assigner_fixed = True if steps_log: steps_log.append(f" ✅ Injected assigner into existing AdminMetadata (agent URI: {agent_uri or 'default'})") # Insert before closing tag return admin_open + admin_content + '\n' + assigner_element + '\n ' + admin_close original_content = content content = admin_pattern.sub(add_assigner, content) if assigner_fixed and steps_log: steps_log.append(" ✅ Assigner successfully added to existing AdminMetadata") elif steps_log and content == original_content: steps_log.append(" ℹ️ No AdminMetadata found to inject assigner (will add with full block if adminMetadata is missing)") elif prop in INSTANT_FIXES and f" str: """Ultra-minimal prompt for faster AI response.""" if not OPENAI_AVAILABLE or not os.getenv('HF_API_KEY'): return rdf try: client = get_openai_client() if not client: return rdf # Extract just the critical errors error_lines = [] for line in errors.split('\n'): if any(term in line for term in ['Less than', 'missing', 'required', '->bf:', 'adminMetadata', 'assigner']): error_lines.append(line.strip()[:100]) if len(error_lines) >= 5: break if not error_lines: return rdf # Ultra-concise prompt prompt = f"""Fix these BibFrame errors: {chr(10).join(error_lines[:3])} Add only what's missing to this RDF: {rdf[:800]}...{rdf[-200:] if len(rdf) > 1000 else ''} Return complete valid RDF/XML only.""" response = client.chat.completions.create( model=HF_MODEL, messages=[ {"role": "system", "content": "Fix RDF. Output only valid RDF/XML. No explanations."}, {"role": "user", "content": prompt} ], max_tokens=max_tokens, temperature=0, timeout=20 # Much shorter timeout ) result = response.choices[0].message.content result = extract_rdf_from_response(result) result = fix_common_rdf_errors(result) return result except Exception: return rdf def test_validator_functionality(): """Test if the validator is actually working""" if not VALIDATOR_AVAILABLE: print("❌ Validator not available for testing") return False try: # Test with minimally valid RDF/XML that matches SHACL targets but is missing required properties # This ensures SHACL finds focus nodes (bf:Text Work) and reports violations test_rdf = ''' ''' conforms, results = validate_rdf(test_rdf.encode('utf-8'), 'monograph') # This should fail validation due to missing required properties if conforms: print("⚠️ WARNING: Validator returned 'conforms=True' for invalid RDF. Validator may not be working correctly!") return False else: preview = (results or '').strip() preview = preview[:200] + ('…' if len(preview) > 200 else '') print(f"✅ Validator test passed. Got expected SHACL violations. Preview: {preview if preview else 'No results text returned'}") return True except Exception as e: print(f"❌ Validator test failed with error: {e}") return False # Run the test on startup if VALIDATOR_AVAILABLE: test_validator_functionality() def query_bibframe_docs(tool_name: str, params: dict, timeout: int = 10) -> Optional[dict]: """ Query the MCP4BibFrame documentation API using the MCP protocol. Args: tool_name (str): Name of the tool to invoke params (dict): Parameters for the tool timeout (int): Request timeout in seconds Returns: Optional[dict]: Response data or None if failed """ if not MCP4BIBFRAME_DOCS_ENABLED: return None try: # Construct MCP request mcp_request = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": tool_name, "arguments": params }, "id": 1 } logger.info(f"Querying BibFrame docs: {tool_name} with {params}") # Make SSE request to MCP endpoint response = requests.post( MCP4BIBFRAME_DOCS_URL, json=mcp_request, timeout=timeout, headers={"Accept": "text/event-stream"} ) if response.status_code == 200: # Parse SSE response for line in response.text.split('\n'): if line.startswith('data: '): try: data = json.loads(line[6:]) if 'result' in data: return data['result'] except json.JSONDecodeError: continue else: logger.warning(f"BibFrame docs API returned status {response.status_code}") except requests.exceptions.Timeout: logger.warning("Timeout querying BibFrame documentation") except Exception as e: logger.error(f"Error querying BibFrame documentation: {str(e)}") return None def query_bibframe_docs_cached(tool_name: str, params: dict, timeout: int = 10) -> Optional[dict]: """Cached wrapper around ``query_bibframe_docs`` to avoid repeated HTTP calls.""" if not MCP4BIBFRAME_DOCS_ENABLED: return None try: cache_key = f"{tool_name}:{json.dumps(params, sort_keys=True)}" except TypeError: cache_key = f"{tool_name}:{str(params)}" cached = BIBFRAME_DOCS_CACHE.get(cache_key) if cached: payload, timestamp = cached if time.time() - timestamp < BIBFRAME_DOCS_CACHE_TTL: logger.debug(f"Using cached BibFrame docs response for {cache_key}") return payload response = query_bibframe_docs(tool_name, params, timeout) if response is not None: BIBFRAME_DOCS_CACHE[cache_key] = (response, time.time()) return response def extract_bibframe_terms_from_errors(validation_results: str) -> dict: """ Extract BibFrame properties and classes mentioned in validation errors. Args: validation_results (str): Validation error text Returns: dict: Dictionary with 'properties' and 'classes' lists """ import re terms = { 'properties': set(), 'classes': set() } # Common patterns in validation results # Properties often appear as bf:propertyName or ->bf:propertyName property_patterns = [ r'bf:(\w+)', r'->bf:(\w+)', r'property (\w+)', r'missing (\w+)', r'requires? (\w+)' ] # Classes often appear as bf:ClassName or "a ClassName" class_patterns = [ r'bf:([A-Z]\w+)', r'type ([A-Z]\w+)', r'class ([A-Z]\w+)', r' 2: # Skip very short matches terms['properties'].add(match.lower()) # Extract classes for pattern in class_patterns: matches = re.findall(pattern, validation_results) for match in matches: if match and len(match) > 2: terms['classes'].add(match) # Convert sets to lists terms['properties'] = list(terms['properties'])[:5] # Limit to top 5 terms['classes'] = list(terms['classes'])[:3] # Limit to top 3 return terms def fetch_bibframe_guidance(validation_results: str, rdf_content: str) -> str: """ Fetch relevant BibFrame guidance from the documentation API based on errors. Args: validation_results (str): Validation error messages rdf_content (str): Original RDF content Returns: str: Formatted guidance text for inclusion in prompts """ if not MCP4BIBFRAME_DOCS_ENABLED: return "" guidance_parts = [] try: # Extract terms from validation errors terms = extract_bibframe_terms_from_errors(validation_results) logger.info(f"Extracted terms - properties: {terms['properties']}, classes: {terms['classes']}") # Query information for key properties for prop in terms['properties'][:3]: # Limit queries prop_uri = _resolve_bibframe_uri(prop) result = query_bibframe_docs_cached("get_property_info", {"property_uri": prop_uri}) if result and isinstance(result, dict): guidance_parts.append(f"\n**{result.get('label', prop)}** ({prop}):") if 'definition' in result: guidance_parts.append(f"- Definition: {result['definition']}") if 'domain' in result: guidance_parts.append(f"- Used in: {', '.join(result['domain'])}") if 'range' in result: guidance_parts.append(f"- Values: {', '.join(result['range'])}") if 'examples' in result and result['examples']: guidance_parts.append(f"- Example: {result['examples'][0]}") # Query information for key classes for cls in terms['classes'][:2]: # Limit queries cls_uri = _resolve_bibframe_uri(cls) result = query_bibframe_docs_cached("get_class_info", {"class_uri": cls_uri}) if result and isinstance(result, dict): guidance_parts.append(f"\n**{result.get('label', cls)}** class:") if 'definition' in result: guidance_parts.append(f"- Definition: {result['definition']}") if 'applicable_properties' in result: props = [p.get('label', p.get('property', '')) for p in result['applicable_properties'][:5]] guidance_parts.append(f"- Key properties: {', '.join(props)}") # If we found AdminMetadata issues, get specific usage guidance if any(term in validation_results.lower() for term in ['adminmetadata', 'assigner', '->bf:assigner']): result = query_bibframe_docs_cached("get_property_usage", { "property_name": "assigner", "class_name": "AdminMetadata" }) if result and isinstance(result, dict): guidance_parts.append("\n**AdminMetadata/assigner usage:**") if 'usage' in result: guidance_parts.append(f"- {result['usage']}") if 'examples' in result and result['examples']: guidance_parts.append(f"- Pattern: {result['examples'][0]}") except Exception as e: logger.error(f"Error fetching BibFrame guidance: {str(e)}") if guidance_parts: return "\n".join(guidance_parts) return "" # OpenAI client configuration for the endpoint def get_openai_client(): """Get configured OpenAI client for HF Inference Endpoint""" if not HF_API_KEY: print("❌ No HF_API_KEY available for OpenAI client") return None print(f"🔗 Creating OpenAI client with:") print(f" base_url: {HF_ENDPOINT_URL}") print(f" api_key: {'***' + HF_API_KEY[-4:] if len(HF_API_KEY) > 4 else 'HIDDEN'}") return OpenAI( base_url=HF_ENDPOINT_URL, api_key=HF_API_KEY, timeout=120.0 # Increase timeout for cold starts ) # Sample RDF data for examples (based on real Library of Congress BibFrame) SAMPLE_VALID_RDF = ''' The knitter's handy book of patterns basic designs in multiple sizes & gauges Budd, Ann, 1956- author aut English eng text txt TT820 .B877 2002 United States, Library of Congress new n 2001-12-12 United States, Library of Congress ''' SAMPLE_INVALID_RDF = ''' Incomplete Title ''' # BibFrame Few-Shot Examples (based on real Library of Congress records) BIBFRAME_CORRECTION_EXAMPLES = { "title_structure": { "pattern": r"bf:title", "wrong": """Simple Title String""", "correct": """ The knitter's handy book of patterns basic designs in multiple sizes & gauges """ }, "adminmetadata": { "pattern": r"bf:adminMetadata|->bf:assigner", "wrong": """ new """, "correct": """ new n 2001-12-12 United States, Library of Congress """ }, "contribution": { "pattern": r"bf:contribution", "wrong": """Author Name""", "correct": """ Budd, Ann, 1956- contributor ctb """ }, "language": { "pattern": r"bf:language", "wrong": """English""", "correct": """ English eng """ }, "content": { "pattern": r"bf:content", "wrong": """Text""", "correct": """ text txt """ }, "classification": { "pattern": r"bf:classification", "wrong": """TT820 .B877 2002""", "correct": """ TT820 .B877 2002 United States, Library of Congress DLC used by assigner uba """ }, "subject": { "pattern": r"bf:subject", "wrong": """Knitting--Patterns""", "correct": """ Knitting--Patterns Knitting Knitting """ } } # MCP Server Tools (can be used independently) def validate_rdf_tool(rdf_content: str, template: str = "monograph") -> dict: """ Validate RDF/XML content against SHACL templates. This tool validates RDF/XML data against predefined SHACL shapes to ensure compliance with metadata standards like BIBFRAME. Returns detailed validation results with conformance status and specific violation information. Args: rdf_content (str): The RDF/XML content to validate template (str): Validation template to use ('monograph' or 'custom') Returns: dict: Validation results with conformance status and detailed feedback """ if not rdf_content: return {"error": "No RDF/XML content provided", "conforms": False} if not VALIDATOR_AVAILABLE: logger.error("Validator module not available") return { "error": "Validator not available - ensure validator.py is present", "conforms": False } try: # Fast syntax check before SHACL to give clearer errors on XML/prefix issues try: try: import rdflib # type: ignore except ImportError: rdflib = None # type: ignore if rdflib: g = rdflib.Graph() # type: ignore # Parse as RDF/XML; raise on syntax errors like unbound prefixes g.parse(data=rdf_content, format="application/rdf+xml") # type: ignore else: logger.info("rdflib not installed; skipping pre-parse RDF/XML syntax check") except Exception as parse_err: logger.error(f"RDF/XML parse error before validation: {parse_err}") return { "error": f"RDF/XML parse error: {parse_err}", "conforms": False } # Log what we're validating logger.info(f"Validating RDF with template '{template}', content length: {len(rdf_content)}") # Call the validator conforms, results_text = validate_rdf(rdf_content.encode('utf-8'), template) # Debug logging logger.info(f"Validation result - conforms: {conforms}, results length: {len(results_text) if results_text else 0}") # If no results text but claims to conform, something might be wrong if conforms and (not results_text or len(results_text.strip()) == 0): results_text = "Validation passed with no specific feedback." elif not conforms and (not results_text or len(results_text.strip()) == 0): results_text = "Validation failed but no specific errors were returned. Check the RDF syntax and structure." return { "conforms": conforms, "results": results_text if results_text else "", "template": template, "status": "✅ Valid RDF" if conforms else "❌ Invalid RDF" } except ImportError as e: logger.error(f"Import error in validator: {str(e)}") return { "error": f"Validator import error: {str(e)}. Check that all dependencies are installed.", "conforms": False } except AttributeError as e: logger.error(f"Validator function not found: {str(e)}") return { "error": f"Validator function error: {str(e)}. Check validator.py implementation.", "conforms": False } except Exception as e: logger.error(f"Validation error: {str(e)}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return { "error": f"Validation failed: {str(e)}", "conforms": False } def filter_validation_results_by_class(validation_results: str, rdf_content: str) -> dict: """ Filter validation results by RDF class (Work, Instance, etc.) Args: validation_results (str): Full validation results rdf_content (str): Original RDF content Returns: dict: Validation results organized by class """ import re # Parse validation results to extract class information class_results = { 'Work': [], 'Instance': [], 'Title': [], 'Contribution': [], 'AdminMetadata': [], 'Other': [] } lines = validation_results.split('\n') current_section = [] current_class = 'Other' for line in lines: # Detect which class this error relates to if 'bf:Work' in line or '/work/' in line: current_class = 'Work' elif 'bf:Instance' in line or '/instance/' in line: current_class = 'Instance' elif 'bf:Title' in line: current_class = 'Title' elif 'bf:Contribution' in line: current_class = 'Contribution' elif 'bf:AdminMetadata' in line or 'AdminMetadata' in line or '->bf:assigner' in line: # Many admin violations show assigner path; map to AdminMetadata current_class = 'AdminMetadata' # Collect lines for current violation if 'Constraint Violation' in line: if current_section: class_results[current_class].extend(current_section) current_section = [line] elif line.strip(): current_section.append(line) # Add last section if current_section: class_results[current_class].extend(current_section) # Remove empty classes return {k: '\n'.join(v) for k, v in class_results.items() if v} def get_ai_suggestions(validation_results: str, rdf_content: str, include_warnings: bool = False) -> str: """Generate AI-powered, plain-language suggestions based on validation results. Avoids RDF/SHACL jargon and focuses on actionable fixes. """ if not OPENAI_AVAILABLE: return generate_manual_suggestions(validation_results) current_api_key = os.getenv('HF_API_KEY', '') if not current_api_key: return f""" 🔑 **AI suggestions disabled**: Please set your Hugging Face API key as a Secret in your Space settings. {generate_manual_suggestions(validation_results)} """ try: client = get_openai_client() if not client: return f""" 🔑 **AI suggestions disabled**: HF_API_KEY not configured. {generate_manual_suggestions(validation_results)} """ severity_instruction = ( "Focus only on violations (errors) and ignore any warnings." if not include_warnings else "Address both violations and warnings." ) # Get BibFrame documentation for context bibframe_guidance = fetch_bibframe_guidance(validation_results, rdf_content) doc_section = "" if bibframe_guidance: doc_section = f""" Reference information from BibFrame ontology: {bibframe_guidance} """ # Group errors by class to focus the prompt class_results = filter_validation_results_by_class(validation_results, rdf_content) if class_results: primary_class = max(class_results.keys(), key=lambda k: len(class_results[k])) focused_results = class_results[primary_class] else: primary_class = "Record" focused_results = validation_results simplified_summary = parse_shacl_results_for_ai(focused_results) relevant_rdf = extract_relevant_rdf_section(rdf_content, primary_class) prompt = f""" You are a helpful metadata librarian. Write in plain language (no RDF/SHACL jargon). Analyze the validation errors for the {primary_class} and provide concise, actionable fixes. {severity_instruction} {doc_section} Validation Errors for {primary_class}: {focused_results[:1500]} Validation Summary (plain language): {simplified_summary} Relevant RDF Section: {relevant_rdf[:800]} Instructions: 1. ONE sentence: What's wrong with this {primary_class}? 2. List errors (max 3 words each) 3. Show exact XML fixes Format: **Issue:** [One sentence about the {primary_class} problem] **Errors:** • Error 1 • Error 2 **Fix:** ```xml [Complete corrected {primary_class} section] ``` Be ultra-concise. Show the fix, not explanations.""" chat_completion = client.chat.completions.create( model=HF_MODEL, messages=[ { "role": "system", "content": "You are a friendly librarian helping fix catalog records. Never use technical RDF or SHACL terminology. Use the BibFrame documentation provided to ensure accuracy." }, { "role": "user", "content": prompt } ], max_tokens=800, temperature=0.5, top_p=0.9 ) generated_text = chat_completion.choices[0].message.content generated_text = clean_technical_jargon(generated_text) other_classes = [k for k in class_results.keys() if k != primary_class] class_note = ( f"\n\n📌 **Note:** Focused on {primary_class} errors. " + (f"Also found issues in: {', '.join(other_classes)}" if other_classes else "") ) return f"🤖 **AI-Powered Suggestions ({('Violations + Warnings' if include_warnings else 'Violations Only')}):**\n\n{generated_text}{class_note}" except Exception as e: logger.error(f"OpenAI/HF Inference Endpoint error: {str(e)}") return f""" ❌ **AI suggestions error**: {str(e)} {generate_manual_suggestions(validation_results)} """ def extract_relevant_rdf_section(rdf_content: str, class_name: str) -> str: """ Extract only the relevant RDF section for a specific class Args: rdf_content (str): Full RDF content class_name (str): Class name to extract (Work, Instance, etc.) Returns: str: Relevant RDF section """ import re # Map class names to RDF patterns patterns = { 'Work': r'', 'Instance': r'', 'Title': r'', 'Contribution': r'', 'AdminMetadata': r'' } pattern = patterns.get(class_name) if not pattern: return rdf_content[:1000] # Fallback to first 1000 chars # Extract matching section match = re.search(pattern, rdf_content, re.DOTALL) if match: section = match.group(0) # Also include namespace declarations namespaces = re.findall(r'xmlns:\w+="[^"]*"', rdf_content[:500]) if namespaces: return f"\n{section}" return section return rdf_content[:1000] # Fallback ## [Removed duplicate get_ai_correction definition – unified below] def merge_corrected_sections(original_rdf: str, corrected_sections: dict) -> str: """ Merge corrected class sections back into the original RDF Args: original_rdf (str): Original RDF content corrected_sections (dict): Corrected sections by class Returns: str: Merged RDF with corrections """ import re result = original_rdf # Replace each corrected section for class_name, corrected_section in corrected_sections.items(): patterns = { 'Work': r'', 'Instance': r'', 'Title': r'', 'Contribution': r'', 'AdminMetadata': r'' } pattern = patterns.get(class_name) if pattern: result = re.sub(pattern, corrected_section, result, count=1, flags=re.DOTALL) return result # Sample RDF data for examples # MCP Server Tools (can be used independently) # Note: This section exists earlier in the file, we're removing the duplicates """ Generate AI-powered fix suggestions for invalid RDF/XML. This tool analyzes validation results and provides actionable suggestions for fixing RDF/XML validation errors using AI or rule-based analysis. Args: validation_results (str): The validation error messages rdf_content (str): The original RDF/XML content that failed validation include_warnings (bool): Whether to include warnings in suggestions Returns: str: Detailed suggestions for fixing the RDF validation issues """ if not OPENAI_AVAILABLE: return generate_manual_suggestions(validation_results) # Get API key dynamically at runtime current_api_key = os.getenv('HF_API_KEY', '') if not current_api_key: return f""" 🔑 **AI suggestions disabled**: Please set your Hugging Face API key as a Secret in your Space settings. {generate_manual_suggestions(validation_results)} """ try: # Use OpenAI client with your Hugging Face Inference Endpoint client = get_openai_client() if not client: return f""" 🔑 **AI suggestions disabled**: HF_API_KEY not configured. {generate_manual_suggestions(validation_results)} """ severity_instruction = "Focus only on violations (errors) and ignore any warnings." if not include_warnings else "Address both violations and warnings." prompt = f"""You are an expert in RDF/XML and SHACL validation. Analyze the validation errors and provide CONCISE, ACTIONABLE fix suggestions. {severity_instruction} Validation Results: {validation_results} Original RDF (first 1000 chars): {rdf_content[:1000]}... Instructions: 1. Start with a ONE-SENTENCE summary of the main issue 2. List the specific errors in bullet points (max 5 words per error) 3. Provide the exact fix for each error with code snippets 4. Keep explanations minimal - focus on solutions Format: **Main Issue:** [One sentence] **Errors Found:** • Error 1 name • Error 2 name **Fixes:** 1. **Error 1**: ```xml [exact code to add/fix] ``` 2. **Error 2**: ```xml [exact code to add/fix] ``` Be direct and solution-focused. No lengthy explanations.""" # Make API call using OpenAI client print(f"🔄 Making API call to: {HF_ENDPOINT_URL}") print(f"🔄 Using model: {HF_MODEL}") print(f"🔄 Include warnings: {include_warnings}") chat_completion = client.chat.completions.create( model=HF_MODEL, messages=[ { "role": "system", "content": "You are a friendly librarian helping fix catalog records. Never use technical RDF or SHACL terminology." }, { "role": "user", "content": prompt } ], max_tokens=1500, temperature=0.6, top_p=0.9 ) print("✅ API call successful") generated_text = chat_completion.choices[0].message.content return f"🤖 **AI-Powered Suggestions ({('Violations + Warnings' if include_warnings else 'Violations Only')}):**\n\n{generated_text}" except Exception as e: logger.error(f"OpenAI/HF Inference Endpoint error: {str(e)}") return f""" ❌ **AI suggestions error**: {str(e)} {generate_manual_suggestions(validation_results)} """ def extract_rdf_from_response(response: str) -> str: """ Extract RDF/XML content from AI response, handling code blocks. Args: response (str): AI response that may contain RDF wrapped in code blocks Returns: str: Extracted RDF/XML content """ response = response.strip() # Handle ```xml code blocks if "```xml" in response: try: return response.split("```xml")[1].split("```")[0].strip() except IndexError: pass # Handle generic ``` code blocks if "```" in response and response.count("```") >= 2: try: return response.split("```")[1].split("```")[0].strip() except IndexError: pass # If no code blocks found, return the response as-is return response def fix_common_rdf_errors(rdf_xml: str) -> str: """ Fix common RDF/XML errors that AI models generate. Args: rdf_xml (str): RDF/XML that may contain common errors Returns: str: Fixed RDF/XML """ import re # Remove any rdf:parseType attributes (common AI mistake) rdf_xml = re.sub(r'\s+rdf:parseType="[^"]*"', '', rdf_xml) # Fix bf:title if it's just a string (should be nested structure) rdf_xml = re.sub( r'([^<]+)', r'\1', rdf_xml ) # Fix bf:language if it's a string instead of URI language_map = { 'English': 'http://id.loc.gov/vocabulary/languages/eng', 'eng': 'http://id.loc.gov/vocabulary/languages/eng', 'Spanish': 'http://id.loc.gov/vocabulary/languages/spa', 'French': 'http://id.loc.gov/vocabulary/languages/fre', } for lang_text, lang_uri in language_map.items(): rdf_xml = re.sub( f'{lang_text}', f'', rdf_xml, flags=re.IGNORECASE ) # Fix bf:content if it's a string content_map = { 'Text': 'http://id.loc.gov/vocabulary/contentTypes/txt', 'text': 'http://id.loc.gov/vocabulary/contentTypes/txt', } for content_text, content_uri in content_map.items(): rdf_xml = re.sub( f'{content_text}', f'', rdf_xml, flags=re.IGNORECASE ) return rdf_xml def extract_error_focus_points(validation_results: str) -> Dict[str, List[str]]: """Identify the specific focus nodes and properties mentioned in validation errors.""" import re focus = { "properties": [], "focus_nodes": [], "missing_properties": [], "classes": [], } if not validation_results: return focus property_set = set() missing_set = set() node_set = set() for match in re.finditer(r"Focus Node:\s*(?:<)?([^\s>]+)(?:>)?", validation_results): node_set.add(match.group(1)) for match in re.finditer(r"Result Path:\s*(?:http://[^/]+/)?([A-Za-z]+)", validation_results): property_set.add(match.group(1)) for match in re.finditer(r"Less than \d+ values on .*->bf:([A-Za-z]+)", validation_results): missing_set.add(match.group(1)) focus["properties"] = sorted(property_set) focus["focus_nodes"] = sorted(node_set) focus["missing_properties"] = sorted(missing_set) return focus def _resolve_bibframe_uri(name: str) -> str: if not name: return name if name.startswith("http://") or name.startswith("https://"): return name if ":" in name: prefix, local = name.split(":", 1) if prefix == "bf": return f"http://id.loc.gov/ontologies/bibframe/{local}" return f"http://id.loc.gov/ontologies/bibframe/{name}" def get_targeted_bibframe_guidance(properties: List[str], classes: List[str]) -> Dict[str, dict]: """Fetch BibFrame documentation for only the specified properties/classes.""" guidance: Dict[str, dict] = {} if not MCP4BIBFRAME_DOCS_ENABLED: return guidance for prop in properties[:5]: prop_uri = _resolve_bibframe_uri(prop) result = query_bibframe_docs_cached("get_property_info", {"property_uri": prop_uri}, timeout=5) if result: guidance[prop] = result for cls in classes[:5]: cls_uri = _resolve_bibframe_uri(cls) result = query_bibframe_docs_cached("get_class_info", {"class_uri": cls_uri}, timeout=5) if result: guidance[cls] = result return guidance def generate_property_specific_fix(property_name: str, guidance: Optional[dict] = None) -> str: """Generate a BibFrame-compliant snippet for a specific missing property.""" guidance = guidance or {} prop = property_name.lower() if property_name else "" if prop == "title": return """ PLACEHOLDER_TITLE """ if prop == "language": return """ English eng """ if prop == "content": return """ text txt """ if prop == "contribution": return """ Author Name author aut """ if prop == "classification": return """ TT820 .B877 2002 United States, Library of Congress """ if prop == "adminmetadata": return """ new n 2024-01-01 United States, Library of Congress """ # Fallback: simple literal placeholder return f"PLACEHOLDER_VALUE" def get_ai_correction(validation_results: str, rdf_content: str, template: str = 'monograph', max_attempts: int = None, include_warnings: bool = False, enable_validation_loop: bool | None = None, cache_key: Optional[str] = None, steps_log: Optional[List[str]] = None) -> str: """ Generate AI-powered corrected RDF/XML based on validation errors. This tool takes invalid RDF/XML and validation results, then generates a corrected version that addresses all identified validation issues. The generated correction is validated before being returned to the user. Args: validation_results (str): The validation error messages rdf_content (str): The original invalid RDF/XML content template (str): The validation template to use max_attempts (int): Maximum number of attempts to generate valid RDF (uses MAX_CORRECTION_ATTEMPTS if None) include_warnings (bool): Whether to fix warnings in addition to violations Returns: str: Corrected RDF/XML that should pass validation """ # Determine whether to iterate based on parameter or global default iterate_enabled = ENABLE_VALIDATION_LOOP if enable_validation_loop is None else enable_validation_loop if steps_log is not None: steps_log.append(f"Planning correction: iterate_enabled={iterate_enabled}, include_warnings={include_warnings}") # Use configuration default if not specified if max_attempts is None: max_attempts = MAX_CORRECTION_ATTEMPTS if steps_log is not None: steps_log.append(f"Max attempts set to {max_attempts}") # If iteration disabled, force single attempt if not iterate_enabled: max_attempts = 1 if steps_log is not None: steps_log.append("Iteration disabled; forcing single attempt") if cache_key is None and validation_results and rdf_content: cache_key = _make_fix_cache_key(validation_results, rdf_content, template) if cache_key: cached_result = _get_cached_correction(cache_key, steps_log) if cached_result is not None: return cached_result if not OPENAI_AVAILABLE: if steps_log is not None: steps_log.append("OPENAI client not available; falling back to manual hints") return generate_manual_correction_hints(validation_results, rdf_content) # Get API key dynamically at runtime current_api_key = os.getenv('HF_API_KEY', '') if not current_api_key: if steps_log is not None: steps_log.append("HF_API_KEY not set; cannot call model; returning manual hints") return f""" {generate_manual_correction_hints(validation_results, rdf_content)}""" try: client = get_openai_client() if not client: if steps_log is not None: steps_log.append("Failed to initialize OpenAI client; returning manual hints") return f""" {generate_manual_correction_hints(validation_results, rdf_content)}""" # Fetch BibFrame documentation guidance if steps_log is not None: steps_log.append("Fetching BibFrame documentation guidance...") bibframe_guidance = fetch_bibframe_guidance(validation_results, rdf_content) if bibframe_guidance: if steps_log is not None: steps_log.append(f"Retrieved BibFrame guidance ({len(bibframe_guidance)} chars)") guidance_section = f""" BIBFRAME DOCUMENTATION (from official ontology): {bibframe_guidance} Apply the above BibFrame definitions and patterns when correcting the RDF/XML. """ else: guidance_section = "" if steps_log is not None: steps_log.append("No specific BibFrame guidance retrieved") # Add timeout protection import time start_time = time.time() timeout = 45 # Reduced to 45 second total timeout for speed if steps_log is not None: steps_log.append(f"Timeout budget: {timeout}s total") severity_instruction = "Fix only the violations (errors) and ignore any warnings." if not include_warnings else "Fix both violations and warnings." # Try multiple attempts to generate valid RDF for attempt in range(max_attempts): # Check timeout elapsed = time.time() - start_time if elapsed > timeout: if steps_log is not None: steps_log.append(f"Timeout reached after {int(elapsed)}s; stopping attempts") print(f"⏰ Timeout reached after {timeout} seconds") break attempt_no = attempt + 1 if steps_log is not None: steps_log.append(f"Attempt {attempt_no}/{max_attempts}: requesting model correction") print(f"🔄 Correction attempt {attempt_no}/{max_attempts}") # Targeted AdminMetadata guidance inferred from results text needs_assigner = ("->bf:assigner" in validation_results) or (" bf:assigner" in validation_results) admin_guidance = "" if needs_assigner: admin_guidance = """ IMPORTANT: For each , ensure it has a direct child . Rules: - If exists, add with the SAME URI. - Else if exists, add with the SAME URI. - Else if a block contains , copy that URI to a TOP-LEVEL . Keep all existing content; only add missing where required. """ # Build few-shot examples based on the errors found examples_to_include = [] validation_lower = validation_results.lower() # Check each example pattern against validation results for name, example in BIBFRAME_CORRECTION_EXAMPLES.items(): pattern = example.get("pattern", name) if re.search(pattern, validation_results, re.IGNORECASE): examples_to_include.append((name, example)) if steps_log is not None: steps_log.append(f"Including {name} example based on pattern match") few_shot_section = "" if examples_to_include: few_shot_section = "\n\nCORRECT BIBFRAME PATTERNS (from Library of Congress records):\n" few_shot_section += "NEVER use simple strings - always use nested structures as shown below:\n\n" for name, example in examples_to_include: few_shot_section += f"{name.upper()}:\n" few_shot_section += f"❌ WRONG:\n```xml\n{example['wrong']}\n```\n" few_shot_section += f"✅ CORRECT:\n```xml\n{example['correct']}\n```\n\n" # Add critical rules based on real patterns critical_rules = """ CRITICAL RDF/XML RULES (from real BibFrame): 1. NEVER use rdf:parseType except for "Collection" on madsrdf:componentList 2. Properties like bf:title, bf:language, bf:content MUST have nested typed resources 3. Use rdf:about for resource URIs, not rdf:resource on the property element 4. bf:adminMetadata can appear multiple times in one record 5. Status, Role, Language etc. are OBJECTS with rdf:about URIs, not literals 6. Date values use rdf:datatype for typing (e.g., xsd:date, xsd:dateTime) 7. Every bf:AdminMetadata needs BOTH bf:agent AND bf:assigner if validation requires it """ prompt = f"""You are an expert in RDF/XML and BibFrame cataloging. Fix the following RDF/XML based on the validation errors and official BibFrame documentation. {severity_instruction} {admin_guidance} {guidance_section} {critical_rules} {few_shot_section} Validation Errors: {validation_results} Original RDF/XML: {rdf_content} {f"Previous attempt {attempt} still had validation errors. Please fix ALL issues this time." if attempt > 0 else ""} INSTRUCTIONS: 1. Return ONLY valid RDF/XML - no explanations 2. Follow the EXACT patterns shown in the examples above 3. Use proper nested structures - NO simple string values for complex properties 4. Keep ALL namespace declarations 5. Fix ALL validation errors""" try: # Update system prompt to be even more explicit system_prompt = """You are an RDF/XML expert following Library of Congress BibFrame patterns. Output ONLY valid RDF/XML following these rules: - Start with - NO markdown, NO explanations - Use EXACT structure patterns from the examples - Complex properties need nested typed resources - rdf:parseType ONLY for Collection on madsrdf:componentList - Status/Role/Language are OBJECTS with URIs, not strings""" chat_completion = client.chat.completions.create( model=HF_MODEL, messages=[ { "role": "system", "content": system_prompt }, { "role": "user", "content": prompt } ], max_tokens=1500, temperature=0.0, timeout=20 # Reduced to 20 second timeout per API call for speed ) corrected_rdf = chat_completion.choices[0].message.content.strip() if steps_log is not None: steps_log.append(f"Attempt {attempt_no}: model responded; extracting and fixing common errors") # Extract RDF content if it's wrapped in code blocks corrected_rdf = extract_rdf_from_response(corrected_rdf) # Fix common AI mistakes corrected_rdf = fix_common_rdf_errors(corrected_rdf) # Only validate if we have the validator and haven't hit timeout if VALIDATOR_AVAILABLE and (time.time() - start_time < timeout - 10): try: # Quick validation check conforms, new_results = validate_rdf(corrected_rdf.encode('utf-8'), template) if conforms: if steps_log is not None: steps_log.append(f"Attempt {attempt_no}: correction PASSED validation") print(f"✅ Correction validated successfully on attempt {attempt_no}") result_text = f""" {corrected_rdf}""" if cache_key: _store_correction_in_cache(cache_key, result_text, steps_log) return result_text else: if steps_log is not None: steps_log.append(f"Attempt {attempt_no}: still invalid; will retry with updated errors") print(f"❌ Correction attempt {attempt_no} still has validation errors") # Update validation_results for next attempt validation_results = new_results except Exception as e: if steps_log is not None: steps_log.append(f"Attempt {attempt_no}: error during validation: {str(e)} — returning correction anyway") print(f"⚠️ Error validating correction attempt {attempt_no}: {str(e)}") # If validation fails, return the correction anyway return f""" {corrected_rdf}""" else: # If validator not available or timeout approaching, return the correction if steps_log is not None: steps_log.append("Skipping validation check (validator unavailable or timeout)") print("⚠️ Returning correction without validation") return f""" {corrected_rdf}""" except Exception as api_error: if steps_log is not None: steps_log.append(f"Attempt {attempt_no}: API error: {str(api_error)}") print(f"❌ API error on attempt {attempt_no}: {str(api_error)}") if attempt == max_attempts - 1: # Last attempt raise api_error continue # All attempts failed or timed out if steps_log is not None: steps_log.append("All attempts failed or timed out; returning manual hints") return f""" {generate_manual_correction_hints(validation_results, rdf_content)}""" except Exception as e: logger.error(f"LLM API error: {str(e)}") if steps_log is not None: steps_log.append(f"Fatal error invoking model: {str(e)}") return f""" {generate_manual_correction_hints(validation_results, rdf_content)}""" def get_ai_correction_targeted(validation_results: str, rdf_content: str, template: str = 'monograph', max_attempts: int = None, include_warnings: bool = False, enable_validation_loop: bool | None = None, steps_log: Optional[List[str]] = None) -> str: """Fast path that attempts structured quick fixes before invoking the full AI loop.""" if steps_log: steps_log.append("\n" + "=" * 70) steps_log.append("📊 INITIAL VALIDATION ERRORS:") steps_log.append("=" * 70) # Show summary of validation errors error_lines = [line.strip() for line in validation_results.split('\n') if 'Less than' in line or 'Message:' in line or 'Module:' in line] for line in error_lines[:15]: # Show first 15 error lines steps_log.append(f" {line}") if len(error_lines) > 15: steps_log.append(f" ... and {len(error_lines) - 15} more errors") steps_log.append("") cache_key: Optional[str] = None if validation_results and rdf_content: cache_key = _make_fix_cache_key(validation_results, rdf_content, template) cached = _get_cached_correction(cache_key, steps_log) if cached is not None: if steps_log: steps_log.append("💾 Cache hit! Returning previously successful correction") return cached # Try rapid fix FIRST - this should handle most cases in < 5 seconds if steps_log: steps_log.append("=" * 60) steps_log.append("🚀 STARTING RAPID FIX") steps_log.append("=" * 60) quick_fix = rapid_fix_missing_properties(rdf_content, validation_results, template, steps_log) if quick_fix: if steps_log: steps_log.append("=" * 60) steps_log.append("🔍 RE-VALIDATING AFTER RAPID FIX") steps_log.append("=" * 60) if quick_fix and VALIDATOR_AVAILABLE: try: conforms, new_results = validate_rdf(quick_fix.encode('utf-8'), template) if conforms: if steps_log: steps_log.append("=" * 60) steps_log.append("✅✅✅ RAPID FIX SUCCESSFUL - VALIDATION PASSED!") steps_log.append("=" * 60) if cache_key: _store_correction_in_cache(cache_key, quick_fix, steps_log) return quick_fix else: # Update for next attempt if steps_log: steps_log.append("=" * 60) steps_log.append("⚠️ RAPID FIX INCOMPLETE - Still has errors:") steps_log.append("=" * 60) # Show first few errors error_lines = new_results.split('\n')[:10] if new_results else [] for line in error_lines: if 'Less than' in line or 'Message:' in line: steps_log.append(f" {line.strip()}") validation_results = new_results or validation_results rdf_content = quick_fix if steps_log: steps_log.append("📋 Continuing to minimal AI correction...") except Exception as e: if steps_log: steps_log.append("=" * 60) steps_log.append(f"❌ RAPID FIX VALIDATION ERROR: {e}") steps_log.append("=" * 60) steps_log.append("📋 Continuing to minimal AI correction...") elif quick_fix and steps_log: steps_log.append("⚠️ Validator not available, cannot re-validate rapid fix") elif steps_log: steps_log.append("ℹ️ Rapid fix returned None, moving to AI correction") # If rapid fix didn't fully work, try minimal AI correction if OPENAI_AVAILABLE and os.getenv('HF_API_KEY'): if steps_log: steps_log.append("Attempting minimal AI correction...") corrected = get_ai_correction_minimal(validation_results, rdf_content, max_tokens=1000) if corrected and corrected != rdf_content and VALIDATOR_AVAILABLE: try: conforms, new_results = validate_rdf(corrected.encode('utf-8'), template) if conforms: if steps_log: steps_log.append("✅ Minimal AI correction successful!") if cache_key: _store_correction_in_cache(cache_key, corrected, steps_log) return corrected else: validation_results = new_results or validation_results rdf_content = corrected if steps_log: steps_log.append("Minimal AI correction partial; falling back to full AI...") except Exception as e: if steps_log: steps_log.append(f"Minimal AI validation error: {e}; falling back...") focus_points = extract_error_focus_points(validation_results) missing_props = focus_points.get("missing_properties", []) if steps_log is not None: steps_log.append(f"Targeted fix: detected {len(missing_props)} missing properties") if missing_props: preview = ", ".join(missing_props[:5]) if len(missing_props) > 5: preview += ", ..." steps_log.append(f"Missing list: {preview}") working_rdf = rdf_content quick_fix_attempted = False if missing_props and len(missing_props) <= 5: guidance = get_targeted_bibframe_guidance(missing_props, focus_points.get("classes", [])) if steps_log is not None: steps_log.append(f"Retrieved guidance entries: {len(guidance)}") import re def _inject_snippets(match: re.Match) -> str: nonlocal quick_fix_attempted opening, inner, closing = match.groups() new_bits = [] for prop in missing_props: if f"]*>)([\s\S]*?)()") instance_pattern = re.compile(r"(]*>)([\s\S]*?)()") if work_pattern.search(working_rdf): working_rdf = work_pattern.sub(_inject_snippets, working_rdf, count=1) elif instance_pattern.search(working_rdf): working_rdf = instance_pattern.sub(_inject_snippets, working_rdf, count=1) if quick_fix_attempted and VALIDATOR_AVAILABLE: try: conforms, new_results = validate_rdf(working_rdf.encode('utf-8'), template) if conforms: if steps_log is not None: steps_log.append("Quick fix succeeded; validation now passes") if cache_key: _store_correction_in_cache(cache_key, working_rdf, steps_log) return working_rdf else: if steps_log is not None: steps_log.append("Quick fix incomplete; falling back to AI loop") validation_results = new_results or validation_results except Exception as quick_err: if steps_log is not None: steps_log.append(f"Quick fix validation error: {quick_err}; using AI fallback") if validation_results and working_rdf: cache_key = _make_fix_cache_key(validation_results, working_rdf, template) return get_ai_correction( validation_results, working_rdf, template, max_attempts=max_attempts, include_warnings=include_warnings, enable_validation_loop=enable_validation_loop, cache_key=cache_key, steps_log=steps_log, ) def generate_manual_suggestions(validation_results: str) -> str: """Generate generic, pattern-based suggestions when AI is not available. Note: Avoid hardcoding SHACL rules or specific property requirements; rely only on patterns present in the validation output text. """ vr_lower = validation_results.lower() if validation_results else "" suggestions: List[str] = [] # Missing/required if ("mincount" in vr_lower) or ("missing" in vr_lower) or ("required" in vr_lower): suggestions.append("• Some required fields are missing. Add the missing information where indicated.") # Too many values if ("maxcount" in vr_lower) or ("too many" in vr_lower) or ("more than allowed" in vr_lower): suggestions.append("• Some fields have too many values. Keep only the main/one value as required.") # Datatype/format issues if ("datatype" in vr_lower) or ("type mismatch" in vr_lower) or ("expected" in vr_lower and "datatype" in vr_lower): suggestions.append("• Some values are in the wrong format. Use the expected format (e.g., dates like YYYY-MM-DD).") # URI/identifier issues if ("iri" in vr_lower) or ("uri" in vr_lower) or ("identifier" in vr_lower and "invalid" in vr_lower): suggestions.append("• Some identifiers look malformed. Use complete, valid web addresses or proper identifiers.") # Namespace/prefix issues if ("namespace" in vr_lower) or ("prefix" in vr_lower): suggestions.append("• Define all XML namespace prefixes at the top and use them consistently.") # XML syntax/structure if ("xml" in vr_lower) or ("syntax" in vr_lower) or ("well-formed" in vr_lower): suggestions.append("• Fix XML structure issues (unclosed tags, invalid characters, or nesting problems).") # Fallback if not suggestions: suggestions.append("• Review the validation details and update the record where issues are highlighted.") suggestions.append("• Follow the selected template; add missing fields and correct formats as needed.") suggestions_text = "\n".join(suggestions) return f""" 📋 **What needs fixing:** {suggestions_text} 💡 **Quick tips:** • Include required fields when noted • Keep single-value fields to one value • Use the expected formats (e.g., for dates) • Declare and use XML namespace prefixes consistently • Ensure the XML is well‑formed Need help? Load an example and compare the structure. """ def clean_technical_jargon(text: str) -> str: """Replace technical RDF/SHACL terms with plain language for end users.""" if not text: return text replacements = { # RDF/SHACL jargon "URIRef": "identifier", "URI": "identifier", "IRI": "identifier", "Literal": "text value", "triple": "field entry", "graph": "dataset", "node": "record", "subject": "record", "predicate": "field type", "object": "value", "SHACL": "validation", "constraint": "rule", "conformance": "compliance", "violation": "issue", "sh:": "", "rdf:": "", "rdfs:": "", "xsd:": "", # Tone softening "Error:": "Issue:", "Invalid": "Incorrect", "Failed": "Did not pass", "Missing": "Not found", } cleaned = text for k, v in replacements.items(): cleaned = cleaned.replace(k, v) return cleaned def parse_shacl_results_for_ai(results_text: str) -> str: """Simplify SHACL results into clearer sentences for AI processing. Pattern-based only; does not depend on any SHACL rule definitions. """ if not results_text: return "" import re simplified: List[str] = [] # Generic patterns patterns = [ (re.compile(r"minCount", re.IGNORECASE), "A required field is missing."), (re.compile(r"maxCount", re.IGNORECASE), "A field has more values than allowed; only one may be permitted."), (re.compile(r"datatype", re.IGNORECASE), "A field has a value in the wrong format."), (re.compile(r"iri|uri", re.IGNORECASE), "An identifier looks malformed or incomplete."), (re.compile(r"namespace|prefix", re.IGNORECASE), "A namespace prefix is undefined or inconsistent."), (re.compile(r"xml|syntax|well-formed", re.IGNORECASE), "The XML structure has an error (e.g., unclosed tag)."), ] lines = [ln.strip() for ln in results_text.splitlines() if ln.strip()] for ln in lines: matched = False for regex, message in patterns: if regex.search(ln): simplified.append(message) matched = True break if not matched and ("Constraint Violation" in ln or "Violation" in ln): simplified.append("A record rule was not met.") # Deduplicate while preserving order seen = set() unique = [] for s in simplified: if s not in seen: unique.append(s) seen.add(s) return "\n".join(unique) if unique else results_text def generate_manual_correction_hints(validation_results: str, rdf_content: str) -> str: """Generate manual correction hints when AI is not available""" return f""" {rdf_content} """ def extract_xml_from_text(text: str) -> str: """Extract RDF/XML from model output that may include extra formatting. Looks for the first ... block. If not found, returns the original text unchanged. """ if not text: return text import re # Try to capture XML block even if fenced in code blocks # Use DOTALL to span multiple lines pattern = re.compile(r"", re.IGNORECASE) m = pattern.search(text) if m: return m.group(0) # Strip common markdown fences if present fenced = re.sub(r"^```[a-zA-Z]*\n|```$", "", text.strip()) return fenced if fenced else text def clean_xml_for_validation(xml_text: str) -> str: """ Clean XML text for validation by removing comments and extra formatting. Args: xml_text (str): XML text that may contain comments or formatting Returns: str: Clean XML ready for validation """ import re if not xml_text: return xml_text # Remove all HTML comments cleaned = re.sub(r'', '', xml_text, flags=re.DOTALL) # Remove any leading/trailing whitespace cleaned = cleaned.strip() # If the text starts with "```" code fence, extract content if cleaned.startswith("```"): try: # Extract content between code fences parts = cleaned.split("```") if len(parts) >= 3: # Second part should be the XML content cleaned = parts[1] # Remove language identifier if present (e.g., "xml") if cleaned.startswith("xml"): cleaned = cleaned[3:] except: pass return cleaned.strip() # --- Namespace and wrapper helpers to avoid XML parser errors --- STANDARD_NAMESPACES = { "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", "bf": "http://id.loc.gov/ontologies/bibframe/", "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "xsd": "http://www.w3.org/2001/XMLSchema#", } def _extract_declared_namespaces(xml_text: str) -> dict: import re decls = {} for prefix, uri in re.findall(r"xmlns:([A-Za-z0-9_-]+)=\"([^\"]+)\"", xml_text[:2000]): decls[prefix] = uri return decls def _detect_used_prefixes(xml_text: str) -> set: import re used = set() # Tag prefixes like and attribute prefixes like rdf:type="..." for m in re.finditer(r"<\s*([A-Za-z0-9_-]+):[A-Za-z0-9_-]+", xml_text): used.add(m.group(1)) for m in re.finditer(r"\s([A-Za-z0-9_-]+):[A-Za-z0-9_-]+=", xml_text): used.add(m.group(1)) return used def ensure_rdf_wrapper_and_namespaces(xml_text: str, original_text: Optional[str] = None, steps_log: Optional[List[str]] = None) -> str: """Ensure the XML has an wrapper and required xmlns declarations for used prefixes. - If wrapper exists, add any missing xmlns: declarations for standard, used prefixes. - If wrapper is missing, wrap the content and include standard namespaces for used prefixes. """ if not xml_text or not isinstance(xml_text, str): return xml_text import re declared = _extract_declared_namespaces(xml_text) if original_text: # Merge any declarations present in the original input declared.update(_extract_declared_namespaces(original_text)) used = _detect_used_prefixes(xml_text) # Always consider rdf used for wrapper used.add("rdf") # Only inject namespaces for known standards to avoid guessing missing = [p for p in used if p not in declared and p in STANDARD_NAMESPACES] added_attrs = " ".join([f"xmlns:{p}=\"{STANDARD_NAMESPACES[p]}\"" for p in missing]) has_wrapper = bool(re.search(r"]*>", xml_text)) updated = xml_text if has_wrapper: if added_attrs: # Inject before the closing '>' of the first def _inject(match): start_tag = match.group(0) if start_tag.endswith('>'): return start_tag[:-1] + ' ' + added_attrs + '>' return start_tag + ' ' + added_attrs updated = re.sub(r"]*>", _inject, updated, count=1) if steps_log is not None and missing: steps_log.append(f"Injected missing namespace declarations: {', '.join(missing)}") else: # Build a wrapper with standard namespaces for used prefixes we know attrs = [f"xmlns:rdf=\"{STANDARD_NAMESPACES['rdf']}\""] for p in used: if p == 'rdf': continue uri = declared.get(p) or STANDARD_NAMESPACES.get(p) if uri: attrs.append(f"xmlns:{p}=\"{uri}\"") wrapper_open = "\n" wrapper_close = "\n" updated = wrapper_open + xml_text + wrapper_close if steps_log is not None: steps_log.append("Wrapped snippet in with standard namespace declarations") return updated def validate_rdf_interface(rdf_content: str, template: str, use_ai: bool = True, include_warnings: bool = False, iterate_until_valid: bool = True, max_attempts: int = 5, show_steps: bool = True): """Main validation function for Gradio interface""" if not rdf_content.strip(): return "❌ Error", "No RDF/XML data provided", "", "", "", "", "" steps_log: List[str] = [] # Check if validator is available if not VALIDATOR_AVAILABLE: error_msg = "Validator module is not available. Please check that validator.py is present and all dependencies are installed." steps_log.append(f"ERROR: {error_msg}") return "❌ Error", error_msg, "", "\n".join(steps_log) if show_steps else "", "", "", "" # Prepare and validate RDF steps_log.append(f"Preparing RDF for validation (original length: {len(rdf_content)} chars)") prepped_input = ensure_rdf_wrapper_and_namespaces(rdf_content, steps_log=steps_log if show_steps else None) steps_log.append(f"Preprocessed RDF (new length: {len(prepped_input)} chars)") # Call validation steps_log.append(f"Calling validator with template '{template}'") result = validate_rdf_tool(prepped_input, template) if "error" in result: steps_log.append(f"Validation error: {result['error']}") return f"❌ Error: {result['error']}", "", "", "\n".join(steps_log) if show_steps else "", "", "", "" status = result["status"] results_text = result["results"] conforms = result["conforms"] steps_log.append(f"Initial validation: {'PASSED' if conforms else 'FAILED'} using template '{template}'") # Log if we got unexpected empty results if not results_text or len(results_text.strip()) == 0: steps_log.append("WARNING: Validator returned empty results text") # Filter results if warnings should be excluded filtered_results = results_text if not include_warnings and "Warning" in results_text: # Split results into lines and filter out warnings lines = results_text.split('\n') filtered_lines = [] skip_until_next_section = False for line in lines: if "Warning" in line and ("Constraint Violation" in line or "sh:Warning" in line): skip_until_next_section = True elif "Constraint Violation" in line and "Warning" not in line: skip_until_next_section = False filtered_lines.append(line) elif not skip_until_next_section: filtered_lines.append(line) filtered_results = '\n'.join(filtered_lines) if not include_warnings: steps_log.append("Filtered out warnings from results") corrected_status = "" corrected_results = "" if not include_warnings: steps_log.append("Configured to ignore warnings in AI processing") if iterate_until_valid: steps_log.append(f"Iteration enabled with max_attempts={max_attempts}") if conforms: suggestions = "✅ No issues found! Your RDF/XML is valid according to the selected template." corrected_rdf = "" corrected_status = "—" corrected_results = "" steps_log.append("No correction needed; record already conforms") else: if use_ai: # Pass filtered results to AI functions suggestions = get_ai_suggestions(filtered_results, rdf_content, include_warnings) steps_log.append("Requested AI suggestions for concise guidance") corrected_rdf = get_ai_correction_targeted( filtered_results, rdf_content, template, max_attempts=max_attempts, include_warnings=include_warnings, enable_validation_loop=iterate_until_valid, steps_log=steps_log, ) # Attempt re-validation of corrected RDF try: # Clean the corrected output for validation corrected_xml = clean_xml_for_validation(corrected_rdf) corrected_xml = extract_xml_from_text(corrected_xml) corrected_xml = ensure_rdf_wrapper_and_namespaces(corrected_xml, original_text=prepped_input, steps_log=steps_log) # Debug logging steps_log.append(f"Re-validating cleaned RDF ({len(corrected_xml)} chars)") if show_steps: # Log first 200 chars of what we're validating preview = corrected_xml[:200] + "..." if len(corrected_xml) > 200 else corrected_xml steps_log.append(f"Preview: {preview}") reval = validate_rdf_tool(corrected_xml, template) if "error" in reval: corrected_status = f"❌ Re-validation Error: {reval['error']}" corrected_results = "" steps_log.append(f"Re-validation failed with error: {reval['error']}") else: corrected_status = reval.get("status", "") corrected_results = reval.get("results", "") conforms = reval.get('conforms', False) steps_log.append(f"Re-validation: {corrected_status} - Conforms: {conforms}") except Exception as re_ex: corrected_status = f"❌ Re-validation Error: {re_ex}" corrected_results = "" steps_log.append(f"Re-validation error: {re_ex}") else: suggestions = generate_manual_suggestions(filtered_results) corrected_rdf = generate_manual_correction_hints(filtered_results, rdf_content) corrected_status = "—" corrected_results = "" steps_log.append("AI disabled; produced manual suggestions and hints") steps_text = "\n".join(steps_log) if show_steps else "" return status, results_text, suggestions, steps_text, corrected_rdf, corrected_status, corrected_results def get_rdf_examples(example_type: str = "valid") -> str: """ Retrieve example RDF/XML snippets for testing and learning. This tool provides sample RDF/XML content that can be used to test the validation system or learn proper RDF structure. Examples include valid BibFrame Work records, invalid records for testing corrections, and BibFrame Instance records. Args: example_type (str): Type of example to retrieve. Options: - 'valid': A complete, valid BibFrame Work record - 'invalid': An incomplete BibFrame Work with validation errors - 'bibframe': A BibFrame Instance record example Returns: str: Complete RDF/XML example content ready for validation testing """ examples = { "valid": SAMPLE_VALID_RDF, "invalid": SAMPLE_INVALID_RDF, "bibframe": ''' Example Book Title 2024 New York ''' } return examples.get(example_type, examples["valid"]) # Create Gradio Interface def create_interface(): """Create the main Gradio interface""" # Check API key status dynamically current_api_key = os.getenv('HF_API_KEY', '') api_status = "🔑 AI features enabled" if (OPENAI_AVAILABLE and current_api_key) else "⚠️ AI features disabled (set HF_API_KEY)" with gr.Blocks( title="RDF Validation Server with AI", theme=gr.themes.Soft(), css=""" .status-box { font-weight: bold; padding: 10px; border-radius: 5px; } .header-text { text-align: center; padding: 20px; } """ ) as demo: # Header debug_info = f""" Debug Info: - OPENAI_AVAILABLE: {OPENAI_AVAILABLE} - HF_INFERENCE_AVAILABLE: {HF_INFERENCE_AVAILABLE} - HF_API_KEY set: {'Yes' if current_api_key else 'No'} - HF_API_KEY length: {len(current_api_key) if current_api_key else 0} - HF_ENDPOINT_URL: {HF_ENDPOINT_URL} - HF_MODEL: {HF_MODEL} """ gr.HTML(f"""

🔍 RDF Validation Server with AI

Validate RDF/XML against SHACL schemas with AI-powered suggestions and corrections

Status: {api_status}

Debug Info
{debug_info}
""") # Main interface with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📝 Input") rdf_input = gr.Textbox( label="RDF/XML Content", placeholder="Paste your RDF/XML content here...", lines=15, show_copy_button=True ) # Keep the main form simple and tuck options into an accordion with gr.Accordion("Advanced options", open=False): with gr.Row(): template_dropdown = gr.Dropdown( label="Validation Template", choices=["monograph", "custom"], value="monograph", info="Select the SHACL template to validate against" ) use_ai_checkbox = gr.Checkbox( label="Use AI Features", value=True, info="Enable AI-powered suggestions and corrections" ) include_warnings_checkbox = gr.Checkbox( label="Include Warnings", value=False, info="Include warnings in AI corrections (violations only by default)" ) with gr.Row(): iterate_checkbox = gr.Checkbox( label="Iterate until valid", value=True, info="Try multiple correction attempts until validation passes or attempts run out" ) max_attempts_slider = gr.Slider( label="Max attempts", minimum=1, maximum=3, value=2, step=1, info="Maximum number of correction attempts (2 recommended for speed)" ) show_steps_checkbox = gr.Checkbox( label="Show steps", value=False, info="Display step-by-step process (turn on when you want transparency)" ) validate_btn = gr.Button("🔍 Validate RDF", variant="primary", size="lg") # Examples and controls gr.Markdown("### 📚 Examples & Tools") with gr.Row(): example1_btn = gr.Button("✅ Valid RDF Example", variant="secondary") example2_btn = gr.Button("❌ Invalid RDF Example", variant="secondary") clear_btn = gr.Button("🗑️ Clear All", variant="stop") # Results section with gr.Row(): with gr.Column(): gr.Markdown("### 📊 Results") status_output = gr.Textbox( label="Validation Status", interactive=False, lines=1, elem_classes=["status-box"] ) results_output = gr.Textbox( label="Detailed Validation Results", interactive=False, lines=8, show_copy_button=True ) suggestions_output = gr.Textbox( label="💡 Fix Suggestions", interactive=False, lines=8, show_copy_button=True ) steps_output = gr.Textbox( label="🧭 Correction Steps", interactive=False, lines=10, show_copy_button=True, placeholder="Step-by-step log of how the system derived the corrected XML" ) # Corrected RDF section with gr.Row(): with gr.Column(): gr.Markdown("### 🛠️ AI-Generated Corrections") corrected_output = gr.Textbox( label="Corrected RDF/XML", interactive=False, lines=15, show_copy_button=True, placeholder="Corrected RDF will appear here after validation..." ) with gr.Row(): corrected_status_output = gr.Textbox( label="Re-validation Status (Corrected RDF)", interactive=False, lines=1, elem_classes=["status-box"] ) corrected_results_output = gr.Textbox( label="Re-validation Details", interactive=False, lines=6, show_copy_button=True ) # Event handlers validate_btn.click( fn=validate_rdf_interface, inputs=[rdf_input, template_dropdown, use_ai_checkbox, include_warnings_checkbox, iterate_checkbox, max_attempts_slider, show_steps_checkbox], outputs=[status_output, results_output, suggestions_output, steps_output, corrected_output, corrected_status_output, corrected_results_output] ) # Remove auto-validation to prevent processing loops # rdf_input.change( # fn=validate_rdf_interface, # inputs=[rdf_input, template_dropdown, use_ai_checkbox], # outputs=[status_output, results_output, suggestions_output, corrected_output] # ) # Example buttons example1_btn.click( lambda: get_rdf_examples("valid"), outputs=[rdf_input] ) example2_btn.click( lambda: get_rdf_examples("invalid"), outputs=[rdf_input] ) clear_btn.click( lambda: ("", "", "", "", "", "", "", ""), outputs=[rdf_input, status_output, results_output, suggestions_output, steps_output, corrected_output, corrected_status_output, corrected_results_output] ) # Footer with instructions gr.Markdown(""" --- ### � **Documentation & Resources:** **[📖 MCP4BibFrame Documentation](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs)** - Complete BibFrame ontology reference with examples This validator integrates with the **MCP4BibFrame Documentation API** to provide authoritative BibFrame ontology information during AI-powered corrections. ### 🚀 **Quick Start:** 1. **Paste your RDF/XML** in the input box above 2. **Click "Validate RDF"** to check for errors 3. **Review AI suggestions** for plain-language fixes (enhanced with BibFrame documentation) 4. **Copy the corrected RDF** from the output --- ### �🚀 **Deployment Instructions for Hugging Face Spaces:** 1. **Create a new Space** on [Hugging Face](https://huggingface.co/spaces) 2. **Set up your Hugging Face Inference Endpoint** and get the endpoint URL 3. **Set your tokens** in Space settings (use Secrets for security): - Go to Settings → Repository secrets - Add: `HF_API_KEY` = `your_huggingface_api_key_here` - Endpoint is now hardcoded to your specific Inference Endpoint 4. **Upload these files** to your Space repository 5. **Install requirements**: The Space will auto-install from `requirements.txt` ### 🔧 **MCP Server Mode:** This app functions as both a web interface AND an MCP server for Claude Desktop and other MCP clients. **Available MCP Tools:** - `validate_rdf_tool`: Validate RDF/XML against SHACL shapes - `get_ai_suggestions`: Get AI-powered fix suggestions (with BibFrame docs) - `get_ai_correction`: Generate corrected RDF/XML (with BibFrame docs) - `get_rdf_examples`: Retrieve example RDF snippets - `validate_rdf_interface`: Complete validation with AI suggestions and corrections (primary tool) **MCP Configuration (Streamable HTTP):** Add this configuration to your MCP client (Claude Desktop, etc.): ```json { "mcpServers": { "rdf-validator": { "url": "https://jimfhahn-mcp4rdf.hf.space/gradio_api/mcp/" } } } ``` **Alternative SSE Configuration:** ```json { "mcpServers": { "rdf-validator": { "url": "https://jimfhahn-mcp4rdf.hf.space/gradio_api/mcp/sse" } } } ``` ### 💡 **Features:** - ✅ Real-time RDF/XML validation against SHACL schemas - 🤖 AI-powered error suggestions and corrections (enhanced with BibFrame ontology docs) - 📚 Built-in examples and templates - 🔗 Integrated with [MCP4BibFrame Documentation API](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs) - 📋 Copy results with one click **BibFrame Documentation Integration:** AI corrections now use authoritative BibFrame ontology information from the MCP4BibFrame Documentation API to ensure accuracy and compliance with official specifications. ### 🔗 **Related Resources:** - [MCP4BibFrame Documentation](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs) - BibFrame ontology reference - [BIG DCTAP Documentation](https://bf-interop.github.io/DCTap/) - [BIBFRAME Ontology](http://id.loc.gov/ontologies/bibframe.html) - [SHACL Specification](https://www.w3.org/TR/shacl/) **Note:** AI features require a valid Hugging Face API key (HF_API_KEY) set as a Secret. Manual suggestions are provided as fallback. """) return demo # Launch configuration if __name__ == "__main__": demo = create_interface() # Configuration for different environments port = int(os.getenv('PORT', 7860)) # Hugging Face uses PORT env variable demo.launch( server_name="0.0.0.0", # Important for external hosting server_port=port, # Use environment PORT or default to 7860 share=False, # Don't create gradio.live links in production show_error=True, # Show errors in the interface show_api=True, # Enable API endpoints allowed_paths=["."], # Allow serving files from current directory mcp_server=True # Enable MCP server functionality (Gradio 5.28+) )