"""Score a profile's bio and resolved links against configured risk rules."""

from urllib.parse import urlparse

from rapidfuzz import fuzz

from .config_loader import load_app_config, load_yaml
from .models import EvidenceItem, RiskAssessment

APP_CONFIG = load_app_config()

# Points awarded per evidence kind; "weights" is a required config section.
WEIGHTS = APP_CONFIG["weights"]

THRESHOLDS = APP_CONFIG.get("thresholds", {})
HIGH_CONF_POINTS_MIN = THRESHOLDS.get("high_confidence_points_min", 40)
MEDIUM_SCORE_MIN = THRESHOLDS.get("medium_score_min", 30)

FUZZY_CONFIG = APP_CONFIG.get("fuzzy", {})
KEYWORD_THRESHOLD = FUZZY_CONFIG.get("keyword_threshold", 80)
DOMAIN_THRESHOLD = FUZZY_CONFIG.get("domain_threshold", 90)

SHORTENER_DOMAINS = set(APP_CONFIG.get("shortener_domains", []))

DOMAINS = load_yaml("restricted_domains.yaml")
KEYWORDS = load_yaml("keywords.yaml")

if not isinstance(DOMAINS, dict) or not all(
    isinstance(k, str) and isinstance(v, list) for k, v in DOMAINS.items()
):
    raise ValueError("restricted_domains.yaml must be a mapping of strings to lists")


def _keyword_matches(bio: str, keyword: str) -> bool:
    """Return True when *keyword* occurs in *bio*, exactly or fuzzily.

    An exact substring hit short-circuits; otherwise the rapidfuzz
    ``partial_ratio`` score must reach ``KEYWORD_THRESHOLD``.  Despite the
    parameter name, ``assess`` also passes link titles as *bio*.
    """
    # "" is a substring of every string, so a blank keyword entry would flag
    # every text; an empty text cannot contain any keyword either.
    if not keyword or not bio:
        return False
    if keyword in bio:
        return True
    return fuzz.partial_ratio(keyword, bio) >= KEYWORD_THRESHOLD


def _domain_matches(final_domain: str, domain: str) -> bool:
    """Return True when *final_domain* equals or closely resembles *domain*.

    A falsy *final_domain* never matches.  Non-identical strings match when
    their rapidfuzz ``ratio`` score reaches ``DOMAIN_THRESHOLD``.
    """
    if not final_domain:
        return False
    if final_domain == domain:
        return True
    return fuzz.ratio(final_domain, domain) >= DOMAIN_THRESHOLD


def _extract_domains(urls):
    """Return the host name of every URL in *urls* that has one.

    Hosts are lower-cased and a single leading ``www.`` is dropped.  URLs
    without a host (for example relative paths) are skipped.
    """
    domains = []
    for url in urls:
        # ``hostname`` excludes credentials and port and is already
        # lower-cased, unlike ``netloc``.
        host = urlparse(url).hostname
        if not host:
            continue
        # Strip only a leading label; str.replace would also rewrite hosts
        # such as "awww.example.com".
        if host.startswith("www."):
            host = host[len("www."):]
        if host:
            domains.append(host)
    return domains


def assess(profile, resolved_links):
    """Build a ``RiskAssessment`` for *profile* and its resolved links.

    Evidence is collected from the profile's ``has_nsfw`` flag, keywords in
    the lower-cased bio, and, for every link without an error, its redirect
    chain, shortener domains, restricted final domain and title keywords.
    Evidence items are de-duplicated on (type_, category, snippet, source);
    only the first occurrence contributes points.

    The score is capped at 100.  The level is "high"/"reject" when any
    high-confidence item carries at least ``HIGH_CONF_POINTS_MIN`` points,
    "medium"/"review" when the capped score reaches ``MEDIUM_SCORE_MIN``,
    and "low"/"accept" otherwise.

    Args:
        profile: Object exposing ``limitations``, ``bio`` and ``has_nsfw``.
        resolved_links: Iterable of objects exposing ``error``,
            ``original_url``, ``status_code``, ``redirect_chain``,
            ``final_domain`` and ``title``.

    Returns:
        A ``RiskAssessment`` with the score, level, recommended action,
        evidence list and limitations (the profile's own plus any link
        resolution notes added here).
    """
    score = 0
    evidence = []
    seen = set()
    limitations = profile.limitations.copy()
    bio = (profile.bio or "").lower()

    def add_evidence(weight_key, **fields):
        """Record one evidence item and add its weight unless already seen."""
        nonlocal score
        points = WEIGHTS[weight_key]
        item = EvidenceItem(**fields, points=points)
        key = (item.type_, item.category, item.snippet, item.source)
        if key in seen:
            return
        seen.add(key)
        evidence.append(item)
        score += points

    if profile.has_nsfw:
        add_evidence(
            "has_nsfw",
            type_="flag",
            category="adult",
            confidence="high",
            snippet="has_nsfw flag",
            source="profile",
        )

    # Bio keywords: strong hits are high confidence, weak hits low.
    bio_tiers = (
        ("strong", "high", "strong_keyword"),
        ("weak", "low", "weak_keyword"),
    )
    for category, rules in KEYWORDS.items():
        for tier, confidence, weight_key in bio_tiers:
            for kw in rules.get(tier, []):
                if _keyword_matches(bio, kw):
                    add_evidence(
                        weight_key,
                        type_="bio_keyword",
                        category=category,
                        confidence=confidence,
                        snippet=kw,
                        source="bio",
                    )

    for link in resolved_links:
        if link.error:
            limitations.append(
                f"Link resolution error for {link.original_url}: {link.error}"
            )
            continue

        if link.status_code in {403, 451}:
            limitations.append(f"Restricted content (HTTP {link.status_code})")

        if len(link.redirect_chain) >= 2:
            add_evidence(
                "redirect_penalty",
                type_="link_redirect",
                category="redirect",
                confidence="low",
                snippet=f"{len(link.redirect_chain)} redirects",
                source="profile_link",
            )

        # Check every hop plus the final destination for shortener domains,
        # including their subdomains.
        all_domains = set(_extract_domains(link.redirect_chain))
        if link.final_domain:
            all_domains.add(link.final_domain)
        if any(
            domain == d or domain.endswith("." + d)
            for domain in all_domains
            for d in SHORTENER_DOMAINS
        ):
            add_evidence(
                "shortener_penalty",
                type_="link_shortener",
                category="redirect",
                confidence="low",
                snippet="shortener domain",
                source="profile_link",
            )

        # At most one restricted-domain item per category for this link.
        for category, domains in DOMAINS.items():
            if any(_domain_matches(link.final_domain, d) for d in domains):
                add_evidence(
                    "restricted_domain",
                    type_="link_domain",
                    category=category,
                    confidence="high",
                    snippet=link.final_domain,
                    source="profile_link",
                )

        if link.title:
            title = link.title.lower()
            # In titles both keyword tiers count as low-confidence evidence
            # with the weak-keyword weight.
            for category, rules in KEYWORDS.items():
                for tier in ("strong", "weak"):
                    for kw in rules.get(tier, []):
                        if _keyword_matches(title, kw):
                            add_evidence(
                                "weak_keyword",
                                type_="link_title_keyword",
                                category=category,
                                confidence="low",
                                snippet=kw,
                                source="link_title",
                            )

    score = min(score, 100)

    if any(
        e.confidence == "high" and e.points >= HIGH_CONF_POINTS_MIN
        for e in evidence
    ):
        level, action = "high", "reject"
    elif score >= MEDIUM_SCORE_MIN:
        level, action = "medium", "review"
    else:
        level, action = "low", "accept"

    return RiskAssessment(
        risk_score=score,
        risk_level=level,
        recommended_action=action,
        evidence=evidence,
        limitations=limitations,
    )