Backend#

class pfl.aggregate.base.Backend#

Backend base class.

gather_results(model, training_algorithm, *, central_context)#

Evaluate or compute training statistics on devices and aggregate them.

This call blocks.

Parameters:
  • model (Model) – A model to use for training on users. Subclasses can refine the type of Model they expect.

  • training_algorithm – An object that defines the training algorithm. Subclasses can have additional requirements on the type of this object.

  • central_context (CentralContext) – Settings to use for this round.

Return type:

Tuple[Optional[TrainingStatistics], Metrics]

Returns:

A tuple of the raw aggregated statistics (present only if this is a training round, otherwise None) and the aggregated metrics.
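
Example (a minimal sketch; backend, model, training_algorithm and central_context are illustrative names for objects assumed to be constructed elsewhere):

    # Blocking call; returns once the round has been fully aggregated.
    statistics, metrics = backend.gather_results(
        model, training_algorithm, central_context=central_context)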

abstract async async_gather_results(model, training_algorithm, *, central_context)#

Should implement the map-reduce procedure of locally training on devices and aggregating the model updates. This can either be a simulation or use live infrastructure. If an implementation of async_gather_results requires its runs to be non-overlapping in time, it should not use await.

Implementations of this may require that calls to it are started in a fixed order.

For the parameters, see gather_results.

Return type:

Tuple[Optional[TrainingStatistics], Metrics]
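
A sketch of the shape a concrete subclass might take (illustrative only; _gather_cohort is a hypothetical helper standing in for the actual map-reduce logic and is not part of pfl):

    from typing import Optional, Tuple

    from pfl.aggregate.base import Backend
    from pfl.metrics import Metrics
    from pfl.stats import TrainingStatistics


    class MyBackend(Backend):
        """Illustrative Backend subclass, not a working implementation."""

        async def async_gather_results(
                self, model, training_algorithm, *, central_context
        ) -> Tuple[Optional[TrainingStatistics], Metrics]:
            # Map step: run local training/evaluation on a cohort of users,
            # then reduce the individual contributions into one result.
            # _gather_cohort is a hypothetical helper, not part of pfl.
            return self._gather_cohort(
                model, training_algorithm, central_context=central_context)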

class pfl.aggregate.simulate.SimulatedBackend(training_data, val_data, postprocessors=None, aggregator=None, max_overshoot_fraction=0.0)#

Backend that simulates an algorithm on the given federated datasets. The simulation performs the following:

  1. Perform the following steps on all users (can be parallelized with multiple workers):

    1. Sample a user Dataset from the source FederatedDatasetBase.

    2. Train model on the user dataset and extract the model updates.

      The algorithm used for local training is injected by training_algorithm when calling gather_results.

    3. Apply local clipping to the individual contribution.

    4. Privatize the individual contribution with local_privacy.

  2. Apply central clipping to each individual contribution.

  3. Sum all the individual contributions.

  4. Add central noise.

Parameters:
  • training_data (FederatedDatasetBase) – Training dataset generator, derived from FederatedDatasetBase.

  • val_data (FederatedDatasetBase) – Validation dataset generator, derived from FederatedDatasetBase.

  • postprocessors (Optional[List[Postprocessor]]) – Steps for postprocessing statistics returned by users, e.g. weighting, sparsification, compression.

  • aggregator (Optional[Aggregator]) – The aggregation method to use. If not specified, the default pfl.aggregate.base.SumAggregator is used, which sums all statistics of users trained in each process and then performs an all-reduce sum across processes.

  • max_overshoot_fraction (float) – Simulate overshooting the number of results received from users. This is disabled by default, but is expected to be up to 0.05-0.10 when training live. The overshooting is assumed to be a fraction p ~ Uniform(0, max_overshoot_fraction).
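
Construction might look like the following sketch (training_data and val_data are FederatedDatasetBase instances assumed to have been built elsewhere):

    from pfl.aggregate.base import SumAggregator
    from pfl.aggregate.simulate import SimulatedBackend

    backend = SimulatedBackend(
        training_data=training_data,
        val_data=val_data,
        aggregator=SumAggregator(),    # the default when omitted
        max_overshoot_fraction=0.05)   # simulate up to ~5% extra results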

async async_gather_results(model, training_algorithm, *, central_context)#

Run a simulation on a cohort of user datasets. Returns aggregated statistics and metrics from the procedure described in the class description.

This implementation is not actually asynchronous and it yields execution only once it is completely done. The work may be split up amongst multiple workers. In this case, the order in which this method is called must be the same across workers, so that their synchronisation points line up.

Parameters:
  • model (Model) – A model to be used for gathering the statistics.

  • training_algorithm (FederatedAlgorithm) – An object that inherits from FederatedAlgorithm, which injects the algorithm-specific behaviour.

  • central_context (CentralContext) – Settings to use for this round.

Return type:

Tuple[Optional[TrainingStatistics], Metrics]

Returns:

A tuple of the raw model updates and a Metrics object of aggregated metrics. The model updates are weighted according to the weighting strategy. The model updates can be None if the algorithm does not train on the val population.

If local and central DP are used, useful privacy metrics with the categories “local DP” and “central DP” are returned. Metrics from training are returned with the population as the category. “number of devices” is the number of users that the data came from, “number of data points” is self-explanatory, and “total weight” is the denominator for a potentially weighted aggregation.
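
A sketch of calling this coroutine directly (most callers go through the blocking gather_results wrapper instead; all objects are assumed to exist already):

    async def run_round(backend, model, training_algorithm, central_context):
        statistics, metrics = await backend.async_gather_results(
            model, training_algorithm, central_context=central_context)
        if statistics is None:
            # E.g. a round on the val population: only metrics are produced.
            return None, metrics
        # A training round: statistics holds the aggregated model updates.
        return statistics, metrics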

Aggregator#

class pfl.aggregate.base.Aggregator#

Base class for aggregating pfl.stats.TrainingStatistics and pfl.metrics.Metrics.

abstract accumulate(*, accumulated, user_stats)#

Accumulate user statistics on current worker process.

Parameters:
  • accumulated (Optional[TypeVar(StatisticsType, bound= TrainingStatistics)]) – The state of the server-side statistics accumulator. If None, then user_stats is the first contribution in the aggregation.

  • user_stats (TypeVar(StatisticsType, bound= TrainingStatistics)) – Statistics from the user to accumulate. Each variable in the object will be accumulated separately, which means each accumulated user needs to have statistics of the same structure, e.g. the same number of variables of the same shapes.

Return type:

TypeVar(StatisticsType, bound= TrainingStatistics)

worker_reduce(*, aggregated_worker_stats, central_context, aggregated_worker_metrics)#

User statistics and metrics are first summed on each worker process, then they are reduced across workers using this method. This method can be ignored in concrete aggregator classes if they are not compatible with multi-process training.

Return type:

Tuple[TypeVar(StatisticsType, bound= TrainingStatistics), Metrics]

worker_reduce_metrics_only(*, central_context, aggregated_worker_metrics)#

This method is run instead of worker_reduce when the accumulated statistics is None, which happens e.g. in a validation iteration.

User metrics are first summed on each worker process, then they are reduced across workers using this method. This method can be ignored in concrete aggregator classes if they are not compatible with multi-process training.

Return type:

Metrics
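
A sketch of a minimal custom aggregator for single-process training (illustrative only, not part of pfl; it assumes the concrete statistics type supports the + operator, otherwise substitute its own combination method):

    from pfl.aggregate.base import Aggregator


    class SingleProcessSumAggregator(Aggregator):
        """Illustrative sum aggregator for a single worker process."""

        def accumulate(self, *, accumulated, user_stats):
            if accumulated is None:
                # First contribution of this aggregation round.
                return user_stats
            # Assumes the statistics type supports `+`.
            return accumulated + user_stats

        def worker_reduce(self, *, aggregated_worker_stats,
                          central_context, aggregated_worker_metrics):
            # Single-process sketch: nothing to reduce across workers.
            return aggregated_worker_stats, aggregated_worker_metrics

        def worker_reduce_metrics_only(self, *, central_context,
                                       aggregated_worker_metrics):
            return aggregated_worker_metrics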

class pfl.aggregate.base.SumAggregator#

Aggregation of user statistics as a regular sum. Reduction across worker processes is done with an all-reduce sum.

accumulate(*, accumulated, user_stats)#

Accumulate user statistics on current worker process.

Parameters:
  • accumulated (Optional[TypeVar(StatisticsType, bound= TrainingStatistics)]) – The state of the server-side statistics accumulator. If None, then user_stats is the first contribution in the aggregation.

  • user_stats (TypeVar(StatisticsType, bound= TrainingStatistics)) – Statistics from the user to accumulate. Each variable in the object will be accumulated separately, which means each accumulated user needs to have statistics of the same structure, e.g. the same number of variables of the same shapes.

Return type:

TypeVar(StatisticsType, bound= TrainingStatistics)

worker_reduce_metrics_only(*, central_context, aggregated_worker_metrics)#

This method is run instead of worker_reduce when the accumulated statistics is None, which happens e.g. in a validation iteration.

User metrics are first summed on each worker process, then they are reduced across workers using this method. This method can be ignored in concrete aggregator classes if they are not compatible with multi-process training.

Return type:

Metrics

worker_reduce(*, aggregated_worker_stats, central_context, aggregated_worker_metrics)#

User statistics and metrics are first summed on each worker process, then they are reduced across workers using this method. This method can be ignored in concrete aggregator classes if they are not compatible with multi-process training.

Return type:

Tuple[TypeVar(StatisticsType, bound= TrainingStatistics), Metrics]

Data transport#

class pfl.aggregate.data_transport.DataTransport#

Data transport base class. Specifies in what format model updates are sent.

abstract postprocess_one_user(*, stats, user_context)#

Simulate the transport of model updates in the format specified by this class. This is simulated by converting to the transport format and then converting back to the original format, which will result in the same eventual loss of information.

Return type:

Tuple[TrainingStatistics, Metrics]

Returns:

The same (transported) model update and any new metrics generated.

class pfl.aggregate.data_transport.Float32DataTransport#

Use the float32 (legacy) format for data transport. Has no effect in simulations.

postprocess_one_user(*, stats, user_context)#

Simulate the transport of model updates in the format specified by this class. This is simulated by converting to the transport format and then converting back to the original format, which will result in the same eventual loss of information.

Return type:

Tuple[TrainingStatistics, Metrics]

Returns:

The same (transported) model update and any new metrics generated.

class pfl.aggregate.data_transport.BFloat16DataTransport#

Use the bfloat16 format for data transport. Simulates this behaviour by converting model updates original_format -> bfloat16 -> original_format.

postprocess_one_user(*, stats, user_context)#

Simulate the transport of model updates in the format specified by this class. This is simulated by converting to the transport format and then converting back to the original format, which will result in the same eventual loss of information.

Return type:

Tuple[TrainingStatistics, Metrics]

Returns:

The same (transported) model update and any new metrics generated.
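
A sketch of applying a transport to one user’s contribution directly (stats and user_context are a TrainingStatistics and a UserContext assumed to come from a simulation step):

    from pfl.aggregate.data_transport import BFloat16DataTransport

    transport = BFloat16DataTransport()
    # Round-trips the statistics through bfloat16 and returns any new
    # metrics generated by the transport.
    transported_stats, transport_metrics = transport.postprocess_one_user(
        stats=stats, user_context=user_context)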

Weighting#

Weighting strategies are Postprocessors that provide different ways of applying a weight to each individual statistics contribution before aggregation. This is a way to increase the magnitude of contributions that are considered important by some metric and to decrease the magnitude of less important contributions.

How to use this feature is a philosophical question. When using federated SGD (1 local iteration), it makes sense to weight by data points using WeightByDatapoints, since this sums the gradient over the whole dataset. However, when the number of local gradient steps is much higher than 1 and the local models are overtrained, it makes more sense to assign each locally overtrained model the same weight, i.e. to weight by users using WeightByUser.

Warning

Scaling user statistics with different weights will require you to adjust the clipping bound of your privacy mechanism accordingly.
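
For example, a weighting strategy can be passed to SimulatedBackend as one of its postprocessors (a sketch; training_data and val_data are assumed to exist):

    from pfl.aggregate.simulate import SimulatedBackend
    from pfl.aggregate.weighting import WeightByDatapoints

    # Few local steps (federated SGD): weight by data points.
    # Many local steps: WeightByUser is usually the better choice.
    backend = SimulatedBackend(
        training_data=training_data,
        val_data=val_data,
        postprocessors=[WeightByDatapoints()])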

class pfl.aggregate.weighting.WeightingStrategy#

Base class for weighting schemes.

abstract postprocess_one_user(*, stats, user_context)#

Re-weights stats in-place according to the weighting scheme of this class.

Return type:

Tuple[WeightedStatistics, Metrics]

class pfl.aggregate.weighting.WeightByUser#

Re-weights statistics in-place by user. This is generally used with federated averaging if you want a simple unweighted average of local model updates (statistics).

postprocess_one_user(*, stats, user_context)#

Re-weights stats in-place according to the weighting scheme of this class.

Return type:

Tuple[WeightedStatistics, Metrics]

class pfl.aggregate.weighting.WeightByDatapoints#

Re-weights statistics in-place by the number of data points of each user. This is generally used with federated averaging if you want to weight the averaging by the number of data points used for training the model on each device.

postprocess_one_user(*, stats, user_context)#

Re-weights stats in-place according to the weighting scheme of this class.

Return type:

Tuple[WeightedStatistics, Metrics]