Data#

User dataset#

class pfl.data.dataset.AbstractDataset#

Base class for user dataset representations.

abstract split(fraction=None, min_train_size=None, min_val_size=None)#

Split the dataset into two smaller disjoint datasets. Used by algorithms that require both a train and val dataset for each user.

Parameters:
  • fraction (Optional[float]) – The fraction of data to split at. Defaults to 0.8

  • min_train_size (Optional[int]) – The minimum number of samples for the train partition after the split. If the dataset size is 4 and split is done with the parameters fraction=0.1 and min_train_size=1, the split would result in a train set of size 0, but min_train_size overrides that to 1. Defaults to 1.

  • min_val_size (Optional[int]) – Same as min_train_size, but for the val partition. An error is thrown if dataset length before split is less than min_train_size+min_val_size. Defaults to 1.

Return type:

Tuple[AbstractDataset, AbstractDataset]

Returns:

A tuple (left_dataset, right_dataset), where left_dataset is a Dataset holding the proportion fraction of the data and right_dataset is a Dataset holding the remaining 1-fraction.
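The interaction between fraction, min_train_size and min_val_size can be sketched with a standalone helper. This is not pfl's implementation: the name split_sizes is hypothetical, and truncating fraction * n toward zero is an assumption chosen to match the example above (size 4 with fraction=0.1 gives 0 before the minimum is applied).

```python
def split_sizes(n, fraction=0.8, min_train_size=1, min_val_size=1):
    """Return (train_size, val_size) for a dataset of n samples."""
    if n < min_train_size + min_val_size:
        raise ValueError(
            f"dataset of size {n} is too small for "
            f"min_train_size={min_train_size} and min_val_size={min_val_size}")
    # Assumption: the split point is truncated toward zero.
    train_size = int(fraction * n)
    # Enforce the minimum size of each partition.
    train_size = max(train_size, min_train_size)
    train_size = min(train_size, n - min_val_size)
    return train_size, n - train_size


print(split_sizes(4, fraction=0.1))  # (1, 3): min_train_size overrides 0
print(split_sizes(10))               # (8, 2)
```

A dataset with a single sample cannot satisfy both default minimums, so split_sizes(1) raises ValueError in this sketch.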

abstract get_worker_partition()#

Partition the dataset among active workers in multi-worker training and return a unique partition for the current worker. If multi-worker training is not in use, this method returns the dataset itself, because the only partition is the full dataset.

Return type:

AbstractDataset

Returns:

A subset of this dataset, unique to the current worker. All worker partitions add up to the original dataset.
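As an illustration of the contract described above (disjoint partitions that add up to the original dataset), here is a minimal sketch. The function worker_partition and the strided assignment are assumptions made for the example; the partitioning scheme pfl actually uses may differ.

```python
def worker_partition(samples, worker_rank, num_workers):
    """Return the slice of samples owned by worker_rank."""
    if not 0 <= worker_rank < num_workers:
        raise ValueError("worker_rank must be in [0, num_workers)")
    # Assumption: strided assignment, sample i goes to worker i % num_workers.
    return samples[worker_rank::num_workers]


samples = list(range(10))
parts = [worker_partition(samples, rank, 3) for rank in range(3)]
print(parts)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
# With a single worker the partition is the full dataset.
print(worker_partition(samples, 0, 1) == samples)  # True
```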

class pfl.data.dataset.Dataset(raw_data, user_id=None, metadata=None, train_kwargs=None, eval_kwargs=None, val_indices=None)#

A representation of a flat (user) dataset. Use only for smaller (user) datasets that fit in memory.

If using PyTorch or TF tensors, use datasets in pfl.data.tensorflow and pfl.data.pytorch instead.

Parameters:
  • raw_data (Tuple[Any, ...]) – A tuple of data inputs in the order they are specified by the model. The “order” for some common models is given by:

    • TensorFlow - First entry is a tuple of features (input to model), second entry is a tuple of labels (input to loss/metrics).

    • PyTorch - A tuple of tensors, unrolled to model.loss and model.metrics.

  • user_id (Optional[str]) – (Optional) String user identifier. Only needed if you are using algorithms or other components that explicitly make use of user IDs. You will notice if that is the case from errors that say user IDs must be defined.

  • metadata (Optional[Dict[str, Any]]) – (Optional) Store additional data about the user. Can be retrieved later by the algorithm.

  • train_kwargs (Optional[Dict[str, Any]]) – A dictionary of any additional training parameters to unpack in the training call of the deep learning framework.

  • eval_kwargs (Optional[Dict[str, Any]]) – A dictionary of any additional evaluation parameters to unpack in the evaluation call of the deep learning framework.

  • val_indices (Optional[List[int]]) –

    A list of datapoint indices specifying which datapoints in raw_data to split into the val partition when calling split on this user dataset. This parameter is optional but useful when you have an explicit set of val datapoints for this user. This parameter is currently not used; until it is enabled, use the parameters of split instead.

    Note

    Splitting a user’s dataset is only done by algorithms that require a local val dataset for measuring the generalized performance after training, e.g. meta-learning or personalization via fine-tuning the model locally on each device.

split(fraction=None, min_train_size=None, min_val_size=None)#

Split the dataset into two smaller disjoint datasets. Used by algorithms that require both a train and val dataset for each user.

Parameters:
  • fraction (Optional[float]) – The fraction of data to split at. Defaults to 0.8

  • min_train_size (Optional[int]) – The minimum number of samples for the train partition after the split. If the dataset size is 4 and split is done with the parameters fraction=0.1 and min_train_size=1, the split would result in a train set of size 0, but min_train_size overrides that to 1. Defaults to 1.

  • min_val_size (Optional[int]) – Same as min_train_size, but for the val partition. An error is thrown if dataset length before split is less than min_train_size+min_val_size. Defaults to 1.

Return type:

Tuple[Dataset, Dataset]

Returns:

A tuple (left_dataset, right_dataset), where left_dataset is a Dataset holding the proportion fraction of the data and right_dataset is a Dataset holding the remaining 1-fraction.

get_worker_partition()#

Partition the dataset among active workers in multi-worker training and return a unique partition for the current worker. If multi-worker training is not in use, this method returns the dataset itself, because the only partition is the full dataset.

Return type:

AbstractDataset

Returns:

A subset of this dataset, unique to the current worker. All worker partitions add up to the original dataset.

class pfl.data.dataset.TabularDataset(features, labels, metadata=None, val_indices=None, train_kwargs=None, eval_kwargs=None)#

A dataset comprising tabular data: (features, labels). See Dataset for more information about the parameters.

class pfl.data.dataset.DatasetSplit(train_dataset, val_dataset)#

Wraps a dataset split that is predefined as train_dataset and val_dataset. When an instance of this class is used without splitting, it behaves like train_dataset, except that iter is only implemented for the two datasets from the split.

This class is useful when using algorithms that require a local train-val split, e.g. meta-learning, and you want to pre-define how the split should be done.

property raw_data: Tuple[Any, ...]#

The raw data of the train dataset.

property train_kwargs: Dict[str, Any]#

Additional training arguments of the train dataset.

property eval_kwargs: Dict[str, Any]#

Additional evaluation arguments of the train dataset.

split(fraction=None, min_train_size=None, min_val_size=None)#

Split the dataset by returning the train and val Dataset previously specified in the constructor.

Parameters:
  • fraction (Optional[float]) – Has no effect since the split is determined when initializing.

  • min_train_size (Optional[int]) – Has no effect since the split is determined when initializing.

  • min_val_size (Optional[int]) – Has no effect since the split is determined when initializing.

Return type:

Tuple[AbstractDataset, AbstractDataset]

Returns:

A tuple with the user datasets train_dataset and val_dataset previously specified in the constructor.
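The behaviour documented above, where split ignores its arguments and returns the datasets given at construction, can be sketched with a small stand-in class. PredefinedSplit is a hypothetical name used only for this illustration and is not part of pfl.

```python
class PredefinedSplit:
    """Hypothetical stand-in mirroring the documented split behaviour."""

    def __init__(self, train_dataset, val_dataset):
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset

    def split(self, fraction=None, min_train_size=None, min_val_size=None):
        # The arguments are accepted for interface compatibility but ignored.
        return self.train_dataset, self.val_dataset


user = PredefinedSplit(train_dataset=[1, 2, 3], val_dataset=[4])
print(user.split())              # ([1, 2, 3], [4])
print(user.split(fraction=0.5))  # ([1, 2, 3], [4]), fraction has no effect
```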

get_worker_partition()#

Partition the dataset among active workers in multi-worker training and return a unique partition for the current worker. If multi-worker training is not in use, this method returns the dataset itself, because the only partition is the full dataset.

Return type:

AbstractDataset

Returns:

A subset of this dataset, unique to the current worker. All worker partitions add up to the original dataset.

class pfl.data.tensorflow.TFTensorDataset(features, labels, user_id=None, metadata=None, val_indices=None, train_kwargs=None, eval_kwargs=None)#

In-memory TensorFlow tensors representing a user dataset. See Dataset for more information about the class and its parameters.

Parameters:
  • features (Union[Tensor, Tuple[Tensor, ...]]) – A single tf.Tensor or a tuple of tf.Tensor. Should be valid input to your Tensorflow model’s call method.

  • labels (Union[Tensor, Tuple[Tensor, ...]]) – A single tf.Tensor or a tuple of tf.Tensor. Should be valid input to tf.keras.losses.Loss

  • eval_kwargs (Optional[Dict[str, Any]]) – A dictionary of any additional evaluation parameters for user. These will not be input to loss or metrics for TF, only stored in this dataset for other custom usage.

Example:

features and labels should be in a format compatible with this:

preds = model(features, **train_kwargs)
tf.keras.losses.Loss()(labels, preds)
class pfl.data.tensorflow.TFDataDataset(raw_data, prefetch, user_id=None, metadata=None, train_kwargs=None, eval_kwargs=None)#

Dataset for representing a user using tf.data.Dataset as source. Use this if the (user) dataset is big and can’t fit into memory, e.g. a big central eval dataset or cross-silo FL. Otherwise, because the overhead of initializing a tf.data.Dataset is large, consider using the faster in-memory dataset TFTensorDataset along with TFFederatedDataset for parallel preprocessing.

Example:
import tensorflow as tf
from pfl.data.tensorflow import TFDataDataset

data = tf.data.Dataset.zip(
    (tf.data.Dataset.from_tensor_slices(features),
     tf.data.Dataset.from_tensor_slices(labels)))
TFDataDataset(data, prefetch=5)
Parameters:
  • raw_data (Dataset) – The TF dataset to represent a user.

  • prefetch (int) – How many batches to prefetch. 0 for no prefetching.

  • train_kwargs (Optional[Dict[str, Any]]) – Additional keyword arguments to propagate through pfl for the user. Will be unpacked in the TensorFlow model’s call to the loss function.

  • eval_kwargs (Optional[Dict[str, Any]]) – Additional keyword arguments to propagate through pfl for the user. Will be unpacked in the TensorFlow model’s call to the metrics function.

split(fraction=None, min_train_size=None, min_val_size=None)#

Split the dataset into two smaller disjoint datasets. Used by algorithms that require both a train and val dataset for each user.

Parameters:
  • fraction (Optional[float]) – The fraction of data to split at. Defaults to 0.8

  • min_train_size (Optional[int]) – The minimum number of samples for the train partition after the split. If the dataset size is 4 and split is done with the parameters fraction=0.1 and min_train_size=1, the split would result in a train set of size 0, but min_train_size overrides that to 1. Defaults to 1.

  • min_val_size (Optional[int]) – Same as min_train_size, but for the val partition. An error is thrown if dataset length before split is less than min_train_size+min_val_size. Defaults to 1.

Return type:

Tuple[Dataset, Dataset]

Returns:

A tuple (left_dataset, right_dataset), where left_dataset is a Dataset holding the proportion fraction of the data and right_dataset is a Dataset holding the remaining 1-fraction.

get_worker_partition()#

Partition the dataset among active workers in multi-worker training and return a unique partition for the current worker. If multi-worker training is not in use, this method returns the dataset itself, because the only partition is the full dataset.

Return type:

TFDataDataset

Returns:

A subset of this dataset, unique to the current worker. All worker partitions add up to the original dataset.

class pfl.data.pytorch.PyTorchTensorDataset(tensors, user_id=None, metadata=None, val_indices=None, train_kwargs=None, eval_kwargs=None)#

In-memory PyTorch tensors representing a user dataset. See Dataset for more information about the class and its parameters.

Parameters:

tensors (Union[Tuple[Tensor, ...], Dict[str, Tensor]]) –

A list or dictionary of tensors which can be accepted into your model’s loss and metrics functions like this:

model.loss(*tensors, **train_kwargs).backward()
model.metrics(*tensors, **eval_kwargs)

See PyTorchModel for more information about loss and metrics functions.

class pfl.data.pytorch.PyTorchDataDataset(raw_data, user_id=None, metadata=None, train_kwargs=None, eval_kwargs=None, **dataloader_kwargs)#

Dataset for representing a user using torch.utils.data.Dataset as source. Use this if the (user) dataset is big and can’t fit into memory, e.g. a big central eval dataset or cross-silo FL. Otherwise, consider using faster in-memory dataset Dataset along with PyTorchFederatedDataset for parallel preprocessing.

Example:
import torch
from pfl.data.pytorch import PyTorchDataDataset

class MyDataset(torch.utils.data.Dataset):
    def __init__(self):
        self.data = torch.randn(100, 2)
        self.labels = torch.randint(0, 2, (100,))

    def __getitem__(self, index):
        return self.data[index], self.labels[index]

    def __len__(self):
        return len(self.labels)

PyTorchDataDataset(raw_data=MyDataset())
Parameters:
  • raw_data (Dataset) – The PyTorch dataset to represent a user.

  • train_kwargs (Optional[Dict[str, Any]]) – Additional keyword arguments to propagate through pfl for the user. Will be unpacked in the PyTorch model’s call to the loss function.

  • eval_kwargs (Optional[Dict[str, Any]]) – Additional keyword arguments to propagate through pfl for the user. Will be unpacked in the PyTorch model’s call to the metrics function.

split(fraction=None, min_train_size=None, min_val_size=None)#

Split the dataset into two smaller disjoint datasets. Used by algorithms that require both a train and val dataset for each user.

Parameters:
  • fraction (Optional[float]) – The fraction of data to split at. Defaults to 0.8

  • min_train_size (Optional[int]) – The minimum number of samples for the train partition after the split. If the dataset size is 4 and split is done with the parameters fraction=0.1 and min_train_size=1, the split would result in a train set of size 0, but min_train_size overrides that to 1. Defaults to 1.

  • min_val_size (Optional[int]) – Same as min_train_size, but for the val partition. An error is thrown if dataset length before split is less than min_train_size+min_val_size. Defaults to 1.

Return type:

Tuple[Dataset, Dataset]

Returns:

A tuple (left_dataset, right_dataset), where left_dataset is a Dataset holding the proportion fraction of the data and right_dataset is a Dataset holding the remaining 1-fraction.

get_worker_partition()#

Partition the dataset among active workers in multi-worker training and return a unique partition for the current worker. If multi-worker training is not in use, this method returns the dataset itself, because the only partition is the full dataset.

Return type:

PyTorchDataDataset

Returns:

A subset of this dataset, unique to the current worker. All worker partitions add up to the original dataset.

Federated dataset#

A federated dataset is simply an iterator over the datasets of individual users. How a user’s dataset is constructed is defined by subclasses of FederatedDatasetBase.

Every time __next__ is called, a tuple (user_data, seed) is returned, where user_data is a Dataset and seed is a random integer. The user for every call is chosen by a sampling strategy provided as an argument to the subclasses of FederatedDatasetBase. The random integer seed is different for each call and on each worker, and should be used as the seed for local DP so that workers do not generate identical noise patterns.
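The (user_data, seed) contract can be sketched without pfl. Everything in the block is hypothetical: sample_users stands in for iterating a federated dataset, add_local_noise is a toy placeholder for a local DP mechanism, and deriving the per-worker generator from base_seed and worker_rank is an assumption about how seeds could be made to differ between workers.

```python
import random


def sample_users(user_datasets, num_users, worker_rank=0, base_seed=0):
    """Yield (user_data, seed) tuples, one per sampled user."""
    # Assumption: each worker seeds its own generator differently, so the
    # per-user seeds differ between workers.
    rng = random.Random(base_seed * 1000 + worker_rank)
    for _ in range(num_users):
        user_data = rng.choice(user_datasets)
        seed = rng.randrange(2**32)
        yield user_data, seed


def add_local_noise(values, seed, scale=0.1):
    """Toy stand-in for a local DP mechanism, seeded per user."""
    noise_rng = random.Random(seed)
    return [v + noise_rng.gauss(0.0, scale) for v in values]


users = [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]]
for user_data, seed in sample_users(users, num_users=2):
    print(len(user_data), add_local_noise(user_data, seed))
```

Because the noise generator is created from the per-user seed, two workers that happen to process the same user data still draw different noise in this sketch.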

class pfl.data.federated_dataset.FederatedDatasetBase#

Base class for federated datasets used by the simulator. A federated dataset contains many smaller subsets of data, representing a user’s data.

abstract get_cohort(cohort_size)#

Fetch an entire cohort of users. In the context of multi worker training, only the users assigned to the current worker should be returned.

Parameters:

cohort_size (int) – The number of users to fetch.

Return type:

Iterable[Tuple[AbstractDataset, int]]

Returns:

An iterable to iterate through each user dataset from the cohort. Each step of the iterator returns a tuple (user_dataset, seed). Lazy loading of the users’ datasets is performed when stepping through iterator.

class pfl.data.federated_dataset.ArtificialFederatedDataset(make_dataset_fn, data_sampler, sample_dataset_len)#

Simulates a federated dataset by automatically grouping data points into simulated users. This class is useful when there is no such thing as an associated user identifier for each sample, but will of course work if you happen to have user identifiers as well. How a user dataset is generated is implemented by the sampling mechanism provided.

Parameters:
  • make_dataset_fn

    A function that takes as input a list of sample indices and returns the generated user dataset. You are expected to implement this function yourself in any way you want, but the interface should be func(dataset_indices) -> user_dataset. You can find example implementations in the class methods. Note that the function to implement here differs from the function to provide when constructing a FederatedDataset:

    • ArtificialFederatedDataset - func(dataset_indices) -> user_dataset.

    • FederatedDataset - func(user_id) -> user_dataset.

  • data_sampler – Sampling mechanism that samples datapoint indices to later use for constructing a new user dataset. A sampling mechanism is simply a callable callable(dataset_length) -> dataset_indices. Its purpose is to give you control over the distribution of the generated user datasets, which should ideally be as close as possible to what a real federated dataset would look like for your use case. The factory get_data_sampler provides some examples of what this callable might look like.

  • sample_dataset_len

    A callable that should sample the dataset length of a user, i.e. callable() -> dataset_length.

    Example:

    sample_dataset_len = lambda: max(1, np.random.poisson(5)). Here sample_dataset_len is a callable (function) that draws a user dataset size from a Poisson distribution with mean 5, with a minimum size of 1.
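How the three callables fit together can be sketched in plain Python. This is an illustration of the documented interfaces only, not pfl's code: the data, the helper sample_artificial_user, and the uniform stand-in for the Poisson draw are all assumptions made for the example.

```python
import random

# Hypothetical central dataset: 100 (feature, label) pairs.
features = [float(i) for i in range(100)]
labels = [i % 2 for i in range(100)]


def sample_dataset_len():
    # callable() -> dataset_length. Stand-in for a Poisson draw.
    return max(1, random.randint(0, 9))


def data_sampler(dataset_length):
    # callable(dataset_length) -> dataset_indices, uniform without replacement.
    return random.sample(range(len(features)), dataset_length)


def make_dataset_fn(dataset_indices):
    # func(dataset_indices) -> user_dataset, here a (features, labels) tuple.
    return ([features[i] for i in dataset_indices],
            [labels[i] for i in dataset_indices])


def sample_artificial_user():
    dataset_length = sample_dataset_len()
    dataset_indices = data_sampler(dataset_length)
    return make_dataset_fn(dataset_indices)


user_features, user_labels = sample_artificial_user()
print(len(user_features), len(user_labels))
```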

classmethod from_slices(data, data_sampler, sample_dataset_len, create_dataset_fn=<function ArtificialFederatedDataset.<lambda>>)#

Construct a simulated federated dataset from a regular dataset where there is no such thing as a user identifier.

Parameters:
  • data – A list of np.ndarray, i.e. the same format as a Dataset accepts.

  • create_dataset_fn (Callable) – A lambda function to create an instance of Dataset, or a subclass of Dataset.

Returns:

An instance of ArtificialFederatedDataset.

get_cohort(cohort_size)#

Fetch an entire cohort of users. In the context of multi worker training, only the users assigned to the current worker should be returned.

Parameters:

cohort_size (int) – The number of users to fetch.

Return type:

Iterable[Tuple[AbstractDataset, int]]

Returns:

An iterable to iterate through each user dataset from the cohort. Each step of the iterator returns a tuple (user_dataset, seed). Lazy loading of the users’ datasets is performed when stepping through iterator.

class pfl.data.federated_dataset.FederatedDataset(make_dataset_fn, user_sampler, user_id_to_weight=None)#

A federated dataset is a collection of smaller datasets that are each associated to a unique user. Iterating through an instance of this class will each time return the dataset for a specific user.

Parameters:
  • make_dataset_fn – A function that takes as input a user identifier and returns a user dataset. You are expected to implement this function yourself in any way you want, but the interface should be func(user_id) -> user_dataset. You can find example implementations in the class methods.

  • user_sampler

    Sampling mechanism that samples a user id. The interface of the user sampling mechanism should be callable() -> user_id. In most cases you want to use MinimizeReuseUserSampler because its behaviour mimics what usually happens in live federated learning with user devices. See MinimizeReuseUserSampler for explanation why.

    The factory get_user_sampler provides some examples of what this callable might look like.

  • user_id_to_weight (Optional[Dict[Any, int]]) – A dictionary mapping user id to a weight which acts as a proxy for compute time to train this user. In most cases, when model training time scales with data, number of user datapoints/tokens/batches should be a good estimate. This is solely used for minimizing straggling processes in distributed simulations. Leaving this as None will not affect the final outcomes and metrics of the simulation, but simulations will be slower if users have varying dataset sizes.
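To see why per-user weights help against stragglers, consider a greedy balancing sketch: the heaviest remaining user is always given to the least-loaded worker. This is one common heuristic, shown here as an assumption; this reference does not describe how pfl schedules users internally, and assign_users_to_workers is a hypothetical helper.

```python
import heapq


def assign_users_to_workers(user_ids, user_id_to_weight, num_workers):
    """Greedy balancing: heaviest user goes to the least-loaded worker."""
    # Heap of (total_weight, worker_rank) so the lightest worker pops first.
    loads = [(0, rank) for rank in range(num_workers)]
    heapq.heapify(loads)
    assignment = {rank: [] for rank in range(num_workers)}
    for user_id in sorted(user_ids, key=user_id_to_weight.get, reverse=True):
        load, rank = heapq.heappop(loads)
        assignment[rank].append(user_id)
        heapq.heappush(loads, (load + user_id_to_weight[user_id], rank))
    return assignment


weights = {"a": 100, "b": 10, "c": 10, "d": 80}
print(assign_users_to_workers(list(weights), weights, num_workers=2))
# {0: ['a'], 1: ['d', 'b', 'c']}
```

Both workers end up with a total weight of 100, whereas splitting the users in list order would give 110 and 90.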

classmethod from_slices(data, user_sampler)#

Construct a federated dataset from a dictionary of user to dataset mappings.

Parameters:

data – A dictionary user_id:dataset, where user_id is the unique user identifier and dataset is the dataset of the user (represented as a list of np.ndarray like in Dataset).

Returns:

An instance of FederatedDataset.
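The user-keyed interface (func(user_id) -> user_dataset together with callable() -> user_id) can be illustrated with plain Python containers. The data, the cycling sampler and the get_cohort generator below are hypothetical stand-ins, not pfl objects.

```python
import itertools

# Hypothetical per-user data: user_id -> [features, labels].
data = {
    "user_a": [[0.1, 0.2], [0, 1]],
    "user_b": [[0.3], [1]],
    "user_c": [[0.4, 0.5, 0.6], [0, 0, 1]],
}

# user_sampler: callable() -> user_id. Cycling through the ids is a simple
# stand-in for a sampler that minimizes reuse.
_user_cycle = itertools.cycle(data)


def user_sampler():
    return next(_user_cycle)


def make_dataset_fn(user_id):
    # func(user_id) -> user_dataset.
    return data[user_id]


def get_cohort(cohort_size):
    # Datasets are built lazily, one per step of the generator.
    for _ in range(cohort_size):
        yield make_dataset_fn(user_sampler())


for features, labels in get_cohort(4):
    print(len(features), labels)
```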

classmethod from_slices_with_dirichlet_class_distribution(data, labels, alpha, user_dataset_len_sampler, spread_distribution_after_num_fails=20, spread_distribution_after_fails_percentage=0.02)#

Create a federated dataset by partitioning data into artificial users generated by partition_by_dirichlet_class_distribution(). See the above partition function for more information and references. The user partitions are constructed once using all data, and unlike an ArtificialFederatedDataset the user partitioning remains the same throughout training. Sampling the generated users is done uniformly at random.

Parameters:
  • data (Tuple) – A tuple of tensors, representing the data to be partitioned (including labels). Each user will have a dataset consisting of the same number of tensors as in data, but they are slices of data.

  • labels (ndarray) – A one-dimensional array of all labels (integers). Must have same length as the first dimension of every tensor in data. If labels should also be included as one of the tensors in the dataset, be sure to also include it in data.

See partition_by_dirichlet_class_distribution() for further description of the parameters.

get_cohort(cohort_size)#

Fetch an entire cohort of users. In the context of multi worker training, only the users assigned to the current worker should be returned.

Parameters:

cohort_size (int) – The number of users to fetch.

Return type:

Iterable[Tuple[AbstractDataset, int]]

Returns:

An iterable to iterate through each user dataset from the cohort. Each step of the iterator returns a tuple (user_dataset, seed). Lazy loading of the users’ datasets is performed when stepping through iterator.

class pfl.data.tensorflow.TFFederatedDataset(make_dataset_fn, user_sampler, user_id_dtype=None, dataset_cls=None, dataset_kwargs=None, user_id_to_weight=None)#

Create a federated dataset from a TF dataset and user sampler.

Note

We highly encourage using this class instead of FederatedDataset when data is loaded from disk in make_dataset_fn, because using tf.data.Dataset.prefetch in your code allows users to be loaded asynchronously into memory.

Parameters:
  • make_dataset_fn (Callable[[Dataset], Dataset]) – To make your tf.data.Dataset compatible with a user sampler, this argument specifies a function that takes a tf.data.Dataset as input and returns another tf.data.Dataset. The input dataset will return a new user id each time a step in its iterator is taken. The output dataset should use this user id to load and preprocess the data for an entire user, and generate a structure that is expected by dataset_cls, see e.g. Dataset. To load users asynchronously, you need to have called prefetch on the dataset that you return from make_dataset_fn (see the code snippet at the end of this docstring for an example).

  • user_sampler (Callable[[], Any]) – A callable with no parameters that returns one sampled user ID when called.

  • user_id_dtype (Optional[DType]) – Tensorflow data type of user ids. tf.int32 by default.

  • dataset_cls (Optional[Type[TFTensorDataset]]) – The dataset class to wrap around tensors returned from tf.data.Dataset. This is Dataset by default and doesn’t need to be changed in most cases.

  • dataset_kwargs (Optional[Dict]) – A dictionary for keyword arguments when constructing the pfl dataset. If the dataset_cls is Dataset then the valid keyword arguments are val_indices, train_kwargs and eval_kwargs.

  • user_id_to_weight (Optional[Dict[Any, int]]) – A dictionary mapping user id to a weight which acts as a proxy for compute time to train this user. In most cases, when model training time scales with data, number of user datapoints/tokens/batches should be a good estimate. This is solely used for minimizing straggling processes in distributed simulations. Leaving this as None will not affect the final outcomes and metrics of the simulation, but simulations will be slower if users have varying dataset sizes.

Returns:

A federated dataset where user datasets are generated from a tf.data.Dataset.

Example:

import itertools
import tensorflow as tf
from pfl.data.tensorflow import TFFederatedDataset

# Assume files with data exist.
sample_it = itertools.cycle(['user1.txt', 'user2.txt'])
# Sampler needs to be a callable.
sampler = lambda: next(sample_it)

@tf.function
def pipeline(data):
    # Load user dataset and do any kind of preprocessing here.
    data = data.map(lambda user_id: tf.io.read_file(user_id))
    # Shuffle dataset of 1 user.
    data = data.shuffle(100)
    # Prefetch 10 users.
    data = data.prefetch(10)
    return data

fed = TFFederatedDataset(pipeline, sampler,
                         user_id_dtype=tf.string)
class pfl.data.pytorch.PyTorchFederatedDataset(dataset, user_sampler, dataset_cls=None, dataset_kwargs=None, user_id_to_weight=None, **dataloader_kwargs)#

Create a federated dataset from a PyTorch dataset and sampler. A torch.utils.data.DataLoader is created from the arguments and is used to load user datasets asynchronously.

Parameters:
  • dataset (Dataset) – A PyTorch dataset instance. __getitem__, which is the method you override to load a datapoint, should be constructed such that the index parameter of __getitem__ is assumed to be a user ID and the returned tensor should not be for one datapoint, but the entire data of the user with this user ID. The first dimension of the tensor(s) returned should be the number of samples for that user. E.g., for user_id='user_123' who has 9 datapoints with input features of shape [256, 256, 3] and output labels of shape [17], dataset['user_123'] would consist of a list of two tensors of shapes [torch.Size([9, 256, 256, 3]), torch.Size([9, 17])].

  • user_sampler (Callable[[], Any]) – Sampling mechanism that samples a user id. The interface of the user sampling mechanism should be callable() -> user_id. This parameter is the same as user_sampler in the constructor of FederatedDataset. In most cases you want to use MinimizeReuseUserSampler because its behaviour mimics what usually happens in live federated learning with user devices.

  • dataset_cls (Optional[Type[PyTorchTensorDataset]]) – The dataset class to wrap around tensors returned from torch.utils.data.DataLoader. This is PyTorchTensorDataset by default and doesn’t need to be changed in most cases.

  • dataset_kwargs (Optional[Dict]) – A dictionary for keyword arguments when constructing the pfl dataset. If the dataset_cls is Dataset then the valid keyword arguments are val_indices, train_kwargs and eval_kwargs.

  • user_id_to_weight (Optional[Dict[Any, int]]) – A dictionary mapping user id to a weight which acts as a proxy for compute time to train this user. In most cases, when model training time scales with data, number of user datapoints/tokens/batches should be a good estimate. This is solely used for minimizing straggling processes in distributed simulations. Leaving this None will not affect the final outcomes and metrics of PFL simulation, but the simulation will be slower if users have varying dataset sizes.

  • dataloader_kwargs – Keyword arguments to add to initialization of DataLoader. It is important to specify these parameters correctly to receive any kind of parallelization for data loading. For details, see torch.utils.data.DataLoader.

class pfl.data.federated_dataset.FederatedDatasetMixture(mixture_weights, mixture_component_datasets)#

Simulates a mixture of federated datasets and/or artificial federated datasets.

To sample new users we sample from the component datasets using the probability vector mixture_weights. We then sample a user from the sampled dataset.

The mixture of federated datasets is useful for modelling clusters or different modes of users in a federated dataset. In particular, this class is used for modelling a Mixture-of-Polya (Dirichlet-Multinomial) distribution.

Example:
# First component dataset is an artificial federated dataset
# created from data inputs X and data targets Y.
component_dataset_0 = ArtificialFederatedDataset.from_slices(
    data=[X, Y],
    data_sampler=lambda n: list(range(n)),
    # Every artificial user gets 4 datapoints.
    sample_dataset_len=lambda: 4)

# Second component dataset is a federated dataset, created
# from a dictionary mapping user IDs to datapoints
data = {0: [X_0, y_0], 1: [X_1, y_1], 2: [X_2, y_2]}
# The sampler instance is itself the callable() -> user_id.
user_sampler = MinimizeReuseUserSampler(list(data.keys()))
make_user_dataset = lambda user_id: Dataset(data[user_id])
component_dataset_1 = FederatedDataset(make_user_dataset,
    user_sampler)

mixture_weights = [0.5, 0.5]

FederatedDatasetMixture(mixture_weights,
                       [component_dataset_0, component_dataset_1])
Parameters:
  • mixture_weights (Union[List[float], ndarray]) – A list or np.ndarray containing the weights for each of the component datasets in the mixture. The mixture weights give the probability of occurrence of each component dataset. Ideally the mixture weights sum to 1, but if not, this class will normalise the mixture weights to sum to 1.

  • mixture_component_datasets (List[FederatedDatasetBase]) – The individual federated datasets that together form the mixture, e.g. instances of ArtificialFederatedDataset or FederatedDataset.
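The two-stage sampling described above (normalise the weights, pick a component, then sample a user from it) can be sketched as follows. The helpers normalise and sample_from_mixture are hypothetical, and each component is modelled as a zero-argument callable rather than a real federated dataset.

```python
import random


def normalise(weights):
    total = sum(weights)
    if total <= 0:
        raise ValueError("mixture weights must have a positive sum")
    return [w / total for w in weights]


def sample_from_mixture(mixture_weights, component_samplers, rng=random):
    """Pick a component with probability given by its weight, then a user."""
    probabilities = normalise(mixture_weights)
    (sampler,) = rng.choices(component_samplers, weights=probabilities, k=1)
    return sampler()


# Two hypothetical components, each a zero-argument callable returning a user.
components = [lambda: "user from component 0", lambda: "user from component 1"]
print(normalise([2, 6]))  # [0.25, 0.75]
print(sample_from_mixture([2, 6], components))
```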

get_cohort(cohort_size)#

Fetch an entire cohort of users. In the context of multi worker training, only the users assigned to the current worker should be returned.

Parameters:

cohort_size (int) – The number of users to fetch.

Return type:

Iterable[Tuple[AbstractDataset, int]]

Returns:

An iterable to iterate through each user dataset from the cohort. Each step of the iterator returns a tuple (user_dataset, seed). Lazy loading of the users’ datasets is performed when stepping through iterator.

Sampling#

class pfl.data.sampling.MinimizeReuseDataSampler(max_bound)#

Data sampling mechanism that maximises the time between instances of reuse. This is done by simply iterating through the sample space in linear fashion and starting over once the end is reached. Every data sampling mechanism returns a list of indices when called. The indices can be used to construct an artificial user dataset.

Parameters:

max_bound – Maximum bound for sampling space. Will sample in the range [0, max_bound).
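The linear walk with wrap-around can be sketched in a few lines. CyclicIndexSampler is a hypothetical class written for this illustration; it follows the description above but is not the pfl implementation.

```python
class CyclicIndexSampler:
    """Hypothetical sketch of a minimize-reuse index sampler."""

    def __init__(self, max_bound):
        self.max_bound = max_bound
        self._next = 0

    def __call__(self, n):
        # Walk linearly through [0, max_bound) and wrap around at the end.
        indices = [(self._next + i) % self.max_bound for i in range(n)]
        self._next = (self._next + n) % self.max_bound
        return indices


sampler = CyclicIndexSampler(max_bound=5)
print(sampler(3))  # [0, 1, 2]
print(sampler(3))  # [3, 4, 0]
print(sampler(2))  # [1, 2]
```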

class pfl.data.sampling.DirichletDataSampler(alpha, labels)#

Data sampling mechanism that samples user class proportions from a Dirichlet distribution with a given alpha parameter. Sampling is done by first drawing a vector of class proportions p ~ Dir(alpha), then sampling a class from a categorical distribution with parameter p and uniformly at random choosing (with replacement) an index with the corresponding class.

Parameters:
  • alpha (ndarray) – Parameter of the Dirichlet distribution. Must be array_like and have length equal to the number of unique classes present in labels.

  • labels (ndarray) – A one-dimensional array of all labels (integers). This should have length equal to the size of the corresponding dataset.
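The sampling procedure above can be sketched with the standard library alone. The function dirichlet_sample_indices is a hypothetical helper, and drawing the Dirichlet vector by normalising independent Gamma draws is a standard construction used here in place of a NumPy call.

```python
import random


def dirichlet_sample_indices(alpha, labels, n, rng=None):
    """Sample n datapoint indices with Dirichlet-distributed class proportions."""
    rng = rng or random.Random()
    classes = sorted(set(labels))
    if len(alpha) != len(classes):
        raise ValueError("alpha needs one entry per unique class in labels")
    # p ~ Dir(alpha): normalise independent Gamma(alpha_k, 1) draws.
    gammas = [rng.gammavariate(a, 1.0) for a in alpha]
    total = sum(gammas)
    proportions = [g / total for g in gammas]
    # Group datapoint indices by class.
    indices_by_class = {c: [] for c in classes}
    for index, label in enumerate(labels):
        indices_by_class[label].append(index)
    # Draw a class from Categorical(p), then an index of that class uniformly
    # at random (with replacement).
    sampled_classes = rng.choices(classes, weights=proportions, k=n)
    return [rng.choice(indices_by_class[c]) for c in sampled_classes]


labels = [0, 0, 1, 1, 2, 2]
print(dirichlet_sample_indices([0.5, 0.5, 0.5], labels, n=4,
                               rng=random.Random(0)))
```

Small alpha values concentrate most of a user's samples in one or two classes, while large values make the class proportions close to uniform.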

pfl.data.sampling.get_data_sampler(sample_type, max_bound=None, alpha=None, labels=None)#

Factory for data sampling mechanisms.

These samplers can be used when sampling data points for an artificial user dataset in ArtificialFederatedDataset, by providing one as the data_sampler argument.

Implemented samplers:

  • random - Randomly sample from the range [0, max_bound), max_bound must be specified.

  • minimize_reuse - Sample while minimizing the number of times a number is sampled again, max_bound must be specified.

  • dirichlet - Sample class proportions from a Dirichlet with given alpha parameter and sample classes according to these proportions. Must specify values for alpha and labels.

class pfl.data.sampling.MinimizeReuseUserSampler(user_ids)#

User sampling mechanism that maximizes the time between instances of reuse, similar to MinimizeReuseDataSampler but for sampling user ids.

In live PFL training, a user commonly cannot participate more than once per day for a particular use case. In other words, the interval before a user can be selected again within the same day is effectively infinite. This sampling strategy tries to mimic that behavior.

Parameters:

user_ids – A list of user ids as the sampling space.

class pfl.data.sampling.CrossSiloUserSampler(sampling_type='random', user_ids=None, silo_to_user_ids=None, num_silos=None)#

User sampling mechanism that assumes the users are partitioned into disjoint subsets belonging to different silos. In each round, all silos contribute to the cohort and each silo may sample one or more users.

In the distributed setting, the silos are divided among the nodes. In each sampling call, a silo is picked according to a fixed order and then a user is sampled from the subset in that silo.

Example:
from pfl.data.sampling import CrossSiloUserSampler

silo_to_user_ids = {
    0: [0, 1],
    1: [2, 3],
    2: [4, 5],
    3: [6, 7]
}
num_silos = 4
# Assume 4 processes on 2 nodes
# Node 1 will hold silos 0 and 1
# Node 2 will hold silos 2 and 3
sampler = CrossSiloUserSampler(
    sampling_type='minimize_reuse',
    silo_to_user_ids=silo_to_user_ids)
samples = [sampler() for _ in range(4)]
# samples == [0, 4, 2, 6]
Parameters:
  • sampling_type – The sampling method used to choose a user from a silo.

  • user_ids – A list of user ids as the sampling space.

  • silo_to_user_ids – An optional dictionary where the keys are the silos and the values are the corresponding subset of user ids. If not provided, the user_ids will be evenly split into n silos where n is the number of workers in the distributed setting.

  • num_silos – An optional int giving the number of silos into which the users are partitioned when silo_to_user_ids is not provided.
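The idea of picking silos in a fixed order and then sampling a user within the chosen silo can be sketched in a single process as below. The class name RoundRobinSiloSampler is hypothetical, and the sketch deliberately ignores how silos are distributed across nodes, so the order it produces differs from the multi-node example above.

```python
from itertools import cycle


class RoundRobinSiloSampler:
    """Hypothetical single-process sketch of cross-silo user sampling."""

    def __init__(self, silo_to_user_ids: dict):
        # One endless, linear iterator per silo (minimize reuse within a silo).
        self._user_iters = {
            silo: cycle(user_ids)
            for silo, user_ids in silo_to_user_ids.items()
        }
        # Visit silos in a fixed order, starting over after the last one.
        self._silo_order = cycle(sorted(silo_to_user_ids))

    def __call__(self):
        silo = next(self._silo_order)
        return next(self._user_iters[silo])


sampler = RoundRobinSiloSampler({0: [0, 1], 1: [2, 3], 2: [4, 5], 3: [6, 7]})
cohort = [sampler() for _ in range(8)]
# cohort == [0, 2, 4, 6, 1, 3, 5, 7]
```

Every silo contributes one user before any silo contributes a second one, and a user is only repeated once its silo has been exhausted.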

pfl.data.sampling.get_user_sampler(sample_type, user_ids, silo_to_user_ids=None, num_silos=None)#

Factory for user sampling mechanisms.

These can be used when sampling the next user for FederatedDataset by providing it as the user_sampler argument.

Implemented samplers:
  • random - Randomly sample a user id.

  • minimize_reuse - Sample a user while minimizing the number of times the user is sampled again.

Partitioning#

pfl.data.partition.partition_by_dirichlet_class_distribution(labels, alpha, user_dataset_len_sampler, spread_distribution_after_num_fails=20, spread_distribution_after_fails_percentage=0.02)#

Given an array of labels, create a partitioning representing artificial user dataset splits where each user’s class distribution is sampled ~Dir(alpha).

It is common to use alpha=0.1 (the majority of samples for each user will be from 1 class). See S.J. Reddi et al. https://arxiv.org/pdf/2003.00295.pdf and J. Wang et al. https://arxiv.org/pdf/2007.07481.pdf

Parameters:
  • labels (ndarray) – A one-dimensional array of all labels (integers). Classes should be consecutive non-negative integers starting at 0, i.e. from {0, 1, …, num_classes-1}.

  • alpha (float) – The alpha parameter for Dirichlet distribution.

  • user_dataset_len_sampler (Callable) – A function which samples the dataset length of the user to construct next. E.g., use lambda: 25 to sample 25 data points for all users.

  • spread_distribution_after_num_fails (int) – When there are few datapoints left to sample, there might not be any datapoints left to sample for particular classes. Each (user_num_datapoints * spread_distribution_after_num_fails) iterations of sampling each user, even out the class distribution by adding spread_distribution_after_fails_percentage to each class probability and normalize (this will make the distribution less heterogeneous and move it closer to the uniform one). This will normally only start occurring when there are only a few datapoints left to sample from.

  • spread_distribution_after_fails_percentage (float) – See above how this works with spread_distribution_after_num_fails. E.g., the default value 0.02 adds 0.02 (2 percentage points) to each class probability and then renormalizes so that the probabilities sum to 1. This moves the distribution closer to the uniform one. E.g., for 2 classes with probabilities [0.1, 0.9], the probabilities are first adjusted to [0.12, 0.92] and then normalized to [0.11538462, 0.88461538].

Return type:

List[List[int]]

Returns:

A list, where each element is a list of label indices that represents one sampled user.
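The adjustment controlled by spread_distribution_after_fails_percentage can be checked with a few lines of arithmetic. The helper name spread_distribution below is an assumption for this sketch and is not a pfl function; it only reproduces the add-then-renormalize step described for that parameter.

```python
def spread_distribution(probabilities, percentage=0.02):
    """Add a constant to every class probability and renormalize."""
    bumped = [p + percentage for p in probabilities]
    total = sum(bumped)
    return [p / total for p in bumped]


adjusted = spread_distribution([0.1, 0.9])
# adjusted is approximately [0.11538462, 0.88461538]
```

Repeated application keeps shrinking the gap between the largest and smallest class probabilities, which is why the distribution drifts toward the uniform one.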

User state#

Sometimes, Federated Learning algorithms require users to be stateful, e.g. each user might have unique parameter values that are used and updated only locally. This module implements various methods of saving and retrieving such user state.

class pfl.data.user_state.AbstractUserStateStorage#

Abstract base class for storage to save and load user state.

Incorporating user state in a custom algorithm might look something like this:

Example:
storage = InMemoryUserStateStorage()

class MyAlgorithm(FederatedNNAlgorithm):

  def train_one_user(self, initial_model_state, model, user_dataset,
                     central_context):
    user_state = storage.load_state(user_dataset.user_id, 'my-algo')
    # ... do local algorithm here, which also generates a new state
    storage.save_state(new_user_state, user_dataset.user_id, 'my-algo')
    return model.get_model_difference(initial_model_state), Metrics()

If the user state is particularly big, e.g. a full set of model weights, loading the state in the data loading stage can be beneficial if the data loading stage is already parallelized.

Example:
storage = InMemoryUserStateStorage()

def make_dataset_fn(user_id):
    inputs, targets = load_raw_data(user_id)
    return Dataset(
        raw_data=[inputs, targets],
        local_state=storage.load_state(user_id, 'my-algo'))

# fed_data is input to SimulatedAggregator
fed_data = FederatedDataset(
    make_dataset_fn, sampler, user_id_to_weight=user_num_images)

class MyAlgorithm(FederatedNNAlgorithm):

  def train_one_user(self, initial_model_state, model, user_dataset,
                     central_context):
    user_state = user_dataset.local_state
    # ... do local algorithm here, which also generates a new state
    storage.save_state(new_user_state, user_dataset.user_id, 'my-algo')
    return model.get_model_difference(initial_model_state), Metrics()
abstract clear_states()#

Remove all existing stored user states.

abstract save_state(state, user_id, key=None)#
Parameters:
  • state (Dict[str, Any]) – A dictionary with generic values to store as state.

  • user_id (str) – ID of the user of this state.

  • key (Optional[str]) – Multiple states of the same user can be stored simultaneously if a unique key is specified for each state object. Not specifying a key will overwrite the default saved user state.

abstract load_state(user_id, key=None)#
Parameters:
  • user_id (str) – Restore the state of the user with this ID.

  • key (Optional[str]) – Optional key to load a specific user state that was previously saved with this key. Not specifying a key will load the default saved user state.

Return type:

Dict[str, Any]

Returns:

The user state.

class pfl.data.user_state.InMemoryUserStateStorage#

Save and load user state for a given user ID. Keeps states of all users in memory such that loading and saving state is very fast.

Warning

Saving user state in memory is not compatible out of the box with distributed simulations, whether they run on multiple machines or in multiple processes on the same machine. If your large simulations require multiple processes, you can use a cross-silo user sampler such that each user is pinned to the same worker process every time it is sampled, where the state of that user is cached.

clear_states()#

Remove all existing stored user states.

save_state(state, user_id, key=None)#
Parameters:
  • state (Dict[str, Any]) – A dictionary with generic values to store as state.

  • user_id (str) – ID of the user of this state.

  • key (Optional[str]) – Multiple states of the same user can be stored simultaneously if a unique key is specified for each state object. Not specifying a key will overwrite the default saved user state.

load_state(user_id, key=None)#
Parameters:
  • user_id (str) – Restore the state of the user with this ID.

  • key (Optional[str]) – Optional key to load a specific user state that was previously saved with this key. Not specifying a key will load the default saved user state.

Return type:

Dict[str, Any]

Returns:

The user state.
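The save and load semantics described above, including the optional key, can be illustrated with a plain dictionary keyed by (user_id, key). This is a sketch of the idea only; the class name DictUserStateStorage and the choice to return an empty dictionary for unknown users are assumptions, not documented pfl behavior.

```python
import copy
from typing import Any, Dict, Optional, Tuple


class DictUserStateStorage:
    """Hypothetical sketch of an in-memory store keyed by (user_id, key)."""

    def __init__(self):
        self._states: Dict[Tuple[str, Optional[str]], Dict[str, Any]] = {}

    def clear_states(self) -> None:
        self._states.clear()

    def save_state(self, state, user_id, key=None) -> None:
        # Copy so later mutation by the caller does not alter the stored state.
        self._states[(user_id, key)] = copy.deepcopy(state)

    def load_state(self, user_id, key=None):
        # Returning {} for unknown users is an assumption of this sketch.
        return copy.deepcopy(self._states.get((user_id, key), {}))


storage = DictUserStateStorage()
storage.save_state({'step': 1}, 'user-a', 'my-algo')
storage.save_state({'step': 7}, 'user-a')  # default state, no key
```

The keyed state and the default state of the same user live side by side, so saving without a key only overwrites the default entry.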

class pfl.data.user_state.DiskUserStateStorage(dir_path)#

Save and load user state for a given user ID. Keeps states of all users on disk. This is slower than InMemoryUserStateStorage, but necessary if the user states are too large to fit into memory.

Warning

Saving user state on disk is not compatible out of the box with distributed simulations on multiple machines. Try to use a single machine with enough GPUs and a common disk space. If your large simulations require multiple machines, you can use a cross-silo user sampler such that a unique user is pinned to being sampled on the same machine each time.

clear_states()#

Remove all existing stored user states.

acquire_lock(path)#

Acquire lock on a specific file. The lock state is written to disk and thereby works across processes that share disk space.

Parameters:

path – Acquire lock for this file.
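One common way to get a lock whose state lives on disk is to create a companion lock file atomically. The sketch below shows that technique with the standard library; it is not taken from pfl, and the function name file_lock, the '.lock' suffix, and the timeout arguments are all assumptions for illustration.

```python
import contextlib
import os
import tempfile
import time


@contextlib.contextmanager
def file_lock(path, timeout=5.0, poll_interval=0.01):
    """Hypothetical lock: hold `path + '.lock'` while the body runs."""
    lock_path = path + '.lock'
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation fail if the lock file already exists.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                raise TimeoutError(f'could not lock {path}')
            time.sleep(poll_interval)
    try:
        yield
    finally:
        # Release the lock by removing the lock file.
        os.close(fd)
        os.remove(lock_path)


state_file = os.path.join(tempfile.mkdtemp(), 'user-a.state')
with file_lock(state_file):
    locked_during = os.path.exists(state_file + '.lock')
locked_after = os.path.exists(state_file + '.lock')
```

A lock file like this stays behind if the holding process crashes, so a production version would need some form of stale-lock handling.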

save_state(state, user_id, key=None)#
Parameters:
  • state (Dict[str, Any]) – A dictionary with generic values to store as state.

  • user_id (str) – ID of the user of this state.

  • key (Optional[str]) – Multiple states of the same user can be stored simultaneously if a unique key is specified for each state object. Not specifying a key will overwrite the default saved user state.

load_state(user_id, key=None)#
Parameters:
  • user_id (str) – Restore the state of the user with this ID.

  • key (Optional[str]) – Optional key to load a specific user state that was previously saved with this key. Not specifying a key will load the default saved user state.

Return type:

Dict[str, Any]

Returns:

The user state.
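As a rough picture of what keeping user state on disk involves, the sketch below writes one pickle file per (user_id, key) pair under a directory. The class name PickleUserStateStorage, the file naming scheme, and the empty-dictionary default are assumptions for this example; the actual on-disk format used by DiskUserStateStorage is not described here, and locking is omitted for brevity.

```python
import os
import pickle
import tempfile


class PickleUserStateStorage:
    """Hypothetical sketch: one pickle file per (user_id, key) pair."""

    def __init__(self, dir_path):
        self._dir_path = dir_path
        os.makedirs(dir_path, exist_ok=True)

    def _path(self, user_id, key=None):
        # The file naming scheme is an assumption of this sketch.
        suffix = 'default' if key is None else f'key-{key}'
        return os.path.join(self._dir_path, f'{user_id}.{suffix}.pkl')

    def clear_states(self):
        for name in os.listdir(self._dir_path):
            if name.endswith('.pkl'):
                os.remove(os.path.join(self._dir_path, name))

    def save_state(self, state, user_id, key=None):
        with open(self._path(user_id, key), 'wb') as f:
            pickle.dump(state, f)

    def load_state(self, user_id, key=None):
        path = self._path(user_id, key)
        if not os.path.exists(path):
            return {}  # assumption: unknown users start with empty state
        with open(path, 'rb') as f:
            return pickle.load(f)


disk_storage = PickleUserStateStorage(tempfile.mkdtemp())
disk_storage.save_state({'weights': [0.1, 0.2]}, 'user-a', 'my-algo')
restored = disk_storage.load_state('user-a', 'my-algo')
```

Only the state being loaded or saved is held in memory at any time, which is the trade-off against the faster in-memory variant.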