| | |
| | """ |
| | data_warehouse |
| | |
| | Simple tools to manage data from text file |
| | """ |
| | from __future__ import absolute_import |
| |
|
| | import os |
| | import sys |
| | import itertools |
| | import numpy as np |
| |
|
| | from core_scripts.other_tools import list_tools |
| |
|
| | __author__ = "Xin Wang" |
| | __email__ = "wangxin@nii.ac.jp" |
| | __copyright__ = "Copyright 2021, Xin Wang" |
| |
|
class DataEntry:
    """DataEntry to store data for one entry

    The tags given to the constructor are kept in a dict keyed by their
    0-based position, so ``tags[i]`` is the i-th tag of this entry.
    """

    def __init__(self, data, tags, comment=""):
        """DataEntry(data, tags, comment)

        args:
          data: any kind of python object
          tags: list of str, tags of the data entry
          comment: str, free-form comment attached to the entry
        """
        self.data_value = data
        self.tags = self._parse_tag(tags)
        self.comment = comment

    def _parse_tag(self, tags):
        """[tag_0, tag_1, tag_2] -> {0: tag_0, 1: tag_1, 2: tag_2}

        Keys are the 0-based positions produced by enumerate().
        """
        return dict(enumerate(tags))

    def get_value(self):
        """Return the data object stored in this entry."""
        return self.data_value

    def get_tag(self, tag_idx):
        """Return the tag stored at position tag_idx.

        Raises KeyError if this entry has no tag at that position.
        """
        return self.tags[tag_idx]

    def check_tags(self, tag_indices, tag_values):
        """check_tags(tag_indices, tag_values)
        check whether the specified tags are equal to the tag values

        input:
          tag_indices: list, positions of the tags to check
          tag_values: list, expected value for each position in tag_indices

        output:
          True: if every (index, value) pair matches the tags of this data
          False: if any pair differs, or if this entry has no tag at one
                 of the requested positions

        Pairs are formed with zip(), so surplus items in the longer list
        are ignored; two empty lists therefore match every entry.
        """
        for tag_idx, tag_value in zip(tag_indices, tag_values):
            # A missing tag slot cannot match any value. Returning False
            # (instead of raising KeyError) lets entries with differing
            # numbers of tags be filtered side by side.
            if tag_idx not in self.tags or self.tags[tag_idx] != tag_value:
                return False
        return True
| | |
class DataWarehouse:
    """DataWarehouse to manage data with multi-view

    Items read from a text file are parsed into DataEntry objects, each
    carrying a value and a list of tags. Views are then retrieved by
    selecting the entries whose tags match given values.
    """

    def __init__(self, orig_file_path, parse_value_methods, parse_tag_methods):
        """DataWarehouse(orig_file_path, parse_value_methods, parse_tag_methods)

        input:
          orig_file_path: str, path to the original file
          parse_value_methods: list of functions, each maps one item of
              the file to a data value, or None if the item does not apply
          parse_tag_methods: list of list of functions, parse_tag_methods[i]
              is paired with parse_value_methods[i]; each function maps
              one item of the file to a tag, or None if it does not apply
        """
        self.file_path = orig_file_path
        self.parse_v_methods = parse_value_methods
        self.parse_t_methods = parse_tag_methods
        # all parsed DataEntry objects, in parsing order
        self.data_list = []
        # tag position -> list of distinct tag values seen at that position
        self.tag_list = {}
        # same list object as self.data_list (returned by _parse_file)
        self.data_entries = self._parse_file()

    def _parse_file(self):
        """Read self.file_path and fill self.data_list and self.tag_list

        Each item returned by list_tools.read_list_from_text (presumably
        one line of text -- TODO confirm) is given to every pair of
        (value parser, tag parsers). A pair is skipped for that item when
        the value parser or any of its tag parsers returns None.

        output:
          self.data_list, the list of parsed DataEntry objects
        """
        data_content = list_tools.read_list_from_text(self.file_path)

        for data_entry in data_content:
            for parse_v_method, parse_t_method in zip(
                    self.parse_v_methods, self.parse_t_methods):
                data_value = parse_v_method(data_entry)
                tags = [x(data_entry) for x in parse_t_method]

                # this parser pair does not apply to this item
                if data_value is None or None in tags:
                    continue

                self.data_list.append(DataEntry(data_value, tags))

                # remember every distinct tag value for each tag position
                for tag_id, tag_val in enumerate(tags):
                    self._add_tag(tag_id, tag_val)
        return self.data_list

    def _add_tag(self, tag_id, tag_val):
        """Record tag_val as a known value of tag position tag_id

        Values are kept in first-seen order without duplicates.
        """
        known_values = self.tag_list.setdefault(tag_id, [])
        if tag_val not in known_values:
            known_values.append(tag_val)
        return

    def get_view(self, tag_idxs, tag_values, score_parse=None):
        """ get_view(tag_idxs, tag_values, score_parse = None)

        input:
          tag_idxs: list, the index of the tag slot to check
          tag_values: list, the value of the tag slot to compare
          score_parse: function, a function to extract score from entry

        output:
          data_view: list of data values of the matching entries, each
                     passed through score_parse when it is given
        """
        data_view = [x.get_value() for x in self.data_list
                     if x.check_tags(tag_idxs, tag_values)]
        if score_parse is not None:
            return [score_parse(x) for x in data_view]
        return data_view

    def _to_numpy(self, data_list, dims, statistics):
        """ convert data_list to numpy

        input:
          data_list: list of list, one list of values per cell
          dims: list of int, shape of the grid of cells
          statistics: function or None, maps a non-empty list of values
              to a single number

        output:
          statistics is None: array of shape dims + [max_length], where
              max_length is the length of the longest list in data_list;
              shorter lists are padded with np.inf
          otherwise: array of shape dims holding statistics(values) for
              each cell
        """
        dims = list(dims)
        # np.prod([]) is a float, which cannot be used as an array size
        num_cells = int(np.prod(dims))

        if statistics is None:
            # an empty data_list (empty cross product) gives length 0
            # rather than a ValueError from max() on an empty sequence
            if data_list:
                max_length = max([len(x) for x in data_list])
            else:
                max_length = 0
            data_array = np.full([num_cells, max_length], np.inf)
            for idx, data_entry in enumerate(data_list):
                data_array[idx, 0:len(data_entry)] = np.array(data_entry)
            return np.reshape(data_array, dims + [max_length])

        # NOTE(review): cells whose list is empty keep the initial value
        # 1.0 -- confirm that this placeholder is intended
        data_array = np.ones([num_cells])
        for idx, data_entry in enumerate(data_list):
            if data_entry:
                data_array[idx] = statistics(data_entry)
        return np.reshape(data_array, dims)

    def get_views_cross(self, tag_idxs, tag_values,
                        score_parse=None, to_numpy=False, statistics=None):
        """get_views_cross(self, tag_idxs, tag_values,
           score_parse=None, to_numpy=False, statistics=None)
        input:
          tag_idxs: list, list of tag indices to check
          tag_values: list of list, for each tag_index,
             A list of tags will be created through this cross:
             tag_values[0] x tag_values[1] x ...

             Then, each combination is used to retrieve the data
             output data will be a tensor of
             [len(tag_values[0]), len(tag_values[1]), ...]
          score_parse: function, passed to get_view for every combination
          to_numpy: bool, whether to convert the result to a numpy array
          statistics: function, only used when to_numpy is True; reduces
             the data of each combination to a single number

        output:
          data_list: list of views, one per combination in product order,
             or the numpy array built by _to_numpy when to_numpy is True
        """
        data_mat_size = [len(x) for x in tag_values]

        data_list = [self.get_view(tag_idxs, tag_ent, score_parse)
                     for tag_ent in itertools.product(*tag_values)]

        if to_numpy:
            return self._to_numpy(data_list, data_mat_size, statistics)
        return data_list

    def get_tags(self, tag_idx):
        """Return the distinct values seen at tag position tag_idx

        Returns None when no entry has a tag at that position.
        """
        return self.tag_list.get(tag_idx)
| |
|
if __name__ == "__main__":
    # The module is meant to be imported; running it directly only
    # prints a one-line description.
    banner = "tools for data warehouse"
    print(banner)
| |
|