csun22's picture
Upload 59 files
ca1888b verified
#!/usr/bin/env python
"""
data_warehouse
Simple tools to manage data from text file
"""
from __future__ import absolute_import
import os
import sys
import itertools
import numpy as np
from core_scripts.other_tools import list_tools
__author__ = "Xin Wang"
__email__ = "wangxin@nii.ac.jp"
__copyright__ = "Copyright 2021, Xin Wang"
class DataEntry:
    """DataEntry to store data for one entry

    Each entry keeps a data value, a dictionary of tags indexed by their
    0-based position in the input tag list, and a free-form comment.
    """
    def __init__(self, data, tags, comment=""):
        """DataEntry(data, tags, comment)

        args:
          data: any kind of python object
          tags: list of str, tags of the data entry
          comment: comment attached to the entry (default "")
        """
        self.data_value = data
        self.tags = self._parse_tag(tags)
        self.comment = comment

    def _parse_tag(self, tags):
        """[tag_0, tag_1, tag_2] -> {0: tag_0, 1: tag_1, 2: tag_2}

        The keys are the 0-based positions of the tags in the input list.
        """
        return dict(enumerate(tags))

    def get_value(self):
        """Return the data object stored in this entry
        """
        return self.data_value

    def get_tag(self, tag_idx):
        """Return the tag stored in slot tag_idx

        Raises KeyError if this entry has no tag in that slot.
        """
        return self.tags[tag_idx]

    def check_tags(self, tag_indices, tag_values):
        """check_tags(tag_indices, tag_values)
        check whether the specified tags are equal to the tag values

        input:
          tag_indices: list, indices of the tag slots to be checked
          tag_values: list, expected value for each slot in tag_indices

        output:
          True: if every checked slot exists and holds the expected value
          False: otherwise

        Indices and values are paired with zip, so surplus items in the
        longer list are ignored. An empty query matches every entry.
        """
        for tag_idx, tag_value in zip(tag_indices, tag_values):
            # An entry without the requested slot cannot match. Returning
            # False (rather than raising KeyError) lets a query run over
            # entries that carry different numbers of tags.
            if tag_idx not in self.tags or self.tags[tag_idx] != tag_value:
                return False
        return True
class DataWarehouse:
    """DataWarehouse to manage data with multi-view

    The warehouse parses a text file into a flat list of data entries, each
    carrying a value and a list of tags, and retrieves the values whose tags
    match a query ("view").
    """
    def __init__(self, orig_file_path, parse_value_methods, parse_tag_methods):
        """DataWarehouse(orig_file_path, parse_value_methods,
                         parse_tag_methods)
        input:
          orig_file_path: str, path to the original file
          parse_value_methods: list of functions, each one extracts a data
                   value from one item of the file (None marks it invalid)
          parse_tag_methods: list of list of functions, paired one-to-one
                   with parse_value_methods; each inner list extracts the
                   tags for that value (None marks the item invalid)
        """
        self.file_path = orig_file_path
        self.parse_v_methods = parse_value_methods
        self.parse_t_methods = parse_tag_methods
        # all the parsed data entries
        self.data_list = []
        # tag slot index -> list of distinct tag values seen in that slot
        self.tag_list = {}
        # same list object as self.data_list, filled by _parse_file
        self.data_entries = self._parse_file()

    def _parse_file(self):
        """Load self.file_path and fill self.data_list and self.tag_list

        output:
          self.data_list, the list of the parsed data entries
        """
        # load list
        data_content = list_tools.read_list_from_text(self.file_path)

        for data_entry in data_content:
            # iterate over parse methods, every (value, tags) parser pair
            # may produce one entry from the same item
            for parse_v_method, parse_t_method in \
                zip(self.parse_v_methods, self.parse_t_methods):
                # get value
                data_value = parse_v_method(data_entry)
                # get tag
                tags = [x(data_entry) for x in parse_t_method]

                # skip invalid line
                if data_value is None or None in tags:
                    continue

                # create data entry
                self.data_list.append(DataEntry(data_value, tags))
                # add tag to the self.tag_list
                for tag_id, tag_val in enumerate(tags):
                    self._add_tag(tag_id, tag_val)
        return self.data_list

    def _add_tag(self, tag_id, tag_val):
        """Record tag_val as one possible value of the tag_id-th tag

        Values are kept in first-seen order without duplicates.
        """
        known_values = self.tag_list.setdefault(tag_id, [])
        if tag_val not in known_values:
            known_values.append(tag_val)
        return

    def get_view(self, tag_idxs, tag_values, score_parse = None):
        """ get_view(tag_idxs, tag_values, score_parse = None)
        input:
          tag_idxs: list, the index of the tag slot to check
          tag_values: list, the value of the tag slot to compare
          score_parse: function, a function to extract score from entry

        output:
          data_view: list of data, the values of all the entries whose
                     tags match; score_parse is applied to each value
                     when it is given
        """
        data_view = [x.get_value() for x in self.data_list \
                     if x.check_tags(tag_idxs, tag_values)]
        if score_parse is None:
            return data_view
        return [score_parse(x) for x in data_view]

    def _to_numpy(self, data_list, dims, statistics):
        """ convert data_list to numpy

        input:
          data_list: list of list, one view for each tag combination
          dims: list of int, number of candidate values for each tag slot
          statistics: function or None, reduces one view to one number

        output:
          statistics is None: array of shape dims + [max_length], where
              max_length is the length of the longest view; shorter views
              are padded with np.inf
          otherwise: array of shape dims that stores statistics(view);
              cells with an empty view keep the initial value 1.0
        """
        dims = list(dims)
        # np.prod returns a float (1.0) for an empty dims, while the shape
        # given to np.ones must be made of integers
        num_cells = int(np.prod(dims))

        # create data array
        if statistics is None:
            # maximum length of one data entry, 0 if there is no view
            max_length = max([len(x) for x in data_list] or [0])
            data_array = np.ones([num_cells, max_length]) * np.inf
            for idx, data_entry in enumerate(data_list):
                data_array[idx, 0:len(data_entry)] = np.array(data_entry)
            return np.reshape(data_array, dims + [max_length])
        else:
            data_array = np.ones([num_cells])
            for idx, data_entry in enumerate(data_list):
                # statistics is only evaluated on non-empty views
                if data_entry:
                    data_array[idx] = statistics(data_entry)
            return np.reshape(data_array, dims)

    def get_views_cross(self, tag_idxs, tag_values,
                        score_parse=None, to_numpy=False, statistics=None):
        """get_views_cross(self, tag_idxs, tag_values,
               score_parse=None, to_numpy=False, statistics=None)
        input:
          tag_idxs: list, list of tag indices to check
          tag_values: list of list, for each tag_index,
               A list of tags will be created through this cross:
               tag_values[0] x tag_values[1] x ...
               Then, each combination is used to retrieve the data
               output data will be a tensor of
                [len(tag_values[0]), len(tag_values[1]), ...]
          score_parse: function, forwarded to get_view
          to_numpy: bool, whether to convert the views to a numpy array
          statistics: function, only used when to_numpy is True

        output:
          data_list: list of views in the order of the cross product,
               or the numpy array built by _to_numpy when to_numpy is True
        """
        data_mat_size = [len(x) for x in tag_values]
        data_list = [self.get_view(tag_idxs, tag_ent, score_parse)
                     for tag_ent in itertools.product(*tag_values)]

        if to_numpy:
            return self._to_numpy(data_list, data_mat_size, statistics)
        else:
            return data_list

    def get_tags(self, tag_idx):
        """Return the list of values seen for the tag_idx-th tag

        None is returned if no entry has a tag in that slot.
        """
        return self.tag_list.get(tag_idx)
if __name__ == "__main__":
    # Nothing is executed here: running the module directly only prints
    # a short description of what it provides.
    print("tools for data warehouse")