| """ |
| Evidence Validator Module |
| Validates claim evidence with multi-tier verification including: |
| - Link integrity and provenance checking |
| - Repository forensics for GitHub/GitLab projects |
| - Cross-section triangulation |
| - Artifact credibility scoring |
| """ |
|
|
import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import Any, Dict, List
from urllib.parse import urlparse

import requests
|
|
# Optional dependencies: feature flags let callers degrade gracefully
# when a package is not installed
try:
    import aiohttp
    HAS_AIOHTTP = True
except ImportError:
    HAS_AIOHTTP = False
|
|
try:
    from git import Repo
    HAS_GIT = True
except ImportError:
    HAS_GIT = False
|
|
try:
    from pydriller import Repository as PyDrillerRepo
    HAS_PYDRILLER = True
except ImportError:
    HAS_PYDRILLER = False
|
|
# Support both packaged and flat project layouts
try:
    from config.prompts import EVIDENCE_VALIDATION_PROMPT, SCORING_CONFIG
except ImportError:
    from prompts import EVIDENCE_VALIDATION_PROMPT, SCORING_CONFIG
|
|
logger = logging.getLogger(__name__)
|
|
class EvidenceValidator:
    """
    Multi-tier evidence validation system with artifact verification
    and repository forensics capabilities.
    """

    ARTIFACT_TIERS = {
        'doi_arxiv': {
            'patterns': [r'doi\.org', r'arxiv\.org', r'dl\.acm\.org', r'ieee\.org'],
            'weight': 1.0,
            'category': 'academic'
        },
        'github_active': {
            'patterns': [r'github\.com', r'gitlab\.com', r'bitbucket\.org'],
            'weight': 0.9,
            'category': 'repository'
        },
        'corporate_blog': {
            'patterns': [r'medium\.com/@[\w]+company', r'engineering\.\w+\.com', r'tech\.\w+\.com'],
            'weight': 0.8,
            'category': 'corporate'
        },
        'portfolio': {
            'patterns': [r'portfolio', r'herokuapp\.com', r'netlify\.app', r'vercel\.app'],
            'weight': 0.7,
            'category': 'personal'
        },
        'personal_blog': {
            'patterns': [r'medium\.com/@[\w]+$', r'dev\.to', r'hashnode\.dev', r'blogger\.com'],
            'weight': 0.6,
            'category': 'personal'
        },
        'certificate': {
            'patterns': [r'coursera\.org', r'udacity\.com', r'credly\.com', r'linkedin\.com/learning'],
            'weight': 0.7,
            'category': 'certification'
        }
    }
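
    # Illustrative tier matching (a sketch, URLs hypothetical):
    # "https://arxiv.org/abs/1706.03762" matches 'doi_arxiv' (weight 1.0),
    # "https://github.com/alice/demo" matches 'github_active' (0.9), and a
    # URL matching no pattern falls back to tier 'unknown' with weight 0.3
    # in _validate_links() below.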

    def __init__(self,
                 gemini_client: Any,
                 enable_async: bool = True,
                 max_concurrent_checks: int = 5):
        """
        Initialize the evidence validator.

        Args:
            gemini_client: Configured Gemini API client
            enable_async: Use async HTTP for link checking (requires aiohttp)
            max_concurrent_checks: Max concurrent HTTP requests
        """
        self.gemini_client = gemini_client
        self.enable_async = enable_async
        self.max_concurrent_checks = max_concurrent_checks
        self.session = None
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent_checks)

    async def __aenter__(self):
        """Async context manager entry"""
        # Only open a session when aiohttp is actually available; otherwise
        # the validator falls back to synchronous requests.
        if self.enable_async and HAS_AIOHTTP:
            self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()

    def validate_evidence(self,
                          claims: List[Dict],
                          full_cv_text: str,
                          check_links: bool = True,
                          deep_repo_analysis: bool = True) -> Dict[str, Any]:
        """
        Validate evidence for all claims.

        Args:
            claims: List of extracted claims
            full_cv_text: Complete CV text for cross-reference
            check_links: Perform HTTP status checks on URLs
            deep_repo_analysis: Perform repository forensics

        Returns:
            Validation results with scores and findings
        """
        validation_start = datetime.now()

        claims_json = json.dumps(claims, default=str)

        # Tier 1: LLM-based assessment of the claim set
        llm_validations = self._get_llm_validation(claims_json, full_cv_text)

        validations = []

        for i, claim in enumerate(claims):
            base_validation = llm_validations[i] if i < len(llm_validations) else {}

            # Tier 2: link integrity checks on cited artifacts
            if check_links and claim.get('links_artifacts'):
                link_results = self._validate_links(claim['links_artifacts'])
                base_validation['link_integrity'] = link_results

            # Tier 3: repository forensics for GitHub/GitLab links
            if deep_repo_analysis:
                repo_links = [url for url in claim.get('links_artifacts', [])
                              if any(host in url for host in ['github.com', 'gitlab.com'])]

                if repo_links:
                    repo_analysis = self._analyze_repositories(repo_links, claim)
                    base_validation['repository_forensics'] = repo_analysis

            # Tier 4: cross-section triangulation against the rest of the CV
            triangulation = self._cross_validate_claim(claim, claims, full_cv_text)
            base_validation['triangulation'] = triangulation

            # Combine all factors into a final score and status
            final_score = self._calculate_evidence_score(base_validation)
            base_validation['final_evidence_score'] = final_score
            base_validation['claim_id'] = claim['claim_id']
            base_validation['verification_status'] = self._determine_status(final_score)

            validations.append(base_validation)

        consistency_score = self._calculate_consistency_score(validations, claims)

        return {
            'validations': validations,
            'consistency_score': consistency_score,
            'validation_time': (datetime.now() - validation_start).total_seconds(),
            'summary': self._generate_validation_summary(validations)
        }

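    # Illustrative result shape for validate_evidence() (values hypothetical):
    # {
    #     'validations': [{'claim_id': 'c1', 'final_evidence_score': 0.73,
    #                      'verification_status': 'partial',
    #                      'link_integrity': {...}, 'triangulation': {...}}],
    #     'consistency_score': 0.85,
    #     'validation_time': 2.4,
    #     'summary': {'total_validated': 1, 'partial': 1, ...}
    # }
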
    def _get_llm_validation(self, claims_json: str, full_cv_text: str) -> List[Dict]:
        """
        Get validation judgments from the Gemini LLM.
        """
        prompt = EVIDENCE_VALIDATION_PROMPT.format(
            claims_json=claims_json,
            full_cv_text=full_cv_text
        )

        try:
            response = self.gemini_client.generate_content(
                prompt,
                generation_config={
                    'temperature': 0.2,
                    'top_p': 0.95,
                    'max_output_tokens': 4096
                }
            )

            result = json.loads(response.text)
            return result.get('validations', [])

        except Exception as e:
            logger.error(f"LLM validation failed: {e}")
            # Return one *distinct* empty dict per claim; list multiplication
            # ([{}] * n) would alias a single dict, letting later per-claim
            # mutations bleed across all claims.
            return [{} for _ in json.loads(claims_json)]

    def _validate_links(self, urls: List[str]) -> Dict[str, Any]:
        """
        Check link integrity and gather metadata.
        """
        results = {
            'total_links': len(urls),
            'valid_links': 0,
            'broken_links': 0,
            'link_details': []
        }

        for url in urls:
            try:
                parsed = urlparse(url)
                domain = parsed.netloc

                # Classify the URL into an artifact tier; unmatched URLs
                # get a conservative default weight
                tier = 'unknown'
                weight = 0.3

                for tier_name, tier_config in self.ARTIFACT_TIERS.items():
                    if any(re.search(pattern, url) for pattern in tier_config['patterns']):
                        tier = tier_name
                        weight = tier_config['weight']
                        break

                # HEAD keeps the check lightweight; note that some hosts
                # reject HEAD, in which case the link counts as broken
                try:
                    response = requests.head(url, timeout=5, allow_redirects=True)
                    status_code = response.status_code
                    is_valid = status_code < 400
                    last_modified = response.headers.get('Last-Modified', 'unknown')

                except requests.RequestException:
                    status_code = 0
                    is_valid = False
                    last_modified = 'unknown'

                if is_valid:
                    results['valid_links'] += 1
                else:
                    results['broken_links'] += 1

                results['link_details'].append({
                    'url': url,
                    'domain': domain,
                    'tier': tier,
                    'weight': weight,
                    'status_code': status_code,
                    'is_valid': is_valid,
                    'last_modified': last_modified
                })

            except Exception as e:
                logger.warning(f"Link validation failed for {url}: {e}")
                results['broken_links'] += 1

        # Weighted score: sum of tier weights over valid links, normalized
        # by the number of links inspected
        if results['link_details']:
            results['weighted_score'] = sum(
                link['weight'] for link in results['link_details'] if link['is_valid']
            ) / len(results['link_details'])
        else:
            results['weighted_score'] = 0.0

        return results

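    # Worked example (hypothetical links): one valid arXiv link (tier
    # weight 1.0) plus one broken GitHub link gives weighted_score
    # = 1.0 / 2 = 0.5, with valid_links = 1 and broken_links = 1.
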
    def _analyze_repositories(self, repo_urls: List[str], claim: Dict) -> Dict[str, Any]:
        """
        Perform deep repository analysis.

        Note: only GitHub URLs are analyzed via the REST API here; GitLab
        URLs are counted but currently skipped.
        """
        repo_analysis = {
            'repositories_analyzed': len(repo_urls),
            'findings': []
        }

        for url in repo_urls:
            try:
                # Extract owner/repo from the URL path, e.g.
                # https://github.com/owner/repo -> ['owner', 'repo']
                parsed = urlparse(url)
                path_parts = parsed.path.strip('/').split('/')

                if 'github.com' in parsed.netloc and len(path_parts) >= 2:
                    owner, repo_name = path_parts[0], path_parts[1]

                    # Unauthenticated GitHub API calls are rate-limited
                    # (60 requests/hour); failures degrade gracefully below
                    api_url = f"https://api.github.com/repos/{owner}/{repo_name}"
                    response = requests.get(api_url, timeout=10)

                    if response.status_code == 200:
                        repo_data = response.json()

                        # The commits endpoint is paginated (30 per page by
                        # default), so this count is a lower bound
                        commits_url = f"{api_url}/commits"
                        commits_response = requests.get(commits_url, timeout=10)
                        commits = commits_response.json() if commits_response.status_code == 200 else []

                        # README presence has its own endpoint; the repo
                        # payload does not report it directly
                        readme_response = requests.get(f"{api_url}/readme", timeout=10)

                        analysis = {
                            'url': url,
                            'stars': repo_data.get('stargazers_count', 0),
                            'forks': repo_data.get('forks_count', 0),
                            'created_at': repo_data.get('created_at', ''),
                            'last_updated': repo_data.get('updated_at', ''),
                            'language': repo_data.get('language', 'unknown'),
                            'has_readme': readme_response.status_code == 200,
                            'total_commits': len(commits),
                            'is_fork': repo_data.get('fork', False),
                            'open_issues': repo_data.get('open_issues_count', 0)
                        }

                        # Flag claims that predate the repository itself
                        if claim.get('time_period', {}).get('start_date'):
                            claim_date = claim['time_period']['start_date']
                            repo_created = repo_data.get('created_at', '')[:10]

                            if repo_created and claim_date < repo_created:
                                analysis['date_mismatch'] = (
                                    f"Claim start {claim_date} predates repo "
                                    f"creation {repo_created}"
                                )

                        analysis['activity_score'] = self._calculate_repo_activity_score(analysis)

                        if commits:
                            analysis['author_commits'] = self._check_author_commits(commits, claim)

                        repo_analysis['findings'].append(analysis)

            except Exception as e:
                logger.warning(f"Repository analysis failed for {url}: {e}")
                repo_analysis['findings'].append({
                    'url': url,
                    'error': str(e),
                    'activity_score': 0.0
                })

        if repo_analysis['findings']:
            scores = [f.get('activity_score', 0) for f in repo_analysis['findings']]
            repo_analysis['avg_activity_score'] = sum(scores) / len(scores)
        else:
            repo_analysis['avg_activity_score'] = 0.0

        return repo_analysis

    def _calculate_repo_activity_score(self, repo_data: Dict) -> float:
        """
        Calculate a repository activity/credibility score in [0, 1].
        """
        score = 0.0

        # Community signal: stars and forks, capped at 0.15 each
        if repo_data.get('stars', 0) > 0:
            score += min(0.15, repo_data['stars'] * 0.01)
        if repo_data.get('forks', 0) > 0:
            score += min(0.15, repo_data['forks'] * 0.02)

        # Documentation signal
        if repo_data.get('has_readme'):
            score += 0.2

        # Commit volume (first page only; see _analyze_repositories)
        commits = repo_data.get('total_commits', 0)
        if commits > 10:
            score += 0.3
        elif commits > 5:
            score += 0.2
        elif commits > 0:
            score += 0.1

        # Original work is weighted above forks
        if not repo_data.get('is_fork'):
            score += 0.1

        # Recency: updated within the last six months
        if repo_data.get('last_updated'):
            last_update = datetime.fromisoformat(repo_data['last_updated'].replace('Z', '+00:00'))
            if (datetime.now(last_update.tzinfo) - last_update).days < 180:
                score += 0.1

        return min(1.0, score)

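    # Worked example (hypothetical repo): 3 stars (0.03) + README (0.2)
    # + 12 commits (0.3) + not a fork (0.1) + updated last month (0.1)
    # = 0.73 activity score.
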
    def _check_author_commits(self, commits: List[Dict], claim: Dict) -> Dict:
        """
        Summarize commit activity in the repository.

        Note: matching commits to the CV author's identity is not
        implemented; this summarizes overall activity on the first page
        of results (the GitHub API returns commits newest-first).
        """
        if not commits:
            return {'found': False, 'commit_count': 0}

        first_commit_date = commits[-1].get('commit', {}).get('author', {}).get('date', '')
        last_commit_date = commits[0].get('commit', {}).get('author', {}).get('date', '')

        # Commits per day since the earliest fetched commit, with a 30-day
        # floor to avoid inflating brand-new repositories. Compare in UTC:
        # subtracting an aware datetime from naive datetime.now() raises
        # a TypeError.
        commit_density = 0
        if first_commit_date:
            first_dt = datetime.fromisoformat(first_commit_date.replace('Z', '+00:00'))
            age_days = (datetime.now(timezone.utc) - first_dt).days
            commit_density = len(commits) / max(30, age_days)

        return {
            'found': True,
            'commit_count': len(commits),
            'first_commit': first_commit_date[:10] if first_commit_date else '',
            'last_commit': last_commit_date[:10] if last_commit_date else '',
            'commit_density': commit_density
        }

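    # Worked example (hypothetical numbers): 30 fetched commits over a
    # 300-day history give commit_density = 30 / max(30, 300) = 0.1
    # commits/day; for a 10-day-old repo the floor applies: 30 / 30 = 1.0.
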
    def _cross_validate_claim(self,
                              claim: Dict,
                              all_claims: List[Dict],
                              full_text: str) -> Dict[str, Any]:
        """
        Cross-validate a claim against the other CV sections.
        """
        validation = {
            'skill_used_in_project': False,
            'dates_consistent': True,
            'tech_timeline_valid': True,
            'cross_references': []
        }

        # Check that claimed skills actually appear in other claims
        # (projects, work experience, etc.)
        if claim['category'] == 'skill':
            technologies = claim.get('technologies_mentioned', [])

            for tech in technologies:
                tech_lower = tech.lower()

                for other_claim in all_claims:
                    if other_claim['claim_id'] != claim['claim_id']:
                        if tech_lower in other_claim['claim_text'].lower():
                            validation['skill_used_in_project'] = True
                            validation['cross_references'].append({
                                'technology': tech,
                                'used_in': other_claim['category'],
                                'claim_id': other_claim['claim_id']
                            })
                            break

        # Check for overlapping work-experience date ranges
        if claim.get('time_period', {}).get('start_date'):
            claim_dates = claim['time_period']

            if claim['category'] == 'work_experience':
                for other_claim in all_claims:
                    if (other_claim['claim_id'] != claim['claim_id'] and
                            other_claim['category'] == 'work_experience' and
                            other_claim.get('time_period', {}).get('start_date')):

                        other_dates = other_claim['time_period']

                        if self._check_date_overlap(claim_dates, other_dates):
                            validation['dates_consistent'] = False
                            validation['date_conflict'] = {
                                'claim_id': other_claim['claim_id'],
                                'type': 'overlapping_positions'
                            }

        # Check technology claims against public release dates; imported
        # lazily so the extractor module is only needed when triangulating
        try:
            from modules.claim_extractor import ClaimExtractor
        except ImportError:
            from claim_extractor import ClaimExtractor

        for tech in claim.get('technologies_mentioned', []):
            tech_lower = tech.lower()

            if tech_lower in ClaimExtractor.TECH_RELEASE_DATES:
                release_date = ClaimExtractor.TECH_RELEASE_DATES[tech_lower]

                if claim.get('time_period', {}).get('start_date'):
                    if claim['time_period']['start_date'] < release_date:
                        validation['tech_timeline_valid'] = False
                        validation.setdefault('tech_timeline_issues', []).append({
                            'technology': tech,
                            'claimed_use': claim['time_period']['start_date'],
                            'public_release': release_date
                        })

        return validation

    def _check_date_overlap(self, dates1: Dict, dates2: Dict) -> bool:
        """
        Check whether two date ranges overlap.

        Missing start/end dates fall back to wide sentinels so open-ended
        ranges (e.g. a current position) still participate in the check.
        """
        def _parse(value: str) -> datetime:
            # Pad 'YYYY-MM' to 'YYYY-MM-DD'; fromisoformat() only accepts
            # month precision from Python 3.11 onward
            if len(value) == 7:
                value += '-01'
            return datetime.fromisoformat(value)

        try:
            start1 = _parse(dates1.get('start_date') or '2000-01')
            end1 = _parse(dates1.get('end_date') or '2030-12')
            start2 = _parse(dates2.get('start_date') or '2000-01')
            end2 = _parse(dates2.get('end_date') or '2030-12')

            # Two ranges overlap iff each starts before the other ends
            return start1 <= end2 and start2 <= end1

        except (ValueError, TypeError):
            return False

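    # Worked example: 2020-01..2021-06 and 2021-01..2022-03 overlap
    # (2020-01 <= 2022-03 and 2021-01 <= 2021-06); 2018-01..2019-01 and
    # 2019-02..2020-01 do not (2019-02 > 2019-01 fails the second test).
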
    def _calculate_evidence_score(self, validation: Dict) -> float:
        """
        Calculate the final evidence score by combining all factors.

        Component weights are currently hard-coded (LLM evidence 0.4, link
        integrity 0.2, repository forensics 0.2, triangulation 0.2) rather
        than read from SCORING_CONFIG.
        """
        score = 0.0

        # LLM-assessed evidence quality
        if validation.get('evidence_score'):
            score += validation['evidence_score'] * 0.4

        # Link integrity
        if validation.get('link_integrity'):
            score += validation['link_integrity'].get('weighted_score', 0) * 0.2

        # Repository forensics
        if validation.get('repository_forensics'):
            score += validation['repository_forensics'].get('avg_activity_score', 0) * 0.2

        # Cross-section triangulation
        triangulation = validation.get('triangulation', {})

        triangulation_score = 0.0
        if triangulation.get('skill_used_in_project'):
            triangulation_score += 0.4
        if triangulation.get('dates_consistent'):
            triangulation_score += 0.3
        if triangulation.get('tech_timeline_valid'):
            triangulation_score += 0.3

        score += triangulation_score * 0.2

        return min(1.0, score)

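    # Worked example (hypothetical values): LLM evidence_score 0.8, link
    # weighted_score 0.5, no repo forensics, triangulation with consistent
    # dates and valid tech timeline (0.3 + 0.3 = 0.6):
    # 0.8*0.4 + 0.5*0.2 + 0.0*0.2 + 0.6*0.2 = 0.32 + 0.10 + 0.00 + 0.12 = 0.54
    # -> verification_status 'partial' (see _determine_status below).
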
    def _determine_status(self, score: float) -> str:
        """
        Determine verification status from the evidence score.
        """
        if score >= 0.8:
            return 'verified'
        elif score >= 0.4:
            return 'partial'
        elif score >= 0.2:
            return 'unverified'
        else:
            return 'red_flag'

    def _calculate_consistency_score(self,
                                     validations: List[Dict],
                                     claims: List[Dict]) -> float:
        """
        Calculate an overall consistency score across all validations.
        """
        if not validations:
            return 0.0

        consistency_factors = []

        # Date consistency: fraction of claims without date conflicts
        date_conflicts = sum(1 for v in validations
                             if not v.get('triangulation', {}).get('dates_consistent', True))
        date_consistency = 1.0 - (date_conflicts / len(validations))
        consistency_factors.append(date_consistency)

        # Technology-timeline consistency
        tech_issues = sum(1 for v in validations
                          if not v.get('triangulation', {}).get('tech_timeline_valid', True))
        tech_consistency = 1.0 - (tech_issues / len(validations))
        consistency_factors.append(tech_consistency)

        # Skill usage: fraction of skill claims backed by other claims
        skill_claims = [c for c in claims if c['category'] == 'skill']
        if skill_claims:
            used_skills = sum(1 for v in validations
                              if v.get('triangulation', {}).get('skill_used_in_project'))
            skill_consistency = used_skills / len(skill_claims)
            consistency_factors.append(skill_consistency)

        # Link health across all checked links
        link_validations = [v for v in validations if v.get('link_integrity')]
        if link_validations:
            valid_links = sum(v['link_integrity']['valid_links']
                              for v in link_validations)
            total_links = sum(v['link_integrity']['total_links']
                              for v in link_validations)
            if total_links > 0:
                link_consistency = valid_links / total_links
                consistency_factors.append(link_consistency)

        return sum(consistency_factors) / len(consistency_factors) if consistency_factors else 0.0

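    # Worked example (hypothetical counts): 10 validations with 1 date
    # conflict (0.9), no tech issues (1.0), 3 of 4 skill claims used in
    # other claims (0.75), and 8 of 10 links valid (0.8) average to
    # (0.9 + 1.0 + 0.75 + 0.8) / 4 = 0.8625.
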
    def _generate_validation_summary(self, validations: List[Dict]) -> Dict:
        """
        Generate a summary of validation results.
        """
        total = len(validations)

        if total == 0:
            return {
                'total_validated': 0,
                'verified': 0,
                'partial': 0,
                'unverified': 0,
                'red_flags': 0,
                'avg_evidence_score': 0.0
            }

        status_counts = {
            'verified': sum(1 for v in validations if v.get('verification_status') == 'verified'),
            'partial': sum(1 for v in validations if v.get('verification_status') == 'partial'),
            'unverified': sum(1 for v in validations if v.get('verification_status') == 'unverified'),
            'red_flag': sum(1 for v in validations if v.get('verification_status') == 'red_flag')
        }

        return {
            'total_validated': total,
            'verified': status_counts['verified'],
            'verified_pct': (status_counts['verified'] / total) * 100,
            'partial': status_counts['partial'],
            'partial_pct': (status_counts['partial'] / total) * 100,
            'unverified': status_counts['unverified'],
            'unverified_pct': (status_counts['unverified'] / total) * 100,
            'red_flags': status_counts['red_flag'],
            'red_flag_pct': (status_counts['red_flag'] / total) * 100,
            'avg_evidence_score': sum(v.get('final_evidence_score', 0) for v in validations) / total
        }
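

# Minimal usage sketch (assumptions: `google-generativeai` is the client
# library, GEMINI_API_KEY is set, and the claim dict below is hypothetical;
# real claims come from the upstream ClaimExtractor).
if __name__ == '__main__':
    import os
    import google.generativeai as genai

    genai.configure(api_key=os.environ['GEMINI_API_KEY'])
    client = genai.GenerativeModel('gemini-1.5-flash')

    sample_claims = [
        {
            'claim_id': 'c1',
            'category': 'project',
            'claim_text': 'Built a REST API in Python',
            'technologies_mentioned': ['python'],
            'links_artifacts': ['https://github.com/example-user/example-repo'],
            'time_period': {'start_date': '2022-01', 'end_date': '2022-06'}
        }
    ]

    validator = EvidenceValidator(gemini_client=client, enable_async=False)
    report = validator.validate_evidence(sample_claims, full_cv_text='...')
    print(json.dumps(report['summary'], indent=2))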