Spaces:
Sleeping
Sleeping
| from rapidfuzz import fuzz | |
| from urllib.parse import urlparse | |
| from .config_loader import load_app_config, load_yaml | |
| from .models import EvidenceItem, RiskAssessment | |
# Application-wide settings; every tunable below is read from this mapping.
APP_CONFIG = load_app_config()

# Points awarded per evidence type. The "weights" section is mandatory:
# a missing key raises KeyError as soon as this module is imported.
WEIGHTS = APP_CONFIG["weights"]

# Cut-offs used by assess() to pick the risk level; defaults apply when the
# "thresholds" section or an individual key is absent.
THRESHOLDS = APP_CONFIG.get("thresholds", {})
HIGH_CONF_POINTS_MIN = THRESHOLDS.get("high_confidence_points_min", 40)
MEDIUM_SCORE_MIN = THRESHOLDS.get("medium_score_min", 30)

# Minimum rapidfuzz similarity scores for a fuzzy keyword / domain match.
FUZZY_CONFIG = APP_CONFIG.get("fuzzy", {})
KEYWORD_THRESHOLD = FUZZY_CONFIG.get("keyword_threshold", 80)
DOMAIN_THRESHOLD = FUZZY_CONFIG.get("domain_threshold", 90)

# Hosts treated as link shorteners (stored as a set for fast membership tests).
SHORTENER_DOMAINS = set(APP_CONFIG.get("shortener_domains", []))

# Rule data: DOMAINS maps a category name to a list of domains; KEYWORDS maps
# a category name to its keyword rules (read via "strong" / "weak" keys).
DOMAINS = load_yaml("restricted_domains.yaml")
KEYWORDS = load_yaml("keywords.yaml")

# Fail fast at import time when the restricted-domain file has the wrong shape.
if not (
    isinstance(DOMAINS, dict)
    and all(
        isinstance(name, str) and isinstance(entries, list)
        for name, entries in DOMAINS.items()
    )
):
    raise ValueError("restricted_domains.yaml must be a mapping of strings to lists")
| def _keyword_matches(bio: str, keyword: str) -> bool: | |
| if keyword in bio: | |
| return True | |
| return fuzz.partial_ratio(keyword, bio) >= KEYWORD_THRESHOLD | |
| def _domain_matches(final_domain: str, domain: str) -> bool: | |
| if not final_domain: | |
| return False | |
| if final_domain == domain: | |
| return True | |
| return fuzz.ratio(final_domain, domain) >= DOMAIN_THRESHOLD | |
| def _extract_domains(urls): | |
| domains = [] | |
| for url in urls: | |
| parsed = urlparse(url) | |
| if parsed.netloc: | |
| domains.append(parsed.netloc.lower().replace("www.", "")) | |
| return domains | |
def _iter_keyword_hits(text):
    """Yield ``(category, tier, keyword)`` for each configured keyword found in *text*.

    Categories are visited in ``KEYWORDS`` order; within a category the
    ``"strong"`` list is scanned before the ``"weak"`` list. A missing or
    empty tier is skipped.
    """
    for category, rules in KEYWORDS.items():
        for tier in ("strong", "weak"):
            for keyword in rules.get(tier) or []:
                if _keyword_matches(text, keyword):
                    yield category, tier, keyword


def assess(profile, resolved_links):
    """Score a profile and its resolved links and return a ``RiskAssessment``.

    Evidence is collected from the profile's ``has_nsfw`` flag, keyword hits
    in the bio, and, for every link that resolved without error, its redirect
    count, shortener usage, restricted-domain matches and keyword hits in the
    page title. Each distinct piece of evidence (same type, category, snippet
    and source) is recorded and scored once.

    Args:
        profile: Object exposing ``limitations``, ``bio`` and ``has_nsfw``.
        resolved_links: Iterable of objects exposing ``error``,
            ``original_url``, ``status_code``, ``redirect_chain``,
            ``final_domain`` and ``title``.

    Returns:
        A ``RiskAssessment`` carrying the score (capped at 100), the risk
        level with its recommended action, the evidence list, and the
        profile's limitations extended with any link-related ones.

    Raises:
        KeyError: If ``WEIGHTS`` lacks the entry for an evidence type that
            fired.
    """
    score = 0
    evidence = []
    seen = set()
    # Work on a copy so the caller's profile is never mutated.
    limitations = list(profile.limitations or [])
    bio = (profile.bio or "").lower()

    def record(type_, category, confidence, snippet, source, weight_key):
        """Store one evidence item unless a duplicate exists; return the points scored."""
        points = WEIGHTS[weight_key]
        item = EvidenceItem(
            type_=type_,
            category=category,
            confidence=confidence,
            snippet=snippet,
            source=source,
            points=points,
        )
        key = (item.type_, item.category, item.snippet, item.source)
        if key in seen:
            return 0
        seen.add(key)
        evidence.append(item)
        return points

    if profile.has_nsfw:
        score += record(
            "flag", "adult", "high", "has_nsfw flag", "profile", "has_nsfw"
        )

    # Bio keywords: the tier decides both the confidence and the weight used.
    bio_tiers = {
        "strong": ("high", "strong_keyword"),
        "weak": ("low", "weak_keyword"),
    }
    for category, tier, keyword in _iter_keyword_hits(bio):
        confidence, weight_key = bio_tiers[tier]
        score += record(
            "bio_keyword", category, confidence, keyword, "bio", weight_key
        )

    for link in resolved_links or ():
        if link.error:
            # An unresolved link yields no evidence, only a limitation note.
            limitations.append(
                f"Link resolution error for {link.original_url}: {link.error}"
            )
            continue

        if link.status_code in {403, 451}:
            limitations.append(f"Restricted content (HTTP {link.status_code})")

        redirect_chain = link.redirect_chain or []
        if len(redirect_chain) >= 2:
            score += record(
                "link_redirect",
                "redirect",
                "low",
                f"{len(redirect_chain)} redirects",
                "profile_link",
                "redirect_penalty",
            )

        # A shortener anywhere along the chain (or as the final host) counts.
        hosts = set(_extract_domains(redirect_chain))
        if link.final_domain:
            hosts.add(link.final_domain)
        if any(
            host == shortener or host.endswith("." + shortener)
            for host in hosts
            for shortener in SHORTENER_DOMAINS
        ):
            score += record(
                "link_shortener",
                "redirect",
                "low",
                "shortener domain",
                "profile_link",
                "shortener_penalty",
            )

        # At most one restricted-domain item per category for this link.
        for category, domains in DOMAINS.items():
            if any(_domain_matches(link.final_domain, d) for d in domains):
                score += record(
                    "link_domain",
                    category,
                    "high",
                    link.final_domain,
                    "profile_link",
                    "restricted_domain",
                )

        if link.title:
            # Title hits are always low-confidence and use the weak-keyword
            # weight, whichever tier the keyword is listed under.
            title = link.title.lower()
            for category, _tier, keyword in _iter_keyword_hits(title):
                score += record(
                    "link_title_keyword",
                    category,
                    "low",
                    keyword,
                    "link_title",
                    "weak_keyword",
                )

    score = min(score, 100)

    # One sufficiently heavy high-confidence item forces "high", whatever the
    # total score is.
    if any(
        e.confidence == "high" and e.points >= HIGH_CONF_POINTS_MIN
        for e in evidence
    ):
        level, action = "high", "reject"
    elif score >= MEDIUM_SCORE_MIN:
        level, action = "medium", "review"
    else:
        level, action = "low", "accept"

    return RiskAssessment(
        risk_score=score,
        risk_level=level,
        recommended_action=action,
        evidence=evidence,
        limitations=limitations,
    )