# (Hosting-page residue from the original scrape — "Spaces: Sleeping" —
# preserved here as a comment so the file remains valid Python.)
| import pandas as pd | |
| import time | |
| import json | |
| import io | |
| import google.generativeai as genai | |
| from typing import List, Dict, Any, Tuple | |
| import logging | |
| import sys | |
| from .api_manager import ApiKeyManager | |
# Route all log output to stdout only (no file handlers).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

# --- Tunables for API calls ---
MAX_RETRIES = 3   # Maximum number of attempts per Gemini API call
RETRY_DELAY = 5   # Base delay (seconds) between retries
BATCH_DELAY = 5   # Pause (seconds) between consecutive title generations

# --- Default parameters for the generation pipeline ---
DEFAULT_TOP_NICHES = 5              # How many top-ranked niches to target
DEFAULT_BOTTOM_SUBNICHES = 2        # Least-exploited subniches kept per niche
DEFAULT_TITLES_PER_COMBINATION = 2  # Titles generated per niche-subniche pair
def configure_genai(api_key: str) -> None:
    """Configure the Gemini client with the given API key.

    Calls genai.configure, which sets module-global state in the
    google.generativeai package, so this affects all subsequent calls.
    """
    genai.configure(api_key=api_key)
def load_niche_data(niche_data_input) -> pd.DataFrame:
    """Read the niche-ranking CSV into a DataFrame.

    Args:
        niche_data_input: File-like object (anything pandas.read_csv accepts)
            containing the niche ranking data.

    Returns:
        The parsed DataFrame, or None when reading fails.
    """
    try:
        logger.info("Loading niche data")
        ranking_frame = pd.read_csv(niche_data_input)
        logger.info(f"Loaded columns: {ranking_frame.columns.tolist()}")
        return ranking_frame
    except Exception as e:
        # Callers check for None rather than handling exceptions.
        logger.error(f"Error loading niche data: {e}")
        return None
def extract_top_niches_and_bottom_subniches(
    niche_data: pd.DataFrame,
    top_niches: int = DEFAULT_TOP_NICHES,
    bottom_subniches: int = DEFAULT_BOTTOM_SUBNICHES
) -> List[Dict]:
    """
    Extract top niches and their least exploited subniches from the ranking data.

    Args:
        niche_data: DataFrame with at least 'Niche', 'Count' and
            'Top Subniches' columns. 'Top Subniches' is a comma-separated
            string, optionally with counts in parentheses, e.g. "cats (12), dogs (3)".
        top_niches: Number of top niches (by 'Count', descending) to use.
        bottom_subniches: Number of bottom (least exploited) subniches to keep
            for each niche — taken from the END of the 'Top Subniches' list,
            which is assumed to be ordered most-to-least frequent.

    Returns:
        List of dicts with keys 'niche', 'subniche' and 'count_info'
        (the "(N)" suffix if present, else ""). Empty list on bad input.
    """
    if niche_data is None or niche_data.empty:
        logger.error("No niche data to analyze")
        return []

    # Work on a copy: the original assigned the coerced 'Count' column back
    # into the caller's DataFrame, silently mutating it.
    niche_data = niche_data.copy()
    niche_data['Count'] = pd.to_numeric(niche_data['Count'], errors='coerce')

    # Sort niches by count (descending) and keep the top N.
    top_niches_data = niche_data.sort_values('Count', ascending=False).head(top_niches)

    target_combinations = []
    for _, row in top_niches_data.iterrows():
        niche = row['Niche']
        try:
            subniches_str = row.get('Top Subniches', '')
            if not isinstance(subniches_str, str):
                # Missing / NaN cell — skip this niche entirely.
                continue

            # Split the comma-separated list and peel off any "(N)" count suffix.
            cleaned_subniches = []
            for subniche in (s.strip() for s in subniches_str.split(',')):
                count_info = ""
                cleaned_subniche = subniche
                if "(" in subniche and ")" in subniche:
                    paren = subniche.find("(")
                    count_info = subniche[paren:]
                    cleaned_subniche = subniche[:paren].strip()
                cleaned_subniches.append({
                    'name': cleaned_subniche,
                    'count_info': count_info
                })

            # Take the bottom M subniches (least frequent). Slicing already
            # returns the whole list when it is shorter than bottom_subniches,
            # so the original explicit length check was redundant.
            for subniche_obj in cleaned_subniches[-bottom_subniches:]:
                if subniche_obj['name']:  # Skip empty names
                    target_combinations.append({
                        'niche': niche,
                        'subniche': subniche_obj['name'],
                        'count_info': subniche_obj['count_info']
                    })
        except Exception as e:
            # Lazy %-args: formatted only if the record is emitted.
            logger.error("Error processing subniches for niche %s: %s", niche, e)
    return target_combinations
def _parse_title_response(response_text: str) -> Dict[str, Any]:
    """Extract and validate the JSON payload from a raw model response.

    Locates the outermost {...} if the model wrapped the JSON in extra text,
    fills in a missing 'trademark' field with "no", and normalizes it to a
    lowercase string.

    Raises:
        ValueError: if no JSON object can be located, the payload is not a
            dict, or 'title'/'subtitle' are missing.
    """
    if not response_text.startswith('{'):
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}')
        if start_idx >= 0 and end_idx > start_idx:
            response_text = response_text[start_idx:end_idx + 1]
        else:
            raise ValueError(f"Could not find valid JSON in response: {response_text[:100]}")
    result = json.loads(response_text)
    if not isinstance(result, dict):
        raise ValueError("Response is not a valid JSON object")
    if "title" not in result or "subtitle" not in result:
        raise ValueError("Missing required fields in response")
    if "trademark" not in result:
        result["trademark"] = "no"
        logger.warning("Trademark field missing in API response, defaulting to 'no'")
    # str() guards against the model emitting a JSON boolean here, which
    # previously crashed on .lower() and burned a retry.
    result["trademark"] = str(result["trademark"]).lower()
    return result


def generate_title_with_gemini(
    niche: str,
    subniche: str,
    key_manager: ApiKeyManager
) -> Tuple[Dict[str, Any], bool]:
    """
    Generate a new title and subtitle using the Gemini API.

    Retries up to MAX_RETRIES times with progressive backoff, rotating API
    keys via key_manager and marking keys that hit quota/rate limits.

    Args:
        niche: Niche category
        subniche: Subniche category
        key_manager: API key manager instance

    Returns:
        Tuple of (result_dict, success_flag). On total failure the dict
        contains placeholder strings and trademark == "unknown".
    """
    prompt = f"""
Generate a creative, catchy title and subtitle for a piece of content in the niche "{niche}"
specifically targeting the subniche "{subniche}".
STRICT REQUIREMENTS:
1. The title MUST be short and powerful, between 2-7 words only. Make it attention-grabbing and marketable.
2. The subtitle MUST be between 15-20 words. It should complement the title and provide additional context.
3. RANDOMLY choose whether to include trademarked/copyrighted content (characters, brands, franchises like Disney, Marvel, DC, etc.).
Make this choice truly random - approximately 50% should contain trademarked content, 50% should not.
Output in this exact JSON format:
{{
"title": "The generated title (2-7 words)",
"subtitle": "The generated subtitle (15-20 words)",
"trademark": "yes" if the title or subtitle contains copyrighted characters/brands, "no" if not
}}
Return only the JSON, no additional explanation needed. Do not include any text before or after the JSON.
"""
    # Bug fix: if get_next_api_key() itself raised, the except block below
    # referenced an unbound `api_key` (UnboundLocalError masking the real error).
    api_key = None
    for attempt in range(MAX_RETRIES):
        try:
            api_key = key_manager.get_next_api_key()
            configure_genai(api_key)
            model = genai.GenerativeModel('gemini-2.0-flash')
            generation_config = {
                "temperature": 0.7,  # Higher temperature for creativity
                "top_p": 0.95,
                "top_k": 40,
                "max_output_tokens": 1024,
            }
            response = model.generate_content(
                prompt,
                generation_config=generation_config
            )
            if not hasattr(response, 'text') or not response.text:
                raise ValueError("Empty response received from API")
            result = _parse_title_response(response.text.strip())
            logger.info("Generated title: '%s' with trademark: %s",
                        result['title'], result['trademark'])
            return result, True
        except Exception as e:
            logger.error("Error on attempt %d: %s", attempt + 1, e)
            if "quota" in str(e).lower() or "rate" in str(e).lower() or "limit" in str(e).lower():
                logger.warning("API key quota exceeded or rate limited: %s", e)
                if api_key is not None:
                    key_manager.mark_key_as_failed(api_key)
            if attempt < MAX_RETRIES - 1:
                retry_delay = RETRY_DELAY * (attempt + 1)  # Progressive backoff
                logger.info("Retrying in %d seconds...", retry_delay)
                time.sleep(retry_delay)
    # All attempts failed — return a recognizable placeholder result.
    logger.warning("All attempts failed for niche: %s, subniche: %s", niche, subniche)
    return {
        "title": f"[Failed to generate {niche} title]",
        "subtitle": f"[Failed to generate {subniche} subtitle]",
        "trademark": "unknown"
    }, False
def generate_titles(
    niche_data_input,
    top_niches: int = DEFAULT_TOP_NICHES,
    bottom_subniches: int = DEFAULT_BOTTOM_SUBNICHES,
    titles_per_combination: int = DEFAULT_TITLES_PER_COMBINATION
) -> pd.DataFrame:
    """
    Generate new titles based on niche analysis.

    Loads the niche ranking CSV, derives (niche, subniche) target
    combinations, and asks Gemini for titles_per_combination titles per
    combination, pausing BATCH_DELAY seconds between calls. Generation
    stops early if every API key has failed; failed generations are
    skipped (best-effort), so the result may have fewer rows than requested.

    Args:
        niche_data_input: File-like object containing the niche ranking data
        top_niches: Number of top niches to use
        bottom_subniches: Number of bottom subniches to use per niche
        titles_per_combination: Number of titles per niche-subniche combination

    Returns:
        DataFrame with columns Niche, Subniche, Title, Subtitle, Trademark
        (empty but with those columns when nothing was generated).

    Raises:
        ValueError: if the niche data cannot be loaded.
    """
    try:
        # Lazy %-style args throughout: formatted only when the record is
        # emitted (the original used f-strings, including one with no
        # placeholders at all).
        logger.info("Starting title generation with parameters:")
        logger.info("- Top niches: %s", top_niches)
        logger.info("- Bottom subniches per niche: %s", bottom_subniches)
        logger.info("- Titles per combination: %s", titles_per_combination)

        # Initialize API key manager
        key_manager = ApiKeyManager()
        logger.info("Initialized API key manager with %d keys", len(key_manager.api_keys))

        # Load niche data
        niche_data = load_niche_data(niche_data_input)
        if niche_data is None:
            raise ValueError("Failed to load niche data")

        # Extract target niche-subniche combinations
        combinations = extract_top_niches_and_bottom_subniches(niche_data, top_niches, bottom_subniches)
        logger.info("Found %d niche-subniche combinations to use", len(combinations))

        generated_titles = []
        for i, combo in enumerate(combinations):
            niche = combo['niche']
            subniche = combo['subniche']
            logger.info("Processing combination %d/%d: %s - %s",
                        i + 1, len(combinations), niche, subniche)
            for j in range(titles_per_combination):
                logger.info("Generating title %d/%d for %s - %s",
                            j + 1, titles_per_combination, niche, subniche)
                title_result, success = generate_title_with_gemini(niche, subniche, key_manager)
                if success:
                    generated_titles.append({
                        'Niche': niche,
                        'Subniche': subniche,
                        'Title': title_result.get('title', ''),
                        'Subtitle': title_result.get('subtitle', ''),
                        'Trademark': title_result.get('trademark', 'unknown')
                    })
                # Stop as soon as every key has failed.
                if not key_manager.has_working_keys():
                    logger.error("No working API keys left. Stopping generation.")
                    break
                # Delay between generations (skipped after the last one).
                if j < titles_per_combination - 1:
                    time.sleep(BATCH_DELAY)
            if not key_manager.has_working_keys():
                break

        if not generated_titles:
            logger.warning("No titles were generated")
            return pd.DataFrame(columns=['Niche', 'Subniche', 'Title', 'Subtitle', 'Trademark'])

        result_df = pd.DataFrame(generated_titles)
        logger.info("Generated %d titles in total", len(result_df))
        return result_df
    except Exception as e:
        logger.error("Error in generate_titles: %s", e)
        raise