e2hln's picture
Upload 44 files
6165ba9 verified
raw
history blame
31.3 kB
import json
import uuid
import datetime
import logging
import re
from typing import Dict, Optional, Any, List, Union
from urllib.parse import urlparse
from packageurl import PackageURL
from huggingface_hub import HfApi, ModelCard
from huggingface_hub.repocard_data import EvalResult
from .extractor import EnhancedExtractor
from .model_file_extractors import ModelFileExtractor, default_extractors
from .scoring import calculate_completeness_score
from .registry import get_field_registry_manager
from .schemas import AIBOMResponse, EnhancementReport
from ..utils.validation import validate_aibom, get_validation_summary
from ..utils.license_utils import normalize_license_id, get_license_url, is_valid_spdx_license_id
from ..config import AIBOM_GEN_VERSION, AIBOM_GEN_NAME
logger = logging.getLogger(__name__)
class AIBOMService:
"""
Service layer for AI SBOM generation.
Orchestrates metadata extraction, AI SBOM structure creation, and scoring.
"""
def __init__(
self,
hf_token: Optional[str] = None,
inference_model_url: Optional[str] = None,
use_inference: bool = True,
use_best_practices: bool = True,
model_file_extractors: Optional[List[ModelFileExtractor]] = None,
):
self.hf_api = HfApi(token=hf_token)
self.inference_model_url = inference_model_url
self.use_inference = use_inference
self.use_best_practices = use_best_practices
self.enhancement_report = None
self.extraction_results = {}
self.model_file_extractors = (
model_file_extractors if model_file_extractors is not None
else default_extractors()
)
# Initialize registry manager
try:
self.registry_manager = get_field_registry_manager()
logger.info("✅ Registry manager initialized in service")
except Exception as e:
logger.warning(f"⚠️ Could not initialize registry manager: {e}")
self.registry_manager = None
def get_extraction_results(self):
"""Return the enhanced extraction results from the last extraction"""
return self.extraction_results
def get_enhancement_report(self):
"""Return the enhancement report from the last generation"""
return self.enhancement_report
def generate_aibom(
self,
model_id: str,
include_inference: bool = False,
use_best_practices: Optional[bool] = None,
enable_summarization: bool = False,
spec_version: str = "1.6",
metadata_overrides: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""
Generate an AIBOM for the specified Hugging Face model.
"""
try:
model_id = self._normalise_model_id(model_id)
use_inference = include_inference if include_inference is not None else self.use_inference
use_best_practices = use_best_practices if use_best_practices is not None else self.use_best_practices
logger.info(f"Generating AIBOM for {model_id}")
# Fetch generic info
model_info = self._fetch_model_info(model_id)
model_card = self._fetch_model_card(model_id)
# 1. Extract Metadata
original_metadata = self._extract_metadata(model_id, model_info, model_card, enable_summarization)
# 2. Create Initial AIBOM
original_aibom = self._create_aibom_structure(model_id, original_metadata, spec_version)
# 3. Initial Score
original_score = calculate_completeness_score(
original_aibom,
validate=True,
extraction_results=self.extraction_results # Using results from _extract_metadata
)
# 4. AI Enhancement (Placeholder for now as in original)
final_metadata = original_metadata.copy()
ai_enhanced = False
ai_model_name = None
if use_inference and self.inference_model_url:
# Placeholder for AI enhancement logic
pass
# 5. Create Final AIBOM
aibom = self._create_aibom_structure(model_id, final_metadata, spec_version=spec_version, metadata_overrides=metadata_overrides)
# Validate Schema
is_valid, validation_errors = validate_aibom(aibom)
if not is_valid:
logger.warning(f"AIBOM schema validation failed with {len(validation_errors)} errors")
# 6. Final Score
final_score = calculate_completeness_score(
aibom,
validate=True,
extraction_results=self.extraction_results
)
# 7. Store Report
self.enhancement_report = {
"ai_enhanced": ai_enhanced,
"ai_model": ai_model_name,
"original_score": original_score,
"final_score": final_score,
"improvement": round(final_score["total_score"] - original_score["total_score"], 2) if ai_enhanced else 0,
"schema_validation": {
"valid": is_valid,
"error_count": len(validation_errors),
"errors": validation_errors[:10] if not is_valid else []
}
}
return aibom
except Exception as e:
logger.error(f"Error generating AIBOM: {e}", exc_info=True)
return self._create_minimal_aibom(model_id, spec_version)
def _extract_metadata(self, model_id: str, model_info: Dict[str, Any], model_card: Optional[ModelCard], enable_summarization: bool = False) -> Dict[str, Any]:
"""Wrapper around EnhancedExtractor"""
extractor = EnhancedExtractor(self.hf_api, model_file_extractors=self.model_file_extractors)
# Ideally we reuse the registry manager
if self.registry_manager:
extractor.registry_manager = self.registry_manager
extractor.registry_fields = self.registry_manager.get_field_definitions()
metadata = extractor.extract_metadata(model_id, model_info, model_card, enable_summarization=enable_summarization)
self.extraction_results = extractor.extraction_results
return metadata
def _generate_purl(self, model_id: str, version: str, purl_type: str = "huggingface") -> str:
"""Generate PURL using packageurl-python library
Args:
model_id: Model identifier (e.g., "owner/model" or "model")
version: Version string
purl_type: PURL type (default: "huggingface", also supports "generic")
Returns:
PURL string in format pkg:type/namespace/name@version
"""
parts = model_id.split("/", 1)
namespace = parts[0] if len(parts) == 2 else None
name = parts[1] if len(parts) == 2 else parts[0]
purl = PackageURL(type=purl_type, namespace=namespace, name=name, version=version)
return purl.to_string()
def _get_tool_purl(self) -> str:
"""Get PURL for OWASP AIBOM Generator tool"""
purl = PackageURL(type="generic", namespace="owasp-genai", name=AIBOM_GEN_NAME, version=AIBOM_GEN_VERSION)
return purl.to_string()
def _get_tool_metadata(self) -> Dict[str, Any]:
"""Generate the standardized tool metadata for the AIBOM Generator"""
return {
"components": [{
"bom-ref": self._get_tool_purl(),
"type": "application",
"name": AIBOM_GEN_NAME,
"version": AIBOM_GEN_VERSION,
"manufacturer": {"name": "OWASP GenAI Security Project"}
}]
}
def _create_minimal_aibom(self, model_id: str, spec_version: str = "1.6") -> Dict[str, Any]:
"""Create a minimal valid AIBOM structure in case of errors"""
hf_purl = self._generate_purl(model_id, "1.0")
metadata_purl = self._generate_purl(model_id, "1.0", purl_type="generic")
return {
"bomFormat": "CycloneDX",
"specVersion": spec_version,
"serialNumber": f"urn:uuid:{str(uuid.uuid4())}",
"version": 1,
"metadata": {
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds'),
"tools": self._get_tool_metadata(),
"component": {
"bom-ref": metadata_purl,
"type": "application",
"name": model_id.split("/")[-1],
"version": "1.0"
}
},
"components": [{
"bom-ref": hf_purl,
"type": "machine-learning-model",
"name": model_id.split("/")[-1],
"version": "1.0",
"purl": hf_purl
}]
}
def _fetch_with_backoff(self, fetch_func, *args, max_retries=3, initial_backoff=1.0, **kwargs):
import time
for attempt in range(max_retries):
try:
return fetch_func(*args, **kwargs)
except Exception as e:
# e.g., huggingface_hub.utils.HfHubHTTPError
error_msg = str(e)
if "401" in error_msg or "404" in error_msg: # Auth or not found don't retry
raise e
if attempt == max_retries - 1:
logger.warning(f"Final attempt failed for API call: {e}")
raise e
sleep_time = initial_backoff * (2 ** attempt)
logger.warning(f"API call failed: {e}. Retrying in {sleep_time} seconds...")
time.sleep(sleep_time)
def _fetch_model_info(self, model_id: str) -> Dict[str, Any]:
try:
return self._fetch_with_backoff(self.hf_api.model_info, model_id)
except Exception as e:
logger.warning(f"Error fetching model info for {model_id}: {e}")
return {}
def _fetch_model_card(self, model_id: str) -> Optional[ModelCard]:
try:
return self._fetch_with_backoff(ModelCard.load, model_id)
except Exception as e:
logger.warning(f"Error fetching model card for {model_id}: {e}")
return None
@staticmethod
def _normalise_model_id(raw_id: str) -> str:
if raw_id.startswith(("http://", "https://")):
path = urlparse(raw_id).path.lstrip("/")
parts = path.split("/")
if len(parts) >= 2:
return "/".join(parts[:2])
return path
return raw_id
def _create_aibom_structure(self, model_id: str, metadata: Dict[str, Any], spec_version: str = "1.6",
metadata_overrides: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
full_commit = metadata.get("commit")
version = full_commit[:8] if full_commit else "1.0"
aibom = {
"bomFormat": "CycloneDX",
"specVersion": spec_version,
"serialNumber": f"urn:uuid:{str(uuid.uuid4())}",
"version": 1,
"metadata": self._create_metadata_section(model_id, metadata, overrides=metadata_overrides),
"components": [self._create_component_section(model_id, metadata)],
"dependencies": [
{
"ref": self._generate_purl(model_id, version, purl_type="generic"),
"dependsOn": [self._generate_purl(model_id, version)]
}
]
}
return aibom
def _create_metadata_section(self, model_id: str, metadata: Dict[str, Any], overrides: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')
# Defaults
default_timestamp = datetime.datetime.now().strftime("job-%Y-%m-%d-%H:%M:%S")
default_version = str(int(datetime.datetime.now().timestamp()))
default_mfr = "OWASP AIBOM Generator"
# Apply oveerides or defaults
overrides = overrides or {}
comp_name = overrides.get("name") or default_timestamp
comp_version = overrides.get("version") or default_version
comp_mfr = overrides.get("manufacturer") or default_mfr
# Normalize for PURL (replace spaces with - or similar if needed, but minimal change is best)
purl_ns = comp_mfr.replace(" ", "-") # simplistic sanitation
purl_name = comp_name.replace(" ", "-")
purl = PackageURL(type="generic", namespace=purl_ns, name=purl_name, version=comp_version).to_string()
tools = {"tools": self._get_tool_metadata()}
authors = []
if "author" in metadata and metadata["author"]:
authors.append({"name": metadata["author"]})
component = {
"bom-ref": purl,
"type": "application",
"name": comp_name,
"description": f"Generating SBOM for {model_id}",
"version": comp_version,
"purl": purl,
"manufacturer": {"name": comp_mfr},
"supplier": {"name": comp_mfr}
}
if authors:
component["authors"] = authors
return {
"timestamp": timestamp,
**tools,
"component": component
}
def _create_component_section(self, model_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
parts = model_id.split("/")
group = parts[0] if len(parts) > 1 else ""
name = parts[1] if len(parts) > 1 else parts[0]
full_commit = metadata.get("commit")
version = full_commit[:8] if full_commit else "1.0"
purl = self._generate_purl(model_id, version)
component = {
"bom-ref": purl,
"type": "machine-learning-model",
"group": group,
"name": name,
"version": version,
"purl": purl,
"description": metadata.get("description", f"AI model {model_id}")
}
# 1. Licenses
licenses = self._process_licenses(metadata)
if licenses:
component["licenses"] = licenses
# 2. Authors, Manufacturer, Supplier
# Note: logic inferred from group and metadata
authors, manufacturer, supplier = self._process_authors_and_suppliers(metadata, group)
if authors:
component["authors"] = authors
if manufacturer:
component["manufacturer"] = manufacturer
if supplier:
component["supplier"] = supplier
# 3. Technical Properties
tech_props = self._process_technical_properties(metadata)
if tech_props:
component["properties"] = tech_props
# 4. External References
external_refs = self._process_external_references(model_id, metadata)
if external_refs:
component["externalReferences"] = external_refs
# 5. Model Card
component["modelCard"] = self._create_model_card_section(metadata)
# Defined order for better readability: bom-ref, type, group, name, version, purl, description, modelCard, manufacturer, supplier, authors
# We also need to preserve: licenses, properties, externalReferences (placing them logically)
ordered_keys = [
"bom-ref", "type", "group", "name", "version", "purl",
"description", "licenses", "modelCard",
"manufacturer", "supplier", "authors",
"properties", "externalReferences"
]
ordered_component = {}
for key in ordered_keys:
if key in component:
ordered_component[key] = component[key]
# Ensure we didn't miss anything (though we shouldn't have extra keys usually)
for k, v in component.items():
if k not in ordered_component:
ordered_component[k] = v
return ordered_component
def _process_licenses(self, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Process and normalize license information."""
raw_license = metadata.get("licenses") or metadata.get("license")
# 1. No license provided -> Return empty list (no license in SBOM)
if not raw_license:
return []
# Handle list input
if isinstance(raw_license, list):
if len(raw_license) > 0:
raw_license = raw_license[0]
else:
return []
if not isinstance(raw_license, str) or not raw_license.strip():
return []
norm_license = normalize_license_id(raw_license)
# Skip NOASSERTION or 'other' explicitly
if norm_license == "NOASSERTION" or (norm_license and norm_license.lower() == "other"):
return []
if norm_license:
# 1. Strict SPDX validation
if not is_valid_spdx_license_id(norm_license):
lic_data = {"name": norm_license}
# Try to find a known URL (e.g. for Nvidia license)
known_url = get_license_url(norm_license, fallback=False)
if known_url:
lic_data["url"] = known_url
return [{"license": lic_data}]
# 2. Valid SPDX ID
return [{"license": {"id": norm_license}}]
# Fallback if normalization fails, use name unless generic
if raw_license.lower() not in ["other", "unknown", "noassertion"]:
return [{"license": {"name": raw_license}}]
return []
def _process_authors_and_suppliers(self, metadata: Dict[str, Any], group: str) -> tuple:
"""
Process authors, manufacturer, and supplier information.
Returns: (authors, manufacturer, supplier)
"""
authors = []
raw_author = metadata.get("author", group)
if raw_author and raw_author != "unknown":
if isinstance(raw_author, str):
authors.append({"name": raw_author})
elif isinstance(raw_author, list):
for a in raw_author:
authors.append({"name": a})
manufacturer = None
supplier = None
# Manufacturer and Supplier
# Use the group (org name) as the manufacturer and supplier if available
# If 'suppliedBy' extracted from README, overwrite supplier
supplier_entity = None
if group:
supplier_entity = {
"name": group,
"url": [f"https://huggingface.co/{group}"]
}
if "suppliedBy" in metadata and metadata["suppliedBy"]:
# If we have explicit suppliedBy, use it for supplier
supplier_entity = {"name": metadata["suppliedBy"]}
if supplier_entity:
supplier = supplier_entity
# Manufacturer often implies the creator/fine-tuner.
# If we have a group, we assume they manufactured it too unless specified.
if group:
manufacturer = {
"name": group,
"url": [f"https://huggingface.co/{group}"]
}
return authors, manufacturer, supplier
def _process_technical_properties(self, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
tech_props = []
for field in ["model_type", "architectures", "library_name"]:
if field in metadata:
val = metadata[field]
if isinstance(val, list):
val = ", ".join(val)
tech_props.append({"name": field, "value": str(val)})
return tech_props
def _process_external_references(self, model_id: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Process external references including Hugging Face URLs and papers."""
# Start with generic website reference
generic_ref = {"type": "website", "url": f"https://huggingface.co/{model_id}"}
external_refs = [generic_ref]
if "external_references" in metadata and isinstance(metadata["external_references"], list):
for ref in metadata["external_references"]:
if isinstance(ref, dict) and "url" in ref:
rtype = ref.get("type", "website")
# Check if URL already exists in our list
existing_idx = next((i for i, r in enumerate(external_refs) if r["url"] == ref["url"]), -1)
new_ref = {"type": rtype, "url": ref["url"], "comment": ref.get("comment")}
if existing_idx != -1:
# If existing is generic (no comment) and new one has comment, replace it
if not external_refs[existing_idx].get("comment") and new_ref.get("comment"):
external_refs[existing_idx] = new_ref
else:
external_refs.append(new_ref)
# Paper (ArXiv or other documentation)
if "paper" in metadata and metadata["paper"]:
papers = metadata["paper"]
if isinstance(papers, str):
papers = [papers]
for p in papers:
# Check for duplicates
if not any(r["url"] == p for r in external_refs):
# Try to infer if it's arxiv for comment
comment = "Research Paper"
if "arxiv.org" in p:
comment = "ArXiv Paper"
external_refs.append({
"type": "documentation",
"url": p,
"comment": comment
})
return external_refs
def _create_model_card_section(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
section = {}
# 1. Model Parameters
params = {}
# primaryPurpose -> task
if "primaryPurpose" in metadata:
params["task"] = metadata["primaryPurpose"]
elif "pipeline_tag" in metadata:
params["task"] = metadata["pipeline_tag"]
# typeOfModel -> modelArchitecture
if "typeOfModel" in metadata:
params["modelArchitecture"] = metadata["typeOfModel"]
else:
params["modelArchitecture"] = f"{metadata.get('name', 'Unknown')}Model"
# Datasets
if "datasets" in metadata:
ds_val = metadata["datasets"]
datasets = []
if isinstance(ds_val, list):
for d in ds_val:
if isinstance(d, str):
# CycloneDX 1.7 compliant componentData
datasets.append({
"type": "dataset",
"name": d,
"contents": {
"url": f"https://huggingface.co/datasets/{d}"
}
})
elif isinstance(d, dict) and "name" in d:
datasets.append({"type": "dataset", "name": d.get("name"), "url": d.get("url")})
elif isinstance(ds_val, str):
datasets.append({
"type": "dataset",
"name": ds_val,
"contents": {
"url": f"https://huggingface.co/datasets/{ds_val}"
}
})
if datasets:
params["datasets"] = datasets
# Inputs / Outputs (Inferred from task)
task = params.get("task")
if task:
inputs, outputs = self._infer_io_formats(task)
if inputs:
params["inputs"] = [{"format": i} for i in inputs]
if outputs:
params["outputs"] = [{"format": o} for o in outputs]
if params:
section["modelParameters"] = params
# 2. Quantitative Analysis
if "eval_results" in metadata:
metrics = []
raw_results = metadata["eval_results"]
if isinstance(raw_results, list):
for res in raw_results:
# Handle object or dict
if hasattr(res, "metric_type") and hasattr(res, "metric_value"):
metrics.append({"type": str(res.metric_type), "value": str(res.metric_value)})
elif isinstance(res, dict) and "metric_type" in res and "metric_value" in res:
metrics.append({"type": str(res["metric_type"]), "value": str(res["metric_value"])})
if metrics:
section["quantitativeAnalysis"] = {"performanceMetrics": metrics}
# 3. Considerations
considerations = {}
# intendedUse -> useCases
if "intendedUse" in metadata:
considerations["useCases"] = [metadata["intendedUse"]]
# technicalLimitations
if "technicalLimitations" in metadata:
considerations["technicalLimitations"] = [metadata["technicalLimitations"]]
# ethicalConsiderations
if "ethicalConsiderations" in metadata:
considerations["ethicalConsiderations"] = [{"name": "Ethical Considerations", "description": metadata["ethicalConsiderations"]}]
if considerations:
section["considerations"] = considerations
# 4. Properties (GGUF & Taxonomy + Leftovers)
props = []
taxonomy_modelcard_mapping = {
"hyperparameter": "hyperparameter",
"vocab_size": "vocabSize",
"tokenizer_class": "tokenizerClass",
"context_length": "contextLength",
"embedding_length": "embeddingLength",
"block_count": "blockCount",
"attention_head_count": "attentionHeadCount",
"attention_head_count_kv": "attentionHeadCountKV",
"feed_forward_length": "feedForwardLength",
"rope_dimension_count": "ropeDimensionCount",
"quantization_version": "quantizationVersion",
"quantization_file_type": "quantizationFileType",
"modelExplainability": "modelCardExplainability"
}
taxonomy_mapped_keys = list(taxonomy_modelcard_mapping.keys())
for p_key, p_name in taxonomy_modelcard_mapping.items():
if p_key in metadata:
val = metadata[p_key]
if p_key == "hyperparameter" and isinstance(val, dict):
props.append({"name": f"genai:aibom:modelcard:{p_name}", "value": json.dumps(val)})
elif val is not None:
props.append({"name": f"genai:aibom:modelcard:{p_name}", "value": str(val)})
# Quantization dict handling
if "quantization" in metadata and isinstance(metadata["quantization"], dict):
q_dict = metadata["quantization"]
if "version" in q_dict:
props.append({"name": "genai:aibom:modelcard:quantizationVersion", "value": str(q_dict["version"])})
if "file_type" in q_dict:
props.append({"name": "genai:aibom:modelcard:quantizationFileType", "value": str(q_dict["file_type"])})
taxonomy_mapped_keys.append("quantization")
# Basic Fields we've already mapped to structured homes
mapped_fields = [
"primaryPurpose", "typeOfModel", "suppliedBy", "intendedUse",
"technicalLimitations", "ethicalConsiderations", "datasets", "eval_results",
"pipeline_tag", "name", "author", "license", "description",
"commit", "bomFormat", "specVersion", "version", "licenses",
"external_references", "tags", "library_name", "paper", "downloadLocation",
"gguf_filename", "gguf_license", "model_type", "architectures"
] + taxonomy_mapped_keys
for k, v in metadata.items():
if k not in mapped_fields and v is not None:
# Basic types only for properties
if isinstance(v, (str, int, float, bool)):
props.append({"name": k, "value": str(v)})
elif isinstance(v, list) and all(isinstance(x, (str, int, float, bool)) for x in v):
props.append({"name": k, "value": ", ".join(map(str, v))})
if props:
section["properties"] = props
return section
def _infer_io_formats(self, task: str) -> tuple:
"""
Infer input and output formats based on the pipeline task.
Returns (inputs: list, outputs: list)
"""
task = task.lower().strip()
# Text to Text
if task in ["text-generation", "text2text-generation", "summarization", "translation",
"conversational", "question-answering", "text-classification", "token-classification"]:
return (["string"], ["string"])
# Image to Text/Label
if task in ["image-classification", "object-detection", "image-segmentation"]:
return (["image"], ["string", "json"])
# Text to Image
if task in ["text-to-image"]:
return (["string"], ["image"])
# Audio
if task in ["automatic-speech-recognition", "audio-classification"]:
return (["audio"], ["string"])
if task in ["text-to-speech"]:
return (["string"], ["audio"])
# Multimodal
if task in ["visual-question-answering"]:
return (["image", "string"], ["string"])
# Tabular
if task in ["tabular-classification", "tabular-regression"]:
return (["csv", "json"], ["string", "number"])
return ([], [])