Spaces:
Build error
Build error
| import logging | |
| import os | |
| import json | |
| import time | |
| import pandas as pd | |
| from typing import Callable, Optional, Dict, Any, List | |
| import openai | |
| from models import FeedbackAnalysisConfig, FeedbackAnalysisResult, BatchProcessingStats | |
def setup_logging():
    """
    Configure logging to a dated file under ``logs/`` and to the console.

    The ``logs`` directory is created if it does not exist. Messages at INFO
    level and above are sent to both handlers through ``logging.basicConfig``.
    """
    os.makedirs("logs", exist_ok=True)

    # One log file per day: the file name embeds the current date (YYYYMMDD).
    log_path = f"logs/feedback_analyzer_{time.strftime('%Y%m%d')}.log"
    file_handler = logging.FileHandler(log_path)
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
        handlers=[file_handler, console_handler],
    )
def parse_llm_response(response_text: str) -> Dict[str, Any]:
    """
    Parse the LLM response into a structured format.

    The whole response is tried as JSON first. If that fails, the text between
    the first '{' and the last '}' is extracted and parsed instead, which
    tolerates extra prose around the JSON object.

    Args:
        response_text: Raw text response from the LLM

    Returns:
        Parsed JSON data (the value produced by ``json.loads``)

    Raises:
        ValueError: If the response is not text (e.g. None) or cannot be
            parsed as JSON
    """
    logger = logging.getLogger(__name__)

    # json.loads raises TypeError (not JSONDecodeError) for None and other
    # non-text input. Report it as the documented ValueError so callers that
    # only catch ValueError still see a parse failure.
    if not isinstance(response_text, (str, bytes, bytearray)):
        reason = f"expected text, got {type(response_text).__name__}"
        logger.error(f"Failed to extract JSON from response: {reason}")
        raise ValueError(f"Could not parse response as JSON: {reason}")

    # First, try to parse the entire response as JSON
    try:
        return json.loads(response_text)
    except json.JSONDecodeError:
        logger.warning("Could not parse entire response as JSON, trying to extract JSON block")

    # Fall back to the outermost {...} span in the response
    try:
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}') + 1  # rfind() == -1 yields 0 here
        if start_idx < 0 or end_idx <= start_idx:
            raise ValueError("No JSON object found in response")
        return json.loads(response_text[start_idx:end_idx])
    except (ValueError, TypeError, RecursionError) as e:
        # JSONDecodeError is a ValueError; TypeError covers bytes input whose
        # find('{') rejects a str needle; RecursionError covers absurd nesting.
        logger.error(f"Failed to extract JSON from response: {e}")
        raise ValueError(f"Could not parse response as JSON: {e}") from e
def analyze_feedback(feedback: str, config: FeedbackAnalysisConfig) -> FeedbackAnalysisResult:
    """
    Analyze a single feedback item using the LLM.

    The prompt is built from ``config.prompt_template`` by substituting the
    ``{feedback}`` placeholder, then sent through
    ``openai.chat.completions.create`` with a JSON-object response format.
    Failures of the API call and of response parsing are reported through the
    ``error`` field of the returned result rather than raised.

    Args:
        feedback: The feedback text to analyze
        config: Configuration for the analysis (model, temperature,
            max_tokens and prompt_template are read here)

    Returns:
        FeedbackAnalysisResult object with the analysis results; ``error`` is
        None on success and a message otherwise
    """
    logger = logging.getLogger(__name__)

    # Prepare the prompt - ensure it mentions JSON to satisfy OpenAI's requirement
    prompt = config.prompt_template.replace("{feedback}", feedback)
    if "json" not in prompt.lower():
        prompt += " Return the result in JSON format."

    try:
        # Call the OpenAI API
        response = openai.chat.completions.create(
            model=config.model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that analyzes customer feedback and returns structured data in JSON format."},
                {"role": "user", "content": prompt}
            ],
            temperature=config.temperature,
            max_tokens=config.max_tokens,
            response_format={"type": "json_object"}
        )

        # Extract the response text
        response_text = response.choices[0].message.content

        # The message content can be absent (None). Handle that explicitly:
        # otherwise the TypeError raised while parsing None would be caught
        # by the outer handler and misreported as an API error.
        if response_text is None:
            logger.warning("Model returned no message content")
            return FeedbackAnalysisResult(
                raw_response="",
                parsed_data=None,
                error="Empty response from model"
            )

        # Parse the response
        try:
            parsed_data = parse_llm_response(response_text)
            return FeedbackAnalysisResult(
                raw_response=response_text,
                parsed_data=parsed_data,
                error=None
            )
        except ValueError as e:
            # Keep the raw text so the caller can inspect what the model sent
            logger.warning(f"Failed to parse response: {e}")
            return FeedbackAnalysisResult(
                raw_response=response_text,
                parsed_data=None,
                error=str(e)
            )
    except Exception as e:
        # Boundary handler: any failure of the call is turned into a result
        # object so one bad item does not abort the caller's loop.
        logger.error(f"Error calling OpenAI API: {e}")
        return FeedbackAnalysisResult(
            raw_response="",
            parsed_data=None,
            error=f"API Error: {str(e)}"
        )
def process_feedback_batch(
    df: pd.DataFrame,
    config: FeedbackAnalysisConfig,
    start_idx: int,
    end_idx: int,
    stats: BatchProcessingStats
) -> List[FeedbackAnalysisResult]:
    """
    Process a batch of feedback items.

    Rows are addressed by position (``df.iloc``). Exactly one result is
    appended per row in the range, so the returned list lines up with the
    positions ``start_idx .. min(end_idx, len(df)) - 1``.

    Args:
        df: DataFrame containing the feedback
        config: Configuration for the analysis (``feedback_column`` selects
            the cell to analyze)
        start_idx: Starting position for the batch (inclusive)
        end_idx: Ending position for the batch (exclusive, clamped to len(df))
        stats: BatchProcessingStats object to track progress; its
            processed/successful/failed counters are updated in place

    Returns:
        List of FeedbackAnalysisResult objects
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Processing batch from index {start_idx} to {end_idx}")
    results = []

    for idx in range(start_idx, min(end_idx, len(df))):
        feedback = df.iloc[idx][config.feedback_column]

        # Cells are not guaranteed to be strings (e.g. a numeric column).
        # Coerce non-missing values so .strip() below cannot raise
        # AttributeError outside the try block and abort the whole run.
        if not isinstance(feedback, str) and not pd.isna(feedback):
            feedback = str(feedback)

        # Skip empty feedback (counted as processed and failed)
        if pd.isna(feedback) or feedback.strip() == "":
            logger.warning(f"Empty feedback at index {idx}, skipping")
            stats.processed_items += 1
            stats.failed_items += 1
            results.append(FeedbackAnalysisResult(
                raw_response="",
                parsed_data=None,
                error="Empty feedback"
            ))
            continue

        # Analyze the feedback
        try:
            result = analyze_feedback(feedback, config)
            results.append(result)
            stats.processed_items += 1
            if result.error is None:
                stats.successful_items += 1
            else:
                stats.failed_items += 1
            logger.info(f"Processed item {idx+1}/{stats.total_items} - {'Success' if result.error is None else 'Failed'}")
        except Exception as e:
            # Keep one result per row even when analysis blows up
            logger.error(f"Error processing feedback at index {idx}: {e}")
            stats.processed_items += 1
            stats.failed_items += 1
            results.append(FeedbackAnalysisResult(
                raw_response="",
                parsed_data=None,
                error=f"Processing error: {str(e)}"
            ))

    return results
def process_feedback(
    df: pd.DataFrame,
    config: FeedbackAnalysisConfig,
    progress_callback: Optional[Callable[[float], None]] = None
) -> pd.DataFrame:
    """
    Process all feedback in the DataFrame.

    Rows are handled in batches of ``config.batch_size``. Each row's output
    is either the JSON-serialized parsed data or an ``"Error: ..."`` string,
    stored in ``config.output_column`` of a copy of ``df``.

    Args:
        df: DataFrame containing the feedback (not modified)
        config: Configuration for the analysis
        progress_callback: Optional callback function to report progress;
            called after every batch with ``stats.calculate_progress()``

    Returns:
        DataFrame with the analysis results added

    Raises:
        ValueError: If ``config.batch_size`` is less than 1
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Starting feedback processing with batch size {config.batch_size}")

    # A negative step makes range() empty, so nothing would be processed and
    # no error raised; reject it here (zero already raised from range()).
    if config.batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {config.batch_size}")

    # Create a copy of the DataFrame to avoid modifying the original
    result_df = df.copy()
    total_rows = len(df)

    # Collect outputs by row POSITION. Batches read rows with iloc, so writing
    # by index label (as DataFrame.at does) would hit the wrong rows, or add
    # new ones, whenever df does not carry a default 0..n-1 index.
    outputs = [None] * total_rows

    # Initialize stats
    stats = BatchProcessingStats(
        total_items=total_rows,
        start_time=time.time()
    )

    # Process in batches
    for start_idx in range(0, total_rows, config.batch_size):
        end_idx = start_idx + config.batch_size
        batch_results = process_feedback_batch(df, config, start_idx, end_idx, stats)

        for offset, result in enumerate(batch_results):
            position = start_idx + offset
            if position >= total_rows:
                break
            if result.error is None and result.parsed_data is not None:
                outputs[position] = json.dumps(result.parsed_data)
            else:
                outputs[position] = f"Error: {result.error}"

        # Update progress
        if progress_callback is not None:
            progress_callback(stats.calculate_progress())

    # Reuse the frame's own index so the assignment stays positional; object
    # dtype keeps an empty column consistent with a populated one.
    result_df[config.output_column] = pd.Series(outputs, index=result_df.index, dtype=object)

    # Finalize stats
    stats.end_time = time.time()
    processing_time = stats.end_time - stats.start_time
    logger.info(f"Processing completed in {processing_time:.2f} seconds")
    logger.info(f"Total items: {stats.total_items}, Successful: {stats.successful_items}, Failed: {stats.failed_items}")
    return result_df