Source code for sad.generator.implicit_fb

#
# For licensing see accompanying LICENSE file.
# Copyright (C) 2020 Apple Inc. All Rights Reserved.
#

import copy
import json
import os
import tarfile
from typing import Any, Iterator, Tuple

import cornac.data as CData
import numpy as np
import pandas as pd
import surprise
from recommenders.models.ncf.dataset import Dataset as NCFDataset

from sad.model import SADModel

from .base import GeneratorBase, GeneratorFactory

INTERACTION_FILENAME_TRN = "raw.json"
INTERACTION_FILENAME_VAL = "deleted_raw.json"

RATING_FILENAME_TRN = "raw_with_rating.json"
RATING_FILENAME_VAL = "deleted_raw_with_rating.json"


@GeneratorFactory.register
class ImplicitFeedbackGenerator(GeneratorBase):
    """A concrete generator class that handles user-item implicit feedbacks.

    After an instance of this class is created, ``self.add(filepath)`` will
    need to be called to add a local file to this generator. The format of the
    local file is a compressed tarball, containing a ``raw.json`` file, and
    optionally a ``raw_with_rating.json`` file.

    The ``raw.json`` file is a dictionary mapping a user (in ``user_id``) to a
    list of items (in ``item_id``) that the user has interacted with.

    The optional ``raw_with_rating.json`` file is a nested dictionary. It maps
    a user (in ``user_id``) to another dictionary that maps items (in
    ``item_id``) to their rating scores.
    """

    def __init__(self, config: dict, model: SADModel, task: "TrainingTask"):
        super().__init__(config, model, task)
        # Lazily built, cached views of the training data. ``None`` means
        # "not built yet"; see the corresponding properties.
        self._data_df = None
        self._cornac_dataset = None
        self._msft_ncf_dataset = None
        self._surprise_dataset = None

    @property
    def data_df(self) -> pd.DataFrame:
        """A Pandas DataFrame containing user/item pairs and their ratings.

        For ``ImplicitFeedbackGenerator`` the rating is ``1`` for an
        interacted item and ``0`` for a non-interacted one. User and item IDs
        are under the ``userID`` and ``itemID`` columns respectively, the
        rating is under ``rating``.

        The frame is built on first access from ``self.user_id_to_idx``,
        ``self.uidx_to_iidxs_tuple`` and ``self.item_idx_to_id`` and cached
        afterwards.
        """
        # ``is None`` rather than truthiness: ``not DataFrame`` raises
        # ``ValueError`` (ambiguous truth value) once the cache is populated.
        if self._data_df is None:
            records = []
            for u_id, u_idx in self.user_id_to_idx.items():
                inter_iidxs, nonint_iidxs = self.uidx_to_iidxs_tuple[u_idx]
                for i_idx in inter_iidxs:
                    i_id = self.item_idx_to_id[i_idx]
                    records.append({"userID": u_id, "itemID": i_id, "rating": 1})
                for i_idx in nonint_iidxs:
                    i_id = self.item_idx_to_id[i_idx]
                    records.append({"userID": u_id, "itemID": i_id, "rating": 0})
            # Explicit columns keep the schema stable even without records.
            self._data_df = pd.DataFrame(
                records, columns=["userID", "itemID", "rating"]
            )
        return self._data_df

    @data_df.setter
    def data_df(self, data_df: pd.DataFrame):
        # Only the frame itself is replaced; datasets that were already
        # derived from a previous frame and cached are not rebuilt here.
        self._data_df = data_df

    @property
    def cornac_dataset(self) -> CData.Dataset:
        """A Cornac Dataset object containing user/item pairs and ratings
        associated with them. Will be used for fitting models from ``cornac``
        package. Built from ``self.data_df`` on first access and cached.
        """
        if self._cornac_dataset is None:
            data_df = self.data_df
            self._cornac_dataset = CData.Dataset.from_uir(
                data_df.itertuples(index=False)
            )
        return self._cornac_dataset

    @property
    def msft_ncf_dataset(self) -> NCFDataset:
        """A NCF (Neural Collaborative Filtering) Dataset object implemented in
        ``recommenders`` package from MSFT. It contains user/item pairs and
        ratings associated with them. Will be used for fitting a NCF model
        using ``recommenders`` package. Built from ``self.data_df`` on first
        access and cached.
        """
        if self._msft_ncf_dataset is None:
            data_df = self.data_df
            self._msft_ncf_dataset = NCFDataset(train=data_df)
        return self._msft_ncf_dataset

    @property
    def surprise_dataset(self) -> surprise.Dataset:
        """A dataset object implemented in ``surprise`` package. It contains
        user/item pairs and ratings associated with them. Will be used for
        fitting a SVD model using ``surprise`` package.

        The cached value is the result of calling ``build_full_trainset()`` on
        a ``surprise.Dataset`` loaded from ``self.data_df`` with a rating
        scale of ``(0, 1)``.
        """
        if self._surprise_dataset is None:
            data_df = self.data_df
            reader = surprise.reader.Reader(rating_scale=(0, 1))
            self._surprise_dataset = surprise.Dataset.load_from_df(
                data_df, reader
            ).build_full_trainset()
        return self._surprise_dataset

    def __iter__(self) -> Iterator[Tuple[int, int, int, int]]:
        return self._gen_producer()

    def _gen_producer(self) -> Iterator[Tuple[int, int, int, int]]:
        """Return the sample generator that matches ``self.mode``.

        Raises:
            ValueError: If ``self.mode`` is neither ``"random"`` nor
                ``"iteration"``.
        """
        mode = self.mode
        if mode == "random":
            return self._gen_producer_random()
        if mode == "iteration":
            return self._gen_producer_iteration()
        # Previously fell through and returned ``None``, which made
        # ``iter(generator)`` fail with an unhelpful ``TypeError``.
        raise ValueError(
            f"Unsupported generator mode {mode!r}; "
            'expected "random" or "iteration".'
        )

    def _gen_producer_random(self) -> Iterator[Tuple[int, int, int, int]]:
        """A protected helper function to produce samples in ``"random"`` mode.

        ``self.u_batch`` user idxs are drawn with replacement from
        ``range(self.model.n)``. For each drawn user, ``self.i_batch``
        interacted and ``self.i_batch`` non-interacted item idxs are drawn with
        replacement and paired up. Users with no interacted or no
        non-interacted items are skipped, since no pair can be formed.

        Yields:
            Tuples ``(u_idx, i_idx, j_idx, 1)`` where ``i_idx`` is an
            interacted item and ``j_idx`` a non-interacted item of ``u_idx``.
        """
        model = self.model
        u_batch = self.u_batch
        i_batch = self.i_batch
        for u_idx in np.random.choice(model.n, u_batch, replace=True):
            inter_item_idxs, nonint_item_idxs = self.uidx_to_iidxs_tuple[u_idx]
            inter_pool = list(inter_item_idxs)
            nonint_pool = list(nonint_item_idxs)
            if not inter_pool or not nonint_pool:
                # np.random.choice raises on an empty population.
                continue
            inter_sample = np.random.choice(inter_pool, i_batch, replace=True)
            nonint_sample = np.random.choice(nonint_pool, i_batch, replace=True)
            for i_idx, j_idx in zip(inter_sample, nonint_sample):
                yield (u_idx, i_idx, j_idx, 1)

    def _gen_producer_iteration(self) -> Iterator[Tuple[int, int, int, int]]:
        """A protected helper function to produce samples in ``"iteration"``
        mode.

        Every user idx in ``range(self.model.n)`` is visited once in shuffled
        order. For each of the user's interacted items (also shuffled),
        ``self.n_negatives`` non-interacted item idxs are drawn. Users with no
        interacted or no non-interacted items are skipped.

        Yields:
            Tuples ``(u_idx, i_idx, j_idx, 1)`` where ``i_idx`` is an
            interacted item and ``j_idx`` a non-interacted item of ``u_idx``.
        """
        model = self.model
        u_shuffled = list(range(model.n))
        np.random.shuffle(u_shuffled)
        for u_idx in u_shuffled:
            inter_item_idxs, nonint_item_idxs = self.uidx_to_iidxs_tuple[u_idx]
            inter_pool = list(inter_item_idxs)
            # Materialized once per user instead of once per interacted item.
            nonint_pool = list(nonint_item_idxs)
            if not inter_pool or not nonint_pool:
                # np.random.choice raises on an empty population.
                continue
            np.random.shuffle(inter_pool)
            for i_idx in inter_pool:
                for j_idx in np.random.choice(nonint_pool, size=self.n_negatives):
                    yield (u_idx, i_idx, j_idx, 1)

    def prepare(self):
        """Instance method that will be called to inform a generator instance
        that all raw data have been added.

        For this class, the format of raw data is a compressed tarball,
        containing a ``raw.json`` file, and optionally, a
        ``raw_with_rating.json`` file, a ``deleted_raw.json`` file and a
        ``deleted_raw_with_rating.json`` file. The latter two files contain
        hold-out user-item interactions (and their ratings).

        Upon being called, following steps will be performed.

        1. Extract each raw data tarball next to itself and read its json
           files. When multiple tarballs exist, their json files will be
           merged into one. Interaction data will be stored in
           ``self.data_trn``, ``self.data_val`` and ``self.data_all``. Data
           with ratings will be in ``self.ratings_trn``, ``self.ratings_val``
           and ``self.ratings_all``.
        2. Create ``self.user_idx_to_id`` and ``self.user_id_to_idx``
           mappings. The same will be created for items. Idxs follow the
           sorted order of the ids found in ``self.data_all``.
        3. Create (optionally) ``self.tensor`` with size ``n x m x m``
           containing personalized pairwise comparison between items. Its
           value takes ``-1``, ``1`` and ``0``, meaning first item is less
           preferable, more preferable and preference not available
           respectively. This tensor is only created when ``self.tensor_flag``
           is set to ``True``; otherwise an empty array is stored. Large
           values of ``n`` and ``m`` may result in memory overflow.
        4. Create ``self.uidx_to_iidxs_tuple``, a mapping between user idx and
           a tuple of two sets, with first one being interacted items and
           second one being non-interacted items, in ``item_idx``. Only
           training interactions are used for this step.
        5. Create ``self.user_idx_to_preference``, a mapping between user idx
           and another dictionary, with keys being a tuple of two items (in
           ``item_id``) and values being ``1``. The first item of a key has a
           strictly higher training rating than the second one. The mapping is
           left empty when no training rating is available.

        Raises:
            AssertionError: If the number of users or items found in the data
                differs from ``self.model.n`` or ``self.model.m``.
        """

        def _read_json(path):
            # Context manager so the handle is closed deterministically.
            with open(path, encoding="utf-8") as fp:
                return json.load(fp)

        archive_suffix = ".tar.gz"

        data_trn = {}  # training user-item interaction
        data_val = {}  # validation user-item interaction
        ratings_trn = {}  # training item rating
        ratings_val = {}  # validation item rating

        for input_file in self.input_files:
            # NOTE(review): extractall() trusts the member paths stored in the
            # archive; only feed tarballs from a trusted source.
            with tarfile.open(input_file) as tf:
                tf.extractall(os.path.dirname(input_file))

            # The archive is expected to unpack into a folder named after the
            # tarball. Strip the suffix only (not every occurrence in the
            # path).
            if input_file.endswith(archive_suffix):
                folder = input_file[: -len(archive_suffix)]
            else:
                folder = input_file

            data_trn.update(
                _read_json(os.path.join(folder, INTERACTION_FILENAME_TRN))
            )

            # Everything but the training interactions is optional.
            rating_filename_trn = os.path.join(folder, RATING_FILENAME_TRN)
            if os.path.exists(rating_filename_trn):
                ratings_trn.update(_read_json(rating_filename_trn))

            interaction_filename_val = os.path.join(folder, INTERACTION_FILENAME_VAL)
            if os.path.exists(interaction_filename_val):
                data_val.update(_read_json(interaction_filename_val))

            rating_filename_val = os.path.join(folder, RATING_FILENAME_VAL)
            if os.path.exists(rating_filename_val):
                ratings_val.update(_read_json(rating_filename_val))

        # Combined view; deep copies keep the training containers untouched.
        # ``setdefault`` covers users that only appear in the hold-out data.
        data_all = copy.deepcopy(data_trn)
        for u_id, i_list in data_val.items():
            data_all.setdefault(u_id, []).extend(i_list)

        ratings_all = copy.deepcopy(ratings_trn)
        for u_id, rating_dict in ratings_val.items():
            ratings_all.setdefault(u_id, {}).update(rating_dict)

        self.data_trn = data_trn
        self.data_val = data_val
        self.data_all = data_all

        self.ratings_trn = ratings_trn
        self.ratings_val = ratings_val
        self.ratings_all = ratings_all

        user_set = sorted(data_all)
        item_set = sorted(
            {item_id for items in data_all.values() for item_id in items}
        )

        n = self.model.n
        m = self.model.m
        assert len(user_set) == n, (
            f"found {len(user_set)} users in data, model expects {n}"
        )
        assert len(item_set) == m, (
            f"found {len(item_set)} items in data, model expects {m}"
        )

        user_idx_to_id = dict(zip(range(n), user_set))
        item_idx_to_id = dict(zip(range(m), item_set))

        # a dictionary mapping user idx to interacted/noninter item idxs
        # the rest will use data_trn (not data_all) to produce samples
        data = data_trn
        ratings = ratings_trn
        uidx_to_iidxs_tuple = {}
        if self.tensor_flag:
            tensor = np.zeros((n, m, m))
        else:
            tensor = np.empty(0)

        for u_idx in range(n):
            user_id = user_idx_to_id[u_idx]
            # A user may exist only in the hold-out data.
            inter_items = set(data.get(user_id, ()))
            inter_flag = np.array(
                [int(item_idx_to_id[i_idx] in inter_items) for i_idx in range(m)],
                dtype=int,
            )
            if self.tensor_flag:
                # Entry (i, j) is flag[i] - flag[j]: 1, -1 or 0.
                tensor[u_idx, :, :] = inter_flag.reshape(m, 1) - inter_flag
            uidx_to_iidxs_tuple[u_idx] = (
                set(np.flatnonzero(inter_flag == 1)),
                set(np.flatnonzero(inter_flag == 0)),
            )

        # create mapping between user idx to item preference pairs
        # item preference pair is stored as a map as well, with key being a
        # tuple of a item pair
        user_idx_to_preference = {}
        if ratings:
            for u_idx in range(n):
                preference = {}
                rated_items = list(ratings.get(user_idx_to_id[u_idx], {}).items())
                for ii, (ii_id, ii_rating) in enumerate(rated_items):
                    for jj_id, jj_rating in rated_items[ii + 1:]:
                        if ii_rating > jj_rating:
                            preference[(ii_id, jj_id)] = 1
                        elif ii_rating < jj_rating:
                            preference[(jj_id, ii_id)] = 1
                        # Ties carry no preference and are not recorded.
                user_idx_to_preference[u_idx] = preference

        self.data = data
        self.user_idx_to_id = user_idx_to_id
        self.item_idx_to_id = item_idx_to_id
        self.user_id_to_idx = {u_id: u_idx for u_idx, u_id in user_idx_to_id.items()}
        self.item_id_to_idx = {i_id: i_idx for i_idx, i_id in item_idx_to_id.items()}
        self.uidx_to_iidxs_tuple = uidx_to_iidxs_tuple
        self.tensor = tensor
        self.user_idx_to_preference = user_idx_to_preference

    def get_obs_uij(self, u_idx: int, i_idx: int, j_idx: int) -> int:
        """Get the ``(u, i, j)``-th observation from personalized three-way
        tensor ``self.tensor``. When ``self.tensor`` is pre-calculated, its
        value will be returned. Otherwise, ``self.uidx_to_iidxs_tuple`` will
        be used to infer the observation at runtime.

        Args:
            u_idx (:obj:`int`): The user idx.
            i_idx (:obj:`int`): Index of first item in comparison.
            j_idx (:obj:`int`): Index of second item in comparison.

        Returns:
            :obj:`int`: A value from ``(-1, 1, 0)`` indicating the
            personalized preference of the two items. ``1`` indicates
            ``i_idx``-th item is preferable than ``j_idx``-th; ``-1`` suggests
            otherwise; ``0`` indicate such information is not available.
            ``None`` is returned when the tensor is not pre-calculated and an
            item idx belongs to neither the interacted nor the non-interacted
            set of the user.
        """
        if self.tensor_flag:
            return self.tensor[u_idx, i_idx, j_idx]

        inter_iidxs, nonint_iidxs = self.uidx_to_iidxs_tuple[u_idx]
        if ((i_idx in inter_iidxs) and (j_idx in inter_iidxs)) or (
            (i_idx in nonint_iidxs) and (j_idx in nonint_iidxs)
        ):
            # Both items fall on the same side: no preference.
            return 0
        if (i_idx in inter_iidxs) and (j_idx in nonint_iidxs):
            return 1
        if (i_idx in nonint_iidxs) and (j_idx in inter_iidxs):
            return -1
        return None

    def get_trn(self) -> Iterator[Any]:
        """Placeholder; currently performs no work and returns ``None``."""
        pass

    def get_val_or_not(self) -> Iterator[Any]:
        """Placeholder; currently performs no work and returns ``None``."""
        pass