import json
import logging
import os
import time
from typing import Any, Callable, Dict, List, Optional

import pandas as pd
import openai

from models import BatchProcessingStats, FeedbackAnalysisConfig, FeedbackAnalysisResult


def setup_logging() -> None:
    """Set up logging to both a dated file under ./logs and the console."""
    os.makedirs("logs", exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(
                f"logs/feedback_analyzer_{time.strftime('%Y%m%d')}.log"
            ),
            logging.StreamHandler(),
        ],
    )


def parse_llm_response(response_text: str) -> Dict[str, Any]:
    """
    Parse the LLM response into a structured format.

    Tries the whole response first; if that fails, falls back to extracting
    the outermost '{...}' span (models sometimes wrap JSON in prose).

    Args:
        response_text: Raw text response from the LLM

    Returns:
        Parsed JSON data as a dictionary

    Raises:
        ValueError: If the response cannot be parsed as JSON
    """
    logger = logging.getLogger(__name__)

    try:
        # First, try to parse the entire response as JSON
        return json.loads(response_text)
    except json.JSONDecodeError:
        logger.warning(
            "Could not parse entire response as JSON, trying to extract JSON block"
        )

    # Fall back: look for text between the first '{' and the last '}'
    try:
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}') + 1

        if start_idx >= 0 and end_idx > start_idx:
            json_str = response_text[start_idx:end_idx]
            return json.loads(json_str)
        raise ValueError("No JSON object found in response")
    except Exception as e:
        logger.error(f"Failed to extract JSON from response: {e}")
        raise ValueError(f"Could not parse response as JSON: {str(e)}")


def analyze_feedback(feedback: str, config: FeedbackAnalysisConfig) -> FeedbackAnalysisResult:
    """
    Analyze a single feedback item using the LLM.

    Args:
        feedback: The feedback text to analyze
        config: Configuration for the analysis

    Returns:
        FeedbackAnalysisResult object with the analysis results.  Never
        raises: API and parse failures are captured in the result's
        ``error`` field.
    """
    logger = logging.getLogger(__name__)

    # Prepare the prompt - the word "json" must appear somewhere in the
    # messages for OpenAI's json_object response_format to be accepted.
    prompt = config.prompt_template.replace("{feedback}", feedback)
    if "json" not in prompt.lower():
        prompt += " Return the result in JSON format."

    try:
        # Call the OpenAI API
        response = openai.chat.completions.create(
            model=config.model,
            messages=[
                {
                    "role": "system",
                    "content": "You are a helpful assistant that analyzes customer feedback and returns structured data in JSON format.",
                },
                {"role": "user", "content": prompt},
            ],
            temperature=config.temperature,
            max_tokens=config.max_tokens,
            response_format={"type": "json_object"},
        )

        response_text = response.choices[0].message.content

        try:
            parsed_data = parse_llm_response(response_text)
            return FeedbackAnalysisResult(
                raw_response=response_text,
                parsed_data=parsed_data,
                error=None,
            )
        except ValueError as e:
            # Keep the raw text so the caller can inspect what came back.
            logger.warning(f"Failed to parse response: {e}")
            return FeedbackAnalysisResult(
                raw_response=response_text,
                parsed_data=None,
                error=str(e),
            )
    except Exception as e:
        logger.error(f"Error calling OpenAI API: {e}")
        return FeedbackAnalysisResult(
            raw_response="",
            parsed_data=None,
            error=f"API Error: {str(e)}",
        )


def process_feedback_batch(
    df: pd.DataFrame,
    config: FeedbackAnalysisConfig,
    start_idx: int,
    end_idx: int,
    stats: BatchProcessingStats,
) -> List[FeedbackAnalysisResult]:
    """
    Process a batch of feedback items.

    Args:
        df: DataFrame containing the feedback
        config: Configuration for the analysis
        start_idx: Starting positional index for the batch (inclusive)
        end_idx: Ending positional index for the batch (exclusive; clamped
            to the DataFrame length)
        stats: BatchProcessingStats object to track progress (mutated in place)

    Returns:
        List of FeedbackAnalysisResult objects, one per row in the batch
        (failed/empty rows get a result with a non-None ``error``).
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Processing batch from index {start_idx} to {end_idx}")

    results = []

    for idx in range(start_idx, min(end_idx, len(df))):
        feedback = df.iloc[idx][config.feedback_column]

        # Skip empty feedback.  Coerce to str before .strip(): a numeric or
        # otherwise non-string cell would raise AttributeError and abort the
        # whole batch.
        if pd.isna(feedback) or str(feedback).strip() == "":
            logger.warning(f"Empty feedback at index {idx}, skipping")
            stats.processed_items += 1
            stats.failed_items += 1
            results.append(FeedbackAnalysisResult(
                raw_response="",
                parsed_data=None,
                error="Empty feedback",
            ))
            continue

        # Analyze the feedback; never let one bad row kill the batch.
        try:
            result = analyze_feedback(str(feedback), config)
            results.append(result)

            stats.processed_items += 1
            if result.error is None:
                stats.successful_items += 1
            else:
                stats.failed_items += 1

            logger.info(
                f"Processed item {idx+1}/{stats.total_items} - "
                f"{'Success' if result.error is None else 'Failed'}"
            )
        except Exception as e:
            logger.error(f"Error processing feedback at index {idx}: {e}")
            stats.processed_items += 1
            stats.failed_items += 1
            results.append(FeedbackAnalysisResult(
                raw_response="",
                parsed_data=None,
                error=f"Processing error: {str(e)}",
            ))

    return results


def process_feedback(
    df: pd.DataFrame,
    config: FeedbackAnalysisConfig,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> pd.DataFrame:
    """
    Process all feedback in the DataFrame.

    Args:
        df: DataFrame containing the feedback
        config: Configuration for the analysis
        progress_callback: Optional callback function to report progress
            (called once per batch with the value of
            ``stats.calculate_progress()``)

    Returns:
        A copy of ``df`` with the analysis results added in
        ``config.output_column`` (JSON string on success, "Error: ..." on
        failure).
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Starting feedback processing with batch size {config.batch_size}")

    # Work on a copy so the caller's DataFrame is untouched.
    result_df = df.copy()
    result_df[config.output_column] = None

    stats = BatchProcessingStats(
        total_items=len(df),
        start_time=time.time(),
    )

    # Process in batches
    for start_idx in range(0, len(df), config.batch_size):
        end_idx = start_idx + config.batch_size

        batch_results = process_feedback_batch(df, config, start_idx, end_idx, stats)

        # Write the results back.  Batch indices are positional, but
        # DataFrame.at is label-based, so translate position -> label;
        # using the position directly mis-addresses rows whenever the
        # index is not a default RangeIndex.
        for i, result in enumerate(batch_results):
            pos = start_idx + i
            if pos < len(result_df):
                label = result_df.index[pos]
                if result.error is None and result.parsed_data is not None:
                    result_df.at[label, config.output_column] = json.dumps(result.parsed_data)
                else:
                    result_df.at[label, config.output_column] = f"Error: {result.error}"

        if progress_callback is not None:
            progress_callback(stats.calculate_progress())

    # Finalize stats
    stats.end_time = time.time()
    processing_time = stats.end_time - stats.start_time

    logger.info(f"Processing completed in {processing_time:.2f} seconds")
    logger.info(
        f"Total items: {stats.total_items}, "
        f"Successful: {stats.successful_items}, Failed: {stats.failed_items}"
    )

    return result_df