import csv
import logging
from io import StringIO
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)
class CSVHandler:
    def __init__(self):
        self.clusters_file = 'clusters.csv'
        self.completed_posts_file = 'completed_posts.csv'
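
    # Expected CSV layouts, inferred from the readers below (an assumption,
    # not a documented schema):
    #
    #   clusters.csv:        Keywords, Intent, Primary Keyword, Status
    #   completed_posts.csv: Title, Keywords, Meta Description, URL
    #
    # Status appears to be 'no' for pending clusters and 'completed' once done.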

    def get_cluster_data(self) -> Optional[Dict]:
        """Return the first cluster whose Status is 'no', or None if nothing is pending."""
        try:
            with open(self.clusters_file, 'r', newline='') as file:
                reader = csv.DictReader(file)
                # start=1 so row_number counts data rows, matching get_all_clusters
                # and the rows list in mark_cluster_complete (rows[0] is the header)
                for i, row in enumerate(reader, start=1):
                    if (row.get('Status') or '').lower() == 'no':
                        return {
                            'Keywords': row['Keywords'],
                            'Intent': row['Intent'],
                            'Primary Keyword': row['Primary Keyword'],
                            'row_number': i
                        }
            return None
        except Exception as e:
            logger.error(f"Error reading clusters CSV: {e}")
            return None

    def get_previous_posts(self) -> List[Dict]:
        """Return every completed post as a dict of title, keywords, summary, and url."""
        try:
            posts = []
            with open(self.completed_posts_file, 'r', newline='') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    posts.append({
                        'title': row['Title'],
                        'keywords': row['Keywords'],
                        'summary': row['Meta Description'],
                        'url': row['URL']
                    })
            return posts
        except Exception as e:
            logger.error(f"Error reading completed posts CSV: {e}")
            return []
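
    # Example element of the returned list, assuming a completed_posts.csv row
    # of "My Post,kw1;kw2,A short summary,https://yourblog.com/my-post":
    #   {'title': 'My Post', 'keywords': 'kw1;kw2',
    #    'summary': 'A short summary', 'url': 'https://yourblog.com/my-post'}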

    def mark_cluster_complete(self, row_number: int):
        """Set the Status column of the given data row (1-based) to 'completed'."""
        try:
            # Read the whole file into memory so it can be rewritten in place
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                header = next(reader)
                # Find the Status column; append one if it does not exist yet
                if 'Status' in header:
                    status_index = header.index('Status')
                else:
                    header.append('Status')
                    status_index = len(header) - 1
                rows = [header]
                rows.extend(reader)
            # Update the specified row; rows[0] is the header, so data row N
            # lives at rows[N]
            if 0 < row_number < len(rows):
                # Pad the row so it reaches the Status column
                while len(rows[row_number]) <= status_index:
                    rows[row_number].append('')
                rows[row_number][status_index] = 'completed'
            # Write every row back, overwriting the original file
            with open(self.clusters_file, 'w', newline='', encoding='utf-8') as file:
                csv.writer(file).writerows(rows)
        except Exception as e:
            logger.error(f"Error updating cluster status: {e}")
            raise

    def log_completed_post(self, metadata: Dict):
        """Append a completed post's title, keywords, meta description, and URL."""
        try:
            with open(self.completed_posts_file, 'a', newline='') as file:
                writer = csv.writer(file)
                writer.writerow([
                    metadata['title'],
                    metadata['keywords'],
                    metadata['meta_description'],
                    f"https://yourblog.com/{metadata['slug']}"
                ])
        except Exception as e:
            logger.error(f"Error logging completed post: {e}")

    def get_all_clusters(self) -> List[Dict]:
        """Get all uncompleted clusters from the CSV file."""
        clusters = []
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row_number, row in enumerate(reader, start=1):
                    # A missing or empty Status column counts as not completed
                    status = (row.get('Status') or '').lower()
                    if status != 'completed':
                        cluster_data = {
                            'Keywords': row.get('Keywords', ''),
                            'Intent': row.get('Intent', ''),
                            'Primary Keyword': row.get('Primary Keyword', ''),
                            'row_number': row_number,
                            'Status': status
                        }
                        # Skip rows that lack any of the required fields
                        if all(cluster_data[field] for field in ['Keywords', 'Intent', 'Primary Keyword']):
                            clusters.append(cluster_data)
                        else:
                            logger.warning(f"Row {row_number}: Missing required fields, skipping")
            return clusters
        except Exception as e:
            logger.error(f"Error reading clusters file: {e}")
            raise
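
    # The returned row_number is the 1-based data-row index, so it can be fed
    # straight back into mark_cluster_complete, e.g. (illustrative loop):
    #   for cluster in handler.get_all_clusters():
    #       ...  # generate the post
    #       handler.mark_cluster_complete(cluster['row_number'])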

    def process_uploaded_csv(self, csv_content: str) -> List[Dict]:
        """
        Process uploaded CSV content and return cluster data for blog generation.

        Args:
            csv_content (str): The decoded CSV content as a string

        Returns:
            List[Dict]: List of cluster data dictionaries
        """
        clusters = []
        try:
            # Wrap the string in a file-like object so csv can parse it
            reader = csv.DictReader(StringIO(csv_content))
            for row_number, row in enumerate(reader, start=1):
                # Skip rows missing any of the required columns
                required_columns = ['Keywords', 'Intent', 'Primary Keyword']
                if not all(col in row for col in required_columns):
                    logger.error(f"Row {row_number}: Missing required columns. Required: {required_columns}")
                    continue
                clusters.append({
                    'Keywords': row['Keywords'],
                    'Intent': row['Intent'],
                    'Primary Keyword': row['Primary Keyword'],
                    'row_number': row_number
                })
            logger.info(f"Successfully processed {len(clusters)} clusters from uploaded CSV")
            return clusters
        except Exception as e:
            logger.error(f"Error processing uploaded CSV: {e}")
            raise

    def process_csv_text(self, csv_text: str) -> List[Dict]:
        """
        Process CSV content provided as a text string and return cluster data
        for blog generation.

        This mirrors process_uploaded_csv but strips surrounding whitespace first.

        Args:
            csv_text (str): The CSV content as a string

        Returns:
            List[Dict]: List of cluster data dictionaries
        """
        clusters = []
        try:
            reader = csv.DictReader(StringIO(csv_text.strip()))
            for row_number, row in enumerate(reader, start=1):
                # Skip rows missing any of the required columns
                required_columns = ['Keywords', 'Intent', 'Primary Keyword']
                if not all(col in row for col in required_columns):
                    logger.error(f"Row {row_number}: Missing required columns. Required: {required_columns}")
                    continue
                clusters.append({
                    'Keywords': row['Keywords'],
                    'Intent': row['Intent'],
                    'Primary Keyword': row['Primary Keyword'],
                    'row_number': row_number
                })
            logger.info(f"Successfully processed {len(clusters)} clusters from CSV text")
            return clusters
        except Exception as e:
            logger.error(f"Error processing CSV text: {e}")
            raise
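

# Minimal usage sketch (an illustration, not part of the original module).
# It assumes a clusters.csv next to this file with the columns
# Keywords, Intent, Primary Keyword, and Status.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    handler = CSVHandler()

    # List every pending cluster from the file on disk
    for cluster in handler.get_all_clusters():
        print(f"Pending row {cluster['row_number']}: {cluster['Primary Keyword']}")

    # Or parse an in-memory CSV string instead
    sample = (
        "Keywords,Intent,Primary Keyword\n"
        "python csv,informational,csv handling\n"
    )
    print(handler.process_csv_text(sample))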