import os
import logging
import google.generativeai as genai
from typing import List, Optional
import pandas as pd
from pathlib import Path
import time
from tqdm import tqdm
import re

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Initialize Gemini API; the key must come from the environment rather than
# being hardcoded in source control.
try:
    genai.configure(api_key=os.environ["GEMINI_API_KEY"])
    model = genai.GenerativeModel("gemini-1.5-flash")
    logger.info("Gemini API initialized successfully")
except Exception as e:
    logger.error(f"Error initializing Gemini API: {e}")
    model = None
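
# The key must be provided via the environment before running, e.g. (shell,
# with a placeholder value):
#   export GEMINI_API_KEY="your-key-here"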

# Prompt for worklog categorization - modified for batch processing
BATCH_CATEGORIZATION_PROMPT = """
You are a technology skill categorizer. Analyze each worklog entry and assign a single technology category word that best represents the technical skill or technology involved.

Guidelines:
1. Respond with ONLY a single word (or hyphenated term if necessary) for each worklog
2. Focus on the core technology, framework, or skill
3. Be specific when the technology is clear (e.g., "React", "Python", "AWS")
4. Use broader categories when the specific technology isn't clear (e.g., "Frontend", "Backend", "DevOps")
5. Prefer standard technology names over abbreviations
6. Don't include unnecessary adjectives or descriptions
7. Respond in a numbered list format matching the input worklogs

Examples:
Worklog 1: "fixing issue in next js application" → "NextJS"
Worklog 2: "Task issue fixing - next js application" → "NextJS"
Worklog 3: "Debugging Python script for data analysis" → "Python"
Worklog 4: "Creating responsive CSS layout" → "CSS"
Worklog 5: "Implementing REST API endpoints" → "Backend"

Here are the worklogs to categorize:
{worklogs}

For each worklog, respond with a numbered list containing only the category word for each entry:
1. [category for worklog 1]
2. [category for worklog 2]
...and so on
"""


def is_upskilling_issue(issue_text):
    """
    Check whether an issue is related to upskilling, using a regex to match various formats.

    Args:
        issue_text: The issue text to check

    Returns:
        Boolean indicating whether this is an upskilling issue
    """
    if not issue_text or not isinstance(issue_text, str):
        return False
    # Case-insensitive search for "upskill" with variations; matches
    # Upskilling, upskill, UPSKILLING, Up-skilling, Up skilling, etc.
    pattern = re.compile(r'up[-\s]?skill', re.IGNORECASE)
    return bool(pattern.search(issue_text))
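
# Quick sanity examples for the matcher above (hypothetical inputs, shown for
# illustration only):
#   is_upskilling_issue("Upskilling - React fundamentals")  -> True
#   is_upskilling_issue("Up skilling: AWS certification")   -> True
#   is_upskilling_issue("Bug fix in login flow")            -> False
#   is_upskilling_issue(None)                               -> False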


def estimate_token_count(text: str) -> int:
    """
    Estimate the token count for a given text string.

    This is a rough approximation based on common tokenizer behavior:
    - An average of ~4 characters per token for English text
    - Punctuation and other special characters typically count as their own tokens

    Args:
        text: The text to estimate a token count for

    Returns:
        Estimated token count
    """
    if not text:
        return 0
    # Count words (splitting on whitespace)
    words = len(text.split())
    # Count characters
    chars = len(text)
    # Count special characters (punctuation, etc.)
    special_chars = len(re.findall(r'[^\w\s]', text))
    # Combine the factors; the formula is approximate and can be tuned with testing
    estimated_tokens = max(words, int(chars / 4) + special_chars)
    return estimated_tokens
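
# Worked example of the heuristic above (illustrative only; this is not a real
# tokenizer): for "Hello, world!" we get words=2, chars=13, and special_chars=2
# (',' and '!'), so the estimate is max(2, int(13 / 4) + 2) = 5 tokens.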


def categorize_worklog_batch(worklogs: List[str]) -> List[str]:
    """
    Categorize multiple worklog entries with a single API call.

    Args:
        worklogs: List of worklog texts to categorize

    Returns:
        List of categories corresponding to each worklog
    """
    if not worklogs or model is None:
        return ["Unknown"] * len(worklogs)

    # Format worklogs as a numbered list for the prompt
    formatted_worklogs = "\n".join(f"{i+1}. {worklog}" for i, worklog in enumerate(worklogs))
    prompt = BATCH_CATEGORIZATION_PROMPT.format(worklogs=formatted_worklogs)

    # Estimate token usage
    worklogs_token_count = sum(estimate_token_count(w) for w in worklogs)
    prompt_token_count = estimate_token_count(prompt)
    logger.info(f"Sending batch with {len(worklogs)} worklogs (~{worklogs_token_count} worklog tokens, ~{prompt_token_count} total tokens)")

    try:
        response = model.generate_content(prompt)
        response_text = response.text.strip()
        logger.info(f"Response received: {response_text}")

        # Parse the numbered response - looking for patterns like "1. Python", "2. JavaScript", etc.
        categories = []
        number_pattern = re.compile(r'^\s*(\d+)\.\s*(.+?)$', re.MULTILINE)
        matches = number_pattern.findall(response_text)
        if matches:
            # Sort by the number to preserve order
            sorted_matches = sorted(matches, key=lambda x: int(x[0]))
            categories = [match[1].strip() for match in sorted_matches]
        else:
            # Fallback: split by lines
            lines = [line.strip() for line in response_text.split('\n') if line.strip()]
            categories = [line.split('.')[-1].strip() if '.' in line else line for line in lines]

        # Ensure we have the right number of categories
        if len(categories) != len(worklogs):
            logger.warning(f"Mismatch between number of worklogs ({len(worklogs)}) and categories ({len(categories)})")
            if len(categories) < len(worklogs):
                # Pad with "Unknown" if we have too few categories
                categories.extend(["Unknown"] * (len(worklogs) - len(categories)))
            else:
                # Truncate if we have too many categories
                categories = categories[:len(worklogs)]

        # Ensure each category is a single word
        for i, category in enumerate(categories):
            if len(category.split()) > 1 and "-" not in category:
                logger.warning(f"Response '{category}' contains multiple words, taking the first word")
                categories[i] = category.split()[0]

        # Log the results for verification
        for i, (worklog, category) in enumerate(zip(worklogs, categories)):
            logger.info(f"Worklog {i+1}: '{worklog[:50]}{'...' if len(worklog) > 50 else ''}' → '{category}'")

        return categories
    except Exception as e:
        logger.error(f"Error categorizing worklog batch: {e}")
        return ["Unknown"] * len(worklogs)


def batch_process_worklogs(worklogs: List[str], batch_size: int = 10,
                           pause_seconds: int = 5, show_progress: bool = True) -> List[str]:
    """
    Process worklog entries in batches, pausing between batches to avoid rate
    limits (by default, 10 worklogs per call with a 5-second rest between calls).

    Args:
        worklogs: List of worklog texts to categorize
        batch_size: Number of worklogs to process in each batch (default: 10)
        pause_seconds: Seconds to pause between batches (default: 5)
        show_progress: Whether to show a progress bar

    Returns:
        List of categories corresponding to each worklog
    """
    results = []
    total_worklogs = len(worklogs)

    # Create batches
    batches = [worklogs[i:i + batch_size] for i in range(0, total_worklogs, batch_size)]

    # Process each batch with progress indication
    progress_bar = tqdm(total=total_worklogs, desc="Categorizing worklogs") if show_progress else None
    for i, batch in enumerate(batches):
        # Process the current batch
        logger.info(f"Processing batch {i+1}/{len(batches)} with {len(batch)} worklogs")
        batch_results = categorize_worklog_batch(batch)
        results.extend(batch_results)

        # Update progress
        if progress_bar:
            progress_bar.update(len(batch))

        # Pause between batches (except after the last batch)
        if i < len(batches) - 1 and pause_seconds > 0:
            logger.info(f"Pausing for {pause_seconds}s before next batch. Processed {len(results)}/{total_worklogs} worklogs")
            if show_progress:
                for s in range(pause_seconds):
                    progress_bar.set_description(f"Waiting {pause_seconds-s}s before next batch")
                    time.sleep(1)
                progress_bar.set_description("Categorizing worklogs")
            else:
                time.sleep(pause_seconds)

    if progress_bar:
        progress_bar.close()
    logger.info(f"Completed processing {total_worklogs} worklogs")
    return results
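
# Rough throughput under the defaults above: 100 worklogs at batch_size=10 means
# 10 API calls with 9 pauses of 5 seconds each, i.e. about 45 seconds of waiting
# plus per-call latency. Raise pause_seconds if rate limits are still hit.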


def process_dataframe(df: pd.DataFrame, worklog_column: str = "Worklog",
                      issue_column: str = "Issue", default_category: str = "N/A",
                      batch_size: int = 10, pause_seconds: int = 5,
                      show_progress: bool = True) -> pd.DataFrame:
    """
    Add a technology-category column to a dataframe.

    Only worklogs associated with upskilling issues are categorized; all other
    rows keep the default category. Worklogs are sent to the model in batches
    (by default 10 at a time with 5-second pauses between batches).

    Args:
        df: Pandas DataFrame containing worklog data
        worklog_column: Name of the column containing worklog text
        issue_column: Name of the column containing issue text
        default_category: Default value for non-upskilling worklogs
        batch_size: Number of worklogs to process in each batch (default: 10)
        pause_seconds: Seconds to pause between batches (default: 5)
        show_progress: Whether to show a progress bar

    Returns:
        DataFrame with an additional 'TechCategory' column
    """
    # Initialize the TechCategory column with the default value
    df["TechCategory"] = default_category

    # Check that the required columns exist
    if worklog_column not in df.columns:
        logger.error(f"Column '{worklog_column}' not found in DataFrame")
        return df
    if issue_column not in df.columns:
        logger.error(f"Column '{issue_column}' not found in DataFrame")
        return df

    # Filter for upskilling issues
    upskilling_mask = df[issue_column].apply(is_upskilling_issue)
    upskilling_rows = df[upskilling_mask].copy()
    logger.info(f"Found {len(upskilling_rows)} rows with upskilling issues out of {len(df)} total rows")
    if upskilling_rows.empty:
        logger.info("No upskilling issues found, returning dataframe with default category values")
        return df

    # Extract unique non-null worklog entries from upskilling issues
    unique_worklogs = upskilling_rows[worklog_column].dropna().unique().tolist()

    # Calculate the total estimated tokens
    total_estimated_tokens = sum(estimate_token_count(worklog) for worklog in unique_worklogs)
    logger.info(f"Processing {len(unique_worklogs)} unique upskilling worklog entries with approximately {total_estimated_tokens} tokens")

    # Create a mapping from worklog text to category
    if unique_worklogs:
        categories = batch_process_worklogs(
            unique_worklogs,
            batch_size=batch_size,
            pause_seconds=pause_seconds,
            show_progress=show_progress
        )
        worklog_to_category = dict(zip(unique_worklogs, categories))
    else:
        worklog_to_category = {}

    # Apply the categorization only to upskilling worklog entries
    df.loc[upskilling_mask, "TechCategory"] = df.loc[upskilling_mask, worklog_column].apply(
        lambda x: worklog_to_category.get(x, default_category) if pd.notna(x) else default_category
    )

    # Count the entries that were actually categorized
    categorized_count = len(df[df["TechCategory"] != default_category])
    logger.info(f"Successfully categorized {categorized_count} worklog entries")
    return df
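
# Minimal usage sketch for process_dataframe (hypothetical data; assumes the
# default "Issue" and "Worklog" column names):
#   df = pd.DataFrame({
#       "Issue": ["Upskilling - frontend", "Production bug"],
#       "Worklog": ["learning react hooks", "patched auth service"],
#   })
#   df = process_dataframe(df)
#   # Only the upskilling row gets a model-assigned TechCategory; the other
#   # row keeps the default "N/A".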


def process_csv_file(
    csv_path: str,
    worklog_column: str = "Worklog",
    issue_column: str = "Issue",
    default_category: str = "N/A",
    output_path: Optional[str] = None,
    overwrite: bool = False,
    batch_size: int = 10,
    pause_seconds: int = 5
) -> str:
    """
    Process a CSV file, adding technology categories based on worklog entries.

    Only worklogs associated with upskilling issues are categorized. Worklogs
    are processed in batches (by default 10 at a time with 5-second pauses).

    Args:
        csv_path: Path to the CSV file to process
        worklog_column: Name of the column containing worklog text
        issue_column: Name of the column containing issue text
        default_category: Default value for non-upskilling worklogs
        output_path: Path to save the processed file (if None, a new file with a '_categorized' suffix is created)
        overwrite: If True, overwrite the original file
        batch_size: Number of worklogs to process in each batch (default: 10)
        pause_seconds: Seconds to pause between batches (default: 5)

    Returns:
        Path to the saved CSV file, or an empty string on failure
    """
    try:
        # Check that the file exists
        if not Path(csv_path).exists():
            logger.error(f"CSV file not found: {csv_path}")
            return ""

        # Read the CSV
        logger.info(f"Reading CSV file: {csv_path}")
        df = pd.read_csv(csv_path)

        # Process the dataframe
        processed_df = process_dataframe(
            df,
            worklog_column=worklog_column,
            issue_column=issue_column,
            default_category=default_category,
            batch_size=batch_size,
            pause_seconds=pause_seconds
        )

        # Determine the output path
        if overwrite:
            save_path = csv_path
        elif output_path:
            save_path = output_path
        else:
            # Create a new filename with the _categorized suffix (Path.with_stem requires Python 3.9+)
            path_obj = Path(csv_path)
            save_path = str(path_obj.with_stem(f"{path_obj.stem}_categorized"))

        # Save the processed dataframe
        processed_df.to_csv(save_path, index=False)
        logger.info(f"Saved categorized CSV to: {save_path}")
        return save_path
    except Exception as e:
        logger.error(f"Error processing CSV file: {e}")
        return ""