Spaces:
Sleeping
Sleeping
| # interaction_logger.py | |
| """ | |
| Interaction logging service for Chaplain Feedback System. | |
| Logs all interaction steps with input/output and supports approval status updates. | |
| """ | |
| import uuid | |
| from typing import List, Optional, Dict, Any | |
| from datetime import datetime | |
| from src.core.chaplain_models import ( | |
| InteractionStepLog, | |
| TaggingRecord, | |
| ) | |
| class InteractionLogger: | |
| """ | |
| Logs all interaction steps in the chaplain feedback system. | |
| Records input/output for each step and supports updating approval status | |
| with tagging data. | |
| """ | |
| def __init__(self): | |
| """Initialize the interaction logger.""" | |
| # In-memory storage of logs (can be extended to persist to database/file) | |
| self._logs: Dict[str, InteractionStepLog] = {} | |
| self._session_logs: Dict[str, List[str]] = {} # session_id -> list of step_ids | |
| def log_step( | |
| self, | |
| session_id: str, | |
| message_id: str, | |
| step_type: str, | |
| input_text: str, | |
| model_output: str, | |
| ) -> str: | |
| """ | |
| Log an interaction step. | |
| Args: | |
| session_id: ID of the verification session | |
| message_id: ID of the message being processed | |
| step_type: Type of step (classification, explanation, permission_check, etc.) | |
| input_text: Input text for this step | |
| model_output: Output from the model/system for this step | |
| Returns: | |
| step_id: Unique identifier for this logged step | |
| Raises: | |
| ValueError: If step_type is invalid | |
| """ | |
| step_id = str(uuid.uuid4()) | |
| # Create log entry | |
| log_entry = InteractionStepLog( | |
| step_id=step_id, | |
| session_id=session_id, | |
| message_id=message_id, | |
| step_type=step_type, | |
| input_text=input_text, | |
| model_output=model_output, | |
| approval_status=None, | |
| tagging_data=None, | |
| timestamp=datetime.now(), | |
| ) | |
| # Store log entry | |
| self._logs[step_id] = log_entry | |
| # Track logs by session | |
| if session_id not in self._session_logs: | |
| self._session_logs[session_id] = [] | |
| self._session_logs[session_id].append(step_id) | |
| return step_id | |
| def update_approval( | |
| self, | |
| step_id: str, | |
| approval_status: str, | |
| tagging_data: Optional[TaggingRecord] = None, | |
| ) -> None: | |
| """ | |
| Update a step with approval status and optional tagging data. | |
| Args: | |
| step_id: ID of the step to update | |
| approval_status: "approved" or "disapproved" | |
| tagging_data: Optional TaggingRecord with feedback details | |
| Raises: | |
| ValueError: If step_id not found or approval_status is invalid | |
| """ | |
| if step_id not in self._logs: | |
| raise ValueError(f"Step {step_id} not found") | |
| if approval_status not in ("approved", "disapproved"): | |
| raise ValueError(f"Invalid approval_status: {approval_status}") | |
| log_entry = self._logs[step_id] | |
| log_entry.approval_status = approval_status | |
| log_entry.tagging_data = tagging_data | |
| def get_step(self, step_id: str) -> Optional[InteractionStepLog]: | |
| """ | |
| Get a specific logged step. | |
| Args: | |
| step_id: ID of the step to retrieve | |
| Returns: | |
| InteractionStepLog if found, None otherwise | |
| """ | |
| return self._logs.get(step_id) | |
| def get_session_logs(self, session_id: str) -> List[InteractionStepLog]: | |
| """ | |
| Get all logs for a session. | |
| Args: | |
| session_id: ID of the session | |
| Returns: | |
| List of InteractionStepLog entries for the session, in order | |
| """ | |
| step_ids = self._session_logs.get(session_id, []) | |
| return [self._logs[step_id] for step_id in step_ids if step_id in self._logs] | |
| def get_session_logs_by_type( | |
| self, | |
| session_id: str, | |
| step_type: str, | |
| ) -> List[InteractionStepLog]: | |
| """ | |
| Get all logs of a specific type for a session. | |
| Args: | |
| session_id: ID of the session | |
| step_type: Type of step to filter by | |
| Returns: | |
| List of InteractionStepLog entries matching the type | |
| """ | |
| all_logs = self.get_session_logs(session_id) | |
| return [log for log in all_logs if log.step_type == step_type] | |
| def get_message_logs(self, message_id: str) -> List[InteractionStepLog]: | |
| """ | |
| Get all logs for a specific message across all sessions. | |
| Args: | |
| message_id: ID of the message | |
| Returns: | |
| List of InteractionStepLog entries for the message | |
| """ | |
| return [log for log in self._logs.values() if log.message_id == message_id] | |
| def get_unapproved_steps(self, session_id: str) -> List[InteractionStepLog]: | |
| """ | |
| Get all steps in a session that haven't been approved/disapproved yet. | |
| Args: | |
| session_id: ID of the session | |
| Returns: | |
| List of InteractionStepLog entries with no approval status | |
| """ | |
| session_logs = self.get_session_logs(session_id) | |
| return [log for log in session_logs if log.approval_status is None] | |
| def get_disapproved_steps(self, session_id: str) -> List[InteractionStepLog]: | |
| """ | |
| Get all disapproved steps in a session. | |
| Args: | |
| session_id: ID of the session | |
| Returns: | |
| List of disapproved InteractionStepLog entries | |
| """ | |
| session_logs = self.get_session_logs(session_id) | |
| return [log for log in session_logs if log.approval_status == "disapproved"] | |
| def get_session_statistics(self, session_id: str) -> Dict[str, Any]: | |
| """ | |
| Get statistics for a session's interaction logs. | |
| Args: | |
| session_id: ID of the session | |
| Returns: | |
| Dictionary with statistics about the session's interactions | |
| """ | |
| session_logs = self.get_session_logs(session_id) | |
| if not session_logs: | |
| return { | |
| "session_id": session_id, | |
| "total_steps": 0, | |
| "approved_steps": 0, | |
| "disapproved_steps": 0, | |
| "unapproved_steps": 0, | |
| "steps_by_type": {}, | |
| } | |
| # Count by approval status | |
| approved = sum(1 for log in session_logs if log.approval_status == "approved") | |
| disapproved = sum(1 for log in session_logs if log.approval_status == "disapproved") | |
| unapproved = sum(1 for log in session_logs if log.approval_status is None) | |
| # Count by step type | |
| steps_by_type = {} | |
| for log in session_logs: | |
| if log.step_type not in steps_by_type: | |
| steps_by_type[log.step_type] = 0 | |
| steps_by_type[log.step_type] += 1 | |
| return { | |
| "session_id": session_id, | |
| "total_steps": len(session_logs), | |
| "approved_steps": approved, | |
| "disapproved_steps": disapproved, | |
| "unapproved_steps": unapproved, | |
| "steps_by_type": steps_by_type, | |
| } | |
| def clear_session(self, session_id: str) -> None: | |
| """ | |
| Clear all logs for a session. | |
| Args: | |
| session_id: ID of the session to clear | |
| """ | |
| step_ids = self._session_logs.get(session_id, []) | |
| for step_id in step_ids: | |
| if step_id in self._logs: | |
| del self._logs[step_id] | |
| if session_id in self._session_logs: | |
| del self._session_logs[session_id] | |
| def export_session_logs(self, session_id: str) -> List[Dict[str, Any]]: | |
| """ | |
| Export all logs for a session as dictionaries. | |
| Args: | |
| session_id: ID of the session | |
| Returns: | |
| List of log entries as dictionaries | |
| """ | |
| session_logs = self.get_session_logs(session_id) | |
| return [log.to_dict() for log in session_logs] | |