riskengine / src /bmc /engine.py
datavorous's picture
initial commit
28779c1 verified
from rapidfuzz import fuzz
from urllib.parse import urlparse
from .config_loader import load_app_config, load_yaml
from .models import EvidenceItem, RiskAssessment
# Application configuration, loaded once when the module is imported.
APP_CONFIG = load_app_config()

# Points per evidence type. The "weights" section is mandatory: a missing
# section raises KeyError here, at import time.
WEIGHTS = APP_CONFIG["weights"]

# Cut-offs used by assess() to turn evidence and score into a risk level.
THRESHOLDS = APP_CONFIG.get("thresholds", {})
HIGH_CONF_POINTS_MIN = THRESHOLDS.get("high_confidence_points_min", 40)
MEDIUM_SCORE_MIN = THRESHOLDS.get("medium_score_min", 30)

# Minimum rapidfuzz similarity scores for an approximate match to count.
FUZZY_CONFIG = APP_CONFIG.get("fuzzy", {})
KEYWORD_THRESHOLD = FUZZY_CONFIG.get("keyword_threshold", 80)
DOMAIN_THRESHOLD = FUZZY_CONFIG.get("domain_threshold", 90)

# Domains treated as link shorteners (a set, for cheap iteration/lookup).
SHORTENER_DOMAINS = set(APP_CONFIG.get("shortener_domains", []))

# Rule files: category -> list of domains, and category -> keyword rules.
DOMAINS = load_yaml("restricted_domains.yaml")
KEYWORDS = load_yaml("keywords.yaml")

# Fail fast on a malformed domain file: it must be a dict whose keys are
# strings and whose values are lists.
if not (
    isinstance(DOMAINS, dict)
    and all(
        isinstance(category, str) and isinstance(entries, list)
        for category, entries in DOMAINS.items()
    )
):
    raise ValueError("restricted_domains.yaml must be a mapping of strings to lists")
def _keyword_matches(bio: str, keyword: str) -> bool:
if keyword in bio:
return True
return fuzz.partial_ratio(keyword, bio) >= KEYWORD_THRESHOLD
def _domain_matches(final_domain: str, domain: str) -> bool:
if not final_domain:
return False
if final_domain == domain:
return True
return fuzz.ratio(final_domain, domain) >= DOMAIN_THRESHOLD
def _extract_domains(urls):
domains = []
for url in urls:
parsed = urlparse(url)
if parsed.netloc:
domains.append(parsed.netloc.lower().replace("www.", ""))
return domains
def assess(profile, resolved_links):
    """Score a profile and its resolved links and build a RiskAssessment.

    Evidence is collected in this order: the profile's ``has_nsfw`` flag,
    keyword hits in the lower-cased bio, then for every link without an
    error: a long redirect chain, a shortener domain anywhere in the chain,
    a restricted final domain, and keyword hits in the lower-cased title.
    Evidence items sharing the same (type, category, snippet, source) are
    recorded and scored only once.

    Args:
        profile: Object exposing ``limitations`` (copied, not mutated),
            ``bio`` and ``has_nsfw``.
        resolved_links: Iterable of objects exposing ``error``,
            ``original_url``, ``status_code``, ``redirect_chain``,
            ``final_domain`` and ``title``.

    Returns:
        RiskAssessment with the score capped at 100, the risk level and
        recommended action, the evidence list and the limitations list.
    """
    total = 0
    findings = []
    recorded = set()
    notes = profile.limitations.copy()
    bio_text = (profile.bio or "").lower()

    def record(type_, category, confidence, snippet, source, weight_key):
        """Store one evidence item and add its points unless it is a duplicate."""
        nonlocal total
        points = WEIGHTS[weight_key]
        item = EvidenceItem(
            type_=type_,
            category=category,
            confidence=confidence,
            snippet=snippet,
            source=source,
            points=points,
        )
        fingerprint = (item.type_, item.category, item.snippet, item.source)
        if fingerprint in recorded:
            return
        recorded.add(fingerprint)
        findings.append(item)
        total += points

    # --- Profile-level signals -------------------------------------------
    if profile.has_nsfw:
        record("flag", "adult", "high", "has_nsfw flag", "profile", "has_nsfw")

    # (rule tier, evidence confidence, weight name) for bio keyword hits.
    bio_tiers = (
        ("strong", "high", "strong_keyword"),
        ("weak", "low", "weak_keyword"),
    )
    for category, rules in KEYWORDS.items():
        for tier, confidence, weight_key in bio_tiers:
            for kw in rules.get(tier, []):
                if _keyword_matches(bio_text, kw):
                    record("bio_keyword", category, confidence, kw, "bio", weight_key)

    # --- Link-level signals ----------------------------------------------
    for link in resolved_links:
        if link.error:
            # An unresolved link yields no evidence, only a limitation note.
            notes.append(
                f"Link resolution error for {link.original_url}: {link.error}"
            )
            continue

        if link.status_code in {403, 451}:
            notes.append(f"Restricted content (HTTP {link.status_code})")

        hops = len(link.redirect_chain)
        if hops >= 2:
            record(
                "link_redirect",
                "redirect",
                "low",
                f"{hops} redirects",
                "profile_link",
                "redirect_penalty",
            )

        # Every host seen along the chain, plus the final one if known.
        hosts = set(_extract_domains(link.redirect_chain))
        if link.final_domain:
            hosts.add(link.final_domain)
        uses_shortener = any(
            host == short or host.endswith("." + short)
            for host in hosts
            for short in SHORTENER_DOMAINS
        )
        if uses_shortener:
            record(
                "link_shortener",
                "redirect",
                "low",
                "shortener domain",
                "profile_link",
                "shortener_penalty",
            )

        # At most one restricted-domain hit per category for this link.
        for category, listed in DOMAINS.items():
            if any(_domain_matches(link.final_domain, entry) for entry in listed):
                record(
                    "link_domain",
                    category,
                    "high",
                    link.final_domain,
                    "profile_link",
                    "restricted_domain",
                )

        if link.title:
            title_text = link.title.lower()
            # In titles both tiers are low-confidence and use the weak weight.
            for category, rules in KEYWORDS.items():
                for tier in ("strong", "weak"):
                    for kw in rules.get(tier, []):
                        if _keyword_matches(title_text, kw):
                            record(
                                "link_title_keyword",
                                category,
                                "low",
                                kw,
                                "link_title",
                                "weak_keyword",
                            )

    # --- Verdict ----------------------------------------------------------
    total = min(total, 100)
    has_decisive_hit = any(
        found.confidence == "high" and found.points >= HIGH_CONF_POINTS_MIN
        for found in findings
    )
    if has_decisive_hit:
        level, action = "high", "reject"
    elif total >= MEDIUM_SCORE_MIN:
        level, action = "medium", "review"
    else:
        level, action = "low", "accept"

    return RiskAssessment(
        risk_score=total,
        risk_level=level,
        recommended_action=action,
        evidence=findings,
        limitations=notes,
    )