|
|
import json |
|
|
import uuid |
|
|
import datetime |
|
|
import json |
|
|
from typing import Dict, Optional, Any, List |
|
|
|
|
|
from huggingface_hub import HfApi, ModelCard |
|
|
from huggingface_hub.repocard_data import EvalResult |
|
|
from urllib.parse import urlparse |
|
|
from .utils import calculate_completeness_score |
|
|
|
|
|
|
|
|
# Optional registry-aware extraction stack. Two import paths are tried so the
# module works both as a package member (relative import) and as a loose
# script run from its own directory (direct import). The module-level flag
# records whether the enhanced path is usable downstream.
try:
    from .enhanced_extractor import EnhancedExtractor
    from .field_registry_manager import get_field_registry_manager
    ENHANCED_EXTRACTION_AVAILABLE = True
    print("β
Registry-aware enhanced extraction module loaded successfully")
except ImportError:
    try:
        # Fallback for execution outside the package context.
        from enhanced_extractor import EnhancedExtractor
        from field_registry_manager import get_field_registry_manager
        ENHANCED_EXTRACTION_AVAILABLE = True
        print("β
Registry-aware enhanced extraction module loaded successfully (direct import)")
    except ImportError:
        # Neither import worked: the generator falls back to basic extraction.
        ENHANCED_EXTRACTION_AVAILABLE = False
        print("β οΈ Registry-aware enhanced extraction not available, using basic extraction")
|
|
|
|
|
|
|
|
class AIBOMGenerator: |
|
|
    def __init__(
        self,
        hf_token: Optional[str] = None,
        inference_model_url: Optional[str] = None,
        use_inference: bool = True,
        cache_dir: Optional[str] = None,
        use_best_practices: bool = True,
    ):
        """Create an AI SBOM generator.

        Args:
            hf_token: optional Hugging Face API token (private/gated repos).
            inference_model_url: endpoint of an optional AI-enhancement model;
                enhancement only runs when this is set AND use_inference is True.
            use_inference: default for the per-call ``include_inference`` flag.
            cache_dir: stored but not otherwise used here — presumably for a
                future caching layer (TODO confirm).
            use_best_practices: default scoring mode forwarded to
                ``calculate_completeness_score``.
        """
        self.hf_api = HfApi(token=hf_token)
        self.inference_model_url = inference_model_url
        self.use_inference = use_inference
        self.cache_dir = cache_dir
        self.enhancement_report = None  # populated by generate_aibom()
        self.use_best_practices = use_best_practices
        self._setup_enhanced_logging()

        # Per-field extraction results from the last run
        # (exposed via get_extraction_results()).
        self.extraction_results = {}

        # The registry manager drives field-by-field extraction; it is only
        # attempted when the optional modules imported at module load time.
        self.registry_manager = None
        if ENHANCED_EXTRACTION_AVAILABLE:
            try:
                self.registry_manager = get_field_registry_manager()
                print("β
Registry manager initialized for generator")
            except Exception as e:
                # Keep the generator usable with basic extraction only.
                print(f"β οΈ Could not initialize registry manager: {e}")
                self.registry_manager = None
|
|
|
|
|
def get_extraction_results(self): |
|
|
"""Return the enhanced extraction results from the last extraction""" |
|
|
return getattr(self, 'extraction_results', {}) |
|
|
|
|
|
def _setup_enhanced_logging(self): |
|
|
"""Setup enhanced logging for extraction tracking""" |
|
|
import logging |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
|
force=True |
|
|
) |
|
|
|
|
|
|
|
|
logger = logging.getLogger('enhanced_extractor') |
|
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
print("π§ Enhanced logging configured for AI SBOM generation") |
|
|
|
|
|
|
|
|
def generate_aibom( |
|
|
self, |
|
|
model_id: str, |
|
|
output_file: Optional[str] = None, |
|
|
include_inference: Optional[bool] = None, |
|
|
use_best_practices: Optional[bool] = None, |
|
|
) -> Dict[str, Any]: |
|
|
try: |
|
|
model_id = self._normalise_model_id(model_id) |
|
|
use_inference = include_inference if include_inference is not None else self.use_inference |
|
|
|
|
|
use_best_practices = use_best_practices if use_best_practices is not None else self.use_best_practices |
|
|
|
|
|
model_info = self._fetch_model_info(model_id) |
|
|
model_card = self._fetch_model_card(model_id) |
|
|
|
|
|
|
|
|
original_metadata = self._extract_structured_metadata(model_id, model_info, model_card) |
|
|
print(f"π ENHANCED EXTRACTION DEBUG: Returned {len(original_metadata)} fields:") |
|
|
for key, value in original_metadata.items(): |
|
|
print(f" {key}: {value}") |
|
|
print(f"π EXTRACTION RESULTS: {len(self.extraction_results) if hasattr(self, 'extraction_results') and self.extraction_results else 0} extraction results available") |
|
|
|
|
|
|
|
|
original_aibom = self._create_aibom_structure(model_id, original_metadata) |
|
|
|
|
|
print(f"π AI SBOM CREATION DEBUG: Checking what made it into AIBOM:") |
|
|
if 'components' in original_aibom and original_aibom['components']: |
|
|
component = original_aibom['components'][0] |
|
|
if 'properties' in component: |
|
|
print(f" Found {len(component['properties'])} properties in AIBOM:") |
|
|
for prop in component['properties']: |
|
|
print(f" {prop.get('name')}: {prop.get('value')}") |
|
|
else: |
|
|
print(" No properties found in component") |
|
|
else: |
|
|
print(" No components found in AI SBOM") |
|
|
print(f"π FIELD PRESERVATION VERIFICATION:") |
|
|
print(f" Enhanced extraction returned: {len(original_metadata)} fields") |
|
|
|
|
|
|
|
|
aibom_field_count = 0 |
|
|
|
|
|
|
|
|
if 'components' in original_aibom and original_aibom['components']: |
|
|
component = original_aibom['components'][0] |
|
|
if 'properties' in component: |
|
|
aibom_field_count += len(component['properties']) |
|
|
|
|
|
|
|
|
if 'modelCard' in component and 'properties' in component['modelCard']: |
|
|
aibom_field_count += len(component['modelCard']['properties']) |
|
|
|
|
|
|
|
|
if 'metadata' in original_aibom and 'properties' in original_aibom['metadata']: |
|
|
aibom_field_count += len(original_aibom['metadata']['properties']) |
|
|
|
|
|
print(f" Final AIBOM contains: {aibom_field_count} fields") |
|
|
print(f" Field preservation rate: {(aibom_field_count/len(original_metadata)*100):.1f}%") |
|
|
|
|
|
if aibom_field_count >= len(original_metadata) * 0.9: |
|
|
print("β
EXCELLENT: Field preservation successful!") |
|
|
elif aibom_field_count >= len(original_metadata) * 0.7: |
|
|
print("β οΈ GOOD: Most fields preserved, some optimization possible") |
|
|
else: |
|
|
print("β POOR: Significant field loss detected") |
|
|
|
|
|
|
|
|
|
|
|
original_score = calculate_completeness_score(original_aibom, validate=True, use_best_practices=use_best_practices, extraction_results=self.extraction_results) |
|
|
|
|
|
|
|
|
|
|
|
final_metadata = original_metadata.copy() if original_metadata else {} |
|
|
|
|
|
|
|
|
ai_enhanced = False |
|
|
ai_model_name = None |
|
|
|
|
|
if use_inference and self.inference_model_url: |
|
|
try: |
|
|
|
|
|
enhanced_metadata = self._extract_unstructured_metadata(model_card, model_id) |
|
|
|
|
|
|
|
|
if enhanced_metadata: |
|
|
ai_enhanced = True |
|
|
ai_model_name = "BERT-base-uncased" |
|
|
|
|
|
|
|
|
for key, value in enhanced_metadata.items(): |
|
|
if value is not None and (key not in final_metadata or not final_metadata[key]): |
|
|
final_metadata[key] = value |
|
|
except Exception as e: |
|
|
print(f"Error during AI enhancement: {e}") |
|
|
|
|
|
print("π¨ FALLBACK: Using _create_minimal_aibom due to error!") |
|
|
print(f"π¨ ERROR DETAILS: {str(e)}") |
|
|
|
|
|
aibom = self._create_aibom_structure(model_id, final_metadata) |
|
|
|
|
|
|
|
|
extraction_results = self.get_extraction_results() |
|
|
final_score = calculate_completeness_score( |
|
|
aibom, |
|
|
validate=True, |
|
|
use_best_practices=use_best_practices, |
|
|
extraction_results=extraction_results |
|
|
) |
|
|
|
|
|
|
|
|
if output_file: |
|
|
with open(output_file, 'w') as f: |
|
|
json.dump(aibom, f, indent=2) |
|
|
|
|
|
|
|
|
self.enhancement_report = { |
|
|
"ai_enhanced": ai_enhanced, |
|
|
"ai_model": ai_model_name if ai_enhanced else None, |
|
|
"original_score": original_score, |
|
|
"final_score": final_score, |
|
|
"improvement": round(final_score["total_score"] - original_score["total_score"], 2) if ai_enhanced else 0 |
|
|
} |
|
|
|
|
|
|
|
|
return aibom |
|
|
except Exception as e: |
|
|
print(f"Error generating AI SBOM: {e}") |
|
|
|
|
|
return self._create_minimal_aibom(model_id) |
|
|
|
|
|
def _create_minimal_aibom(self, model_id: str) -> Dict[str, Any]: |
|
|
"""Create a minimal valid AIBOM structure in case of errors""" |
|
|
return { |
|
|
"bomFormat": "CycloneDX", |
|
|
"specVersion": "1.6", |
|
|
"serialNumber": f"urn:uuid:{str(uuid.uuid4())}", |
|
|
"version": 1, |
|
|
"metadata": { |
|
|
"timestamp": datetime.datetime.utcnow().isoformat() + "Z", |
|
|
"tools": { |
|
|
"components": [{ |
|
|
"bom-ref": "pkg:generic/aetheris-ai/aetheris-aibom-generator@1.0.0", |
|
|
"type": "application", |
|
|
"name": "aetheris-aibom-generator", |
|
|
"version": "1.0.0", |
|
|
"manufacturer": { |
|
|
"name": "Aetheris AI" |
|
|
} |
|
|
}] |
|
|
}, |
|
|
"component": { |
|
|
"bom-ref": f"pkg:generic/{model_id.replace('/', '%2F')}@1.0", |
|
|
"type": "application", |
|
|
"name": model_id.split("/")[-1], |
|
|
"description": f"AI model {model_id}", |
|
|
"version": "1.0", |
|
|
"purl": f"pkg:generic/{model_id.replace('/', '%2F')}@1.0", |
|
|
"copyright": "NOASSERTION" |
|
|
} |
|
|
}, |
|
|
"components": [{ |
|
|
"bom-ref": f"pkg:huggingface/{model_id.replace('/', '/')}@1.0", |
|
|
"type": "machine-learning-model", |
|
|
"name": model_id.split("/")[-1], |
|
|
"version": "1.0", |
|
|
"purl": f"pkg:huggingface/{model_id.replace('/', '/')}@1.0" |
|
|
}], |
|
|
"dependencies": [{ |
|
|
"ref": f"pkg:generic/{model_id.replace('/', '%2F')}@1.0", |
|
|
"dependsOn": [f"pkg:huggingface/{model_id.replace('/', '/')}@1.0"] |
|
|
}] |
|
|
} |
|
|
|
|
|
def get_enhancement_report(self): |
|
|
"""Return the enhancement report from the last generate_aibom call""" |
|
|
return self.enhancement_report |
|
|
|
|
|
def _fetch_model_info(self, model_id: str) -> Dict[str, Any]: |
|
|
try: |
|
|
return self.hf_api.model_info(model_id) |
|
|
except Exception as e: |
|
|
print(f"Error fetching model info for {model_id}: {e}") |
|
|
return {} |
|
|
|
|
|
|
|
|
@staticmethod |
|
|
def _normalise_model_id(raw_id: str) -> str: |
|
|
""" |
|
|
Accept either 'owner/model' or a full URL like |
|
|
'https://huggingface.co/owner/model'. Return 'owner/model'. |
|
|
""" |
|
|
if raw_id.startswith(("http://", "https://")): |
|
|
path = urlparse(raw_id).path.lstrip("/") |
|
|
|
|
|
parts = path.split("/") |
|
|
if len(parts) >= 2: |
|
|
return "/".join(parts[:2]) |
|
|
return path |
|
|
return raw_id |
|
|
|
|
|
|
|
|
def _fetch_model_card(self, model_id: str) -> Optional[ModelCard]: |
|
|
try: |
|
|
return ModelCard.load(model_id) |
|
|
except Exception as e: |
|
|
print(f"Error fetching model card for {model_id}: {e}") |
|
|
return None |
|
|
|
|
|
def _create_aibom_structure( |
|
|
self, |
|
|
model_id: str, |
|
|
metadata: Dict[str, Any], |
|
|
) -> Dict[str, Any]: |
|
|
|
|
|
print(f"π CRASH_DEBUG: _create_aibom_structure called") |
|
|
print(f"π CRASH_DEBUG: model_id = {model_id}") |
|
|
print(f"π CRASH_DEBUG: metadata type = {type(metadata)}") |
|
|
print(f"π CRASH_DEBUG: metadata keys = {list(metadata.keys()) if isinstance(metadata, dict) else 'NOT A DICT'}") |
|
|
|
|
|
|
|
|
parts = model_id.split("/") |
|
|
group = parts[0] if len(parts) > 1 else "" |
|
|
name = parts[1] if len(parts) > 1 else parts[0] |
|
|
|
|
|
|
|
|
version = metadata.get("commit", "1.0") |
|
|
|
|
|
|
|
|
print(f"π CRASH_DEBUG: About to create metadata section") |
|
|
|
|
|
aibom = { |
|
|
"bomFormat": "CycloneDX", |
|
|
"specVersion": "1.6", |
|
|
"serialNumber": f"urn:uuid:{str(uuid.uuid4())}", |
|
|
"version": 1, |
|
|
"metadata": self._create_metadata_section(model_id, metadata), |
|
|
"components": [self._create_component_section(model_id, metadata)], |
|
|
"dependencies": [ |
|
|
{ |
|
|
"ref": f"pkg:generic/{model_id.replace('/', '%2F')}@{version}", |
|
|
"dependsOn": [f"pkg:huggingface/{model_id.replace('/', '/')}@{version}"] |
|
|
} |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
print(f"π CRASH_DEBUG: Successfully created basic AIBOM structure") |
|
|
|
|
|
|
|
|
aibom["externalReferences"] = [{ |
|
|
"type": "distribution", |
|
|
"url": f"https://huggingface.co/{model_id}" |
|
|
}] |
|
|
|
|
|
if metadata and "commit_url" in metadata: |
|
|
aibom["externalReferences"].append({ |
|
|
"type": "vcs", |
|
|
"url": metadata["commit_url"] |
|
|
} ) |
|
|
|
|
|
print(f"π CRASH_DEBUG: _create_aibom_structure completed successfully") |
|
|
return aibom |
|
|
|
|
|
def _extract_structured_metadata( |
|
|
self, |
|
|
model_id: str, |
|
|
model_info: Dict[str, Any], |
|
|
model_card: Optional[ModelCard], |
|
|
) -> Dict[str, Any]: |
|
|
|
|
|
|
|
|
if ENHANCED_EXTRACTION_AVAILABLE: |
|
|
try: |
|
|
print(f"π Using registry-aware enhanced extraction for: {model_id}") |
|
|
|
|
|
|
|
|
extractor = EnhancedExtractor(self.hf_api, self.registry_manager) |
|
|
|
|
|
|
|
|
metadata = extractor.extract_metadata(model_id, model_info, model_card) |
|
|
|
|
|
|
|
|
self.extraction_results = extractor.extraction_results |
|
|
|
|
|
|
|
|
if extractor.registry_fields: |
|
|
registry_field_count = len(extractor.registry_fields) |
|
|
extracted_count = len([k for k, v in metadata.items() if v is not None]) |
|
|
extraction_results_count = len(extractor.extraction_results) |
|
|
|
|
|
print(f"β
Registry-driven extraction completed:") |
|
|
print(f" π Registry fields available: {registry_field_count}") |
|
|
print(f" π Fields attempted: {extraction_results_count}") |
|
|
print(f" β
Fields extracted: {extracted_count}") |
|
|
|
|
|
|
|
|
if registry_field_count > 0: |
|
|
coverage = (extracted_count / registry_field_count) * 100 |
|
|
print(f" π Registry field coverage: {coverage:.1f}%") |
|
|
else: |
|
|
extracted_count = len([k for k, v in metadata.items() if v is not None]) |
|
|
print(f"β
Legacy extraction completed: {extracted_count} fields extracted") |
|
|
|
|
|
return metadata |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β Registry-aware enhanced extraction failed: {e}") |
|
|
print("π Falling back to original extraction method") |
|
|
|
|
|
|
|
|
|
|
|
metadata = {} |
|
|
|
|
|
if model_info: |
|
|
try: |
|
|
author = getattr(model_info, "author", None) |
|
|
if not author or author.strip() == "": |
|
|
parts = model_id.split("/") |
|
|
author = parts[0] if len(parts) > 1 else "unknown" |
|
|
print(f"DEBUG: Fallback author used: {author}") |
|
|
else: |
|
|
print(f"DEBUG: Author from model_info: {author}") |
|
|
|
|
|
metadata.update({ |
|
|
"name": getattr(model_info, "modelId", model_id).split("/")[-1], |
|
|
"author": author, |
|
|
"tags": getattr(model_info, "tags", []), |
|
|
"pipeline_tag": getattr(model_info, "pipeline_tag", None), |
|
|
"downloads": getattr(model_info, "downloads", 0), |
|
|
"last_modified": getattr(model_info, "lastModified", None), |
|
|
"commit": getattr(model_info, "sha", None)[:7] if getattr(model_info, "sha", None) else None, |
|
|
"commit_url": f"https://huggingface.co/{model_id}/commit/{model_info.sha}" if getattr(model_info, "sha", None ) else None, |
|
|
}) |
|
|
except Exception as e: |
|
|
print(f"Error extracting model info metadata: {e}") |
|
|
|
|
|
if model_card and hasattr(model_card, "data") and model_card.data: |
|
|
try: |
|
|
card_data = model_card.data.to_dict() if hasattr(model_card.data, "to_dict") else {} |
|
|
metadata.update({ |
|
|
"language": card_data.get("language"), |
|
|
"license": card_data.get("license"), |
|
|
"library_name": card_data.get("library_name"), |
|
|
"base_model": card_data.get("base_model"), |
|
|
"datasets": card_data.get("datasets"), |
|
|
"model_name": card_data.get("model_name"), |
|
|
"tags": card_data.get("tags", metadata.get("tags", [])), |
|
|
"description": card_data.get("model_summary", None) |
|
|
}) |
|
|
if hasattr(model_card.data, "eval_results") and model_card.data.eval_results: |
|
|
metadata["eval_results"] = model_card.data.eval_results |
|
|
except Exception as e: |
|
|
print(f"Error extracting model card metadata: {e}") |
|
|
|
|
|
metadata["ai:type"] = "Transformer" |
|
|
metadata["ai:task"] = metadata.get("pipeline_tag", "Text Generation") |
|
|
metadata["ai:framework"] = "PyTorch" if "transformers" in metadata.get("library_name", "") else "Unknown" |
|
|
|
|
|
metadata["primaryPurpose"] = metadata.get("ai:task", "text-generation") |
|
|
|
|
|
|
|
|
if not metadata.get("author"): |
|
|
parts = model_id.split("/") |
|
|
metadata["author"] = parts[0] if len(parts) > 1 else "unknown" |
|
|
|
|
|
metadata["suppliedBy"] = metadata.get("author", "unknown") |
|
|
metadata["typeOfModel"] = metadata.get("ai:type", "Transformer") |
|
|
|
|
|
print(f"DEBUG: Final metadata['author'] = {metadata.get('author')}") |
|
|
print(f"DEBUG: Adding primaryPurpose = {metadata.get('ai:task', 'Text Generation')}") |
|
|
print(f"DEBUG: Adding suppliedBy = {metadata.get('suppliedBy')}") |
|
|
|
|
|
return {k: v for k, v in metadata.items() if v is not None} |
|
|
|
|
|
|
|
|
|
|
|
def _extract_unstructured_metadata(self, model_card: Optional[ModelCard], model_id: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Placeholder for future AI enhancement. |
|
|
Currently returns empty dict since AI enhancement is not implemented. |
|
|
""" |
|
|
return {} |
|
|
|
|
|
|
|
|
def _create_metadata_section(self, model_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]: |
|
|
print(f"π CRASH_DEBUG: _create_metadata_section called") |
|
|
print(f"π CRASH_DEBUG: metadata type in metadata_section = {type(metadata)}") |
|
|
|
|
|
timestamp = datetime.datetime.utcnow().isoformat() + "Z" |
|
|
|
|
|
|
|
|
version = metadata.get("commit", "1.0") |
|
|
|
|
|
|
|
|
tools = { |
|
|
"components": [{ |
|
|
"bom-ref": "pkg:generic/aetheris-ai/aetheris-aibom-generator@1.0.0", |
|
|
"type": "application", |
|
|
"name": "aetheris-aibom-generator", |
|
|
"version": "1.0", |
|
|
"manufacturer": { |
|
|
"name": "Aetheris AI" |
|
|
} |
|
|
}] |
|
|
} |
|
|
|
|
|
|
|
|
authors = [] |
|
|
if "author" in metadata and metadata["author"]: |
|
|
authors.append({ |
|
|
"name": metadata["author"] |
|
|
}) |
|
|
|
|
|
|
|
|
component = { |
|
|
"bom-ref": f"pkg:generic/{model_id.replace('/', '%2F')}@{version}", |
|
|
"type": "application", |
|
|
"name": metadata.get("name", model_id.split("/")[-1]), |
|
|
"description": metadata.get("description", f"AI model {model_id}"), |
|
|
"version": version, |
|
|
"purl": f"pkg:generic/{model_id.replace('/', '%2F')}@{version}" |
|
|
} |
|
|
|
|
|
|
|
|
if authors: |
|
|
component["authors"] = authors |
|
|
|
|
|
|
|
|
if "author" in metadata and metadata["author"]: |
|
|
component["publisher"] = metadata["author"] |
|
|
component["supplier"] = { |
|
|
"name": metadata["author"] |
|
|
} |
|
|
component["manufacturer"] = { |
|
|
"name": metadata["author"] |
|
|
} |
|
|
|
|
|
|
|
|
component["copyright"] = "NOASSERTION" |
|
|
|
|
|
|
|
|
properties = [] |
|
|
|
|
|
|
|
|
critical_fields = { |
|
|
"primaryPurpose": metadata.get("primaryPurpose", "text-generation"), |
|
|
"suppliedBy": metadata.get("suppliedBy", "unknown"), |
|
|
"typeOfModel": metadata.get("typeOfModel", "Transformer") |
|
|
} |
|
|
for key, value in critical_fields.items(): |
|
|
properties.append({"name": key, "value": str(value)}) |
|
|
|
|
|
|
|
|
|
|
|
component_fields = ["name", "author", "description", "commit"] |
|
|
critical_fields = ["primaryPurpose", "suppliedBy", "typeOfModel"] |
|
|
|
|
|
|
|
|
enhanced_fields = ["model_type", "tokenizer_class", "architectures", "library_name", |
|
|
"pipeline_tag", "tags", "datasets", "base_model", "language", |
|
|
"downloads", "last_modified", "commit_url", "ai:type", "ai:task", |
|
|
"ai:framework", "eval_results"] |
|
|
|
|
|
print(f"π CRASH_DEBUG: About to call .items() on metadata") |
|
|
print(f"π CRASH_DEBUG: metadata type before .items() = {type(metadata)}") |
|
|
|
|
|
for key, value in metadata.items(): |
|
|
|
|
|
if key not in component_fields and value is not None: |
|
|
|
|
|
if isinstance(value, (list, dict)): |
|
|
if isinstance(value, list) and len(value) > 0: |
|
|
|
|
|
if all(isinstance(item, str) for item in value): |
|
|
value = ", ".join(value) |
|
|
else: |
|
|
value = json.dumps(value) |
|
|
elif isinstance(value, dict): |
|
|
value = json.dumps(value) |
|
|
|
|
|
properties.append({"name": key, "value": str(value)}) |
|
|
print(f"β
METADATA: Added {key} = {value} to properties") |
|
|
|
|
|
|
|
|
metadata_section = { |
|
|
"timestamp": timestamp, |
|
|
"tools": tools, |
|
|
"component": component, |
|
|
"properties": properties |
|
|
} |
|
|
|
|
|
return metadata_section |
|
|
|
|
|
def _create_component_section(self, model_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]: |
|
|
print(f"π CRASH_DEBUG: _create_component_section called") |
|
|
print(f"π CRASH_DEBUG: metadata type in component_section = {type(metadata)}") |
|
|
|
|
|
|
|
|
parts = model_id.split("/") |
|
|
group = parts[0] if len(parts) > 1 else "" |
|
|
name = parts[1] if len(parts) > 1 else parts[0] |
|
|
|
|
|
|
|
|
version = metadata.get("commit", "1.0") |
|
|
|
|
|
|
|
|
purl = f"pkg:huggingface/{model_id.replace('/', '/')}" |
|
|
if "commit" in metadata: |
|
|
purl = f"{purl}@{metadata['commit']}" |
|
|
else: |
|
|
purl = f"{purl}@{version}" |
|
|
|
|
|
component = { |
|
|
"bom-ref": f"pkg:huggingface/{model_id.replace('/', '/')}@{version}", |
|
|
"type": "machine-learning-model", |
|
|
"group": group, |
|
|
"name": name, |
|
|
"version": version, |
|
|
"purl": purl |
|
|
} |
|
|
|
|
|
|
|
|
if metadata and "license" in metadata and metadata["license"]: |
|
|
component["licenses"] = [{ |
|
|
"license": { |
|
|
"id": metadata["license"], |
|
|
"url": self._get_license_url(metadata["license"]) |
|
|
} |
|
|
}] |
|
|
print(f"β
COMPONENT: Added license = {metadata['license']}") |
|
|
else: |
|
|
component["licenses"] = [{ |
|
|
"license": { |
|
|
"id": "NOASSERTION", |
|
|
"url": "https://spdx.org/licenses/" |
|
|
} |
|
|
}] |
|
|
print(f"β οΈ COMPONENT: No license found, using NOASSERTION") |
|
|
|
|
|
|
|
|
component["description"] = metadata.get("description", f"AI model {model_id}") |
|
|
|
|
|
|
|
|
technical_properties = [] |
|
|
|
|
|
|
|
|
if "model_type" in metadata: |
|
|
technical_properties.append({"name": "model_type", "value": str(metadata["model_type"])}) |
|
|
print(f"β
COMPONENT: Added model_type = {metadata['model_type']}") |
|
|
|
|
|
|
|
|
if "tokenizer_class" in metadata: |
|
|
technical_properties.append({"name": "tokenizer_class", "value": str(metadata["tokenizer_class"])}) |
|
|
print(f"β
COMPONENT: Added tokenizer_class = {metadata['tokenizer_class']}") |
|
|
|
|
|
|
|
|
if "architectures" in metadata: |
|
|
arch_value = metadata["architectures"] |
|
|
if isinstance(arch_value, list): |
|
|
arch_value = ", ".join(arch_value) |
|
|
technical_properties.append({"name": "architectures", "value": str(arch_value)}) |
|
|
print(f"β
COMPONENT: Added architectures = {arch_value}") |
|
|
|
|
|
|
|
|
if "library_name" in metadata: |
|
|
technical_properties.append({"name": "library_name", "value": str(metadata["library_name"])}) |
|
|
print(f"β
COMPONENT: Added library_name = {metadata['library_name']}") |
|
|
|
|
|
|
|
|
if technical_properties: |
|
|
component["properties"] = technical_properties |
|
|
|
|
|
print(f"DEBUG: License in metadata: {'license' in metadata}" ) |
|
|
if "license" in metadata: |
|
|
print(f"DEBUG: Adding licenses = {metadata['license']}") |
|
|
|
|
|
|
|
|
component["description"] = metadata.get("description", f"AI model {model_id}") |
|
|
if metadata.get("license"): |
|
|
component["licenses"] = [{ |
|
|
"license": { |
|
|
"id": metadata["license"], |
|
|
"url": self._get_license_url(metadata["license"]) |
|
|
} |
|
|
}] |
|
|
else: |
|
|
component["licenses"] = [{ |
|
|
"license": { |
|
|
"id": "unknown", |
|
|
"url": "https://spdx.org/licenses/" |
|
|
} |
|
|
}] |
|
|
|
|
|
|
|
|
|
|
|
external_refs = [{ |
|
|
"type": "website", |
|
|
"url": f"https://huggingface.co/{model_id}" |
|
|
}] |
|
|
if "commit_url" in metadata: |
|
|
external_refs.append({ |
|
|
"type": "vcs", |
|
|
"url": metadata["commit_url"] |
|
|
}) |
|
|
component["externalReferences"] = external_refs |
|
|
|
|
|
|
|
|
author_name = metadata.get("author", group if group else "unknown") |
|
|
if author_name and author_name != "unknown": |
|
|
component["authors"] = [{"name": author_name}] |
|
|
component["publisher"] = author_name |
|
|
component["supplier"] = { |
|
|
"name": author_name, |
|
|
"url": [f"https://huggingface.co/{author_name}"] |
|
|
} |
|
|
component["manufacturer"] = { |
|
|
"name": author_name, |
|
|
"url": [f"https://huggingface.co/{author_name}"] |
|
|
} |
|
|
|
|
|
|
|
|
component["copyright"] = "NOASSERTION" |
|
|
|
|
|
|
|
|
component["modelCard"] = self._create_model_card_section(metadata) |
|
|
|
|
|
return component |
|
|
|
|
|
def _eval_results_to_json(self, eval_results: List[EvalResult]) -> List[Dict[str, str]]: |
|
|
res = [] |
|
|
for eval_result in eval_results: |
|
|
if hasattr(eval_result, "metric_type") and hasattr(eval_result, "metric_value"): |
|
|
res.append({"type": eval_result.metric_type, "value": str(eval_result.metric_value)}) |
|
|
return res |
|
|
|
|
|
|
|
|
    def _create_model_card_section(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Build the CycloneDX modelCard object from extracted metadata.

        Produces quantitativeAnalysis (eval results), properties (everything
        not already a component-level field), modelParameters (task,
        architecture, datasets, I/O formats) and considerations.
        """
        print(f"π CRASH_DEBUG: _create_model_card_section called")
        print(f"π CRASH_DEBUG: metadata type in model_card_section = {type(metadata)}")

        model_card_section = {}

        # Performance metrics come from the card's eval_results when present.
        if "eval_results" in metadata:
            model_card_section["quantitativeAnalysis"] = {
                "performanceMetrics": self._eval_results_to_json(metadata["eval_results"]),
                "graphics": {}
            }
        else:
            model_card_section["quantitativeAnalysis"] = {"graphics": {}}

        properties = []

        # These are surfaced on the component itself, so excluded here.
        component_level_fields = ["name", "author", "license", "description", "commit"]

        print(f"π DEBUG: About to iterate metadata.items()")
        print(f"π DEBUG: metadata type = {type(metadata)}")
        if isinstance(metadata, dict):
            print(f"π DEBUG: metadata keys = {list(metadata.keys())}")
        else:
            print(f"π DEBUG: metadata value = {metadata}")
            print(f"π DEBUG: This is the problem - metadata should be a dict!")

        # Debug guard: a non-dict metadata triggers the diagnostic prints
        # below and is re-raised so the failure is visible upstream.
        try:
            for key, value in metadata.items():
                if key not in component_level_fields and value is not None:
                    # Flatten lists/dicts into strings for the property value.
                    if isinstance(value, (list, dict)):
                        if isinstance(value, list) and len(value) > 0:
                            if all(isinstance(item, str) for item in value):
                                value = ", ".join(value)
                            else:
                                value = json.dumps(value)
                        elif isinstance(value, dict):
                            value = json.dumps(value)

                    properties.append({"name": key, "value": str(value)})
                    print(f"β
MODEL_CARD: Added {key} = {value}")
        except AttributeError as e:
            print(f"β FOUND THE ERROR: {e}")
            print(f"β metadata type: {type(metadata)}")
            print(f"β metadata value: {metadata}")
            raise e

        model_card_section["properties"] = properties
        print(f"β
MODEL_CARD: Added {len(properties)} properties to model card")

        model_parameters = {}

        # NOTE(review): output format is hard-coded to generated-text —
        # presumably text models only; confirm for other pipeline types.
        model_parameters["outputs"] = [{"format": "generated-text"}]

        # NOTE(review): if 'pipeline_tag' exists with value None, .get returns
        # None here rather than the default — upstream filtering appears to
        # drop None values, but confirm for all callers.
        model_parameters["task"] = metadata.get("pipeline_tag", "text-generation")

        # Heuristic classification based only on the model name.
        model_parameters["architectureFamily"] = "llama" if "llama" in metadata.get("name", "").lower() else "transformer"
        model_parameters["modelArchitecture"] = f"{metadata.get('name', 'Unknown')}ForCausalLM"

        # Datasets may be a list of strings, a list of dicts, or one string.
        if "datasets" in metadata:
            datasets = []
            if isinstance(metadata["datasets"], list):
                for dataset in metadata["datasets"]:
                    if isinstance(dataset, str):
                        datasets.append({
                            "type": "dataset",
                            "name": dataset,
                            "description": f"Dataset used for training {metadata.get('name', 'the model')}"
                        })
                    elif isinstance(dataset, dict) and "name" in dataset:
                        dataset_entry = {
                            "type": dataset.get("type", "dataset"),
                            "name": dataset["name"],
                            "description": dataset.get("description", f"Dataset: {dataset['name']}")
                        }
                        datasets.append(dataset_entry)
            elif isinstance(metadata["datasets"], str):
                datasets.append({
                    "type": "dataset",
                    "name": metadata["datasets"],
                    "description": f"Dataset used for training {metadata.get('name', 'the model')}"
                })

            if datasets:
                model_parameters["datasets"] = datasets

        model_parameters["inputs"] = [{"format": "text"}]

        model_card_section["modelParameters"] = model_parameters

        # Technical details are merged into modelParameters when available.
        if "model_type" in metadata or "tokenizer_class" in metadata or "architectures" in metadata:
            technical_details = {}
            if "model_type" in metadata:
                technical_details["modelType"] = metadata["model_type"]
            if "tokenizer_class" in metadata:
                technical_details["tokenizerClass"] = metadata["tokenizer_class"]
            if "architectures" in metadata:
                technical_details["architectures"] = metadata["architectures"]

            model_parameters.update(technical_details)
            print(f"β
MODEL_CARD: Added technical details: {list(technical_details.keys())}")

        # Re-assignment is harmless: model_parameters was mutated in place.
        model_card_section["modelParameters"] = model_parameters

        considerations = {}
        for k in ["limitations", "ethical_considerations", "bias", "risks"]:
            if k in metadata:
                considerations[k] = metadata[k]
        if considerations:
            model_card_section["considerations"] = considerations

        return model_card_section
|
|
|
|
|
def _get_license_url(self, license_id: str) -> str: |
|
|
"""Get the URL for a license based on its SPDX ID.""" |
|
|
license_urls = { |
|
|
"apache-2.0": "https://www.apache.org/licenses/LICENSE-2.0", |
|
|
"mit": "https://opensource.org/licenses/MIT", |
|
|
"bsd-3-clause": "https://opensource.org/licenses/BSD-3-Clause", |
|
|
"gpl-3.0": "https://www.gnu.org/licenses/gpl-3.0.en.html", |
|
|
"cc-by-4.0": "https://creativecommons.org/licenses/by/4.0/", |
|
|
"cc-by-sa-4.0": "https://creativecommons.org/licenses/by-sa/4.0/", |
|
|
"cc-by-nc-4.0": "https://creativecommons.org/licenses/by-nc/4.0/", |
|
|
"cc-by-nd-4.0": "https://creativecommons.org/licenses/by-nd/4.0/", |
|
|
"cc-by-nc-sa-4.0": "https://creativecommons.org/licenses/by-nc-sa/4.0/", |
|
|
"cc-by-nc-nd-4.0": "https://creativecommons.org/licenses/by-nc-nd/4.0/", |
|
|
"lgpl-3.0": "https://www.gnu.org/licenses/lgpl-3.0.en.html", |
|
|
"mpl-2.0": "https://www.mozilla.org/en-US/MPL/2.0/", |
|
|
} |
|
|
|
|
|
return license_urls.get(license_id.lower(), "https://spdx.org/licenses/" ) |
|
|
|
|
|
def _fetch_with_retry(self, fetch_func, *args, max_retries=3, **kwargs): |
|
|
"""Fetch data with retry logic for network failures.""" |
|
|
for attempt in range(max_retries): |
|
|
try: |
|
|
return fetch_func(*args, **kwargs) |
|
|
except Exception as e: |
|
|
if attempt == max_retries - 1: |
|
|
logger.warning(f"Failed to fetch after {max_retries} attempts: {e}") |
|
|
return None |
|
|
time.sleep(1 * (attempt + 1)) |
|
|
return None |
|
|
|
|
|
    def validate_registry_integration(self) -> Dict[str, Any]:
        """
        Validate that the registry integration is working correctly.
        This method helps debug registry-related issues.
        """
        # Start from a pessimistic result; fields are upgraded as checks pass.
        validation_results = {
            'registry_manager_available': bool(self.registry_manager),
            'enhanced_extraction_available': ENHANCED_EXTRACTION_AVAILABLE,
            'registry_fields_count': 0,
            'registry_fields_loaded': False,
            'validation_status': 'unknown'
        }

        try:
            if self.registry_manager:
                # NOTE(review): assumes the manager exposes a dict-like
                # `.registry` with a 'fields' mapping — confirm against
                # field_registry_manager.
                registry = self.registry_manager.registry
                registry_fields = registry.get('fields', {})
                validation_results['registry_fields_count'] = len(registry_fields)
                validation_results['registry_fields_loaded'] = len(registry_fields) > 0

                if len(registry_fields) > 0:
                    validation_results['validation_status'] = 'success'
                    print(f"β
Registry validation successful: {len(registry_fields)} fields loaded")

                    # Show a small sample for quick eyeballing in logs.
                    sample_fields = list(registry_fields.keys())[:5]
                    print(f"π Sample registry fields: {', '.join(sample_fields)}")
                else:
                    validation_results['validation_status'] = 'no_fields'
                    print("β οΈ Registry loaded but no fields found")
            else:
                validation_results['validation_status'] = 'no_registry_manager'
                print("β Registry manager not available")

        except Exception as e:
            # Never raise from a diagnostic helper; report the error instead.
            validation_results['validation_status'] = 'error'
            validation_results['error'] = str(e)
            print(f"β Registry validation failed: {e}")

        return validation_results
|
|
|
|
|
def test_registry_integration():
    """
    Test function to validate registry integration is working correctly.
    This function can be called to debug registry-related issues.

    Note: this is a manual smoke test — it builds a real generator and hits
    the Hugging Face Hub over the network; it is not a unit test.
    """
    print("π§ͺ Testing Registry Integration...")
    print("=" * 50)

    try:
        # Constructing the generator also initializes the registry manager.
        generator = AIBOMGenerator()

        validation_results = generator.validate_registry_integration()

        print("π Validation Results:")
        for key, value in validation_results.items():
            print(f"  {key}: {value}")

        # A well-populated public model is used as the extraction fixture.
        test_model = "deepseek-ai/DeepSeek-R1"
        print(f"\nπ Testing extraction with model: {test_model}")

        try:
            # Network calls: Hub metadata + model card download.
            model_info = generator.hf_api.model_info(test_model)
            model_card = ModelCard.load(test_model)

            if ENHANCED_EXTRACTION_AVAILABLE and generator.registry_manager:
                extractor = EnhancedExtractor(generator.hf_api, generator.registry_manager)
                metadata = extractor.extract_metadata(test_model, model_info, model_card)

                print(f"β
Test extraction successful: {len(metadata)} fields extracted")

                # Show the first few extracted fields for inspection.
                sample_fields = dict(list(metadata.items())[:5])
                print("π Sample extracted fields:")
                for key, value in sample_fields.items():
                    print(f"  {key}: {value}")

                # Summarize how confident the extractor was per field.
                extraction_results = extractor.get_extraction_results()
                confidence_counts = {}
                for result in extraction_results.values():
                    conf = result.confidence.value
                    confidence_counts[conf] = confidence_counts.get(conf, 0) + 1

                print("π Extraction confidence distribution:")
                for conf, count in confidence_counts.items():
                    print(f"  {conf}: {count} fields")

            else:
                print("β οΈ Registry-aware extraction not available for testing")

        except Exception as e:
            print(f"β Test extraction failed: {e}")

    except Exception as e:
        print(f"β Registry integration test failed: {e}")

    print("=" * 50)
    print("π§ͺ Registry Integration Test Complete")
|
|
|
|
|
|
|
|
# BUGFIX: the smoke test used to run unconditionally at import time,
# triggering network calls to the Hugging Face Hub whenever this module was
# imported. Guard it behind the standard main check.
if __name__ == "__main__":
    test_registry_integration()