| """ |
| Data Validation and Sanitization Layer |
| FAANG-Level Quality Control for News Articles |
| |
| EMERGENCY HOTFIX (2026-01-23): Fixed AttributeError 'Article' object has no attribute 'get' |
| - Now supports both Pydantic Article models AND dicts |
| - Converts Pydantic models to dicts safely before validation |
| """ |
|
|
| from typing import Dict, Optional, List, Union |
| from datetime import datetime, timezone, timedelta |
| from zoneinfo import ZoneInfo |
| import re |
| from urllib.parse import urlparse |
| from dateutil import parser as dateutil_parser |
|
|
|
|
def is_valid_article(article: Union[Dict, 'Article']) -> bool:
    """
    Validate article data quality before database insertion.

    HOTFIX: Now handles both Pydantic Article objects and dicts.

    Checks, in order:
      1. Title present and 10-500 characters after stripping.
      2. URL present, http(s) scheme, and parseable with a network location.
      3. Publish date present, parseable, and no older than the start of
         *yesterday* in IST (Asia/Kolkata).
      4. Image URLs that are not http(s) are nulled out in place.
         NOTE: when a plain dict is passed this mutates the caller's dict;
         Pydantic inputs are copied first via model_dump()/dict().

    Returns:
        True only if the article meets all quality criteria.
    """
    # Accept Pydantic v2 models, Pydantic v1 models, or plain dicts.
    if hasattr(article, 'model_dump'):
        article_dict = article.model_dump()
    elif hasattr(article, 'dict'):
        article_dict = article.dict()
    elif isinstance(article, dict):
        article_dict = article
    else:
        # Unknown input type: reject rather than raise during ingestion.
        return False

    # --- Title: required, 10-500 characters ---
    if not article_dict.get('title'):
        return False
    title = article_dict['title'].strip()
    if len(title) < 10 or len(title) > 500:
        return False

    # --- URL: required, http(s), must parse with a netloc ---
    if not article_dict.get('url'):
        return False
    # Coerce explicitly: Pydantic HttpUrl objects are not str instances.
    # (Replaces a dead `hasattr(url, '__str__')` guard — every object
    # has __str__, so the check was always True.)
    url = str(article_dict['url']).strip()
    if not url.startswith(('http://', 'https://')):
        return False
    try:
        if not urlparse(url).netloc:
            return False
    except Exception:
        return False

    # --- Publish date: required, parseable, recent ---
    raw_date = article_dict.get('publishedAt') or article_dict.get('published_at')
    if not raw_date:
        return False

    try:
        if isinstance(raw_date, datetime):
            pub_dt = raw_date
        else:
            pub_dt = dateutil_parser.parse(str(raw_date))

        # Assume UTC for naive datetimes so the aware/aware comparison
        # below cannot raise TypeError.
        if pub_dt.tzinfo is None:
            pub_dt = pub_dt.replace(tzinfo=timezone.utc)

        # Reject anything published before midnight at the start of
        # yesterday, Indian Standard Time.
        ist_zone = ZoneInfo("Asia/Kolkata")
        now_ist = datetime.now(ist_zone)
        cutoff_ist = now_ist.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
        if pub_dt < cutoff_ist:
            return False
    except Exception:
        # Unparseable/broken dates fail validation rather than crash.
        return False

    # --- Image: optional; null out non-http(s) values (e.g. data: URIs) ---
    image_url = article_dict.get('image') or article_dict.get('image_url')
    if image_url:
        image_url = str(image_url).strip()
        if not image_url.startswith(('http://', 'https://')):
            if 'image' in article_dict:
                article_dict['image'] = None
            if 'image_url' in article_dict:
                article_dict['image_url'] = None

    return True
|
|
|
|
def sanitize_article(article: Union[Dict, 'Article']) -> Dict:
    """
    Clean and normalize article data.

    HOTFIX: Now handles both Pydantic Article objects and dicts.

    Ensures data fits schema constraints and is properly formatted:
    whitespace-collapsed and length-capped title/description/source/category,
    http(s)-only image URL, generated slug, computed quality score, and an
    ISO-8601 publish timestamp (defaulting to "now" when absent).

    Raises:
        TypeError: if `article` is neither a dict nor a Pydantic model.
    """
    # Accept Pydantic v2 models, Pydantic v1 models, or plain dicts.
    if hasattr(article, 'model_dump'):
        article_dict = article.model_dump()
    elif hasattr(article, 'dict'):
        article_dict = article.dict()
    elif isinstance(article, dict):
        article_dict = article
    else:
        raise TypeError(f"Expected Dict or Article model, got {type(article)}")

    # BUGFIX throughout: `get(key, default)` returns None (not the default)
    # when the key exists with a None value, so the previous
    # `article_dict.get('title', '').strip()` raised AttributeError on
    # None fields. Guard every string field with `or ''` / `or default`.

    # Title: collapse internal whitespace, cap at 500 chars.
    title = (article_dict.get('title') or '').strip()
    title = re.sub(r'\s+', ' ', title)[:500]

    # URL: coerce (Pydantic HttpUrl is not str), trim, cap at 2048 chars.
    url = str(article_dict.get('url') or '').strip()[:2048]

    # Description: collapse internal whitespace, cap at 2000 chars.
    description = (article_dict.get('description') or '').strip()
    description = re.sub(r'\s+', ' ', description)[:2000]

    # Image: accept either key; drop non-http(s) schemes (e.g. data: URIs).
    raw_image = article_dict.get('image') or article_dict.get('image_url')
    image_url = str(raw_image).strip() if raw_image else None
    if image_url:
        image_url = image_url[:2048]
        if not image_url.startswith(('http://', 'https://')):
            image_url = None

    # Source name; missing/empty falls back to 'Unknown', capped at 200.
    source = (article_dict.get('source') or 'Unknown').strip()[:200]

    slug = generate_slug(title)
    quality_score = calculate_quality_score(article_dict)

    # Publish timestamp: normalize to an ISO-8601 string; default to now.
    published_at = article_dict.get('publishedAt') or article_dict.get('published_at')
    if isinstance(published_at, datetime):
        published_at = published_at.isoformat()
    elif not published_at:
        published_at = datetime.now().isoformat()

    # Both camelCase and snake_case keys are emitted so downstream
    # consumers of either schema variant keep working.
    return {
        'title': title,
        'url': url,
        'description': description or '',
        'image': image_url,
        'image_url': image_url,
        'publishedAt': published_at,
        'published_at': published_at,
        'source': source,
        'category': (article_dict.get('category') or '').strip()[:100],
        'slug': slug,
        'quality_score': quality_score
    }
|
|
|
|
def generate_slug(title: str) -> str:
    """
    Build a URL-friendly slug from an article title.

    Lowercases the title, strips every character except letters, digits,
    whitespace and hyphens, turns whitespace runs into single hyphens,
    squeezes repeated hyphens, trims edge hyphens, and caps the result
    at 200 characters.

    Example: "Google Announces New AI" -> "google-announces-new-ai"
    """
    cleaned = re.sub(r'[^a-z0-9\s-]', '', title.lower())
    hyphenated = re.sub(r'\s+', '-', cleaned)
    collapsed = re.sub(r'-+', '-', hyphenated)
    return collapsed.strip('-')[:200]
|
|
|
|
def calculate_quality_score(article: Dict) -> int:
    """
    Score article quality from 0-100.

    Higher scores = better quality articles.
    Used for sorting and filtering.

    Scoring (from a base of 50, clamped to [0, 100]):
      +20  has an image ('image' or 'image_url')
      +15  description longer than 100 chars
      +15  source matches a premium outlet
      -10  title longer than 100 chars (likely run-on/clickbait)
    """
    score = 50

    # Articles with an image are more engaging.
    # BUGFIX: also accept 'image_url' — the rest of this module treats
    # 'image' and 'image_url' as interchangeable keys, so image_url-only
    # articles were previously under-scored.
    if article.get('image') or article.get('image_url'):
        score += 20

    # Substantial description. (`or ''` guards against explicit None.)
    description = article.get('description') or ''
    if len(description) > 100:
        score += 15

    # Reputable-source bonus (substring match on the lowercased name).
    source = (article.get('source') or '').lower()
    premium_sources = [
        'reuters', 'bloomberg', 'techcrunch', 'wired',
        'the verge', 'zdnet', 'cnet', 'ars technica'
    ]
    if any(ps in source for ps in premium_sources):
        score += 15

    # Overly long titles are penalized.
    title = article.get('title') or ''
    if len(title) > 100:
        score -= 10

    # Clamp to the documented 0-100 range.
    return min(max(score, 0), 100)
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Keyword lists driving category relevance filtering.
# Keys are category slugs; values are lowercase keywords/phrases.
# Matching is case-insensitive with word boundaries (see
# _build_category_regex), so short acronyms like 'ai'/'bi'/'gcp'
# only match as whole words.
CATEGORY_KEYWORDS = {

    # AI / machine-learning ecosystem: models, vendors, tooling.
    'ai': [
        'artificial intelligence', 'machine learning', 'deep learning',
        'neural network', 'gpt', 'llm', 'chatgpt', 'generative ai',
        'computer vision', 'nlp', 'natural language processing', 'transformer',
        'openai', 'anthropic', 'sam altman', 'claude', 'gemini', 'mistral',
        'llama', 'copilot', 'midjourney', 'stable diffusion', 'hugging face',
        'rag', 'vector database', 'prompt engineering', 'agi', 'agentic ai',
        'ai model', 'ai startup', 'genai', 'intelligence', 'robotics', 'algorithm',
    ],

    # Vendor-agnostic cloud computing.
    'cloud-computing': [
        'cloud computing', 'cloud services', 'aws', 'azure', 'google cloud',
        'gcp', 'salesforce', 'alibaba cloud', 'tencent cloud', 'huawei cloud',
        'cloudflare', 'saas', 'paas', 'iaas', 'serverless', 'kubernetes',
        'multi-cloud', 'hybrid cloud', 'cloud infrastructure', 'cloud deployment',
    ],

    # Per-provider cloud categories.
    'cloud-aws': [
        'aws', 'amazon web services', 's3', 'ec2', 'lambda', 'cloudfront',
        'sagemaker', 'dynamodb', 'amazon bedrock', 'aws reinvent',
        'fargate', 'aws graviton', 'elastic beanstalk', 'amazon cloud',
    ],
    'cloud-azure': [
        'azure', 'microsoft azure', 'azure devops', 'azure ml',
        'azure openai', 'microsoft cloud', 'azure synapse', 'cosmos db',
        'azure arc', 'microsoft entra', 'azure cloud',
    ],
    'cloud-gcp': [
        'gcp', 'google cloud', 'bigquery', 'vertex ai', 'cloud run',
        'dataflow', 'google kubernetes engine', 'gke', 'google spanner',
        'anthos', 'cloud sql', 'gemini for google cloud', 'google workspace',
    ],
    'cloud-alibaba': [
        'alibaba cloud', 'aliyun', 'alicloud', 'polar db', 'maxcompute',
        'elastic compute service', 'tongyi qianwen', 'qwen', 'alibaba',
    ],
    'cloud-huawei': [
        'huawei cloud', 'huaweicloud', 'pangu model',
        'harmonyos', 'kunpeng', 'ascend ai', 'huawei',
    ],
    'cloud-digitalocean': [
        'digitalocean', 'digital ocean', 'do droplet', 'digitalocean spaces',
        'digitalocean app platform', 'managed kubernetes', 'cloudways', 'vps',
    ],
    'cloud-oracle': [
        'oracle cloud', 'oci', 'oracle database', 'oracle fusion',
        'oracle cloud infrastructure', 'mysql heatwave', 'oracle apex', 'oracle',
    ],
    'cloud-ibm': [
        'ibm cloud', 'ibm watson', 'red hat', 'openshift',
        'ibm z', 'watsonx', 'ibm mainframe', 'ibm',
    ],
    'cloud-cloudflare': [
        'cloudflare', 'cloudflare workers', 'cloudflare r2',
        'cloudflare pages', 'zero trust', 'cdn', 'ddos',
    ],

    # Data engineering: pipelines, warehouses, streaming.
    'data-engineering': [
        'data engineering', 'data pipeline', 'etl', 'elt', 'big data',
        'apache spark', 'hadoop', 'kafka', 'airflow', 'data warehouse',
        'snowflake', 'databricks', 'dbt', 'fivetran', 'apache iceberg',
        'delta lake', 'data lakehouse', 'data processing', 'streaming data',
    ],

    # Security / infosec.
    'data-security': [
        'security', 'cybersecurity', 'data breach', 'hacking', 'vulnerability',
        'encryption', 'malware', 'ransomware', 'firewall', 'zero trust',
        'phishing', 'soc2', 'infosec', 'penetration testing', 'cyber attack',

        'cyber threat', 'threat intelligence', 'security incident', 'identity and access',
        'iam', 'mfa', 'multi-factor authentication', 'devsecops', 'security posture',
        'insider threat', 'data exfiltration', 'endpoint security', 'siem', 'xdr', 'edr',
    ],

    # Governance, compliance, regulation.
    'data-governance': [
        'data governance', 'compliance', 'regulation', 'audit', 'data policy',
        'metadata management', 'data lineage', 'data stewardship',
        'regulatory compliance', 'data ethics', 'data standards',

        'governance framework', 'data ownership', 'data accountability',
        'data control', 'enterprise data', 'data risk', 'governance platform',
        'compliance management', 'risk and compliance',
    ],

    # Privacy and data-protection law.
    'data-privacy': [
        'data privacy', 'gdpr', 'ccpa', 'user consent', 'personal data',
        'pii', 'anonymization', 'data protection', 'privacy law',
        'hipaa', 'cookie tracking', 'data sovereignty',

        'privacy regulation', 'privacy compliance', 'privacy policy', 'privacy shield',
        'data rights', 'right to be forgotten', 'data subject', 'consent management',
        'biometric data', 'sensitive data', 'data localization', 'privacy tech',
    ],

    # Data management platforms and practices.
    'data-management': [
        'data management', 'master data', 'mdm', 'data catalog',
        'data quality', 'reference data', 'data lifecycle', 'data architecture',
        'database management', 'data integration',

        'data platform', 'data fabric', 'data mesh', 'data store', 'data ops',
        'dataops', 'data observability', 'data reliability', 'data strategy',
    ],

    # BI tools and dashboards.
    'business-intelligence': [
        'business intelligence', 'bi tool', 'analytics dashboard', 'tableau',
        'power bi', 'looker', 'data reporting', 'kpi', 'quicksight', 'qlik',
        'data visualization', 'metrics dashboard', 'business intelligence analytics',
        'bi platform', 'bi software', 'bi solution', 'bi market', 'bi vendor',
        'intelligence analytics', 'embedded analytics', 'self-service analytics',
    ],

    # Analytics (overlaps with BI by design).
    'business-analytics': [
        'data analytics', 'data analysis', 'business insights', 'business metrics',
        'data-driven', 'business analytics', 'predictive analytics', 'forecasting',
        'data science', 'business trends', 'business intelligence analytics',
        'analytics platform', 'analytics solution', 'analytics market',

        'analytics', 'prescriptive analytics', 'descriptive analytics',
        'augmented analytics', 'analytics report', 'analytics vendor',
    ],

    # Customer data platforms / CRM.
    'customer-data-platform': [
        'cdp', 'customer data platform', 'crm', 'customer experience',
        'personalization engine', 'audience segmentation',
        'segment.com', 'salesforce data cloud', 'unified profile',

        'first-party data', 'customer journey', 'customer analytics',
        'customer insights', 'customer 360', 'real-time personalization',
        'user profiling', 'identity resolution', 'marketing data',
    ],

    # Data-center infrastructure.
    'data-centers': [
        'data center', 'data centre', 'datacenter', 'server rack', 'colocation',
        'edge computing', 'hyperscale', 'hpc', 'liquid cooling',
        'data center cooling', 'server hosting', 'infrastructure',

        'facility expansion', 'power usage effectiveness', 'pue', 'green data center',
        'data center market', 'carrier hotel', 'colo facility', 'rack unit',
        'data center construction', 'data hall', 'tier iii', 'tier iv',
    ],

    # Broad catch-all categories (intentionally loose keywords).
    'medium-article': [
        'medium', 'article', 'blog', 'writing', 'publishing',
        'content', 'story', 'author', 'blogging', 'programming', 'developer',
    ],
    'magazines': [
        'technology', 'tech', 'innovation', 'digital', 'startup',
        'software', 'hardware', 'gadget', 'science', 'electronics',

        'developer', 'programming', 'open source', 'engineering', 'product launch',
        'research', 'industry report', 'tech news', 'venture capital', 'funding round',
    ],
}
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| def _build_category_regex(keywords: list) -> 're.Pattern': |
| """ |
| Turn a list of keywords into one pre-compiled word-boundary OR pattern. |
| |
| Example: |
| ['gpt', 'llm', 'openai'] |
| β re.compile(r'\\bgpt\\b|\\bllm\\b|\\bopenai\\b', re.IGNORECASE) |
| """ |
| parts = [r'\b' + re.escape(kw) + r'\b' for kw in keywords] |
| return re.compile('|'.join(parts), re.IGNORECASE) |
|
|
|
|
| |
| |
| |
# Pre-compiled pattern per category, built once at import time so the
# per-article relevance check (is_relevant_to_category) does no regex
# compilation on the hot path.
COMPILED_CATEGORY_REGEX: dict = {
    category: _build_category_regex(keywords)
    for category, keywords in CATEGORY_KEYWORDS.items()
}
|
|
|
|
def is_relevant_to_category(article: Union[Dict, 'Article'], category: str) -> bool:
    """
    Check whether an article belongs to the given category.

    Uses pre-compiled word-boundary regex patterns (built once at server start)
    so that:
      • Short acronyms like "ai", "bi", "aws" only match as full words.
        "trail" does NOT match 'ai'; "kubernot" does NOT match 'gcp'.
      • Multi-word phrases like "openai" or "sagemaker" are matched exactly.
      • Unknown categories automatically pass (return True) so we don't
        accidentally drop articles routed to categories we haven't mapped yet.

    Scans: article title + description + URL path (all lowercased).

    Returns:
        True  — article is relevant (at least 1 keyword matches, or the
                source is an official vendor blog, or the category is unmapped).
        False — no keyword matched; article is rejected for this category.
    """
    # Accept Pydantic v2 models, Pydantic v1 models, or plain dicts.
    if hasattr(article, 'model_dump'):
        article_dict = article.model_dump()
    elif hasattr(article, 'dict'):
        article_dict = article.dict()
    else:
        article_dict = article

    # Articles from an "Official <vendor> ... Blog" source are trusted as
    # relevant without keyword matching.
    # BUGFIX: `get('source', '')` returned None when the key existed with a
    # None value, crashing on .lower(); guard with `or ''`.
    source = (article_dict.get('source') or '').lower()
    if source.startswith('official ') and ' blog' in source:
        return True

    pattern = COMPILED_CATEGORY_REGEX.get(category)
    if pattern is None:
        # No keyword map for this category — pass rather than drop.
        return True

    # Build one lowercase haystack from title, description and URL path.
    title = (article_dict.get('title') or '').lower()
    description = (article_dict.get('description') or '').lower()

    raw_url = article_dict.get('url') or ''
    url_str = str(raw_url).lower()
    try:
        parsed_url = urlparse(url_str)
        # Turn slug separators into spaces so URL words participate in
        # word-boundary matching.
        url_words = parsed_url.path.replace('-', ' ').replace('/', ' ')
    except Exception:
        url_words = ''

    search_text = f"{title} {description} {url_words}"

    if pattern.search(search_text):
        return True

    # BUGFIX: the previous `get('title', 'Unknown')[:50]` raised TypeError
    # when title was explicitly None.
    print(
        f"π« Rejected '{(article_dict.get('title') or 'Unknown')[:50]}' "
        f"from {category} (0 keyword matches)"
    )
    return False
|
|
|
|
| |
# Explicit public API for star-imports; internal helpers such as
# _build_category_regex are deliberately excluded.
__all__ = [
    'is_valid_article',
    'sanitize_article',
    'generate_slug',
    'calculate_quality_score',
    'is_relevant_to_category'
]
|
|