Spaces:
Build error
Build error
| import re | |
| import datetime | |
| import textwrap | |
| from Config import Config | |
| from DataDictionary import * | |
| from FastFacts import * | |
| from PersonalityValues import * | |
| import pandas as pd | |
| import numpy as np | |
class AttributeGroup:
    """
    A named bucket of profile fields belonging to one attribute type.

    Every field name supplied at construction starts out unset (None).
    Additional fields can be introduced later through set_field.
    """

    def __init__(self, group_name, fields):
        self.group_name = group_name
        # Each known field begins with no value.
        self.fields = dict.fromkeys(fields)

    def set_field(self, field_name, value):
        """
        Store a value under the given field name.

        An unknown field name is reported with a warning and then created.
        """
        already_known = field_name in self.fields
        if not already_known:
            print(f"Warning: Field '{field_name}' not found in '{self.group_name}'. Adding dynamically.")
        self.fields[field_name] = value

    def get_field(self, field_name):
        """
        Return the value stored for a field.

        If the field is unknown, a message is printed and None is returned.
        """
        try:
            return self.fields[field_name]
        except KeyError:
            print(f"Field '{field_name}' does not exist in the '{self.group_name}' attribute group.")
            return None

    def to_dict(self):
        """
        Return a dictionary of the fields whose value is not None.
        """
        populated = {}
        for name, stored in self.fields.items():
            if stored is not None:
                populated[name] = stored
        return populated

    def __repr__(self):
        """
        Render the group as "<group_name>(field=value, ...)", skipping None values.
        """
        parts = [f"{name}={stored}" for name, stored in self.fields.items() if stored is not None]
        joined = ", ".join(parts)
        return f"{self.group_name}({joined})"
class UserProfile:
    """
    A single user's profile, organised into attribute groups.

    Groups are created on first write, using the field list that the supplied
    data dictionary reports for that group. The FastFacts container is also
    created only when facts are first added.
    """

    def __init__(self, data_dictionary):
        self.data_dictionary = data_dictionary  # consulted when a new group has to be built
        self.attribute_groups = {}  # group name -> AttributeGroup
        self.ID = None  # profile identifier, assigned through set_ID
        self.fast_facts = None  # created by add_fast_facts on first use

    def set_ID(self, ID):
        """
        Assign the profile identifier.
        """
        self.ID = ID

    def set_field(self, group_name, field_name, value):
        """
        Store a value in one field of one attribute group.

        A group that does not exist yet is created, provided the data
        dictionary lists it as a type. Otherwise a message is printed and
        nothing is stored.
        """
        if group_name not in self.attribute_groups:
            if group_name not in self.data_dictionary.get_types():
                print(f"Attribute group '{group_name}' is not defined in the DataDictionary.")
                return
            # First write to this group: build it from the dictionary's parameters.
            self.attribute_groups[group_name] = AttributeGroup(
                group_name,
                self.data_dictionary.get_parameters(type=group_name),
            )
        self.attribute_groups[group_name].set_field(field_name, value)

    def get_field(self, group_name, field_name):
        """
        Return the value of one field in one attribute group.

        Returns None, after printing a message, when the group has not been created.
        """
        if group_name in self.attribute_groups:
            return self.attribute_groups[group_name].get_field(field_name)
        print(f"Attribute group '{group_name}' is not found.")
        return None

    def set_fields_from_list(self, attribute_type, fields, field_key="field_name", value_key="value"):
        """
        Populate one attribute group from a list of dictionaries.

        Args:
            attribute_type (str): Name of the attribute group (e.g., "Values").
            fields (list): Dictionaries that each carry a field name and a value.
            field_key (str): Dictionary key under which the field name is found.
            value_key (str): Dictionary key under which the value is found.
        """
        well_formed = isinstance(fields, list) and all(isinstance(entry, dict) for entry in fields)
        if not well_formed:
            print("Fields must be a list of dictionaries.")
            return
        for entry in fields:
            name = entry.get(field_key)
            new_value = entry.get(value_key)
            if name is None or new_value is None:
                print(f"Skipping invalid field: {entry}")
                continue
            self.set_field(attribute_type, name, new_value)

    def get_attributes(self, attribute_type=None):
        """
        Return the non-null attributes of one group, or of every group.

        Args:
            attribute_type (str, optional): Group to read. When omitted (or
                otherwise falsy), all groups are merged and each key is
                prefixed with "<group_name>_".
        Returns:
            dict: Non-null attributes. Empty when the requested group does not exist.
        """
        if not attribute_type:
            merged = {}
            for group_name, group in self.attribute_groups.items():
                for key, stored in group.to_dict().items():
                    merged[f"{group_name}_{key}"] = stored
            return merged
        if attribute_type in self.attribute_groups:
            return self.attribute_groups[attribute_type].to_dict()
        print(f"Attribute type '{attribute_type}' does not exist in this user profile.")
        return {}

    def add_fast_facts(self, facts):
        """
        Add facts to the FastFacts container, creating the container if needed.

        Args:
            facts (set or list): Facts to add. Any other type is rejected with a message.
        """
        acceptable = isinstance(facts, (set, list))
        if not acceptable:
            print("Facts must be provided as a set or list.")
            return
        if self.fast_facts is None:
            self.fast_facts = FastFacts()
        self.fast_facts.add_facts(facts)

    def to_dict(self, data_dictionary):
        """
        Flatten the profile into a dictionary keyed by "<type>_<field>".

        Only the types and parameters reported by the given data dictionary
        are considered. A warning is printed for each parameter that has no value.
        """
        profile_dict = {'ID': self.ID}
        for attribute_type in data_dictionary.get_types():
            group_attributes = self.get_attributes(attribute_type)
            for field_name in data_dictionary.get_parameters(type=attribute_type):
                stored = group_attributes.get(field_name)
                if stored is None:
                    print(f"Warning: {field_name} not found in type {attribute_type}")
                    continue
                profile_dict[f"{attribute_type}_{field_name}"] = stored
        return profile_dict

    def __repr__(self):
        """
        Render the profile as "UserProfile(ID=..., <group>, <group>, ...)".
        """
        rendered = ", ".join([str(group) for group in self.attribute_groups.values()])
        return f"UserProfile(ID={self.ID}, {rendered})"
def write_user_profiles_to_excel(user_profiles, filename, data_dictionary):
    """
    Save UserProfile objects to an Excel file, one row per profile.

    Columns are 'ID' followed by whatever data_dictionary.get_columns()
    returns, in that order. Columns that no profile supplied are added empty.

    Args:
        user_profiles (list): UserProfile objects to write.
        filename (str): Path of the Excel file to create.
        data_dictionary (object): Supplies get_columns() and is passed on to
            each profile's to_dict().
    """
    if not user_profiles:
        print("No user profiles to write.")
        return

    # Flatten each profile, echoing progress as we go.
    rows = []
    for position, profile in enumerate(user_profiles, start=1):
        print(f"Processing profile {position}: {profile}")
        flattened = profile.to_dict(data_dictionary)
        print(f"Dict output: {flattened}")
        rows.append(flattened)

    column_order = ['ID'] + data_dictionary.get_columns()
    frame = pd.DataFrame(rows)

    # Every expected column must exist before the frame can be reordered.
    for column in column_order:
        if column in frame.columns:
            continue
        print(f"Column {column} is missing")
        frame[column] = None

    frame[column_order].to_excel(filename, index=False)
    print(f"User profiles successfully written to {filename}")
def read_user_profiles_from_excel(respondent_details_file, data_dictionary, pv_criteria):
    """
    Build UserProfile objects from the rows of an Excel file.

    Cells are looked up under the column name "<type>_<field>" for every type
    and parameter the data dictionary reports. For the "values" type (matched
    case-insensitively) the cell is converted to an integer score, and only
    the text returned by PVAssessment.get_score_definition is stored, under
    the field name "<type>_<field>_Description". All other types store the
    cell value unchanged.

    Args:
        respondent_details_file (str): Path to the Excel file.
        data_dictionary (DataDictionary): Supplies the types and their parameters.
        pv_criteria: Passed through to PVAssessment.get_score_definition.
    Returns:
        list: One UserProfile per row of the sheet.
    """
    sheet = pd.read_excel(respondent_details_file)
    user_profiles = []

    for _, record in sheet.iterrows():
        profile = UserProfile(data_dictionary)

        respondent_id = record.get('ID')
        if pd.notna(respondent_id):
            profile.set_ID(respondent_id)

        for attribute_type in data_dictionary.get_types():
            for field in data_dictionary.get_parameters(type=attribute_type):
                column = f"{attribute_type}_{field}"
                cell = record.get(column)
                if not pd.notna(cell):
                    # Absent column or empty cell: nothing to store.
                    continue

                if attribute_type.lower() != "values":
                    profile.set_field(attribute_type, field, cell)
                    continue

                # Values type: translate the numeric score into its description.
                try:
                    score = int(cell)
                    description = PVAssessment.get_score_definition(field, score, pv_criteria)
                    profile.set_field(
                        attribute_type,
                        f"{attribute_type}_{field}_Description",
                        description,
                    )
                except ValueError:
                    print(f"Warning: Could not convert '{cell}' to an integer for field '{column}'.")

        user_profiles.append(profile)

    print(f"User profiles successfully read from {respondent_details_file}")
    return user_profiles
class UserProfileDetail:
    """
    One profile entry: a key, its original value, a QA check status and a value.
    """

    def __init__(self, key, original_value, qa_check, value):
        """
        Initialize a UserProfileDetail entry.
        """
        self.key = key
        self.original_value = original_value
        self.qa_check = qa_check
        self.value = value

    def __repr__(self):
        """
        Render the entry as "UserProfileDetail(name='value', ...)".

        Attributes that are falsy or equal to "Unable to map" are left out.
        """
        shown = {k: v for k, v in self.__dict__.items() if v and v != "Unable to map"}
        body = ", ".join(f"{k}='{v}'" for k, v in shown.items())
        # Balanced parentheses, in the same Name(...) form as the other classes in this file.
        return f"{self.__class__.__name__}({body})"

    @staticmethod
    def filter_profiles(profiles, key=None, qa_check=None, value=None):
        """
        Static method to filter user profiles by key, QA check status, or value.

        A criterion left as None is not applied.

        Args:
            profiles (list): List of UserProfileDetail objects.
            key (str, optional): The key to filter by.
            qa_check (str, optional): The QA check status to filter by.
            value (str, optional): The value to filter by.
        Returns:
            list: The UserProfileDetail entries that match every given criterion.
        """
        return [
            profile for profile in profiles
            if (key is None or profile.key == key)
            and (qa_check is None or profile.qa_check == qa_check)
            and (value is None or profile.value == value)
        ]

    @staticmethod
    def generate_user_profiles(file_path):
        """
        Static method to generate a list of UserProfileDetail entries from an Excel (.xlsx) file.

        The sheet must contain the columns 'Key', 'Value', 'QA Check' and
        'Revised Value'. Each row becomes one entry.

        Args:
            file_path (str): The path to the Excel file containing user profile entries.
        Returns:
            list: A list of UserProfileDetail objects generated from the file.
        """
        df = pd.read_excel(file_path)
        return [
            UserProfileDetail(
                key=row['Key'],
                original_value=row['Value'],
                qa_check=row['QA Check'],
                value=row['Revised Value'],
            )
            for _, row in df.iterrows()
        ]