Source code for sad.generator.simulation

#
# For licensing see accompanying LICENSE file.
# Copyright (C) 2020 Apple Inc. All Rights Reserved.
#

import os
import tarfile
from typing import Any, Iterator, Tuple

import numpy as np
from numpy.random import RandomState

from sad.model import SADModel

from .base import GeneratorBase, GeneratorFactory


@GeneratorFactory.register
class SimulationGenerator(GeneratorBase):
    """A concrete generator class that handles simulated data from the generative
    model of ``SAD``.

    After an instance of this class is created, ``self.add(filepath)`` will need to
    be called to add a local file to this generator. The format of the local file
    is a compressed tarball, containing a ``raw.npz`` file, inside which true model
    parameters ``XI0``, ``T0``, ``H0`` and ``X0`` (derived from the first three
    matrices) are contained. An observation tensor ``Obs0`` is in the raw file as
    well, containing a fully observed personalized pairwise comparison taking
    values of ``-1`` or ``1``. One can set ``self.missing_ratio`` to control the
    percentage of missing data in the observation. Details see below.
    """

    def __init__(self, config: dict, model: SADModel, task: "TrainingTask"):
        super().__init__(config, model, task)

    @property
    def XI0(self) -> np.ndarray:
        """The true user matrix (``k x n``) containing user vectors as columns."""
        return self._XI0

    @XI0.setter
    def XI0(self, XI0: np.ndarray):
        self._XI0 = XI0

    @property
    def H0(self) -> np.ndarray:
        """The true left item matrix (``k x m``) containing item left vectors as
        columns."""
        return self._H0

    @H0.setter
    def H0(self, H0: np.ndarray):
        self._H0 = H0

    @property
    def T0(self) -> np.ndarray:
        """The true right item matrix (``k x m``) containing item right vectors as
        columns."""
        return self._T0

    @T0.setter
    def T0(self, T0: np.ndarray):
        self._T0 = T0

    @property
    def X0(self) -> np.ndarray:
        """The three way tensor (``n x m x m``) containing true preference scores."""
        return self._X0

    @X0.setter
    def X0(self, X0: np.ndarray):
        self._X0 = X0

    @property
    def Obs0(self) -> np.ndarray:
        """Three way tensor containing observations. An alias to ``self.tensor``."""
        return self.tensor

    @property
    def missing_ratio(self) -> float:
        """Proportion of missing entries in ``self.Obs0``. Default to ``0`` meaning
        no observation is missing. Will read directly from ``"missing_ratio"``
        field in ``self.spec``. Missing entries in ``self.Obs0`` will be set to
        ``0`` when ``self.prepare()`` is invoked."""
        return self.spec.get("missing_ratio", 0)

    @property
    def ll0(self) -> float:
        """The log likelihood of non-missing observations under true parameter
        values. Its value will be set after running ``self.prepare()``."""
        return self._ll0

    @ll0.setter
    def ll0(self, ll0: float):
        self._ll0 = ll0

    @property
    def rnd_seed(self) -> int:
        """Random seed. Used for reproducibility purposes. Will read directly from
        ``"rnd_seed"`` field from ``self.spec``. Falls back to ``10203`` when the
        field is absent."""
        return self.spec.get("rnd_seed", 10203)

    def __iter__(self) -> Iterator[Tuple[int, int, int, int]]:
        """Return an iterator over ``(u_idx, i_idx, j_idx, obs)`` samples.

        Raises:
            ValueError: When ``self.mode`` is neither ``"random"`` nor
                ``"iteration"``.
        """
        return self._gen_producer()

    def _gen_producer(self) -> Iterator[Tuple[int, int, int, int]]:
        """A protected helper function that picks the sample producer matching
        ``self.mode``.

        Returns:
            An iterator of ``(u_idx, i_idx, j_idx, obs)`` tuples.

        Raises:
            ValueError: When ``self.mode`` is neither ``"random"`` nor
                ``"iteration"``.
        """
        mode = self.mode
        if mode == "random":
            return self._gen_producer_random()
        if mode == "iteration":
            return self._gen_producer_iteration()
        # Falling through used to return ``None``, which surfaced later as an
        # opaque "iter() returned non-iterator" TypeError. Fail loudly instead.
        raise ValueError(
            f"Unsupported generator mode {mode!r}; "
            "expected 'random' or 'iteration'."
        )

    def _gen_producer_random(self) -> Iterator[Tuple[int, int, int, int]]:
        """A protected helper function to produce samples in ``"random"`` mode.

        Draws ``self.u_batch`` user indices with replacement, and for each of them
        ``self.i_batch`` item pairs (both items drawn independently, with
        replacement), yielding the observation for every drawn triplet.
        """
        model = self.model
        u_batch = self.u_batch
        i_batch = self.i_batch
        for u_idx in np.random.choice(model.n, u_batch, replace=True):
            ii_idxs = np.random.choice(model.m, i_batch, replace=True)
            jj_idxs = np.random.choice(model.m, i_batch, replace=True)
            for i_idx, j_idx in zip(ii_idxs, jj_idxs):
                obs = self.tensor[u_idx, i_idx, j_idx]
                yield (u_idx, i_idx, j_idx, obs)

    def _gen_producer_iteration(self) -> Iterator[Tuple[int, int, int, int]]:
        """A protected helper function to produce samples in ``"iteration"`` mode.

        Visits every user and every unordered item pair exactly once, in a
        shuffled order of users and of items.
        """
        model = self.model
        u_shuffled = list(range(model.n))
        np.random.shuffle(u_shuffled)
        i_shuffled = list(range(model.m))
        np.random.shuffle(i_shuffled)
        for u_idx in u_shuffled:
            for ii_idx, i_idx in enumerate(i_shuffled):
                # Only positions after ``ii_idx`` so each pair is emitted once.
                for j_idx in i_shuffled[ii_idx + 1:]:
                    obs = self.tensor[u_idx, i_idx, j_idx]
                    yield (u_idx, i_idx, j_idx, obs)

    def prepare(self):
        """Instance method that will be called to inform a generator that all raw
        data have been added.

        For this class, the format of raw data is a compressed tarball, containing
        a ``raw.npz`` file. For this class only one raw data file is allowed to be
        added to the generator. Upon being called, following steps will be
        performed.

        1. Unzip raw data tarball. Read true parameter values from ``raw.npz``
           file, set corresponding attributes of current generator.
        2. Create a ``self.user_idx_to_id`` and ``self.user_id_to_idx`` mapping.
           The same will be created for items.
        3. Randomly set certain proportion of observations to ``0``, suggesting
           data are missing. In the meanwhile, calculate log likelihood of
           observed entries under true parameter values.
        4. Create ``self.user_idx_to_preference``, a mapping between user idx to
           another dictionary, with keys being a tuple of two items (in
           ``item_id``) and values being ``1``. The order of the two items in keys
           indicate their preference.

        Raises:
            AssertionError: When the number of added input files is not exactly
                one, or when the shapes found in ``raw.npz`` disagree with
                ``self.model.n`` / ``self.model.m``.
        """
        assert len(self.input_files) == 1, "exactly one input file is expected"
        input_file = self.input_files[0]
        folder = os.path.dirname(input_file)
        # NOTE(security): ``extractall`` trusts member paths stored in the archive.
        # Only feed tarballs from trusted sources, or pass an extraction filter on
        # Python versions that support it.
        with tarfile.open(input_file) as tf:
            tf.extractall(folder)

        # Strip only the trailing ".tar.gz"; a blanket ``str.replace`` would also
        # rewrite parent directories that happen to contain that substring.
        # Presumably the archive unpacks into a directory named after the tarball
        # stem -- verify against the data producer.
        suffix = ".tar.gz"
        extracted_dir = input_file
        if extracted_dir.endswith(suffix):
            extracted_dir = extracted_dir[: -len(suffix)]
        input_file = os.path.join(extracted_dir, "raw.npz")

        data = np.load(input_file)
        XI0, H0, T0, X0, Obs0 = (
            data["XI0"],
            data["H0"],
            data["T0"],
            data["X0"],
            data["Obs0"],
        )

        _, n = XI0.shape
        _, m = H0.shape
        assert n == self.model.n, "number of users in raw data must match the model"
        assert m == self.model.m, "number of items in raw data must match the model"

        # Simulated users/items have no external ids: idx and id coincide.
        user_idx_to_id = {idx: idx for idx in range(n)}
        item_idx_to_id = {idx: idx for idx in range(m)}

        # Read the configured ratio once instead of on every loop iteration.
        missing_ratio = self.missing_ratio
        rng = RandomState(self.rnd_seed)
        ll0 = 0
        for u_idx in range(n):
            for i_idx in range(m):
                for j_idx in range(i_idx + 1, m):
                    # One Bernoulli draw per pair decides whether it is masked;
                    # no draw happens at all when the ratio is not positive.
                    if missing_ratio > 0 and rng.binomial(1, missing_ratio):
                        Obs0[u_idx, i_idx, j_idx] = 0
                        Obs0[u_idx, j_idx, i_idx] = 0
                        continue
                    o = Obs0[u_idx, i_idx, j_idx]
                    xuij = X0[u_idx, i_idx, j_idx]
                    # NOTE(review): for ``o == -1`` the first term equals
                    # ``-2 * xuij``; confirm this matches the intended likelihood
                    # definition of the model.
                    ll0 += (o - 1) * xuij - np.log(1 + np.exp(-1 * xuij))

        # create mapping between user idx to item preference pairs
        # item preference pair is stored as a map as well, with key being a tuple
        # of an item pair (preferred item first)
        user_idx_to_preference = {}
        for u_idx in range(n):
            preference = {}
            for i_idx in range(m):
                i_id = item_idx_to_id[i_idx]
                for j_idx in range(i_idx + 1, m):
                    j_id = item_idx_to_id[j_idx]
                    obs = Obs0[u_idx, i_idx, j_idx]
                    if obs == 1:
                        preference[(i_id, j_id)] = 1
                    elif obs == -1:
                        preference[(j_id, i_id)] = 1
            user_idx_to_preference[u_idx] = preference

        self.data = data
        self.user_idx_to_id = user_idx_to_id
        self.item_idx_to_id = item_idx_to_id
        self.user_id_to_idx = {
            user_id: user_idx for user_idx, user_id in user_idx_to_id.items()
        }
        self.item_id_to_idx = {
            item_id: item_idx for item_idx, item_id in item_idx_to_id.items()
        }

        self.XI0 = XI0
        self.H0 = H0
        self.T0 = T0
        self.X0 = X0
        self.ll0 = ll0
        self.tensor = Obs0
        self.user_idx_to_preference = user_idx_to_preference

    def get_obs_uij(self, u_idx: int, i_idx: int, j_idx: int) -> int:
        """Get the ``(u, i, j)``-th observation from observation tensor
        ``self.Obs0``.

        Args:
            u_idx (:obj:`int`): The user idx.
            i_idx (:obj:`int`): Index of first item in comparison.
            j_idx (:obj:`int`): Index of second item in comparison.

        Returns:
            :obj:`int`: A value from ``(-1, 1, 0)`` indicating the personalized
            preference of the two items. ``1`` indicates ``i_idx``-th item is
            preferable than ``j_idx``-th; ``-1`` suggests otherwise; ``0``
            indicate such information is not available.
        """
        return self.tensor[u_idx, i_idx, j_idx]

    def get_trn(self) -> Iterator[Any]:
        # Intentionally a no-op in this class: nothing is produced and ``None``
        # is returned.
        pass

    def get_val_or_not(self) -> Iterator[Any]:
        # Intentionally a no-op in this class: nothing is produced and ``None``
        # is returned.
        pass