#!/usr/bin/env python3
"""
Hugging Face Gradio App for RDF Validation with MCP Server and AI-assisted corrections
This app serves as a web interface and can also expose MCP server functionality.
Deploy this on Hugging Face Spaces with your Hugging Face API key (HF_API_KEY) set as a Secret.
"""
import gradio as gr
import os
import json
import sys
import asyncio
import logging
import re
import hashlib
import threading
import time
from collections import OrderedDict
from typing import Any, Dict, List, Optional
# Make modules that live next to this file (validator.py) importable.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Validation backend. It is optional at import time so the UI can still start;
# VALIDATOR_AVAILABLE records whether a callable validate_rdf was obtained.
try:
    from validator import validate_rdf
except ImportError as e:
    VALIDATOR_AVAILABLE = False
    print(f"⚠️ Warning: validator.py not found or has import errors: {e}")
    print("Some features may be limited.")
except Exception as e:
    VALIDATOR_AVAILABLE = False
    print(f"⚠️ Warning: Error loading validator: {e}")
else:
    VALIDATOR_AVAILABLE = callable(validate_rdf)
    if VALIDATOR_AVAILABLE:
        print("✅ Validator module loaded successfully")
    else:
        print("⚠️ Warning: validate_rdf is not callable")

# OpenAI-compatible client, used for the AI-powered suggestions/corrections.
try:
    from openai import OpenAI
except ImportError:
    OPENAI_AVAILABLE = False
    print("💡 Install 'openai' package for AI-powered corrections: pip install openai")
else:
    OPENAI_AVAILABLE = True

# HTTP client, used for the BibFrame documentation lookups.
try:
    import requests
except ImportError:
    HF_INFERENCE_AVAILABLE = False
    print("💡 Install 'requests' package for AI-powered corrections: pip install requests")
else:
    HF_INFERENCE_AVAILABLE = True
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration - Your specific Hugging Face Inference Endpoint (hardcoded)
# HF_API_KEY is read once at import; an empty string means no key was provided.
HF_API_KEY = os.getenv('HF_API_KEY', '') # Hugging Face API key from Secret
HF_ENDPOINT_URL = "https://evxgv66ksxjlfrts.us-east-1.aws.endpoints.huggingface.cloud/v1/"
HF_MODEL = "lmstudio-community/Llama-3.3-70B-Instruct-GGUF" # Correct model name for your endpoint
# AI Correction Configuration
MAX_CORRECTION_ATTEMPTS = 2 # Reduced for speed (rapid fix handles most cases)
ENABLE_VALIDATION_LOOP = True # Enable validation loop by default
# MCP4BibFrame Documentation API Configuration
MCP4BIBFRAME_DOCS_URL = "https://jimfhahn-mcp4bibframe-docs.hf.space/api/mcp"
MCP4BIBFRAME_DOCS_ENABLED = True # Set to False to disable doc integration
# Cache BibFrame documentation responses to avoid repeated network calls
# Keys are "<tool_name>:<JSON-encoded params>"; values are (payload, time.time() when stored).
BIBFRAME_DOCS_CACHE: Dict[str, tuple[Any, float]] = {}
BIBFRAME_DOCS_CACHE_TTL = 3600 # seconds
# Cache successful correction outputs to accelerate repeated error patterns
# Ordered so the least recently used entry can be evicted once FIX_CACHE_MAX_SIZE is exceeded.
FIX_CACHE: OrderedDict[str, str] = OrderedDict()
FIX_CACHE_MAX_SIZE = 100
def _make_fix_cache_key(validation_results: str, rdf_content: str, template: str) -> str:
    """Return a stable SHA-256 hex digest identifying one correction request.

    The digest covers the stripped template name, validation text and RDF
    content, in that order, separated by the ASCII unit separator (0x1F).
    Characters that cannot be encoded as UTF-8 are dropped from the
    validation text and the RDF content.
    """
    encoded_parts = (
        template.strip().encode("utf-8"),
        validation_results.strip().encode("utf-8", errors="ignore"),
        rdf_content.strip().encode("utf-8", errors="ignore"),
    )
    return hashlib.sha256(b"\x1f".join(encoded_parts)).hexdigest()
def _get_cached_correction(cache_key: str, steps_log: Optional[List[str]] = None) -> Optional[str]:
    """Look up a previously stored correction and mark it as recently used.

    Args:
        cache_key: Key produced for the correction request.
        steps_log: Optional list that receives a note when a cached value is used.

    Returns:
        The cached corrected RDF, or None when the key is not present.
    """
    hit = FIX_CACHE.get(cache_key)
    if hit is None:
        return None
    # A hit moves the entry to the "newest" end so it is evicted last.
    FIX_CACHE.move_to_end(cache_key)
    if steps_log is not None:
        steps_log.append("Using cached correction for repeated validation errors")
    return hit
def _store_correction_in_cache(cache_key: str, corrected_rdf: str, steps_log: Optional[List[str]] = None) -> None:
    """Remember a correction, evicting the least recently used entry on overflow.

    Empty corrections are ignored. When ``steps_log`` is given it receives one
    note describing whether the value was simply cached or an eviction occurred.
    """
    if not corrected_rdf:
        return
    FIX_CACHE[cache_key] = corrected_rdf
    FIX_CACHE.move_to_end(cache_key)
    overflowed = len(FIX_CACHE) > FIX_CACHE_MAX_SIZE
    if overflowed:
        # The front of the ordered dict holds the least recently used entry.
        FIX_CACHE.popitem(last=False)
    if steps_log is None:
        return
    if overflowed:
        steps_log.append("Cache full; evicted oldest correction entry")
    else:
        steps_log.append("Cached correction for future reuse")
# Cache successful correction outputs to accelerate repeated error patterns
# NOTE(review): FIX_CACHE and FIX_CACHE_MAX_SIZE are already defined earlier in this
# module, before the cache helper functions. This second assignment rebinds the name to
# a fresh empty cache and looks like a leftover duplicate - confirm and remove one copy.
FIX_CACHE: OrderedDict[str, str] = OrderedDict()
FIX_CACHE_MAX_SIZE = 100
# Fast repair path for "Less than N values on ...->bf:<property>" violations. The
# rapid-fix logic visible here works with regular expressions only: it returns None when
# no such message is found, or when neither a Work nor an Instance element is matched.
# NOTE(review): several string and regex literals in this span are empty or cut short
# (the Work/Instance/AdminMetadata patterns, the assigner element, the INSTANT_FIXES
# values), and the line starting `elif prop in INSTANT_FIXES` runs straight into the
# signature of a following prompt-based helper. The text looks as if its XML markup was
# stripped; restore it from version control before relying on or editing this span.
def rapid_fix_missing_properties(rdf_content: str, validation_results: str, template: str, steps_log: Optional[List[str]] = None) -> Optional[str]:
"""Ultra-fast fix for simple missing property errors - no AI needed."""
import re
# Quick pattern match for missing properties
missing = re.findall(r"Less than \d+ values on.*->bf:(\w+)", validation_results)
if not missing:
if steps_log:
steps_log.append("❌ Rapid fix: No missing properties detected in validation results")
return None
if steps_log:
steps_log.append(f"🔍 Rapid fix detected {len(missing)} missing properties: {', '.join(set(missing))}")
# Pre-compiled property templates (no API calls)
INSTANT_FIXES = {
"title": 'Untitled',
"language": 'Englisheng',
"content": 'texttxt',
"adminMetadata": '''
new
n
2024-01-01
Library of Congress
Library of Congress
''',
"assigner": '''
Library of Congress
'''
}
# Find insertion point
work_match = re.search(r'(]*>)(.*?)()', rdf_content, re.DOTALL)
instance_match = re.search(r'(]*>)(.*?)()', rdf_content, re.DOTALL)
if not work_match and not instance_match:
if steps_log:
steps_log.append("❌ Rapid fix: No bf:Work or bf:Instance found in RDF")
return None
match = work_match or instance_match
target_type = "Work" if work_match else "Instance"
opening_tag = match.group(1)
content = match.group(2)
closing_tag = match.group(3)
if steps_log:
steps_log.append(f"📍 Rapid fix target: bf:{target_type}")
has_admin = "" in content or "" in content
steps_log.append(f"🔍 Current state: AdminMetadata {'EXISTS' if has_admin else 'MISSING'}")
# Build fixes
fixes = []
assigner_fixed = False
for prop in missing[:10]: # Limit to 10 properties
prop_lower = prop.lower()
# Special handling for assigner within AdminMetadata
if prop_lower == "assigner":
if steps_log:
steps_log.append("🔧 Processing missing 'assigner' property...")
# Look for existing AdminMetadata blocks that need assigner
admin_pattern = re.compile(r'(]*>)(.*?)()', re.DOTALL)
def add_assigner(match):
nonlocal assigner_fixed
admin_open = match.group(1)
admin_content = match.group(2)
admin_close = match.group(3)
# Skip if already has assigner
if ']*>\s*<[^>]+\s+rdf:about="([^"]+)"', admin_content)
if agent_match:
agent_uri = agent_match.group(1)
# Build assigner element
if agent_uri:
assigner_element = f' '
else:
# Use default Library of Congress
assigner_element = '''
Library of Congress
'''
assigner_fixed = True
if steps_log:
steps_log.append(f" ✅ Injected assigner into existing AdminMetadata (agent URI: {agent_uri or 'default'})")
# Insert before closing tag
return admin_open + admin_content + '\n' + assigner_element + '\n ' + admin_close
original_content = content
content = admin_pattern.sub(add_assigner, content)
if assigner_fixed and steps_log:
steps_log.append(" ✅ Assigner successfully added to existing AdminMetadata")
elif steps_log and content == original_content:
steps_log.append(" ℹ️ No AdminMetadata found to inject assigner (will add with full block if adminMetadata is missing)")
elif prop in INSTANT_FIXES and f" str:
"""Ultra-minimal prompt for faster AI response."""
if not OPENAI_AVAILABLE or not os.getenv('HF_API_KEY'):
return rdf
try:
client = get_openai_client()
if not client:
return rdf
# Extract just the critical errors
error_lines = []
for line in errors.split('\n'):
if any(term in line for term in ['Less than', 'missing', 'required', '->bf:', 'adminMetadata', 'assigner']):
error_lines.append(line.strip()[:100])
if len(error_lines) >= 5:
break
if not error_lines:
return rdf
# Ultra-concise prompt
prompt = f"""Fix these BibFrame errors:
{chr(10).join(error_lines[:3])}
Add only what's missing to this RDF:
{rdf[:800]}...{rdf[-200:] if len(rdf) > 1000 else ''}
Return complete valid RDF/XML only."""
response = client.chat.completions.create(
model=HF_MODEL,
messages=[
{"role": "system", "content": "Fix RDF. Output only valid RDF/XML. No explanations."},
{"role": "user", "content": prompt}
],
max_tokens=max_tokens,
temperature=0,
timeout=20 # Much shorter timeout
)
result = response.choices[0].message.content
result = extract_rdf_from_response(result)
result = fix_common_rdf_errors(result)
return result
except Exception:
return rdf
def test_validator_functionality():
    """Smoke-test the validator with a deliberately incomplete record.

    The fixture is well-formed RDF/XML describing a single bf:Work (also typed
    bf:Text) with none of its required properties, so a working SHACL
    validator has focus nodes to check and must report violations.

    Returns:
        bool: True when the validator rejects the record; False when the
        validator is unavailable, accepts the record, or raises.
    """
    if not VALIDATOR_AVAILABLE:
        print("❌ Validator not available for testing")
        return False

    # Minimal but parseable: an empty document would fail in the XML parser
    # and never exercise the SHACL shapes at all.
    test_rdf = '''<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
         xmlns:bf="http://id.loc.gov/ontologies/bibframe/">
  <bf:Work rdf:about="http://example.org/work/startup-self-test">
    <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Text"/>
  </bf:Work>
</rdf:RDF>
'''
    try:
        conforms, results = validate_rdf(test_rdf.encode('utf-8'), 'monograph')
    except Exception as e:
        print(f"❌ Validator test failed with error: {e}")
        return False

    # This should fail validation due to missing required properties
    if conforms:
        print("⚠️ WARNING: Validator returned 'conforms=True' for invalid RDF. Validator may not be working correctly!")
        return False

    preview = (results or '').strip()
    preview = preview[:200] + ('…' if len(preview) > 200 else '')
    print(f"✅ Validator test passed. Got expected SHACL violations. Preview: {preview if preview else 'No results text returned'}")
    return True
# Run the test on startup
# This executes at import time; the boolean result is not used, only the printed report.
if VALIDATOR_AVAILABLE:
test_validator_functionality()
def query_bibframe_docs(tool_name: str, params: dict, timeout: int = 10) -> Optional[dict]:
    """
    Query the MCP4BibFrame documentation API using the MCP protocol.

    Sends a JSON-RPC ``tools/call`` request and returns the ``result`` member
    of the first response message that carries one. The response body is read
    as server-sent events (``data:`` lines); if no event yields a result the
    body is also tried as a single plain JSON document.

    Args:
        tool_name (str): Name of the tool to invoke
        params (dict): Parameters for the tool
        timeout (int): Request timeout in seconds
    Returns:
        Optional[dict]: Response data, or None when the integration is
        disabled, ``requests`` is not installed, or the call fails
    """
    if not MCP4BIBFRAME_DOCS_ENABLED:
        return None
    # `requests` is an optional import. Without this guard a missing package
    # makes the `except requests.exceptions.Timeout` clause itself raise
    # NameError, which would escape this best-effort helper.
    if not HF_INFERENCE_AVAILABLE:
        logger.warning("The 'requests' package is not installed; skipping BibFrame documentation lookup")
        return None

    mcp_request = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": tool_name,
            "arguments": params
        },
        "id": 1
    }
    logger.info(f"Querying BibFrame docs: {tool_name} with {params}")

    try:
        response = requests.post(
            MCP4BIBFRAME_DOCS_URL,
            json=mcp_request,
            timeout=timeout,
            headers={"Accept": "text/event-stream"}
        )
        if response.status_code != 200:
            logger.warning(f"BibFrame docs API returned status {response.status_code}")
            return None

        # Server-sent events: each message arrives on a "data:" line.
        for line in response.text.split('\n'):
            if not line.startswith('data:'):
                continue
            try:
                data = json.loads(line[5:].strip())
            except json.JSONDecodeError:
                continue
            if isinstance(data, dict) and 'result' in data:
                return data['result']

        # Fallback: the whole body may be one plain JSON-RPC response.
        try:
            data = json.loads(response.text)
        except json.JSONDecodeError:
            return None
        if isinstance(data, dict) and 'result' in data:
            return data['result']
    except requests.exceptions.Timeout:
        logger.warning("Timeout querying BibFrame documentation")
    except Exception as e:
        logger.error(f"Error querying BibFrame documentation: {str(e)}")
    return None
def query_bibframe_docs_cached(tool_name: str, params: dict, timeout: int = 10) -> Optional[dict]:
    """Return ``query_bibframe_docs`` results, reusing responses younger than the TTL.

    The cache key combines the tool name with the JSON form of ``params``
    (sorted keys), falling back to ``str(params)`` when they are not JSON
    serialisable. Only non-None responses are stored.
    """
    if not MCP4BIBFRAME_DOCS_ENABLED:
        return None

    try:
        serialized_params = json.dumps(params, sort_keys=True)
    except TypeError:
        serialized_params = str(params)
    cache_key = f"{tool_name}:{serialized_params}"

    entry = BIBFRAME_DOCS_CACHE.get(cache_key)
    if entry:
        payload, stored_at = entry
        age = time.time() - stored_at
        if age < BIBFRAME_DOCS_CACHE_TTL:
            logger.debug(f"Using cached BibFrame docs response for {cache_key}")
            return payload

    fresh = query_bibframe_docs(tool_name, params, timeout)
    if fresh is not None:
        BIBFRAME_DOCS_CACHE[cache_key] = (fresh, time.time())
    return fresh
def extract_bibframe_terms_from_errors(validation_results: str) -> dict:
    """
    Extract BibFrame properties and classes mentioned in validation errors.

    Matching is purely textual. Property candidates are lower-cased; class
    candidates must start with an upper-case letter. Matches of one or two
    characters are ignored. Results are sorted before truncation so the same
    input always yields the same terms.

    Args:
        validation_results (str): Validation error text
    Returns:
        dict: Dictionary with 'properties' (at most 5) and 'classes' (at most 3) lists
    """
    # Properties often appear as bf:propertyName or ->bf:propertyName
    property_patterns = [
        r'bf:(\w+)',
        r'->bf:(\w+)',
        r'property (\w+)',
        r'missing (\w+)',
        r'requires? (\w+)'
    ]
    # Classes often appear as bf:ClassName or "a ClassName"
    # NOTE(review): the original list had one more pattern that could not be
    # recovered from the damaged source line; re-add it if known.
    class_patterns = [
        r'bf:([A-Z]\w+)',
        r'type ([A-Z]\w+)',
        r'class ([A-Z]\w+)',
    ]
    text = validation_results or ""

    properties = {
        found.lower()
        for pattern in property_patterns
        for found in re.findall(pattern, text)
        if len(found) > 2  # Skip very short matches
    }
    classes = {
        found
        for pattern in class_patterns
        for found in re.findall(pattern, text)
        if len(found) > 2
    }
    return {
        'properties': sorted(properties)[:5],  # Limit to top 5
        'classes': sorted(classes)[:3],  # Limit to top 3
    }
def fetch_bibframe_guidance(validation_results: str, rdf_content: str) -> str:
    """
    Build a short reference text from the BibFrame documentation API.

    Looks up at most three properties and two classes named in the validation
    errors, plus assigner usage when the errors mention AdminMetadata or
    assigner. Any failure stops further lookups but keeps what was gathered.

    Args:
        validation_results (str): Validation error messages
        rdf_content (str): Original RDF content (not inspected here)
    Returns:
        str: Formatted guidance text for inclusion in prompts ("" when none)
    """
    if not MCP4BIBFRAME_DOCS_ENABLED:
        return ""

    lines = []
    emit = lines.append
    try:
        terms = extract_bibframe_terms_from_errors(validation_results)
        logger.info(f"Extracted terms - properties: {terms['properties']}, classes: {terms['classes']}")

        # Property documentation (first three extracted names only).
        for prop_name in terms['properties'][:3]:
            info = query_bibframe_docs_cached(
                "get_property_info", {"property_uri": _resolve_bibframe_uri(prop_name)}
            )
            if not (info and isinstance(info, dict)):
                continue
            emit(f"\n**{info.get('label', prop_name)}** ({prop_name}):")
            if 'definition' in info:
                emit(f"- Definition: {info['definition']}")
            if 'domain' in info:
                emit(f"- Used in: {', '.join(info['domain'])}")
            if 'range' in info:
                emit(f"- Values: {', '.join(info['range'])}")
            if 'examples' in info and info['examples']:
                emit(f"- Example: {info['examples'][0]}")

        # Class documentation (first two extracted names only).
        for class_name in terms['classes'][:2]:
            info = query_bibframe_docs_cached(
                "get_class_info", {"class_uri": _resolve_bibframe_uri(class_name)}
            )
            if not (info and isinstance(info, dict)):
                continue
            emit(f"\n**{info.get('label', class_name)}** class:")
            if 'definition' in info:
                emit(f"- Definition: {info['definition']}")
            if 'applicable_properties' in info:
                labels = [
                    entry.get('label', entry.get('property', ''))
                    for entry in info['applicable_properties'][:5]
                ]
                emit(f"- Key properties: {', '.join(labels)}")

        # AdminMetadata problems get dedicated usage guidance for assigner.
        lowered = validation_results.lower()
        if any(marker in lowered for marker in ['adminmetadata', 'assigner', '->bf:assigner']):
            usage = query_bibframe_docs_cached("get_property_usage", {
                "property_name": "assigner",
                "class_name": "AdminMetadata"
            })
            if usage and isinstance(usage, dict):
                emit("\n**AdminMetadata/assigner usage:**")
                if 'usage' in usage:
                    emit(f"- {usage['usage']}")
                if 'examples' in usage and usage['examples']:
                    emit(f"- Pattern: {usage['examples'][0]}")
    except Exception as e:
        logger.error(f"Error fetching BibFrame guidance: {str(e)}")

    return "\n".join(lines)
# OpenAI client configuration for the endpoint
def get_openai_client():
    """Build an OpenAI-compatible client bound to the configured HF endpoint.

    Returns:
        An ``OpenAI`` client, or None when ``HF_API_KEY`` is empty.
    """
    if not HF_API_KEY:
        print("❌ No HF_API_KEY available for OpenAI client")
        return None

    # Only the last four characters of the key are ever printed.
    masked_key = '***' + HF_API_KEY[-4:] if len(HF_API_KEY) > 4 else 'HIDDEN'
    print(f"🔗 Creating OpenAI client with:")
    print(f" base_url: {HF_ENDPOINT_URL}")
    print(f" api_key: {masked_key}")

    client_options = {
        "base_url": HF_ENDPOINT_URL,
        "api_key": HF_API_KEY,
        "timeout": 120.0,  # Increase timeout for cold starts
    }
    return OpenAI(**client_options)
# Sample RDF data for examples (based on real Library of Congress BibFrame)
# NOTE(review): both samples contain only text values and no XML elements, so they do
# not parse as RDF/XML in this form - the markup appears to have been stripped; confirm
# against version control before using these as UI examples.
SAMPLE_VALID_RDF = '''
The knitter's handy book of patterns
basic designs in multiple sizes & gauges
Budd, Ann, 1956-
author
aut
English
eng
text
txt
TT820
.B877 2002
United States, Library of Congress
new
n
2001-12-12
United States, Library of Congress
'''
SAMPLE_INVALID_RDF = '''
Incomplete Title
'''
# BibFrame Few-Shot Examples (based on real Library of Congress records)
# Each entry has a "pattern" regex naming the property it applies to, a "wrong"
# snippet and a "correct" snippet.
# NOTE(review): the "wrong"/"correct" snippets contain only text values with no XML
# elements, so they no longer show the structural difference they are meant to
# illustrate - the markup appears to have been stripped; confirm against version control.
BIBFRAME_CORRECTION_EXAMPLES = {
"title_structure": {
"pattern": r"bf:title",
"wrong": """Simple Title String""",
"correct": """
The knitter's handy book of patterns
basic designs in multiple sizes & gauges
"""
},
"adminmetadata": {
"pattern": r"bf:adminMetadata|->bf:assigner",
"wrong": """
new
""",
"correct": """
new
n
2001-12-12
United States, Library of Congress
"""
},
"contribution": {
"pattern": r"bf:contribution",
"wrong": """Author Name""",
"correct": """
Budd, Ann, 1956-
contributor
ctb
"""
},
"language": {
"pattern": r"bf:language",
"wrong": """English""",
"correct": """
English
eng
"""
},
"content": {
"pattern": r"bf:content",
"wrong": """Text""",
"correct": """
text
txt
"""
},
"classification": {
"pattern": r"bf:classification",
"wrong": """TT820 .B877 2002""",
"correct": """
TT820
.B877 2002
United States, Library of Congress
DLC
used by assigner
uba
"""
},
"subject": {
"pattern": r"bf:subject",
"wrong": """Knitting--Patterns""",
"correct": """
Knitting--Patterns
Knitting
Knitting
"""
}
}
# MCP Server Tools (can be used independently)
def validate_rdf_tool(rdf_content: str, template: str = "monograph") -> dict:
    """
    Validate RDF/XML content against SHACL templates.

    When rdflib is installed the content is first parsed as RDF/XML so syntax
    problems (for example unbound prefixes) are reported clearly before the
    SHACL validator runs.

    Args:
        rdf_content (str): The RDF/XML content to validate
        template (str): Validation template to use ('monograph' or 'custom')
    Returns:
        dict: On success, keys 'conforms', 'results', 'template' and 'status';
        on any failure, keys 'error' and 'conforms' (False)
    """
    def failure(message: str) -> dict:
        return {"error": message, "conforms": False}

    if not rdf_content:
        return failure("No RDF/XML content provided")
    if not VALIDATOR_AVAILABLE:
        logger.error("Validator module not available")
        return failure("Validator not available - ensure validator.py is present")

    try:
        # Fast syntax check before SHACL to give clearer errors on XML/prefix issues
        try:
            try:
                import rdflib  # type: ignore
            except ImportError:
                rdflib = None  # type: ignore
            if not rdflib:
                logger.info("rdflib not installed; skipping pre-parse RDF/XML syntax check")
            else:
                syntax_graph = rdflib.Graph()  # type: ignore
                # Parse as RDF/XML; raise on syntax errors like unbound prefixes
                syntax_graph.parse(data=rdf_content, format="application/rdf+xml")  # type: ignore
        except Exception as parse_err:
            logger.error(f"RDF/XML parse error before validation: {parse_err}")
            return failure(f"RDF/XML parse error: {parse_err}")

        logger.info(f"Validating RDF with template '{template}', content length: {len(rdf_content)}")
        conforms, results_text = validate_rdf(rdf_content.encode('utf-8'), template)
        logger.info(f"Validation result - conforms: {conforms}, results length: {len(results_text) if results_text else 0}")

        # Substitute a readable message when the validator returned no text.
        has_feedback = bool(results_text) and len(results_text.strip()) != 0
        if not has_feedback:
            if conforms:
                results_text = "Validation passed with no specific feedback."
            else:
                results_text = "Validation failed but no specific errors were returned. Check the RDF syntax and structure."

        return {
            "conforms": conforms,
            "results": results_text if results_text else "",
            "template": template,
            "status": "✅ Valid RDF" if conforms else "❌ Invalid RDF"
        }
    except ImportError as e:
        logger.error(f"Import error in validator: {str(e)}")
        return failure(f"Validator import error: {str(e)}. Check that all dependencies are installed.")
    except AttributeError as e:
        logger.error(f"Validator function not found: {str(e)}")
        return failure(f"Validator function error: {str(e)}. Check validator.py implementation.")
    except Exception as e:
        logger.error(f"Validation error: {str(e)}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return failure(f"Validation failed: {str(e)}")
def filter_validation_results_by_class(validation_results: str, rdf_content: str) -> dict:
    """
    Group validation report text by the RDF class each violation refers to.

    The report is split into sections at every line containing
    'Constraint Violation'. Each section is assigned to the class named by the
    last line in it that mentions one (Work, Instance, Title, Contribution or
    AdminMetadata), and to 'Other' when no line does. The class is decided per
    section, so one violation never inherits the class of its neighbours.

    Args:
        validation_results (str): Full validation results
        rdf_content (str): Original RDF content (accepted for interface
            compatibility; not inspected)
    Returns:
        dict: Class name -> newline-joined report lines; classes without any
        lines are omitted
    """
    buckets = {
        'Work': [],
        'Instance': [],
        'Title': [],
        'Contribution': [],
        'AdminMetadata': [],
        'Other': []
    }

    def detect_class(line):
        """Return the class a report line refers to, or None."""
        if 'bf:Work' in line or '/work/' in line:
            return 'Work'
        if 'bf:Instance' in line or '/instance/' in line:
            return 'Instance'
        if 'bf:Title' in line:
            return 'Title'
        if 'bf:Contribution' in line:
            return 'Contribution'
        # Many admin violations show the assigner path; map to AdminMetadata
        if 'AdminMetadata' in line or '->bf:assigner' in line:
            return 'AdminMetadata'
        return None

    section = []
    section_class = 'Other'
    for line in (validation_results or '').split('\n'):
        if 'Constraint Violation' in line:
            # Flush the finished section under ITS class before looking at the
            # new header, then start the next section from a clean slate.
            if section:
                buckets[section_class].extend(section)
            section = [line]
            section_class = detect_class(line) or 'Other'
            continue
        detected = detect_class(line)
        if detected:
            section_class = detected
        if line.strip():
            section.append(line)

    if section:
        buckets[section_class].extend(section)

    return {name: '\n'.join(lines) for name, lines in buckets.items() if lines}
def get_ai_suggestions(validation_results: str, rdf_content: str, include_warnings: bool = False) -> str:
"""Generate AI-powered, plain-language suggestions based on validation results.
Avoids RDF/SHACL jargon and focuses on actionable fixes.
"""
# Falls back to generate_manual_suggestions() when the openai package, the HF_API_KEY
# environment variable or the client is unavailable, and also when the model call raises.
# Only the class bucket with the longest error text is sent to the model; the other
# classes that had errors are just listed in a trailing note.
# The prompt embeds at most 1500 characters of errors and 800 characters of RDF.
if not OPENAI_AVAILABLE:
return generate_manual_suggestions(validation_results)
current_api_key = os.getenv('HF_API_KEY', '')
if not current_api_key:
return f"""
🔑 **AI suggestions disabled**: Please set your Hugging Face API key as a Secret in your Space settings.
{generate_manual_suggestions(validation_results)}
"""
try:
client = get_openai_client()
if not client:
return f"""
🔑 **AI suggestions disabled**: HF_API_KEY not configured.
{generate_manual_suggestions(validation_results)}
"""
severity_instruction = (
"Focus only on violations (errors) and ignore any warnings."
if not include_warnings else
"Address both violations and warnings."
)
# Get BibFrame documentation for context
bibframe_guidance = fetch_bibframe_guidance(validation_results, rdf_content)
doc_section = ""
if bibframe_guidance:
doc_section = f"""
Reference information from BibFrame ontology:
{bibframe_guidance}
"""
# Group errors by class to focus the prompt
class_results = filter_validation_results_by_class(validation_results, rdf_content)
if class_results:
primary_class = max(class_results.keys(), key=lambda k: len(class_results[k]))
focused_results = class_results[primary_class]
else:
primary_class = "Record"
focused_results = validation_results
simplified_summary = parse_shacl_results_for_ai(focused_results)
relevant_rdf = extract_relevant_rdf_section(rdf_content, primary_class)
prompt = f"""
You are a helpful metadata librarian. Write in plain language (no RDF/SHACL jargon). Analyze the validation errors for the {primary_class} and provide concise, actionable fixes.
{severity_instruction}
{doc_section}
Validation Errors for {primary_class}:
{focused_results[:1500]}
Validation Summary (plain language):
{simplified_summary}
Relevant RDF Section:
{relevant_rdf[:800]}
Instructions:
1. ONE sentence: What's wrong with this {primary_class}?
2. List errors (max 3 words each)
3. Show exact XML fixes
Format:
**Issue:** [One sentence about the {primary_class} problem]
**Errors:**
• Error 1
• Error 2
**Fix:**
```xml
[Complete corrected {primary_class} section]
```
Be ultra-concise. Show the fix, not explanations."""
chat_completion = client.chat.completions.create(
model=HF_MODEL,
messages=[
{
"role": "system",
"content": "You are a friendly librarian helping fix catalog records. Never use technical RDF or SHACL terminology. Use the BibFrame documentation provided to ensure accuracy."
},
{
"role": "user",
"content": prompt
}
],
max_tokens=800,
temperature=0.5,
top_p=0.9
)
generated_text = chat_completion.choices[0].message.content
generated_text = clean_technical_jargon(generated_text)
other_classes = [k for k in class_results.keys() if k != primary_class]
class_note = (
f"\n\n📌 **Note:** Focused on {primary_class} errors. " +
(f"Also found issues in: {', '.join(other_classes)}" if other_classes else "")
)
return f"🤖 **AI-Powered Suggestions ({('Violations + Warnings' if include_warnings else 'Violations Only')}):**\n\n{generated_text}{class_note}"
except Exception as e:
logger.error(f"OpenAI/HF Inference Endpoint error: {str(e)}")
return f"""
❌ **AI suggestions error**: {str(e)}
{generate_manual_suggestions(validation_results)}
"""
def extract_relevant_rdf_section(rdf_content: str, class_name: str) -> str:
    """
    Extract only the relevant RDF section for a specific class.

    Finds the first ``<bf:{class_name} ...>...</bf:{class_name}>`` element for
    one of the supported classes. When namespace declarations are found in the
    first 500 characters of the document they are prepended as an XML comment
    so the excerpt still shows which prefixes are in use.

    Args:
        rdf_content (str): Full RDF content
        class_name (str): Class name to extract (Work, Instance, Title,
            Contribution or AdminMetadata)
    Returns:
        str: The matching element, or the first 1000 characters of
        ``rdf_content`` when the class is unsupported or not present
    """
    supported = ('Work', 'Instance', 'Title', 'Contribution', 'AdminMetadata')
    fallback = rdf_content[:1000]
    if class_name not in supported:
        return fallback

    # `\b` keeps bf:Work from matching bf:WorkTitle; the look-behind skips
    # self-closing elements, which have no body to extract.
    pattern = rf'<bf:{class_name}\b[^>]*(?<!/)>.*?</bf:{class_name}>'
    match = re.search(pattern, rdf_content, re.DOTALL)
    if not match:
        return fallback

    section = match.group(0)
    namespaces = re.findall(r'xmlns:\w+="[^"]*"', rdf_content[:500])
    if namespaces:
        return f"<!-- Namespaces: {' '.join(namespaces)} -->\n{section}"
    return section
## [Removed duplicate get_ai_correction definition – unified below]
def merge_corrected_sections(original_rdf: str, corrected_sections: dict) -> str:
    """
    Merge corrected class sections back into the original RDF.

    For each supported class (Work, Instance, Title, Contribution,
    AdminMetadata) the first ``<bf:Class ...>...</bf:Class>`` element in the
    document is replaced by the corrected text. Unsupported class names and
    empty corrections are ignored, and the document is returned unchanged for
    any class whose element is not found.

    Args:
        original_rdf (str): Original RDF content
        corrected_sections (dict): Corrected sections by class
    Returns:
        str: Merged RDF with corrections
    """
    supported = ('Work', 'Instance', 'Title', 'Contribution', 'AdminMetadata')
    result = original_rdf
    for class_name, corrected_section in corrected_sections.items():
        if class_name not in supported or not corrected_section:
            continue
        pattern = rf'<bf:{class_name}\b[^>]*(?<!/)>.*?</bf:{class_name}>'
        # A callable replacement inserts the text literally; a plain string
        # would have its backslashes interpreted as regex group references.
        result = re.sub(
            pattern,
            lambda _match, replacement=corrected_section: replacement,
            result,
            count=1,
            flags=re.DOTALL,
        )
    return result
def extract_rdf_from_response(response: str) -> str:
    """
    Extract RDF/XML content from AI response, handling code blocks.

    Preference order: the first ```xml fenced block, then the first fenced
    block of any kind (a language tag such as ``rdf`` on the opening fence is
    dropped), then the whole response.

    Args:
        response (str): AI response that may contain RDF wrapped in code
            blocks; None or an empty string is tolerated
    Returns:
        str: Extracted RDF/XML content, stripped of surrounding whitespace
        ("" when there was no response)
    """
    if not response:
        return ""
    text = response.strip()

    # Handle ```xml code blocks (also covers a fence that was never closed)
    if "```xml" in text:
        return text.split("```xml", 1)[1].split("```", 1)[0].strip()

    # Handle generic ``` code blocks
    if text.count("```") >= 2:
        fenced = text.split("```", 2)[1]
        first_line, newline, remainder = fenced.partition("\n")
        # An opening fence like ```rdf puts a bare language tag on the first
        # line; it is not part of the document.
        if newline and re.fullmatch(r"[A-Za-z0-9_+.\-]+", first_line.strip()):
            fenced = remainder
        return fenced.strip()

    # If no code blocks found, return the response as-is
    return text
def fix_common_rdf_errors(rdf_xml: str) -> str:
    """
    Fix common RDF/XML errors that AI models generate.

    Applied in order:
      1. drop every ``rdf:parseType`` attribute;
      2. wrap a ``bf:title`` that holds only text in ``bf:Title/bf:mainTitle``;
      3. replace a ``bf:language`` holding a known language name or code with
         a reference to the id.loc.gov language URI;
      4. replace a ``bf:content`` holding "text" with the content-type URI.

    Replacements are scoped to the named elements, so the same words elsewhere
    in the document are left untouched.

    Args:
        rdf_xml (str): RDF/XML that may contain common errors
    Returns:
        str: Fixed RDF/XML
    """
    # Remove any rdf:parseType attributes (common AI mistake)
    rdf_xml = re.sub(r'\s+rdf:parseType="[^"]*"', '', rdf_xml)

    # Fix bf:title if it's just a string (should be nested structure).
    # Requiring a non-space, non-"<" first character skips titles that
    # already contain a nested element.
    rdf_xml = re.sub(
        r'<bf:title>\s*([^<\s][^<]*?)\s*</bf:title>',
        r'<bf:title><bf:Title><bf:mainTitle>\1</bf:mainTitle></bf:Title></bf:title>',
        rdf_xml
    )

    # Fix bf:language if it's a string instead of URI
    language_map = {
        'English': 'http://id.loc.gov/vocabulary/languages/eng',
        'eng': 'http://id.loc.gov/vocabulary/languages/eng',
        'Spanish': 'http://id.loc.gov/vocabulary/languages/spa',
        'French': 'http://id.loc.gov/vocabulary/languages/fre',
    }
    for lang_text, lang_uri in language_map.items():
        rdf_xml = re.sub(
            rf'<bf:language>\s*{re.escape(lang_text)}\s*</bf:language>',
            f'<bf:language rdf:resource="{lang_uri}"/>',
            rdf_xml,
            flags=re.IGNORECASE
        )

    # Fix bf:content if it's a string (matching is case-insensitive)
    content_map = {
        'text': 'http://id.loc.gov/vocabulary/contentTypes/txt',
    }
    for content_text, content_uri in content_map.items():
        rdf_xml = re.sub(
            rf'<bf:content>\s*{re.escape(content_text)}\s*</bf:content>',
            f'<bf:content rdf:resource="{content_uri}"/>',
            rdf_xml,
            flags=re.IGNORECASE
        )
    return rdf_xml
def extract_error_focus_points(validation_results: str) -> Dict[str, List[str]]:
    """Identify the specific focus nodes and properties mentioned in validation errors.

    Args:
        validation_results: Text of the validation report.

    Returns:
        Dict with sorted, de-duplicated lists under:
          - "focus_nodes": values of "Focus Node:" lines, angle brackets removed;
          - "properties": local names of "Result Path:" values, i.e. the part
            after the last '/', '#' or ':' (``bf:title`` and
            ``<http://.../bibframe/title>`` both yield ``title``);
          - "missing_properties": property names from
            "Less than N values on ...->bf:<name>" messages;
          - "classes": always empty (reserved; nothing populates it here).
    """
    focus = {
        "properties": [],
        "focus_nodes": [],
        "missing_properties": [],
        "classes": [],
    }
    if not validation_results:
        return focus

    focus_nodes = set(re.findall(r"Focus Node:\s*<?([^\s>]+)>?", validation_results))

    properties = set()
    for path in re.findall(r"Result Path:\s*<?([^\s>]+)>?", validation_results):
        # Reduce a prefixed name or full URI to its local name; the prefix or
        # host part on its own is not a property.
        local_name = re.split(r"[/#:]", path)[-1]
        if re.fullmatch(r"[A-Za-z_][\w\-]*", local_name):
            properties.add(local_name)

    missing = set(
        re.findall(r"Less than \d+ values on .*->bf:([A-Za-z]+)", validation_results)
    )

    focus["properties"] = sorted(properties)
    focus["focus_nodes"] = sorted(focus_nodes)
    focus["missing_properties"] = sorted(missing)
    return focus
def _resolve_bibframe_uri(name: str) -> str:
if not name:
return name
if name.startswith("http://") or name.startswith("https://"):
return name
if ":" in name:
prefix, local = name.split(":", 1)
if prefix == "bf":
return f"http://id.loc.gov/ontologies/bibframe/{local}"
return f"http://id.loc.gov/ontologies/bibframe/{name}"
def get_targeted_bibframe_guidance(properties: List[str], classes: List[str]) -> Dict[str, dict]:
    """Fetch BibFrame documentation for only the specified properties/classes.

    At most the first five properties and the first five classes are looked up.
    Returns a dict keyed by the name as given; names whose lookup yields a falsy
    result are omitted. Returns an empty dict when MCP4BIBFRAME_DOCS_ENABLED is
    falsy.
    """
    collected: Dict[str, dict] = {}
    if MCP4BIBFRAME_DOCS_ENABLED:
        # (names, docs tool, argument key) - properties are queried before classes.
        lookups = (
            (properties, "get_property_info", "property_uri"),
            (classes, "get_class_info", "class_uri"),
        )
        for names, tool_name, arg_key in lookups:
            for term in names[:5]:
                term_uri = _resolve_bibframe_uri(term)
                info = query_bibframe_docs_cached(tool_name, {arg_key: term_uri}, timeout=5)
                if info:
                    collected[term] = info
    return collected
def generate_property_specific_fix(property_name: str, guidance: Optional[dict] = None) -> str:
    """Return a canned snippet for a missing property, chosen by its lower-cased name.

    ``guidance`` is accepted for interface compatibility but does not influence
    the result. Unknown, empty or None property names yield a generic
    placeholder value.
    """
    # NOTE(review): these snippets hold bare text with no bf: element markup -
    # confirm that is what callers expect to inject.
    snippets = {
        "title": """
PLACEHOLDER_TITLE
""",
        "language": """
English
eng
""",
        "content": """
text
txt
""",
        "contribution": """
Author Name
author
aut
""",
        "classification": """
TT820
.B877 2002
United States, Library of Congress
""",
        "adminmetadata": """
new
n
2024-01-01
United States, Library of Congress
""",
    }
    lookup_key = property_name.lower() if property_name else ""
    try:
        return snippets[lookup_key]
    except KeyError:
        # Fallback: simple literal placeholder
        return f"PLACEHOLDER_VALUE"
def get_ai_correction(validation_results: str, rdf_content: str, template: str = 'monograph', max_attempts: int = None, include_warnings: bool = False, enable_validation_loop: bool | None = None, cache_key: Optional[str] = None, steps_log: Optional[List[str]] = None) -> str:
    """
    Generate AI-powered corrected RDF/XML based on validation errors.
    This tool takes invalid RDF/XML and validation results, then generates
    a corrected version that addresses all identified validation issues.
    The generated correction is validated before being returned to the user
    when the validator is available and enough of the time budget remains;
    otherwise it is returned unvalidated.
    Args:
        validation_results (str): The validation error messages
        rdf_content (str): The original invalid RDF/XML content
        template (str): The validation template to use
        max_attempts (int): Maximum number of attempts to generate valid RDF (uses MAX_CORRECTION_ATTEMPTS if None)
        include_warnings (bool): Whether to fix warnings in addition to violations
        enable_validation_loop (bool | None): Whether to retry after a failed
            validation; None means use ENABLE_VALIDATION_LOOP. When disabled,
            max_attempts is forced to 1.
        cache_key (Optional[str]): Key for the correction cache; derived from
            the inputs when None and both inputs are non-empty.
        steps_log (Optional[List[str]]): If given, progress messages are
            appended to this list in place.
    Returns:
        str: Corrected RDF/XML that should pass validation, or the output of
        generate_manual_correction_hints when the model cannot be used, all
        attempts fail, or the time budget is exhausted.
    """
    # Determine whether to iterate based on parameter or global default
    iterate_enabled = ENABLE_VALIDATION_LOOP if enable_validation_loop is None else enable_validation_loop
    if steps_log is not None:
        steps_log.append(f"Planning correction: iterate_enabled={iterate_enabled}, include_warnings={include_warnings}")
    # Use configuration default if not specified
    if max_attempts is None:
        max_attempts = MAX_CORRECTION_ATTEMPTS
    if steps_log is not None:
        steps_log.append(f"Max attempts set to {max_attempts}")
    # If iteration disabled, force single attempt
    if not iterate_enabled:
        max_attempts = 1
        if steps_log is not None:
            steps_log.append("Iteration disabled; forcing single attempt")
    # Serve a previously stored correction for the same inputs when one exists.
    if cache_key is None and validation_results and rdf_content:
        cache_key = _make_fix_cache_key(validation_results, rdf_content, template)
    if cache_key:
        cached_result = _get_cached_correction(cache_key, steps_log)
        if cached_result is not None:
            return cached_result
    if not OPENAI_AVAILABLE:
        if steps_log is not None:
            steps_log.append("OPENAI client not available; falling back to manual hints")
        return generate_manual_correction_hints(validation_results, rdf_content)
    # Get API key dynamically at runtime
    current_api_key = os.getenv('HF_API_KEY', '')
    if not current_api_key:
        if steps_log is not None:
            steps_log.append("HF_API_KEY not set; cannot call model; returning manual hints")
        return f"""
{generate_manual_correction_hints(validation_results, rdf_content)}"""
    try:
        client = get_openai_client()
        if not client:
            if steps_log is not None:
                steps_log.append("Failed to initialize OpenAI client; returning manual hints")
            return f"""
{generate_manual_correction_hints(validation_results, rdf_content)}"""
        # Fetch BibFrame documentation guidance
        if steps_log is not None:
            steps_log.append("Fetching BibFrame documentation guidance...")
        bibframe_guidance = fetch_bibframe_guidance(validation_results, rdf_content)
        if bibframe_guidance:
            if steps_log is not None:
                steps_log.append(f"Retrieved BibFrame guidance ({len(bibframe_guidance)} chars)")
            guidance_section = f"""
BIBFRAME DOCUMENTATION (from official ontology):
{bibframe_guidance}
Apply the above BibFrame definitions and patterns when correcting the RDF/XML.
"""
        else:
            guidance_section = ""
            if steps_log is not None:
                steps_log.append("No specific BibFrame guidance retrieved")
        # Add timeout protection: one wall-clock budget shared by all attempts.
        import time
        start_time = time.time()
        timeout = 45 # Reduced to 45 second total timeout for speed
        if steps_log is not None:
            steps_log.append(f"Timeout budget: {timeout}s total")
        severity_instruction = "Fix only the violations (errors) and ignore any warnings." if not include_warnings else "Fix both violations and warnings."
        # Try multiple attempts to generate valid RDF
        for attempt in range(max_attempts):
            # Check timeout
            elapsed = time.time() - start_time
            if elapsed > timeout:
                if steps_log is not None:
                    steps_log.append(f"Timeout reached after {int(elapsed)}s; stopping attempts")
                print(f"⏰ Timeout reached after {timeout} seconds")
                break
            attempt_no = attempt + 1
            if steps_log is not None:
                steps_log.append(f"Attempt {attempt_no}/{max_attempts}: requesting model correction")
            print(f"🔄 Correction attempt {attempt_no}/{max_attempts}")
            # Targeted AdminMetadata guidance inferred from results text
            needs_assigner = ("->bf:assigner" in validation_results) or (" bf:assigner" in validation_results)
            admin_guidance = ""
            if needs_assigner:
                # NOTE(review): the element names in this prompt text appear to be
                # missing (e.g. "For each , ensure") - confirm the intended wording.
                admin_guidance = """
IMPORTANT: For each , ensure it has a direct child .
Rules:
- If exists, add with the SAME URI.
- Else if exists, add with the SAME URI.
- Else if a block contains , copy that URI to a TOP-LEVEL .
Keep all existing content; only add missing where required.
"""
            # Build few-shot examples based on the errors found
            examples_to_include = []
            # NOTE(review): validation_lower is not referenced again in this function.
            validation_lower = validation_results.lower()
            # Check each example pattern against validation results
            for name, example in BIBFRAME_CORRECTION_EXAMPLES.items():
                # Each example's "pattern" is used as a regex; the example name is the fallback.
                pattern = example.get("pattern", name)
                if re.search(pattern, validation_results, re.IGNORECASE):
                    examples_to_include.append((name, example))
                    if steps_log is not None:
                        steps_log.append(f"Including {name} example based on pattern match")
            few_shot_section = ""
            if examples_to_include:
                few_shot_section = "\n\nCORRECT BIBFRAME PATTERNS (from Library of Congress records):\n"
                few_shot_section += "NEVER use simple strings - always use nested structures as shown below:\n\n"
                for name, example in examples_to_include:
                    few_shot_section += f"{name.upper()}:\n"
                    few_shot_section += f"❌ WRONG:\n```xml\n{example['wrong']}\n```\n"
                    few_shot_section += f"✅ CORRECT:\n```xml\n{example['correct']}\n```\n\n"
            # Add critical rules based on real patterns
            critical_rules = """
CRITICAL RDF/XML RULES (from real BibFrame):
1. NEVER use rdf:parseType except for "Collection" on madsrdf:componentList
2. Properties like bf:title, bf:language, bf:content MUST have nested typed resources
3. Use rdf:about for resource URIs, not rdf:resource on the property element
4. bf:adminMetadata can appear multiple times in one record
5. Status, Role, Language etc. are OBJECTS with rdf:about URIs, not literals
6. Date values use rdf:datatype for typing (e.g., xsd:date, xsd:dateTime)
7. Every bf:AdminMetadata needs BOTH bf:agent AND bf:assigner if validation requires it
"""
            # validation_results is replaced with the newest report after a failed
            # attempt, so later prompts carry the remaining errors. rdf_content is
            # not reassigned in this loop, so every prompt embeds the RDF as passed in.
            prompt = f"""You are an expert in RDF/XML and BibFrame cataloging. Fix the following RDF/XML based on the validation errors and official BibFrame documentation.
{severity_instruction}
{admin_guidance}
{guidance_section}
{critical_rules}
{few_shot_section}
Validation Errors:
{validation_results}
Original RDF/XML:
{rdf_content}
{f"Previous attempt {attempt} still had validation errors. Please fix ALL issues this time." if attempt > 0 else ""}
INSTRUCTIONS:
1. Return ONLY valid RDF/XML - no explanations
2. Follow the EXACT patterns shown in the examples above
3. Use proper nested structures - NO simple string values for complex properties
4. Keep ALL namespace declarations
5. Fix ALL validation errors"""
            try:
                # Update system prompt to be even more explicit
                system_prompt = """You are an RDF/XML expert following Library of Congress BibFrame patterns.
Output ONLY valid RDF/XML following these rules:
- Start with
- NO markdown, NO explanations
- Use EXACT structure patterns from the examples
- Complex properties need nested typed resources
- rdf:parseType ONLY for Collection on madsrdf:componentList
- Status/Role/Language are OBJECTS with URIs, not strings"""
                chat_completion = client.chat.completions.create(
                    model=HF_MODEL,
                    messages=[
                        {
                            "role": "system",
                            "content": system_prompt
                        },
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                    max_tokens=1500,
                    temperature=0.0,
                    timeout=20 # Reduced to 20 second timeout per API call for speed
                )
                corrected_rdf = chat_completion.choices[0].message.content.strip()
                if steps_log is not None:
                    steps_log.append(f"Attempt {attempt_no}: model responded; extracting and fixing common errors")
                # Extract RDF content if it's wrapped in code blocks
                corrected_rdf = extract_rdf_from_response(corrected_rdf)
                # Fix common AI mistakes
                corrected_rdf = fix_common_rdf_errors(corrected_rdf)
                # Only validate if we have the validator and haven't hit timeout
                # (validation is skipped once fewer than 10 seconds of budget remain).
                if VALIDATOR_AVAILABLE and (time.time() - start_time < timeout - 10):
                    try:
                        # Quick validation check
                        conforms, new_results = validate_rdf(corrected_rdf.encode('utf-8'), template)
                        if conforms:
                            if steps_log is not None:
                                steps_log.append(f"Attempt {attempt_no}: correction PASSED validation")
                            print(f"✅ Correction validated successfully on attempt {attempt_no}")
                            result_text = f"""
{corrected_rdf}"""
                            # Only corrections that passed validation are cached.
                            if cache_key:
                                _store_correction_in_cache(cache_key, result_text, steps_log)
                            return result_text
                        else:
                            if steps_log is not None:
                                steps_log.append(f"Attempt {attempt_no}: still invalid; will retry with updated errors")
                            print(f"❌ Correction attempt {attempt_no} still has validation errors")
                            # Update validation_results for next attempt
                            validation_results = new_results
                    except Exception as e:
                        if steps_log is not None:
                            steps_log.append(f"Attempt {attempt_no}: error during validation: {str(e)} — returning correction anyway")
                        print(f"⚠️ Error validating correction attempt {attempt_no}: {str(e)}")
                        # If validation fails, return the correction anyway
                        return f"""
{corrected_rdf}"""
                else:
                    # If validator not available or timeout approaching, return the correction
                    if steps_log is not None:
                        steps_log.append("Skipping validation check (validator unavailable or timeout)")
                    print("⚠️ Returning correction without validation")
                    return f"""
{corrected_rdf}"""
            except Exception as api_error:
                if steps_log is not None:
                    steps_log.append(f"Attempt {attempt_no}: API error: {str(api_error)}")
                print(f"❌ API error on attempt {attempt_no}: {str(api_error)}")
                # Re-raise only on the final attempt; the outer handler then
                # converts it into manual hints.
                if attempt == max_attempts - 1: # Last attempt
                    raise api_error
                continue
        # All attempts failed or timed out
        if steps_log is not None:
            steps_log.append("All attempts failed or timed out; returning manual hints")
        return f"""
{generate_manual_correction_hints(validation_results, rdf_content)}"""
    except Exception as e:
        # Top-level boundary: any failure degrades to manual hints instead of raising.
        logger.error(f"LLM API error: {str(e)}")
        if steps_log is not None:
            steps_log.append(f"Fatal error invoking model: {str(e)}")
        return f"""
{generate_manual_correction_hints(validation_results, rdf_content)}"""
def get_ai_correction_targeted(validation_results: str, rdf_content: str, template: str = 'monograph', max_attempts: int = None, include_warnings: bool = False, enable_validation_loop: bool | None = None, steps_log: Optional[List[str]] = None) -> str:
    """Fast path that attempts structured quick fixes before invoking the full AI loop.

    Stages, each re-validated when the validator is available:
      1. cache lookup for these exact inputs;
      2. rapid_fix_missing_properties;
      3. get_ai_correction_minimal (only when an API key is configured);
      4. snippet injection for up to five missing properties into the first
         bf:Work (or, failing that, bf:Instance) element;
      5. the full get_ai_correction loop, fed with the most recent RDF and
         validation report produced by the earlier stages.

    Args:
        validation_results: Validation report text for ``rdf_content``.
        rdf_content: The RDF/XML to correct.
        template: Validation template name passed to the validator.
        max_attempts, include_warnings, enable_validation_loop: Forwarded to
            get_ai_correction unchanged.
        steps_log: If given (even when empty), progress messages are appended
            to this list in place.
    Returns:
        The first candidate that passes validation, otherwise whatever
        get_ai_correction returns.
    """
    logging_on = steps_log is not None

    def _log(*entries: str) -> None:
        # Test "is not None" rather than truthiness so an empty list still gets filled.
        if logging_on:
            steps_log.extend(entries)

    def _banner(message: str, width: int = 60) -> None:
        _log("=" * width, message, "=" * width)

    if logging_on:
        _log("\n" + "=" * 70, "📊 INITIAL VALIDATION ERRORS:", "=" * 70)
        # Show summary of validation errors
        error_lines = [
            line.strip()
            for line in (validation_results or "").split('\n')
            if 'Less than' in line or 'Message:' in line or 'Module:' in line
        ]
        for line in error_lines[:15]:  # Show first 15 error lines
            _log(f" {line}")
        if len(error_lines) > 15:
            _log(f" ... and {len(error_lines) - 15} more errors")
        _log("")

    cache_key: Optional[str] = None
    if validation_results and rdf_content:
        cache_key = _make_fix_cache_key(validation_results, rdf_content, template)
        cached = _get_cached_correction(cache_key, steps_log)
        if cached is not None:
            _log("💾 Cache hit! Returning previously successful correction")
            return cached

    # Try rapid fix FIRST - this should handle most cases in < 5 seconds
    _banner("🚀 STARTING RAPID FIX")
    quick_fix = rapid_fix_missing_properties(rdf_content, validation_results, template, steps_log)
    if quick_fix:
        _banner("🔍 RE-VALIDATING AFTER RAPID FIX")
    if quick_fix and VALIDATOR_AVAILABLE:
        try:
            conforms, new_results = validate_rdf(quick_fix.encode('utf-8'), template)
            if conforms:
                _banner("✅✅✅ RAPID FIX SUCCESSFUL - VALIDATION PASSED!")
                if cache_key:
                    _store_correction_in_cache(cache_key, quick_fix, steps_log)
                return quick_fix
            _banner("⚠️ RAPID FIX INCOMPLETE - Still has errors:")
            # Show first few errors
            for line in (new_results.split('\n')[:10] if new_results else []):
                if 'Less than' in line or 'Message:' in line:
                    _log(f" {line.strip()}")
            # Later stages continue from the partially fixed document.
            validation_results = new_results or validation_results
            rdf_content = quick_fix
            _log("📋 Continuing to minimal AI correction...")
        except Exception as e:
            _banner(f"❌ RAPID FIX VALIDATION ERROR: {e}")
            _log("📋 Continuing to minimal AI correction...")
    elif quick_fix:
        _log("⚠️ Validator not available, cannot re-validate rapid fix")
    else:
        _log("ℹ️ Rapid fix returned None, moving to AI correction")

    # If rapid fix didn't fully work, try minimal AI correction
    if OPENAI_AVAILABLE and os.getenv('HF_API_KEY'):
        _log("Attempting minimal AI correction...")
        corrected = get_ai_correction_minimal(validation_results, rdf_content, max_tokens=1000)
        if corrected and corrected != rdf_content and VALIDATOR_AVAILABLE:
            try:
                conforms, new_results = validate_rdf(corrected.encode('utf-8'), template)
                if conforms:
                    _log("✅ Minimal AI correction successful!")
                    if cache_key:
                        _store_correction_in_cache(cache_key, corrected, steps_log)
                    return corrected
                validation_results = new_results or validation_results
                rdf_content = corrected
                _log("Minimal AI correction partial; falling back to full AI...")
            except Exception as e:
                _log(f"Minimal AI validation error: {e}; falling back...")

    focus_points = extract_error_focus_points(validation_results)
    missing_props = focus_points.get("missing_properties", [])
    _log(f"Targeted fix: detected {len(missing_props)} missing properties")
    if missing_props:
        preview = ", ".join(missing_props[:5])
        if len(missing_props) > 5:
            preview += ", ..."
        _log(f"Missing list: {preview}")

    working_rdf = rdf_content
    quick_fix_attempted = False
    if missing_props and len(missing_props) <= 5:
        guidance = get_targeted_bibframe_guidance(missing_props, focus_points.get("classes", []))
        _log(f"Retrieved guidance entries: {len(guidance)}")

        def _inject_snippets(match: re.Match) -> str:
            """Append a snippet for every missing property the element lacks."""
            nonlocal quick_fix_attempted
            opening, inner, closing = match.groups()
            new_bits = []
            for prop in missing_props:
                if f"<bf:{prop}" in inner:
                    continue  # already present inside this element
                new_bits.append(generate_property_specific_fix(prop, guidance.get(prop)))
            if not new_bits:
                return match.group(0)
            quick_fix_attempted = True
            return opening + inner + "".join(new_bits) + closing

        work_pattern = re.compile(r"(<bf:Work[^>]*>)([\s\S]*?)(</bf:Work>)")
        instance_pattern = re.compile(r"(<bf:Instance[^>]*>)([\s\S]*?)(</bf:Instance>)")
        # Only the first matching element is patched; bf:Work takes priority.
        if work_pattern.search(working_rdf):
            working_rdf = work_pattern.sub(_inject_snippets, working_rdf, count=1)
        elif instance_pattern.search(working_rdf):
            working_rdf = instance_pattern.sub(_inject_snippets, working_rdf, count=1)

    if quick_fix_attempted and VALIDATOR_AVAILABLE:
        try:
            conforms, new_results = validate_rdf(working_rdf.encode('utf-8'), template)
            if conforms:
                _log("Quick fix succeeded; validation now passes")
                if cache_key:
                    _store_correction_in_cache(cache_key, working_rdf, steps_log)
                return working_rdf
            _log("Quick fix incomplete; falling back to AI loop")
            validation_results = new_results or validation_results
        except Exception as quick_err:
            _log(f"Quick fix validation error: {quick_err}; using AI fallback")

    # The inputs may have changed since the first lookup, so re-key the cache.
    if validation_results and working_rdf:
        cache_key = _make_fix_cache_key(validation_results, working_rdf, template)
    return get_ai_correction(
        validation_results,
        working_rdf,
        template,
        max_attempts=max_attempts,
        include_warnings=include_warnings,
        enable_validation_loop=enable_validation_loop,
        cache_key=cache_key,
        steps_log=steps_log,
    )
def generate_manual_suggestions(validation_results: str) -> str:
    """Build plain-language suggestions from keywords found in the validation text.

    Matching is case-insensitive and purely textual: no SHACL rules or specific
    property requirements are hard-coded here. When no keyword matches, two
    generic review suggestions are returned instead.
    """
    haystack = validation_results.lower() if validation_results else ""

    def mentions(*keywords: str) -> bool:
        return any(word in haystack for word in keywords)

    # (triggered?, suggestion) pairs, listed in output order.
    checks = (
        # Missing/required
        (mentions("mincount", "missing", "required"),
         "• Some required fields are missing. Add the missing information where indicated."),
        # Too many values
        (mentions("maxcount", "too many", "more than allowed"),
         "• Some fields have too many values. Keep only the main/one value as required."),
        # Datatype/format issues
        (mentions("datatype", "type mismatch"),
         "• Some values are in the wrong format. Use the expected format (e.g., dates like YYYY-MM-DD)."),
        # URI/identifier issues
        (mentions("iri", "uri") or (mentions("identifier") and mentions("invalid")),
         "• Some identifiers look malformed. Use complete, valid web addresses or proper identifiers."),
        # Namespace/prefix issues
        (mentions("namespace", "prefix"),
         "• Define all XML namespace prefixes at the top and use them consistently."),
        # XML syntax/structure
        (mentions("xml", "syntax", "well-formed"),
         "• Fix XML structure issues (unclosed tags, invalid characters, or nesting problems)."),
    )
    suggestions: List[str] = [advice for triggered, advice in checks if triggered]

    # Fallback
    if not suggestions:
        suggestions = [
            "• Review the validation details and update the record where issues are highlighted.",
            "• Follow the selected template; add missing fields and correct formats as needed.",
        ]

    suggestions_text = "\n".join(suggestions)
    return f"""
📋 **What needs fixing:**
{suggestions_text}
💡 **Quick tips:**
• Include required fields when noted
• Keep single-value fields to one value
• Use the expected formats (e.g., for dates)
• Declare and use XML namespace prefixes consistently
• Ensure the XML is well‑formed
Need help? Load an example and compare the structure.
"""
def clean_technical_jargon(text: str) -> str:
    """Swap RDF/SHACL vocabulary for plain-language wording.

    Replacements are case-sensitive substring substitutions applied one after
    another in the order listed, so longer terms such as "URIRef" are handled
    before the shorter "URI". Falsy input is returned unchanged.
    """
    if not text:
        return text
    substitutions = (
        # RDF/SHACL jargon
        ("URIRef", "identifier"),
        ("URI", "identifier"),
        ("IRI", "identifier"),
        ("Literal", "text value"),
        ("triple", "field entry"),
        ("graph", "dataset"),
        ("node", "record"),
        ("subject", "record"),
        ("predicate", "field type"),
        ("object", "value"),
        ("SHACL", "validation"),
        ("constraint", "rule"),
        ("conformance", "compliance"),
        ("violation", "issue"),
        ("sh:", ""),
        ("rdf:", ""),
        ("rdfs:", ""),
        ("xsd:", ""),
        # Tone softening
        ("Error:", "Issue:"),
        ("Invalid", "Incorrect"),
        ("Failed", "Did not pass"),
        ("Missing", "Not found"),
    )
    result = text
    for jargon, plain in substitutions:
        result = result.replace(jargon, plain)
    return result
def parse_shacl_results_for_ai(results_text: str) -> str:
    """Condense a SHACL report into short plain sentences for AI processing.

    Each non-blank line contributes at most one sentence: the first keyword
    rule that matches it, or a generic sentence when the line only mentions
    "Violation". Sentences are de-duplicated in first-seen order. If no line
    produces a sentence, the original text is returned; falsy input gives "".
    Pattern-based only; does not depend on any SHACL rule definitions.
    """
    if not results_text:
        return ""
    import re

    # Generic patterns, tried in this order for every line.
    keyword_rules = [
        (re.compile(r"minCount", re.IGNORECASE), "A required field is missing."),
        (re.compile(r"maxCount", re.IGNORECASE), "A field has more values than allowed; only one may be permitted."),
        (re.compile(r"datatype", re.IGNORECASE), "A field has a value in the wrong format."),
        (re.compile(r"iri|uri", re.IGNORECASE), "An identifier looks malformed or incomplete."),
        (re.compile(r"namespace|prefix", re.IGNORECASE), "A namespace prefix is undefined or inconsistent."),
        (re.compile(r"xml|syntax|well-formed", re.IGNORECASE), "The XML structure has an error (e.g., unclosed tag)."),
    ]

    sentences: List[str] = []
    for raw_line in results_text.splitlines():
        entry = raw_line.strip()
        if not entry:
            continue
        sentence = next((msg for rx, msg in keyword_rules if rx.search(entry)), None)
        if sentence is None and "Violation" in entry:
            sentence = "A record rule was not met."
        if sentence is not None:
            sentences.append(sentence)

    # dict preserves insertion order, giving order-stable de-duplication.
    distinct = list(dict.fromkeys(sentences))
    return "\n".join(distinct) if distinct else results_text
def generate_manual_correction_hints(validation_results: str, rdf_content: str) -> str:
    """Fallback used when no AI correction can be produced.

    Returns ``rdf_content`` wrapped in a leading and trailing newline.
    ``validation_results`` is accepted but not included in the output.
    """
    hint_text = f"\n{rdf_content}\n"
    return hint_text
def extract_xml_from_text(text: str) -> str:
    """Extract RDF/XML from model output that may include extra formatting.

    Looks for the first ``<rdf:RDF ...> ... </rdf:RDF>`` block (case-insensitive,
    spanning multiple lines) and returns it without any surrounding prose or
    code fences. If no such block exists, a leading/trailing markdown fence is
    stripped from the text; if that leaves nothing, the original text is
    returned. Falsy input is returned unchanged.
    """
    if not text:
        return text
    # [\s\S] instead of '.' so the block may span lines without re.DOTALL;
    # non-greedy so only the first complete block is taken.
    block = re.search(r"<rdf:RDF\b[\s\S]*?</rdf:RDF\s*>", text, re.IGNORECASE)
    if block:
        return block.group(0)
    # Strip common markdown fences if present
    fenced = re.sub(r"^```[a-zA-Z]*\n|```$", "", text.strip())
    return fenced if fenced else text
def clean_xml_for_validation(xml_text: str) -> str:
    """
    Clean XML text for validation by removing comments and extra formatting.

    Steps: drop every ``<!-- ... -->`` comment, trim surrounding whitespace,
    and, if the text starts with a markdown code fence, keep only the content
    of that first fenced block (with its language tag removed). A fence that
    is never closed, as in truncated model output, is handled as well.

    Args:
        xml_text (str): XML text that may contain comments or formatting
    Returns:
        str: Clean XML ready for validation (falsy input is returned unchanged)
    """
    if not xml_text:
        return xml_text
    # Remove all XML/HTML comments; DOTALL lets a comment span several lines.
    cleaned = re.sub(r'<!--.*?-->', '', xml_text, flags=re.DOTALL).strip()
    # If the text starts with a "```" code fence, extract its content.
    if cleaned.startswith("```"):
        # parts[0] is the empty string before the opening fence, parts[1] the
        # fenced content up to the closing fence (or end of text if unclosed).
        fenced = cleaned.split("```")[1]
        language_tag = re.match(r"[A-Za-z][\w+-]*[ \t]*\r?\n", fenced)
        if language_tag:
            fenced = fenced[language_tag.end():]
        elif fenced.startswith("xml"):
            # Language identifier glued directly to the content, e.g. "xml<rdf:RDF>".
            fenced = fenced[3:]
        cleaned = fenced
    return cleaned.strip()
# --- Namespace and wrapper helpers to avoid XML parser errors ---
# Prefix -> namespace URI for the vocabularies whose declarations may be
# added automatically when a document uses the prefix without declaring it.
STANDARD_NAMESPACES = dict(
    rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    bf="http://id.loc.gov/ontologies/bibframe/",
    rdfs="http://www.w3.org/2000/01/rdf-schema#",
    xsd="http://www.w3.org/2001/XMLSchema#",
)
def _extract_declared_namespaces(xml_text: str) -> dict:
import re
decls = {}
for prefix, uri in re.findall(r"xmlns:([A-Za-z0-9_-]+)=\"([^\"]+)\"", xml_text[:2000]):
decls[prefix] = uri
return decls
def _detect_used_prefixes(xml_text: str) -> set:
import re
used = set()
# Tag prefixes like and attribute prefixes like rdf:type="..."
for m in re.finditer(r"<\s*([A-Za-z0-9_-]+):[A-Za-z0-9_-]+", xml_text):
used.add(m.group(1))
for m in re.finditer(r"\s([A-Za-z0-9_-]+):[A-Za-z0-9_-]+=", xml_text):
used.add(m.group(1))
return used
def ensure_rdf_wrapper_and_namespaces(xml_text: str, original_text: Optional[str] = None, steps_log: Optional[List[str]] = None) -> str:
    """Ensure the XML has an <rdf:RDF> wrapper and xmlns declarations for used prefixes.

    - If an <rdf:RDF> start tag exists, add xmlns: declarations for every used
      prefix that the text does not declare and whose URI is known.
    - If the wrapper is missing, wrap the content in <rdf:RDF> carrying
      declarations for every used prefix whose URI is known. A leading
      ``<?xml ...?>`` declaration is kept in front of the new wrapper.

    A URI is "known" when the prefix is declared in ``xml_text``, declared in
    ``original_text`` (the user's input), or listed in STANDARD_NAMESPACES, in
    that order of preference. Prefixes with no known URI are left alone rather
    than guessed.

    Args:
        xml_text: The XML to normalise; non-string or empty values are
            returned unchanged.
        original_text: Optional earlier version of the document whose
            namespace declarations may be reused.
        steps_log: If given, a note is appended whenever the text is modified.
    Returns:
        The XML text, possibly with added declarations and/or wrapper.
    """
    if not xml_text or not isinstance(xml_text, str):
        return xml_text

    declared = _extract_declared_namespaces(xml_text)
    # URIs we may inject: standards first, overridden by the original input's
    # own declarations. Kept apart from `declared` so that a prefix declared
    # only in the original is still seen as missing from xml_text.
    known = dict(STANDARD_NAMESPACES)
    if original_text:
        known.update(_extract_declared_namespaces(original_text))

    used = _detect_used_prefixes(xml_text)
    # Always consider rdf used for wrapper
    used.add("rdf")
    # "xml" and "xmlns" are reserved prefixes and are never declared.
    used -= {"xml", "xmlns"}

    root_start = re.compile(r"<rdf:RDF\b[^>]*>")
    if root_start.search(xml_text):
        # Sorted so the injected attribute order is deterministic.
        missing = sorted(p for p in used if p not in declared and p in known)
        if not missing:
            return xml_text
        added_attrs = " ".join(f'xmlns:{p}="{known[p]}"' for p in missing)

        def _inject(match: re.Match) -> str:
            start_tag = match.group(0)
            # Insert before ">" (or "/>" for a self-closing root element).
            tail_len = 2 if start_tag.endswith("/>") else 1
            return f"{start_tag[:-tail_len].rstrip()} {added_attrs}{start_tag[-tail_len:]}"

        updated = root_start.sub(_inject, xml_text, count=1)
        if steps_log is not None:
            steps_log.append(f"Injected missing namespace declarations: {', '.join(missing)}")
        return updated

    # No wrapper: build one with declarations for the used prefixes we know.
    attrs = [f'xmlns:rdf="{STANDARD_NAMESPACES["rdf"]}"']
    for p in sorted(used - {"rdf"}):
        uri = declared.get(p) or known.get(p)
        if uri:
            attrs.append(f'xmlns:{p}="{uri}"')

    # An XML declaration is only legal at the very start of a document, so
    # keep it in front of the wrapper instead of inside it.
    prolog = ""
    body = xml_text
    xml_decl = re.match(r"\s*(<\?xml\b[^>]*\?>)\s*", xml_text)
    if xml_decl:
        prolog = xml_decl.group(1) + "\n"
        body = xml_text[xml_decl.end():]

    updated = f"{prolog}<rdf:RDF {' '.join(attrs)}>\n{body}\n</rdf:RDF>"
    if steps_log is not None:
        steps_log.append("Wrapped snippet in <rdf:RDF> with standard namespace declarations")
    return updated
def validate_rdf_interface(rdf_content: str, template: str, use_ai: bool = True, include_warnings: bool = False, iterate_until_valid: bool = True, max_attempts: int = 5, show_steps: bool = True):
    """Main validation function for Gradio interface.

    Validates ``rdf_content`` against ``template`` and, when it does not
    conform, produces suggestions plus a corrected version (AI-generated when
    ``use_ai`` is true, otherwise manual hints). An AI correction is
    re-validated.

    Args:
        rdf_content: RDF/XML text supplied by the user.
        template: Name of the validation template to validate against.
        use_ai: Use the AI suggestion/correction functions instead of the
            manual fallbacks.
        include_warnings: When False, warning sections are filtered out of the
            results text handed to the suggestion/correction functions.
        iterate_until_valid: Forwarded as ``enable_validation_loop`` to the
            correction function.
        max_attempts: Forwarded to the correction function.
        show_steps: When False, the returned steps text is empty.

    Returns:
        A 7-tuple ``(status, results_text, suggestions, steps_text,
        corrected_rdf, corrected_status, corrected_results)``. On early errors
        most elements are empty strings.
    """
    if not rdf_content.strip():
        return "❌ Error", "No RDF/XML data provided", "", "", "", "", ""
    steps_log: List[str] = []
    # Check if validator is available
    if not VALIDATOR_AVAILABLE:
        error_msg = "Validator module is not available. Please check that validator.py is present and all dependencies are installed."
        steps_log.append(f"ERROR: {error_msg}")
        return "❌ Error", error_msg, "", "\n".join(steps_log) if show_steps else "", "", "", ""
    # Prepare and validate RDF
    steps_log.append(f"Preparing RDF for validation (original length: {len(rdf_content)} chars)")
    prepped_input = ensure_rdf_wrapper_and_namespaces(rdf_content, steps_log=steps_log if show_steps else None)
    steps_log.append(f"Preprocessed RDF (new length: {len(prepped_input)} chars)")
    # Call validation
    steps_log.append(f"Calling validator with template '{template}'")
    result = validate_rdf_tool(prepped_input, template)
    if "error" in result:
        steps_log.append(f"Validation error: {result['error']}")
        return f"❌ Error: {result['error']}", "", "", "\n".join(steps_log) if show_steps else "", "", "", ""
    status = result["status"]
    results_text = result["results"]
    conforms = result["conforms"]
    steps_log.append(f"Initial validation: {'PASSED' if conforms else 'FAILED'} using template '{template}'")
    # Log if we got unexpected empty results
    if not results_text or len(results_text.strip()) == 0:
        steps_log.append("WARNING: Validator returned empty results text")
    # Filter results if warnings should be excluded
    filtered_results = results_text
    if not include_warnings and "Warning" in results_text:
        # Split results into lines and filter out warnings
        lines = results_text.split('\n')
        filtered_lines = []
        # True while inside a warning section; reset by the next non-warning
        # "Constraint Violation" header line.
        skip_until_next_section = False
        for line in lines:
            if "Warning" in line and ("Constraint Violation" in line or "sh:Warning" in line):
                skip_until_next_section = True
            elif "Constraint Violation" in line and "Warning" not in line:
                skip_until_next_section = False
                filtered_lines.append(line)
            elif not skip_until_next_section:
                filtered_lines.append(line)
        filtered_results = '\n'.join(filtered_lines)
        if not include_warnings:
            steps_log.append("Filtered out warnings from results")
    corrected_status = ""
    corrected_results = ""
    if not include_warnings:
        steps_log.append("Configured to ignore warnings in AI processing")
    if iterate_until_valid:
        steps_log.append(f"Iteration enabled with max_attempts={max_attempts}")
    if conforms:
        suggestions = "✅ No issues found! Your RDF/XML is valid according to the selected template."
        corrected_rdf = ""
        corrected_status = "—"
        corrected_results = ""
        steps_log.append("No correction needed; record already conforms")
    else:
        if use_ai:
            # Pass filtered results to AI functions
            # (both receive the raw rdf_content, not the preprocessed prepped_input)
            suggestions = get_ai_suggestions(filtered_results, rdf_content, include_warnings)
            steps_log.append("Requested AI suggestions for concise guidance")
            corrected_rdf = get_ai_correction_targeted(
                filtered_results,
                rdf_content,
                template,
                max_attempts=max_attempts,
                include_warnings=include_warnings,
                enable_validation_loop=iterate_until_valid,
                steps_log=steps_log,
            )
            # Attempt re-validation of corrected RDF
            try:
                # Clean the corrected output for validation
                corrected_xml = clean_xml_for_validation(corrected_rdf)
                corrected_xml = extract_xml_from_text(corrected_xml)
                corrected_xml = ensure_rdf_wrapper_and_namespaces(corrected_xml, original_text=prepped_input, steps_log=steps_log)
                # Debug logging
                steps_log.append(f"Re-validating cleaned RDF ({len(corrected_xml)} chars)")
                if show_steps:
                    # Log first 200 chars of what we're validating
                    preview = corrected_xml[:200] + "..." if len(corrected_xml) > 200 else corrected_xml
                    steps_log.append(f"Preview: {preview}")
                reval = validate_rdf_tool(corrected_xml, template)
                if "error" in reval:
                    corrected_status = f"❌ Re-validation Error: {reval['error']}"
                    corrected_results = ""
                    steps_log.append(f"Re-validation failed with error: {reval['error']}")
                else:
                    corrected_status = reval.get("status", "")
                    corrected_results = reval.get("results", "")
                    # Rebinds the local only for the log line below; the returned
                    # `status` still describes the original input.
                    conforms = reval.get('conforms', False)
                    steps_log.append(f"Re-validation: {corrected_status} - Conforms: {conforms}")
            except Exception as re_ex:
                # A failure while re-validating is reported in the status field
                # rather than raised to the UI.
                corrected_status = f"❌ Re-validation Error: {re_ex}"
                corrected_results = ""
                steps_log.append(f"Re-validation error: {re_ex}")
        else:
            suggestions = generate_manual_suggestions(filtered_results)
            corrected_rdf = generate_manual_correction_hints(filtered_results, rdf_content)
            corrected_status = "—"
            corrected_results = ""
            steps_log.append("AI disabled; produced manual suggestions and hints")
    steps_text = "\n".join(steps_log) if show_steps else ""
    return status, results_text, suggestions, steps_text, corrected_rdf, corrected_status, corrected_results
def get_rdf_examples(example_type: str = "valid") -> str:
    """
    Return a sample snippet for testing the validator or learning the format.

    Args:
        example_type (str): Which example to return:
            - 'valid': the module's SAMPLE_VALID_RDF
            - 'invalid': the module's SAMPLE_INVALID_RDF
            - 'bibframe': a short built-in example
            Any other value falls back to the 'valid' example.
    Returns:
        str: The selected example text
    """
    bibframe_example = '''
Example Book Title
2024
New York
'''
    catalogue = {
        "valid": SAMPLE_VALID_RDF,
        "invalid": SAMPLE_INVALID_RDF,
        "bibframe": bibframe_example,
    }
    try:
        return catalogue[example_type]
    except KeyError:
        # Unknown type: hand back the valid sample.
        return catalogue["valid"]
# Create Gradio Interface
def create_interface():
    """
    Build the Gradio Blocks UI for RDF validation and return it.

    The interface contains an RDF/XML input box with advanced options
    (template, AI usage, warnings, iteration, attempt limit, step log),
    example/clear buttons, result panes for the original validation, and
    panes for the corrected RDF and its re-validation. The "Validate RDF"
    button is wired to ``validate_rdf_interface``.

    ``HF_API_KEY`` is read from the environment at call time (not import
    time), so the status reflects the environment when the UI is built.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    # Check API key status dynamically
    current_api_key = os.getenv('HF_API_KEY', '')
    api_status = "🔑 AI features enabled" if (OPENAI_AVAILABLE and current_api_key) else "⚠️ AI features disabled (set HF_API_KEY)"

    with gr.Blocks(
        title="RDF Validation Server with AI",
        theme=gr.themes.Soft(),
        css="""
.status-box {
font-weight: bold;
padding: 10px;
border-radius: 5px;
}
.header-text {
text-align: center;
padding: 20px;
}
"""
    ) as demo:

        # Header
        debug_info = f"""
Debug Info:
- OPENAI_AVAILABLE: {OPENAI_AVAILABLE}
- HF_INFERENCE_AVAILABLE: {HF_INFERENCE_AVAILABLE}
- HF_API_KEY set: {'Yes' if current_api_key else 'No'}
- HF_API_KEY length: {len(current_api_key) if current_api_key else 0}
- HF_ENDPOINT_URL: {HF_ENDPOINT_URL}
- HF_MODEL: {HF_MODEL}
"""
        # NOTE(review): the header template below is empty, so neither
        # `api_status` nor `debug_info` is rendered anywhere — confirm whether
        # the header markup was lost or these values should be dropped.
        gr.HTML(f"""
""")

        # Main interface
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📝 Input")

                rdf_input = gr.Textbox(
                    label="RDF/XML Content",
                    placeholder="Paste your RDF/XML content here...",
                    lines=15,
                    show_copy_button=True
                )

                # Keep the main form simple and tuck options into an accordion
                with gr.Accordion("Advanced options", open=False):
                    with gr.Row():
                        template_dropdown = gr.Dropdown(
                            label="Validation Template",
                            choices=["monograph", "custom"],
                            value="monograph",
                            info="Select the SHACL template to validate against"
                        )
                        use_ai_checkbox = gr.Checkbox(
                            label="Use AI Features",
                            value=True,
                            info="Enable AI-powered suggestions and corrections"
                        )
                        include_warnings_checkbox = gr.Checkbox(
                            label="Include Warnings",
                            value=False,
                            info="Include warnings in AI corrections (violations only by default)"
                        )
                    with gr.Row():
                        iterate_checkbox = gr.Checkbox(
                            label="Iterate until valid",
                            value=True,
                            info="Try multiple correction attempts until validation passes or attempts run out"
                        )
                        max_attempts_slider = gr.Slider(
                            label="Max attempts",
                            minimum=1,
                            maximum=3,
                            value=2,
                            step=1,
                            info="Maximum number of correction attempts (2 recommended for speed)"
                        )
                        show_steps_checkbox = gr.Checkbox(
                            label="Show steps",
                            value=False,
                            info="Display step-by-step process (turn on when you want transparency)"
                        )

                validate_btn = gr.Button("🔍 Validate RDF", variant="primary", size="lg")

                # Examples and controls
                gr.Markdown("### 📚 Examples & Tools")

                with gr.Row():
                    example1_btn = gr.Button("✅ Valid RDF Example", variant="secondary")
                    example2_btn = gr.Button("❌ Invalid RDF Example", variant="secondary")
                    clear_btn = gr.Button("🗑️ Clear All", variant="stop")

        # Results section
        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📊 Results")

                status_output = gr.Textbox(
                    label="Validation Status",
                    interactive=False,
                    lines=1,
                    elem_classes=["status-box"]
                )
                results_output = gr.Textbox(
                    label="Detailed Validation Results",
                    interactive=False,
                    lines=8,
                    show_copy_button=True
                )
                suggestions_output = gr.Textbox(
                    label="💡 Fix Suggestions",
                    interactive=False,
                    lines=8,
                    show_copy_button=True
                )
                steps_output = gr.Textbox(
                    label="🧭 Correction Steps",
                    interactive=False,
                    lines=10,
                    show_copy_button=True,
                    placeholder="Step-by-step log of how the system derived the corrected XML"
                )

        # Corrected RDF section
        with gr.Row():
            with gr.Column():
                gr.Markdown("### 🛠️ AI-Generated Corrections")

                corrected_output = gr.Textbox(
                    label="Corrected RDF/XML",
                    interactive=False,
                    lines=15,
                    show_copy_button=True,
                    placeholder="Corrected RDF will appear here after validation..."
                )
                with gr.Row():
                    corrected_status_output = gr.Textbox(
                        label="Re-validation Status (Corrected RDF)",
                        interactive=False,
                        lines=1,
                        elem_classes=["status-box"]
                    )
                    corrected_results_output = gr.Textbox(
                        label="Re-validation Details",
                        interactive=False,
                        lines=6,
                        show_copy_button=True
                    )

        # Event handlers
        # The 7 inputs / 7 outputs below match the parameters and the returned
        # tuple of validate_rdf_interface, in order.
        validate_btn.click(
            fn=validate_rdf_interface,
            inputs=[rdf_input, template_dropdown, use_ai_checkbox, include_warnings_checkbox, iterate_checkbox, max_attempts_slider, show_steps_checkbox],
            outputs=[status_output, results_output, suggestions_output, steps_output, corrected_output, corrected_status_output, corrected_results_output]
        )

        # Auto-validation on input change is intentionally not wired up, to
        # prevent processing loops; validation only runs on button click.

        # Example buttons
        example1_btn.click(
            lambda: get_rdf_examples("valid"),
            outputs=[rdf_input]
        )
        example2_btn.click(
            lambda: get_rdf_examples("invalid"),
            outputs=[rdf_input]
        )
        # One empty string per component being cleared (8 in total).
        clear_btn.click(
            lambda: ("", "", "", "", "", "", "", ""),
            outputs=[rdf_input, status_output, results_output, suggestions_output, steps_output, corrected_output, corrected_status_output, corrected_results_output]
        )

        # Footer with instructions
        gr.Markdown("""
---
### 📚 **Documentation & Resources:**
**[📖 MCP4BibFrame Documentation](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs)** - Complete BibFrame ontology reference with examples
This validator integrates with the **MCP4BibFrame Documentation API** to provide authoritative BibFrame ontology information during AI-powered corrections.
### 🚀 **Quick Start:**
1. **Paste your RDF/XML** in the input box above
2. **Click "Validate RDF"** to check for errors
3. **Review AI suggestions** for plain-language fixes (enhanced with BibFrame documentation)
4. **Copy the corrected RDF** from the output
---
### 🚀 **Deployment Instructions for Hugging Face Spaces:**
1. **Create a new Space** on [Hugging Face](https://huggingface.co/spaces)
2. **Set up your Hugging Face Inference Endpoint** and get the endpoint URL
3. **Set your tokens** in Space settings (use Secrets for security):
- Go to Settings → Repository secrets
- Add: `HF_API_KEY` = `your_huggingface_api_key_here`
- Endpoint is now hardcoded to your specific Inference Endpoint
4. **Upload these files** to your Space repository
5. **Install requirements**: The Space will auto-install from `requirements.txt`
### 🔧 **MCP Server Mode:**
This app functions as both a web interface AND an MCP server for Claude Desktop and other MCP clients.
**Available MCP Tools:**
- `validate_rdf_tool`: Validate RDF/XML against SHACL shapes
- `get_ai_suggestions`: Get AI-powered fix suggestions (with BibFrame docs)
- `get_ai_correction`: Generate corrected RDF/XML (with BibFrame docs)
- `get_rdf_examples`: Retrieve example RDF snippets
- `validate_rdf_interface`: Complete validation with AI suggestions and corrections (primary tool)
**MCP Configuration (Streamable HTTP):**
Add this configuration to your MCP client (Claude Desktop, etc.):
```json
{
"mcpServers": {
"rdf-validator": {
"url": "https://jimfhahn-mcp4rdf.hf.space/gradio_api/mcp/"
}
}
}
```
**Alternative SSE Configuration:**
```json
{
"mcpServers": {
"rdf-validator": {
"url": "https://jimfhahn-mcp4rdf.hf.space/gradio_api/mcp/sse"
}
}
}
```
### 💡 **Features:**
- ✅ Real-time RDF/XML validation against SHACL schemas
- 🤖 AI-powered error suggestions and corrections (enhanced with BibFrame ontology docs)
- 📚 Built-in examples and templates
- 🔗 Integrated with [MCP4BibFrame Documentation API](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs)
- 📋 Copy results with one click
**BibFrame Documentation Integration:**
AI corrections now use authoritative BibFrame ontology information from the MCP4BibFrame Documentation API to ensure accuracy and compliance with official specifications.
### 🔗 **Related Resources:**
- [MCP4BibFrame Documentation](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs) - BibFrame ontology reference
- [BIG DCTAP Documentation](https://bf-interop.github.io/DCTap/)
- [BIBFRAME Ontology](http://id.loc.gov/ontologies/bibframe.html)
- [SHACL Specification](https://www.w3.org/TR/shacl/)
**Note:** AI features require a valid Hugging Face API key (HF_API_KEY) set as a Secret. Manual suggestions are provided as fallback.
""")

    return demo
# Launch configuration
if __name__ == "__main__":
    demo = create_interface()

    # Listening port comes from the PORT environment variable when it is set
    # (hosting platforms such as Hugging Face), otherwise 7860.
    port = int(os.getenv('PORT', 7860))

    # Launch options gathered in one place, then passed to launch().
    launch_options = {
        "server_name": "0.0.0.0",   # listen on all interfaces for external hosting
        "server_port": port,
        "share": False,             # no gradio.live links in production
        "show_error": True,         # surface errors in the interface
        "show_api": True,           # keep API endpoints enabled
        # NOTE(review): "." allows serving files from the current directory;
        # confirm nothing sensitive lives there.
        "allowed_paths": ["."],
        "mcp_server": True,         # MCP server functionality (Gradio 5.28+)
    }
    demo.launch(**launch_options)