Spaces:
Build error
Build error
| #main.py | |
| import re | |
| import time | |
| import os | |
| import logging | |
| from typing import List, Dict, Optional, Set, Tuple | |
| import google_auth_oauthlib.flow | |
| import googleapiclient.discovery | |
| import googleapiclient.errors | |
| from google_auth_oauthlib.flow import Flow | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient.discovery import build | |
| from fastapi import FastAPI, Request, Form, File, UploadFile, HTTPException, Depends | |
| from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.security import OAuth2PasswordBearer | |
| from google.oauth2.credentials import Credentials | |
| from pydantic import BaseModel | |
| import unicodedata | |
| import unidecode | |
| import io | |
| import pandas as pd | |
| import json | |
| from dotenv import load_dotenv | |
| # For monitoring with Prometheus | |
| load_dotenv() | |
| # Configure logging at the top of the file | |
# Process-wide logging: INFO and above, timestamped, tagged with the
# emitting logger's name and the record level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
# Module-level logger used by the filter and helper code in this file.
logger = logging.getLogger(__name__)
# Non-Latin glyphs that are visually confusable with a Latin letter, mapped
# to the Latin letter they imitate. Applied before Unicode normalization so
# that lookalike spellings of platform names and terms still match.
VISUAL_MAP = {
    # Cyrillic capitals
    'А': 'A', 'В': 'B', 'С': 'C', 'Е': 'E', 'Н': 'H', 'К': 'K',
    'М': 'M', 'О': 'O', 'Р': 'P', 'Т': 'T', 'Х': 'X',
    # Cyrillic lowercase
    'а': 'a', 'в': 'b', 'с': 'c', 'е': 'e', 'о': 'o', 'р': 'p',
    'х': 'x', 'у': 'y',
    # Cyrillic "ya", used as a mirrored R
    'Я': 'R', 'я': 'r',
    # Greek
    'ρ': 'p',
    'Π': 'P',
    # etc...
}
| # At the top of your main.py, after your imports: | |
| # In a real DB model, you'd do this in a table. | |
| # But for demonstration, let's store it in memory: | |
# In-memory manual moderation verdicts, keyed by (video_id, comment_id).
# Contents are lost whenever the process restarts.
manual_overrides = dict()
| # This might be a class-level dict keyed by comment_id or (video_id, comment_id) | |
| from google.oauth2 import service_account | |
def get_google_credentials():
    """Return Google API credentials suited to the current environment.

    When the ``HF_SPACE`` environment variable equals ``"true"``, a service
    account credential is built from the JSON text stored in
    ``GOOGLE_SERVICE_ACCOUNT_JSON``; the parsed JSON is also attached to the
    returned object as ``_sa_info`` so callers can retrieve it later.
    Otherwise an installed-app OAuth flow is run against
    ``./app/client_secret.json`` using a local server on an ephemeral port.

    Raises:
        RuntimeError: in HF mode when ``GOOGLE_SERVICE_ACCOUNT_JSON`` is
            unset or empty.
    """
    running_on_hf = os.getenv("HF_SPACE") == "true"

    if not running_on_hf:
        # Local development: interactive OAuth flow.
        oauth_scopes = [
            "https://www.googleapis.com/auth/youtube.readonly",
            "https://www.googleapis.com/auth/youtube.force-ssl"
        ]
        installed_flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
            "./app/client_secret.json",
            scopes=oauth_scopes,
            redirect_uri=os.getenv('YOUTUBE_REDIRECT_URI')
        )
        return installed_flow.run_local_server(port=0)

    # Hugging Face Spaces: the service account JSON comes from a secret.
    raw_secret = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON")
    if not raw_secret:
        raise RuntimeError("Missing GOOGLE_SERVICE_ACCOUNT_JSON in Hugging Face secret.")
    sa_info = json.loads(raw_secret)
    sa_credentials = service_account.Credentials.from_service_account_info(sa_info)
    # Keep the raw info on the object so it can be stored with the user later.
    sa_credentials._sa_info = sa_info
    return sa_credentials
def keep_comment(comment_id: str, video_id: str):
    """Record a manual "safe" verdict for one comment of one video.

    The verdict is stored in the module-level ``manual_overrides`` dict
    under the key ``(video_id, comment_id)``.
    """
    override_key = (video_id, comment_id)
    manual_overrides[override_key] = "safe"
| # --- GamblingFilter class (with rule updates) --- | |
class GamblingFilter:
    """
    Rule-based filter for detecting online gambling-related comments.

    A comment is scored from four signals: known platform names, gambling
    vocabulary, regex "context" phrases, and tell-tale digit runs. Several
    normalized views of the text are checked (NFKD/ASCII, alphanumeric-only,
    and lookalike-mapped + unidecode) so decorated spellings still match.
    """

    # Up to this many non-alphanumeric characters may sit between two
    # consecutive letters of a platform name (e.g. "p.u.l.a.u").
    _SEPARATOR = '[^a-zA-Z0-9]{0,3}'

    def __init__(self):
        logger.info("Initializing GamblingFilter")
        self._platform_names: Set[str] = {
            'agustoto', 'aero', 'aero88', 'dora', 'dora77', 'dewadora', 'pulau777', 'pulau', '777',
            'jptogel', 'mandalika', 'cnd88', 'axl', 'berkah99', 'weton88', 'garuda', 'hoki'
        }
        self._gambling_terms: Set[str] = {
            'jackpot', 'jp', 'wd', 'depo', 'cuan', 'gacor', 'gacir', 'jekpot', 'sultan',
            'rezeki nomplok', 'rezeki', 'menang', 'nomplok', 'deposit', 'withdraw', 'maxwin',
            'auto sultan', 'jepe', 'jepee', 'bikin nagih', 'berkah'
        }
        self._ambiguous_terms: Set[str] = {
            'auto', 'main', 'bermain', 'hasil', 'dapat', 'dapet', 'berkat'
        }
        self._safe_indicators: Set[str] = {
            'tidak mengandung', 'bukan perjudian', 'tanpa perjudian',
            'dokumentasi', 'profesional', 'pembelajaran'
        }
        self._gambling_contexts: List[str] = [
            r'(main|bermain|coba).{1,30}(dapat|dapet|pro|jadi|langsung|menang|jp|cuan)',
            r'(modal|depo).{1,30}(jadi|langsung|wd|cuan)',
            r'(jp|jackpot|jekpot).{1,30}(gede|besar|pecah)',
            r'(berkat|dari).{1,30}(rezeki|menang|cuan|sultan)',
            r'(gacor|gacir).{1,30}(terus|parah|tiap|hari)',
            r'(rezeki|cuan).{1,30}(nomplok|datang|mengalir|lancar)',
            r'(hari ini).{1,30}(menang|cuan|rezeki|berkat)',
            r'(malah|eh).{1,30}(jadi|dapat|dapet|rezeki)',
            r'(auto).{1,30}(sultan|cuan|rezeki|kaya)',
            r'(0\d:[0-5]\d).{1,30}(menang|rezeki|cuan|gacor)',
            r'(iseng|coba).{1,30}(malah|jadi|eh|pro)',
            r'(deposit|depo|wd).{1,30}(jadi|langsung)',
            r'(langsung|auto).{1,30}(jp|cuan|sultan|rezeki)',
            r'bikin\s+nagih',
            r'gak\s+ada\s+duanya',
            r'berkah.{0,20}rezeki',
            r'puji\s+syukur'
        ]
        self._compiled_gambling_contexts = [
            re.compile(pattern, re.IGNORECASE | re.DOTALL)
            for pattern in self._gambling_contexts
        ]
        self._update_platform_pattern()
        self._number_pattern = re.compile(r'(88|777|77|99|7+)')

    @staticmethod
    def _letter_class(char: str) -> str:
        """Return a regex character class matching ``char`` in either case."""
        # re.escape keeps ']', '^', '-' and '\\' literal inside the class.
        return '[' + re.escape(char.upper()) + re.escape(char.lower()) + ']'

    def _update_platform_pattern(self):
        """Recompile the platform name regex based on current _platform_names."""
        # Longest names first so 'aero88' is tried before its prefix 'aero';
        # the secondary key makes the compiled pattern deterministic.
        ordered = sorted(
            (name for name in self._platform_names if name),
            key=lambda name: (-len(name), name),
        )
        alternatives = [
            self._SEPARATOR.join(self._letter_class(c) for c in name)
            for name in ordered
        ]
        if alternatives:
            self._platform_pattern = re.compile('|'.join(alternatives), re.DOTALL)
        else:
            # With no platforms configured, use a pattern that never matches
            # (an empty pattern would match at every position).
            self._platform_pattern = re.compile(r'(?!)')

    def add_rule(self, rule_type: str, rule_value: str):
        """
        Add a new rule based on the rule type.
        Allowed types: 'platform', 'gambling_term', 'safe_indicator', 'gambling_context', 'ambiguous_term'

        Term-like values are stripped and lower-cased because matching is
        done against lower-cased text. 'gambling_context' values are regular
        expressions and are stored verbatim.

        :raises ValueError: for an unsupported rule type or an empty value
        :raises re.error: when a 'gambling_context' value is not a valid regex
        """
        rule_type = rule_type.lower()
        term_sets = {
            'platform': self._platform_names,
            'gambling_term': self._gambling_terms,
            'safe_indicator': self._safe_indicators,
            'ambiguous_term': self._ambiguous_terms,
        }
        if rule_type != 'gambling_context' and rule_type not in term_sets:
            raise ValueError("Unsupported rule type")
        # An empty term is a substring of every comment; an empty safe
        # indicator would therefore mark everything as safe.
        if not rule_value or not rule_value.strip():
            raise ValueError("Rule value must not be empty")

        if rule_type == 'gambling_context':
            # Compile before storing so an invalid regex cannot leave the raw
            # and compiled lists out of sync.
            compiled = re.compile(rule_value, re.IGNORECASE | re.DOTALL)
            self._gambling_contexts.append(rule_value)
            self._compiled_gambling_contexts.append(compiled)
            return

        term_sets[rule_type].add(rule_value.strip().lower())
        if rule_type == 'platform':
            self._update_platform_pattern()

    def _strip_all_formatting(self, text: str) -> str:
        """Keep only alphanumeric and whitespace characters, lower-cased."""
        return ''.join(c.lower() for c in text if c.isalnum() or c.isspace())

    def _aggressive_normalize_text(self, text: str) -> str:
        """NFKD-decompose ``text``, drop every non-ASCII character, lower-case."""
        normalized = unicodedata.normalize('NFKD', text)
        ascii_text = ''.join(c for c in normalized if ord(c) < 128)
        return ascii_text.lower()

    def _robust_normalize(self, text: str) -> str:
        """
        1) Replace visually-similar letters (Cyrillic/Greek) with Latin equivalents.
        2) Then use unidecode to handle bold/italic forms, fullwidth, etc.
        3) Lowercase the result.
        """
        mapped_text = ''.join(VISUAL_MAP.get(ch, ch) for ch in text)
        decomposed = unicodedata.normalize('NFKD', mapped_text)
        return unidecode.unidecode(decomposed).lower()

    def _extract_platform_names(self, text: str) -> List[str]:
        """Return the platform names (and marker numbers) found in ``text``.

        Regex hits are reported as they appear in the raw text; names found
        only after normalization are reported in canonical form.
        """
        # The platform pattern has no capture groups, so take whole matches.
        # Treating findall() results as group tuples would split each match
        # into single characters.
        raw_hits = (m.group(0) for m in self._platform_pattern.finditer(text))
        matches = list(dict.fromkeys(hit for hit in raw_hits if hit))

        normalized = self._robust_normalize(text)
        stripped = self._strip_all_formatting(text)
        for platform in self._platform_names:
            if platform in normalized or platform in stripped:
                if not any(platform in m.lower() for m in matches):
                    matches.append(platform)
        for digits in ('88', '777'):
            if digits in text or digits in normalized:
                if not any(digits in m for m in matches):
                    matches.append(digits)
        return matches

    def normalize_text(self, text: str) -> str:
        """NFKD-decompose ``text``, keep ASCII and whitespace, lower-case."""
        normalized = unicodedata.normalize('NFKD', text)
        normalized = ''.join(c for c in normalized if ord(c) < 128 or c.isspace())
        return normalized.lower()

    def is_gambling_comment(self, text: str, threshold: float = 0.55) -> Tuple[bool, Dict]:
        """Classify one comment.

        :param text: raw comment text
        :param threshold: minimum confidence score to flag the comment
        :return: ``(is_gambling, metrics)``; ``metrics`` holds the matched
            platforms, terms, contexts, safe indicators, the number flag,
            the confidence score and the processing time in milliseconds
        """
        start_time = time.perf_counter()
        logger.info("Analyzing comment for gambling content: %s...", text[:100])
        metrics = {
            'platform_matches': [],
            'gambling_term_matches': [],
            'context_matches': [],
            'safe_indicators': [],
            'has_numbers': False,
            'confidence_score': 0.0,
            'processing_time_ms': 0
        }
        # All three views are already lower-cased by their helpers.
        normalized_text = self.normalize_text(text)
        stripped_text = self._strip_all_formatting(text)
        aggressive_text = self._robust_normalize(text)

        # Any safe indicator short-circuits the analysis.
        metrics['safe_indicators'] = [
            indicator for indicator in self._safe_indicators
            if indicator in normalized_text
        ]
        if metrics['safe_indicators']:
            metrics['confidence_score'] = 0.0
            metrics['processing_time_ms'] = (time.perf_counter() - start_time) * 1000
            return False, metrics

        platform_matches = self._extract_platform_names(text)
        if platform_matches:
            metrics['platform_matches'] = platform_matches

        views = (normalized_text, stripped_text, aggressive_text)
        metrics['gambling_term_matches'] = [
            term for term in self._gambling_terms
            if any(term in view for view in views)
        ]

        if self._number_pattern.search(normalized_text):
            metrics['has_numbers'] = True

        for pattern in self._compiled_gambling_contexts:
            match = pattern.search(normalized_text)
            if match:
                metrics['context_matches'].append(match.group(0))
            match = pattern.search(aggressive_text)
            if match and match.group(0) not in metrics['context_matches']:
                metrics['context_matches'].append(match.group(0))

        platform_score = min(len(metrics['platform_matches']) * 1.0, 1)
        term_score = min(len(metrics['gambling_term_matches']) * 0.2, 0.4)
        context_score = min(len(metrics['context_matches']) * 0.2, 0.4)
        number_score = 0.1 if metrics['has_numbers'] else 0

        if platform_score > 0 and (term_score > 0 or context_score > 0):
            total_score = platform_score + term_score + context_score + number_score
        elif context_score > 0.2 and term_score > 0:
            total_score = context_score + term_score + number_score
        else:
            # A single kind of evidence is discounted.
            total_score = max(platform_score, term_score, context_score) * 0.8
        metrics['confidence_score'] = min(total_score, 1.0)

        # Phrase pairs that, combined with a platform name, force the score
        # up to at least 0.7.
        def mentions(word: str) -> bool:
            return word in normalized_text or word in aggressive_text

        has_platform = len(metrics['platform_matches']) > 0
        special_label = None
        if mentions("berkah") and mentions("rezeki") and has_platform:
            special_label = "Special case: berkah+rezeki+platform"
        elif mentions("puji") and mentions("syukur") and has_platform:
            special_label = "Special case: puji+syukur+platform"
        if special_label is not None:
            metrics['confidence_score'] = max(metrics['confidence_score'], 0.7)
            if special_label not in metrics['context_matches']:
                metrics['context_matches'].append(special_label)

        metrics['processing_time_ms'] = (time.perf_counter() - start_time) * 1000
        is_gambling = metrics['confidence_score'] >= threshold
        return is_gambling, metrics

    def filter_comments(self, comments: List[str], threshold: float = 0.55) -> Dict[str, List]:
        """Classify every comment and split them into gambling and safe lists.

        :return: dict with 'gambling_comments', 'safe_comments' and 'metrics'
            (one metrics dict per comment, including its 'original_text')
        """
        result = {
            'gambling_comments': [],
            'safe_comments': [],
            'metrics': []
        }
        for comment in comments:
            is_gambling, metrics = self.is_gambling_comment(comment, threshold)
            bucket = 'gambling_comments' if is_gambling else 'safe_comments'
            result[bucket].append(comment)
            metrics['original_text'] = comment
            result['metrics'].append(metrics)
        return result
class YouTubeCommentModerator:
    """Fetch, classify and moderate YouTube comments through the Data API v3."""

    def __init__(self,
                 client_secrets_path: str = "./app/client_secret.json",
                 gambling_filter: Optional["GamblingFilter"] = None):
        """
        Initialize the YouTube Comment Moderator with configurable settings.
        :param client_secrets_path: Path to OAuth 2.0 client secrets file
        :param gambling_filter: Optional pre-configured GamblingFilter instance
        """
        # Setup logging (a no-op when the root logger is already configured)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)
        # OAuth configuration
        self.client_secrets_path = client_secrets_path
        self.scopes = [
            "https://www.googleapis.com/auth/youtube.readonly",
            "https://www.googleapis.com/auth/youtube.force-ssl"
        ]
        # Allow plain-HTTP OAuth redirects only for local runs; hosted
        # deployments (HF_SPACE=true) keep oauthlib's HTTPS requirement.
        if os.getenv("HF_SPACE") != "true":
            os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
        # YouTube service (set by authenticate() or assigned by the caller)
        self.youtube_service = None
        # Gambling Filter
        self.gambling_filter = gambling_filter or GamblingFilter()

    def authenticate(self) -> bool:
        """
        Authenticate with YouTube Data API.
        :return: Boolean indicating successful authentication
        """
        try:
            credentials = get_google_credentials()
            self.youtube_service = googleapiclient.discovery.build(
                "youtube", "v3", credentials=credentials
            )
            self.logger.info("YouTube API authentication successful.")
            return True
        except Exception as e:
            self.logger.error(f"Authentication failed: {e}")
            return False

    def moderate_video_comments(self, video_id: str, threshold: float = 0.55) -> Dict:
        """
        Classify every top-level comment of a video.
        :param video_id: YouTube video ID
        :param threshold: Gambling confidence threshold
        :return: Dict with 'total_comments', 'gambling_comments',
                 'safe_comments' and 'moderation_metrics', or
                 ``{"error": message}`` on failure
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return {"error": "Not authenticated"}
        try:
            threads = self.youtube_service.commentThreads()
            request = threads.list(
                part="snippet",
                videoId=video_id,
                maxResults=100,
                textFormat="plainText"
            )
            moderation_results = {
                "total_comments": 0,
                "gambling_comments": [],
                "safe_comments": [],
                "moderation_metrics": []
            }
            # list_next() returns None once the last page has been consumed.
            while request is not None:
                response = request.execute()
                for item in response.get("items", []):
                    top_level = item["snippet"]["topLevelComment"]
                    comment_id = top_level["id"]
                    comment_snippet = top_level["snippet"]
                    comment_text = comment_snippet["textDisplay"]
                    if manual_overrides.get((video_id, comment_id)) == "safe":
                        # The user previously pressed "Keep" - skip the filter
                        is_gambling = False
                        metrics = {"confidence_score": 0.0}
                    else:
                        is_gambling, metrics = self.gambling_filter.is_gambling_comment(
                            comment_text, threshold
                        )
                    comment_info = {
                        "id": comment_id,
                        "text": comment_text,
                        "author": comment_snippet["authorDisplayName"],
                        "is_gambling": is_gambling,
                        "metrics": metrics
                    }
                    moderation_results["total_comments"] += 1
                    bucket = "gambling_comments" if is_gambling else "safe_comments"
                    moderation_results[bucket].append(comment_info)
                    metrics["original_text"] = comment_text
                    moderation_results["moderation_metrics"].append(metrics)
                request = threads.list_next(request, response)
            return moderation_results
        except Exception as e:
            self.logger.error(f"Error moderating comments: {e}")
            return {"error": str(e)}

    def delete_comment(self, comment_id: str) -> bool:
        """
        Delete a specific comment.

        The comment is not removed through comments().delete(); its
        moderation status is set to "rejected" instead.
        :param comment_id: YouTube comment ID
        :return: Boolean indicating successful deletion
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return False
        try:
            self.youtube_service.comments().setModerationStatus(
                id=comment_id,
                moderationStatus="rejected"
            ).execute()
            self.logger.info(f"Comment {comment_id} deleted successfully.")
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete comment {comment_id}: {e}")
            return False

    def get_channel_videos(self, max_results: int = 50) -> List[Dict]:
        """
        Retrieve videos from authenticated user's channel.
        :param max_results: Maximum number of videos to retrieve
        :return: List of video details ('id', 'title', 'thumbnail'); empty
                 when unauthenticated, when the channel cannot be determined,
                 or on API error
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return []
        channel_id = self._get_channel_id()
        if not channel_id:
            # Without a channel ID the search would not be restricted to the
            # user's channel (the client omits parameters that are None).
            self.logger.error("Cannot list videos: channel ID unavailable.")
            return []
        try:
            request = self.youtube_service.search().list(
                part="snippet",
                channelId=channel_id,
                maxResults=max_results,
                type="video"
            )
            response = request.execute()
            return [
                {
                    "id": item["id"]["videoId"],
                    "title": item["snippet"]["title"],
                    "thumbnail": item["snippet"]["thumbnails"]["default"]["url"]
                }
                for item in response.get("items", [])
            ]
        except Exception as e:
            self.logger.error(f"Error retrieving videos: {e}")
            return []

    def _get_channel_id(self) -> Optional[str]:
        """
        Retrieve the authenticated user's channel ID.
        :return: Channel ID or None
        """
        try:
            request = self.youtube_service.channels().list(part="id", mine=True)
            response = request.execute()
            return response["items"][0]["id"]
        except Exception as e:
            self.logger.error(f"Error retrieving channel ID: {e}")
            return None
class User(BaseModel):
    """Application user record held by UserDatabase."""

    # Name the user is stored and looked up under.
    username: str
    # Optional contact address; defaults to None.
    email: Optional[str] = None
    # Credential fields as a plain dict; defaults to None.
    youtube_credentials: Optional[Dict] = None
class UserDatabase:
    """
    In-memory user database. In a production app,
    replace with a proper database like SQLAlchemy
    """
    # username -> User; shared by the whole process and lost on restart.
    users = {}

    @classmethod
    def create_user(cls, username: str, credentials: Dict):
        """Create (or overwrite) the user stored under ``username``.

        :param username: key the user is stored under
        :param credentials: credential dict kept on the user record
        :return: the newly created User
        """
        user = User(username=username, youtube_credentials=credentials)
        cls.users[username] = user
        return user

    @classmethod
    def get_user(cls, username: str):
        """Return the user stored under ``username``, or None if absent."""
        return cls.users.get(username)
class YouTubeAuthenticator:
    """Namespace for obtaining Google credentials for request handlers."""

    @staticmethod
    def authenticate_with_client_secrets(client_secrets_file=None):
        """Return Google credentials from get_google_credentials().

        :param client_secrets_file: accepted for compatibility; not used
        :raises HTTPException: status 400 when credentials cannot be obtained
        """
        try:
            return get_google_credentials()
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}") from e
| # --- FastAPI application setup --- | |
# The ASGI application object.
app = FastAPI()
# Serve files from ./static under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from ./templates.
templates = Jinja2Templates(directory="templates")
# One shared GamblingFilter for the whole process, so rules added at
# runtime are seen by every handler that uses this instance.
filter_instance = GamblingFilter()
| # ----Google ---- | |
class GoogleOAuthHandler:
    """Web-server OAuth 2.0 helper for signing a user in with Google."""

    def __init__(self):
        # Scopes requested from the user during consent.
        self.scopes = [
            'https://www.googleapis.com/auth/youtube.readonly',
            'https://www.googleapis.com/auth/userinfo.profile'
        ]
        # Client secrets file shipped with the app.
        self.client_secrets_file = "./app/client_secret.json"
        # Redirect target; overridable through the environment.
        self.redirect_uri = os.getenv('YOUTUBE_REDIRECT_URI', 'http://localhost:8000/oauth/callback')

    def create_oauth_flow(self):
        """
        Build a Flow from the client secrets file, scopes and redirect URI.
        """
        return Flow.from_client_secrets_file(
            self.client_secrets_file,
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

    def initiate_oauth_flow(self):
        """
        Return the Google authorization URL that starts the OAuth flow.

        Offline access is requested (to obtain a refresh token) and the
        consent screen is always shown. The generated state value is not
        returned.
        """
        authorization_url, _state = self.create_oauth_flow().authorization_url(
            access_type='offline',
            prompt='consent',
            include_granted_scopes='true'
        )
        return authorization_url

    def handle_oauth_callback(self, authorization_code):
        """
        Exchange an authorization code for tokens and describe the user.

        :return: dict with 'username' (channel title, else the profile's
                 email, else 'unknown_user'), 'credentials' (token fields as
                 a dict) and 'user_info' (raw userinfo response)
        :raises HTTPException: status 400 if any step fails
        """
        try:
            flow = self.create_oauth_flow()
            flow.fetch_token(code=authorization_code)
            creds = flow.credentials

            # Basic profile of the signed-in Google account.
            user_info = build('oauth2', 'v2', credentials=creds).userinfo().get().execute()

            # The user's own channel, used for a display name.
            youtube = build('youtube', 'v3', credentials=creds)
            channel_resp = youtube.channels().list(part="snippet", mine=True).execute()

            has_channel = "items" in channel_resp and len(channel_resp["items"]) > 0
            if has_channel:
                channel_username = channel_resp['items'][0]['snippet']['title']
            else:
                channel_username = user_info.get('email', 'unknown_user')

            return {
                'username': channel_username,
                'credentials': {
                    'token': creds.token,
                    'refresh_token': creds.refresh_token,
                    'token_uri': creds.token_uri,
                    'client_id': creds.client_id,
                    'client_secret': creds.client_secret,
                    'scopes': creds.scopes
                },
                'user_info': user_info
            }
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"OAuth callback failed: {str(e)}")
# Process-wide moderator that shares the global filter; its youtube_service
# stays None until authenticate() succeeds or a service is assigned.
moderator = YouTubeCommentModerator(gambling_filter=filter_instance)
async def moderate_video(request: Request, video_id: str = Form(...), threshold: float = Form(0.55)):
    """Moderate a video with the shared moderator and render index.html.

    :param request: incoming request (passed to the template)
    :param video_id: ID of the video whose comments are classified
    :param threshold: gambling confidence threshold
    :return: rendered "index.html" with the result and the current rules
    """
    if moderator.youtube_service:
        result = moderator.moderate_video_comments(video_id, threshold)
    else:
        result = {"error": "YouTube service not authenticated. Please authenticate first."}

    # Current rule sets of the shared filter, each sorted for display.
    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    context = {
        "request": request,
        "result": result,
        "video_id": video_id,
        "rules": {name: sorted(values) for name, values in rule_sources.items()},
    }
    return templates.TemplateResponse("index.html", context)
async def api_delete_comment(
    request: Request,
    comment_id: str,
    video_id: str
):
    """Reject a comment on YouTube using the logged-in user's credentials.

    :param request: incoming request; the user is resolved from its cookie
    :param comment_id: ID of the comment to reject
    :param video_id: accepted but not used by this handler
    :return: ``{"success": bool}``
    """
    account = get_current_user_from_cookie(request)
    creds = Credentials.from_authorized_user_info(account.youtube_credentials)
    user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    # Attach an API client bound to this user's credentials.
    user_moderator.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=creds
    )
    return {"success": user_moderator.delete_comment(comment_id)}
| # OAuth2 Password Bearer for session management | |
# Bearer-token dependency used by get_current_user.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Location of the OAuth client secrets file (replace with your actual path).
CLIENT_SECRETS_PATH = "./app/client_secret.json"
| # 1) Root route => Decide if user is logged in; if not, go to /login | |
async def root_redirect(request: Request):
    """Send visitors with a "token" cookie to /videos, others to /login."""
    has_session = bool(request.cookies.get("token"))
    destination = "/videos" if has_session else "/login"
    return RedirectResponse(url=destination, status_code=303)
| # 2) Show the login form (GET /login) | |
async def login_form(request: Request):
    """Render the login page."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
| # 3) Handle login submission (POST /login) => Google OAuth => /videos | |
async def login(
    request: Request,
    username: str = Form(None)  # Make username optional
):
    """Obtain Google credentials, register the user and start a session.

    On success the user is stored in UserDatabase under the channel title
    ("hf_space_user" on Hugging Face Spaces, "unknown_user" when no channel
    is returned), a 30-minute "token" cookie holding that name is set, and
    the client is redirected to /videos. On any failure the login page is
    re-rendered with an error message.
    """
    try:
        # Service-account credentials on HF, interactive OAuth locally.
        credentials = get_google_credentials()
        youtube_service = googleapiclient.discovery.build(
            "youtube", "v3", credentials=credentials
        )

        on_hf_space = os.getenv("HF_SPACE") == "true"
        if on_hf_space:
            channel_username = "hf_space_user"
        else:
            resp = youtube_service.channels().list(part="snippet", mine=True).execute()
            if "items" in resp and len(resp["items"]) > 0:
                channel_username = resp['items'][0]['snippet']['title']
            else:
                channel_username = "unknown_user"

        # Serialize the credentials for storage: prefer to_json(), then the
        # service-account info attached by get_google_credentials().
        if hasattr(credentials, "to_json"):
            credentials_dict = json.loads(credentials.to_json())
        else:
            credentials_dict = getattr(credentials, "_sa_info", {})

        # Create or update the user in the in-memory "database".
        UserDatabase.create_user(channel_username, credentials_dict)

        # Cookie flags depend on the environment.
        secure_cookie, samesite_value = (True, "none") if on_hf_space else (False, "lax")

        response = RedirectResponse(url="/videos", status_code=303)
        response.set_cookie(
            key="token",
            value=channel_username,
            max_age=1800,
            httponly=True,
            secure=secure_cookie,
            samesite=samesite_value
        )
        return response
    except Exception as e:
        failure_context = {
            "request": request,
            "error": f"Authentication failed: {str(e)}"
        }
        return templates.TemplateResponse("login.html", failure_context)
async def api_keep_comment(
    request: Request,
    comment_id: str,
    video_id: str
):
    """Approve a comment on YouTube and remember it as manually kept.

    :param request: incoming request; the user is resolved from its cookie
    :param comment_id: ID of the comment to publish
    :param video_id: ID of the video the comment belongs to
    :return: ``{"success": True, "message": ...}`` on success, otherwise
             ``{"success": False, "error": ...}``
    """
    try:
        logging.debug(f"Received keep request for comment_id: {comment_id}, video_id: {video_id}")
        account = get_current_user_from_cookie(request)
        logging.debug(f"Current user: {account.username}")
        creds = Credentials.from_authorized_user_info(account.youtube_credentials)

        # Moderator bound to this user's credentials.
        user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
        user_moderator.youtube_service = googleapiclient.discovery.build(
            "youtube", "v3", credentials=creds
        )

        logging.debug("Setting moderation status to 'published' on YouTube...")
        publish_request = user_moderator.youtube_service.comments().setModerationStatus(
            id=comment_id,
            moderationStatus="published"  # marks the comment as approved
        )
        result = publish_request.execute()
        logging.debug(f"YouTube API response: {result}")

        # Remember the decision so the comment is not flagged again.
        keep_comment(comment_id, video_id)
        logging.debug("Manual override saved for comment.")
        return {"success": True, "message": "Comment kept successfully"}
    except Exception as e:
        logging.error(f"Error keeping comment: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
async def refresh_video_comments(
    request: Request,
    video_id: str,
    threshold: float = 0.55
):
    """
    Refresh comments for a specific video, reapplying moderation.

    A moderation failure (an ``{"error": ...}`` result) is shown on the
    error page instead of being rendered as a video with no comments.
    :param request: Request object
    :param video_id: ID of the video to refresh comments for
    :param threshold: Gambling confidence threshold
    :return: Rendered template with updated comments, or the error page
    """
    # Get current user's credentials
    current_user = get_current_user_from_cookie(request)
    if not current_user:
        return RedirectResponse(url="/login", status_code=303)
    try:
        user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
        # One API client serves both the moderation pass and the video lookup.
        youtube_service = googleapiclient.discovery.build(
            "youtube", "v3",
            credentials=user_creds
        )
        # Local name chosen so the module-level `moderator` is not shadowed.
        user_moderator = YouTubeCommentModerator(gambling_filter=filter_instance)
        user_moderator.youtube_service = youtube_service

        result = user_moderator.moderate_video_comments(video_id, threshold)
        if "error" in result:
            # Route API failures to the error page below.
            raise RuntimeError(result["error"])

        # Fetch video details to pass to template
        video_response = youtube_service.videos().list(
            part="snippet",
            id=video_id
        ).execute()
        items = video_response.get('items') or []
        video_info = items[0]['snippet'] if items else {}
        return templates.TemplateResponse("video_comments.html", {
            "request": request,
            "video": {
                "id": video_id,
                "title": video_info.get('title', 'Unknown Video')
            },
            "safe_comments": result.get('safe_comments', []),
            "flagged_comments": result.get('gambling_comments', []),
            "total_comments": result.get('total_comments', 0)
        })
    except Exception as e:
        logging.error(f"Error refreshing comments: {e}")
        return templates.TemplateResponse("error.html", {
            "request": request,
            "error": f"Failed to refresh comments: {str(e)}"
        })
| # 4) Protected route to fetch current user from cookie | |
def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a stored user.

    The token is used directly as the username; it is not decoded or
    validated.

    :raises HTTPException: status 401 when no such user exists
    """
    account = UserDatabase.get_user(token)
    if account:
        return account
    raise HTTPException(status_code=401, detail="Invalid authentication credentials")
def get_current_user_from_cookie(request: Request):
    """Resolve the logged-in user from the ``token`` cookie on the request.

    Raises:
        HTTPException: 401 "Not authenticated" when the cookie is missing or
            empty; 401 "Invalid authentication credentials" when the cookie
            value does not match any user in ``UserDatabase``.
    """
    session_token = request.cookies.get("token")
    if session_token:
        account = UserDatabase.get_user(session_token)
        if account:
            return account
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    raise HTTPException(status_code=401, detail="Not authenticated")
async def auth_exception_handler(request: Request, exc: HTTPException):
    """Turn 401 errors into a redirect to the login page; pass others through as JSON.

    Args:
        request: The request that triggered the exception (unused).
        exc: The raised ``HTTPException``.

    Returns:
        A redirect to ``/login`` for 401, otherwise a ``JSONResponse`` carrying
        the original status code and ``{"detail": exc.detail}``.
    """
    if exc.status_code == 401:
        # Use 303 (See Other) rather than the default 307: 307 preserves the
        # request method, so an unauthenticated POST would be replayed against
        # /login instead of simply showing the login page.
        return RedirectResponse(url="/login", status_code=303)
    # For other HTTP errors, return a JSON response
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
    )
# 5) List user's videos (GET /videos) - requires login
async def list_videos(request: Request, current_user: User = Depends(get_current_user_from_cookie)):
    """Render ``videos.html`` with the videos returned for the current user.

    A ``YouTubeCommentModerator`` is created per request and given a YouTube
    client built from the credentials dict stored on ``current_user``.
    """
    # Rebuild the Credentials object from the dict kept on the user record
    credentials = Credentials.from_authorized_user_info(current_user.youtube_credentials)
    moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    moderator.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=credentials
    )

    context = {
        "request": request,
        "current_user": current_user,
        "videos": moderator.get_channel_videos(),
    }
    return templates.TemplateResponse("videos.html", context)
# 6) Moderate a specific video's comments (GET /video/{video_id}) - requires login
async def moderate_video_comments(
    request: Request,
    video_id: str,
    current_user: User = Depends(get_current_user_from_cookie)
):
    """Moderate one video's comments and render ``video_comments.html``.

    Args:
        request: Incoming request, forwarded to the template.
        video_id: ID of the video whose comments are moderated.
        current_user: User resolved from the ``token`` cookie; its stored
            ``youtube_credentials`` dict is used to build the YouTube client.

    Returns:
        The rendered template with the video's id and title, the safe and
        flagged comment lists, and the total comment count.
    """
    # Reconstruct the Credentials object from the stored dict
    user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
    user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    user_moderator.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3",
        credentials=user_creds
    )
    moderation_results = user_moderator.moderate_video_comments(video_id)

    # Show the real video title instead of a placeholder. A failed lookup must
    # not hide the moderation results, so fall back to a neutral title.
    video_title = "Unknown Video"
    try:
        video_response = user_moderator.youtube_service.videos().list(
            part="snippet",
            id=video_id
        ).execute()
        items = video_response.get("items") or []
        if items:
            video_title = items[0].get("snippet", {}).get("title", video_title)
    except googleapiclient.errors.HttpError as exc:
        logger.warning("Could not fetch title for video %s: %s", video_id, exc)

    return templates.TemplateResponse("video_comments.html", {
        "request": request,
        "current_user": current_user,
        "video": {"id": video_id, "title": video_title},
        "safe_comments": moderation_results.get('safe_comments', []),
        "flagged_comments": moderation_results.get('gambling_comments', []),
        "total_comments": moderation_results.get('total_comments', 0)
    })
# 7) Logout => remove token
async def logout():
    """Delete the ``token`` cookie and redirect the browser to ``/login``."""
    redirect = RedirectResponse(url="/login")
    redirect.delete_cookie("token")
    return redirect
| from jinja2 import Undefined | |
| import json | |
def pretty_json(value):
    """Template filter: dump *value* as indented JSON, keeping non-ASCII text.

    A Jinja ``Undefined`` value renders as an empty string instead of being
    passed to ``json.dumps``.
    """
    return "" if isinstance(value, Undefined) else json.dumps(value, ensure_ascii=False, indent=2)


# Expose the filter to templates as ``| pretty_json``
templates.env.filters["pretty_json"] = pretty_json
async def read_root(request: Request):
    """Render ``index.html`` with no result, an empty comment and the current rule lists."""
    # Each rule collection on the shared filter is shown as a sorted list
    rules = {
        "platform": sorted(filter_instance._platform_names),
        "gambling_term": sorted(filter_instance._gambling_terms),
        "safe_indicator": sorted(filter_instance._safe_indicators),
        "gambling_context": sorted(filter_instance._gambling_contexts),
        "ambiguous_term": sorted(filter_instance._ambiguous_terms),
    }
    context = {"request": request, "result": None, "comment": "", "rules": rules}
    return templates.TemplateResponse("index.html", context)
async def classify_comment(request: Request, comment: str = Form(...)):
    """Classify a submitted comment and re-render ``index.html`` with the outcome.

    The template receives ``result`` as ``{"is_gambling": ..., "metrics": ...}``
    (the two values returned by ``filter_instance.is_gambling_comment``), the
    submitted comment text, and the current rule lists.
    """
    verdict, details = filter_instance.is_gambling_comment(comment)
    rules = {
        "platform": sorted(filter_instance._platform_names),
        "gambling_term": sorted(filter_instance._gambling_terms),
        "safe_indicator": sorted(filter_instance._safe_indicators),
        "gambling_context": sorted(filter_instance._gambling_contexts),
        "ambiguous_term": sorted(filter_instance._ambiguous_terms),
    }
    context = {
        "request": request,
        "result": {"is_gambling": verdict, "metrics": details},
        "comment": comment,
        "rules": rules,
    }
    return templates.TemplateResponse("index.html", context)
async def add_rule(request: Request, rule_type: str = Form(...), rule_value: str = Form(...)):
    """Add a rule to the shared filter and re-render ``index.html`` with a status message.

    If ``filter_instance.add_rule`` raises ``ValueError``, the exception text
    is shown as the message instead of the confirmation.
    """
    try:
        filter_instance.add_rule(rule_type, rule_value)
    except ValueError as err:
        status = str(err)
    else:
        status = f"Added rule '{rule_value}' as type '{rule_type}'."

    # Rule lists are read after the add so the page reflects the new rule
    rules = {
        "platform": sorted(filter_instance._platform_names),
        "gambling_term": sorted(filter_instance._gambling_terms),
        "safe_indicator": sorted(filter_instance._safe_indicators),
        "gambling_context": sorted(filter_instance._gambling_contexts),
        "ambiguous_term": sorted(filter_instance._ambiguous_terms),
    }
    context = {
        "request": request,
        "result": {"message": status},
        "comment": "",
        "rules": rules,
    }
    return templates.TemplateResponse("index.html", context)
async def upload_file(request: Request, file: UploadFile = File(...), column: str = Form("comment")):
    """Classify every value in one column of an uploaded CSV/Excel file.

    Args:
        request: Incoming request, forwarded to the template.
        file: Uploaded file; ``.csv`` is read with ``pd.read_csv`` and
            ``.xls``/``.xlsx`` with ``pd.read_excel``. The extension is
            matched case-insensitively.
        column: Name of the column holding the comments (default ``"comment"``).

    Returns:
        ``index.html`` rendered with either ``{"upload_result": ...}`` (the
        output of ``filter_instance.filter_comments``) or ``{"message": ...}``
        describing why the file could not be processed.
    """
    def render(result):
        # Every outcome re-renders the index page with the current rule lists.
        return templates.TemplateResponse("index.html", {
            "request": request,
            "result": result,
            "comment": "",
            "rules": {
                "platform": sorted(filter_instance._platform_names),
                "gambling_term": sorted(filter_instance._gambling_terms),
                "safe_indicator": sorted(filter_instance._safe_indicators),
                "gambling_context": sorted(filter_instance._gambling_contexts),
                "ambiguous_term": sorted(filter_instance._ambiguous_terms)
            }
        })

    content = await file.read()
    # The filename may be absent, and extensions such as ".CSV" should be
    # accepted, so normalise before matching.
    filename = (file.filename or "").lower()
    try:
        if filename.endswith('.csv'):
            df = pd.read_csv(io.BytesIO(content))
        elif filename.endswith(('.xls', '.xlsx')):
            df = pd.read_excel(io.BytesIO(content))
        else:
            raise ValueError("Unsupported file type.")
    except Exception as e:
        # Deliberately broad: any parser failure is reported back to the user
        # on the page rather than surfacing as a server error.
        return render({"message": f"Error reading file: {e}"})

    if column not in df.columns:
        return render({"message": f"Column '{column}' not found. Available columns: {list(df.columns)}"})

    comments = df[column].astype(str).tolist()
    results = filter_instance.filter_comments(comments)
    # Return the results as part of the template context.
    return render({"upload_result": results})
async def add_visual_char(request: Request,
                          char: str = Form(...),
                          ascii_equiv: str = Form(...)):
    """Store ``char -> ascii_equiv`` in the module-level ``VISUAL_MAP`` and re-render ``index.html``.

    An existing entry for ``char`` is overwritten. The page shows a
    confirmation message together with the current rule lists.
    """
    # Add (or replace) the mapping
    VISUAL_MAP.update({char: ascii_equiv})

    rules = {
        "platform": sorted(filter_instance._platform_names),
        "gambling_term": sorted(filter_instance._gambling_terms),
        "safe_indicator": sorted(filter_instance._safe_indicators),
        "gambling_context": sorted(filter_instance._gambling_contexts),
        "ambiguous_term": sorted(filter_instance._ambiguous_terms),
    }
    context = {
        "request": request,
        "result": {"message": f"Added visual map entry '{char}' -> '{ascii_equiv}'."},
        "comment": "",
        "rules": rules,
    }
    return templates.TemplateResponse("index.html", context)