3AM / asmk-src /asmk /asmk_method.py
nycu-cplab's picture
app building
e6f20b8
"""A wrapper around all asmk-related packages for convenient use"""
import os.path
import time
import numpy as np
from . import kernel as kern_pkg, codebook as cdb_pkg, index as idx_pkg, inverted_file as ivf_pkg
from . import io_helpers
class ASMKMethod:
    """Keep the objects needed by the asmk method and give easy access to its steps.

    Each step of the asmk method corresponds to one method of this class. The training and
    building steps return a new ASMKMethod object and leave the current one unchanged. Use
    the initialize_untrained() class method instead of directly calling the constructor.

    :param dict params: contains keys index, train_codebook, build_ivf and query_ivf, each
            containing the corresponding step parameters
    :param dict metadata: only stored by this object, never changed
    :param Codebook codebook: object from the codebook module
    :param ASMKKernel kernel: object from the kernel module
    :param IVF inverted_file: object from the inverted_file module
    """

    def __init__(self, params, metadata, *, codebook=None, kernel=None, inverted_file=None):
        self.params = params
        self.metadata = metadata
        self.codebook = codebook
        self.kernel = kernel
        self.inverted_file = inverted_file

    @classmethod
    def initialize_untrained(cls, params):
        """Initialize asmk method before training, provided only params (see constructor docs)"""
        return cls(params, {})

    #
    # Method steps
    #

    def train_codebook(self, *columns, cache_path=None, step_params=None):
        """The first step of the method - training the codebook (or loading from cache)

        :param columns: training data forwarded unchanged to Codebook.train(); presumably
                a 2D numpy array whose rows are vectors for codebook training - confirm
                against the codebook module
        :param str cache_path: trained codebook will be stored under given file path and loaded
                next time without training (None to turn off)
        :param dict step_params: parameters that will override stored parameters for this step
                (self.params['train_codebook'])
        :return: new ASMKMethod object (containing metadata of this step), do not change self
        """
        assert not self.codebook, "Codebook already trained"
        index_factory = idx_pkg.initialize_index(**self.params['index'])
        step_params = step_params or self.params.get("train_codebook")

        if cache_path and os.path.exists(cache_path):
            time0 = time.time()
            # NOTE(review): load_pickle presumably unpickles the file - only use trusted caches
            cdb = cdb_pkg.Codebook.initialize_from_state(io_helpers.load_pickle(cache_path),
                                                         index_factory=index_factory)
            cdb.index()
            # A cache created with other parameters must not be silently reused
            assert cdb.params == step_params['codebook'], \
                "Cached codebook parameters differ from the requested ones"
            metadata = {"load_time": time.time() - time0}
        else:
            cdb = cdb_pkg.Codebook(**step_params['codebook'], index_factory=index_factory)
            metadata = cdb.train(*columns)
            if cache_path:
                io_helpers.save_pickle(cache_path, cdb.state_dict())

        metadata["index_class"] = index_factory.__class__.__name__
        return self.__class__({**self.params, "train_codebook": step_params},
                              {**self.metadata, "train_codebook": metadata},
                              codebook=cdb)

    def build_ivf(self, *columns, distractors_path=None, cache_path=None, step_params=None):
        """The second step of the method - building the ivf (or loading from cache)

        :param columns: data forwarded unchanged to IvfBuilder.add(); presumably a 2D numpy
                array of vectors to be indexed followed by a 1D array of their image ids -
                confirm against the codebook module
        :param str distractors_path: ivf will be initialized with given distractors ivf path
        :param str cache_path: built ivf will be stored under given file path and loaded
                next time without training (None to turn off)
        :param dict step_params: parameters that will override stored parameters for this step
                (self.params['build_ivf'])
        :return: new ASMKMethod object (containing metadata of this step), do not change self
        """
        builder = self.create_ivf_builder(cache_path=cache_path, step_params=step_params)

        # Skip if loaded, otherwise add everything at once
        if not builder.loaded_from_cache:
            if distractors_path:
                builder.initialize_with_distractors(distractors_path)
            builder.add(*columns)

        return self.add_ivf_builder(builder)

    def create_ivf_builder(self, *, cache_path=None, step_params=None):
        """Part of the second step of the method, see build_ivf() method implementation for usage

        :param str cache_path: built ivf will be stored under given file path and loaded
                next time without training (None to turn off)
        :param dict step_params: parameters that will override stored parameters for this step
                (self.params['build_ivf'])
        :return: IvfBuilder object
        """
        assert not self.kernel and not self.inverted_file, "Inverted file already built"
        # Fail early with a clear message instead of an attribute error on None later on
        assert self.codebook is not None, "Codebook not trained, call train_codebook() first"
        step_params = step_params or self.params.get("build_ivf")
        kern = kern_pkg.ASMKKernel(self.codebook, **step_params['kernel'])
        return IvfBuilder(step_params, self.codebook, kern, cache_path=cache_path)

    def add_ivf_builder(self, ivf_builder):
        """Part of the second step of the method, see build_ivf() method implementation for usage

        :param IvfBuilder ivf_builder: Builder with vectors added
        :return: new ASMKMethod object (containing metadata from the builder), do not change self
        """
        ivf_metadata = ivf_builder.save()
        return self.__class__({**self.params, "build_ivf": ivf_builder.step_params},
                              {**self.metadata, "build_ivf": ivf_metadata},
                              codebook=self.codebook, kernel=ivf_builder.kernel,
                              inverted_file=ivf_builder.ivf)

    def query_ivf(self, *columns, step_params=None, progress=None):
        """The last step of the method - querying the ivf

        :param columns: query data forwarded to accumulate_scores(); the first column is used
                as query vectors (qvecs), the second one as the corresponding image ids
                (qimids), any further columns are sliced per image together with qvecs
        :param dict step_params: parameters that will override stored parameters for this step
                (self.params['query_ivf'])
        :param int progress: step at which update progress printing (None to disable)
        :return: tuple (dict metadata, ndarray images, 2D ndarray ranks, 2D ndarray scores), do not
                change self
        """
        # Fail early with a clear message instead of an attribute error on None later on
        assert self.codebook is not None, "Codebook not trained, call train_codebook() first"
        assert self.kernel is not None and self.inverted_file is not None, \
            "Inverted file not built, call build_ivf() first"

        step_params = step_params or self.params.get("query_ivf")

        time0 = time.time()
        images, ranks, scores = self.accumulate_scores(self.codebook, self.kernel,
                                                       self.inverted_file, *columns,
                                                       params=step_params, progress=progress)
        # ranks has one row per query image, so this is the average time per query image
        metadata = {"query_avg_time": (time.time()-time0)/len(ranks)}
        return metadata, images, ranks, scores

    #
    # Helper functions
    #

    @staticmethod
    def accumulate_scores(cdb, kern, ivf, qvecs, qimids, *cols, params, progress=None):
        """Accumulate scores for every query image (qvecs, qimids) given codebook, kernel,
        inverted_file and parameters.

        :param cdb: codebook used to quantize the query vectors
        :param kern: kernel used for per-image aggregation and for the similarity function
        :param ivf: inverted file that is searched
        :param qvecs: query vectors, sliced per image
        :param qimids: image ids corresponding to 'qvecs'
        :param cols: additional columns sliced per image together with 'qvecs'
        :param dict params: contains keys similarity, quantize, aggregate and search with
                keyword arguments for the corresponding calls
        :param int progress: step at which update progress printing (None to disable)
        :return: tuple (ndarray images, 2D ndarray ranks, 2D ndarray scores)
        :raises ValueError: when there is no query image to process
        """
        def similarity_func(*args):
            return kern.similarity(*args, **params["similarity"])

        acc = []
        slices = list(io_helpers.slice_unique(qimids))
        for imid, seq in io_helpers.progress(slices, frequency=progress, header="Query"):
            quantized = cdb.quantize(qvecs[seq], *(x[seq] for x in cols), **params["quantize"])
            aggregated = kern.aggregate_image(*quantized, **params["aggregate"])
            ranks, scores = ivf.search(*aggregated, **params["search"],
                                       similarity_func=similarity_func)
            acc.append((imid, ranks, scores))

        if not acc:
            # Unpacking zip(*[]) would raise an obscure "not enough values to unpack" ValueError
            raise ValueError("No query images given, cannot accumulate scores")

        imids_all, ranks_all, scores_all = zip(*acc)
        return np.array(imids_all), np.vstack(ranks_all), np.vstack(scores_all)
class IvfBuilder:
    """Inverted file (IVF) wrapper simplifying vector addition

    :param dict step_params: contains parameters for build_ivf step
    :param Codebook codebook: object from the codebook module
    :param ASMKKernel kernel: object from the kernel module
    :param str cache_path: built ivf will be stored under given file path and loaded
            next time without training (None to turn off)
    """

    def __init__(self, step_params, codebook, kernel, *, cache_path):
        self.step_params = step_params
        self.codebook = codebook
        self.kernel = kernel

        if cache_path and os.path.exists(cache_path):
            time0 = time.time()
            # NOTE(review): load_pickle presumably unpickles the file - only use trusted caches
            self.ivf = ivf_pkg.IVF.initialize_from_state(io_helpers.load_pickle(cache_path))
            self.metadata = {"load_time": time.time() - time0}
            # Loaded from cache, so save() must not write the same file again
            self.cache_path = None
        else:
            self.ivf = ivf_pkg.IVF.initialize_empty(**step_params['ivf'],
                                                    codebook_size=codebook.size)
            self.metadata = {"index_time": 0}
            self.cache_path = cache_path

    @property
    def loaded_from_cache(self):
        """If the contained IVF was loaded (otherwise, it is empty after initialization)"""
        return "load_time" in self.metadata

    def initialize_with_distractors(self, path):
        """Initialize with distractors ivf at given path

        Replaces the contained ivf with the one stored at 'path' and sets its image id offset
        to the number of images it already contains.
        """
        # NOTE(review): load_pickle presumably unpickles the file - only use trusted files
        self.ivf = ivf_pkg.IVF.initialize_from_state(io_helpers.load_pickle(path))
        self.ivf.imid_offset = self.ivf.n_images

    def add(self, *columns, progress=None):
        """Add descriptors and corresponding image ids to the IVF

        Can be called repeatedly, the time spent is summed up in metadata['index_time']. When
        the builder was loaded from cache, the added vectors are not stored by save() because
        the cache path is cleared on load.

        :param columns: data forwarded unchanged to the codebook's quantize(); presumably a
                2D array of local descriptors followed by a 1D array of image ids - confirm
                against the codebook module
        :param int progress: step at which update progress printing (None to disable)
        """
        time0 = time.time()
        quantized = self.codebook.quantize(*columns, **self.step_params["quantize"])
        if progress:
            print(">> Descriptors quantized")
        aggregated = self.kernel.aggregate(*quantized, **self.step_params["aggregate"],
                                           progress=progress)
        self.ivf.add(*aggregated, progress=200*progress if progress else None)
        # A builder loaded from cache has no 'index_time' entry yet, so do not assume the key
        # exists (a plain += raised KeyError after all the work had been done)
        self.metadata['index_time'] = self.metadata.get('index_time', 0) + (time.time() - time0)

    def save(self):
        """Save to cache path if defined

        :return: dict metadata with ivf stats
        """
        if self.cache_path:
            io_helpers.save_pickle(self.cache_path, self.ivf.state_dict())

        return {**self.metadata, "ivf_stats": self.ivf.stats}