Models#

Abstract base classes#

A model contains all the functionality needed to simulate federated learning experiments with the specific deep learning framework your model is implemented in.

class pfl.model.base.Model#

Model that can be trained with federated learning.

Subclass this (or one of its subclasses) to implement your own model. This class describes the minimal interface required for training with the simplest possible algorithm.

abstract apply_model_update(statistics)#

Compute updated parameters based on statistics.

This can either construct a new model and return it; or mutate self and return self.

Parameters:

statistics (TypeVar(StatisticsType, bound= TrainingStatistics)) – Statistics for updating the parameters of the model.

Return type:

Tuple[TypeVar(ModelType, bound= Model), Metrics]

Returns:

The new model and any metrics.
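
A minimal sketch of a subclass that mutates itself and returns (self, metrics), assuming a toy model whose parameters live in a NumPy array and whose statistics arrive as a MappedVectorStatistics with a 'difference' key (the class, the key name and the exact usage are illustrative assumptions, not part of the interface above):

import numpy as np

from pfl.metrics import Metrics
from pfl.model.base import Model

class ToyLinearModel(Model):

    def __init__(self, num_features):
        self._weights = np.zeros(num_features)

    def apply_model_update(self, statistics):
        # Add the aggregated difference to the central parameters in place.
        self._weights += statistics['difference']
        # Return self (mutated) and an empty Metrics object.
        return self, Metrics()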

class pfl.model.base.EvaluatableModel#
property allows_distributed_evaluation: bool | None#

Distributed evaluation can only be enabled when, for any split of the dataset, evaluating on the separate parts and summing the metrics gives the same result as a single call to evaluate with all the data.

As a conservative setting, distributed evaluation is not allowed by default. Every model subclass has to explicitly make sure that distributing evaluation will work correctly. If set to None, it is interpreted as not yet determined, and any evaluation that supports distributed evaluation will not be distributed.

abstract evaluate(dataset, name_formatting_fn, eval_params=None)#

Evaluate performance of model on the given input data.

This can be used in different circumstances. One is for simulated distributed evaluation, where the data is supposed to be from one device. Another is for centrally held data.

Parameters:
  • dataset (TypeVar(AbstractDatasetType, bound= AbstractDataset)) – Dataset to evaluate on. If this is centrally held data, it is still a flat list of data points.

  • name_formatting_fn (Callable[[str], StringMetricName]) – A function to be used to generate a metric name object from a simple string, which will adorn the string with additional information.

  • eval_params (Optional[TypeVar(ModelHyperParamsType, bound= ModelHyperParams)]) – Optional model parameters to use for evaluating the models. Some models can evaluate without a parameter object.

Return type:

Metrics

Returns:

A Metrics object with performance metrics.

class pfl.model.base.StatefulModel#

Convenience class for a model that has a fixed number of parameters and is stateful.

(This is true for frameworks for neural networks in a federated setting, hence the eclectic set of requirements.)

Simulation of local training in this setup is done by mutating the parameters in four steps:

  1. backing up the current (central) parameters (get_parameters)

  2. training the inner model on a user’s local data.

  3. computing the difference between the current parameters and the backed-up ones (get_model_difference)

  4. restoring to the backed-up state (set_parameters).
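
In outline, these four steps correspond to a call pattern like the following sketch (the local_train helper and the surrounding loop over users are illustrative placeholders, not part of this class):

initial_params = model.get_parameters()        # 1. back up the central parameters
local_train(model, user_dataset)               # 2. train on the user's local data
statistics = model.get_model_difference(initial_params)  # 3. compute the difference
model.set_parameters(initial_params)           # 4. restore the backed-up state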

abstract get_model_difference(other_parameters, clone=False)#

Get the model difference between the current state of the model and the other state given as input (i.e. current-other).

Parameters:
  • other_parameters (TypeVar(StatisticsType, bound= TrainingStatistics)) – Get the model difference with respect to this other state of model parameters. Can be received from get_parameters.

  • clone (bool) –

    If False, the implementation may use a cache to make this method run faster. This is fine as long as you don’t need to hold multiple difference statistics in memory at the same time. If you do, e.g.

    diff1 = model.get_model_difference(initial_params, clone=True)
    diff2 = model.get_model_difference(initial_params, clone=True)
    

    then clone should be True, because otherwise diff1 might end up pointing to diff2 after the second call.

Return type:

TypeVar(StatisticsType, bound= TrainingStatistics)

Returns:

The model difference statistics.

abstract get_parameters(placeholders=None)#

Retrieve model parameters at the current state. Useful if you want to restore the model to this state after it has been modified.

Parameters:

placeholders (Optional[TypeVar(StatisticsType, bound= TrainingStatistics)]) – Statistics with tensors to be updated in-place with model parameters. If the model supports this, it will greatly increase performance when training many users.

Return type:

TypeVar(StatisticsType, bound= TrainingStatistics)

abstract set_parameters(w)#

Apply new model parameters to the model. This is used to restore the model to a previous state, which can be received from get_parameters.

Return type:

None

TensorFlow#

class pfl.model.tensorflow.TFModel(model, metrics, central_optimizer, checkpoint_format_hdf5=False)#
Parameters:
  • model – A TensorFlow Keras model to train. Can either be defined from the functional API or the sequential API. If the model is a Sequential, it needs to be built with model.build(input_dims).

  • metrics (Dict[str, Union[Metric, Tuple[Metric, Callable[[Union[MetricValue, float, int]], Union[MetricValue, float, int]]]]]) –

    Specify metrics to use for evaluation. The key is the name of the metric and the value is either a tf.keras.metrics.Metric or a tuple of a tf.keras.metrics.Metric and a function that postprocesses the metric value for each user.

    example:
    from pfl.metrics import user_average
    metrics = {
        'overall accuracy': tf.keras.metrics.Accuracy(),
        'per-user accuracy': (tf.keras.metrics.Accuracy(),
                              user_average)
    }
    

  • central_optimizer – An optimizer instance from tf.keras.optimizers, which is used to apply the aggregated model updates to the variables. Learning rate decay can be applied using tf.keras.optimizers.schedules.

  • checkpoint_format_hdf5 – If True, save model checkpoints as hdf5 files. Otherwise, save model checkpoints in tensorflow format.
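
A minimal construction sketch, assuming a small Keras model (the layer sizes, metric names and optimizer settings are illustrative only):

import tensorflow as tf

from pfl.model.tensorflow import TFModel

keras_model = tf.keras.Sequential(
    [tf.keras.layers.Dense(10, activation='softmax')])
# Sequential models must be built before they are wrapped.
keras_model.build(input_shape=(None, 784))

model = TFModel(
    model=keras_model,
    metrics={'accuracy': tf.keras.metrics.SparseCategoricalAccuracy()},
    central_optimizer=tf.keras.optimizers.SGD(learning_rate=1.0))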

property allows_distributed_evaluation: bool | None#

Distributed evaluation can only be enabled when, for any split of the dataset, evaluating on the separate parts and summing the metrics gives the same result as a single call to evaluate with all the data.

As a conservative setting, distributed evaluation is not allowed by default. Every model subclass has to explicitly make sure that distributing evaluation will work correctly. If set to None, it is interpreted as not yet determined, and any evaluation that supports distributed evaluation will not be distributed.

save(dir_path)#

Save state of object to disk. Should be able to interpret saved state as a checkpoint that can be restored with load.

Parameters:

dir_path (str) – Directory on disk to store state.

Return type:

None

load(dir_path)#

Load checkpoint from disk, which is the state previously saved with save.

Parameters:

dir_path (str) – Path to root directory where checkpoint can be loaded from. Should be same path as used with save.

Return type:

None

get_parameters(placeholders=None)#

Retrieve model parameters at the current state. Useful if you want to restore the model to this state after it has been modified.

Parameters:

placeholders (Optional[MappedVectorStatistics]) – Statistics with tensors to be updated in-place with model parameters. If the model supports this, it will greatly increase performance when training many users.

Return type:

MappedVectorStatistics

set_parameters(w)#

Apply new model parameters to the model. This is used to restore the model to a previous state, which can be received from get_parameters.

Return type:

None

get_model_difference(other_parameters, clone=False)#

Get the model difference between the current state of the model and the other state given as input (i.e. current-other).

Parameters:
  • other_parameters (MappedVectorStatistics) – Get the model difference with respect to this other state of model parameters. Can be received from get_parameters.

  • clone (bool) –

    If False, the implementation may use a cache to make this method run faster. This is fine as long as you don’t need to hold multiple difference statistics in memory at the same time. If you do, e.g.

    diff1 = model.get_model_difference(initial_params, clone=True)
    diff2 = model.get_model_difference(initial_params, clone=True)
    

    then clone should be True, because otherwise diff1 might end up pointing to diff2 after the second call.

Return type:

MappedVectorStatistics

Returns:

The model difference statistics.

do_multiple_epochs_of(user_dataset, train_params, train_step_fn, **kwargs)#

Perform multiple epochs of training. The customizable training function that will use a batch of data to update the local model state is defined by train_step_fn. If you have specified an optimizer using the parameter local_optimizer_create in the constructor, the optimizer state will be reset before training is performed in this method.

Parameters:
  • user_dataset (TypeVar(AbstractDatasetType, bound= AbstractDataset)) – Dataset of type Dataset to train on.

  • train_params (NNTrainHyperParams) – An instance of NNTrainHyperParams containing configuration for training.

  • train_step_fn (Callable) – A function with the following arguments:

    - inputs – the tensor(s) used as input to the Keras model’s __call__ method.

    - labels – the label tensor(s) used as the first argument to the Keras model’s loss function.

    - train_kwargs – the train_kwargs property from the user dataset. With this, you can pass user-specific metadata to local training.

    - kwargs – other keyword arguments that a custom train_step_fn might have.

    Note that the TF model itself is not passed in the arguments; it instead needs to be in the closure of the function when it is defined. This is much more performant.

Return type:

None
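
A hedged sketch of a custom train_step_fn, assuming keras_model, loss_fn and local_optimizer are defined in the enclosing scope (as noted above, the model is captured in the closure rather than passed as an argument):

import tensorflow as tf

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
local_optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)

def train_step_fn(inputs, labels, train_kwargs, **kwargs):
    # One gradient step on a single batch of a user's data.
    with tf.GradientTape() as tape:
        logits = keras_model(inputs, training=True)
        loss = loss_fn(labels, logits)
    grads = tape.gradient(loss, keras_model.trainable_variables)
    local_optimizer.apply_gradients(
        zip(grads, keras_model.trainable_variables))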

evaluate(dataset, name_formatting_fn=<function TFModel.<lambda>>, eval_params=None)#

Evaluate performance of model on the given input data.

This can be used in different circumstances. One is for simulated distributed evaluation, where the data is supposed to be from one device. Another is for centrally held data.

Parameters:
  • dataset (TypeVar(AbstractDatasetType, bound= AbstractDataset)) – Dataset to evaluate on. If this is centrally held data, it is still a flat list of data points.

  • name_formatting_fn – A function to be used to generate a metric name object from a simple string, which will adorn the string with additional information.

  • eval_params (Optional[NNEvalHyperParams]) – Optional model parameters to use for evaluating the models. Some models can evaluate without a parameter object.

Return type:

Metrics

Returns:

A Metrics object with performance metrics.

apply_model_update(statistics)#

Compute updated parameters based on statistics.

This can either construct a new model and return it; or mutate self and return self.

Parameters:

statistics (MappedVectorStatistics) – Statistics for updating the parameters of the model.

Return type:

Tuple[TFModel, Metrics]

Returns:

The new model and any metrics.

PyTorch#

class pfl.model.pytorch.PyTorchModel(model, local_optimizer_create, central_optimizer, central_learning_rate_scheduler=None, amp_dtype=None, grad_scaling=False, model_dtype_same_as_amp=False, use_torch_compile=False)#
Parameters:
  • model –

    A torch.nn.Module representing the PyTorch model to train. The module must have two methods defined:

    - loss – (*user_data) -> loss_value, where user_data is a user’s dataset unpacked into the call of loss, and loss_value is the numeric value to minimize.

    - metrics – a function (*user_data) -> <name: metric_value>, where user_data is the same as in loss, and the return value is a dictionary where name is the name of the metric and metric_value is an instance of MetricValue or a tuple of a MetricValue and a function that postprocesses the metric value for each user. The metrics method has the signature:

    Callable[[*torch.Tensor],
             Dict[str, Union[
                MetricValue,
                Tuple[MetricValue,
                      Callable[[MetricValue], MetricValue]
                ]
             ]]
            ]
    
    example:
    # user data looks like this:
    # UserDataset(raw_data=[x,y], eval_kwargs={'eval':True})
    from pfl.metrics import user_average
    l1loss = torch.nn.L1Loss(reduction='sum')
    
    def metrics(self, x, y, eval=False):
        self.eval() if eval else self.train()
        loss = l1loss(self(x), y).item()
        num_samples = len(x)
    
        return {
            'per sample loss': Weighted(loss, num_samples),
            'per user loss': (Weighted(loss, num_samples),
                              user_average),
        }
    

  • local_optimizer_create – A function to create a torch.optim.optimizer.Optimizer instance. The learning rate of this optimizer will be replaced by other training algorithms that use this trainer.

  • central_optimizer – A torch.optim.optimizer.Optimizer instance, which is used to apply the central model updates to the variables.

  • central_learning_rate_scheduler – A torch.optim.lr_scheduler.LRScheduler instance, which is used to schedule the learning rate of central_optimizer.

  • amp_dtype (Optional[dtype]) – A torch.dtype for mixed precision training with torch.amp.autocast. If set to None or torch.float32, no mixed precision training is enabled.

  • grad_scaling (bool) – A boolean indicating whether to apply gradient scaling when using mixed precision training.

  • model_dtype_same_as_amp (bool) – A boolean indicating whether to cast the model parameter dtype to the same as amp_dtype when using mixed precision training. Note that lower precision model parameters might cause divergence during training.

  • use_torch_compile (bool) – A boolean indicating whether to use torch.compile, which can speed up training and inference.
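
A minimal construction sketch, assuming a small torch.nn.Module that defines the required loss and metrics methods (the layer sizes, metric names and optimizer settings are illustrative only):

import torch

from pfl.metrics import Weighted
from pfl.model.pytorch import PyTorchModel

class MyModule(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(10, 1)
        self.l1loss = torch.nn.L1Loss(reduction='sum')

    def forward(self, x):
        return self.linear(x)

    def loss(self, x, y):
        self.train()
        return self.l1loss(self(x), y)

    def metrics(self, x, y):
        self.eval()
        loss = self.l1loss(self(x), y).item()
        return {'per sample loss': Weighted(loss, len(x))}

pytorch_model = MyModule()
model = PyTorchModel(
    model=pytorch_model,
    local_optimizer_create=torch.optim.SGD,
    central_optimizer=torch.optim.SGD(pytorch_model.parameters(), lr=1.0))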

property allows_distributed_evaluation: bool | None#

Distributed evaluation can only be enabled when, for any split of the dataset, evaluating on the separate parts and summing the metrics gives the same result as a single call to evaluate with all the data.

As a conservative setting, distributed evaluation is not allowed by default. Every model subclass has to explicitly make sure that distributing evaluation will work correctly. If set to None, it is interpreted as not yet determined, and any evaluation that supports distributed evaluation will not be distributed.

save(dir_path)#

Save state of object to disk. Should be able to interpret saved state as a checkpoint that can be restored with load.

Parameters:

dir_path (str) – Directory on disk to store state.

Return type:

None

load(dir_path)#

Load checkpoint from disk, which is the state previously saved with save.

Parameters:

dir_path (str) – Path to root directory where checkpoint can be loaded from. Should be same path as used with save.

Return type:

None

get_parameters(placeholders=None)#

Retrieve model parameters at the current state. Useful if you want to restore the model to this state after it has been modified.

Parameters:

placeholders (Optional[MappedVectorStatistics]) – Statistics with tensors to be updated in-place with model parameters. If the model supports this, it will greatly increase performance when training many users.

Return type:

MappedVectorStatistics

set_parameters(w)#

Apply new model parameters to the model. This is used to restore the model to a previous state, which can be received from get_parameters.

Return type:

None

get_model_difference(other_parameters, clone=False)#

Get the model difference between the current state of the model and the other state given as input (i.e. current-other).

Parameters:
  • other_parameters (MappedVectorStatistics) – Get the model difference with respect to this other state of model parameters. Can be received from get_parameters.

  • clone (bool) –

    If False, the implementation may use a cache to make this method run faster. This is fine as long as you don’t need to hold multiple difference statistics in memory at the same time. If you do, e.g.

    diff1 = model.get_model_difference(initial_params, clone=True)
    diff2 = model.get_model_difference(initial_params, clone=True)
    

    then clone should be True, because otherwise diff1 might end up pointing to diff2 after the second call.

Return type:

MappedVectorStatistics

Returns:

The model difference statistics.

do_multiple_epochs_of(user_dataset, train_params, train_step_fn, **kwargs)#

Perform multiple epochs of training. The customizable training function that will use a batch of data to update the local model state is defined by train_step_fn. If you have specified an optimizer using the parameter local_optimizer_create in the constructor, a new optimizer will be initialized before training is performed in this method.

Parameters:
  • user_dataset (TypeVar(AbstractDatasetType, bound= AbstractDataset)) – Dataset of type Dataset to train on.

  • train_params (NNTrainHyperParams) – An instance of NNTrainHyperParams containing configuration for training.

  • train_step_fn (Callable) – A function with the following arguments:

    - pytorch_model – the PyTorch model object to train on.

    - local_optimizer – the optimizer to use for training.

    - raw_data – an iterable of tensors unpacked into the loss function pytorch_model.loss(*raw_data).

    - train_kwargs – the train_kwargs property from the user dataset. With this, you can pass user-specific metadata to local training.

    - train_step_args – an instance of PyTorchTrainStepArgs that contains common arguments for PyTorch local training.

    - kwargs – other keyword arguments that a custom train_step_fn might have.

Return type:

None
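
A hedged sketch of a custom train_step_fn performing plain local SGD (argument names follow the list above; mixed precision and other options carried by train_step_args are ignored in this sketch):

def train_step_fn(pytorch_model, local_optimizer, raw_data, train_kwargs,
                  train_step_args, **kwargs):
    # One gradient step on a single batch of a user's data.
    local_optimizer.zero_grad()
    loss = pytorch_model.loss(*raw_data)
    loss.backward()
    local_optimizer.step()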

evaluate(dataset, name_formatting_fn=<function PyTorchModel.<lambda>>, eval_params=None)#

Evaluate performance of model on the given input data.

This can be used in different circumstances. One is for simulated distributed evaluation, where the data is supposed to be from one device. Another is for centrally held data.

Parameters:
  • dataset (TypeVar(AbstractDatasetType, bound= AbstractDataset)) – Dataset to evaluate on. If this is centrally held data, it is still a flat list of data points.

  • name_formatting_fn (Callable[[str], StringMetricName]) – A function to be used to generate a metric name object from a simple string, which will adorn the string with additional information.

  • eval_params (Optional[NNEvalHyperParams]) – Optional model parameters to use for evaluating the models. Some models can evaluate without a parameter object.

Return type:

Metrics

Returns:

A Metrics object with performance metrics.

apply_model_update(statistics)#

Compute updated parameters based on statistics.

This can either construct a new model and return it; or mutate self and return self.

Parameters:

statistics (MappedVectorStatistics) – Statistics for updating the parameters of the model.

Return type:

Tuple[PyTorchModel, Metrics]

Returns:

The new model and any metrics.

Gaussian mixture model#

class pfl.model.gaussian_mixture_model.GMMHyperParams(variance_scale=1.0, responsibility_scale=1.0, variance_floor_fraction=0.01, minimum_component_weight=0.01)#

Configuration for training Gaussian mixture models.

Parameters:
  • variance_scale (float) – Scaling factor for the part of the statistics vector where the variance statistics go. This is relative to the mean, which is not scaled. Scaling parts of the statistics can effectively reduce noise for differential privacy.

  • responsibility_scale (float) – Scaling factor for the part of the statistics vector where the responsibilities go. This is relative to the mean, which is not scaled. Scaling parts of the statistics can effectively reduce noise for differential privacy.

  • variance_floor_fraction (float) – Floor to the variance of each component when training, as a fraction of the global variance.

  • minimum_component_weight (float) – Weight below which components are pruned when training.
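
For example, a configuration that raises the variance floor could look like the following sketch (the values are purely illustrative, not recommendations):

from pfl.model.gaussian_mixture_model import GMMHyperParams

gmm_params = GMMHyperParams(
    variance_scale=1.0,
    responsibility_scale=1.0,
    variance_floor_fraction=0.05,
    minimum_component_weight=0.01)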

class pfl.model.gaussian_mixture_model.GaussianMixtureModel(num_dimensions, model=None, *, cached_model_train_params=None)#

A density model represented by a mixture of diagonal-covariance Gaussians. This model is immutable: apply_model_update only returns a new model, and it does not change the object.

Training uses standard expectation-maximization, which approximates maximum-likelihood training. The statistics are the standard statistics, but transformed so that they are efficient when noise is added for differential privacy.

Parameters:
  • num_dimensions – The dimensionality of the vectors that this model is defined over.

  • model (Optional[Mixture]) – The underlying Mixture[DiagonalGaussian].

  • cached_model_train_params (Optional[GMMHyperParams]) – Used internally.

property allows_distributed_evaluation: bool | None#

Distributed evaluation can only be enabled when, for any split of the dataset, evaluating on the separate parts and summing the metrics gives the same result as a single call to evaluate with all the data.

As a conservative setting, distributed evaluation is not allowed by default. Every model subclass has to explicitly make sure that distributing evaluation will work correctly. If set to None, it is interpreted as not yet determined, and any evaluation that supports distributed evaluation will not be distributed.

global_gaussian()#
Return type:

DiagonalGaussian

Returns:

The global Gaussian, as computed from the mixture. Note that this is exact, in the sense that if the data that was used to train the mixture was used to train a single Gaussian, the global Gaussian would be that single Gaussian. (Here, “train” means “find the parameters that maximize the likelihood”.)

property model: Mixture#
Returns:

The underlying Mixture of DiagonalGaussian components.

property components: List[Tuple[float, Distribution]]#
Returns:

The underlying components, as a list of tuples with weights and components. The weights add up to 1.

get_mixture_statistics(points, variance_scale, responsibility_scale)#
Returns:

The statistics to train the mixture of Gaussians for multiple points.

apply_model_update(statistics)#

Compute updated parameters based on statistics.

This can either construct a new model and return it; or mutate self and return self.

Parameters:

statistics (MappedVectorStatistics) – Statistics for updating the parameters of the model.

Return type:

Tuple[GaussianMixtureModel, Metrics]

Returns:

The new model and any metrics.

evaluate(dataset, name_formatting_fn, eval_params=None)#

Evaluate performance of model on the given input data.

This can be used in different circumstances. One is for simulated distributed evaluation, where the data is supposed to be from one device. Another is for centrally held data.

Parameters:
  • dataset (TypeVar(AbstractDatasetType, bound= AbstractDataset)) – Dataset to evaluate on. If this is centrally held data, it is still a flat list of data points.

  • name_formatting_fn (Callable[[str], StringMetricName]) – A function to be used to generate a metric name object from a simple string, which will adorn the string with additional information.

  • eval_params (Optional[ModelHyperParams]) – Optional model parameters to use for evaluating the models. Some models can evaluate without a parameter object.

Return type:

Metrics

Returns:

A Metrics object with performance metrics.

mix_up(num_extra_components)#

Introduce extra components in a heuristic manner.

The new components are generated by splitting up the heaviest components.

Parameters:

num_extra_components (int) – The number of components to add, i.e. the number of components to split. If this is greater than the current number of components, then all components are split once, and the number of components is merely doubled.

Return type:

GaussianMixtureModel

Returns:

The new model with extra components.
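
Because the model is immutable, mix_up returns a new model that must be re-assigned; a typical call between central iterations might look like this sketch (the number of extra components is illustrative):

# Split the two heaviest components, e.g. growing a 3-component mixture to 5.
model = model.mix_up(num_extra_components=2)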

Exponential moving average#

class pfl.model.ema.CentralExponentialMovingAverage(model, decay, dynamic_decay_rate=False)#

Maintains moving averages of variables by employing an exponential decay. When training a model, it is often beneficial to maintain moving averages of the trained parameters. Evaluations that use averaged parameters sometimes produce better results.

The EMA update is described by the following formula: shadow_variable -= (1 - decay) * (shadow_variable - variable), where shadow_variable is the moving average of variable.

Reasonable values for decay are close to 1.0, typically in the multiple-nines range: 0.99, 0.999, etc.

Example:
ema = CentralExponentialMovingAverage(model, 0.999)
# ... training the model
model.apply_model_update(statistics)
ema.update()
# back up model parameters in `state`
state = model.get_parameters()
# set model parameters to their EMA values
ema.assign()
# ... evaluating the model
# restore the original model state
model.set_parameters(state)
Parameters:
  • decay (float) – The EMA decay rate, a floating point number between 0 and 1.

  • dynamic_decay_rate (bool) – Whether to tweak the decay rate dynamically using the count of training steps. If True, the actual decay rate used is: min(decay, (1 + num_updates) / (10 + num_updates)) in which case the decay rate is lower at the start of training. This makes moving averages move faster in the early iterations.

property shadow_variable_map: Dict[str, Any]#
Returns:

A dictionary that maps variable name to the shadow EMA variables of self._model.

property decay: float#
Returns:

A float for the EMA decay rate. Dynamically set if self._dynamic_decay_rate is True.

update()#

Perform one step of EMA update on the shadow variables, using the framework-specific operation, after each central optimization step.

Return type:

None

assign()#

Assign the EMA shadow variables to the model variables.

Return type:

None

save(dir_path)#

Save the EMA shadow variables. The model first needs to back up its current variables and assign the EMA variables for saving, then restore the original variables.

Return type:

None