Spaces:
Sleeping
Sleeping
| """ | |
| Snowflake Connector for Feedback System | |
| This module handles inserting user feedback into Snowflake. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from .feedback_schema import UserFeedback | |
# Configure logging BEFORE anything is logged. A module-level call such as
# logging.warning() on a root logger with no handlers implicitly runs a default
# basicConfig() (root stays at WARNING); the basicConfig() below would then be a
# no-op and every INFO message from this module would be silently dropped.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# The Snowflake driver is optional: SNOWFLAKE_AVAILABLE records whether it could
# be imported, and SnowflakeFeedbackConnector.connect() checks it before use.
try:
    import snowflake.connector
    SNOWFLAKE_AVAILABLE = True
except ImportError:
    SNOWFLAKE_AVAILABLE = False
    logger.warning("β οΈ snowflake-connector-python not installed. Install with: pip install snowflake-connector-python")
class SnowflakeFeedbackConnector:
    """Connector for inserting feedback into Snowflake.

    Holds at most one ``snowflake.connector`` connection. Use it explicitly
    (``connect()`` -> ``insert_feedback()`` -> ``disconnect()``) or as a context
    manager, which connects on entry and disconnects on exit.
    """

    def __init__(
        self,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        database: str = "SNOWFLAKE_LEARNING",
        schema: str = "PUBLIC"
    ):
        """Store connection settings; no connection is opened here.

        Args:
            user: Snowflake login name.
            password: Password for ``user``.
            account: Snowflake account identifier.
            warehouse: Warehouse passed to ``snowflake.connector.connect``.
            database: Database selected with ``USE DATABASE`` before each insert.
                NOTE(review): get_snowflake_connector_from_env() defaults to
                "SNOWFLAKE_LEARN" instead; confirm which name is intended.
            schema: Schema selected with ``USE SCHEMA`` before each insert.
        """
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        # Open connection while connected; None before connect() and after disconnect().
        self._connection = None

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a Snowflake double-quoted identifier.

        Embedded double quotes are escaped by doubling them, so a name that
        contains ``"`` cannot terminate the identifier early.
        """
        return '"' + name.replace('"', '""') + '"'

    @staticmethod
    def _json_or_none(value: Any) -> Optional[str]:
        """Serialize *value* with ``json.dumps``; empty/falsy values become None (SQL NULL)."""
        return json.dumps(value) if value else None

    @staticmethod
    def _validation_errors(feedback: "UserFeedback") -> list:
        """Return messages for missing required fields (an empty list means valid)."""
        errors = []
        if not feedback.feedback_id:
            errors.append("Missing feedback_id")
        if feedback.score is None:
            errors.append("Missing score")
        if feedback.timestamp is None:
            errors.append("Missing timestamp")
        return errors

    def connect(self):
        """Establish the Snowflake connection.

        Database and schema are deliberately not passed to ``connect``;
        insert_feedback() selects them with USE statements on each call.

        Raises:
            ImportError: if snowflake-connector-python is not installed.
            Exception: any error from ``snowflake.connector.connect`` is logged,
                printed, and re-raised.
        """
        if not SNOWFLAKE_AVAILABLE:
            raise ImportError("snowflake-connector-python is not installed. Install with: pip install snowflake-connector-python")
        logger.info("=" * 80)
        logger.info("π SNOWFLAKE CONNECTION: Attempting to connect...")
        logger.info(f" - Account: {self.account}")
        logger.info(f" - Warehouse: {self.warehouse}")
        logger.info(f" - Database: {self.database}")
        logger.info(f" - Schema: {self.schema}")
        logger.info(f" - User: {self.user}")
        try:
            self._connection = snowflake.connector.connect(
                user=self.user,
                password=self.password,
                account=self.account,
                warehouse=self.warehouse,
            )
            logger.info("β SNOWFLAKE CONNECTION: Successfully connected")
            logger.info("=" * 80)
            print(f"β Connected to Snowflake: {self.database}.{self.schema}")
        except Exception as e:
            logger.error(f"β SNOWFLAKE CONNECTION FAILED: {e}")
            logger.error("=" * 80)
            print(f"β Failed to connect to Snowflake: {e}")
            raise

    def disconnect(self):
        """Close the Snowflake connection if one is open.

        The stored handle is cleared before closing, so even if ``close()``
        raises, a later insert_feedback() fails fast with "Not connected"
        instead of reusing a closed connection. Repeated calls are no-ops.
        """
        connection, self._connection = self._connection, None
        if connection:
            connection.close()
            print("β Disconnected from Snowflake")

    def _use_context(self, cursor) -> None:
        """Select ``self.database`` / ``self.schema`` on *cursor* and log the verified context.

        Any error from the USE/SELECT statements is logged and re-raised.
        """
        logger.info(f"π§ SETTING CONTEXT: Database={self.database}, Schema={self.schema}")
        try:
            cursor.execute(f"USE DATABASE {self._quote_identifier(self.database)}")
            cursor.execute(f"USE SCHEMA {self._quote_identifier(self.schema)}")
            cursor.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
            current_db, current_schema = cursor.fetchone()
            logger.info(f"β Current context verified: Database={current_db}, Schema={current_schema}")
        except Exception as e:
            logger.error(f"β Could not set context: {e}")
            raise

    @classmethod
    def _serialize_json_columns(cls, feedback_dict: Dict[str, Any]) -> Dict[str, Optional[str]]:
        """Return JSON-string values for the four JSON-valued columns, logging each.

        Missing or empty values map to None (stored as SQL NULL).
        NOTE(review): confirm against the table DDL whether these columns are
        VARCHAR or VARIANT; either way a JSON *string* is bound here.
        """
        logger.info("π§ DATA PREPARATION: Preparing VARIANT columns...")

        transcript_raw = feedback_dict.get('transcript', [])
        transcript = cls._json_or_none(transcript_raw)
        if transcript is None:
            logger.info(" - Transcript: None")
        else:
            logger.info(f" - Transcript: {len(transcript_raw)} messages, JSON length: {len(transcript)}")

        retrievals_raw = feedback_dict.get('retrievals', [])
        retrievals = cls._json_or_none(retrievals_raw)
        if retrievals is None:
            logger.info(" - Retrievals: None")
        else:
            logger.info(f" - Retrievals: {len(retrievals_raw)} entries, JSON length: {len(retrievals)}")

        related_docs = cls._json_or_none(feedback_dict.get('feedback_score_related_retrieval_docs'))
        if related_docs is None:
            logger.info(" - Feedback score related docs: None")
        else:
            logger.info(f" - Feedback score related docs: present, JSON length: {len(related_docs)}")

        # retrieved_data is the older column, kept populated for backward compatibility.
        retrieved_data = cls._json_or_none(feedback_dict.get('retrieved_data'))
        if retrieved_data is None:
            logger.info(" - Retrieved data (preserved): None")
        else:
            logger.info(f" - Retrieved data (preserved): present, JSON length: {len(retrieved_data)}")

        return {
            'transcript': transcript,
            'retrievals': retrievals,
            'feedback_score_related_retrieval_docs': related_docs,
            'retrieved_data': retrieved_data,
        }

    def insert_feedback(self, feedback: "UserFeedback", table_name: Optional[str] = None) -> bool:
        """Insert a single feedback record into Snowflake.

        Args:
            feedback: Record to insert. Reads feedback_id, score, timestamp,
                open_ended_feedback, is_feedback_about_last_retrieval,
                conversation_id, message_count, has_retrievals, retrieval_count
                and created_at, plus the 'transcript', 'retrievals',
                'feedback_score_related_retrieval_docs' and 'retrieved_data'
                keys of ``feedback.to_dict()``.
            table_name: Target table. Defaults to the SNOWFLAKE_FEEDBACK_TABLE
                environment variable, then "USER_FEEDBACK_V3". It is inserted
                into the SQL text verbatim, so it must come from trusted config.

        Returns:
            True if the INSERT executed; False if validation failed or any
            error occurred while talking to Snowflake (the error is logged).

        Raises:
            RuntimeError: if connect() has not been called (or after disconnect()).
        """
        logger.info("=" * 80)
        logger.info("π SNOWFLAKE INSERT: Starting feedback insertion process")
        logger.info(f"π Feedback ID: {feedback.feedback_id}")

        if table_name is None:
            table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")

        if not self._connection:
            logger.error("β Not connected to Snowflake. Call connect() first.")
            raise RuntimeError("Not connected to Snowflake. Call connect() first.")

        cursor = None
        try:
            logger.info("π VALIDATION: Validating feedback data structure...")
            validation_errors = self._validation_errors(feedback)
            if validation_errors:
                logger.error(f"β VALIDATION FAILED: {validation_errors}")
                return False
            logger.info("β VALIDATION PASSED: All required fields present")

            logger.info("π Data Summary:")
            logger.info(f" - Feedback ID: {feedback.feedback_id}")
            logger.info(f" - Score: {feedback.score}")
            logger.info(f" - Conversation ID: {feedback.conversation_id}")
            logger.info(f" - Has Retrievals: {feedback.has_retrievals}")
            logger.info(f" - Retrieval Count: {feedback.retrieval_count}")
            logger.info(f" - Message Count: {feedback.message_count}")
            logger.info(f" - Timestamp: {feedback.timestamp}")

            cursor = self._connection.cursor()
            logger.info("β SNOWFLAKE CONNECTION: Cursor created")
            self._use_context(cursor)

            json_columns = self._serialize_json_columns(feedback.to_dict())

            # Values are bound via pyformat placeholders; only the table name is
            # interpolated into the statement text.
            sql = f"""INSERT INTO {table_name} (
                feedback_id,
                open_ended_feedback,
                score,
                is_feedback_about_last_retrieval,
                conversation_id,
                timestamp,
                message_count,
                has_retrievals,
                retrieval_count,
                transcript,
                retrievals,
                feedback_score_related_retrieval_docs,
                retrieved_data,
                created_at
            ) VALUES (
                %(feedback_id)s, %(open_ended_feedback)s, %(score)s, %(is_feedback_about_last_retrieval)s,
                %(conversation_id)s, %(timestamp)s, %(message_count)s, %(has_retrievals)s,
                %(retrieval_count)s, %(transcript)s, %(retrievals)s, %(feedback_score_related_retrieval_docs)s,
                %(retrieved_data)s, %(created_at)s
            )"""
            logger.info("π SQL PREPARATION: Building INSERT statement...")
            logger.info(f" - Target table: {table_name}")
            logger.info(f" - Database: {self.database}")
            logger.info(f" - Schema: {self.schema}")

            params = {
                'feedback_id': feedback.feedback_id,
                'open_ended_feedback': feedback.open_ended_feedback,
                'score': feedback.score,
                'is_feedback_about_last_retrieval': feedback.is_feedback_about_last_retrieval,
                'conversation_id': feedback.conversation_id,
                'timestamp': int(feedback.timestamp),
                'message_count': feedback.message_count,
                'has_retrievals': feedback.has_retrievals,
                'retrieval_count': feedback.retrieval_count,
                'transcript': json_columns['transcript'],
                'retrievals': json_columns['retrievals'],
                'feedback_score_related_retrieval_docs': json_columns['feedback_score_related_retrieval_docs'],
                'retrieved_data': json_columns['retrieved_data'],
                'created_at': feedback.created_at,
            }

            logger.info("π SQL EXECUTION: Executing INSERT query...")
            cursor.execute(sql, params)
            logger.info("β SQL EXECUTION: Query executed successfully")
            # Report what the driver says rather than assuming a single row.
            logger.info(f" - Rows affected: {cursor.rowcount}")
            logger.info(" - Status: SUCCESS")
            logger.info("β SNOWFLAKE INSERT: Feedback inserted successfully")
            logger.info(f"π Inserted feedback: {feedback.feedback_id}")
            logger.info("=" * 80)
            return True
        except Exception as e:
            if SNOWFLAKE_AVAILABLE and isinstance(e, snowflake.connector.errors.ProgrammingError):
                logger.error(f"β SQL EXECUTION ERROR: {e}")
                logger.error(f" - Error code: {getattr(e, 'errno', 'Unknown')}")
                logger.error(f" - SQL state: {getattr(e, 'sqlstate', 'Unknown')}")
            else:
                logger.error(f"β SNOWFLAKE INSERT FAILED: {type(e).__name__}")
                logger.error(f" - Error: {e}")
            logger.error("=" * 80)
            return False
        finally:
            # Release the cursor on success AND on error; previously it leaked
            # whenever any statement after cursor() raised.
            if cursor is not None:
                try:
                    cursor.close()
                except Exception as close_error:
                    logger.warning(f"Could not close Snowflake cursor: {close_error}")

    def __enter__(self):
        """Connect and return self for use in a ``with`` statement."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Disconnect; exceptions raised inside the ``with`` body are not suppressed."""
        self.disconnect()
def get_snowflake_connector_from_env() -> Optional[SnowflakeFeedbackConnector]:
    """Build a connector from ``SNOWFLAKE_*`` environment variables.

    Required: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT and
    SNOWFLAKE_WAREHOUSE. Optional: SNOWFLAKE_DATABASE (default
    "SNOWFLAKE_LEARN") and SNOWFLAKE_SCHEMA (default "PUBLIC").

    Returns:
        An unconnected SnowflakeFeedbackConnector, or None (after printing a
        warning) when any required variable is unset or empty.
    """
    required = {
        name: os.getenv(name)
        for name in ("SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT", "SNOWFLAKE_WAREHOUSE")
    }
    # NOTE(review): SnowflakeFeedbackConnector itself defaults to
    # "SNOWFLAKE_LEARNING"; confirm which database name is intended here.
    database_name = os.getenv("SNOWFLAKE_DATABASE", "SNOWFLAKE_LEARN")
    schema_name = os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC")

    # An empty string counts as missing, exactly like an unset variable.
    if not all(required.values()):
        print("β οΈ Snowflake credentials not found in environment variables")
        print("Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        return None

    return SnowflakeFeedbackConnector(
        user=required["SNOWFLAKE_USER"],
        password=required["SNOWFLAKE_PASSWORD"],
        account=required["SNOWFLAKE_ACCOUNT"],
        warehouse=required["SNOWFLAKE_WAREHOUSE"],
        database=database_name,
        schema=schema_name,
    )
def save_to_snowflake(feedback: UserFeedback, table_name: Optional[str] = None) -> bool:
    """Save one feedback record to Snowflake using credentials from the environment.

    Opens a fresh connection, inserts the record, and always disconnects
    afterwards, including when the insert raises.

    Args:
        feedback: Record to insert (see SnowflakeFeedbackConnector.insert_feedback).
        table_name: Target table; defaults to the SNOWFLAKE_FEEDBACK_TABLE
            environment variable, then "USER_FEEDBACK_V3".

    Returns:
        True if the record was inserted. False if credentials are not
        configured, the connection failed, or the insert failed; errors are
        logged rather than raised.
    """
    logger.info("=" * 80)
    logger.info("π΅ SNOWFLAKE SAVE: Starting save process")
    logger.info(f"π Feedback ID: {feedback.feedback_id}")

    if table_name is None:
        table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")

    connector = get_snowflake_connector_from_env()
    if not connector:
        logger.warning("β οΈ SNOWFLAKE SAVE: Skipping insertion (credentials not configured)")
        logger.warning(" Required variables: SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE")
        logger.info("=" * 80)
        return False

    try:
        logger.info("π‘ SNOWFLAKE SAVE: Establishing connection...")
        connector.connect()
        logger.info("β SNOWFLAKE SAVE: Connection established")
        try:
            logger.info("π₯ SNOWFLAKE SAVE: Attempting to insert feedback...")
            success = connector.insert_feedback(feedback, table_name=table_name)
        finally:
            # Runs even if insert_feedback raises, so the connection is never leaked.
            logger.info("π SNOWFLAKE SAVE: Disconnecting...")
            try:
                connector.disconnect()
            except Exception as disconnect_error:
                # The insert outcome is already decided (and may already be
                # committed); a failed close must not turn it into a failure.
                logger.warning(f"SNOWFLAKE SAVE: Error while disconnecting (ignored): {disconnect_error}")
    except Exception as e:
        logger.error(f"β SNOWFLAKE SAVE ERROR: {type(e).__name__}")
        logger.error(f" - Error: {e}")
        logger.info("=" * 80)
        return False

    if success:
        logger.info("β SNOWFLAKE SAVE: Successfully saved feedback")
    else:
        logger.error("β SNOWFLAKE SAVE: Failed to save feedback")
    logger.info("=" * 80)
    return success