Spaces:
Sleeping
Sleeping
| """ | |
| English Article Summarizer | |
| =========================== | |
| Summarizes English news articles using HuggingFace transformer models. | |
| Supports single file, folder, or category-based processing. | |
| This script handles ONLY English articles. Hindi articles are processed | |
| by hindi_summary.py (mT5 ONNX + Groq) instead. | |
| Pipeline: | |
| 1. Load JSON articles | |
| 2. Check Supabase registry for already-processed IDs (deduplication) | |
| 3. Clean text (remove URLs, emojis, duplicate sentences) | |
| 4. Summarize with the configured model (default: t5-small) | |
| 5. Save summarized JSON to summarized-articles/{language}/categories/{category}/ | |
| 6. Upload to Cloudinary | |
| Usage: | |
| python backend/summarization/english_summary.py --file "articles/english/categories/sports/1_feb_2_30_pm.json" | |
| python backend/summarization/english_summary.py --folder "articles/english/categories/sports" | |
| python backend/summarization/english_summary.py --category sports | |
| python backend/summarization/english_summary.py --file "..." --no-dedup | |
| """ | |
| import json | |
| import sys | |
| import os | |
| import time | |
| import argparse | |
| from pathlib import Path | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import List, Dict, Optional | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| # Add project root to path | |
| sys.path.append(str(Path(__file__).parent.parent.parent)) | |
| from backend.common.colors import Colors, Log | |
| from backend.common.paths import get_project_root | |
| from backend.utils.cloudinary_utils import upload_to_cloudinary | |
| from backend.summarization.model import get_summarizer | |
| from backend.summarization.utils import clean_text, should_summarize | |
| load_dotenv() | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Configuration | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
class Config:
    """Runtime tuning knobs for the summarization pipeline."""

    # Worker-thread count for the summarization pool; override via env var.
    MAX_WORKERS = int(os.getenv('SUMMARIZATION_MAX_WORKERS', '4'))
    # Upper bound on generated summary length, in words.
    MAX_SUMMARY_WORDS = 150
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Progress Tracking | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
class ProgressBar:
    """In-place terminal progress bar with success/failure counters and an ETA.

    Not thread-safe: callers (see the ThreadPoolExecutor loop elsewhere in
    this file) invoke update() from a single consuming thread.
    """

    def __init__(self, total: int, width: int = 40):
        self.total = total          # expected number of items
        self.width = width          # bar width in characters
        self.current = 0
        self.success_count = 0
        self.fail_count = 0
        self.start_time = time.time()

    def update(self, success: bool = True):
        """Record one completed item and redraw the bar."""
        self.current += 1
        if success:
            self.success_count += 1
        else:
            self.fail_count += 1
        self._render()

    def _render(self):
        # Guard against a zero-item bar so we never divide by zero.
        denom = max(self.total, 1)
        percent = (self.current / denom) * 100
        filled = int(self.width * self.current / denom)
        # Distinct glyphs for filled vs. unfilled segments. (The previous
        # version had both collapsed into the same mojibake character,
        # which rendered the bar as a uniform, unreadable strip.)
        bar = '█' * filled + '░' * (self.width - filled)
        elapsed = time.time() - self.start_time
        if self.current > 0:
            eta = (elapsed / self.current) * (self.total - self.current)
            eta_str = f"{int(eta)}s" if eta < 60 else f"{int(eta/60)}m {int(eta%60)}s"
        else:
            eta_str = "calculating..."
        # \r rewrites the same terminal line on every render.
        status = f"\r[{bar}] {self.current}/{self.total} ({percent:.1f}%) | ✓ {self.success_count} ✗ {self.fail_count} | ETA: {eta_str}"
        sys.stdout.write(status)
        sys.stdout.flush()

    def finish(self):
        """Print a final elapsed-time line after the bar completes."""
        elapsed = time.time() - self.start_time
        print(f"\n{Colors.DIM}Completed in {elapsed:.2f}s{Colors.RESET}")
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # File Management | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
class FileManager:
    """Resolves output directories/paths that mirror the input article layout.

    FIX: the original methods were declared without ``self`` and without
    ``@staticmethod``; they only worked when accessed via the class object and
    raised TypeError when called on an instance. Marking them ``@staticmethod``
    keeps all existing ``FileManager.xxx(...)`` call sites working while making
    instance access safe too.
    """

    @staticmethod
    def get_summarized_dir():
        """Root directory that holds all summarized output."""
        return get_project_root() / "summarized-articles"

    @staticmethod
    def ensure_category_dir(category_path: str):
        """Create output folder mirroring the input article path structure."""
        summarized_dir = FileManager.get_summarized_dir()
        path_obj = Path(category_path)
        # Language defaults to english; only an explicit "hindi" path part flips it.
        language = "english"
        if "hindi" in path_obj.parts:
            language = "hindi"
        # The category name is the path component right after "categories"
        # (or "search_queries"); fall back to "unknown" otherwise.
        if "categories" in path_obj.parts:
            idx = path_obj.parts.index("categories")
            category_name = path_obj.parts[idx + 1] if idx + 1 < len(path_obj.parts) else "unknown"
        elif "search_queries" in path_obj.parts:
            idx = path_obj.parts.index("search_queries")
            category_name = path_obj.parts[idx + 1] if idx + 1 < len(path_obj.parts) else "unknown"
        else:
            category_name = "unknown"
        parent_folder = "search_queries" if "search_queries" in path_obj.parts else "categories"
        category_dir = summarized_dir / language / parent_folder / category_name
        category_dir.mkdir(parents=True, exist_ok=True)
        return category_dir

    @staticmethod
    def get_output_path(input_path: Path):
        """Output file path mirroring input_path under summarized-articles/."""
        category_dir = FileManager.ensure_category_dir(str(input_path))
        return category_dir / input_path.name
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Article Processor | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
class ArticleProcessor:
    """Cleans article text and generates summaries via the configured model."""

    def __init__(self):
        Log.info("Initializing summarization model...")
        self.summarizer = get_summarizer()

    def process_article(self, article: Dict) -> Optional[Dict]:
        """Process a single article: clean text and generate summary.

        Returns a copy of the article with 'summary'/'summarized' fields set,
        or None when the article has no content or processing fails.
        """
        try:
            content = article.get('content', '')
            if not content:
                return None
            cleaned_content = clean_text(content)
            if not should_summarize(cleaned_content):
                # Too short/unsuitable to summarize: pass cleaned text through.
                summarized_article = article.copy()
                summarized_article['summary'] = cleaned_content
                summarized_article['summarized'] = False
                return summarized_article
            language = self._resolve_language(article)
            summary = self.summarizer.summarize(
                cleaned_content,
                max_words=Config.MAX_SUMMARY_WORDS,
                language=language
            )
            summarized_article = article.copy()
            summarized_article['summary'] = summary
            summarized_article['summarized'] = True
            summarized_article['summary_generated_at'] = datetime.now().isoformat()
            return summarized_article
        except Exception as e:
            Log.error(f"Failed to process article: {str(e)[:50]}")
            return None

    @staticmethod
    def _resolve_language(article: Dict) -> str:
        """Normalize the article's 'language' field to 'english' or 'hindi'.

        BUG FIX: this was declared with a single ``article`` parameter but no
        ``self``/``@staticmethod``, so the ``self._resolve_language(article)``
        call in process_article raised TypeError (2 args passed to a
        1-arg function) — which the broad except above silently turned into
        a per-article failure. ``@staticmethod`` restores correct binding.
        """
        language = str(article.get("language", "english")).strip().lower()
        if language in {"english", "hindi"}:
            return language
        return "english"
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # File Operations | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def load_json_file(filepath: Path) -> List[Dict]:
    """Read a JSON file and normalize its payload to a list of dicts.

    A top-level list is returned as-is, a single object is wrapped in a
    one-element list, and any other payload — or any read/parse failure
    (logged) — yields an empty list.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
    except Exception as e:
        Log.error(f"Failed to load {filepath.name}: {str(e)[:50]}")
        return []
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        return [payload]
    return []
def filter_processed_articles(articles: List[Dict]) -> List[Dict]:
    """Drop articles whose IDs already exist in the Supabase registry.

    Best-effort: if db_utils is unavailable or the registry lookup fails,
    the input list is returned unchanged so processing can continue.
    """
    if not articles:
        return []
    try:
        from backend.utils.db_utils import DatabaseManager

        candidate_ids = [item.get('id') for item in articles if item.get('id')]
        if not candidate_ids:
            return articles
        existing_ids = DatabaseManager().check_registry(candidate_ids)
        if not existing_ids:
            return articles
        Log.info(f"Found {len(existing_ids)} articles already in registry. Skipping them.")
        return [item for item in articles if item.get('id') not in existing_ids]
    except ImportError:
        Log.warning("db_utils not found or Supabase not configured. Skipping registry check.")
        return articles
    except Exception as e:
        Log.error(f"Error checking registry: {str(e)}")
        return articles
def save_json_file(articles: List[Dict], filepath: Path):
    """Serialize articles to filepath as pretty-printed UTF-8 JSON.

    Returns True on success; logs and returns False on any failure.
    """
    try:
        with open(filepath, 'w', encoding='utf-8') as handle:
            json.dump(articles, handle, indent=2, ensure_ascii=False)
    except Exception as e:
        Log.error(f"Failed to save {filepath.name}: {str(e)[:50]}")
        return False
    return True
def find_json_files(folder_path: Path) -> List[Path]:
    """Recursively collect every *.json file under folder_path.

    Returns [] when the path is missing or not a directory.
    """
    if not folder_path.is_dir():
        # is_dir() is False for nonexistent paths too, covering both guards.
        return []
    return [candidate for candidate in folder_path.rglob("*.json") if candidate.is_file()]
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Processing Functions | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def process_single_file(filepath: Path, processor: ArticleProcessor, no_dedup: bool = False):
    """Summarize one JSON file of articles end to end.

    Pipeline: load -> (optional) Supabase registry dedup -> threaded
    summarization -> save to the mirrored summarized-articles path ->
    best-effort Cloudinary upload.

    Returns True when at least one article was summarized and saved;
    False when the file was empty, fully deduplicated, or nothing saved.
    """
    Log.info(f"Processing: {Colors.CYAN}{filepath.name}{Colors.RESET}")
    articles = load_json_file(filepath)
    if not articles:
        Log.error(f"No articles found in {filepath.name}")
        return False
    total_scraped = len(articles)
    # Deduplicate against the registry unless explicitly disabled.
    if no_dedup:
        filtered_articles = articles
        already_processed_count = 0
    else:
        filtered_articles = filter_processed_articles(articles)
        already_processed_count = total_scraped - len(filtered_articles)
    articles = filtered_articles
    total_articles = len(articles)
    # Human-readable funnel report: scraped -> deduped -> to-process.
    print(f"\n{Colors.BOLD}{Colors.CYAN}--- Processing Funnel ---{Colors.RESET}")
    print(f"Total scraped : {total_scraped}")
    if no_dedup:
        print(f"Deduplication : {Colors.YELLOW}DISABLED (--no-dedup){Colors.RESET}")
    else:
        print(f"Already processed : {already_processed_count}")
    print(f"New articles to do: {Colors.GREEN}{total_articles}{Colors.RESET}")
    print(f"{Colors.BOLD}{Colors.CYAN}-------------------------{Colors.RESET}\n")
    if not articles:
        Log.info("All articles in this file have been processed already.")
        return False
    summarized_articles = []
    progress = ProgressBar(len(articles))
    # Summarize in parallel. Results are gathered in completion order, so
    # the saved list's ordering may differ from the input file's ordering.
    with ThreadPoolExecutor(max_workers=Config.MAX_WORKERS) as executor:
        futures = {executor.submit(processor.process_article, article): article for article in articles}
        for future in as_completed(futures):
            result = future.result()
            if result:
                summarized_articles.append(result)
                progress.update(success=True)
            else:
                progress.update(success=False)
    progress.finish()
    if summarized_articles:
        output_path = FileManager.get_output_path(filepath)
        if save_json_file(summarized_articles, output_path):
            # Items can come back with summarized=False (pass-through text),
            # so count the ones the model actually summarized.
            actually_summarized = sum(1 for a in summarized_articles if a.get('summarized', False))
            file_size = os.path.getsize(output_path) / 1024
            Log.success(f"Saved {len(summarized_articles)} articles ({actually_summarized} summarized)")
            Log.info(f"Output: {Colors.CYAN}{output_path}{Colors.RESET}")
            Log.info(f"Size: {file_size:.2f} KB\n")
            # Cloudinary Upload — best-effort: failure is logged but does not
            # fail the run, since the file is already saved locally.
            try:
                project_root = get_project_root()
                relative_folder = output_path.parent.relative_to(project_root)
                # Normalize Windows separators for the Cloudinary folder name.
                cloud_folder = str(relative_folder).replace("\\", "/")
                Log.info(f"Uploading summary to Cloudinary: {cloud_folder}")
                upload_to_cloudinary(str(output_path), cloud_folder, resource_type="raw")
            except Exception as e:
                Log.error(f"Cloudinary upload failed: {e}")
            return True
        else:
            return False
    else:
        Log.warning("No articles processed successfully\n")
        return False
def process_folder(folder_path: Path, processor: ArticleProcessor, no_dedup: bool = False):
    """Summarize every JSON file found (recursively) under folder_path,
    then print a per-run success/failure summary."""
    Log.info(f"Scanning folder: {Colors.CYAN}{folder_path}{Colors.RESET}")
    json_files = find_json_files(folder_path)
    if not json_files:
        Log.error("No JSON files found in folder")
        return
    Log.success(f"Found {len(json_files)} JSON files\n")
    outcomes = []
    for position, json_file in enumerate(json_files, 1):
        print(f"{Colors.BOLD}File {position}/{len(json_files)}{Colors.RESET}")
        outcomes.append(process_single_file(json_file, processor, no_dedup=no_dedup))
    total_success = sum(1 for ok in outcomes if ok)
    total_failed = len(outcomes) - total_success
    print(f"\n{Colors.BOLD}{Colors.CYAN}Summary{Colors.RESET}")
    print(f"Total Files: {len(json_files)}")
    print(f"Successful: {Colors.GREEN}{total_success}{Colors.RESET}")
    print(f"Failed: {Colors.RED}{total_failed}{Colors.RESET}\n")
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # CLI | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def parse_args():
    """Build and evaluate the CLI.

    Exactly one of --file/--folder/--category is required; --no-dedup is an
    optional flag that disables the Supabase registry check.
    """
    parser = argparse.ArgumentParser(
        description="English News Article Summarizer",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    source = parser.add_mutually_exclusive_group(required=True)
    for flags, help_text in (
        (("--file", "-f"), "Path to a single JSON file to summarize"),
        (("--folder", "-d"), "Path to a folder of JSON files to summarize"),
        (("--category", "-c"), "Category name to summarize (e.g. sports, technology)"),
    ):
        source.add_argument(*flags, help=help_text)
    parser.add_argument("--no-dedup", action="store_true", help="Skip checking Supabase for already-processed articles")
    return parser.parse_args()
def main():
    """CLI entry point: resolve the target path, then dispatch to file/folder mode."""
    try:
        args = parse_args()
    except SystemExit:
        # Bare invocation (no args at all): show a friendly usage banner
        # instead of argparse's terse error; any other argparse exit re-raises.
        if len(sys.argv) == 1:
            print(f"""
{Colors.BOLD}{Colors.CYAN}English News Article Summarizer{Colors.RESET}
{Colors.BOLD}Usage:{Colors.RESET}
python english_summary.py --file "path/to/file.json"
python english_summary.py --folder "path/to/folder"
python english_summary.py --category "sports"
{Colors.BOLD}Output:{Colors.RESET}
Files are saved to: summarized-articles/english/categories/{{category}}/
""")
            sys.exit(0)
        raise
    mode = None
    path = None
    if args.file:
        mode = "file"
        path = args.file
    elif args.folder:
        mode = "folder"
        path = args.folder
    elif args.category:
        mode = "folder"
        # NOTE(review): this builds articles/categories/<cat>, while the module
        # docstring's examples use articles/english/categories/... — confirm
        # which layout is actually on disk.
        path = f"articles/categories/{args.category}"
    # Resolve the path against the CWD first, then fall back to project root.
    path_obj = Path(path)
    if path_obj.exists():
        path_obj = path_obj.resolve()
    else:
        project_root = get_project_root()
        project_path = project_root / path
        if project_path.exists():
            path_obj = project_path.resolve()
        else:
            if args.category:
                Log.error(f"Category folder not found: {project_path}")
                # Best-effort hint: list the valid category folder names.
                try:
                    cats_dir = project_root / "articles" / "categories"
                    if cats_dir.exists():
                        val_cats = [d.name for d in cats_dir.iterdir() if d.is_dir()]
                        if val_cats:
                            print(f"Valid categories: {', '.join(val_cats)}")
                except:
                    pass
                sys.exit(1)
            # Non-category paths fall through to the existence check below,
            # which reports the error and exits.
            path_obj = project_path
    if not path_obj.exists():
        Log.error(f"Path does not exist: {path}")
        sys.exit(1)
    # Model initialization happens only after path validation succeeds.
    processor = ArticleProcessor()
    print(f"\n{Colors.BOLD}{Colors.CYAN}Article Summarization{Colors.RESET}\n")
    if mode == "file":
        if not path_obj.is_file():
            Log.error(f"Not a file: {path}")
            sys.exit(1)
        if process_single_file(path_obj, processor, no_dedup=args.no_dedup):
            print(f"{Colors.GREEN}Summarization completed!{Colors.RESET}\n")
        else:
            print(f"{Colors.RED}Summarization failed{Colors.RESET}\n")
            sys.exit(1)
    elif mode == "folder":
        if not path_obj.is_dir():
            Log.error(f"Not a folder: {path}")
            sys.exit(1)
        process_folder(path_obj, processor, no_dedup=args.no_dedup)
        print(f"{Colors.GREEN}Batch summarization completed!{Colors.RESET}\n")
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except KeyboardInterrupt: | |
| print(f"\n\n{Colors.YELLOW}Summarization cancelled by user{Colors.RESET}\n") | |
| sys.exit(0) | |
| except Exception as e: | |
| Log.error(f"Critical error: {str(e)}") | |
| raise | |