|
|
|
|
|
""" |
|
|
Hugging Face Gradio App for RDF Validation with MCP Server and Anthropic AI |
|
|
|
|
|
This app serves both as a web interface and can expose MCP server functionality. |
|
|
Deploy this on Hugging Face Spaces with your Anthropic API key. |
|
|
""" |
|
|
|
|
|
import gradio as gr |
|
|
import os |
|
|
import json |
|
|
import sys |
|
|
import asyncio |
|
|
import logging |
|
|
import re |
|
|
import hashlib |
|
|
import threading |
|
|
import time |
|
|
from collections import OrderedDict |
|
|
from typing import Any, Dict, List, Optional |
|
|
|
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__))) |
|
|
|
|
|
|
|
|
# Import the SHACL validator. The app degrades gracefully when validator.py is
# absent or broken: VALIDATOR_AVAILABLE gates every feature that needs it.
try:
    from validator import validate_rdf

    VALIDATOR_AVAILABLE = True

    # Guard against validator.py exporting a non-callable `validate_rdf`
    # (e.g. a module-level constant shadowing the function).
    if not callable(validate_rdf):
        print("⚠️ Warning: validate_rdf is not callable")
        VALIDATOR_AVAILABLE = False
    else:
        print("✅ Validator module loaded successfully")
except ImportError as e:
    VALIDATOR_AVAILABLE = False
    print(f"⚠️ Warning: validator.py not found or has import errors: {e}")
    print("Some features may be limited.")
except Exception as e:
    # Catch-all: a crashing validator module must not take the whole app down.
    VALIDATOR_AVAILABLE = False
    print(f"⚠️ Warning: Error loading validator: {e}")
|
|
|
|
|
|
|
|
# Optional AI backend: the OpenAI SDK is used against a Hugging Face
# Inference Endpoint (OpenAI-compatible API).
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    print("💡 Install 'openai' package for AI-powered corrections: pip install openai")

# Optional HTTP client: needed for the MCP4BibFrame documentation API.
try:
    import requests
    HF_INFERENCE_AVAILABLE = True
except ImportError:
    HF_INFERENCE_AVAILABLE = False
    print("💡 Install 'requests' package for AI-powered corrections: pip install requests")

# Module-wide logging setup.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# --- Hugging Face inference endpoint configuration ---
# API key is read once at import time; empty string disables AI features.
HF_API_KEY = os.getenv('HF_API_KEY', '')
HF_ENDPOINT_URL = "https://evxgv66ksxjlfrts.us-east-1.aws.endpoints.huggingface.cloud/v1/"
HF_MODEL = "lmstudio-community/Llama-3.3-70B-Instruct-GGUF"

# --- Correction loop settings ---
# How many validate -> AI-fix -> re-validate rounds to attempt.
MAX_CORRECTION_ATTEMPTS = 2
ENABLE_VALIDATION_LOOP = True

# --- MCP4BibFrame documentation service (MCP protocol over HTTP) ---
MCP4BIBFRAME_DOCS_URL = "https://jimfhahn-mcp4bibframe-docs.hf.space/api/mcp"
MCP4BIBFRAME_DOCS_ENABLED = True

# Cache of documentation responses: key -> (payload, timestamp-seconds).
BIBFRAME_DOCS_CACHE: Dict[str, tuple[Any, float]] = {}
BIBFRAME_DOCS_CACHE_TTL = 3600  # one hour

# LRU cache of AI corrections keyed by a hash of (template, errors, rdf).
FIX_CACHE: OrderedDict[str, str] = OrderedDict()
FIX_CACHE_MAX_SIZE = 100
|
|
|
|
|
|
|
|
def _make_fix_cache_key(validation_results: str, rdf_content: str, template: str) -> str:
    """Build a stable SHA-256 hex key identifying one correction attempt.

    The key covers the template name, the validation report, and the RDF
    itself (all stripped), separated by an ASCII unit-separator byte so the
    three fields cannot collide by concatenation.
    """
    fields = (
        template.strip().encode("utf-8"),
        validation_results.strip().encode("utf-8", errors="ignore"),
        rdf_content.strip().encode("utf-8", errors="ignore"),
    )
    return hashlib.sha256(b"\x1f".join(fields)).hexdigest()
|
|
|
|
|
|
|
|
def _get_cached_correction(cache_key: str, steps_log: Optional[List[str]] = None) -> Optional[str]:
    """Return a previously stored correction, or None on a cache miss.

    A hit is promoted to most-recently-used so LRU eviction keeps it alive.
    """
    hit = FIX_CACHE.get(cache_key)
    if hit is None:
        return None
    FIX_CACHE.move_to_end(cache_key)  # refresh recency for LRU ordering
    if steps_log is not None:
        steps_log.append("Using cached correction for repeated validation errors")
    return hit
|
|
|
|
|
|
|
|
def _store_correction_in_cache(cache_key: str, corrected_rdf: str, steps_log: Optional[List[str]] = None) -> None:
    """Insert a correction as most-recent, evicting the LRU entry past capacity.

    Empty corrections are ignored — caching them would poison later lookups.
    """
    if not corrected_rdf:
        return

    FIX_CACHE[cache_key] = corrected_rdf
    FIX_CACHE.move_to_end(cache_key)

    if len(FIX_CACHE) <= FIX_CACHE_MAX_SIZE:
        if steps_log is not None:
            steps_log.append("Cached correction for future reuse")
        return

    # Over capacity: drop the least-recently-used entry.
    FIX_CACHE.popitem(last=False)
    if steps_log is not None:
        steps_log.append("Cache full; evicted oldest correction entry")
|
|
|
|
|
|
|
|
# NOTE: a duplicate re-definition of FIX_CACHE / FIX_CACHE_MAX_SIZE was removed
# here. Re-binding FIX_CACHE to a fresh OrderedDict silently discarded every
# correction already stored in the cache defined near the top of the module.
|
|
|
|
|
|
|
|
def rapid_fix_missing_properties(rdf_content: str, validation_results: str, template: str, steps_log: Optional[List[str]] = None) -> Optional[str]:
    """Ultra-fast fix for simple missing property errors - no AI needed.

    Scans SHACL messages of the form "Less than N values on ...->bf:<prop>"
    and injects canned XML for the handful of properties we have templates
    for, targeting the first bf:Work (preferred) or bf:Instance element.

    Args:
        rdf_content: RDF/XML document to patch.
        validation_results: SHACL validation report text.
        template: Validation template name (currently unused here; kept for
            interface parity with the other fixers).
        steps_log: Optional list that receives human-readable progress notes.

    Returns:
        The patched RDF/XML string, or None when nothing could be fixed.
    """
    missing = re.findall(r"Less than \d+ values on.*->bf:(\w+)", validation_results)
    if not missing:
        if steps_log:
            steps_log.append("❌ Rapid fix: No missing properties detected in validation results")
        return None

    # De-duplicate while preserving first-seen order: repeated violations for
    # the same property must not inject the same XML block twice.
    missing = list(dict.fromkeys(missing))

    if steps_log:
        steps_log.append(f"🔍 Rapid fix detected {len(missing)} missing properties: {', '.join(missing)}")

    # Canned XML fragments for properties that can be satisfied with safe
    # placeholder values (titles default to "Untitled", agents to LoC/DLC).
    INSTANT_FIXES = {
        "title": '<bf:title><bf:Title><bf:mainTitle>Untitled</bf:mainTitle></bf:Title></bf:title>',
        "language": '<bf:language><bf:Language rdf:about="http://id.loc.gov/vocabulary/languages/eng"><rdfs:label>English</rdfs:label><bf:code>eng</bf:code></bf:Language></bf:language>',
        "content": '<bf:content><bf:Content rdf:about="http://id.loc.gov/vocabulary/contentTypes/txt"><rdfs:label>text</rdfs:label><bf:code>txt</bf:code></bf:Content></bf:content>',
        "adminMetadata": '''<bf:adminMetadata>
  <bf:AdminMetadata>
    <bf:status>
      <bf:Status rdf:about="http://id.loc.gov/vocabulary/mstatus/n">
        <rdfs:label>new</rdfs:label>
        <bf:code>n</bf:code>
      </bf:Status>
    </bf:status>
    <bf:date rdf:datatype="http://www.w3.org/2001/XMLSchema#date">2024-01-01</bf:date>
    <bf:agent>
      <bf:Agent rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
        <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Organization"/>
        <rdfs:label>Library of Congress</rdfs:label>
      </bf:Agent>
    </bf:agent>
    <bf:assigner>
      <bf:Agent rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
        <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Organization"/>
        <rdfs:label>Library of Congress</rdfs:label>
      </bf:Agent>
    </bf:assigner>
  </bf:AdminMetadata>
</bf:adminMetadata>''',
        "assigner": '''<bf:assigner>
  <bf:Agent rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
    <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Organization"/>
    <rdfs:label>Library of Congress</rdfs:label>
  </bf:Agent>
</bf:assigner>'''
    }

    # Locate the first Work/Instance element; Work wins when both exist.
    work_match = re.search(r'(<bf:Work[^>]*>)(.*?)(</bf:Work>)', rdf_content, re.DOTALL)
    instance_match = re.search(r'(<bf:Instance[^>]*>)(.*?)(</bf:Instance>)', rdf_content, re.DOTALL)

    if not work_match and not instance_match:
        if steps_log:
            steps_log.append("❌ Rapid fix: No bf:Work or bf:Instance found in RDF")
        return None

    match = work_match or instance_match
    target_type = "Work" if work_match else "Instance"
    opening_tag = match.group(1)
    content = match.group(2)
    closing_tag = match.group(3)

    if steps_log:
        steps_log.append(f"🎯 Rapid fix target: bf:{target_type}")
        has_admin = "<bf:adminMetadata>" in content or "<bf:AdminMetadata>" in content
        steps_log.append(f"📋 Current state: AdminMetadata {'EXISTS' if has_admin else 'MISSING'}")

    fixes = []
    assigner_fixed = False

    # Cap at 10 distinct properties to bound the work per call.
    for prop in missing[:10]:
        prop_lower = prop.lower()

        if prop_lower == "assigner":
            # Special case: assigner belongs INSIDE an existing AdminMetadata
            # block, reusing its agent URI when one is declared.
            if steps_log:
                steps_log.append("🔧 Processing missing 'assigner' property...")

            admin_pattern = re.compile(r'(<bf:AdminMetadata[^>]*>)(.*?)(</bf:AdminMetadata>)', re.DOTALL)

            def add_assigner(match):
                nonlocal assigner_fixed
                admin_open = match.group(1)
                admin_content = match.group(2)
                admin_close = match.group(3)

                # Already present — leave this block untouched.
                if '<bf:assigner' in admin_content:
                    return match.group(0)

                # Prefer the agent already referenced by this AdminMetadata,
                # either as rdf:resource or as a nested rdf:about node.
                agent_uri = None
                agent_match = re.search(r'<bf:agent\s+rdf:resource="([^"]+)"', admin_content)
                if not agent_match:
                    agent_match = re.search(r'<bf:agent[^>]*>\s*<[^>]+\s+rdf:about="([^"]+)"', admin_content)
                if agent_match:
                    agent_uri = agent_match.group(1)

                if agent_uri:
                    assigner_element = f'    <bf:assigner rdf:resource="{agent_uri}"/>'
                else:
                    # Fall back to the Library of Congress default agent.
                    assigner_element = '''    <bf:assigner>
      <bf:Agent rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
        <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Organization"/>
        <rdfs:label>Library of Congress</rdfs:label>
      </bf:Agent>
    </bf:assigner>'''

                assigner_fixed = True
                if steps_log:
                    steps_log.append(f"  ✅ Injected assigner into existing AdminMetadata (agent URI: {agent_uri or 'default'})")

                return admin_open + admin_content + '\n' + assigner_element + '\n  ' + admin_close

            original_content = content
            content = admin_pattern.sub(add_assigner, content)

            if assigner_fixed and steps_log:
                steps_log.append("  ✅ Assigner successfully added to existing AdminMetadata")
            elif steps_log and content == original_content:
                steps_log.append("  ℹ️ No AdminMetadata found to inject assigner (will add with full block if adminMetadata is missing)")

        elif prop in INSTANT_FIXES and f"<bf:{prop}" not in content:
            fixes.append(INSTANT_FIXES[prop])
            if steps_log:
                steps_log.append(f"  ✅ Will add missing '{prop}' property")
        elif prop in INSTANT_FIXES:
            if steps_log:
                steps_log.append(f"  ℹ️ Property '{prop}' already exists, skipping")
        elif steps_log:
            steps_log.append(f"  ⚠️ No template for '{prop}', skipping")

    if not fixes and not assigner_fixed:
        if steps_log:
            steps_log.append("❌ Rapid fix: No properties could be fixed")
        return None

    if fixes:
        if steps_log:
            steps_log.append(f"🔨 Adding {len(fixes)} missing properties to {target_type}")
        fixed_content = opening_tag + content + '\n    ' + '\n    '.join(fixes) + '\n' + closing_tag
    else:
        if steps_log:
            steps_log.append("🔨 Modified content (assigner injection only)")
        fixed_content = opening_tag + content + closing_tag

    # Splice the patched element back into the full document.
    result = rdf_content.replace(match.group(0), fixed_content)

    if steps_log:
        steps_log.append(f"✅ Rapid fix complete: Added {len(fixes)} properties, assigner_injected={assigner_fixed}")

    return result
|
|
|
|
|
|
|
|
def get_ai_correction_minimal(errors: str, rdf: str, max_tokens: int = 800) -> str:
    """Ultra-minimal prompt for faster AI response.

    Sends only the first few actionable violation lines plus a truncated view
    of the RDF. Best-effort: on any failure the original `rdf` is returned
    unchanged (but the failure is now logged instead of swallowed silently).

    Args:
        errors: SHACL validation report text.
        rdf: RDF/XML to correct.
        max_tokens: Completion budget for the model.

    Returns:
        Corrected RDF/XML, or the input unchanged when correction is not
        possible or fails.
    """
    if not OPENAI_AVAILABLE or not os.getenv('HF_API_KEY'):
        return rdf

    try:
        client = get_openai_client()
        if not client:
            return rdf

        # Keep only lines that look like actionable violations, capped at 5.
        error_lines = []
        for line in errors.split('\n'):
            if any(term in line for term in ['Less than', 'missing', 'required', '->bf:', 'adminMetadata', 'assigner']):
                error_lines.append(line.strip()[:100])
                if len(error_lines) >= 5:
                    break

        if not error_lines:
            return rdf

        # Truncate the RDF aggressively — the model only needs head and tail
        # to reproduce the document structure.
        prompt = f"""Fix these BibFrame errors:

{chr(10).join(error_lines[:3])}

Add only what's missing to this RDF:
{rdf[:800]}...{rdf[-200:] if len(rdf) > 1000 else ''}

Return complete valid RDF/XML only."""

        response = client.chat.completions.create(
            model=HF_MODEL,
            messages=[
                {"role": "system", "content": "Fix RDF. Output only valid RDF/XML. No explanations."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=max_tokens,
            temperature=0,  # deterministic output for caching/repeatability
            timeout=20
        )

        result = response.choices[0].message.content
        result = extract_rdf_from_response(result)
        result = fix_common_rdf_errors(result)

        return result

    except Exception as e:
        # Best-effort path: fall back to the original RDF, but record why so
        # repeated failures are diagnosable from the logs.
        logger.warning(f"Minimal AI correction failed: {e}")
        return rdf
|
|
|
|
|
|
|
|
def test_validator_functionality():
    """Smoke-test the validator with deliberately invalid RDF.

    Feeds a Work missing title/language/content/adminMetadata through the
    'monograph' template; a healthy validator must report violations.

    Returns:
        bool: True when the validator flagged the invalid RDF as expected.
    """
    if not VALIDATOR_AVAILABLE:
        print("❌ Validator not available for testing")
        return False

    try:
        # Minimal Work that hits the SHACL target class but omits the
        # required properties, so violations are guaranteed.
        test_rdf = '''<?xml version="1.0"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
         xmlns:bf="http://id.loc.gov/ontologies/bibframe/">
  <bf:Work rdf:about="http://example.org/work/1">
    <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Text"/>
    <!-- Intentionally missing title, language, content, adminMetadata to trigger SHACL violations -->
  </bf:Work>
</rdf:RDF>'''
        conforms, results = validate_rdf(test_rdf.encode('utf-8'), 'monograph')

        if conforms:
            # Conforming here means SHACL did not run or shapes are missing.
            print("⚠️ WARNING: Validator returned 'conforms=True' for invalid RDF. Validator may not be working correctly!")
            return False

        preview = (results or '').strip()
        preview = preview[:200] + ('…' if len(preview) > 200 else '')
        print(f"✅ Validator test passed. Got expected SHACL violations. Preview: {preview if preview else 'No results text returned'}")
        return True

    except Exception as e:
        print(f"❌ Validator test failed with error: {e}")
        return False
|
|
|
|
|
|
|
|
# Run the smoke test once at import time so a misconfigured deployment
# surfaces a clear message in the startup logs.
if VALIDATOR_AVAILABLE:
    test_validator_functionality()
|
|
|
|
|
def query_bibframe_docs(tool_name: str, params: dict, timeout: int = 10) -> Optional[dict]:
    """
    Query the MCP4BibFrame documentation API using the MCP protocol.

    Sends a JSON-RPC 2.0 ``tools/call`` request and parses the server's
    Server-Sent-Events style reply (``data: <json>`` lines).

    Args:
        tool_name (str): Name of the tool to invoke
        params (dict): Parameters for the tool
        timeout (int): Request timeout in seconds

    Returns:
        Optional[dict]: Response data or None if failed
    """
    if not MCP4BIBFRAME_DOCS_ENABLED:
        return None

    try:
        # JSON-RPC 2.0 envelope required by the MCP protocol.
        mcp_request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": params
            },
            "id": 1
        }

        logger.info(f"Querying BibFrame docs: {tool_name} with {params}")

        response = requests.post(
            MCP4BIBFRAME_DOCS_URL,
            json=mcp_request,
            timeout=timeout,
            headers={"Accept": "text/event-stream"}
        )

        if response.status_code == 200:
            # The reply is SSE-framed; scan for the first data line carrying
            # a JSON-RPC result and return its payload.
            for line in response.text.split('\n'):
                if line.startswith('data: '):
                    try:
                        data = json.loads(line[6:])
                        if 'result' in data:
                            return data['result']
                    except json.JSONDecodeError:
                        # Non-JSON data frames (keep-alives etc.) are skipped.
                        continue
        else:
            logger.warning(f"BibFrame docs API returned status {response.status_code}")

    except requests.exceptions.Timeout:
        logger.warning("Timeout querying BibFrame documentation")
    except Exception as e:
        logger.error(f"Error querying BibFrame documentation: {str(e)}")

    # Any failure (disabled, HTTP error, no usable data frame) yields None.
    return None
|
|
|
|
|
|
|
|
def query_bibframe_docs_cached(tool_name: str, params: dict, timeout: int = 10) -> Optional[dict]:
    """Cached wrapper around ``query_bibframe_docs`` to avoid repeated HTTP calls.

    Responses are cached per (tool, params) for BIBFRAME_DOCS_CACHE_TTL
    seconds. Expired entries are evicted on access so the cache cannot grow
    without bound across long-running sessions.

    Args:
        tool_name: MCP tool to invoke.
        params: Tool arguments (must be JSON-serializable for a stable key).
        timeout: HTTP timeout forwarded to the underlying call.

    Returns:
        The documentation payload, or None on failure.
    """
    if not MCP4BIBFRAME_DOCS_ENABLED:
        return None

    # Prefer a canonical JSON key; fall back to str() for unserializable params.
    try:
        cache_key = f"{tool_name}:{json.dumps(params, sort_keys=True)}"
    except TypeError:
        cache_key = f"{tool_name}:{str(params)}"

    cached = BIBFRAME_DOCS_CACHE.get(cache_key)
    if cached:
        payload, timestamp = cached
        if time.time() - timestamp < BIBFRAME_DOCS_CACHE_TTL:
            logger.debug(f"Using cached BibFrame docs response for {cache_key}")
            return payload
        # Entry expired: drop it now instead of letting stale entries pile up.
        BIBFRAME_DOCS_CACHE.pop(cache_key, None)

    response = query_bibframe_docs(tool_name, params, timeout)
    if response is not None:
        BIBFRAME_DOCS_CACHE[cache_key] = (response, time.time())

    return response
|
|
|
|
|
def extract_bibframe_terms_from_errors(validation_results: str) -> dict:
    """
    Extract BibFrame properties and classes mentioned in validation errors.

    Properties are matched case-insensitively and lower-cased; classes are
    matched case-sensitively (capitalized names). Results are de-duplicated,
    sorted (for deterministic output), and truncated to keep downstream
    documentation lookups cheap.

    Args:
        validation_results (str): Validation error text

    Returns:
        dict: Dictionary with 'properties' (max 5) and 'classes' (max 3) lists
    """
    terms = {
        'properties': set(),
        'classes': set()
    }

    # Patterns that typically surround a property name in SHACL messages.
    property_patterns = [
        r'bf:(\w+)',
        r'->bf:(\w+)',
        r'property (\w+)',
        r'missing (\w+)',
        r'requires? (\w+)'
    ]

    # Patterns for class names (always capitalized in BIBFRAME).
    class_patterns = [
        r'bf:([A-Z]\w+)',
        r'type ([A-Z]\w+)',
        r'class ([A-Z]\w+)',
        r'<bf:([A-Z]\w+)',
        r'a ([A-Z]\w+)'
    ]

    # Length filter (> 2 chars) drops noise like 'on', 'of', 'is'.
    for pattern in property_patterns:
        for match in re.findall(pattern, validation_results, re.IGNORECASE):
            if match and len(match) > 2:
                terms['properties'].add(match.lower())

    for pattern in class_patterns:
        for match in re.findall(pattern, validation_results):
            if match and len(match) > 2:
                terms['classes'].add(match)

    # Sort before truncating: list(set)[:n] picks an arbitrary subset, which
    # made repeated runs return different terms for the same input.
    terms['properties'] = sorted(terms['properties'])[:5]
    terms['classes'] = sorted(terms['classes'])[:3]

    return terms
|
|
|
|
|
def fetch_bibframe_guidance(validation_results: str, rdf_content: str) -> str:
    """
    Fetch relevant BibFrame guidance from the documentation API based on errors.

    Looks up (via the cached docs client) the properties and classes named in
    the validation report, plus a special AdminMetadata/assigner lookup, and
    formats the findings as markdown-ish bullet text for inclusion in AI
    prompts. Best-effort: any failure yields an empty string.

    Args:
        validation_results (str): Validation error messages
        rdf_content (str): Original RDF content

    Returns:
        str: Formatted guidance text for inclusion in prompts
    """
    if not MCP4BIBFRAME_DOCS_ENABLED:
        return ""

    guidance_parts = []

    try:
        # Mine the report for the terms worth documenting.
        terms = extract_bibframe_terms_from_errors(validation_results)
        logger.info(f"Extracted terms - properties: {terms['properties']}, classes: {terms['classes']}")

        # Cap lookups (3 properties, 2 classes) to bound latency per request.
        for prop in terms['properties'][:3]:
            prop_uri = _resolve_bibframe_uri(prop)
            result = query_bibframe_docs_cached("get_property_info", {"property_uri": prop_uri})
            if result and isinstance(result, dict):
                guidance_parts.append(f"\n**{result.get('label', prop)}** ({prop}):")
                if 'definition' in result:
                    guidance_parts.append(f"- Definition: {result['definition']}")
                if 'domain' in result:
                    guidance_parts.append(f"- Used in: {', '.join(result['domain'])}")
                if 'range' in result:
                    guidance_parts.append(f"- Values: {', '.join(result['range'])}")
                if 'examples' in result and result['examples']:
                    guidance_parts.append(f"- Example: {result['examples'][0]}")

        for cls in terms['classes'][:2]:
            cls_uri = _resolve_bibframe_uri(cls)
            result = query_bibframe_docs_cached("get_class_info", {"class_uri": cls_uri})
            if result and isinstance(result, dict):
                guidance_parts.append(f"\n**{result.get('label', cls)}** class:")
                if 'definition' in result:
                    guidance_parts.append(f"- Definition: {result['definition']}")
                if 'applicable_properties' in result:
                    props = [p.get('label', p.get('property', '')) for p in result['applicable_properties'][:5]]
                    guidance_parts.append(f"- Key properties: {', '.join(props)}")

        # Frequent failure mode gets a dedicated usage lookup.
        if any(term in validation_results.lower() for term in ['adminmetadata', 'assigner', '->bf:assigner']):
            result = query_bibframe_docs_cached("get_property_usage", {
                "property_name": "assigner",
                "class_name": "AdminMetadata"
            })
            if result and isinstance(result, dict):
                guidance_parts.append("\n**AdminMetadata/assigner usage:**")
                if 'usage' in result:
                    guidance_parts.append(f"- {result['usage']}")
                if 'examples' in result and result['examples']:
                    guidance_parts.append(f"- Pattern: {result['examples'][0]}")

    except Exception as e:
        # Guidance is optional context — never let it break the caller.
        logger.error(f"Error fetching BibFrame guidance: {str(e)}")

    if guidance_parts:
        return "\n".join(guidance_parts)
    return ""
|
|
|
|
|
|
|
|
def get_openai_client():
    """Get configured OpenAI client for HF Inference Endpoint.

    Returns:
        OpenAI | None: A client bound to the module's endpoint URL, or None
        when no API key was available at import time.
    """
    if not HF_API_KEY:
        print("❌ No HF_API_KEY available for OpenAI client")
        return None

    # Log the target endpoint, masking all but the key's last 4 characters.
    print("🔧 Creating OpenAI client with:")
    print(f"   base_url: {HF_ENDPOINT_URL}")
    print(f"   api_key: {'***' + HF_API_KEY[-4:] if len(HF_API_KEY) > 4 else 'HIDDEN'}")

    return OpenAI(
        base_url=HF_ENDPOINT_URL,
        api_key=HF_API_KEY,
        timeout=120.0  # generous: 70B model on a cold endpoint can be slow
    )
|
|
|
|
|
|
|
|
# Demo record shown in the UI as the "valid" example: a complete BIBFRAME
# Work (title, contribution, language, content, classification,
# adminMetadata) that conforms to the 'monograph' SHACL template.
SAMPLE_VALID_RDF = '''<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF xmlns:bf="http://id.loc.gov/ontologies/bibframe/"
         xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
         xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#"
         xmlns:bflc="http://id.loc.gov/ontologies/bflc/"
         xmlns:madsrdf="http://www.loc.gov/mads/rdf/v1#">

  <bf:Work rdf:about="http://example.org/work/1">
    <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Text"/>
    <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Monograph"/>

    <bf:title>
      <bf:Title>
        <bf:mainTitle>The knitter's handy book of patterns</bf:mainTitle>
        <bf:subtitle>basic designs in multiple sizes & gauges</bf:subtitle>
      </bf:Title>
    </bf:title>

    <bf:contribution>
      <bf:PrimaryContribution>
        <bf:agent>
          <bf:Agent rdf:about="http://id.loc.gov/rwo/agents/n2001017606">
            <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Person"/>
            <rdfs:label>Budd, Ann, 1956-</rdfs:label>
          </bf:Agent>
        </bf:agent>
        <bf:role>
          <bf:Role rdf:about="http://id.loc.gov/vocabulary/relators/aut">
            <rdfs:label>author</rdfs:label>
            <bf:code>aut</bf:code>
          </bf:Role>
        </bf:role>
      </bf:PrimaryContribution>
    </bf:contribution>

    <bf:language>
      <bf:Language rdf:about="http://id.loc.gov/vocabulary/languages/eng">
        <rdfs:label xml:lang="en">English</rdfs:label>
        <bf:code rdf:datatype="http://www.w3.org/2001/XMLSchema#string">eng</bf:code>
      </bf:Language>
    </bf:language>

    <bf:content>
      <bf:Content rdf:about="http://id.loc.gov/vocabulary/contentTypes/txt">
        <rdfs:label>text</rdfs:label>
        <bf:code>txt</bf:code>
      </bf:Content>
    </bf:content>

    <bf:classification>
      <bf:ClassificationLcc>
        <bf:classificationPortion>TT820</bf:classificationPortion>
        <bf:itemPortion>.B877 2002</bf:itemPortion>
        <bf:assigner>
          <bf:Organization rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
            <rdfs:label>United States, Library of Congress</rdfs:label>
          </bf:Organization>
        </bf:assigner>
      </bf:ClassificationLcc>
    </bf:classification>

    <bf:adminMetadata>
      <bf:AdminMetadata>
        <bf:status>
          <bf:Status rdf:about="http://id.loc.gov/vocabulary/mstatus/n">
            <rdfs:label>new</rdfs:label>
            <bf:code>n</bf:code>
          </bf:Status>
        </bf:status>
        <bf:date rdf:datatype="http://www.w3.org/2001/XMLSchema#date">2001-12-12</bf:date>
        <bf:agent>
          <bf:Agent rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
            <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Organization"/>
            <rdfs:label>United States, Library of Congress</rdfs:label>
          </bf:Agent>
        </bf:agent>
      </bf:AdminMetadata>
    </bf:adminMetadata>
  </bf:Work>

</rdf:RDF>'''
|
|
|
|
|
# Demo record shown in the UI as the "invalid" example: well-formed RDF/XML
# that deliberately violates the monograph SHACL shapes (string title, no
# language/content/adminMetadata) so the correction pipeline has work to do.
SAMPLE_INVALID_RDF = '''<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
         xmlns:bf="http://id.loc.gov/ontologies/bibframe/">
  <!-- Well-formed RDF/XML, but missing required properties to trigger SHACL violations -->
  <bf:Work rdf:about="http://example.org/work/invalid-1">
    <!-- Ensure target class is hit so SHACL runs -->
    <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Text"/>
    <!-- Missing proper title structure, language, content, adminMetadata -->
    <bf:title>Incomplete Title</bf:title>
  </bf:Work>
</rdf:RDF>'''
|
|
|
|
|
|
|
|
# Few-shot wrong/correct pairs fed to the AI corrector. 'pattern' is a regex
# matched against validation output to decide which examples are relevant.
BIBFRAME_CORRECTION_EXAMPLES = {
    # Titles must be structured bf:Title nodes, not bare strings.
    "title_structure": {
        "pattern": r"bf:title",
        "wrong": """<bf:title>Simple Title String</bf:title>""",
        "correct": """<bf:title>
  <bf:Title>
    <bf:mainTitle>The knitter's handy book of patterns</bf:mainTitle>
    <bf:subtitle>basic designs in multiple sizes & gauges</bf:subtitle>
  </bf:Title>
</bf:title>"""
    },
    # AdminMetadata needs structured status/date/agent children.
    "adminmetadata": {
        "pattern": r"bf:adminMetadata|->bf:assigner",
        "wrong": """<bf:adminMetadata>
  <bf:AdminMetadata>
    <bf:agent rdf:resource="http://example.org/org"/>
    <bf:status>new</bf:status>
  </bf:AdminMetadata>
</bf:adminMetadata>""",
        "correct": """<bf:adminMetadata>
  <bf:AdminMetadata>
    <bf:status>
      <bf:Status rdf:about="http://id.loc.gov/vocabulary/mstatus/n">
        <rdfs:label>new</rdfs:label>
        <bf:code>n</bf:code>
      </bf:Status>
    </bf:status>
    <bf:date rdf:datatype="http://www.w3.org/2001/XMLSchema#date">2001-12-12</bf:date>
    <bf:agent>
      <bf:Agent rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
        <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Organization"/>
        <rdfs:label>United States, Library of Congress</rdfs:label>
      </bf:Agent>
    </bf:agent>
  </bf:AdminMetadata>
</bf:adminMetadata>"""
    },
    # Contributions are Agent + Role structures, not literal names.
    "contribution": {
        "pattern": r"bf:contribution",
        "wrong": """<bf:contribution>Author Name</bf:contribution>""",
        "correct": """<bf:contribution>
  <bf:PrimaryContribution>
    <bf:agent>
      <bf:Agent rdf:about="http://id.loc.gov/rwo/agents/n2001017606">
        <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Person"/>
        <rdfs:label>Budd, Ann, 1956-</rdfs:label>
      </bf:Agent>
    </bf:agent>
    <bf:role>
      <bf:Role rdf:about="http://id.loc.gov/vocabulary/relators/ctb">
        <rdfs:label>contributor</rdfs:label>
        <bf:code>ctb</bf:code>
      </bf:Role>
    </bf:role>
  </bf:PrimaryContribution>
</bf:contribution>"""
    },
    # Languages reference id.loc.gov vocabulary URIs with label + code.
    "language": {
        "pattern": r"bf:language",
        "wrong": """<bf:language>English</bf:language>""",
        "correct": """<bf:language>
  <bf:Language rdf:about="http://id.loc.gov/vocabulary/languages/eng">
    <rdfs:label xml:lang="en">English</rdfs:label>
    <bf:code rdf:datatype="http://www.w3.org/2001/XMLSchema#string">eng</bf:code>
  </bf:Language>
</bf:language>"""
    },
    # Content types reference the contentTypes vocabulary.
    "content": {
        "pattern": r"bf:content",
        "wrong": """<bf:content>Text</bf:content>""",
        "correct": """<bf:content>
  <bf:Content rdf:about="http://id.loc.gov/vocabulary/contentTypes/txt">
    <rdfs:label>text</rdfs:label>
    <bf:code>txt</bf:code>
  </bf:Content>
</bf:content>"""
    },
    # LCC classifications split call numbers into portions plus assigner/status.
    "classification": {
        "pattern": r"bf:classification",
        "wrong": """<bf:classification>TT820 .B877 2002</bf:classification>""",
        "correct": """<bf:classification>
  <bf:ClassificationLcc>
    <bf:classificationPortion>TT820</bf:classificationPortion>
    <bf:itemPortion>.B877 2002</bf:itemPortion>
    <bf:assigner>
      <bf:Organization rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
        <rdfs:label>United States, Library of Congress</rdfs:label>
        <bf:code rdf:datatype="http://id.loc.gov/datatypes/orgs/code">DLC</bf:code>
      </bf:Organization>
    </bf:assigner>
    <bf:status>
      <bf:Status rdf:about="http://id.loc.gov/vocabulary/mstatus/uba">
        <rdfs:label>used by assigner</rdfs:label>
        <bf:code>uba</bf:code>
      </bf:Status>
    </bf:status>
  </bf:ClassificationLcc>
</bf:classification>"""
    },
    # Subjects use authority URIs with MADS/RDF component lists.
    "subject": {
        "pattern": r"bf:subject",
        "wrong": """<bf:subject>Knitting--Patterns</bf:subject>""",
        "correct": """<bf:subject>
  <bf:Topic rdf:about="http://id.loc.gov/authorities/subjects/sh85072708">
    <rdfs:label xml:lang="en">Knitting--Patterns</rdfs:label>
    <madsrdf:componentList rdf:parseType="Collection">
      <madsrdf:Authority>
        <madsrdf:authoritativeLabel xml:lang="en">Knitting</madsrdf:authoritativeLabel>
        <madsrdf:elementList>
          <madsrdf:TopicElement>
            <madsrdf:elementValue xml:lang="en">Knitting</madsrdf:elementValue>
          </madsrdf:TopicElement>
        </madsrdf:elementList>
      </madsrdf:Authority>
    </madsrdf:componentList>
  </bf:Topic>
</bf:subject>"""
    }
}
|
|
|
|
|
|
|
|
def validate_rdf_tool(rdf_content: str, template: str = "monograph") -> dict:
    """
    Validate RDF/XML content against SHACL templates.

    This tool validates RDF/XML data against predefined SHACL shapes to ensure
    compliance with metadata standards like BIBFRAME. Returns detailed validation
    results with conformance status and specific violation information.

    Args:
        rdf_content (str): The RDF/XML content to validate
        template (str): Validation template to use ('monograph' or 'custom')

    Returns:
        dict: Validation results with conformance status and detailed feedback
    """
    if not rdf_content:
        return {"error": "No RDF/XML content provided", "conforms": False}

    if not VALIDATOR_AVAILABLE:
        logger.error("Validator module not available")
        return {
            "error": "Validator not available - ensure validator.py is present",
            "conforms": False
        }

    try:
        # Optional pre-parse: catch malformed XML early with a clearer message
        # than SHACL would produce. Skipped entirely when rdflib is absent.
        try:
            import rdflib
        except ImportError:
            rdflib = None

        if rdflib is not None:
            try:
                graph = rdflib.Graph()
                graph.parse(data=rdf_content, format="application/rdf+xml")
            except Exception as parse_err:
                logger.error(f"RDF/XML parse error before validation: {parse_err}")
                return {
                    "error": f"RDF/XML parse error: {parse_err}",
                    "conforms": False
                }
        else:
            logger.info("rdflib not installed; skipping pre-parse RDF/XML syntax check")

        logger.info(f"Validating RDF with template '{template}', content length: {len(rdf_content)}")

        conforms, results_text = validate_rdf(rdf_content.encode('utf-8'), template)

        logger.info(f"Validation result - conforms: {conforms}, results length: {len(results_text) if results_text else 0}")

        # Guarantee a human-readable message even when the validator returns
        # an empty report.
        if conforms and (not results_text or len(results_text.strip()) == 0):
            results_text = "Validation passed with no specific feedback."
        elif not conforms and (not results_text or len(results_text.strip()) == 0):
            results_text = "Validation failed but no specific errors were returned. Check the RDF syntax and structure."

        return {
            "conforms": conforms,
            "results": results_text if results_text else "",
            "template": template,
            "status": "✅ Valid RDF" if conforms else "❌ Invalid RDF"
        }

    except ImportError as e:
        logger.error(f"Import error in validator: {str(e)}")
        return {
            "error": f"Validator import error: {str(e)}. Check that all dependencies are installed.",
            "conforms": False
        }
    except AttributeError as e:
        logger.error(f"Validator function not found: {str(e)}")
        return {
            "error": f"Validator function error: {str(e)}. Check validator.py implementation.",
            "conforms": False
        }
    except Exception as e:
        logger.error(f"Validation error: {str(e)}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return {
            "error": f"Validation failed: {str(e)}",
            "conforms": False
        }
|
|
|
|
|
def filter_validation_results_by_class(validation_results: str, rdf_content: str) -> dict:
    """
    Group SHACL validation output into buckets keyed by BibFrame class.

    The report is scanned line by line while tracking the most recently
    mentioned class (Work, Instance, Title, Contribution, AdminMetadata);
    a new section starts at every "Constraint Violation" header line.

    Args:
        validation_results (str): Full validation results text
        rdf_content (str): Original RDF content (currently unused; kept
            for interface compatibility with existing callers)

    Returns:
        dict: Mapping of class name -> joined report text, containing only
        the classes that actually accumulated content
    """
    class_results = {
        'Work': [],
        'Instance': [],
        'Title': [],
        'Contribution': [],
        'AdminMetadata': [],
        'Other': []
    }

    # Ordered markers: first match wins, mirroring the original precedence
    # Work > Instance > Title > Contribution > AdminMetadata.
    class_markers = [
        ('Work', ('bf:Work', '/work/')),
        ('Instance', ('bf:Instance', '/instance/')),
        ('Title', ('bf:Title',)),
        ('Contribution', ('bf:Contribution',)),
        ('AdminMetadata', ('bf:AdminMetadata', 'AdminMetadata', '->bf:assigner')),
    ]

    current_section = []
    current_class = 'Other'

    for line in validation_results.split('\n'):
        is_header = 'Constraint Violation' in line

        # Bug fix: flush the finished section under the class that was in
        # effect WHILE it was built, before this header line (which names
        # the NEXT section's class) updates current_class.
        if is_header and current_section:
            class_results[current_class].extend(current_section)
            current_section = []

        # Update the running class context from this line.
        for cls, markers in class_markers:
            if any(marker in line for marker in markers):
                current_class = cls
                break

        if is_header:
            current_section = [line]
        elif line.strip():
            current_section.append(line)

    # Flush the trailing section.
    if current_section:
        class_results[current_class].extend(current_section)

    return {k: '\n'.join(v) for k, v in class_results.items() if v}
|
|
|
|
|
def get_ai_suggestions(validation_results: str, rdf_content: str, include_warnings: bool = False) -> str:
    """Generate AI-powered, plain-language suggestions based on validation results.

    Avoids RDF/SHACL jargon and focuses on actionable fixes.

    Args:
        validation_results (str): Raw SHACL validation report text.
        rdf_content (str): The RDF/XML that was validated; used to pick the
            record section shown to the model.
        include_warnings (bool): When True, the model is asked to address
            warnings in addition to violations.

    Returns:
        str: Markdown-formatted suggestions. Falls back to rule-based
        suggestions whenever the OpenAI client, the HF_API_KEY secret, or
        the model call itself is unavailable/fails.
    """
    # Without the OpenAI SDK there is no model access at all; fall back to
    # the rule-based generator immediately.
    if not OPENAI_AVAILABLE:
        return generate_manual_suggestions(validation_results)

    # Re-read the key on every call so a secret added after startup is
    # picked up without restarting the Space.
    current_api_key = os.getenv('HF_API_KEY', '')
    if not current_api_key:
        return f"""
π **AI suggestions disabled**: Please set your Hugging Face API key as a Secret in your Space settings.

{generate_manual_suggestions(validation_results)}
"""

    try:
        client = get_openai_client()
        if not client:
            return f"""
π **AI suggestions disabled**: HF_API_KEY not configured.

{generate_manual_suggestions(validation_results)}
"""

        severity_instruction = (
            "Focus only on violations (errors) and ignore any warnings."
            if not include_warnings else
            "Address both violations and warnings."
        )

        # Optional ontology context from the BibFrame documentation service;
        # the section is simply omitted from the prompt when nothing came back.
        bibframe_guidance = fetch_bibframe_guidance(validation_results, rdf_content)
        doc_section = ""
        if bibframe_guidance:
            doc_section = f"""
Reference information from BibFrame ontology:
{bibframe_guidance}
"""

        # Narrow the prompt to the class with the largest share of reported
        # problems; remaining classes are only mentioned in the closing note.
        class_results = filter_validation_results_by_class(validation_results, rdf_content)
        if class_results:
            primary_class = max(class_results.keys(), key=lambda k: len(class_results[k]))
            focused_results = class_results[primary_class]
        else:
            primary_class = "Record"
            focused_results = validation_results

        simplified_summary = parse_shacl_results_for_ai(focused_results)
        relevant_rdf = extract_relevant_rdf_section(rdf_content, primary_class)

        # Inputs are truncated (1500 / 800 chars) to keep the prompt inside
        # the endpoint's context budget.
        prompt = f"""
You are a helpful metadata librarian. Write in plain language (no RDF/SHACL jargon). Analyze the validation errors for the {primary_class} and provide concise, actionable fixes.

{severity_instruction}
{doc_section}

Validation Errors for {primary_class}:
{focused_results[:1500]}

Validation Summary (plain language):
{simplified_summary}

Relevant RDF Section:
{relevant_rdf[:800]}

Instructions:
1. ONE sentence: What's wrong with this {primary_class}?
2. List errors (max 3 words each)
3. Show exact XML fixes

Format:
**Issue:** [One sentence about the {primary_class} problem]

**Errors:**
β’ Error 1
β’ Error 2

**Fix:**
```xml
[Complete corrected {primary_class} section]
```

Be ultra-concise. Show the fix, not explanations."""

        chat_completion = client.chat.completions.create(
            model=HF_MODEL,
            messages=[
                {
                    "role": "system",
                    "content": "You are a friendly librarian helping fix catalog records. Never use technical RDF or SHACL terminology. Use the BibFrame documentation provided to ensure accuracy."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            max_tokens=800,
            temperature=0.5,
            top_p=0.9
        )

        generated_text = chat_completion.choices[0].message.content
        # Post-process to strip any RDF/SHACL jargon the model used anyway.
        generated_text = clean_technical_jargon(generated_text)

        # Note which other classes also had issues, since the prompt above
        # only covered primary_class.
        other_classes = [k for k in class_results.keys() if k != primary_class]
        class_note = (
            f"\n\nπ **Note:** Focused on {primary_class} errors. " +
            (f"Also found issues in: {', '.join(other_classes)}" if other_classes else "")
        )

        return f"π€ **AI-Powered Suggestions ({('Violations + Warnings' if include_warnings else 'Violations Only')}):**\n\n{generated_text}{class_note}"

    except Exception as e:
        # Any failure (network, auth, SDK) degrades to the manual suggestions.
        logger.error(f"OpenAI/HF Inference Endpoint error: {str(e)}")
        return f"""
β **AI suggestions error**: {str(e)}

{generate_manual_suggestions(validation_results)}
"""
|
|
|
|
|
def extract_relevant_rdf_section(rdf_content: str, class_name: str) -> str:
    """
    Extract only the relevant RDF section for a specific class.

    Args:
        rdf_content (str): Full RDF content
        class_name (str): Class name to extract (Work, Instance, etc.)

    Returns:
        str: The matched class element (prefixed with a namespace comment
        when declarations are found near the top of the document), or the
        first 1000 characters of the input as a fallback
    """
    import re

    known_classes = ('Work', 'Instance', 'Title', 'Contribution', 'AdminMetadata')
    if class_name not in known_classes:
        return rdf_content[:1000]

    # Non-greedy match from the opening tag to the matching closing tag.
    class_regex = rf'<bf:{class_name}.*?</bf:{class_name}>'
    found = re.search(class_regex, rdf_content, re.DOTALL)
    if found is None:
        return rdf_content[:1000]

    section = found.group(0)

    # Prepend up to three namespace declarations (looked up near the top of
    # the document) as a comment, so the fragment stays interpretable.
    ns_decls = re.findall(r'xmlns:\w+="[^"]*"', rdf_content[:500])
    if not ns_decls:
        return section
    return f"<!-- Namespaces: {' '.join(ns_decls[:3])} -->\n{section}"
|
|
|
|
|
|
|
|
|
|
|
def merge_corrected_sections(original_rdf: str, corrected_sections: dict) -> str:
    """
    Merge corrected class sections back into the original RDF.

    Args:
        original_rdf (str): Original RDF content
        corrected_sections (dict): Corrected sections by class name
            (Work, Instance, Title, Contribution, AdminMetadata)

    Returns:
        str: Merged RDF with each known class's first occurrence replaced;
        unknown class names are ignored
    """
    import re

    # Same element patterns used by extract_relevant_rdf_section; hoisted
    # out of the loop since they never change.
    patterns = {
        'Work': r'<bf:Work.*?</bf:Work>',
        'Instance': r'<bf:Instance.*?</bf:Instance>',
        'Title': r'<bf:Title.*?</bf:Title>',
        'Contribution': r'<bf:Contribution.*?</bf:Contribution>',
        'AdminMetadata': r'<bf:AdminMetadata.*?</bf:AdminMetadata>'
    }

    result = original_rdf

    for class_name, corrected_section in corrected_sections.items():
        pattern = patterns.get(class_name)
        if not pattern:
            continue
        # Bug fix: use a callable replacement so backslashes and group
        # references (e.g. "\1") inside the corrected XML are inserted
        # literally instead of being interpreted as regex escapes, which
        # previously raised re.error or corrupted the output.
        result = re.sub(
            pattern,
            lambda _match, replacement=corrected_section: replacement,
            result,
            count=1,
            flags=re.DOTALL
        )

    return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
Generate AI-powered fix suggestions for invalid RDF/XML. |
|
|
|
|
|
This tool analyzes validation results and provides actionable suggestions |
|
|
for fixing RDF/XML validation errors using AI or rule-based analysis. |
|
|
|
|
|
Args: |
|
|
validation_results (str): The validation error messages |
|
|
rdf_content (str): The original RDF/XML content that failed validation |
|
|
include_warnings (bool): Whether to include warnings in suggestions |
|
|
|
|
|
Returns: |
|
|
str: Detailed suggestions for fixing the RDF validation issues |
|
|
""" |
|
|
|
|
|
if not OPENAI_AVAILABLE: |
|
|
return generate_manual_suggestions(validation_results) |
|
|
|
|
|
|
|
|
current_api_key = os.getenv('HF_API_KEY', '') |
|
|
if not current_api_key: |
|
|
return f""" |
|
|
π **AI suggestions disabled**: Please set your Hugging Face API key as a Secret in your Space settings. |
|
|
|
|
|
{generate_manual_suggestions(validation_results)} |
|
|
""" |
|
|
|
|
|
try: |
|
|
|
|
|
client = get_openai_client() |
|
|
if not client: |
|
|
return f""" |
|
|
π **AI suggestions disabled**: HF_API_KEY not configured. |
|
|
|
|
|
{generate_manual_suggestions(validation_results)} |
|
|
""" |
|
|
|
|
|
severity_instruction = "Focus only on violations (errors) and ignore any warnings." if not include_warnings else "Address both violations and warnings." |
|
|
|
|
|
prompt = f"""You are an expert in RDF/XML and SHACL validation. Analyze the validation errors and provide CONCISE, ACTIONABLE fix suggestions. |
|
|
|
|
|
{severity_instruction} |
|
|
|
|
|
Validation Results: |
|
|
{validation_results} |
|
|
|
|
|
Original RDF (first 1000 chars): |
|
|
{rdf_content[:1000]}... |
|
|
|
|
|
Instructions: |
|
|
1. Start with a ONE-SENTENCE summary of the main issue |
|
|
2. List the specific errors in bullet points (max 5 words per error) |
|
|
3. Provide the exact fix for each error with code snippets |
|
|
4. Keep explanations minimal - focus on solutions |
|
|
|
|
|
Format: |
|
|
**Main Issue:** [One sentence] |
|
|
|
|
|
**Errors Found:** |
|
|
β’ Error 1 name |
|
|
β’ Error 2 name |
|
|
|
|
|
**Fixes:** |
|
|
1. **Error 1**: |
|
|
```xml |
|
|
[exact code to add/fix] |
|
|
``` |
|
|
2. **Error 2**: |
|
|
```xml |
|
|
[exact code to add/fix] |
|
|
``` |
|
|
|
|
|
Be direct and solution-focused. No lengthy explanations.""" |
|
|
|
|
|
|
|
|
print(f"π Making API call to: {HF_ENDPOINT_URL}") |
|
|
print(f"π Using model: {HF_MODEL}") |
|
|
print(f"π Include warnings: {include_warnings}") |
|
|
|
|
|
chat_completion = client.chat.completions.create( |
|
|
model=HF_MODEL, |
|
|
messages=[ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": "You are a friendly librarian helping fix catalog records. Never use technical RDF or SHACL terminology." |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": prompt |
|
|
} |
|
|
], |
|
|
max_tokens=1500, |
|
|
temperature=0.6, |
|
|
top_p=0.9 |
|
|
) |
|
|
|
|
|
print("β
API call successful") |
|
|
generated_text = chat_completion.choices[0].message.content |
|
|
return f"π€ **AI-Powered Suggestions ({('Violations + Warnings' if include_warnings else 'Violations Only')}):**\n\n{generated_text}" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"OpenAI/HF Inference Endpoint error: {str(e)}") |
|
|
return f""" |
|
|
β **AI suggestions error**: {str(e)} |
|
|
|
|
|
{generate_manual_suggestions(validation_results)} |
|
|
""" |
|
|
|
|
|
def extract_rdf_from_response(response: str) -> str:
    """
    Extract RDF/XML content from AI response, handling code blocks.

    Args:
        response (str): AI response that may contain RDF wrapped in code blocks

    Returns:
        str: Extracted RDF/XML content (the whole stripped response when no
        fenced block is present)
    """
    text = response.strip()

    # Prefer a fence explicitly tagged as XML.
    if "```xml" in text:
        try:
            return text.split("```xml")[1].split("```")[0].strip()
        except IndexError:
            pass

    # Otherwise take the first generic fenced block, if one is closed.
    if "```" in text and text.count("```") >= 2:
        try:
            return text.split("```")[1].split("```")[0].strip()
        except IndexError:
            pass

    # No usable fence: assume the reply is bare RDF/XML.
    return text
|
|
|
|
|
def fix_common_rdf_errors(rdf_xml: str) -> str:
    """
    Fix common RDF/XML errors that AI models generate.

    Args:
        rdf_xml (str): RDF/XML that may contain common errors

    Returns:
        str: Fixed RDF/XML
    """
    import re

    # Drop bogus rdf:parseType attributes that models invent, but keep
    # rdf:parseType="Collection": it is the one legitimate use in BibFrame
    # records (on madsrdf:componentList), per this app's own prompt rules.
    # Previously ALL parseType attributes were stripped, breaking valid data.
    rdf_xml = re.sub(r'\s+rdf:parseType="(?!Collection")[^"]*"', '', rdf_xml)

    # Wrap bare literal titles in the required bf:Title/bf:mainTitle structure.
    rdf_xml = re.sub(
        r'<bf:title>([^<]+)</bf:title>',
        r'<bf:title><bf:Title><bf:mainTitle>\1</bf:mainTitle></bf:Title></bf:title>',
        rdf_xml
    )

    # Replace literal language names/codes with LOC language vocabulary URIs.
    language_map = {
        'English': 'http://id.loc.gov/vocabulary/languages/eng',
        'eng': 'http://id.loc.gov/vocabulary/languages/eng',
        'Spanish': 'http://id.loc.gov/vocabulary/languages/spa',
        'French': 'http://id.loc.gov/vocabulary/languages/fre',
    }
    for lang_text, lang_uri in language_map.items():
        rdf_xml = re.sub(
            f'<bf:language>{lang_text}</bf:language>',
            f'<bf:language rdf:resource="{lang_uri}"/>',
            rdf_xml,
            flags=re.IGNORECASE
        )

    # Replace literal content types with LOC content-type vocabulary URIs.
    content_map = {
        'Text': 'http://id.loc.gov/vocabulary/contentTypes/txt',
        'text': 'http://id.loc.gov/vocabulary/contentTypes/txt',
    }
    for content_text, content_uri in content_map.items():
        rdf_xml = re.sub(
            f'<bf:content>{content_text}</bf:content>',
            f'<bf:content rdf:resource="{content_uri}"/>',
            rdf_xml,
            flags=re.IGNORECASE
        )

    return rdf_xml
|
|
|
|
|
|
|
|
def extract_error_focus_points(validation_results: str) -> Dict[str, List[str]]: |
|
|
"""Identify the specific focus nodes and properties mentioned in validation errors.""" |
|
|
import re |
|
|
|
|
|
focus = { |
|
|
"properties": [], |
|
|
"focus_nodes": [], |
|
|
"missing_properties": [], |
|
|
"classes": [], |
|
|
} |
|
|
|
|
|
if not validation_results: |
|
|
return focus |
|
|
|
|
|
property_set = set() |
|
|
missing_set = set() |
|
|
node_set = set() |
|
|
|
|
|
for match in re.finditer(r"Focus Node:\s*(?:<)?([^\s>]+)(?:>)?", validation_results): |
|
|
node_set.add(match.group(1)) |
|
|
|
|
|
for match in re.finditer(r"Result Path:\s*(?:http://[^/]+/)?([A-Za-z]+)", validation_results): |
|
|
property_set.add(match.group(1)) |
|
|
|
|
|
for match in re.finditer(r"Less than \d+ values on .*->bf:([A-Za-z]+)", validation_results): |
|
|
missing_set.add(match.group(1)) |
|
|
|
|
|
focus["properties"] = sorted(property_set) |
|
|
focus["focus_nodes"] = sorted(node_set) |
|
|
focus["missing_properties"] = sorted(missing_set) |
|
|
return focus |
|
|
|
|
|
|
|
|
def _resolve_bibframe_uri(name: str) -> str: |
|
|
if not name: |
|
|
return name |
|
|
if name.startswith("http://") or name.startswith("https://"): |
|
|
return name |
|
|
if ":" in name: |
|
|
prefix, local = name.split(":", 1) |
|
|
if prefix == "bf": |
|
|
return f"http://id.loc.gov/ontologies/bibframe/{local}" |
|
|
return f"http://id.loc.gov/ontologies/bibframe/{name}" |
|
|
|
|
|
|
|
|
def get_targeted_bibframe_guidance(properties: List[str], classes: List[str]) -> Dict[str, dict]:
    """Fetch BibFrame documentation for only the specified properties/classes.

    At most five properties and five classes are looked up (properties
    first); names whose lookup returns nothing are omitted from the result.
    Returns an empty dict when the documentation service is disabled.
    """
    guidance: Dict[str, dict] = {}

    if not MCP4BIBFRAME_DOCS_ENABLED:
        return guidance

    # Build one unified work list: (docs tool, argument name, raw name).
    lookups = (
        [("get_property_info", "property_uri", name) for name in properties[:5]]
        + [("get_class_info", "class_uri", name) for name in classes[:5]]
    )

    for tool_name, arg_name, raw_name in lookups:
        doc = query_bibframe_docs_cached(
            tool_name,
            {arg_name: _resolve_bibframe_uri(raw_name)},
            timeout=5,
        )
        if doc:
            guidance[raw_name] = doc

    return guidance
|
|
|
|
|
|
|
|
def generate_property_specific_fix(property_name: str, guidance: Optional[dict] = None) -> str:
    """Generate a BibFrame-compliant snippet for a specific missing property.

    Args:
        property_name (str): Local name of the missing BibFrame property
            (e.g. "title", "language"); matched case-insensitively.
        guidance (Optional[dict]): Documentation hints for the property.
            NOTE(review): currently normalized but never consulted below.

    Returns:
        str: A ready-to-insert RDF/XML fragment with placeholder values for
        the known properties, or a generic placeholder element otherwise.
    """
    guidance = guidance or {}
    # Normalize so lookups below are case-insensitive; empty/None -> "".
    prop = property_name.lower() if property_name else ""

    # Titles must be nested bf:Title/bf:mainTitle structures, not literals.
    if prop == "title":
        return """<bf:title>
<bf:Title>
<bf:mainTitle>PLACEHOLDER_TITLE</bf:mainTitle>
</bf:Title>
</bf:title>"""

    # Language defaults to English via the LOC language vocabulary URI.
    if prop == "language":
        return """<bf:language>
<bf:Language rdf:about="http://id.loc.gov/vocabulary/languages/eng">
<rdfs:label xml:lang="en">English</rdfs:label>
<bf:code rdf:datatype="http://www.w3.org/2001/XMLSchema#string">eng</bf:code>
</bf:Language>
</bf:language>"""

    # Content type defaults to "text" from the LOC content-types vocabulary.
    if prop == "content":
        return """<bf:content>
<bf:Content rdf:about="http://id.loc.gov/vocabulary/contentTypes/txt">
<rdfs:label>text</rdfs:label>
<bf:code>txt</bf:code>
</bf:Content>
</bf:content>"""

    # Primary contribution skeleton: a Person agent with the "author" role.
    if prop == "contribution":
        return """<bf:contribution>
<bf:PrimaryContribution>
<bf:agent>
<bf:Agent>
<rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Person"/>
<rdfs:label>Author Name</rdfs:label>
</bf:Agent>
</bf:agent>
<bf:role>
<bf:Role rdf:about="http://id.loc.gov/vocabulary/relators/aut">
<rdfs:label>author</rdfs:label>
<bf:code>aut</bf:code>
</bf:Role>
</bf:role>
</bf:PrimaryContribution>
</bf:contribution>"""

    # LCC classification example with the Library of Congress as assigner.
    if prop == "classification":
        return """<bf:classification>
<bf:ClassificationLcc>
<bf:classificationPortion>TT820</bf:classificationPortion>
<bf:itemPortion>.B877 2002</bf:itemPortion>
<bf:assigner>
<bf:Organization rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
<rdfs:label>United States, Library of Congress</rdfs:label>
</bf:Organization>
</bf:assigner>
</bf:ClassificationLcc>
</bf:classification>"""

    # Admin metadata skeleton: "new" status, a placeholder date, LOC agent.
    if prop == "adminmetadata":
        return """<bf:adminMetadata>
<bf:AdminMetadata>
<bf:status>
<bf:Status rdf:about="http://id.loc.gov/vocabulary/mstatus/n">
<rdfs:label>new</rdfs:label>
<bf:code>n</bf:code>
</bf:Status>
</bf:status>
<bf:date rdf:datatype="http://www.w3.org/2001/XMLSchema#date">2024-01-01</bf:date>
<bf:agent>
<bf:Agent rdf:about="http://id.loc.gov/vocabulary/organizations/dlc">
<rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Organization"/>
<rdfs:label>United States, Library of Congress</rdfs:label>
</bf:Agent>
</bf:agent>
</bf:AdminMetadata>
</bf:adminMetadata>"""

    # Unknown property: emit a generic element the cataloger must fill in.
    return f"<bf:{property_name}>PLACEHOLDER_VALUE</bf:{property_name}>"
|
|
|
|
|
def get_ai_correction(validation_results: str, rdf_content: str, template: str = 'monograph', max_attempts: int = None, include_warnings: bool = False, enable_validation_loop: bool | None = None, cache_key: Optional[str] = None, steps_log: Optional[List[str]] = None) -> str:
    """
    Generate AI-powered corrected RDF/XML based on validation errors.

    This tool takes invalid RDF/XML and validation results, then generates
    a corrected version that addresses all identified validation issues.
    The generated correction is validated before being returned to the user.

    Args:
        validation_results (str): The validation error messages
        rdf_content (str): The original invalid RDF/XML content
        template (str): The validation template to use
        max_attempts (int): Maximum number of attempts to generate valid RDF (uses MAX_CORRECTION_ATTEMPTS if None)
        include_warnings (bool): Whether to fix warnings in addition to violations
        enable_validation_loop (bool | None): Overrides the module-level
            ENABLE_VALIDATION_LOOP setting when not None; when iteration is
            disabled only one model attempt is made
        cache_key (Optional[str]): Precomputed cache key; derived from the
            inputs when None
        steps_log (Optional[List[str]]): When provided, progress messages
            are appended to it for display in the UI

    Returns:
        str: Corrected RDF/XML (prefixed with an XML comment describing the
        outcome) that should pass validation, or manual correction hints
        when the model path is unavailable or all attempts fail
    """
    # Resolve the effective iteration setting, preferring the explicit
    # per-call override over the module-level default.
    iterate_enabled = ENABLE_VALIDATION_LOOP if enable_validation_loop is None else enable_validation_loop
    if steps_log is not None:
        steps_log.append(f"Planning correction: iterate_enabled={iterate_enabled}, include_warnings={include_warnings}")

    if max_attempts is None:
        max_attempts = MAX_CORRECTION_ATTEMPTS
        if steps_log is not None:
            steps_log.append(f"Max attempts set to {max_attempts}")

    if not iterate_enabled:
        max_attempts = 1
        if steps_log is not None:
            steps_log.append("Iteration disabled; forcing single attempt")

    # Serve a previously successful correction for identical inputs, if any.
    if cache_key is None and validation_results and rdf_content:
        cache_key = _make_fix_cache_key(validation_results, rdf_content, template)
    if cache_key:
        cached_result = _get_cached_correction(cache_key, steps_log)
        if cached_result is not None:
            return cached_result

    # Without the OpenAI SDK there is no model path at all.
    if not OPENAI_AVAILABLE:
        if steps_log is not None:
            steps_log.append("OPENAI client not available; falling back to manual hints")
        return generate_manual_correction_hints(validation_results, rdf_content)

    # Re-read the key each call so secrets added after startup take effect.
    current_api_key = os.getenv('HF_API_KEY', '')
    if not current_api_key:
        if steps_log is not None:
            steps_log.append("HF_API_KEY not set; cannot call model; returning manual hints")
        return f"""<!-- AI correction disabled: Set HF_API_KEY as a Secret in your Space settings -->

{generate_manual_correction_hints(validation_results, rdf_content)}"""

    try:
        client = get_openai_client()
        if not client:
            if steps_log is not None:
                steps_log.append("Failed to initialize OpenAI client; returning manual hints")
            return f"""<!-- AI correction disabled: HF_API_KEY not configured -->

{generate_manual_correction_hints(validation_results, rdf_content)}"""

        # Optional ontology context from the BibFrame documentation service.
        if steps_log is not None:
            steps_log.append("Fetching BibFrame documentation guidance...")

        bibframe_guidance = fetch_bibframe_guidance(validation_results, rdf_content)

        if bibframe_guidance:
            if steps_log is not None:
                steps_log.append(f"Retrieved BibFrame guidance ({len(bibframe_guidance)} chars)")
            guidance_section = f"""
BIBFRAME DOCUMENTATION (from official ontology):
{bibframe_guidance}

Apply the above BibFrame definitions and patterns when correcting the RDF/XML.
"""
        else:
            guidance_section = ""
            if steps_log is not None:
                steps_log.append("No specific BibFrame guidance retrieved")

        # Wall-clock budget shared across ALL attempts (model calls plus
        # re-validation), so the UI never hangs indefinitely.
        import time
        start_time = time.time()
        timeout = 45
        if steps_log is not None:
            steps_log.append(f"Timeout budget: {timeout}s total")

        severity_instruction = "Fix only the violations (errors) and ignore any warnings." if not include_warnings else "Fix both violations and warnings."

        for attempt in range(max_attempts):
            # Stop early when the shared time budget is exhausted.
            elapsed = time.time() - start_time
            if elapsed > timeout:
                if steps_log is not None:
                    steps_log.append(f"Timeout reached after {int(elapsed)}s; stopping attempts")
                print(f"β° Timeout reached after {timeout} seconds")
                break

            attempt_no = attempt + 1
            if steps_log is not None:
                steps_log.append(f"Attempt {attempt_no}/{max_attempts}: requesting model correction")
            print(f"π Correction attempt {attempt_no}/{max_attempts}")

            # Add targeted bf:assigner instructions when the errors mention it.
            needs_assigner = ("->bf:assigner" in validation_results) or (" bf:assigner" in validation_results)
            admin_guidance = ""
            if needs_assigner:
                admin_guidance = """
IMPORTANT: For each <bf:AdminMetadata>, ensure it has a direct child <bf:assigner>.
Rules:
- If <bf:agent rdf:resource=\"...\"/> exists, add <bf:assigner rdf:resource=\"...\"/> with the SAME URI.
- Else if <bf:descriptionModifier rdf:resource=\"...\"/> exists, add <bf:assigner rdf:resource=\"...\"/> with the SAME URI.
- Else if a <bf:identifiedBy> block contains <bf:assigner rdf:resource=\"...\"/>, copy that URI to a TOP-LEVEL <bf:assigner>.
Keep all existing content; only add missing <bf:assigner> where required.
"""

            # Select few-shot examples whose trigger pattern appears in the
            # validation report.
            examples_to_include = []
            # NOTE(review): validation_lower is computed but never used below.
            validation_lower = validation_results.lower()

            for name, example in BIBFRAME_CORRECTION_EXAMPLES.items():
                pattern = example.get("pattern", name)
                if re.search(pattern, validation_results, re.IGNORECASE):
                    examples_to_include.append((name, example))
                    if steps_log is not None:
                        steps_log.append(f"Including {name} example based on pattern match")

            few_shot_section = ""
            if examples_to_include:
                few_shot_section = "\n\nCORRECT BIBFRAME PATTERNS (from Library of Congress records):\n"
                few_shot_section += "NEVER use simple strings - always use nested structures as shown below:\n\n"
                for name, example in examples_to_include:
                    few_shot_section += f"{name.upper()}:\n"
                    few_shot_section += f"β WRONG:\n```xml\n{example['wrong']}\n```\n"
                    few_shot_section += f"✅ CORRECT:\n```xml\n{example['correct']}\n```\n\n"

            critical_rules = """
CRITICAL RDF/XML RULES (from real BibFrame):
1. NEVER use rdf:parseType except for "Collection" on madsrdf:componentList
2. Properties like bf:title, bf:language, bf:content MUST have nested typed resources
3. Use rdf:about for resource URIs, not rdf:resource on the property element
4. bf:adminMetadata can appear multiple times in one record
5. Status, Role, Language etc. are OBJECTS with rdf:about URIs, not literals
6. Date values use rdf:datatype for typing (e.g., xsd:date, xsd:dateTime)
7. Every bf:AdminMetadata needs BOTH bf:agent AND bf:assigner if validation requires it
"""

            prompt = f"""You are an expert in RDF/XML and BibFrame cataloging. Fix the following RDF/XML based on the validation errors and official BibFrame documentation.

{severity_instruction}
{admin_guidance}
{guidance_section}
{critical_rules}
{few_shot_section}

Validation Errors:
{validation_results}

Original RDF/XML:
{rdf_content}

{f"Previous attempt {attempt} still had validation errors. Please fix ALL issues this time." if attempt > 0 else ""}

INSTRUCTIONS:
1. Return ONLY valid RDF/XML - no explanations
2. Follow the EXACT patterns shown in the examples above
3. Use proper nested structures - NO simple string values for complex properties
4. Keep ALL namespace declarations
5. Fix ALL validation errors"""

            try:
                system_prompt = """You are an RDF/XML expert following Library of Congress BibFrame patterns.
Output ONLY valid RDF/XML following these rules:
- Start with <?xml version="1.0" encoding="UTF-8"?>
- NO markdown, NO explanations
- Use EXACT structure patterns from the examples
- Complex properties need nested typed resources
- rdf:parseType ONLY for Collection on madsrdf:componentList
- Status/Role/Language are OBJECTS with URIs, not strings"""

                # temperature=0.0 keeps corrections deterministic; per-call
                # timeout is shorter than the overall budget.
                chat_completion = client.chat.completions.create(
                    model=HF_MODEL,
                    messages=[
                        {
                            "role": "system",
                            "content": system_prompt
                        },
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                    max_tokens=1500,
                    temperature=0.0,
                    timeout=20
                )

                corrected_rdf = chat_completion.choices[0].message.content.strip()
                if steps_log is not None:
                    steps_log.append(f"Attempt {attempt_no}: model responded; extracting and fixing common errors")

                # Unwrap fenced code blocks the model may have added.
                corrected_rdf = extract_rdf_from_response(corrected_rdf)

                # Apply deterministic cleanups for known model mistakes.
                corrected_rdf = fix_common_rdf_errors(corrected_rdf)

                # Re-validate only when the validator exists AND at least
                # ~10s of budget remain for the check itself.
                if VALIDATOR_AVAILABLE and (time.time() - start_time < timeout - 10):
                    try:
                        conforms, new_results = validate_rdf(corrected_rdf.encode('utf-8'), template)

                        if conforms:
                            if steps_log is not None:
                                steps_log.append(f"Attempt {attempt_no}: correction PASSED validation")
                            print(f"✅ Correction validated successfully on attempt {attempt_no}")
                            result_text = f"""<!-- AI-generated correction validated successfully -->
{corrected_rdf}"""
                            # Cache validated results only, so bad output is
                            # never replayed.
                            if cache_key:
                                _store_correction_in_cache(cache_key, result_text, steps_log)
                            return result_text
                        else:
                            if steps_log is not None:
                                steps_log.append(f"Attempt {attempt_no}: still invalid; will retry with updated errors")
                            print(f"β Correction attempt {attempt_no} still has validation errors")
                            # Feed the NEW errors into the next attempt's prompt.
                            validation_results = new_results

                    except Exception as e:
                        # Validation itself blew up: return the correction
                        # best-effort rather than discarding it.
                        if steps_log is not None:
                            steps_log.append(f"Attempt {attempt_no}: error during validation: {str(e)} β returning correction anyway")
                        print(f"β οΈ Error validating correction attempt {attempt_no}: {str(e)}")
                        return f"""<!-- AI-generated correction (validation check failed) -->
{corrected_rdf}"""
                else:
                    # No validator or not enough time: return unchecked output.
                    if steps_log is not None:
                        steps_log.append("Skipping validation check (validator unavailable or timeout)")
                    print("β οΈ Returning correction without validation")
                    return f"""<!-- AI-generated correction (validation skipped) -->
{corrected_rdf}"""

            except Exception as api_error:
                # Model-call failure: retry unless this was the final attempt.
                if steps_log is not None:
                    steps_log.append(f"Attempt {attempt_no}: API error: {str(api_error)}")
                print(f"β API error on attempt {attempt_no}: {str(api_error)}")
                if attempt == max_attempts - 1:
                    raise api_error
                continue

        # Loop exhausted (all attempts invalid, or timed out).
        if steps_log is not None:
            steps_log.append("All attempts failed or timed out; returning manual hints")
        return f"""<!-- AI correction failed after {max_attempts} attempts or timeout -->
<!-- Please correct manually using the validation results as a guide -->

{generate_manual_correction_hints(validation_results, rdf_content)}"""

    except Exception as e:
        # Catch-all for setup errors and the re-raised final API error.
        logger.error(f"LLM API error: {str(e)}")
        if steps_log is not None:
            steps_log.append(f"Fatal error invoking model: {str(e)}")
        return f"""<!-- Error generating AI correction: {str(e)} -->

{generate_manual_correction_hints(validation_results, rdf_content)}"""
|
|
|
|
|
|
|
|
def get_ai_correction_targeted(validation_results: str, rdf_content: str, template: str = 'monograph', max_attempts: Optional[int] = None, include_warnings: bool = False, enable_validation_loop: bool | None = None, steps_log: Optional[List[str]] = None) -> str:
    """Fast path that attempts structured quick fixes before invoking the full AI loop.

    Escalation order (each stage returns immediately on success):
      1. Cache lookup keyed on (validation_results, rdf_content, template).
      2. ``rapid_fix_missing_properties`` structural repair, then re-validation.
      3. ``get_ai_correction_minimal`` — one cheap model call, then re-validation.
      4. Regex injection of property snippets when at most 5 properties are missing.
      5. ``get_ai_correction`` full retry loop as the last resort.

    Stages 2-4 rebind ``validation_results``/``rdf_content``/``working_rdf`` so each
    later stage builds on the best partially-fixed document so far.

    Args:
        validation_results: SHACL validation report text for ``rdf_content``.
        rdf_content: The RDF/XML document to repair.
        template: Validation template name forwarded to ``validate_rdf``.
        max_attempts: Attempt budget forwarded to ``get_ai_correction``
            (``None`` means that function's own default).
        include_warnings: Forwarded to ``get_ai_correction``.
        enable_validation_loop: Forwarded to ``get_ai_correction``.
        steps_log: Optional list that accumulates human-readable progress lines.

    Returns:
        str: Corrected RDF/XML — possibly still invalid if every stage fell through.
    """

    # Log a short preview of the initial errors. Truthiness check: an empty
    # list is skipped here even though later stages use "is not None".
    if steps_log:
        steps_log.append("\n" + "=" * 70)
        steps_log.append("π INITIAL VALIDATION ERRORS:")
        steps_log.append("=" * 70)

        # Heuristic: report lines containing these markers carry the details.
        error_lines = [line.strip() for line in validation_results.split('\n') if 'Less than' in line or 'Message:' in line or 'Module:' in line]
        for line in error_lines[:15]:
            steps_log.append(f" {line}")
        if len(error_lines) > 15:
            steps_log.append(f" ... and {len(error_lines) - 15} more errors")
        steps_log.append("")

    # Stage 1: reuse a previously successful correction for this exact input.
    cache_key: Optional[str] = None
    if validation_results and rdf_content:
        cache_key = _make_fix_cache_key(validation_results, rdf_content, template)
        cached = _get_cached_correction(cache_key, steps_log)
        if cached is not None:
            if steps_log:
                steps_log.append("πΎ Cache hit! Returning previously successful correction")
            return cached

    if steps_log:
        steps_log.append("=" * 60)
        steps_log.append("π STARTING RAPID FIX")
        steps_log.append("=" * 60)

    # Stage 2: deterministic structural repair (no model call involved).
    quick_fix = rapid_fix_missing_properties(rdf_content, validation_results, template, steps_log)

    if quick_fix:
        if steps_log:
            steps_log.append("=" * 60)
            steps_log.append("π RE-VALIDATING AFTER RAPID FIX")
            steps_log.append("=" * 60)

    if quick_fix and VALIDATOR_AVAILABLE:
        try:
            conforms, new_results = validate_rdf(quick_fix.encode('utf-8'), template)
            if conforms:
                if steps_log:
                    steps_log.append("=" * 60)
                    steps_log.append("✅ RAPID FIX SUCCESSFUL - VALIDATION PASSED!")
                    steps_log.append("=" * 60)
                if cache_key:
                    _store_correction_in_cache(cache_key, quick_fix, steps_log)
                return quick_fix
            else:
                if steps_log:
                    steps_log.append("=" * 60)
                    steps_log.append("β οΈ RAPID FIX INCOMPLETE - Still has errors:")
                    steps_log.append("=" * 60)

                    error_lines = new_results.split('\n')[:10] if new_results else []
                    for line in error_lines:
                        if 'Less than' in line or 'Message:' in line:
                            steps_log.append(f" {line.strip()}")

                # Carry the partially-fixed document forward so the next
                # stages build on the rapid fix, not the original input.
                validation_results = new_results or validation_results
                rdf_content = quick_fix
                if steps_log:
                    steps_log.append("π Continuing to minimal AI correction...")
        except Exception as e:
            if steps_log:
                steps_log.append("=" * 60)
                steps_log.append(f"β RAPID FIX VALIDATION ERROR: {e}")
                steps_log.append("=" * 60)
                steps_log.append("π Continuing to minimal AI correction...")
    elif quick_fix and steps_log:
        steps_log.append("β οΈ Validator not available, cannot re-validate rapid fix")
    elif steps_log:
        steps_log.append("βΉοΈ Rapid fix returned None, moving to AI correction")

    # Stage 3: one cheap, token-capped model call before the full loop.
    if OPENAI_AVAILABLE and os.getenv('HF_API_KEY'):
        if steps_log:
            steps_log.append("Attempting minimal AI correction...")

        corrected = get_ai_correction_minimal(validation_results, rdf_content, max_tokens=1000)

        if corrected and corrected != rdf_content and VALIDATOR_AVAILABLE:
            try:
                conforms, new_results = validate_rdf(corrected.encode('utf-8'), template)
                if conforms:
                    if steps_log:
                        steps_log.append("✅ Minimal AI correction successful!")
                    if cache_key:
                        _store_correction_in_cache(cache_key, corrected, steps_log)
                    return corrected
                else:
                    # Keep the (possibly improved) attempt as the new baseline.
                    validation_results = new_results or validation_results
                    rdf_content = corrected
                    if steps_log:
                        steps_log.append("Minimal AI correction partial; falling back to full AI...")
            except Exception as e:
                if steps_log:
                    steps_log.append(f"Minimal AI validation error: {e}; falling back...")

    # Stage 4: targeted snippet injection for a small set of missing properties.
    focus_points = extract_error_focus_points(validation_results)
    missing_props = focus_points.get("missing_properties", [])

    if steps_log is not None:
        steps_log.append(f"Targeted fix: detected {len(missing_props)} missing properties")
        if missing_props:
            preview = ", ".join(missing_props[:5])
            if len(missing_props) > 5:
                preview += ", ..."
            steps_log.append(f"Missing list: {preview}")

    working_rdf = rdf_content
    quick_fix_attempted = False

    # Only attempt regex injection for small error sets; larger ones go
    # straight to the full AI loop below.
    if missing_props and len(missing_props) <= 5:
        guidance = get_targeted_bibframe_guidance(missing_props, focus_points.get("classes", []))
        if steps_log is not None:
            steps_log.append(f"Retrieved guidance entries: {len(guidance)}")

        import re

        def _inject_snippets(match: re.Match) -> str:
            """Append one generated snippet per still-missing property inside the matched element."""
            nonlocal quick_fix_attempted
            opening, inner, closing = match.groups()
            new_bits = []
            for prop in missing_props:
                # Skip properties the element already carries.
                if f"<bf:{prop}" not in inner:
                    snippet = generate_property_specific_fix(prop, guidance.get(prop))
                    new_bits.append(snippet)
            if not new_bits:
                return match.group(0)
            quick_fix_attempted = True
            if steps_log is not None:
                # match.group(1).split()[0][1:] == element name without the leading '<'.
                steps_log.append(f"Injected {len(new_bits)} snippets into {match.group(1).split()[0][1:]}")
            combined = opening + inner
            if not inner.endswith("\n"):
                combined += "\n"
            combined += " " + "\n ".join(new_bits) + "\n" + closing
            return combined

        work_pattern = re.compile(r"(<bf:Work[^>]*>)([\s\S]*?)(</bf:Work>)")
        instance_pattern = re.compile(r"(<bf:Instance[^>]*>)([\s\S]*?)(</bf:Instance>)")

        # Inject into the first bf:Work element if present, else the first bf:Instance.
        if work_pattern.search(working_rdf):
            working_rdf = work_pattern.sub(_inject_snippets, working_rdf, count=1)
        elif instance_pattern.search(working_rdf):
            working_rdf = instance_pattern.sub(_inject_snippets, working_rdf, count=1)

    if quick_fix_attempted and VALIDATOR_AVAILABLE:
        try:
            conforms, new_results = validate_rdf(working_rdf.encode('utf-8'), template)
            if conforms:
                if steps_log is not None:
                    steps_log.append("Quick fix succeeded; validation now passes")
                if cache_key:
                    _store_correction_in_cache(cache_key, working_rdf, steps_log)
                return working_rdf
            else:
                if steps_log is not None:
                    steps_log.append("Quick fix incomplete; falling back to AI loop")
                validation_results = new_results or validation_results
        except Exception as quick_err:
            if steps_log is not None:
                steps_log.append(f"Quick fix validation error: {quick_err}; using AI fallback")

    # Stage 5: full AI correction loop on the best document so far.
    # Recompute the cache key because working_rdf may differ from the input.
    if validation_results and working_rdf:
        cache_key = _make_fix_cache_key(validation_results, working_rdf, template)

    return get_ai_correction(
        validation_results,
        working_rdf,
        template,
        max_attempts=max_attempts,
        include_warnings=include_warnings,
        enable_validation_loop=enable_validation_loop,
        cache_key=cache_key,
        steps_log=steps_log,
    )
|
|
|
|
|
|
|
|
def generate_manual_suggestions(validation_results: str) -> str:
    """Generate generic, pattern-based suggestions when AI is not available.

    Note: Avoid hardcoding SHACL rules or specific property requirements; rely only on
    patterns present in the validation output text.
    """
    lowered = validation_results.lower() if validation_results else ""

    # Each entry pairs a textual-pattern predicate result with its suggestion.
    checks = [
        (("mincount" in lowered) or ("missing" in lowered) or ("required" in lowered),
         "β’ Some required fields are missing. Add the missing information where indicated."),
        (("maxcount" in lowered) or ("too many" in lowered) or ("more than allowed" in lowered),
         "β’ Some fields have too many values. Keep only the main/one value as required."),
        (("datatype" in lowered) or ("type mismatch" in lowered) or ("expected" in lowered and "datatype" in lowered),
         "β’ Some values are in the wrong format. Use the expected format (e.g., dates like YYYY-MM-DD)."),
        (("iri" in lowered) or ("uri" in lowered) or ("identifier" in lowered and "invalid" in lowered),
         "β’ Some identifiers look malformed. Use complete, valid web addresses or proper identifiers."),
        (("namespace" in lowered) or ("prefix" in lowered),
         "β’ Define all XML namespace prefixes at the top and use them consistently."),
        (("xml" in lowered) or ("syntax" in lowered) or ("well-formed" in lowered),
         "β’ Fix XML structure issues (unclosed tags, invalid characters, or nesting problems)."),
    ]

    suggestions: List[str] = [message for matched, message in checks if matched]

    # Fall back to generic advice when no known pattern was recognized.
    if not suggestions:
        suggestions = [
            "β’ Review the validation details and update the record where issues are highlighted.",
            "β’ Follow the selected template; add missing fields and correct formats as needed.",
        ]

    suggestions_text = "\n".join(suggestions)

    return f"""
π **What needs fixing:**

{suggestions_text}

π‘ **Quick tips:**
β’ Include required fields when noted
β’ Keep single-value fields to one value
β’ Use the expected formats (e.g., for dates)
β’ Declare and use XML namespace prefixes consistently
β’ Ensure the XML is wellβformed

Need help? Load an example and compare the structure.
"""
|
|
|
|
|
def clean_technical_jargon(text: str) -> str:
    """Replace technical RDF/SHACL terms with plain language for end users.

    Alphabetic terms are replaced only on whole-word boundaries so that
    substrings of unrelated words are left alone (the previous plain
    ``str.replace`` approach turned e.g. "paragraph" into "paradataset"
    and "objection" into "valueion"). Punctuated tokens such as the
    "sh:" / "rdf:" prefixes and "Error:" are still replaced verbatim.

    Args:
        text: Raw validation or report text possibly containing jargon.

    Returns:
        str: The text with jargon swapped for plain-language equivalents;
        falsy input is returned unchanged.
    """
    if not text:
        return text

    # Insertion order matters: longer terms (e.g. "URIRef") must be handled
    # before their substrings (e.g. "URI").
    replacements = {
        # RDF terminology -> plain language
        "URIRef": "identifier",
        "URI": "identifier",
        "IRI": "identifier",
        "Literal": "text value",
        "triple": "field entry",
        "graph": "dataset",
        "node": "record",
        "subject": "record",
        "predicate": "field type",
        "object": "value",
        "SHACL": "validation",
        "constraint": "rule",
        "conformance": "compliance",
        "violation": "issue",
        # Namespace prefixes are simply stripped.
        "sh:": "",
        "rdf:": "",
        "rdfs:": "",
        "xsd:": "",
        # Softer wording for status words.
        "Error:": "Issue:",
        "Invalid": "Incorrect",
        "Failed": "Did not pass",
        "Missing": "Not found",
    }

    cleaned = text
    for term, plain in replacements.items():
        if term.isalpha():
            # Whole-word replacement for purely alphabetic terms.
            cleaned = re.sub(rf"\b{re.escape(term)}\b", plain, cleaned)
        else:
            # Tokens containing punctuation (prefixes, "Error:") are replaced verbatim.
            cleaned = cleaned.replace(term, plain)
    return cleaned
|
|
|
|
|
def parse_shacl_results_for_ai(results_text: str) -> str:
    """Simplify SHACL results into clearer sentences for AI processing.

    Pattern-based only; does not depend on any SHACL rule definitions.
    """
    if not results_text:
        return ""
    import re

    # Ordered (regex, plain-language sentence) pairs; first match wins per line.
    rules = [
        (re.compile(r"minCount", re.IGNORECASE), "A required field is missing."),
        (re.compile(r"maxCount", re.IGNORECASE), "A field has more values than allowed; only one may be permitted."),
        (re.compile(r"datatype", re.IGNORECASE), "A field has a value in the wrong format."),
        (re.compile(r"iri|uri", re.IGNORECASE), "An identifier looks malformed or incomplete."),
        (re.compile(r"namespace|prefix", re.IGNORECASE), "A namespace prefix is undefined or inconsistent."),
        (re.compile(r"xml|syntax|well-formed", re.IGNORECASE), "The XML structure has an error (e.g., unclosed tag)."),
    ]

    def _translate(line: str):
        """Map one result line to a plain sentence, or None when uninteresting."""
        for regex, sentence in rules:
            if regex.search(line):
                return sentence
        if "Constraint Violation" in line or "Violation" in line:
            return "A record rule was not met."
        return None

    translated = []
    for raw_line in results_text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        sentence = _translate(stripped)
        if sentence is not None:
            translated.append(sentence)

    # De-duplicate while preserving first-seen order (dicts keep insertion order).
    deduped = list(dict.fromkeys(translated))
    return "\n".join(deduped) if deduped else results_text
|
|
|
|
|
def generate_manual_correction_hints(validation_results: str, rdf_content: str) -> str:
    """Generate manual correction hints when AI is not available"""
    # The first 500 characters of the report are enough context for a reader.
    issue_excerpt = validation_results[:500]
    return f"""<!-- Manual correction hints based on validation results -->
<!-- Set HF_API_KEY as a Secret in your Space settings for AI-powered corrections -->

{rdf_content}

<!--
VALIDATION ISSUES FOUND:
{issue_excerpt}...

MANUAL CORRECTION STEPS:
1. Add missing namespace declarations
2. Include required properties (rdf:type, etc.)
3. Fix XML syntax errors
4. Ensure proper URI formats
5. Validate data types
-->"""
|
|
|
|
|
def extract_xml_from_text(text: str) -> str:
    """Extract RDF/XML from model output that may include extra formatting.

    Looks for the first <rdf:RDF ...> ... </rdf:RDF> block. If not found,
    returns the original text unchanged.
    """
    if not text:
        return text
    import re

    # Prefer an explicit <rdf:RDF> ... </rdf:RDF> block anywhere in the text.
    block = re.search(r"<rdf:RDF[\s\S]*?</rdf:RDF>", text, re.IGNORECASE)
    if block is not None:
        return block.group(0)

    # Otherwise strip a surrounding Markdown code fence, if any is present.
    without_fences = re.sub(r"^```[a-zA-Z]*\n|```$", "", text.strip())
    return without_fences if without_fences else text
|
|
|
|
|
def clean_xml_for_validation(xml_text: str) -> str:
    """
    Clean XML text for validation by removing comments and extra formatting.

    Args:
        xml_text (str): XML text that may contain comments or formatting

    Returns:
        str: Clean XML ready for validation
    """
    import re

    if not xml_text:
        return xml_text

    # Drop XML comments (including multi-line ones) before validating.
    cleaned = re.sub(r'<!--.*?-->', '', xml_text, flags=re.DOTALL)
    cleaned = cleaned.strip()

    # Unwrap a Markdown code fence when the model returned ```xml ... ```.
    # (The previous version wrapped this in a bare `except: pass`, which can
    # swallow KeyboardInterrupt/SystemExit; none of these operations raise,
    # so the handler has been removed.)
    if cleaned.startswith("```"):
        parts = cleaned.split("```")
        if len(parts) >= 3:
            cleaned = parts[1]
            # Drop a leading language tag such as "xml"; the final strip
            # removes the newline that follows it.
            if cleaned.startswith("xml"):
                cleaned = cleaned[3:]

    return cleaned.strip()
|
|
|
|
|
|
|
|
# Prefix -> URI map of the namespaces this app knows how to auto-declare when
# repairing RDF/XML snippets (consumed by ensure_rdf_wrapper_and_namespaces).
STANDARD_NAMESPACES = {
    "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    "bf": "http://id.loc.gov/ontologies/bibframe/",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    "xsd": "http://www.w3.org/2001/XMLSchema#",
}
|
|
|
|
|
def _extract_declared_namespaces(xml_text: str) -> dict: |
|
|
import re |
|
|
decls = {} |
|
|
for prefix, uri in re.findall(r"xmlns:([A-Za-z0-9_-]+)=\"([^\"]+)\"", xml_text[:2000]): |
|
|
decls[prefix] = uri |
|
|
return decls |
|
|
|
|
|
def _detect_used_prefixes(xml_text: str) -> set: |
|
|
import re |
|
|
used = set() |
|
|
|
|
|
for m in re.finditer(r"<\s*([A-Za-z0-9_-]+):[A-Za-z0-9_-]+", xml_text): |
|
|
used.add(m.group(1)) |
|
|
for m in re.finditer(r"\s([A-Za-z0-9_-]+):[A-Za-z0-9_-]+=", xml_text): |
|
|
used.add(m.group(1)) |
|
|
return used |
|
|
|
|
|
def ensure_rdf_wrapper_and_namespaces(xml_text: str, original_text: Optional[str] = None, steps_log: Optional[List[str]] = None) -> str:
    """Ensure the XML has an <rdf:RDF> wrapper and required xmlns declarations for used prefixes.

    - If wrapper exists, add any missing xmlns: declarations for standard, used prefixes.
    - If wrapper is missing, wrap the content and include standard namespaces for used prefixes.

    Args:
        xml_text: The (possibly partial) RDF/XML snippet to normalize.
        original_text: Optional earlier version of the document whose xmlns
            declarations should also be honored (they take precedence).
        steps_log: Optional list that accumulates progress messages.

    Returns:
        str: The normalized document; non-string or falsy input is returned as-is.
    """
    if not xml_text or not isinstance(xml_text, str):
        return xml_text
    import re

    declared = _extract_declared_namespaces(xml_text)
    if original_text:
        # Declarations from the original document override/extend the current ones.
        declared.update(_extract_declared_namespaces(original_text))

    used = _detect_used_prefixes(xml_text)
    # The wrapper element itself needs the rdf prefix even if nothing else uses it.
    used.add("rdf")

    # Only standard, known prefixes can be auto-declared.
    # NOTE(review): `used` is a set, so the ordering of `missing` (and hence of
    # the injected attributes) is not deterministic across runs.
    missing = [p for p in used if p not in declared and p in STANDARD_NAMESPACES]
    added_attrs = " ".join([f"xmlns:{p}=\"{STANDARD_NAMESPACES[p]}\"" for p in missing])

    has_wrapper = bool(re.search(r"<rdf:RDF[^>]*>", xml_text))
    updated = xml_text

    if has_wrapper:
        if added_attrs:
            def _inject(match):
                # Insert the new xmlns attributes just before the closing '>'
                # of the opening <rdf:RDF ...> tag.
                # NOTE(review): a self-closing "<rdf:RDF .../>" start tag would
                # be mangled here ('/' ends up detached) — confirm inputs
                # always use a paired wrapper.
                start_tag = match.group(0)
                if start_tag.endswith('>'):
                    return start_tag[:-1] + ' ' + added_attrs + '>'
                return start_tag + ' ' + added_attrs
            updated = re.sub(r"<rdf:RDF[^>]*>", _inject, updated, count=1)
        if steps_log is not None and missing:
            steps_log.append(f"Injected missing namespace declarations: {', '.join(missing)}")
    else:
        # No wrapper: build one that declares rdf plus every used prefix we
        # can resolve (declared URIs win over the standard defaults).
        attrs = [f"xmlns:rdf=\"{STANDARD_NAMESPACES['rdf']}\""]
        for p in used:
            if p == 'rdf':
                continue
            uri = declared.get(p) or STANDARD_NAMESPACES.get(p)
            if uri:
                attrs.append(f"xmlns:{p}=\"{uri}\"")
        wrapper_open = "<rdf:RDF " + " ".join(attrs) + ">\n"
        wrapper_close = "\n</rdf:RDF>"
        updated = wrapper_open + xml_text + wrapper_close
        if steps_log is not None:
            steps_log.append("Wrapped snippet in <rdf:RDF> with standard namespace declarations")

    return updated
|
|
|
|
|
def validate_rdf_interface(rdf_content: str, template: str, use_ai: bool = True, include_warnings: bool = False, iterate_until_valid: bool = True, max_attempts: int = 5, show_steps: bool = True):
    """Main validation function for Gradio interface.

    Pipeline: normalize the input (wrapper + namespaces), validate it, and —
    when it does not conform — produce suggestions and an AI- or manually-
    generated correction, then re-validate the correction.

    Args:
        rdf_content: Raw RDF/XML pasted by the user.
        template: SHACL template name forwarded to the validator.
        use_ai: When True, use AI suggestions/corrections; otherwise fall back
            to pattern-based manual hints.
        include_warnings: When False, warning entries are filtered out of the
            results before they are fed to the AI.
        iterate_until_valid: Forwarded as ``enable_validation_loop`` to the
            correction pipeline.
        max_attempts: Correction attempt budget forwarded to the AI pipeline.
        show_steps: When True, the step log is returned; otherwise it is blank.

    Returns:
        tuple: 7 values wired to the Gradio outputs —
        (status, results_text, suggestions, steps_text, corrected_rdf,
        corrected_status, corrected_results).
    """
    if not rdf_content.strip():
        return "β Error", "No RDF/XML data provided", "", "", "", "", ""

    steps_log: List[str] = []

    if not VALIDATOR_AVAILABLE:
        error_msg = "Validator module is not available. Please check that validator.py is present and all dependencies are installed."
        steps_log.append(f"ERROR: {error_msg}")
        return "β Error", error_msg, "", "\n".join(steps_log) if show_steps else "", "", "", ""

    # Normalize the input before the first validation pass.
    steps_log.append(f"Preparing RDF for validation (original length: {len(rdf_content)} chars)")
    prepped_input = ensure_rdf_wrapper_and_namespaces(rdf_content, steps_log=steps_log if show_steps else None)
    steps_log.append(f"Preprocessed RDF (new length: {len(prepped_input)} chars)")

    steps_log.append(f"Calling validator with template '{template}'")
    result = validate_rdf_tool(prepped_input, template)

    if "error" in result:
        steps_log.append(f"Validation error: {result['error']}")
        return f"β Error: {result['error']}", "", "", "\n".join(steps_log) if show_steps else "", "", "", ""

    status = result["status"]
    results_text = result["results"]
    conforms = result["conforms"]

    steps_log.append(f"Initial validation: {'PASSED' if conforms else 'FAILED'} using template '{template}'")

    if not results_text or len(results_text.strip()) == 0:
        steps_log.append("WARNING: Validator returned empty results text")

    # Optionally strip warning sections so the AI only sees real violations.
    filtered_results = results_text
    if not include_warnings and "Warning" in results_text:
        lines = results_text.split('\n')
        filtered_lines = []
        skip_until_next_section = False

        for line in lines:
            if "Warning" in line and ("Constraint Violation" in line or "sh:Warning" in line):
                # Start of a warning section: skip its following lines.
                skip_until_next_section = True
            elif "Constraint Violation" in line and "Warning" not in line:
                # A non-warning violation header ends any skipped section.
                skip_until_next_section = False
                filtered_lines.append(line)
            elif not skip_until_next_section:
                filtered_lines.append(line)

        filtered_results = '\n'.join(filtered_lines)
        if not include_warnings:
            steps_log.append("Filtered out warnings from results")

    corrected_status = ""
    corrected_results = ""

    if not include_warnings:
        steps_log.append("Configured to ignore warnings in AI processing")
    if iterate_until_valid:
        steps_log.append(f"Iteration enabled with max_attempts={max_attempts}")
    if conforms:
        suggestions = "✅ No issues found! Your RDF/XML is valid according to the selected template."
        corrected_rdf = ""
        corrected_status = "β"
        corrected_results = ""
        steps_log.append("No correction needed; record already conforms")
    else:
        if use_ai:
            # Ask for concise guidance plus a corrected document.
            suggestions = get_ai_suggestions(filtered_results, rdf_content, include_warnings)
            steps_log.append("Requested AI suggestions for concise guidance")
            corrected_rdf = get_ai_correction_targeted(
                filtered_results,
                rdf_content,
                template,
                max_attempts=max_attempts,
                include_warnings=include_warnings,
                enable_validation_loop=iterate_until_valid,
                steps_log=steps_log,
            )

            try:
                # Clean the model output and re-validate it so the UI can
                # report whether the correction actually conforms.
                corrected_xml = clean_xml_for_validation(corrected_rdf)
                corrected_xml = extract_xml_from_text(corrected_xml)
                corrected_xml = ensure_rdf_wrapper_and_namespaces(corrected_xml, original_text=prepped_input, steps_log=steps_log)

                steps_log.append(f"Re-validating cleaned RDF ({len(corrected_xml)} chars)")
                if show_steps:
                    preview = corrected_xml[:200] + "..." if len(corrected_xml) > 200 else corrected_xml
                    steps_log.append(f"Preview: {preview}")

                reval = validate_rdf_tool(corrected_xml, template)
                if "error" in reval:
                    corrected_status = f"β Re-validation Error: {reval['error']}"
                    corrected_results = ""
                    steps_log.append(f"Re-validation failed with error: {reval['error']}")
                else:
                    corrected_status = reval.get("status", "")
                    corrected_results = reval.get("results", "")
                    conforms = reval.get('conforms', False)
                    steps_log.append(f"Re-validation: {corrected_status} - Conforms: {conforms}")
            except Exception as re_ex:
                corrected_status = f"β Re-validation Error: {re_ex}"
                corrected_results = ""
                steps_log.append(f"Re-validation error: {re_ex}")
        else:
            # AI disabled: fall back to pattern-based guidance.
            suggestions = generate_manual_suggestions(filtered_results)
            corrected_rdf = generate_manual_correction_hints(filtered_results, rdf_content)
            corrected_status = "β"
            corrected_results = ""
            steps_log.append("AI disabled; produced manual suggestions and hints")

    steps_text = "\n".join(steps_log) if show_steps else ""
    return status, results_text, suggestions, steps_text, corrected_rdf, corrected_status, corrected_results
|
|
|
|
|
def get_rdf_examples(example_type: str = "valid") -> str:
    """
    Retrieve example RDF/XML snippets for testing and learning.

    This tool provides sample RDF/XML content that can be used to test
    the validation system or learn proper RDF structure. Examples include
    valid BibFrame Work records, invalid records for testing corrections,
    and BibFrame Instance records.

    Args:
        example_type (str): Type of example to retrieve. Options:
            - 'valid': A complete, valid BibFrame Work record
            - 'invalid': An incomplete BibFrame Work with validation errors
            - 'bibframe': A BibFrame Instance record example
            Any other value falls back to the 'valid' example.

    Returns:
        str: Complete RDF/XML example content ready for validation testing
    """
    # 'valid' and 'invalid' reuse the module-level sample constants; the
    # 'bibframe' Instance example is defined inline below.
    examples = {
        "valid": SAMPLE_VALID_RDF,
        "invalid": SAMPLE_INVALID_RDF,
        "bibframe": '''<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
         xmlns:bf="http://id.loc.gov/ontologies/bibframe/"
         xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#">

    <bf:Instance rdf:about="http://example.org/instance/1">
        <rdf:type rdf:resource="http://id.loc.gov/ontologies/bibframe/Print"/>
        <bf:instanceOf rdf:resource="http://example.org/work/1"/>
        <bf:title>
            <bf:Title>
                <bf:mainTitle>Example Book Title</bf:mainTitle>
            </bf:Title>
        </bf:title>
        <bf:provisionActivity>
            <bf:Publication>
                <bf:date>2024</bf:date>
                <bf:place>
                    <bf:Place>
                        <rdfs:label>New York</rdfs:label>
                    </bf:Place>
                </bf:place>
            </bf:Publication>
        </bf:provisionActivity>
    </bf:Instance>

</rdf:RDF>'''
    }

    return examples.get(example_type, examples["valid"])
|
|
|
|
|
|
|
|
def create_interface():
    """Build and return the main Gradio Blocks interface.

    Layout:
      * Input column — RDF/XML textbox plus an "Advanced options" accordion
        (template, AI toggle, warnings toggle, iteration controls) and the
        example / clear buttons.
      * Results column — validation status, detailed report, AI fix
        suggestions, and the step-by-step correction log.
      * Corrections section — AI-corrected RDF/XML plus its re-validation
        status and details.

    Event handlers are wired to the module-level functions
    ``validate_rdf_interface`` and ``get_rdf_examples``.

    Returns:
        gr.Blocks: the assembled (not yet launched) Gradio app.
    """
    # Re-read the key at build time so a Space restart picks up newly-set
    # Secrets without requiring a code change.
    current_api_key = os.getenv('HF_API_KEY', '')
    api_status = (
        "🔑 AI features enabled"
        if (OPENAI_AVAILABLE and current_api_key)
        else "⚠️ AI features disabled (set HF_API_KEY)"
    )

    with gr.Blocks(
        title="RDF Validation Server with AI",
        theme=gr.themes.Soft(),
        css="""
        .status-box {
            font-weight: bold;
            padding: 10px;
            border-radius: 5px;
        }
        .header-text {
            text-align: center;
            padding: 20px;
        }
        """
    ) as demo:

        # Shown in a collapsible <details> block so deployment problems
        # (missing key, wrong endpoint/model) can be diagnosed from the UI.
        debug_info = f"""
        Debug Info:
        - OPENAI_AVAILABLE: {OPENAI_AVAILABLE}
        - HF_INFERENCE_AVAILABLE: {HF_INFERENCE_AVAILABLE}
        - HF_API_KEY set: {'Yes' if current_api_key else 'No'}
        - HF_API_KEY length: {len(current_api_key) if current_api_key else 0}
        - HF_ENDPOINT_URL: {HF_ENDPOINT_URL}
        - HF_MODEL: {HF_MODEL}
        """

        gr.HTML(f"""
        <div class="header-text">
            <h1>🔍 RDF Validation Server with AI</h1>
            <p>Validate RDF/XML against SHACL schemas with AI-powered suggestions and corrections</p>
            <p><strong>Status:</strong> {api_status}</p>
            <details><summary>Debug Info</summary><pre>{debug_info}</pre></details>
        </div>
        """)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📝 Input")

                rdf_input = gr.Textbox(
                    label="RDF/XML Content",
                    placeholder="Paste your RDF/XML content here...",
                    lines=15,
                    show_copy_button=True
                )

                with gr.Accordion("Advanced options", open=False):
                    with gr.Row():
                        template_dropdown = gr.Dropdown(
                            label="Validation Template",
                            choices=["monograph", "custom"],
                            value="monograph",
                            info="Select the SHACL template to validate against"
                        )
                        use_ai_checkbox = gr.Checkbox(
                            label="Use AI Features",
                            value=True,
                            info="Enable AI-powered suggestions and corrections"
                        )
                        include_warnings_checkbox = gr.Checkbox(
                            label="Include Warnings",
                            value=False,
                            info="Include warnings in AI corrections (violations only by default)"
                        )
                    with gr.Row():
                        iterate_checkbox = gr.Checkbox(
                            label="Iterate until valid",
                            value=True,
                            info="Try multiple correction attempts until validation passes or attempts run out"
                        )
                        max_attempts_slider = gr.Slider(
                            label="Max attempts",
                            minimum=1,
                            maximum=3,
                            value=2,
                            step=1,
                            info="Maximum number of correction attempts (2 recommended for speed)"
                        )
                        show_steps_checkbox = gr.Checkbox(
                            label="Show steps",
                            value=False,
                            info="Display step-by-step process (turn on when you want transparency)"
                        )

                validate_btn = gr.Button("🔍 Validate RDF", variant="primary", size="lg")

                gr.Markdown("### 📚 Examples & Tools")

                with gr.Row():
                    example1_btn = gr.Button("✅ Valid RDF Example", variant="secondary")
                    example2_btn = gr.Button("❌ Invalid RDF Example", variant="secondary")
                    clear_btn = gr.Button("🗑️ Clear All", variant="stop")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📊 Results")

                status_output = gr.Textbox(
                    label="Validation Status",
                    interactive=False,
                    lines=1,
                    elem_classes=["status-box"]
                )

                results_output = gr.Textbox(
                    label="Detailed Validation Results",
                    interactive=False,
                    lines=8,
                    show_copy_button=True
                )

                suggestions_output = gr.Textbox(
                    label="💡 Fix Suggestions",
                    interactive=False,
                    lines=8,
                    show_copy_button=True
                )

                steps_output = gr.Textbox(
                    label="🔧 Correction Steps",
                    interactive=False,
                    lines=10,
                    show_copy_button=True,
                    placeholder="Step-by-step log of how the system derived the corrected XML"
                )

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 🛠️ AI-Generated Corrections")

                corrected_output = gr.Textbox(
                    label="Corrected RDF/XML",
                    interactive=False,
                    lines=15,
                    show_copy_button=True,
                    placeholder="Corrected RDF will appear here after validation..."
                )

                with gr.Row():
                    corrected_status_output = gr.Textbox(
                        label="Re-validation Status (Corrected RDF)",
                        interactive=False,
                        lines=1,
                        elem_classes=["status-box"]
                    )
                    corrected_results_output = gr.Textbox(
                        label="Re-validation Details",
                        interactive=False,
                        lines=6,
                        show_copy_button=True
                    )

        # Primary pipeline: validate -> suggest -> (optionally) iterate on
        # AI corrections; all seven outputs are produced in one call.
        validate_btn.click(
            fn=validate_rdf_interface,
            inputs=[rdf_input, template_dropdown, use_ai_checkbox, include_warnings_checkbox, iterate_checkbox, max_attempts_slider, show_steps_checkbox],
            outputs=[status_output, results_output, suggestions_output, steps_output, corrected_output, corrected_status_output, corrected_results_output]
        )

        example1_btn.click(
            lambda: get_rdf_examples("valid"),
            outputs=[rdf_input]
        )

        example2_btn.click(
            lambda: get_rdf_examples("invalid"),
            outputs=[rdf_input]
        )

        # One empty string per component: input plus the seven result fields.
        clear_btn.click(
            lambda: ("", "", "", "", "", "", "", ""),
            outputs=[rdf_input, status_output, results_output, suggestions_output, steps_output, corrected_output, corrected_status_output, corrected_results_output]
        )

        # NOTE: two headings below previously contained U+FFFD replacement
        # characters (corrupted emoji); they have been restored.
        gr.Markdown("""
        ---
        ### 📚 **Documentation & Resources:**

        **[📖 MCP4BibFrame Documentation](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs)** - Complete BibFrame ontology reference with examples

        This validator integrates with the **MCP4BibFrame Documentation API** to provide authoritative BibFrame ontology information during AI-powered corrections.

        ### 🚀 **Quick Start:**

        1. **Paste your RDF/XML** in the input box above
        2. **Click "Validate RDF"** to check for errors
        3. **Review AI suggestions** for plain-language fixes (enhanced with BibFrame documentation)
        4. **Copy the corrected RDF** from the output

        ---
        ### 🚀 **Deployment Instructions for Hugging Face Spaces:**

        1. **Create a new Space** on [Hugging Face](https://huggingface.co/spaces)
        2. **Set up your Hugging Face Inference Endpoint** and get the endpoint URL
        3. **Set your tokens** in Space settings (use Secrets for security):
           - Go to Settings → Repository secrets
           - Add: `HF_API_KEY` = `your_huggingface_api_key_here`
           - Endpoint is now hardcoded to your specific Inference Endpoint
        4. **Upload these files** to your Space repository
        5. **Install requirements**: The Space will auto-install from `requirements.txt`

        ### 🔧 **MCP Server Mode:**
        This app functions as both a web interface AND an MCP server for Claude Desktop and other MCP clients.

        **Available MCP Tools:**
        - `validate_rdf_tool`: Validate RDF/XML against SHACL shapes
        - `get_ai_suggestions`: Get AI-powered fix suggestions (with BibFrame docs)
        - `get_ai_correction`: Generate corrected RDF/XML (with BibFrame docs)
        - `get_rdf_examples`: Retrieve example RDF snippets
        - `validate_rdf_interface`: Complete validation with AI suggestions and corrections (primary tool)

        **MCP Configuration (Streamable HTTP):**
        Add this configuration to your MCP client (Claude Desktop, etc.):

        ```json
        {
          "mcpServers": {
            "rdf-validator": {
              "url": "https://jimfhahn-mcp4rdf.hf.space/gradio_api/mcp/"
            }
          }
        }
        ```

        **Alternative SSE Configuration:**
        ```json
        {
          "mcpServers": {
            "rdf-validator": {
              "url": "https://jimfhahn-mcp4rdf.hf.space/gradio_api/mcp/sse"
            }
          }
        }
        ```

        ### 💡 **Features:**
        - ✅ Real-time RDF/XML validation against SHACL schemas
        - 🤖 AI-powered error suggestions and corrections (enhanced with BibFrame ontology docs)
        - 📚 Built-in examples and templates
        - 🔗 Integrated with [MCP4BibFrame Documentation API](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs)
        - 📋 Copy results with one click

        **BibFrame Documentation Integration:**
        AI corrections now use authoritative BibFrame ontology information from the MCP4BibFrame Documentation API to ensure accuracy and compliance with official specifications.

        ### 🔗 **Related Resources:**
        - [MCP4BibFrame Documentation](https://huggingface.co/spaces/jimfhahn/mcp4bibframe-docs) - BibFrame ontology reference
        - [BIG DCTAP Documentation](https://bf-interop.github.io/DCTap/)
        - [BIBFRAME Ontology](http://id.loc.gov/ontologies/bibframe.html)
        - [SHACL Specification](https://www.w3.org/TR/shacl/)

        **Note:** AI features require a valid Hugging Face API key (HF_API_KEY) set as a Secret. Manual suggestions are provided as fallback.
        """)

    return demo
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: build the UI once, then serve it.  The PORT environment
    # variable (set by some hosting platforms) overrides the Gradio default.
    app = create_interface()
    listen_port = int(os.environ.get('PORT', 7860))

    # Bind on all interfaces so the Space/container proxy can reach the app;
    # mcp_server=True additionally exposes the Gradio MCP endpoints.
    app.launch(
        server_name="0.0.0.0",
        server_port=listen_port,
        share=False,
        show_error=True,
        show_api=True,
        allowed_paths=["."],
        mcp_server=True,
    )