diff --git a/FedAvg/FedAvg/Client.py b/FedAvg/FedAvg/Client.py new file mode 100644 index 0000000000000000000000000000000000000000..e5c6cbef80481ebc1d7772d9f6658333d17cb74e --- /dev/null +++ b/FedAvg/FedAvg/Client.py @@ -0,0 +1,253 @@ +import flwr as fl +import torch.optim as optim +from flwr.common import (GetPropertiesIns, GetPropertiesRes, GetParametersRes, Parameters, FitRes, FitIns, + GetParametersIns, EvaluateIns, EvaluateRes, Code, Status, ndarrays_to_parameters, + parameters_to_ndarrays) +from torch import nn + +from .utils.tools import StopWatch +from .utils.logger import Logger + + +class FedAvgClient(fl.client.Client): + def __init__(self, client_id, device, model, train_loader, val_loader, log_dir='./logs'): + self.client_id = client_id + self.local_model = model + self.train_loader = train_loader + self.val_loader = val_loader + self.device = device + + self.logger = Logger(log_dir, f'client_{self.client_id}') + + def get_properties(self, ins: GetPropertiesIns) -> GetPropertiesRes: + """Return a client's set of properties. + + Parameters + ---------- + ins : GetPropertiesIns + The get properties instructions received from the server containing + a dictionary of configuration values. + This can be used to tell the client which properties + are needed along with some Scalar attributes. + + `GetPropertiesIns` follows the structure: + + class GetPropertiesIns: + config: Dict[str, Scaler] + + Returns + ------- + GetPropertiesRes + It can be used to communicate arbitrary property values back to the server. + + `GetPropertiesRes` follows the structure: + + class GetPropertiesRes: + status: Status + properties: Dict[str, Scaler] + + """ + + _ = ins + + properties = {} + + return GetPropertiesRes(status=Status(code=Code.OK, message="Get properties successful"), + properties=properties) + + def get_parameters(self, ins: GetParametersIns) -> GetParametersRes: + """Return the current local model parameters. + + Parameters + ---------- + ins : GetParametersIns + The get parameters instructions received from the server containing + a dictionary of configuration values. This can be used to tell the client which parameters + are needed along with some Scalar attributes. + + `GetParametersIns` follows the structure: + + class GetParametersIns: + config: Dict[str, Scaler] + + Returns + ------- + GetParametersRes + The current local model parameters. + + `GetParametersRes` follows the structure: + + class GetParametersRes: + status: Status + parameters: Parameters + + Remark + ------ + `Parameters` follows the structure: + + class Parameters: + tensors: List[bytes] + tensor_type: str + """ + + local_parameters = self.local_model.get_model_parameters() + parameters = ndarrays_to_parameters(local_parameters) + + return GetParametersRes(Status(code=Code.OK, message="Get parameter successful"), + parameters=parameters) + + + def fit(self, ins: FitIns) -> FitRes: + """Train the provided parameters using the locally held dataset. + + Parameters + ---------- + ins : FitIns + The training instructions containing (global) model parameters + received from the server and a dictionary of configuration values + used to customize the local training process. It can be used to + communicate arbitrary values from the server to the client, for + example, to set the number of (local) training epochs. + + `FitIns` follows the structure: + + class FitIns: + parameters: Parameters + config: Dict[str, Scaler] + + Returns + ------- + FitRes + The training result containing updated parameters, + number of local training examples used for training + and other details such as the metrics . + + `FitRes` follows the structure: + + class FitRes: + status: Status + parameters: Parameters + num_examples: int + metrics: Dict[str, Scalar] + + Remark + ------ + `Parameters` follows the structure: + + class Parameters: + tensors: List[bytes] + tensor_type: str + """ + + learning_rate = ins.config['learning_rate'] + epochs = ins.config['epochs'] + + + stop_watch = StopWatch() # starting a counting timer + # Unpack received parameters + received_parameters = parameters_to_ndarrays(ins.parameters) + self.local_model.set_model_parameters(received_parameters) + + # setting the training optimiser + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD(self.local_model.parameters(), lr=learning_rate) + + # Training loop + for epoch in range(epochs): + self.logger('debug', 'train {}'.format(epoch)) + train_loss, train_accuracy = self.local_model.train_epoch(self.train_loader, + criterion, + optimizer, + self.device) + self.logger('debug', 'test {}'.format(epoch)) + val_loss, val_accuracy = self.local_model.test_epoch(self.val_loader, + criterion, + self.device) + + trained_parameters = self.local_model.get_model_parameters() + parameters = ndarrays_to_parameters(trained_parameters) + + train_data_count = len(self.train_loader.dataset) + + # registering the training duration + training_duration = stop_watch.stop() + + metrics = {'training losses': train_loss, + 'training accuracies': train_accuracy, + 'validation losses': val_loss, + 'validation accuracies': val_accuracy, + 'training duration': training_duration} + + return FitRes(Status(code=Code.OK, message="Fitting successful"), + parameters=parameters, + num_examples=train_data_count, + metrics=metrics) + + + def evaluate(self, ins: EvaluateIns) -> EvaluateRes: + """Evaluate the provided parameters using the locally held dataset. + + Parameters + ---------- + ins : EvaluateIns + The evaluation instructions containing (global) model parameters + received from the server and a dictionary of configuration values + used to customize the local evaluation process. It allows the server + to influence evaluation on the client. It can be used to communicate + arbitrary values from the server to the client, for example, + to influence the number of examples used for evaluation. + + `EvaluateIns` follows the structure: + + class EvaluateIns: + parameters: Parameters + config: Dict[str, Scaler] + + + Returns + ------- + EvaluateRes + The evaluation result containing the loss on the local dataset, + the number of local data examples used for evaluation, and + other details such as metrics. + + `EvaluateRes` follows the structure: + + class EvaluateRes: + status: Status + loss: float + num_examples: int + metrics: Dict[str, Scalar] + + Remark + ------ + `Parameters` follows the structure: + + class Parameters: + tensors: List[bytes] + tensor_type: str + """ + + criterion = nn.CrossEntropyLoss() + + stop_watch = StopWatch() # starting a counting timer + + received_parameters = parameters_to_ndarrays(ins.parameters) + self.local_model.set_model_parameters(received_parameters) + + # Test the model + validation_loss, validation_accuracy = self.local_model.test_epoch(self.val_loader, criterion, self.device) + + val_data_count = len(self.val_loader.dataset) + + # registering the training duration + validation_duration = stop_watch.stop() + + metrics = {'validation losses': validation_loss, + 'validation accuracies': validation_accuracy, + 'validation duration': validation_duration} + + return EvaluateRes(status=Status(code=Code.OK, message="Evaluation successful"), + loss=validation_loss, + num_examples=val_data_count, + metrics=metrics) diff --git a/FedAvg/FedAvg/ClientManager.py b/FedAvg/FedAvg/ClientManager.py new file mode 100644 index 0000000000000000000000000000000000000000..96a1a6ae52e9dc43c87528d98a490034d223edcb --- /dev/null +++ b/FedAvg/FedAvg/ClientManager.py @@ -0,0 +1,118 @@ +import random +import threading +import time +from typing import Optional + +from flwr.server import ClientManager +from flwr.server.client_proxy import ClientProxy +from flwr.server.criterion import Criterion + +from .utils.logger import Logger + + +class FedAvgClientManager(ClientManager): + def __init__(self, log_dir='./logs'): + super(FedAvgClientManager, self).__init__() + self.clients: dict[str, ClientProxy] = {} # Dictionary to store registered clients + self._cv = threading.Condition() # Condition variable for thread synchronization + self.logger = Logger(log_dir, 'Client Manager') + + def num_available(self) -> int: + """Return the number of available clients. + + Returns + ------- + num_available : int + The number of currently available clients. + """ + with self._cv: + return len(self.clients) + + def register(self, client: ClientProxy) -> bool: + """Register Flower ClientProxy instance. + + Parameters + ---------- + client : flwr.server.client_proxy.ClientProxy + The ClientProxy of the Client to register. + + Returns + ------- + success : bool + Indicating if registration was successful. False if ClientProxy is + already registered or can not be registered for any reason. + """ + with self._cv: + if client.cid in self.clients: + self.logger('WARNING', f"Client {client.cid} is already registered.") + return False # Client already registered + self.clients[client.cid] = client + self.logger('INFO', f"Registered client {client.cid}.") + self._cv.notify_all() # Notify waiting threads + return True + + def unregister(self, client: ClientProxy) -> None: + """Unregister Flower ClientProxy instance. + + This method is idempotent. + + Parameters + ---------- + client : flwr.server.client_proxy.ClientProxy + The ClientProxy of the Client to unregister. + """ + with self._cv: + if client.cid in self.clients: + self.logger('INFO', f"Unregistering client {client.cid}.") + del self.clients[client.cid] + self._cv.notify_all() # Notify waiting threads + + def all(self) -> dict[str, ClientProxy]: + """Return all available clients.""" + with self._cv: + self.logger('INFO', f"Returning all clients: {list(self.clients.keys())}") + return self.clients.copy() + + def wait_for(self, num_clients: int, timeout: int = 86400) -> bool: + """Wait until at least `num_clients` are available.""" + start_time = time.time() + self.logger('INFO', f"Waiting for at least {num_clients} clients (timeout: {timeout}s).") + + with self._cv: + success = self._cv.wait_for(lambda: len(self.clients) >= num_clients, timeout=timeout) + + elapsed_time = time.time() - start_time + + if success: + self.logger('INFO', f"Required {num_clients} clients available after {elapsed_time:.2f}s.") + else: + self.logger('WARNING', + f"Timeout reached. Only {len(self.clients)} clients available after {elapsed_time:.2f}s.") + + return success + + def sample(self, + num_clients: int, + min_num_clients: Optional[int] = None, + criterion: Optional[Criterion] = None) -> list[ClientProxy]: + """Sample a number of Flower ClientProxy instances.""" + if min_num_clients is None: + min_num_clients = num_clients + + # Wait until enough clients are available + if not self.wait_for(min_num_clients): + self.logger('WARNING', f"Sampling failed: Not enough clients available.") + return [] + + with self._cv: + available_clients = list(self.clients.values()) + if criterion: + available_clients = [c for c in available_clients if criterion.select(c)] + + if num_clients > len(available_clients): + self.logger('WARNING', "Not enough clients meet the criterion.") + return [] + + sampled_clients = random.sample(available_clients, num_clients) + self.logger('INFO', f"Sampled {len(sampled_clients)} clients.") + return sampled_clients diff --git a/FedAvg/FedAvg/FederatedData.py b/FedAvg/FedAvg/FederatedData.py new file mode 100644 index 0000000000000000000000000000000000000000..eed0d047817f8e3f168bc9da8dfa50b8343b079a --- /dev/null +++ b/FedAvg/FedAvg/FederatedData.py @@ -0,0 +1,98 @@ +import os +import numpy as np +import torch +from torch.utils.data import DataLoader, Subset, random_split +from torchvision import datasets, transforms + + +def _load_dataset(dataset_name, root='./data'): + """Load a specific dataset: MNIST, Fashion-MNIST, CIFAR-10, or CIFAR-100.""" + dataset_name = dataset_name.upper() + + if dataset_name == 'MNIST': + return datasets.MNIST(root=root, train=True, download=True) + elif dataset_name == 'FASHION': + return datasets.FashionMNIST(root=root, train=True, download=True) + elif dataset_name == 'CIFAR10': + return datasets.CIFAR10(root=root, train=True, download=True) + elif dataset_name == 'CIFAR100': + return datasets.CIFAR100(root=root, train=True, download=True) + else: + raise ValueError(f"Dataset {dataset_name} not found. Available options: ['MNIST', 'Fashion', 'CIFAR10', 'CIFAR100']") + +def _partition_data(dataset, num_clients=10, alpha=1): + labels = np.array(dataset.targets) + num_classes = len(set(labels)) + + # Extract and shuffle indices per class + class_indices = [np.where(labels == i)[0] for i in range(num_classes)] + for c in class_indices: + np.random.shuffle(c) + + class_sizes = [len(c) - num_clients for c in class_indices] + client_indices = [[] for _ in range(num_clients)] + + # Generate Dirichlet proportions + proportions_matrix = np.random.dirichlet([alpha] * num_clients, num_classes) + + # Compute the sample count per client per class + count_matrix = (proportions_matrix * np.array(class_sizes)[:, None]).astype(int) + min_samples = np.ones((num_classes, num_clients), dtype=int) + count_matrix = np.maximum(count_matrix, min_samples) + + for c in range(num_classes): + class_count = count_matrix[c] + + split_indices = np.split(class_indices[c], np.cumsum(class_count)[:-1]) # Correct splitting + for i in range(num_clients): + client_indices[i].extend(split_indices[i]) + + # Create dataset subsets for each client + clients_datasets = [Subset(dataset, indices) for indices in client_indices] + + return clients_datasets + +def _save_datasets(client_datasets, directory="./partitioned_datasets"): + os.makedirs(directory, exist_ok=True) + for idx, dataset in enumerate(client_datasets): + file_path = os.path.join(directory, f"client_{idx}.pt") + torch.save(dataset, file_path) + +def generate_distributed_data(dataset_name, num_clients=10, alpha=1, save_dir=None): + dataset = _load_dataset(dataset_name) + clients_datasets = _partition_data(dataset, num_clients=num_clients, alpha=alpha) + if save_dir is not None: + _save_datasets(clients_datasets, directory=save_dir) + return clients_datasets + +def load_client_dataset(cid, dataset_name, directory="./clients_datasets", batch_size=64): + file_path = os.path.join(directory, f"client_{cid}.pt") + print(file_path) + if not os.path.exists(file_path): + raise FileNotFoundError(f"Dataset with index {cid} not found in {directory}") + + dataset = torch.load(file_path, weights_only=False) + train_size = int(0.9 * len(dataset)) + val_size = len(dataset) - train_size + train_dataset, val_dataset = random_split(dataset, [train_size, val_size]) + + if dataset_name.upper() in ['MNIST', 'FASHION']: + transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]) + elif dataset_name.upper() in ['CIFAR10', 'CIFAR100']: + transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) + else: + raise ValueError("Unsupported dataset for transformation") + + # Define custom collate function to apply transformation + def collate_fn(batch): + images, labels = zip(*batch) + images = torch.stack([transform(img) for img in images]) + labels = torch.tensor(labels) + return images, labels + + # Create DataLoaders with the transformation applied in collate_fn + train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn) + val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn) + + return train_loader, val_loader + diff --git a/FedAvg/FedAvg/Models.py b/FedAvg/FedAvg/Models.py new file mode 100644 index 0000000000000000000000000000000000000000..4a47561e16fa273ef53582ae3a2de990e81c8023 --- /dev/null +++ b/FedAvg/FedAvg/Models.py @@ -0,0 +1,118 @@ +from typing import List + +import numpy as np +import torch +import torch.nn as nn +from prettytable import PrettyTable +from tqdm import tqdm + + +class DenseFashionModel(nn.Module): + def __init__(self): + super(DenseFashionModel, self).__init__() + + self.flatten = nn.Flatten() + self.fc1 = nn.Linear(28 * 28, 512) + self.fc2 = nn.Linear(512, 256) + self.fc3 = nn.Linear(256, 128) + self.fc4 = nn.Linear(128, 10) + + def forward(self, x): + x = self.flatten(x) + + x = self.fc1(x) + x = nn.ReLU()(x) + + x = self.fc2(x) + x = nn.ReLU()(x) + + x = self.fc3(x) + x = nn.ReLU()(x) + + x = self.fc4(x) + return x + + def display_layers(self): + # Create a table + table = PrettyTable() + table.field_names = ["Name", "Description", "Number of Parameters"] + + # Print all layers with their parameters and additional properties for Conv layers + for name, layer in self.named_children(): + # filtering trainable layers + number_of_parameters = sum(parameter.numel() for parameter in layer.parameters()) + if number_of_parameters != 0: + # Add layer to the table + table.add_row([name, layer, number_of_parameters]) + + # Print the table + print(table) + print('total number of parameters: {}'.format(sum(parameter.numel() for parameter in self.parameters()))) + + # Training function + + def train_epoch(self, train_loader, criterion, optimizer, device): + self.train() + + running_loss = 0.0 + running_corrects = 0 + + for inputs, labels in tqdm(train_loader, desc="Training", unit="step"): + inputs, labels = inputs.to(device), labels.to(device) + + optimizer.zero_grad() + + outputs = self(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + running_loss += loss.item() + + _, predicted = outputs.max(1) + running_corrects += torch.sum(predicted == labels.data) + + epoch_accuracy = running_corrects.item() / len(train_loader.dataset) + epoch_loss = running_loss / len(train_loader) + + return epoch_loss, epoch_accuracy + + # Testing function + def test_epoch(self, test_loader, criterion, device): + self.eval() + + running_loss = 0.0 + running_corrects = 0 + + with torch.no_grad(): + for inputs, labels in tqdm(test_loader, desc="Testing", unit="batch"): + inputs, labels = inputs.to(device), labels.to(device) + + outputs = self(inputs) + loss = criterion(outputs, labels) + + running_loss += loss.item() + + _, predicted = outputs.max(1) + running_corrects += torch.sum(predicted == labels.data) + + epoch_accuracy = running_corrects.item() / len(test_loader.dataset) + epoch_loss = running_loss / len(test_loader) + + return epoch_loss, epoch_accuracy + + +class FedAvgModel(DenseFashionModel): + def __init__(self, use_bn=False): + super(FedAvgModel, self).__init__() + pass + + def get_model_parameters(self) -> List[np.ndarray]: + # Create and return copies of the parameters as numpy arrays. + model_parameters = [param.detach().cpu().numpy().copy() for param in self.parameters()] + return model_parameters + + def set_model_parameters(self, parameters: List[np.ndarray]): + # For each parameter, copy the input numpy array before converting it to a tensor. + for param, array in zip(self.parameters(), parameters): + param.data = torch.from_numpy(array.copy()).to(param.device) diff --git a/FedAvg/FedAvg/SimulationAnalyser.py b/FedAvg/FedAvg/SimulationAnalyser.py new file mode 100644 index 0000000000000000000000000000000000000000..a209de4b46319991ba957baa2c6b67d94c78bb5f --- /dev/null +++ b/FedAvg/FedAvg/SimulationAnalyser.py @@ -0,0 +1,110 @@ +import json +import os +import matplotlib.pyplot as plt +from .utils.logger import Logger +from prettytable import PrettyTable +from flwr.server import History # Assuming History is from Flower + + +class Analyser: + """Extended History class that supports logging, saving, loading, and plotting distributed loss and metrics.""" + + def __init__(self, log_dir='./logs', log_name=None) -> None: + self.evaluation_loss = {} + self.fit_metrics = {} + self.evaluation_metrics = {} + self.simulation_configuration = {} + self.logger = Logger(log_dir, log_name) if log_name else None + + def set_simulation_data(self, history:History, simulation_configuration:dict) -> None: + self.evaluation_loss = history.losses_distributed + self.fit_metrics = history.metrics_distributed_fit + self.evaluation_metrics = history.metrics_distributed + self.simulation_configuration = simulation_configuration + + def save_simulation_data(self, file_name:str) -> None: + """Save distributed losses and metrics to a file. Mode can be 'w' to overwrite or 'a' to append.""" + data = {'Simulation Configuration': self.simulation_configuration, + 'History': {"evaluation_loss": self.evaluation_loss, + "fit_metrics": self.fit_metrics, + "evaluation_metrics": self.evaluation_metrics, + } + } + + with open(file_name, "w") as file: + json.dump(data, file, indent=4) + if self.logger: + self.logger("INFO", f"Distributed metrics saved to {file_name}") + + def load_simulation_data(self, file_name:str) -> None: + """Load distributed losses and metrics from a file.""" + try: + with open(file_name, "r") as f: + data = json.load(f) + simulation_configuration = data["Simulation Configuration"] + history = data["History"] + self.evaluation_loss = history.get("losses_distributed", []) + self.fit_metrics = history.get("metrics_distributed_fit", {}) + self.evaluation_metrics = history.get("metrics_distributed", {}) + if self.logger: + self.logger("INFO", f"Distributed metrics loaded from {file_name}") + except (FileNotFoundError, json.JSONDecodeError): + if self.logger: + self.logger("ERROR", f"Failed to load metrics from {file_name}") + + def plot_results(self, fig_directory) -> None: + """Plot each metric and loss separately in different figures, optionally saving them to a directory.""" + if fig_directory and not os.path.exists(fig_directory): + os.makedirs(fig_directory) + + for metric, values in self.evaluation_metrics.items(): + plt.figure() + rounds, scores = zip(*values) + plt.plot(rounds, scores, linestyle="--", marker="o") + plt.xlabel("Rounds") + plt.ylabel(f"{metric}") + plt.title(f"Evaluation {metric}") + plt.legend() + plt.grid() + if fig_directory: + plt.savefig(os.path.join(fig_directory, f"Evaluation {metric}.png")) + else: + plt.show() + if self.logger: + self.logger("INFO", f"Evaluation {metric} was plotted") + + for metric, values in self.fit_metrics.items(): + plt.figure() + rounds, scores = zip(*values) + plt.plot(rounds, scores, linestyle=":", marker="s") + plt.xlabel("Rounds") + plt.ylabel(f"{metric}") + plt.title(f"Fit {metric}") + plt.legend() + plt.grid() + if fig_directory: + plt.savefig(os.path.join(fig_directory, f"Fit {metric}.png")) + else: + plt.show() + if self.logger: + self.logger("INFO", f"Fit {metric} was plotted") + + + def print_results_table(self) -> None: + """Print all metrics and losses in tabular format using PrettyTable.""" + table = PrettyTable() + table.field_names = ["Round", "Loss Distributed"] + list(self.evaluation_metrics.keys()) + list( + self.fit_metrics.keys()) + + all_rounds = set(r for r, _ in self.evaluation_loss) | set( + r for metric in self.evaluation_metrics.values() for r, _ in metric) | set( + r for metric in self.fit_metrics.values() for r, _ in metric) + for r in sorted(all_rounds): + row = [r] + row.append(next((l for round_, l in self.evaluation_loss if round_ == r), "-")) + for metric in self.evaluation_metrics.keys(): + row.append(next((v for round_, v in self.evaluation_metrics[metric] if round_ == r), "-")) + for metric in self.fit_metrics.keys(): + row.append(next((v for round_, v in self.fit_metrics[metric] if round_ == r), "-")) + table.add_row(row) + print(table) diff --git a/FedAvg/FedAvg/Strategy.py b/FedAvg/FedAvg/Strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..97da8389aa4e6f6c01a42bd025b29212b1ea38fb --- /dev/null +++ b/FedAvg/FedAvg/Strategy.py @@ -0,0 +1,269 @@ +from flwr.common import (Parameters, Scalar, FitRes, EvaluateRes, FitIns, EvaluateIns, parameters_to_ndarrays, + ndarrays_to_parameters) +from flwr.server import ClientManager +from flwr.server.client_proxy import ClientProxy +from flwr.server.strategy import Strategy +from typing import Optional, List, Dict, Tuple, Union +import numpy as np +from .utils.logger import Logger + + +class FedAvgStrategy(Strategy): + """Federated Averaging (FedAvg) Strategy for Federated Learning.""" + + def __init__(self, initial_parameters: Optional[Parameters] = None, + fraction_fit: float = 1.0, fit_config: Optional[dict[str, Scalar]] = None, + evaluate_config: Optional[dict[str, Scalar]] = None, + log_dir='./logs') -> None: + + self.initial_parameters = initial_parameters + + self.fraction_fit = fraction_fit + self.fit_config = fit_config if fit_config is not None else {} + + self.evaluate_config = evaluate_config if evaluate_config is not None else {} + + self.logger = Logger(log_dir, 'Strategy') + + def initialize_parameters(self, client_manager) -> Optional[Parameters]: + """Return the initial global model parameters.""" + + initial_parameters = self.initial_parameters + self.initial_parameters = None # Don't keep initial parameters in memory + return initial_parameters + + def configure_fit(self, server_round: int, parameters: Parameters, client_manager: ClientManager) -> list[tuple[ClientProxy, FitIns]]: + """Configure the next round of training. + + Parameters + ---------- + server_round : int + The current round of federated learning. + parameters : Parameters + The current (global) model parameters. + client_manager : ClientManager + The client manager which holds all currently connected clients. + + Returns + ------- + fit_configuration : List[Tuple[ClientProxy, FitIns]] + A list of tuples. Each tuple in the list identifies a `ClientProxy` and the + `FitIns` (i.e fit instructions) for this particular `ClientProxy`. If a particular `ClientProxy` + is not included in this list, it means that this `ClientProxy` + will not participate in the next round of federated learning. + + Remark + ------ + `FitIns` follows the structure: + + class FitIns: + parameters: Parameters + config: Dict[str, Scaler] + """ + sample_size = int(self.fraction_fit * client_manager.num_available()) + clients_proxies = client_manager.sample(num_clients=sample_size) + + self.logger('INFO', "Global parameters and clients' configuration were sent for fitting") + + # Create FitIns object for each client with the required parameters and configuration + return [(client_proxy, FitIns(parameters=parameters, config=self.fit_config)) for client_proxy in clients_proxies] + + def aggregate_fit( + self, + server_round: int, + results: list[tuple[ClientProxy, FitRes]], + failures: list[Union[tuple[ClientProxy, FitRes], BaseException]], + ) -> tuple[Optional[Parameters], dict[str, Scalar]]: + """Aggregate training results. + + Parameters + ---------- + server_round : int + The current round of federated learning. + results : List[Tuple[ClientProxy, FitRes]] + Successful updates from the previously selected and configured + clients. Each pair of `(ClientProxy, FitRes)` constitutes a + successful update from one of the previously selected clients. Not + that not all previously selected clients are necessarily included in + this list: a client might drop out and not submit a result. For each + client that did not submit an update, there should be an `Exception` + in `failures`. + failures : List[Union[Tuple[ClientProxy, FitRes], BaseException]] + Exceptions that occurred while the server was waiting for client + updates. + + Returns + ------- + parameters : Tuple[Optional[Parameters], Dict[str, Scalar]] + If parameters are returned, then the server will treat these as the + new global model parameters (i.e., it will replace the previous + parameters with the ones returned from this method). If `None` is + returned (e.g., because there were only failures and no viable + results) then the server will no update the previous model + parameters, the updates received in this round are discarded, and + the global model parameters remain the same. The aggregated metrics are also returned. + + Remark + ------ + `FitRes` (i.e. fit results) follows the structure: + + class FitRes: + status: Status + parameters: Parameters + num_examples: int + metrics: Dict[str, Scaler] + """ + if not results: + self.logger('INFO', "No updates received from the previous round") + return None, {} + + total_clients = len(results) + len(failures) + num_client_failures = sum(1 for f in failures if isinstance(f, tuple)) + num_exceptions = sum(1 for f in failures if isinstance(f, BaseException)) + failure_rate = len(failures) / total_clients if total_clients > 0 else 0.0 + + self.logger('WARNING', f"Round {server_round}: {failure_rate * 100:.2f}% of clients failed returning results") + self.logger('WARNING', f"Round {server_round}: {num_client_failures} client failures, {num_exceptions} exceptions.") + for client_proxy, failure in failures: + if isinstance(failure, BaseException): + self.logger('WARNING', f"Client {client_proxy.cid} Exception: {failure}") + + total_samples = sum(fit_res.num_examples for _, fit_res in results) + + # ✅ Convert received bytes to NumPy arrays before aggregation + weighted_updates = [] + + for _, fit_res in results: + client_weights = parameters_to_ndarrays(fit_res.parameters) + weighted_client_weights = [w * (fit_res.num_examples / total_samples) for w in client_weights] + weighted_updates.append(weighted_client_weights) + + # ✅ Aggregate Updates (Element-wise Summation) + aggregated_updates = [np.sum([weighted_update[k] for weighted_update in weighted_updates], axis=0) for k in + range(len(weighted_updates[0]))] + + aggregated_parameters = ndarrays_to_parameters(aggregated_updates) + + # Aggregate metrics + aggregated_metrics: Dict[str, Scalar] = {} + for _, fit_res in results: + for key, value in fit_res.metrics.items(): + if key not in aggregated_metrics: + aggregated_metrics[key] = 0.0 + aggregated_metrics[key] += value * fit_res.num_examples / total_samples + + self.logger('INFO', f"Round {server_round}: Results were aggregated from {len(results)} clients.") + return aggregated_parameters, aggregated_metrics + + + def configure_evaluate( + self, server_round: int, parameters: Parameters, client_manager: ClientManager + ) -> list[tuple[ClientProxy, EvaluateIns]]: + """Configure the next round of evaluation. + + Parameters + ---------- + server_round : int + The current round of federated learning. + parameters : Parameters + The current (global) model parameters. + client_manager : ClientManager + The client manager which holds all currently connected clients. + + Returns + ------- + evaluate_configuration : List[Tuple[ClientProxy, EvaluateIns]] + A list of tuples. Each tuple in the list identifies a `ClientProxy` and the + `EvaluateIns` for this particular `ClientProxy`. If a particular + `ClientProxy` is not included in this list, it means that this + `ClientProxy` will not participate in the next round of federated + evaluation. + + Remark + ------ + `EvaluateIns` (i.e. evaluation instructions) follows the structure: + + class EvaluateIns: + parameters: Parameters + config: Dict[str, Scaler] + """ + clients = client_manager.all() + self.logger('INFO', "clients' configuration were sent for fitting") + return [(client_proxy, EvaluateIns(parameters=parameters, config=self.evaluate_config)) for cid, client_proxy in clients.items()] + + def aggregate_evaluate( + self, + server_round: int, + results: list[tuple[ClientProxy, EvaluateRes]], + failures: list[Union[tuple[ClientProxy, EvaluateRes], BaseException]], + ) -> tuple[Optional[float], dict[str, Scalar]]: + """Aggregate evaluation results. + + Parameters + ---------- + server_round : int + The current round of federated learning. + results : List[Tuple[ClientProxy, FitRes]] + Successful updates from the + previously selected and configured clients. Each pair of + `(ClientProxy, FitRes` constitutes a successful update from one of the + previously selected clients. Not that not all previously selected + clients are necessarily included in this list: a client might drop out + and not submit a result. For each client that did not submit an update, + there should be an `Exception` in `failures`. + failures : List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]] + Exceptions that occurred while the server was waiting for client updates. + + Returns + ------- + aggregation_result : Tuple[Optional[float], Dict[str, Scalar]] + The aggregated evaluation result. Aggregation typically uses some variant + of a weighted average. + + Remark + ------ + `EvaluateRes` (i.e. evaluation results) follows the structure: + + class EvaluateRes: + status: Status + loss: float + num_examples: int + metrics: Dict[str, Scaler] + """ + if not results: + self.logger('WARNING', f"Round {server_round}: No successful evaluation results. Skipping aggregation.") + return None, {} + + total_clients = len(results) + len(failures) + num_client_failures = sum(1 for f in failures if isinstance(f, tuple)) + num_exceptions = sum(1 for f in failures if isinstance(f, BaseException)) + failure_rate = len(failures) / total_clients if total_clients > 0 else 0.0 + + self.logger('WARNING', f"Round {server_round}: {failure_rate * 100:.2f}% of clients failed returning results") + self.logger('WARNING', f"Round {server_round}: {num_client_failures} client failures, {num_exceptions} exceptions.") + for client_proxy, failure in failures: + if isinstance(failure, BaseException): + self.logger('WARNING', f"Client {client_proxy.cid} Exception: {failure}") + + total_samples = sum(evaluate_res.num_examples for _, evaluate_res in results) + + # Aggregate loss + weighted_loss = sum( + evaluate_res.loss * evaluate_res.num_examples / total_samples + for _, evaluate_res in results + ) + + # Aggregate metrics + aggregated_metrics: Dict[str, Scalar] = {} + for _, evaluate_res in results: + for key, value in evaluate_res.metrics.items(): + if key not in aggregated_metrics: + aggregated_metrics[key] = 0.0 + aggregated_metrics[key] += value * evaluate_res.num_examples / total_samples + + self.logger('INFO', f"Round {server_round}: Results were aggregated from {len(results)} clients.") + + return weighted_loss, aggregated_metrics + + def evaluate(self, server_round: int, parameters: Parameters) -> Optional[tuple[float, dict[str, Scalar]]]: + pass diff --git a/FedAvg/FedAvg/__init__.py b/FedAvg/FedAvg/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/FedAvg/FedAvg/__pycache__/Client.cpython-39.pyc b/FedAvg/FedAvg/__pycache__/Client.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d016968d473b0fd44297b4d946bf06ecdeb8ee12 Binary files /dev/null and b/FedAvg/FedAvg/__pycache__/Client.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/__pycache__/ClientManager.cpython-39.pyc b/FedAvg/FedAvg/__pycache__/ClientManager.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..9a5a2b37c0d39bc41d67638698b10a649e1fc4cd Binary files /dev/null and b/FedAvg/FedAvg/__pycache__/ClientManager.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/__pycache__/FederatedData.cpython-39.pyc b/FedAvg/FedAvg/__pycache__/FederatedData.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..dd2cf3f9bf08c30dd2ad9b66e963ad3b04981dd0 Binary files /dev/null and b/FedAvg/FedAvg/__pycache__/FederatedData.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/__pycache__/Models.cpython-39.pyc b/FedAvg/FedAvg/__pycache__/Models.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..15790d322d747dc6a0a74461d5fbaecae06bc3bc Binary files /dev/null and b/FedAvg/FedAvg/__pycache__/Models.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/__pycache__/SimulationAnalyser.cpython-39.pyc b/FedAvg/FedAvg/__pycache__/SimulationAnalyser.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2ff49f50dab3e6b2a220327785e3b933c5702ab8 Binary files /dev/null and b/FedAvg/FedAvg/__pycache__/SimulationAnalyser.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/__pycache__/Strategy.cpython-39.pyc b/FedAvg/FedAvg/__pycache__/Strategy.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..3cb539b16c558f2df761702be47cf690a8035094 Binary files /dev/null and b/FedAvg/FedAvg/__pycache__/Strategy.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/__pycache__/__init__.cpython-39.pyc b/FedAvg/FedAvg/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e544e3f91604b21a91beafbcdcb7b8ba1d0b93bb Binary files /dev/null and b/FedAvg/FedAvg/__pycache__/__init__.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/utils/__init__.py b/FedAvg/FedAvg/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/FedAvg/FedAvg/utils/__pycache__/__init__.cpython-39.pyc b/FedAvg/FedAvg/utils/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ba1e660b8ba9cea826d60ef29472398db7e35928 Binary files /dev/null and b/FedAvg/FedAvg/utils/__pycache__/__init__.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/utils/__pycache__/logger.cpython-39.pyc b/FedAvg/FedAvg/utils/__pycache__/logger.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a033862e57a40df6b1cef65cfd5c1527cff52bc8 Binary files /dev/null and b/FedAvg/FedAvg/utils/__pycache__/logger.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/utils/__pycache__/tools.cpython-39.pyc b/FedAvg/FedAvg/utils/__pycache__/tools.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c249547d616ddf779e7f0bdf3a05423bff79aa80 Binary files /dev/null and b/FedAvg/FedAvg/utils/__pycache__/tools.cpython-39.pyc differ diff --git a/FedAvg/FedAvg/utils/logger.py b/FedAvg/FedAvg/utils/logger.py new file mode 100644 index 0000000000000000000000000000000000000000..cc710b602bbc0e7fd4a51136d97b37afafcc4bb3 --- /dev/null +++ b/FedAvg/FedAvg/utils/logger.py @@ -0,0 +1,42 @@ +import logging +import os +from threading import Lock + + +class Logger: + stdout_lock = Lock() + + def __init__(self, logging_dir, name): + os.makedirs(logging_dir, exist_ok=True) + + self.logger = logging.getLogger(name) + + file_handler = logging.FileHandler(os.path.join(logging_dir, name + '.log'), 'w') + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + file_handler.setFormatter(formatter) + + self.logger.addHandler(file_handler) + + def __call__(self, status: str, message: str): + status = status.upper() + with self.stdout_lock: + logging.basicConfig(level=logging.DEBUG) + self.logger.setLevel(status) + if status == 'INFO': + self.logger.info(message) + # print(Fore.GREEN + message + Fore.WHITE) + elif status == 'WARNING': + self.logger.warning(message) + # print(Fore.YELLOW + message + Fore.WHITE) + elif status == 'ERROR': + self.logger.error(message) + # print(Fore.RED + message + Fore.WHITE) + elif status == 'CRITICAL': + self.logger.critical(message) + # print(Fore.BLACK + message + Fore.WHITE) + elif status == 'DEBUG': + self.logger.debug(message) + # print(Fore.BLUE + message + Fore.WHITE) + else: + self.logger.error('ERROR: Invalid Status') + diff --git a/FedAvg/FedAvg/utils/tools.py b/FedAvg/FedAvg/utils/tools.py new file mode 100644 index 0000000000000000000000000000000000000000..caa4bc9c32e717706e45de495f32a5415dd717df --- /dev/null +++ b/FedAvg/FedAvg/utils/tools.py @@ -0,0 +1,30 @@ +import time + + +import random +import numpy as np +import torch + +class StopWatch: + def __init__(self): + self.start_instance = time.time() + + def stop(self) -> float: + current_instance = time.time() + return current_instance - self.start_instance + + +def hhmmss(seconds: float) -> str: + hours, remainder = divmod(seconds, 3600) + minutes, seconds = divmod(remainder, 60) + return "{:02d}:{:02d}:{:02d}".format(int(hours), int(minutes), int(seconds)) + + +def set_seed(seed): + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed_all(seed) + torch.backends.cudnn.deterministic = True + torch.backends.cudnn.benchmark = False diff --git a/FedAvg/__init__.py b/FedAvg/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/FedAvg/__pycache__/__init__.cpython-39.pyc b/FedAvg/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..34e4ce800552a6faa657409d7097b8e5138a660a Binary files /dev/null and b/FedAvg/__pycache__/__init__.cpython-39.pyc differ diff --git a/FedAvg/config/__pycache__/config.cpython-39.pyc b/FedAvg/config/__pycache__/config.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d081f0c1ef08b4008b92b72fda52a9c4c1829f7e Binary files /dev/null and b/FedAvg/config/__pycache__/config.cpython-39.pyc differ diff --git a/FedAvg/config/__pycache__/hyperparameters.cpython-39.pyc b/FedAvg/config/__pycache__/hyperparameters.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7afa876f057bbdd4ff06d5c34b25e9786995e3ae Binary files /dev/null and b/FedAvg/config/__pycache__/hyperparameters.cpython-39.pyc differ diff --git a/FedAvg/config/config.py b/FedAvg/config/config.py new file mode 100644 index 0000000000000000000000000000000000000000..73ce93349ca565f925e81a5d8f8edfdae22566c3 --- /dev/null +++ b/FedAvg/config/config.py @@ -0,0 +1,21 @@ +LOGGING_DIR = "/home/adabaja/DELIGHT/eflwr/FedAvg/logs" +RESULTS_DIR = "/home/adabaja/DELIGHT/eflwr/FedAvg/results" +HISTORY_FILE = "/home/adabaja/DELIGHT/eflwr/FedAvg/history.json" +DATASET_DIR = "/home/adabaja/DELIGHT/eflwr/FedAvg/datasets" + +# Server Configuration +SERVER_CONFIG = { + "cpus": "4.0", + "memory": "8g", + "gpu": False, + "port": 8080, + "num_rounds": 5 +} + +# Define hardware resources for each client dynamically +DEVICES_HARDWARE = { + 1: {"cpus": "2.0", "memory": "4g", "gpu": 0}, + 2: {"cpus": "1.5", "memory": "3g", "gpu": 1}, + 3: {"cpus": "1.0", "memory": "2g", "gpu": None}, # No GPU + 4: {"cpus": "2.5", "memory": "5g", "gpu": 2}, +} diff --git a/FedAvg/config/hyperparameters.py b/FedAvg/config/hyperparameters.py new file mode 100644 index 0000000000000000000000000000000000000000..67cbc36b18acf113c51e934d42f4b00528081b2c --- /dev/null +++ b/FedAvg/config/hyperparameters.py @@ -0,0 +1,14 @@ +# Global hyperparameters used across the system +SEED = 42 + +NUM_ROUNDS = 3 +NUM_CLIENTS = 5 + +EPOCHS = 1 +FRACTION_FIT = 1.0 + +DATASET_NAME = "Fashion" +ALPHA_DIRICHLET = 1.0 + +BATCH_SIZE = 64 +LEARNING_RATE = 0.02 diff --git a/FedAvg/data/FashionMNIST/raw/t10k-images-idx3-ubyte b/FedAvg/data/FashionMNIST/raw/t10k-images-idx3-ubyte new file mode 100644 index 0000000000000000000000000000000000000000..37bac79bc6f002d965f363e0b20514904504a4d9 Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/t10k-images-idx3-ubyte differ diff --git a/FedAvg/data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz b/FedAvg/data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz new file mode 100644 index 0000000000000000000000000000000000000000..667844f10bb92ae415eddb04e718f3e702547007 Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz differ diff --git a/FedAvg/data/FashionMNIST/raw/t10k-labels-idx1-ubyte b/FedAvg/data/FashionMNIST/raw/t10k-labels-idx1-ubyte new file mode 100644 index 0000000000000000000000000000000000000000..2195a4d0957e013db98d03e51697e2315816cce2 Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/t10k-labels-idx1-ubyte differ diff --git a/FedAvg/data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz b/FedAvg/data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz new file mode 100644 index 0000000000000000000000000000000000000000..abdddb89d6563f90ab89baf2970f67a1ffb9ad10 Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz differ diff --git a/FedAvg/data/FashionMNIST/raw/train-images-idx3-ubyte b/FedAvg/data/FashionMNIST/raw/train-images-idx3-ubyte new file mode 100644 index 0000000000000000000000000000000000000000..ff2f5a96367a3c1656f09c93649f07143ff8b11e Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/train-images-idx3-ubyte differ diff --git a/FedAvg/data/FashionMNIST/raw/train-images-idx3-ubyte.gz b/FedAvg/data/FashionMNIST/raw/train-images-idx3-ubyte.gz new file mode 100644 index 0000000000000000000000000000000000000000..e6ee0e37929b03937517cd45e65f4e8ca9de720d Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/train-images-idx3-ubyte.gz differ diff --git a/FedAvg/data/FashionMNIST/raw/train-labels-idx1-ubyte b/FedAvg/data/FashionMNIST/raw/train-labels-idx1-ubyte new file mode 100644 index 0000000000000000000000000000000000000000..30424ca2ea876655f5bba14f9f07132769687975 Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/train-labels-idx1-ubyte differ diff --git a/FedAvg/data/FashionMNIST/raw/train-labels-idx1-ubyte.gz b/FedAvg/data/FashionMNIST/raw/train-labels-idx1-ubyte.gz new file mode 100644 index 0000000000000000000000000000000000000000..9c4aae27b27439210af73e0803a54a74af5c8756 Binary files /dev/null and b/FedAvg/data/FashionMNIST/raw/train-labels-idx1-ubyte.gz differ diff --git a/FedAvg/datasets/client_0.pt b/FedAvg/datasets/client_0.pt new file mode 100644 index 0000000000000000000000000000000000000000..8a687c89ac5335777d39eb3405354147a85c7d1f Binary files /dev/null and b/FedAvg/datasets/client_0.pt differ diff --git a/FedAvg/datasets/client_1.pt b/FedAvg/datasets/client_1.pt new file mode 100644 index 0000000000000000000000000000000000000000..6a42909cd050118f9e15b275c1f9dda1c49b9f84 Binary files /dev/null and b/FedAvg/datasets/client_1.pt differ diff --git a/FedAvg/datasets/client_2.pt b/FedAvg/datasets/client_2.pt new file mode 100644 index 0000000000000000000000000000000000000000..5f9141f7fc5423e8859a1b537ef0fe13c008abc5 Binary files /dev/null and b/FedAvg/datasets/client_2.pt differ diff --git a/FedAvg/datasets/client_3.pt b/FedAvg/datasets/client_3.pt new file mode 100644 index 0000000000000000000000000000000000000000..06a61cc6ad5d44a3c4236861c8948fb0e9be7121 Binary files /dev/null and b/FedAvg/datasets/client_3.pt differ diff --git a/FedAvg/datasets/client_4.pt b/FedAvg/datasets/client_4.pt new file mode 100644 index 0000000000000000000000000000000000000000..be97c21f176c7ca671d1124cb690ce7d14b2afc1 Binary files /dev/null and b/FedAvg/datasets/client_4.pt differ diff --git a/FedAvg/generate_datasets.py b/FedAvg/generate_datasets.py new file mode 100644 index 0000000000000000000000000000000000000000..51734d2a497f9e9f045978d8832c547a407f3f2a --- /dev/null +++ b/FedAvg/generate_datasets.py @@ -0,0 +1,8 @@ +from FedAvg.FederatedData import generate_distributed_data +from config.config import DATASET_DIR +from config.hyperparameters import DATASET_NAME, NUM_CLIENTS, ALPHA_DIRICHLET + +generate_distributed_data(dataset_name=DATASET_NAME, + num_clients=NUM_CLIENTS, + alpha=ALPHA_DIRICHLET, + save_dir=DATASET_DIR) \ No newline at end of file diff --git a/FedAvg/results b/FedAvg/results new file mode 100644 index 0000000000000000000000000000000000000000..1550aa730ff3a9bf4c49a2b38907d99cc02d6980 --- /dev/null +++ b/FedAvg/results @@ -0,0 +1,145 @@ +{ + "Simulation Configuration": { + "SEED": 42, + "NUM_ROUNDS": 3, + "NUM_CLIENTS": 5, + "EPOCHS": 1, + "FRACTION_FIT": 1.0, + "DATASET_NAME": "Fashion", + "ALPHA_DIRICHLET": 1.0, + "BATCH_SIZE": 64, + "LEARNING_RATE": 0.02 + }, + "History": { + "evaluation_loss": [ + [ + 1, + 2.070363076745729 + ], + [ + 2, + 0.9467033390421264 + ], + [ + 3, + 0.8739871526511261 + ] + ], + "fit_metrics": { + "training duration": [ + [ + 1, + 5.047897100448608 + ], + [ + 2, + 6.3023582378923155 + ], + [ + 3, + 6.2881482076660795 + ] + ], + "validation losses": [ + [ + 1, + 0.9921783669428392 + ], + [ + 2, + 0.8072508409611623 + ], + [ + 3, + 0.7719950964315752 + ] + ], + "validation accuracies": [ + [ + 1, + 0.5892206846321922 + ], + [ + 2, + 0.7132717295759724 + ], + [ + 3, + 0.7429301170003891 + ] + ], + "training losses": [ + [ + 1, + 1.536044654450886 + ], + [ + 2, + 0.8609070021714075 + ], + [ + 3, + 0.6042069714669341 + ] + ], + "training accuracies": [ + [ + 1, + 0.4938861446270953 + ], + [ + 2, + 0.7119522945294271 + ], + [ + 3, + 0.7851587095818363 + ] + ] + }, + "evaluation_metrics": { + "validation losses": [ + [ + 1, + 2.07036306663827 + ], + [ + 2, + 0.9467033326673464 + ], + [ + 3, + 0.8739871530719484 + ] + ], + "validation duration": [ + [ + 1, + 0.5905116313062323 + ], + [ + 2, + 0.5733735400889167 + ], + [ + 3, + 0.5702985465149211 + ] + ], + "validation accuracies": [ + [ + 1, + 0.3030656447850717 + ], + [ + 2, + 0.6287904031989336 + ], + [ + 3, + 0.6334555148283906 + ] + ] + } + } +} \ No newline at end of file diff --git a/FedAvg/run_client.py b/FedAvg/run_client.py new file mode 100644 index 0000000000000000000000000000000000000000..9889911a1fc91bb92537a5486f471e5295cf39aa --- /dev/null +++ b/FedAvg/run_client.py @@ -0,0 +1,49 @@ +import sys + +import flwr as fl +import torch +import logging + +from FedAvg.Client import FedAvgClient +from FedAvg.Models import FedAvgModel +from FedAvg.FederatedData import load_client_dataset +from FedAvg.utils.tools import set_seed + +from config.hyperparameters import SEED, BATCH_SIZE +from config.hyperparameters import DATASET_NAME +from config.config import SERVER_CONFIG, DATASET_DIR, LOGGING_DIR + +from pathlib import Path + +# Fix randomness +set_seed(SEED) + +cid = int(sys.argv[1]) +server_ip = sys.argv[2] +logging.info(f"Starting client {cid}") + +# Get data and model +training_loader, validation_loader = load_client_dataset(cid=cid, + dataset_name=DATASET_NAME, + directory=str(Path(__file__).parent / DATASET_DIR), + batch_size=BATCH_SIZE) + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + +model = FedAvgModel() + +# Create client instance +client = FedAvgClient(client_id=cid, + device=device, + model=model, + train_loader=training_loader, + val_loader=validation_loader, + log_dir=LOGGING_DIR) + +# Start the client +logging.info(f"Client {cid} connecting to {server_ip}") + +try: + fl.client.start_client(server_address=server_ip, client=client) +except Exception as e: + logging.error(f"Client {cid} failed to connect: {e}") diff --git a/FedAvg/run_server.py b/FedAvg/run_server.py new file mode 100644 index 0000000000000000000000000000000000000000..79d311db2ddf4fac6745a1fe02d9da1bcd89e961 --- /dev/null +++ b/FedAvg/run_server.py @@ -0,0 +1,56 @@ +import flwr as fl +from flwr.server import ServerConfig + +from FedAvg.ClientManager import FedAvgClientManager +from FedAvg.Strategy import FedAvgStrategy + +from FedAvg.SimulationAnalyser import Analyser + +from FedAvg.utils.logger import Logger +from FedAvg.utils.tools import set_seed + +from config.hyperparameters import SEED, NUM_ROUNDS, LEARNING_RATE, EPOCHS, FRACTION_FIT +from config.config import SERVER_CONFIG, LOGGING_DIR, RESULTS_DIR +import config.hyperparameters as hyperparams + +# Extract uppercase variables as a dictionary +simulation_configuration = {k: v for k, v in vars(hyperparams).items() if k.isupper() and not k.startswith("__")} + + +# Initialize logger +logger = Logger(LOGGING_DIR,"Server") + +# Fix randomness +set_seed(SEED) + +fit_config = {'learning_rate': LEARNING_RATE, + 'epochs': EPOCHS,} +evaluate_config = {} + +# Initialize components +client_manager = FedAvgClientManager() +strategy = FedAvgStrategy(fraction_fit=FRACTION_FIT, + fit_config=fit_config, + evaluate_config=evaluate_config) +config = ServerConfig(num_rounds=NUM_ROUNDS, + round_timeout=None) + + +logger('INFO', "Starting Federated Learning Server...") + +# Run Federated Learning Server +history = fl.server.start_server(server_address="0.0.0.0:"+str(SERVER_CONFIG["port"]), + # server= + config=config, + client_manager=client_manager, + strategy=strategy) + +# Store results in the analyser +analyser = Analyser("Result Analyser") +analyser.set_simulation_data(history=history, + simulation_configuration=simulation_configuration) +analyser.save_simulation_data(file_name=RESULTS_DIR) + +analyser.plot_results('./simulation_results') + +logger('INFO', "Federated Learning Server finished.") diff --git a/FedAvg/simulation_results/Evaluation validation accuracies.png b/FedAvg/simulation_results/Evaluation validation accuracies.png new file mode 100644 index 0000000000000000000000000000000000000000..e4c77063f70f45e4f220e040959cf68d9772cf40 Binary files /dev/null and b/FedAvg/simulation_results/Evaluation validation accuracies.png differ diff --git a/FedAvg/simulation_results/Evaluation validation duration.png b/FedAvg/simulation_results/Evaluation validation duration.png new file mode 100644 index 0000000000000000000000000000000000000000..b0c377ebef43c4b9a756b0596f074457cd249d6d Binary files /dev/null and b/FedAvg/simulation_results/Evaluation validation duration.png differ diff --git a/FedAvg/simulation_results/Evaluation validation losses.png b/FedAvg/simulation_results/Evaluation validation losses.png new file mode 100644 index 0000000000000000000000000000000000000000..a34dcc3b0edcb79deee580adbfe061ec5e7f5d53 Binary files /dev/null and b/FedAvg/simulation_results/Evaluation validation losses.png differ diff --git a/FedAvg/simulation_results/Fit training accuracies.png b/FedAvg/simulation_results/Fit training accuracies.png new file mode 100644 index 0000000000000000000000000000000000000000..4f2495a85b49fa9a2626218ea59227f1d27b68a5 Binary files /dev/null and b/FedAvg/simulation_results/Fit training accuracies.png differ diff --git a/FedAvg/simulation_results/Fit training duration.png b/FedAvg/simulation_results/Fit training duration.png new file mode 100644 index 0000000000000000000000000000000000000000..fe534cc089c417c5b63cf8c204e6d4192b22f960 Binary files /dev/null and b/FedAvg/simulation_results/Fit training duration.png differ diff --git a/FedAvg/simulation_results/Fit training losses.png b/FedAvg/simulation_results/Fit training losses.png new file mode 100644 index 0000000000000000000000000000000000000000..ed789016e41600d497a27564e3da3e2aa51a0bf3 Binary files /dev/null and b/FedAvg/simulation_results/Fit training losses.png differ diff --git a/FedAvg/simulation_results/Fit validation accuracies.png b/FedAvg/simulation_results/Fit validation accuracies.png new file mode 100644 index 0000000000000000000000000000000000000000..3d391f3b041961f4bc0af4c0993abd57f360ec6d Binary files /dev/null and b/FedAvg/simulation_results/Fit validation accuracies.png differ diff --git a/FedAvg/simulation_results/Fit validation losses.png b/FedAvg/simulation_results/Fit validation losses.png new file mode 100644 index 0000000000000000000000000000000000000000..e93d3d7a5b006442b784b508f23bfb307db73bae Binary files /dev/null and b/FedAvg/simulation_results/Fit validation losses.png differ diff --git a/Flower_v1/client.py b/Flower_v1/client.py deleted file mode 100644 index 785f0e869b3c30fc3d804c052ebc5bd860fc63c0..0000000000000000000000000000000000000000 --- a/Flower_v1/client.py +++ /dev/null @@ -1,67 +0,0 @@ -import flwr as fl -import tensorflow as tf -import sys - -# Function to load datasets based on the provided argument -def load_data(dataset_name): - if dataset_name == "cifar10": - (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data() - x_train = x_train.astype('float32') / 255.0 - x_test = x_test.astype('float32') / 255.0 - num_classes = 10 # CIFAR-10 has 10 classes - input_shape = (32, 32, 3) - elif dataset_name == "mnist": - (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() - x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0 # Add channel dimension - x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0 # Add channel dimension - num_classes = 10 # MNIST has 10 classes - input_shape = (28, 28, 1) - elif dataset_name == "cifar100": - (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar100.load_data() - x_train = x_train.astype('float32') / 255.0 - x_test = x_test.astype('float32') / 255.0 - num_classes = 100 # CIFAR-100 has 100 classes - input_shape = (32, 32, 3) - else: - raise ValueError("Dataset not supported: choose 'cifar10', 'mnist', or 'cifar100'") - - return (x_train, y_train), (x_test, y_test), num_classes, input_shape - -# Default dataset to CIFAR-10 -dataset_name = "cifar10" # Default dataset if no argument is passed -if len(sys.argv) > 1: - dataset_name = sys.argv[1] - -# Load the dataset -(x_train, y_train), (x_test, y_test), num_classes, input_shape = load_data(dataset_name) - -# Define the model (MobileNetV2 for this example) -model = tf.keras.applications.MobileNetV2(input_shape=input_shape, classes=num_classes, weights=None) -model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"]) - -# Define Flower client -class CifarClient(fl.client.NumPyClient): - def get_parameters(self, config): - return model.get_weights() - - def fit(self, parameters, config): - model.set_weights(parameters) - model.fit(x_train, y_train, epochs=1, batch_size=32, steps_per_epoch=3) - return model.get_weights(), len(x_train), {} - - def evaluate(self, parameters, config): - model.set_weights(parameters) - loss, accuracy = model.evaluate(x_test, y_test) - return loss, len(x_test), {"accuracy": float(accuracy)} - -# Start Flower client -if len(sys.argv) < 3: - fl.client.start_client(server_address="127.0.0.1:8080", - client=CifarClient(), - max_retries=5, - max_wait_time=10) #in seconds -else: - fl.client.start_client(server_address=sys.argv[2], - client=CifarClient(), - max_retries=5, - max_wait_time=10) diff --git a/Flower_v1/client_1.py b/Flower_v1/client_1.py deleted file mode 100644 index 8d0aed8cf23ca0b352e33a74b1dc3796b49f91c5..0000000000000000000000000000000000000000 --- a/Flower_v1/client_1.py +++ /dev/null @@ -1,103 +0,0 @@ -#python3 client_1.py <dataset> <partition> <client_num> <IP:PORT> -#python3 client_1.py cifar10 1 3 172.16.66.55:8080 - -import flwr as fl -import tensorflow as tf -from sklearn.model_selection import train_test_split -import numpy as np -import sys - -# Function to load and partition the dataset based on client_id -def load_partitioned_data(dataset_name, num_clients, client_id): - if dataset_name == "cifar10": - ''' - total samples train: 50000 - total samples test: 10000 - sample type: animals, vehicles, etc. - ''' - (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data() - x_train = x_train.astype("float32") / 255.0 - x_test = x_test.astype("float32") / 255.0 - num_classes = 10 - input_shape = (32, 32, 3) - elif dataset_name == "mnist": - ''' - total samples train: 60000 - total samples test: 10000 - sample type: handwritten digits - ''' - (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() - x_train = x_train.reshape(-1, 28, 28, 1).astype("float32") / 255.0 - x_test = x_test.reshape(-1, 28, 28, 1).astype("float32") / 255.0 - num_classes = 10 - input_shape = (28, 28, 1) - elif dataset_name == "cifar100": - ''' - total samples train: 50000 - total samples test: 10000 - sample type: animals, objects - ''' - (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar100.load_data() - x_train = x_train.astype("float32") / 255.0 - x_test = x_test.astype("float32") / 255.0 - num_classes = 100 - input_shape = (32, 32, 3) - else: - raise ValueError("Dataset not supported. Use 'cifar10', 'mnist', or 'cifar100'.") - - # Partition the dataset among the clients (same data distribution with same number_clients setup) - total_samples = x_train.shape[0] - samples_per_client = total_samples // num_clients - - start = client_id * samples_per_client - end = (client_id + 1) * samples_per_client if client_id != num_clients - 1 else total_samples - - x_client_train = x_train[start:end] - y_client_train = y_train[start:end] - - # x_client_train = x_train - # y_client_train = y_train - - # Split the client-specific data into training and validation sets - x_train, x_val, y_train, y_val = train_test_split(x_client_train, y_client_train, test_size=0.2, random_state=42) - - return (x_train, y_train), (x_val, y_val), (x_test, y_test), num_classes, input_shape - -# Get command-line arguments: dataset, client_id, num_clients, and server address -if len(sys.argv) != 5: - print("Usage: python3 client_1.py <dataset> <partition> <client_num> <IP:PORT>") - sys.exit(1) - -dataset_name = sys.argv[1] -client_id = int(sys.argv[2]) -num_clients = int(sys.argv[3]) -server_address = sys.argv[4] - -# Load partitioned data for the client -(x_train, y_train), (x_val, y_val), (x_test, y_test), num_classes, input_shape = load_partitioned_data(dataset_name, num_clients, client_id) - -# Define the model (MobileNetV2) -model = tf.keras.applications.MobileNetV2(input_shape=input_shape, classes=num_classes, weights=None) -model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) - -# Define Flower client -class FederatedClient(fl.client.NumPyClient): - def get_parameters(self, config): - return model.get_weights() - - def fit(self, parameters, config): - model.set_weights(parameters) - model.fit(x_train, y_train, epochs=1, batch_size=32, validation_data=(x_val, y_val)) - return model.get_weights(), len(x_train), {} - - def evaluate(self, parameters, config): - model.set_weights(parameters) - loss, accuracy = model.evaluate(x_test, y_test, verbose=0) - return loss, len(x_test), {"accuracy": float(accuracy)} - -# Start Flower client -if __name__ == "__main__": - fl.client.start_client(server_address=server_address, - client=FederatedClient(), - max_retries=3, # retry connection to server 3 times - max_wait_time=5) # wait 5 seconds before retrying diff --git a/Flower_v1/server.py b/Flower_v1/server.py deleted file mode 100644 index d736363e5df18d1d18dbc84dbc500242cac7b400..0000000000000000000000000000000000000000 --- a/Flower_v1/server.py +++ /dev/null @@ -1,78 +0,0 @@ -from typing import List, Tuple -from flwr.server import ServerApp, ServerConfig -from flwr.server.strategy import FedAvg, FedMedian, FedOpt, FedAdam -from flwr.common import Metrics, Parameters -import argparse -import tensorflow as tf -import flwr - -# Create the argument parser -parser = argparse.ArgumentParser(description="Server setting for FLWR") - -# Add the argument for num_rounds (-r or --rounds) -parser.add_argument("-r", "--rounds", type=int, default=10, help="Number of rounds") - -# Add the argument for strategy (-s or --strategy) -parser.add_argument( - "-s", - "--strategy", - type=str, - default="fedAvg", - choices=["fedAvg", "fedMedian", "fedOpt", "fedAdam"], # Add options strategies - help="Select strategy: fedAvg, fedMedian, fedOpt or fedAdam" -) - -# Parse the arguments from the command line -args = parser.parse_args() - -# Define metric aggregation function -def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: - # Multiply accuracy of each client by number of examples used - accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics] - examples = [num_examples for num_examples, _ in metrics] - - # Aggregate and return custom metric (weighted average) - return {"accuracy": sum(accuracies) / sum(examples)} - -# Load model for server-side parameter initialization -model = tf.keras.applications.MobileNetV2( - input_shape=(32, 32, 3), weights=None, classes=10 -) -model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"]) - -# Get model weights as a list of NumPy ndarray's -weights = model.get_weights() - -# Serialize ndarrays to `Parameters` -parameters = flwr.common.ndarrays_to_parameters(weights) - -# Select the strategy based on the input argument -if args.strategy == "fedAvg": - strategy = FedAvg(evaluate_metrics_aggregation_fn=weighted_average) -elif args.strategy == "fedMedian": - strategy = FedMedian(evaluate_metrics_aggregation_fn=weighted_average) -elif args.strategy == "fedOpt": - strategy = FedOpt(evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters) -elif args.strategy == "fedAdam": - strategy = FedAdam(evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters) - -# Define config -config = ServerConfig(num_rounds=args.rounds) - - -# # Flower ServerApp -# app = ServerApp( -# config=config, -# strategy=strategy, -# ) - - -# Legacy mode -if __name__ == "__main__": - from flwr.server import start_server - - start_server( - server_address="0.0.0.0:8080", - config=config, - strategy=strategy, - ) diff --git a/Flower_v1/server_1.py b/Flower_v1/server_1.py deleted file mode 100644 index a2ca18eb6830448f5d88bf321e87de6527e89585..0000000000000000000000000000000000000000 --- a/Flower_v1/server_1.py +++ /dev/null @@ -1,204 +0,0 @@ -import csv -import time -import os -from datetime import datetime -from typing import List, Tuple, Dict, Optional -from flwr.common import Parameters, Metrics -from flwr.server.strategy import Strategy -from flwr.server.client_manager import ClientManager -from flwr.server.client_proxy import ClientProxy -from flwr.server import ServerApp, ServerConfig -from flwr.server.strategy import FedAvg, FedMedian, FedOpt, FedAdam, FedProx, QFedAvg -import argparse -import tensorflow as tf -import flwr - -# Create the folder and generate the CSV filename dynamically -def create_csv_filepath(strategy_name: str, num_rounds: int) -> str: - # Determine the parent directory of the current script - parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - - # # Create the "Log_autres" directory if it doesn't exist - # log_dir = os.path.join(parent_dir, "Log_autres") - # os.makedirs(log_dir, exist_ok=True) - - log_dir = args.log_dir - if log_dir is None: - log_dir = os.path.join(parent_dir, "Log_autres") - os.makedirs(log_dir, exist_ok=True) - - # Generate a timestamp for the file - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - - # Generate the CSV file path - csv_filename = f"training_results_{strategy_name}_{num_rounds}_{timestamp}.csv" - csv_filepath = os.path.join(log_dir, csv_filename) - - return csv_filepath - -class CustomStrategyWrapper(Strategy): - def __init__(self, strategy: Strategy, csv_filename: str): - self.strategy = strategy - self.csv_filename = csv_filename - - # Initialize CSV file and write header - with open(self.csv_filename, mode="w", newline="") as f: - writer = csv.writer(f) - writer.writerow(["round", "time", "accuracy"]) # Header - - def initialize_parameters(self, client_manager: ClientManager) -> Optional[Parameters]: - return self.strategy.initialize_parameters(client_manager) - - def configure_fit( - self, server_round: int, parameters: Parameters, client_manager: ClientManager - ): - # Log the round start time - self.round_start_time = time.time() - return self.strategy.configure_fit(server_round, parameters, client_manager) - - def aggregate_fit( - self, - server_round: int, - results: List[Tuple[ClientProxy, Metrics]], - failures: List[BaseException], - ): - # Call the actual strategy's aggregate_fit - return self.strategy.aggregate_fit(server_round, results, failures) - - def configure_evaluate( - self, server_round: int, parameters: Parameters, client_manager: ClientManager - ): - return self.strategy.configure_evaluate(server_round, parameters, client_manager) - - def aggregate_evaluate( - self, - server_round: int, - results: List[Tuple[ClientProxy, Metrics]], - failures: List[BaseException], - ): - # Call the actual strategy's aggregate_evaluate - aggregated_metrics = self.strategy.aggregate_evaluate(server_round, results, failures) - - # Calculate round duration - round_time = time.time() - self.round_start_time - - # Extract accuracy from aggregated_metrics - accuracy = aggregated_metrics[1].get("accuracy", 0) if aggregated_metrics else 0 - - # Append data to CSV - with open(self.csv_filename, mode="a", newline="") as f: - writer = csv.writer(f) - writer.writerow([server_round, round_time, accuracy]) - - return aggregated_metrics - - def evaluate( - self, server_round: int, parameters: Parameters - ) -> Optional[Tuple[float, Metrics]]: - return self.strategy.evaluate(server_round, parameters) - - -# Create the argument parser -parser = argparse.ArgumentParser(description="Server setting for FLWR") - -# Add the argument for num_rounds (-r or --rounds) -parser.add_argument("-r", "--rounds", type=int, default=10, help="Number of rounds") - -# Add the argument for strategy (-s or --strategy) -parser.add_argument( - "-s", - "--strategy", - type=str, - default="fedAvg", - choices=["fedAvg", "fedMedian", "fedOpt", "fedAdam", "fedAvg2Clients", "fedProx", "QfedAvg"], # Add options strategies - help="Select strategy: fedAvg, fedMedian, fedOpt, fedAdam, fedProx, QfedAvg or fedAvg2Clients", -) - -parser.add_argument("-l", "--log_dir", type=str, default=None, help="Directory to save logs") - -# Parse the arguments from the command line -args = parser.parse_args() - -# Define metric aggregation function -def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: - # Multiply accuracy of each client by number of examples used - accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics] - examples = [num_examples for num_examples, _ in metrics] - - # Aggregate and return custom metric (weighted average) - return {"accuracy": sum(accuracies) / sum(examples)} - -# Load model for server-side parameter initialization -model = tf.keras.applications.MobileNetV2( - input_shape=(32, 32, 3), weights=None, classes=10 -) -model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"]) - -# Get model weights as a list of NumPy ndarray's -weights = model.get_weights() - -# Serialize ndarrays to `Parameters` -parameters = flwr.common.ndarrays_to_parameters(weights) - -# Select the base strategy -if args.strategy == "fedAvg": - # baseline strategy - base_strategy = FedAvg( - evaluate_metrics_aggregation_fn=weighted_average) -elif args.strategy == "fedAvg2Clients": - # Custom client num each round - base_strategy = FedAvg( - fraction_fit=0.67, # 67% of clients agg/fit each round - min_fit_clients=2, # At least 2 clients fit each round - min_available_clients=2, # At least 2 clients available each round - evaluate_metrics_aggregation_fn=weighted_average, - ) -elif args.strategy == "fedMedian": - # robustness strategy - base_strategy = FedMedian( - evaluate_metrics_aggregation_fn=weighted_average) -elif args.strategy == "fedOpt": - # server optimization strategy - base_strategy = FedOpt( - evaluate_metrics_aggregation_fn=weighted_average, - initial_parameters=parameters - ) -elif args.strategy == "fedAdam": - # server optimization strategy use Adam - base_strategy = FedAdam( - evaluate_metrics_aggregation_fn=weighted_average, - initial_parameters=parameters - ) -elif args.strategy == "fedProx": - # regularization strategy - base_strategy = FedProx( - evaluate_metrics_aggregation_fn=weighted_average, - initial_parameters=parameters, - proximal_mu = 0.1 - ) -elif args.strategy == "QFedAvg": - # improve fairness, reduce var_loss strategy - base_strategy = QFedAvg( - evaluate_metrics_aggregation_fn=weighted_average, - initial_parameters=parameters - ) - - -# Generate the CSV file path based on strategy and number of rounds -csv_filepath = create_csv_filepath(strategy_name=args.strategy, num_rounds=args.rounds) - -# Wrap the selected strategy with the logging wrapper -strategy = CustomStrategyWrapper(strategy=base_strategy, csv_filename=csv_filepath) - -# Define config -config = ServerConfig(num_rounds=args.rounds) - -# Legacy mode -if __name__ == "__main__": - from flwr.server import start_server - - start_server( - server_address="0.0.0.0:8080", - config=config, - strategy=strategy, - ) diff --git a/README.md b/README.md deleted file mode 100644 index 2c46cecfc818e3490a03da3f8c9870a8a25a2dbf..0000000000000000000000000000000000000000 --- a/README.md +++ /dev/null @@ -1,267 +0,0 @@ -# Measure Energy of Flower FL in G5K - -This project provides tools to measure the energy consumption of Flower-based federated learning (FL) experiments on the Grid'5000 (G5K) testbed. It includes scripts to manage distributed nodes, run FL experiments, and monitor energy usage. - -## Table of Contents - -- [Getting Started](#getting-started) -- [Installation](#installation) -- [Usage](#usage) - - [FL framework](#fl-framework) - - [Configure instances for CPU](#configure-instance-for-cpu) - - [Configure instances for GPU](#configure-instance-for-gpu-developing) - - [Run exp](#run-exp) -- [Quickstart](#quickstart) - - [Step 1. Reserve the Hosts in G5K](#step-1-reserve-the-hosts-in-g5k) - - [Step 2. Configure](#step-2-configure) - - [Step 3. Collect IP](#step-3-collect-ip) - - [Step 4. Run the Campaign or Instance](#step-4-run-the-campaign-or-single-instance) - - [Step 5. Output](#step-5-output) - - [Step 6. Clean Up](#step-6-clean-up) -- [License](#license) - -## Getting Started - -The repository includes an example of Flower (using TensorFlow) in the `Flower_v1` directory and the source of measuring framework in `Run`. This example demonstrates how to use this framework to measure energy consumption. - -## Installation - -Clone the repository and navigate to the `eflwr` directory: - -```bash -git clone https://gitlab.irit.fr/sepia-pub/delight/eflwr.git -cd eflwr -``` - -This framework requires: -- **Python 3.9.2** or higher. -- Additional dependencies listed in `requirements.txt`. Install them with: - ```bash - pip install -r requirements.txt - ``` -*Note:* `requirements.txt` includes `tensorflow`, `tensorflow-datasets` `scikit-learn` and `numpy` using for the provided Flower example. - -Navigate to `Run` directory: - -```bash -cd Run -``` - -## Usage - -### FL framework -FL scripts (includes server and client scripts) can be updated, example in dir `Flower_v1`. - -### Configure instance for CPU -Configure instances of experiment in a json format, structure is shown below. - - - **instances** includes **"1"**, **"2"** ,... are identifies of each instance. - - **instance**: name of instance. - - **output_dir**: location stores the output files (experiment log and energy monitoring output). - - **dvfs_cpu**: choose only one in 3 settings. - - `dummy`: for testing in min and max CPU freq (`false` or `true`). - - `baseline`: for testing in max CPU freq (`false` or `true`). - - `frequencies`: Limits to the provided list of frequencies (`null` or `int list []`). - - **Remark:** check the available frequencies before using oftion `frequencies`. - - - Set the permissions and disable Turbo Boost first: - ```bash - bash "$(python3 -c "import expetator, os; print(os.path.join(os.path.dirname(expetator.__file__), 'leverages', 'dvfs_pct.sh'))")" init - ``` - - Run this command to get available 4 frequencies (min, max, 2 in middle): - ```bash - python3 get_freq.py - ``` - - Update extraced frequencies value to configure files. - - Structure of json config: - ```json - { - "instances": { - "1": { - "instance": "", - "output_dir": "", - "dvfs_cpu": { - "dummy": true, - "baseline": false, - "frequencies": null - }, - "server": { - "command": "python3", - "args": [ - ], - "ip": "", - "modules": ["logger"], - "port": 8080 - }, - "clients": [ - { - "name": "client1", - "command": "python3", - "args": [ - ], - "ip": "" - }, - {...}, - {...} - ] - }, - "2": { - "instance": "", - ... - } - } - } - ``` - -### Configure instance for GPU (*developing) - -- The configuration is as same CPU, except **dvfs** role. In GPU config, the role is **dvfs_gpu**. -Choose only one in 3 settings: - - `dummy`: for testing in min and max GPU freq (`false` or `true`). - - `baseline`: for testing in max GPU freq (`false` or `true`). - - last setting includes 3 parameters: test with the all freqs in the range. To disable this setting, set the `zoomfrom` and `zoomto` are same values. - - `steps`: steps to jump in range/window of frequencies (int). - - `zoomfrom`: freq start - - `zoomto`: freq stop - *example:* - - list of freq available [1, 1.1, 1.2, 1.9, 2.5, 2.7] GHz - - with `zoomfrom` = 1.1, `zoomto` = 2.7 and `steps` = 2 - - list of tested freq returns [1.1, 1.9, 2,7] - - **Remark:** check the available frequencies before using option `frequencies`. - Run below cmd: - ```bash - nvidia-smi -i 0 --query-supported-clocks=gr --format=csv,noheader,nounits | tr '\n' ' ' - ``` - - ```json - "dvfs_gpu": { - "dummy": true, - "baseline": false, - "steps": 2, - "zoomfrom": 0, - "zoomto": 0 - }, - ``` - -### Run exp -2 options of experiment: run single instance or all instances (a campaign). - - <u>Run single instance</u>: - ```bash - python3 measure_instance.py -c [config_file] -i [instance] -x [experiment_name] -r [repetitions] - ``` - - - **[config_file]**: The instances configuration file. - - **[instance]** : Identify number of single instance. - - **[experiment_name]**: The name you use to identify your experiment. - - **[repetitions]**: Number of repetitions for the experiment. - - <u>Run campaign</u>: - ```bash - python3 measure_campaign.py -x [experiment_name] -c [config_file] -r [repetitions] - ``` - For campaign running, all instances which were defined in **[config_file]** will be used. - -## Quickstart - -### Step 1. Reserve the Hosts in G5K - -Reserve the required number of hosts (*See the [document of G5K](https://www.grid5000.fr/w/Getting_Started#Reserving_resources_with_OAR:_the_basics) for more details*) -<u>For example</u>: - -Reserve 4 hosts (CPU) (1 server + 3 clients) for 2 hours: -```bash -oarsub -I -l host=4,walltime=2 -``` -Reserve 4 hosts (GPU) (1 server + 3 clients) for 2 hours: -```bash -oarsub -I -t exotic -p "gpu_count>0" -l {"cluster='drac'"}/host=4 # grenoble -``` - -**Remark**: for now only 2 clusters, `chifflot` in Lille and `drac` in Grenoble are available for testing in more than 3 GPU nodes, maximum is 8 (`chifflot`) or 12 (`drac`) nodes. - -Make sure your are in`eflwr/Run/`: -```bash -cd Run -``` - -### Step 2. Configure -Two JSON configuration files (e.g. `config_instances_CPU.json` for CPU and `config_instances_GPU.json` for GPU) to specify experiment details includes one or more instances. - -```bash -cat config_instances_CPU.json -``` - -<u>For example</u>: `config_instances_CPU.json` provides two examples of instance configuration. -- instance "`1`": fedAvg, cifar10, dvfs with min and max CPU freq, 1 round. -- instance "`2`": fedAvg2Clients, cifar10, dvfs with min and max CPU freq, 1 round. - -### Step 3. Collect IP - -Run the following command to collect/generate a node list: -```bash -uniq $OAR_NODEFILE > nodelist -``` - -Automatically populate missing IP addresses in the JSON file: -```bash -python3 collect_ip.py -n nodelist -c config_instances_CPU.json -``` - -### Step 4. Run the Campaign or Single Instance - -Run single instance with instance `1`, and 2 repetitions: -```bash -python3 measure_instance.py -x SingleTest -c config_instances_CPU.json -i 1 -r 2 -``` - -Run a campaign with all instances (`1` and `2`), and 2 repetitions: -```bash -python3 measure_campaign.py -x CampaignTest -c config_instances_CPU.json -r 2 -``` - -**Note**: Running single instance takes about 6 mins (1 round (80s) * 2 repetitions * 2 freqs = 320s). Running a campaign (2 instances) takes about 12 mins. - -### Step 5. Output - -The logs and energy monitoring data will be saved in the directory specified in the JSON configuration. - -Output dir structure for demo single instance: Log/Flower_SingleTest/Flower_instance_Flower_instance_fedAvg_cifar10 - -```plaintext -Log/Flower_SingleTest -├── Flower_instance_Flower_instance_fedAvg_cifar10 -│ ├── Expetator -| | ├── config_instance_1.json -│ ├── Expetator_<host_info>_<timestamp>_mojitos: mojitos outputs -│ ├── Expetator_<host_info>_<timestamp>_power: wattmetter outputs -│ ├── Expetator_<host_info>_<timestamp>: measurement log -│ ├── Flwr_<timestamp>: Flower log -│ │ ├── Client_<ip> -│ │ ├── Server_<ip> -│ │ ├── training_results_<instance_name>_<time>.csv -│ ├── Flwr_<timestamp> -│ │ ├── Client_<ip> -│ │ ├── Server_<ip> -... -``` -Output dir structure for demo campaign, includes 2 folders for 2 instances: - -```plaintext -Log/Flower_CampaignTest -├── Flower_instance_Flower_instance_fedAvg_cifar10 -├── Flower_instance_Flower_instance_fedAvg2Clients_cifar10 -... -### Step 6. Clean Up - -After the experiment, exit the host and kill job if needed: - ```bash - exit - oardel <job_id> - ``` - -## License - -This project is licensed under [GPLv3]. \ No newline at end of file diff --git a/Run/config_instances.json b/Run/config_instances.json index 8617efb40033c33a1a4204eea49959decce1b721..76dfc67b24614f6d630b7f342f49bba89dc1424d 100644 --- a/Run/config_instances.json +++ b/Run/config_instances.json @@ -1,114 +1,69 @@ { "instances": { "1": { - "instance": "fedAvg_cifar10", - "output_dir": "./Log", - "dvfs_cpu": { - "dummy": false, - "baseline": false, - "frequencies": [1200000,1500000,1800000,2100000] - }, - "server": { - "command": "python3", - "args": [ - "./Flower_v1/server_1.py", - "-r 15", - "-s fedAvg" - ], - "ip": "172.16.66.76", - "modules": ["logger"], - "port": 8080 - }, "clients": [ - { - "name": "client1", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "0", - "3" - ], - "ip": "172.16.66.77" - }, - { - "name": "client2", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "1", - "3" - ], - "ip": "172.16.66.78" - }, - { - "name": "client3", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "2", - "3" - ], - "ip": "172.16.66.79" - } - ] - }, - "2": { - "instance": "fedAvg2Clients_cifar10", - "output_dir": "./Log", + { + "args": [ + "./FedAvg/run_client.py", + "0" + ], + "command": "python3", + "ip": "172.16.52.14", + "name": "client0" + }, + { + "args": [ + "./FedAvg/run_client.py", + "1" + ], + "command": "python3", + "ip": "172.16.52.15", + "name": "client1" + }, + { + "args": [ + "./FedAvg/run_client.py", + "2" + ], + "command": "python3", + "ip": "172.16.48.11", + "name": "client2" + }, + { + "args": [ + "./FedAvg/run_client.py", + "3" + ], + "command": "python3", + "ip": "172.16.48.15", + "name": "client3" + }, + { + "args": [ + "./FedAvg/run_client.py", + "4" + ], + "command": "python3", + "ip": "172.16.48.7", + "name": "client4" + } + ], "dvfs_cpu": { - "dummy": false, "baseline": false, - "frequencies": [1200000,1500000,1800000,2100000] + "dummy": false, + "frequencies": null }, + "instance": "FedAvg", + "output_dir": "./meter_logs", "server": { - "command": "python3", "args": [ - "./Flower_v1/server_1.py", - "-r 15", - "-s fedAvg2Clients" + "./FedAvg/run_server.py" ], - "ip": "172.16.66.76", - "modules": ["logger"], - "port": 8080 - }, - "clients": [ - { - "name": "client1", "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "0", - "3" - ], - "ip": "172.16.66.77" - }, - { - "name": "client2", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "1", - "3" - ], - "ip": "172.16.66.78" - }, - { - "name": "client3", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "2", - "3" - ], - "ip": "172.16.66.79" + "ip": "172.16.51.3", + "modules": [], + "port": 8080 } - ] } } } \ No newline at end of file diff --git a/Run/config_instances_CPU.json b/Run/config_instances_CPU.json deleted file mode 100644 index 86c1b1a6cc8e395c60221665bb60a9bbdcbf7550..0000000000000000000000000000000000000000 --- a/Run/config_instances_CPU.json +++ /dev/null @@ -1,114 +0,0 @@ -{ - "instances": { - "1": { - "instance": "fedAvg_cifar10", - "output_dir": "./Log", - "dvfs_cpu": { - "dummy": true, - "baseline": false, - "frequencies": null - }, - "server": { - "command": "python3", - "args": [ - "./Flower_v1/server_1.py", - "-r 1", - "-s fedAvg" - ], - "ip": "172.16.66.76", - "modules": ["logger"], - "port": 8080 - }, - "clients": [ - { - "name": "client1", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "0", - "3" - ], - "ip": "172.16.66.77" - }, - { - "name": "client2", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "1", - "3" - ], - "ip": "172.16.66.78" - }, - { - "name": "client3", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "2", - "3" - ], - "ip": "172.16.66.79" - } - ] - }, - "2": { - "instance": "fedAvg2Clients_cifar10", - "output_dir": "./Log", - "dvfs_cpu": { - "dummy": true, - "baseline": false, - "frequencies": null - }, - "server": { - "command": "python3", - "args": [ - "./Flower_v1/server_1.py", - "-r 1", - "-s fedAvg2Clients" - ], - "ip": "172.16.66.76", - "modules": ["logger"], - "port": 8080 - }, - "clients": [ - { - "name": "client1", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "0", - "3" - ], - "ip": "172.16.66.77" - }, - { - "name": "client2", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "1", - "3" - ], - "ip": "172.16.66.78" - }, - { - "name": "client3", - "command": "python3", - "args": [ - "./Flower_v1/client_1.py", - "cifar10", - "2", - "3" - ], - "ip": "172.16.66.79" - } - ] - } - } -} \ No newline at end of file diff --git a/Run/config_instances_GPU.json b/Run/config_instances_GPU.json deleted file mode 100644 index ecde3587356087c559fab914ea35199164067859..0000000000000000000000000000000000000000 --- a/Run/config_instances_GPU.json +++ /dev/null @@ -1,116 +0,0 @@ -{ - "instances": { - "1": { - "instance": "fedAvg_cifar10", - "output_dir": "/home/mdo/Framework/eflwr/Log", - "dvfs_gpu": { - "dummy": true, - "baseline": false, - "steps": 2, - "zoomfrom": 0, - "zoomto": 0 - }, - "server": { - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/server_1.py", - "-r 1", - "-s fedAvg" - ], - "ip": "172.16.66.76", - "port": 8080 - }, - "clients": [ - { - "name": "client1", - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", - "cifar10", - "1", - "3" - ], - "ip": "172.16.66.77" - }, - { - "name": "client2", - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", - "cifar10", - "2", - "3" - ], - "ip": "172.16.66.78" - }, - { - "name": "client3", - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", - "cifar10", - "3", - "3" - ], - "ip": "172.16.66.79" - } - ] - }, - "2": { - "instance": "fedAvg2Clients_cifar10", - "output_dir": "/home/mdo/Framework/eflwr/Log", - "dvfs_gpu": { - "dummy": true, - "baseline": false, - "steps": 2, - "zoomfrom": 0, - "zoomto": 0 - }, - "server": { - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/server_1.py", - "-r 1", - "-s fedAvg2Clients" - ], - "ip": "172.16.66.76", - "port": 8080 - }, - "clients": [ - { - "name": "client1", - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", - "cifar10", - "1", - "3" - ], - "ip": "172.16.66.77" - }, - { - "name": "client2", - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", - "cifar10", - "2", - "3" - ], - "ip": "172.16.66.78" - }, - { - "name": "client3", - "command": "python3", - "args": [ - "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", - "cifar10", - "3", - "3" - ], - "ip": "172.16.66.79" - } - ] - } - } -} diff --git a/Run/nodelist b/Run/nodelist index 4b73aa277fb912e45e63caba9e334ed60300b096..21d6ebf3ad826b0c2f8caf01884dc7fd7fe772b3 100644 --- a/Run/nodelist +++ b/Run/nodelist @@ -1,4 +1,6 @@ -gros-76.nancy.grid5000.fr -gros-77.nancy.grid5000.fr -gros-78.nancy.grid5000.fr -gros-79.nancy.grid5000.fr +hercule-3.lyon.grid5000.fr +nova-14.lyon.grid5000.fr +nova-15.lyon.grid5000.fr +taurus-11.lyon.grid5000.fr +taurus-15.lyon.grid5000.fr +taurus-7.lyon.grid5000.fr diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000000000000000000000000000000000000..fe84b870edb7b47f1a85736dd3238068107034bd --- /dev/null +++ b/environment.yml @@ -0,0 +1,194 @@ +name: FL +channels: + - conda-forge + - defaults +dependencies: + - _libgcc_mutex=0.1=main + - _openmp_mutex=5.1=1_gnu + - abseil-cpp=20211102.0=hd4dd3e8_0 + - altair=5.0.1=py312h06a4308_0 + - arrow-cpp=14.0.2=h374c478_1 + - attrs=23.1.0=py312h06a4308_0 + - aws-c-auth=0.6.19=h5eee18b_0 + - aws-c-cal=0.5.20=hdbd6064_0 + - aws-c-common=0.8.5=h5eee18b_0 + - aws-c-compression=0.2.16=h5eee18b_0 + - aws-c-event-stream=0.2.15=h6a678d5_0 + - aws-c-http=0.6.25=h5eee18b_0 + - aws-c-io=0.13.10=h5eee18b_0 + - aws-c-mqtt=0.7.13=h5eee18b_0 + - aws-c-s3=0.1.51=hdbd6064_0 + - aws-c-sdkutils=0.1.6=h5eee18b_0 + - aws-checksums=0.1.13=h5eee18b_0 + - aws-crt-cpp=0.18.16=h6a678d5_0 + - aws-sdk-cpp=1.10.55=h721c034_0 + - blas=1.0=mkl + - blinker=1.6.2=py312h06a4308_0 + - boost-cpp=1.82.0=hdb19cb5_2 + - bottleneck=1.3.7=py312ha883a20_0 + - brotli=1.0.9=h5eee18b_8 + - brotli-bin=1.0.9=h5eee18b_8 + - brotli-python=1.0.9=py312h6a678d5_8 + - bzip2=1.0.8=h5eee18b_5 + - c-ares=1.19.1=h5eee18b_0 + - ca-certificates=2024.7.2=h06a4308_0 + - cachetools=5.3.3=py312h06a4308_0 + - certifi=2024.7.4=py312h06a4308_0 + - charset-normalizer=2.0.4=pyhd3eb1b0_0 + - click=8.1.7=py312h06a4308_0 + - contourpy=1.2.0=py312hdb19cb5_0 + - expat=2.5.0=h6a678d5_0 + - freetype=2.12.1=h4a9f257_0 + - gflags=2.2.2=h6a678d5_1 + - gitdb=4.0.7=pyhd3eb1b0_0 + - gitpython=3.1.37=py312h06a4308_0 + - glog=0.5.0=h6a678d5_1 + - grpc-cpp=1.48.2=he1ff14a_1 + - icu=73.1=h6a678d5_0 + - idna=3.7=py312h06a4308_0 + - intel-openmp=2023.1.0=hdb19cb5_46306 + - jpeg=9e=h5eee18b_1 + - jsonschema=4.19.2=py312h06a4308_0 + - jsonschema-specifications=2023.7.1=py312h06a4308_0 + - krb5=1.20.1=h143b758_1 + - lcms2=2.12=h3be6417_0 + - ld_impl_linux-64=2.38=h1181459_1 + - lerc=3.0=h295c915_0 + - libboost=1.82.0=h109eef0_2 + - libbrotlicommon=1.0.9=h5eee18b_8 + - libbrotlidec=1.0.9=h5eee18b_8 + - libbrotlienc=1.0.9=h5eee18b_8 + - libcurl=8.7.1=h251f7ec_0 + - libdeflate=1.17=h5eee18b_1 + - libedit=3.1.20230828=h5eee18b_0 + - libev=4.33=h7f8727e_1 + - libevent=2.1.12=hdbd6064_1 + - libffi=3.4.4=h6a678d5_0 + - libgcc-ng=11.2.0=h1234567_1 + - libgfortran-ng=11.2.0=h00389a5_1 + - libgfortran5=11.2.0=h1234567_1 + - libgomp=11.2.0=h1234567_1 + - libnghttp2=1.57.0=h2d74bed_0 + - libpng=1.6.39=h5eee18b_0 + - libprotobuf=3.20.3=he621ea3_0 + - libssh2=1.11.0=h251f7ec_0 + - libstdcxx-ng=11.2.0=h1234567_1 + - libthrift=0.15.0=h1795dd8_2 + - libtiff=4.5.1=h6a678d5_0 + - libuuid=1.41.5=h5eee18b_0 + - libwebp-base=1.3.2=h5eee18b_0 + - lz4-c=1.9.4=h6a678d5_1 + - markdown-it-py=2.2.0=py312h06a4308_1 + - matplotlib-base=3.9.2=py312h66fe004_0 + - mdurl=0.1.0=py312h06a4308_0 + - mkl=2023.1.0=h213fc3f_46344 + - mkl-service=2.4.0=py312h5eee18b_1 + - mkl_fft=1.3.8=py312h5eee18b_0 + - mkl_random=1.2.4=py312hdb19cb5_0 + - mpld3=0.5.9=py312h06a4308_0 + - ncurses=6.4=h6a678d5_0 + - numexpr=2.8.7=py312hf827012_0 + - numpy=1.26.4=py312hc5e2394_0 + - numpy-base=1.26.4=py312h0da6c21_0 + - openjpeg=2.4.0=h3ad879b_0 + - openssl=3.0.14=h5eee18b_0 + - orc=1.7.4=hb3bc3d3_1 + - packaging=23.2=py312h06a4308_0 + - pandas=2.2.1=py312h526ad5a_0 + - pip=23.3.1=py312h06a4308_0 + - pyarrow=14.0.2=py312hb107042_0 + - pydeck=0.8.0=py312h06a4308_2 + - pygments=2.15.1=py312h06a4308_1 + - pysocks=1.7.1=py312h06a4308_0 + - python=3.12.2=h996f2a0_0 + - python-dateutil=2.9.0post0=py312h06a4308_2 + - python-tzdata=2023.3=pyhd3eb1b0_0 + - pytz=2024.1=py312h06a4308_0 + - pyyaml=6.0.1=py312h5eee18b_0 + - re2=2022.04.01=h295c915_0 + - readline=8.2=h5eee18b_0 + - referencing=0.30.2=py312h06a4308_0 + - requests=2.32.2=py312h06a4308_0 + - rich=13.3.5=py312h06a4308_1 + - rpds-py=0.10.6=py312hb02cf49_0 + - s2n=1.3.27=hdbd6064_0 + - scipy=1.12.0=py312hc5e2394_0 + - setuptools=68.2.2=py312h06a4308_0 + - six=1.16.0=pyhd3eb1b0_1 + - smmap=4.0.0=pyhd3eb1b0_0 + - snappy=1.1.10=h6a678d5_1 + - sqlite=3.41.2=h5eee18b_0 + - streamlit=1.32.0=py312h06a4308_0 + - tbb=2021.8.0=hdb19cb5_0 + - tenacity=8.2.2=py312h06a4308_1 + - tk=8.6.12=h1ccaba5_0 + - toml=0.10.2=pyhd3eb1b0_0 + - toolz=0.12.0=py312h06a4308_0 + - tornado=6.3.3=py312h5eee18b_0 + - typing_extensions=4.11.0=py312h06a4308_0 + - tzdata=2024a=h04d1e81_0 + - unicodedata2=15.1.0=py312h5eee18b_0 + - urllib3=2.2.1=py312h06a4308_0 + - utf8proc=2.6.1=h5eee18b_1 + - watchdog=4.0.1=py312h06a4308_0 + - wheel=0.41.2=py312h06a4308_0 + - xz=5.4.6=h5eee18b_0 + - yaml=0.2.5=h7b6447c_0 + - zlib=1.2.13=h5eee18b_0 + - zstd=1.5.5=hc292b87_2 + - pip: + - cffi==1.17.1 + - clarabel==0.9.0 + - colorama==0.4.6 + - cryptography==42.0.8 + - cvxpy==1.6.0 + - cycler==0.12.1 + - filelock==3.13.1 + - flwr==1.12.0 + - fonttools==4.49.0 + - fsspec==2024.2.0 + - grpcio==1.70.0 + - grpcio-tools==1.70.0 + - iterators==0.0.2 + - jinja2==3.1.3 + - kiwisolver==1.4.5 + - markupsafe==2.1.5 + - matplotlib==3.8.3 + - mpmath==1.3.0 + - networkx==3.2.1 + - nvidia-cublas-cu12==12.1.3.1 + - nvidia-cuda-cupti-cu12==12.1.105 + - nvidia-cuda-nvrtc-cu12==12.1.105 + - nvidia-cuda-runtime-cu12==12.1.105 + - nvidia-cudnn-cu12==8.9.2.26 + - nvidia-cufft-cu12==11.0.2.54 + - nvidia-curand-cu12==10.3.2.106 + - nvidia-cusolver-cu12==11.4.5.107 + - nvidia-cusparse-cu12==12.1.0.106 + - nvidia-nccl-cu12==2.19.3 + - nvidia-nvjitlink-cu12==12.3.101 + - nvidia-nvtx-cu12==12.1.105 + - osqp==0.6.7.post3 + - pathspec==0.12.1 + - pillow==10.2.0 + - prettytable==3.10.0 + - protobuf==5.29.3 + - pycparser==2.22 + - pycryptodome==3.21.0 + - pyparsing==3.1.1 + - qdldl==0.1.7.post4 + - scs==3.2.7 + - seaborn==0.13.2 + - shellingham==1.5.4 + - sympy==1.12 + - tomli==2.0.2 + - tomli-w==1.1.0 + - torch==2.2.1 + - torchaudio==2.2.1 + - torchvision==0.17.1 + - tqdm==4.66.2 + - typer==0.12.5 + - typing-extensions==4.10.0 + - wcwidth==0.2.13 + - expetator==0.3.25 +prefix: /home/adabaja/.../anaconda3/envs/FL diff --git a/generate_config_for_mia.py b/generate_config_for_mia.py new file mode 100644 index 0000000000000000000000000000000000000000..94e1d43570747b1318b8a13c312b5d57368bdae9 --- /dev/null +++ b/generate_config_for_mia.py @@ -0,0 +1,50 @@ +import json +from FedAvg.config.config import * +from FedAvg.config.hyperparameters import * + +def generate_federated_config(): + num_clients = NUM_CLIENTS + server_port = SERVER_CONFIG["port"] + + config_structure = { + "instances": { + "1": { + "instance": "FedAvg", + "output_dir": "./meter_logs", + "dvfs_cpu": { + "dummy": False, + "baseline": False, + "frequencies": None + }, + "server": { + "command": "python3", + "args": ["./FedAvg/run_server.py"], + "modules": [], + "ip": "", + "port": server_port + }, + "clients": [] + } + } + } + + for i in range(num_clients): + client_entry = { + "name": f"client{i}", + "command": "python3", + "args": ["./FedAvg/run_client.py", str(i)], + "ip": "" # IP can be assigned dynamically if needed + } + config_structure["instances"]["1"]["clients"].append(client_entry) + + return config_structure + +# Generate the configuration +federated_config = generate_federated_config() + +# Save to a JSON file +with open("Run/config_instances.json", "w") as json_file: + json.dump(federated_config, json_file, indent=4) + +print("Configuration file generated") + diff --git a/remaining_steps.sh b/remaining_steps.sh new file mode 100644 index 0000000000000000000000000000000000000000..4f28e033955d38be9f77c338e7b966b630c4e51a --- /dev/null +++ b/remaining_steps.sh @@ -0,0 +1,10 @@ +# Step 5: Prepare run environment +cd Run || { echo "Directory 'Run' not found"; exit 1; } +uniq $OAR_NODEFILE > nodelist || { echo "Failed to create nodelist"; exit 1; } + +# Step 6: Run simulation scripts +echo "Running simulation..." +python3 collect_ip.py -n nodelist -c config_instances.json || { echo "Failed to collect IPs"; exit 1; } +python3 measure_instance.py -x debug_test -c config_instances.json -i 1 -r 1 || { echo "Measurement script failed"; exit 1; } + +echo "Simulation completed successfully!" diff --git a/requirements.txt b/requirements.txt index b8e774cdb26bd1b4774c0b23893f30253d6149c9..ec95cc1fe1c53bc92aebc72bbc1b2a28cdbc4ae7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,55 @@ -flwr==1.13.0 -flwr-datasets==0.4.0 +cffi==1.17.1 +clarabel==0.9.0 +colorama==0.4.6 +cryptography==42.0.8 +cvxpy==1.6.0 +cycler==0.12.1 +filelock==3.13.1 +flwr==1.12.0 +fonttools==4.49.0 +fsspec==2024.2.0 +grpcio==1.70.0 +grpcio-tools<1.70.0 +iterators==0.0.2 +jinja2==3.1.3 +kiwisolver==1.4.5 +markupsafe==2.1.5 +matplotlib==3.9.3 +mpmath==1.3.0 +networkx==3.2.1 +nvidia-cublas-cu12==12.1.3.1 +nvidia-cuda-cupti-cu12==12.1.105 +nvidia-cuda-nvrtc-cu12==12.1.105 +nvidia-cuda-runtime-cu12==12.1.105 +nvidia-cudnn-cu12==8.9.2.26 +nvidia-cufft-cu12==11.0.2.54 +nvidia-curand-cu12==10.3.2.106 +nvidia-cusolver-cu12==11.4.5.107 +nvidia-cusparse-cu12==12.1.0.106 +nvidia-nccl-cu12==2.19.3 +nvidia-nvjitlink-cu12==12.3.101 +nvidia-nvtx-cu12==12.1.105 +osqp==0.6.7.post3 +pathspec==0.12.1 +pillow==10.2.0 +prettytable==3.10.0 +protobuf==4.25.3 +pycparser==2.22 +pycryptodome==3.21.0 +pyparsing==3.1.1 +qdldl==0.1.7.post4 +scs==3.2.7 +seaborn==0.13.2 +shellingham==1.5.4 +sympy==1.12 +tomli==2.0.2 +tomli-w==1.1.0 +torch==2.2.1 +torchaudio==2.2.1 +torchvision==0.17.1 +tqdm==4.66.2 +typer==0.12.5 +typing_extensions==4.11.0 +wcwidth==0.2.13 expetator==0.3.25 -tensorflow>=2.16.1,<2.17.0 -tensorflow-datasets==4.4.0 -tensorboard>=2.16.2,<2.17.0 -scikit-learn>=1.6.1 -numpy>=1.26.0,<1.27.0 \ No newline at end of file +numpy==1.26.4 diff --git a/run_test.sh b/run_test.sh new file mode 100644 index 0000000000000000000000000000000000000000..c33073e8768cfacd58cbc0931fba75992a193dea --- /dev/null +++ b/run_test.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +# Step 1: Install dependencies +echo "Installing dependencies..." +pip install -r requirements.txt || { echo "Failed to install requirements"; exit 1; } + +# Step 2: Generate datasets +echo "Generating datasets..." +cd FedAvg || { echo "Directory 'FedAvg' not found"; exit 1; } +python3 generate_datasets.py || { echo "Dataset generation failed"; exit 1; } +cd .. + +# Step 3: Generate config for MIA +echo "Generating MIA configuration..." +python3 generate_config_for_mia.py || { echo "MIA config generation failed"; exit 1; } + +# Step 4: Submit job to OAR +echo "Submitting job to OAR..." +oarsub -I -l host=6,walltime=2 || { echo "OAR job submission failed"; exit 1; }