diff --git a/.gitignore b/.gitignore index dfa99497718102c4b7260fd7efb76a3270868086..1e89a50e0ccba642512e901520c29ca1415a0142 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,19 @@ -*.pyc -*.py~ -results/* -.vscode/* -src/__pycache__/* -info.log -*.ipynb -launch.json -backup_results/* +*.py~ +results/* +.vscode/* +src/__pycache__/* +info.log +launch.json +backup_results/* +*.sh +tests/__pycache__/* +datasets/* +pub/* +data/* +*.tgz +*.pyc +info.log +*.ipynb +launch.json +backup_results/* *.sh \ No newline at end of file diff --git a/IRIT-evaluation-licence_UT3.txt b/IRIT-evaluation-licence_UT3.txt new file mode 100644 index 0000000000000000000000000000000000000000..58d6baf4b8b0feea1fe1d1439328d7163ddad7f0 --- /dev/null +++ b/IRIT-evaluation-licence_UT3.txt @@ -0,0 +1,43 @@ +------------------------------------------------------------ +SOFTWARE EVALUATION LICENSE +IRIT computer science research laboratory, Toulouse, France. +------------------------------------------------------------ + +Definitions +SOFTWARE: The "Comparative-Evaluation-of-Clustered-Federated-Learning-Methods" software, in source-code form, written by leahcimali and omar-rifai at the IRIT computer science research laboratory. + +LICENSOR: L’UNIVERSITE TOULOUSE III - PAUL SABATIER, a public scientific, cultural and professional establishment, having SIRET No. 193 113 842 00010, APE code 8542 Z, having its registered office at 118, route de Narbonne, 31062 Toulouse Cedex 9, France, acting in its own name and on its own behalf, and on behalf of l'Institut de Recherche en Informatique de Toulouse (IRIT), UMR N°5055. + +1. INTENT/PURPOSE +This agreement determines the conditions in which the LICENSOR, who has the rights to the SOFTWARE, grants to the LICENSEE a license for research and evaluation purposes only, excluding any commercial use. + +2. LICENSEE +Any person or organization who receives the SOFTWARE with a copy of this license. + +3. RIGHTS GRANTED +The rights to use and copy the SOFTWARE, subject to the restrictions described in this agreement. +The rights to modify and compile the SOFTWARE when it's provided in source code form, subject to the restrictions described in this agreement. +For the SOFTWARE provided in binary form only, LICENSEE undertakes to not decompile, disassemble, decrypt, extract the components or perform reverse engineering except to the extent expressly provided by law. + +5. SCOPE OF THE LICENSE +- NON-COMMERCIAL license for research and evaluation purposes ONLY. +- NO right to commercialize the SOFTWARE, or any derivative work, without separate agreement with the LICENSOR. + +6. MODIFICATION OF THE SOFTWARE +License permits LICENSEE to modify the SOFTWARE provided in source code form for research and evaluation purposes only. + +7. REDISTRIBUTION +- License permits LICENSEE to redistribute verbatim copies of the SOFTWARE, accompanied with a copy of this license. +- License DOES NOT permit LICENSEE to redistribute modified versions of the SOFTWARE provided in source code form. +- License DOES NOT permit LICENSEE to commercialize the SOFTWARE or any derivative work of the SOFTWARE. + +8. FEE/ROYALTY +- LICENSEE pays no royalty for this license. +- LICENSEE and any third parties must enter a new agreement for any use beyond scope of license. Please contact the IRIT technology transfer office (email numerique@toulouse-tech-transfer.com) for further information. + +9. NO WARRANTY +The SOFTWARE is provided "as is" without warranty of any kind, either expressed or implied, including, but not limited to, the implied warranties of merchantability and fitness for a particular purpose. The entire risk as to the quality and performance of the program is with the LICENSEE. + +10. NO LIABILITY +In no event unless required by applicable law or agreed to in writing will any copyright owner be liable to LICENSEE for damages, including any general, special, incidental or consequential damages arising out of the use or inability to use the program (including but not limited to loss of data or data being rendered inaccurate or losses sustained by LICENSEE or third parties or a failure of the program to operate with other programs), even if such holder has been advised of the possibility of such damages. + diff --git a/README.md b/README.md index 1fe36a15326ef42ec68d281ce06aa1bac60527d1..f447cbfc92b917cd80d615ae13313aa0482c37ef 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,17 @@ -#### Code for the paper: *Comparative Evaluation of Clustered Federated Learning Methods* - -##### Submited to 'The 2nd IEEE International Conference on Federated Learning Technologies and Applications (FLTA24), VALENCIA, SPAIN' - -1. To reproduce the results in the paper run `driver.py` with the parameters in `exp_configs.csv` - -2. Each experiment will output a `.csv` file with the resuting metrics - -3. Histogram plots and a summary table of various experiments can be obtained running `src/utils_results.py` - -To use driver.py use the following parameters : - -`python driver.py --exp_type --dataset --heterogeneity_type --num_clients --num_samples_by_label --num_clusters --centralized_epochs --federated_rounds --seed ` - -To run all experiments in exp_config.csv user `run_exp.py`. - -Once all experiments are done, to get results run `src/utils_results.src`. - - - +#### Code for the paper: *Comparative Evaluation of Clustered Federated Learning Methods* + +##### Submited to 'The 2nd IEEE International Conference on Federated Learning Technologies and Applications (FLTA24), VALENCIA, SPAIN' + +1. To reproduce the results in the paper run `driver.py` with the parameters in `exp_configs.csv` + +2. Each experiment will output a `.csv` file with the resuting metrics + +3. Histogram plots and a summary table of various experiments can be obtained running `src/utils_results.py` + +To use driver.py use the following parameters : + +`python driver.py --exp_type --dataset --heterogeneity_type --num_clients --num_samples_by_label --num_clusters --centralized_epochs --federated_rounds --seed ` + +To run all experiments in exp_config.csv user `run_exp.py`. + +Once all experiments are done, to get results run `src/utils_results.src`. \ No newline at end of file diff --git a/draft.ipynb b/draft.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..834995584ae218907dd70f817888b2554f65b294 --- /dev/null +++ b/draft.ipynb @@ -0,0 +1,291 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 6, + "id": "c1649e65-6fb0-4af7-8ecd-d94f44511d9d", + "metadata": {}, + "outputs": [], + "source": [ + "import tarfile\n", + "import os \n", + "\n", + "\n", + "import numpy as np\n", + "\n", + "import torch\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "from torch.utils.data import Dataset, DataLoader, TensorDataset\n", + "from torch.utils.data import random_split\n", + "import torchvision\n", + "from torchvision.datasets.utils import download_url\n", + "from torchvision.datasets import ImageFolder\n", + "from torchvision.transforms import ToTensor\n", + "import torchvision.transforms as transforms\n", + "\n", + "\n", + "from src.utils_results import plot_img\n", + "from src.fedclass import Client\n", + "from src.utils_training import train_central, test_model\n", + "from src.utils_data import get_clients_data, data_preparation\n", + "from src.models import GenericConvModel\n", + "from sklearn.model_selection import train_test_split\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "59c7fe59-a1ce-4925-bdca-8ecb777902e8", + "metadata": {}, + "source": [ + "## Three Methods to load the dataset" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "60951c57-4f25-4e62-9255-a57d120c6370", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Using downloaded and verified file: ./cifar10.tgz\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/tmp/ipykernel_6044/2990241823.py:6: DeprecationWarning: Python 3.14 will, by default, filter extracted tar archives and reject files or modify their metadata. Use the filter argument to control this behavior.\n", + " tar.extractall(path='./data')\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Files already downloaded and verified\n", + "Files already downloaded and verified\n" + ] + } + ], + "source": [ + "### 1- Using raw image folder\n", + "\n", + "dataset_url = \"https://s3.amazonaws.com/fast-ai-imageclas/cifar10.tgz\"\n", + "download_url(dataset_url, '.')\n", + "with tarfile.open('./cifar10.tgz', 'r:gz') as tar:\n", + " tar.extractall(path='./data')\n", + "data_dir = './data/cifar10'\n", + "\n", + "classes = os.listdir(data_dir + \"/train\")\n", + "dataset1 = ImageFolder(data_dir+'/train', transform=ToTensor())\n", + "\n", + "\n", + "\n", + "### 2- Using project functions\n", + "\n", + "dict_clients = get_clients_data(num_clients = 1, num_samples_by_label = 600, dataset = 'cifar10', nn_model = 'convolutional')\n", + "x_data, y_data = dict_clients[0]['x'], dict_clients[0]['y']\n", + "x_data = np.transpose(x_data, (0, 3, 1, 2))\n", + "\n", + "dataset2 = TensorDataset(torch.tensor(x_data, dtype=torch.float32), torch.tensor(y_data, dtype=torch.long))\n", + "\n", + "\n", + "\n", + "### 3 - Using CIFAR10 dataset from Pytorch\n", + "\n", + "cifar10 = torchvision.datasets.CIFAR10(\"datasets\", download=True, transform=ToTensor())\n", + "(x_data, y_data) = cifar10.data, cifar10.targets\n", + "x_data = np.transpose(x_data, (0, 3, 1, 2))\n", + "dataset3 = TensorDataset(torch.tensor(x_data, dtype=torch.float32), torch.tensor(y_data, dtype=torch.long))\n" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "2ff13653-be89-4b0b-97f0-ffe7ee9c23ab", + "metadata": {}, + "outputs": [], + "source": [ + "model = GenericConvModel(32,3)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "148d883c-a667-49a2-87c1-5962f1c859eb", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "c4f0dc1d-4cdc-47cb-b200-2c58984ac171", + "metadata": {}, + "source": [ + "## Conversion to dataloaders" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "bcefeb34-f9f4-4086-8af9-73469c3fd375", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "random_seed = 42\n", + "torch.manual_seed(random_seed);\n", + "val_size = 5000\n", + "train_size = len(dataset1) - val_size\n", + "\n", + "train_ds, val_ds = random_split(dataset1, [train_size, val_size])\n", + "\n", + "batch_size=128\n", + "train_dl = DataLoader(train_ds, batch_size, shuffle=True, num_workers=4, pin_memory=True)\n", + "val_dl = DataLoader(val_ds, batch_size*2, num_workers=4, pin_memory=True)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "91bec335-be85-4ef8-a27f-298ed08b80fc", + "metadata": {}, + "outputs": [], + "source": [ + "num_epochs = 10\n", + "opt_func = torch.optim.Adam\n", + "lr = 0.001\n", + "\n", + "@torch.no_grad()\n", + "def evaluate(model, val_loader):\n", + " model.eval()\n", + " outputs = [model.validation_step(batch) for batch in val_loader]\n", + " return model.validation_epoch_end(outputs)\n", + "\n", + "def fit(epochs, lr, model, train_loader, val_loader, opt_func=opt_func):\n", + " history = []\n", + " optimizer = opt_func(model.parameters(), lr)\n", + " for epoch in range(epochs):\n", + " # Training Phase \n", + " model.train()\n", + " train_losses = []\n", + " for batch in train_loader:\n", + " loss = model.training_step(batch)\n", + " train_losses.append(loss)\n", + " loss.backward()\n", + " optimizer.step()\n", + " optimizer.zero_grad()\n", + " # Validation phase\n", + " result = evaluate(model, val_loader)\n", + " result['train_loss'] = torch.stack(train_losses).mean().item()\n", + " model.epoch_end(epoch, result)\n", + " history.append(result)\n", + " return history\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "56aa9198-0a07-4ec8-802b-792352667795", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Epoch [0], train_loss: 1.7809, val_loss: 1.4422, val_acc: 0.4745\n", + "Epoch [1], train_loss: 1.2344, val_loss: 1.0952, val_acc: 0.6092\n", + "Epoch [2], train_loss: 0.9971, val_loss: 0.9526, val_acc: 0.6552\n", + "Epoch [3], train_loss: 0.8338, val_loss: 0.8339, val_acc: 0.7085\n", + "Epoch [4], train_loss: 0.7093, val_loss: 0.7892, val_acc: 0.7239\n", + "Epoch [5], train_loss: 0.6082, val_loss: 0.7572, val_acc: 0.7490\n" + ] + } + ], + "source": [ + "history = fit(num_epochs, lr, model, train_dl, val_dl, opt_func)" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "a106673e-a9a9-4525-bc94-98d3b64f2a7d", + "metadata": {}, + "outputs": [], + "source": [ + "result = evaluate(model, test_loader)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "24941b20-3aed-4336-9f79-87e4fcf0bba7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'val_loss': 2.3049447536468506, 'val_acc': 0.10572139918804169}" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "218e7550-2b48-4a8e-8547-e3afe81d34fe", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15f616e4-e565-4396-9450-03c891530640", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3fbea674-afff-495a-ac28-c34b44561d47", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/driver.py b/driver.py index 13438d73cf4d1adef19ccc53c694f2085022c258..5229830d5d5b92d61ba3bced2eb1eb51b48081f8 100644 --- a/driver.py +++ b/driver.py @@ -1,10 +1,15 @@ +import os + +# Set the environment variable for deterministic behavior with CuBLAS (Give reproductibility with CUDA) +os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8" import click @click.command() @click.option('--exp_type', help="The experiment type to run") -@click.option('--heterogeneity_type', help="The data heterogeneity to test (or dataset)") @click.option('--dataset') +@click.option('--nn_model', help= "The training model to use ('linear (default) or 'convolutional')") +@click.option('--heterogeneity_type', help="The data heterogeneity to test (or dataset)") @click.option('--num_clients', type=int) @click.option('--num_samples_by_label', type=int) @click.option('--num_clusters', type=int) @@ -13,24 +18,20 @@ import click @click.option('--seed', type=int) - -def main_driver(exp_type, dataset, heterogeneity_type, num_clients, num_samples_by_label, num_clusters, centralized_epochs, federated_rounds, seed): +def main_driver(exp_type, dataset, nn_model, heterogeneity_type, num_clients, num_samples_by_label, num_clusters, centralized_epochs, federated_rounds, seed): from pathlib import Path import pandas as pd - from src.utils_logging import cprint, setup_logging from src.utils_data import setup_experiment, get_uid - setup_logging() - - row_exp = pd.Series({"exp_type": exp_type, "dataset": dataset, "heterogeneity_type": heterogeneity_type, "num_clients": num_clients, + row_exp = pd.Series({"exp_type": exp_type, "dataset": dataset, "nn_model" : nn_model, "heterogeneity_type": heterogeneity_type, "num_clients": num_clients, "num_samples_by_label": num_samples_by_label, "num_clusters": num_clusters, "centralized_epochs": centralized_epochs, "federated_rounds": federated_rounds, "seed": seed}) output_name = row_exp.to_string(header=False, index=False, name=False).replace(' ', "").replace('\n','_') - + hash_outputname = get_uid(output_name) pathlist = Path("results").rglob('*.json') @@ -39,7 +40,7 @@ def main_driver(exp_type, dataset, heterogeneity_type, num_clients, num_samples_ if get_uid(str(file_name.stem)) == hash_outputname: - cprint(f"Experiment {str(file_name.stem)} already executed in with results in \n {output_name}.json", lvl="warning") + print(f"Experiment {str(file_name.stem)} already executed in with results in \n {output_name}.json") return try: @@ -48,8 +49,7 @@ def main_driver(exp_type, dataset, heterogeneity_type, num_clients, num_samples_ except Exception as e: - cprint(f"Could not run experiment with parameters {output_name}. Exception {e}") - + print(f"Could not run experiment with parameters {output_name}. Exception {e}") return launch_experiment(model_server, list_clients, row_exp, output_name) @@ -58,39 +58,42 @@ def main_driver(exp_type, dataset, heterogeneity_type, num_clients, num_samples_ -def launch_experiment(model_server, list_clients, row_exp, output_name): +def launch_experiment(model_server, list_clients, row_exp, output_name, save_results = True): from src.utils_training import run_cfl_client_side, run_cfl_server_side from src.utils_training import run_benchmark - from src.utils_logging import cprint str_row_exp = ':'.join(row_exp.to_string().replace('\n', '/').split()) - if row_exp['exp_type'] == "benchmark": + if row_exp['exp_type'] == "global-federated" or row_exp['exp_type'] == "pers-centralized": - cprint(f"Launching benchmark experiment with parameters:\n{str_row_exp}", lvl="info") + print(f"Launching benchmark experiment with parameters:\n{str_row_exp}") - run_benchmark(list_clients, row_exp, output_name, main_model=model_server) + df_results = run_benchmark(model_server, list_clients, row_exp) elif row_exp['exp_type'] == "client": - cprint(f"Launching client-side experiment with parameters:\n {str_row_exp}", lvl="info") + print(f"Launching client-side experiment with parameters:\n {str_row_exp}") - run_cfl_client_side(model_server, list_clients, row_exp, output_name) + df_results = run_cfl_client_side(model_server, list_clients, row_exp) elif row_exp['exp_type'] == "server": - cprint(f"Launching server-side experiment with parameters:\n {str_row_exp}", lvl="info") + print(f"Launching server-side experiment with parameters:\n {str_row_exp}") - run_cfl_server_side(model_server, list_clients, row_exp, output_name) + df_results = run_cfl_server_side(model_server, list_clients, row_exp) else: + str_exp_type = row_exp['exp_type'] + raise Exception(f"Unrecognized experiement type {str_exp_type}. Please check config file and try again.") - return + if save_results : + df_results.to_csv("results/" + output_name + ".csv") + return if __name__ == "__main__": diff --git a/exp_configs.csv b/exp_configs.csv index 2c22c5f06b85a5f54e37c5196f0e9b4220aba8f2..677e5b1b7ec7685ac5ebc3e1db65c61b943b7d3e 100644 --- a/exp_configs.csv +++ b/exp_configs.csv @@ -1,45 +1,10 @@ -exp_type,dataset,heterogeneity_type,num_clients,num_samples_by_label,num_clusters,centralized_epochs,federated_rounds,seed -server,kmnist,concept-shift-on-labels,48,100,6,10,20,42 -server,fashion-mnist,concept-shift-on-labels,48,100,6,10,20,42 -server,mnist,concept-shift-on-features,48,100,4,10,20,42 -server,kmnist,concept-shift-on-features,48,100,4,10,20,42 -server,fashion-mnist,concept-shift-on-features,48,100,4,10,20,42 -server,mnist,labels-distribution-skew,48,100,4,10,20,42 -server,kmnist,labels-distribution-skew,48,100,4,10,20,42 -server,fashion-mnist,labels-distribution-skew,48,100,4,10,20,42 -server,mnist,features-distribution-skew,48,100,3,10,20,42 -server,kmnist,features-distribution-skew,48,100,3,10,20,42 -server,fashion-mnist,features-distribution-skew,48,100,3,10,20,42 -server,mnist,quantity-skew,48,100,4,10,20,42 -server,kmnist,quantity-skew,48,100,4,10,20,42 -server,fashion-mnist,quantity-skew,48,100,4,10,20,42 -client,mnist,concept-shift-on-labels,48,100,6,10,20,42 -client,kmnist,concept-shift-on-labels,48,100,6,10,20,42 -client,fashion-mnist,concept-shift-on-labels,48,100,6,10,20,42 -client,mnist,concept-shift-on-features,48,100,4,10,20,42 -client,kmnist,concept-shift-on-features,48,100,4,10,20,42 -client,fashion-mnist,concept-shift-on-features,48,100,4,10,20,42 -client,mnist,labels-distribution-skew,48,100,4,10,20,42 -client,kmnist,labels-distribution-skew,48,100,4,10,20,42 -client,fashion-mnist,labels-distribution-skew,48,100,4,10,20,42 -client,mnist,features-distribution-skew,48,100,3,10,20,42 -client,kmnist,features-distribution-skew,48,100,3,10,20,42 -client,fashion-mnist,features-distribution-skew,48,100,3,10,20,42 -client,mnist,quantity-skew,48,100,4,10,20,42 -client,kmnist,quantity-skew,48,100,4,10,20,42 -client,fashion-mnist,quantity-skew,48,100,4,10,20,42 -benchmark,mnist,concept-shift-on-labels,48,100,6,10,50,42 -benchmark,kmnist,concept-shift-on-labels,48,100,6,10,50,42 -benchmark,fashion-mnist,concept-shift-on-labels,48,100,6,10,50,42 -benchmark,mnist,concept-shift-on-features,48,100,4,10,50,42 -benchmark,kmnist,concept-shift-on-features,48,100,4,10,50,42 -benchmark,fashion-mnist,concept-shift-on-features,48,100,4,10,50,42 -benchmark,mnist,labels-distribution-skew,48,100,4,10,50,42 -benchmark,kmnist,labels-distribution-skew,48,100,4,10,50,42 -benchmark,fashion-mnist,labels-distribution-skew,48,100,4,10,50,42 -benchmark,mnist,features-distribution-skew,48,100,3,10,50,42 -benchmark,kmnist,features-distribution-skew,48,100,3,10,50,42 -benchmark,fashion-mnist,features-distribution-skew,48,100,3,10,50,42 -benchmark,mnist,quantity-skew,48,100,4,10,50,42 -benchmark,kmnist,quantity-skew,48,100,4,10,50,42 -benchmark,fashion-mnist,quantity-skew,48,100,4,10,50,42 +exp_type,dataset,nn_model,heterogeneity_type,num_clients,num_samples_by_label,num_clusters,centralized_epochs,federated_rounds,seed +pers-centralized,cifar10,convolutional,concept-shift-on-features,48,100,4,200,0,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,20,50,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,20,100,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,20,150,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,20,200,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,50,50,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,50,100,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,50,150,42 +global-federated,cifar10,convolutional,concept-shift-on-features,48,100,4,50,200,42 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3d5b25f724115d5e59a0ec95d067b1e28836bc8b..49dd21325bab0c39a947e406f5c214391a968472 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,13 @@ -# Automatically generated by https://github.com/damnever/pigar. - -imbalanced-learn==0.12.3 -inputimeout==1.0.4 -kiwisolver==1.4.5 -matplotlib==3.9.0 -numpy==1.26.4 -opencv-python==4.10.0.84 -pandas==2.2.2 -scikit-learn==1.5.0 -scipy==1.14.0 -tensorflow==2.16.2 - +# Automatically generated by https://github.com/damnever/pigar. + +imbalanced-learn==0.12.3 +inputimeout==1.0.4 +kiwisolver==1.4.5 +matplotlib==3.9.0 +numpy==1.26.4 +opencv-python==4.10.0.84 +pandas==2.2.2 +scikit-learn==1.5.0 +scipy==1.14.0 +tensorflow==2.16.2 + diff --git a/run_exp.py b/run_exp.py index 2b13321492863f7b1b6685cadd96872ded872a58..b0399a699d489005483611f94c50929190c39708 100644 --- a/run_exp.py +++ b/run_exp.py @@ -13,21 +13,22 @@ with open(csv_file, newline='') as csvfile: row = next(reader) # Reading the second row # Assigning CSV values to variables - exp_type, dataset, heterogeneity_type, num_clients, num_samples_by_label, num_clusters, centralized_epochs, federated_rounds, seed = row + exp_type, dataset, nn_model, heterogeneity_type, num_clients, num_samples_by_label, num_clusters, centralized_epochs, federated_rounds, seed = row # Building the command command = [ "python", "driver.py", "--exp_type", exp_type, "--dataset", dataset, + + "--nn_model", nn_model, "--heterogeneity_type", heterogeneity_type, "--num_clients", num_clients, "--num_samples_by_label", num_samples_by_label, "--num_clusters", num_clusters, "--centralized_epochs", centralized_epochs, "--federated_rounds", federated_rounds, - "--seed", seed - ] + "--seed", seed] # Run the command subprocess.run(command) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/__pycache__/fedclass.cpython-39.pyc b/src/__pycache__/fedclass.cpython-39.pyc index 0a21c9d04738758c290f94973540e669adf76f94..6bbae010a18e55672eb9c49850f1d8c70f330ba2 100644 Binary files a/src/__pycache__/fedclass.cpython-39.pyc and b/src/__pycache__/fedclass.cpython-39.pyc differ diff --git a/src/__pycache__/metrics.cpython-39.pyc b/src/__pycache__/metrics.cpython-39.pyc index a54efc2a09c3f4eddbadbb7baaf621a8edd6d1c2..3c35939954a9d33a9ca0bebe24f7befd6045c15b 100644 Binary files a/src/__pycache__/metrics.cpython-39.pyc and b/src/__pycache__/metrics.cpython-39.pyc differ diff --git a/src/__pycache__/models.cpython-39.pyc b/src/__pycache__/models.cpython-39.pyc index 92da0c4cbd9585ba7f4832be30bd36801115b4d5..f9539c0488571d5c6f42300994f3bb834e511dc0 100644 Binary files a/src/__pycache__/models.cpython-39.pyc and b/src/__pycache__/models.cpython-39.pyc differ diff --git a/src/__pycache__/utils_data.cpython-39.pyc b/src/__pycache__/utils_data.cpython-39.pyc index a54c03cb2292eddd5ae61ee7d946f97a4a77a590..2b6a7fc3f9a64de0230be9eebf48087da86e04b9 100644 Binary files a/src/__pycache__/utils_data.cpython-39.pyc and b/src/__pycache__/utils_data.cpython-39.pyc differ diff --git a/src/fedclass.py b/src/fedclass.py index be70f2a0cdad23c6008b0bd0f04839795b8c2bc7..4a1a3476ccbb86309ea64fdb796dfe0b3ff07c55 100644 --- a/src/fedclass.py +++ b/src/fedclass.py @@ -1,10 +1,32 @@ class Client: - # Define the client class - def __init__(self, client_id, data): - - import numpy as np + """ Client Object used in the Fedearated Learning protocol + + Attributes: + client_id: unique client identifier + data: client data in the form {'x': [], 'y' :[]) where x, and y are + respectively the features and labels of the dataset + """ + + def __init__(self, client_id: int, data: dict): + """Initialize the Client object + + Arguments: + id : int + unique client identifier + data : dict + local data dict of the form {'x': [], 'y'[]} + model : nn.Module + The local nn model of the Client + cluster_id : int + ID of the cluster the client belong to or None if not applicable + heterogeneity_class: int + The ID of heterogeneity class the client's data belong to or None if not applicable + accuracy : float + The current client's model's accuracy based on a test set + """ + self.id = client_id self.data = data self.model = None @@ -12,18 +34,55 @@ class Client: self.heterogeneity_class = None self.accuracy = 0 + def __eq__(self, value: object) -> bool: + return (self.id == value.id and + self.model == value.model and + all((self.data['x'] == value.data['x']).flatten()) and + all((self.data['y'] == value.data['y']).flatten()) and + self.cluster_id == value.cluster_id and + self.heterogeneity_class == value.heterogeneity_class) + + def to_dict(self): + + """Return a dictionary with the attributes of the Client """ + return { 'id': self.id, 'cluster_id': self.cluster_id, 'heterogeneity_class': self.heterogeneity_class, 'accuracy': self.accuracy } + + class Server: - # Define the server class - def __init__(self,model,num_clusters=None): - self.model = model # initialize central server model - self.num_clusters = num_clusters # number of clusters defined - self.clusters_models = {} # Dictionary of clusters models - \ No newline at end of file + + """ Server Object used in the Fedearated Learning protocol + + Attributes: + model: nn.Module + The nn learing model the server is associated with + num_clusters: int + Number of clusters the server defines for a CFL protocol + """ + + def __init__(self,model,num_clusters: int=None): + """Initialize a Server object with an empty dictionary of cluster_models + + Arguments: + model: nn.Module + The nn learing model the server is associated with + num_clusters: int + Number of clusters the server defines for a CFL protocol + + """ + + self.model = model + self.num_clusters = num_clusters + self.clusters_models = {} + + def __eq__(self, value: object) -> bool: + return (str(self.model.state_dict()) == str(value.model.state_dict()) and + self.num_clusters == value.num_clusters and + self.clusters_models == value.clusters_models) \ No newline at end of file diff --git a/src/metrics.py b/src/metrics.py index fd8f00295f8c11ce7551bd7ddb35b0a2ae80fc81..651b3240611c4fcec1cd1125213484d916a3ead3 100644 --- a/src/metrics.py +++ b/src/metrics.py @@ -1,9 +1,18 @@ -def calc_global_metrics(labels_true, labels_pred): - - """ - Calculate global metrics based on model weights +def calc_global_metrics(labels_true: list, labels_pred: list) -> dict: + + """ Calculate global metrics based on model weights + + Arguments: + labels_true : list + list of ground truth labels + labels_pred : list + list of predicted labels to compare with ground truth + Returns: + a dictionary containing the following metrics: + 'ARI', 'AMI', 'hom': homogeneity_score, 'cmpt': completness score, 'vm': v-measure """ + from sklearn.metrics import adjusted_rand_score, homogeneity_completeness_v_measure, adjusted_mutual_info_score homogeneity_score, completness_score, v_measure = homogeneity_completeness_v_measure(labels_true, labels_pred) @@ -14,30 +23,4 @@ def calc_global_metrics(labels_true, labels_pred): dict_metrics = {"ARI": ARI_score, "AMI": AMI_score, "hom": homogeneity_score, "cmplt": completness_score, "vm": v_measure} - return dict_metrics - - - -def report_CFL(list_clients, output_name): - """ - Save results as a csv - """ - import pandas as pd - - df_results = pd.DataFrame.from_records([c.to_dict() for c in list_clients]) - - df_results.to_csv("results/" + output_name + ".csv") - - return - - -def plot_mnist(image,label): - # Function to plot the mnist image - import matplotlib.pyplot as plt - plt.imshow(image, cmap='gray') - plt.title(f'MNIST Digit: {label}') # Add the label as the title - plt.axis('off') # Turn off axis - plt.show() - - - + return dict_metrics \ No newline at end of file diff --git a/src/models.py b/src/models.py index b74f5588aae7d664658f326ab9563cf980c5d2e6..cd01cbce2240fbb05229cfe25e862c9f0b4a313f 100644 --- a/src/models.py +++ b/src/models.py @@ -1,46 +1,94 @@ +import torch import torch.nn as nn import torch.nn.functional as F +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") -class SimpleLinear(nn.Module): - # Simple fully connected neural network with ReLU activations with a single hidden layer of size 200 - def __init__(self, h1=200): - super().__init__() - self.fc1 = nn.Linear(28*28, h1) - self.fc2 = nn.Linear(h1, 10) - - def forward(self, x): - x = x.view(-1, 28 * 28) - x = F.relu(self.fc1(x)) - x = self.fc2(x) - return F.log_softmax(x, dim=1) +def accuracy(outputs, labels): + _, preds = torch.max(outputs, dim=1) + return torch.tensor(torch.sum(preds == labels).item() / len(preds)) -class SimpleConv(nn.Module): - def __init__(self): - super(SimpleConv, self).__init__() - # convolutional layer - self.conv1 = nn.Conv2d(3, 16, 3, padding=1) - self.conv2 = nn.Conv2d(16, 32, 3, padding = 1) - self.conv3 = nn.Conv2d(32, 16, 3, padding = 1) - # max pooling layer - self.pool = nn.MaxPool2d(2, 2) - - # Fully connected layer - self.fc1 = nn.Linear(16 * 4 * 4, 10) +class ImageClassificationBase(nn.Module): + def training_step(self, batch, device): + images, labels = batch + images, labels = images.to(device), labels.to(device).long() + out = self(images) + loss = F.cross_entropy(out, labels) + return loss + + def validation_step(self, batch, device): + images, labels = batch + images, labels = images.to(device), labels.to(device).long() + out = self(images) + loss = F.cross_entropy(out, labels) + acc = accuracy(out, labels) + return {'val_loss': loss.detach(), 'val_acc': acc} - # Dropout - self.dropout = nn.Dropout(p=0.2) - - def flatten(self, x): - return x.reshape(x.size()[0], -1) + def validation_epoch_end(self, outputs): + batch_losses = [x['val_loss'] for x in outputs] + epoch_loss = torch.stack(batch_losses).mean() + batch_accs = [x['val_acc'] for x in outputs] + epoch_acc = torch.stack(batch_accs).mean() + return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()} - def forward(self, x): - # add sequence of convolutional and max pooling layers - x = self.dropout(self.pool(F.relu(self.conv1(x)))) - x = self.dropout(self.pool(F.relu(self.conv2(x)))) - x = self.dropout(self.pool(F.relu(self.conv3(x)))) - x = self.flatten(x) - x = self.fc1(x) + def epoch_end(self, epoch, result): + print("Epoch [{}], train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format( + epoch, result['train_loss'], result['val_loss'], result['val_acc'])) - return F.log_softmax(x, dim=1) \ No newline at end of file +class GenericLinearModel(ImageClassificationBase): + def __init__(self, in_size, n_channels): + super().__init__() + self.in_size = in_size + self.network = nn.Sequential( + nn.Linear(in_size * in_size, 200), + nn.Linear(200, 10) + ) + + def forward(self, xb): + xb = xb.view(-1, self.in_size * self.in_size) + return self.network(xb) + +class GenericConvModel(ImageClassificationBase): + def __init__(self, in_size, n_channels): + super().__init__() + self.img_final_size = int(in_size / (2**3)) + self.network = nn.Sequential( + nn.Conv2d(n_channels, 32, kernel_size=3, padding=1), + nn.BatchNorm2d(32), + nn.ReLU(), + nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), + nn.BatchNorm2d(64), + nn.ReLU(), + nn.MaxPool2d(2, 2), # output: 64 x 16 x 16 + nn.Dropout(0.25), + + nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1), + nn.BatchNorm2d(128), + nn.ReLU(), + nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1), + nn.BatchNorm2d(128), + nn.ReLU(), + nn.MaxPool2d(2, 2), # output: 128 x 8 x 8 + nn.Dropout(0.25), + + + nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1), + nn.BatchNorm2d(256), + nn.ReLU(), + nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1), + nn.BatchNorm2d(256), + nn.ReLU(), + nn.MaxPool2d(2, 2), # output: 256 x 4 x 4 + nn.Dropout(0.25), + + nn.Flatten(), + nn.Linear(256 * self.img_final_size * self.img_final_size, 1024), + nn.ReLU(), + nn.Linear(1024, 512), + nn.ReLU(), + nn.Linear(512, 10) + ) + + def forward(self, xb): + return self.network(xb) \ No newline at end of file diff --git a/src/utils_data.py b/src/utils_data.py index 528ebca4f438319bdd7c309d6813a513155e777b..1a0058d33746eb509b0bd323878436af448acb3b 100644 --- a/src/utils_data.py +++ b/src/utils_data.py @@ -1,146 +1,284 @@ -import torch -import numpy as np - -from collections import Counter -import pandas as pd -import numpy as np - from src.fedclass import Client, Server +from torch.utils.data import DataLoader +from numpy import ndarray +from typing import Tuple + +def shuffle_list(list_samples : int, seed : int) -> list: + + """Function to shuffle the samples list -device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + Arguments: + list_samples : A list of samples to shuffle + seed : Randomization seed for reproducible results + + Returns: + The shuffled list of samples + """ -def shuffle(array,seed=42): - # Function to shuffle the samples - # Generate a list of shuffled indices + import numpy as np np.random.seed(seed) - shuffled_indices = np.arange(array.shape[0]) + shuffled_indices = np.arange(list_samples.shape[0]) + np.random.shuffle(shuffled_indices) - # Use the shuffled indices to reorder the array along the first axis - shuffled_arr = array[shuffled_indices].copy() - return shuffled_arr + shuffled_list = list_samples[shuffled_indices].copy() + + return shuffled_list -def create_label_dict(dataset, seed = 42) : - """ - Create a dictionary of dataset samples by labels + +def create_label_dict(dataset : str, nn_model : str) -> dict: + + """Create a dictionary of dataset samples + + Arguments: + dataset : The name of the dataset to use (e.g 'fashion-mnist', 'mnist', or 'cifar10') + nn_model : the training model type ('linear' or 'convolutional') + + Returns: + label_dict : A dictionary of data of the form {'x': [], 'y': []} + + Raises: + Error : if the dataset name is unrecognized """ + import sys - from tensorflow.keras.datasets import mnist, fashion_mnist - from extra_keras_datasets import kmnist + import numpy as np + import torchvision + + import torchvision.transforms as transforms if dataset == "fashion-mnist": - (x_train, y_train), _ = fashion_mnist.load_data() + fashion_mnist = torchvision.datasets.MNIST("datasets", download=True) + (x_data, y_data) = fashion_mnist.data, fashion_mnist.targets + + if nn_model in ["convolutional"]: + x_data = x_data.unsqueeze(1) + elif dataset == 'mnist': - (x_train, y_train), _ = mnist.load_data() + mnist = torchvision.datasets.MNIST("datasets", download=True) + (x_data, y_data) = mnist.data, mnist.targets + + elif dataset == "cifar10": + cifar10 = torchvision.datasets.CIFAR10("datasets", download=True) + (x_data, y_data) = cifar10.data, cifar10.targets + + elif dataset == 'kmnist': - (x_train, y_train), _ = kmnist.load_data() + kmnist = torchvision.datasets.KMNIST("datasets", download=True) + (x_data, y_data) = kmnist.load_data() else: sys.exit("Unrecognized dataset. Please make sure you are using one of the following ['mnist', fashion-mnist', 'kmnist']") - label_dict = {} + for label in range(10): - label_indices = np.where(y_train == label)[0] - - label_samples_x = x_train[label_indices] - - label_dict[label] = shuffle(label_samples_x, seed) + label_indices = np.where(np.array(y_data) == label)[0] + label_samples_x = x_data[label_indices] + label_dict[label] = label_samples_x return label_dict +def get_clients_data(num_clients : int, num_samples_by_label : int, dataset : str, nn_model : str) -> dict: + + """Distribute a dataset evenly accross num_clients clients. Works with datasets with 10 labels + + Arguments: + num_clients : Number of clients of interest + num_samples_by_label : Number of samples of each labels by client + dataset: The name of the dataset to use (e.g 'fashion-mnist', 'mnist', or 'cifar10') + nn_model : the training model type ('linear' or 'convolutional') -def get_clients_data(num_clients, num_samples_by_label, dataset, seed = 42): - """ - Distribute Dataset evenly accross num_clients clients - ---------- - num_clients : int - number of client of interest - - num_samples_by_label : int - number of samples of each labels by clients - Returns - ------- - client_dataset : Dictionnary - Dictionnary where each key correspond to a client index. The samples will be contained in the 'x' key and the target in 'y' key + Returns: + client_dataset : Dictionnary where each key correspond to a client index. The samples will be contained in the 'x' key and the target in 'y' key """ - label_dict = create_label_dict(dataset, seed) - # Initialize dictionary to store client data + import numpy as np + + label_dict = create_label_dict(dataset, nn_model) + clients_dictionary = {} client_dataset = {} + for client in range(num_clients): + clients_dictionary[client] = {} + for label in range(10): + clients_dictionary[client][label]= label_dict[label][client*num_samples_by_label:(client+1)*num_samples_by_label] + for client in range(num_clients): + client_dataset[client] = {} + client_dataset[client]['x'] = np.concatenate([clients_dictionary[client][label] for label in range(10)], axis=0) + client_dataset[client]['y'] = np.concatenate([[label]*len(clients_dictionary[client][label]) for label in range(10)], axis=0) + return client_dataset -def rotate_images(client, rotation): - # Rotate images, used of concept shift on features - images = client.data['x'] + + +def rotate_images(client: Client, rotation: int) -> None: + + """ Rotate a Client's images, used for ``concept shift on features'' + + Arguments: + client : A Client object whose dataset images we want to rotate + rotation : the rotation angle to apply 0 < angle < 360 + """ - if rotation >0 : + import numpy as np + + images = client.data['x'] + + if rotation > 0 : + rotated_images = [] + for img in images: - rotated_img = np.rot90(img, k=rotation//90) # Rotate image by specified angle + + orig_shape = img.shape + rotated_img = np.rot90(img, k=rotation//90) # Rotate image by specified angle + rotated_img = rotated_img.reshape(*orig_shape) rotated_images.append(rotated_img) + client.data['x'] = np.array(rotated_images) -def data_preparation(client, row_exp): - """ - Train test split of a client's data and create onf dataloaders for local model training - """ + return - from sklearn.model_selection import train_test_split - from torch.utils.data import DataLoader, TensorDataset +import torch +from sklearn.model_selection import train_test_split +from torch.utils.data import DataLoader, TensorDataset, Dataset +import torchvision.transforms as transforms - x_train, x_test, y_train, y_test = train_test_split(client.data['x'], client.data['y'], test_size=0.3, random_state=row_exp['seed'],stratify=client.data['y']) - x_train, x_test = x_train/255.0 , x_test/255.0 +class AddGaussianNoise(object): + + def __init__(self, mean=0., std=1.): + self.std = std + self.mean = mean + + def __call__(self, tensor): + import torch + return tensor + torch.randn(tensor.size()) * self.std + self.mean + + def __repr__(self): + return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std) - x_train_tensor = torch.tensor(x_train, dtype=torch.float32) - x_train_tensor.to(device) +class AddRandomJitter(object): + def __init__(self, brightness =0.5, contrast = 1, saturation = 0.1, hue = 0.5): + self.brightness = brightness, + self.contrast = contrast, + self.saturation = saturation, + self.hue = hue + + def __call__(self, tensor): + import torchvision.transforms as transforms + transform = transforms.ColorJitter(brightness = self.brightness, contrast= self.contrast, + saturation = self.saturation, hue = self.hue) + return transform(tensor) - y_train_tensor = torch.tensor(y_train, dtype=torch.long) - y_train_tensor.to(device) +class CustomDataset(Dataset): + + def __init__(self, data, labels, transform=None): + # Ensure data is in (N, H, W, C) format + self.data = data # Assume data is in (N, H, W, C) format + self.labels = labels + self.transform = transform - x_test_tensor = torch.tensor(x_test, dtype=torch.float32) - x_test_tensor.to(device) - y_test_tensor = torch.tensor(y_test, dtype=torch.long) - y_test_tensor.to(device) + def __len__(self): + return len(self.data) + def __getitem__(self, idx): + image = self.data[idx] # Shape (H, W, C) + label = self.labels[idx] - train_dataset = TensorDataset(x_train_tensor, y_train_tensor) - train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) + # Convert image to tensor and permute to (C, H, W) + image = torch.tensor(image, dtype=torch.float) # Convert to tensor + image = image.permute(2, 0, 1) # Change to (C, H, W) + + # Apply transformation if provided + if self.transform: + image = self.transform(image) + + return image, label + +def data_preparation(client: Client, row_exp: dict) -> None: + """Saves Dataloaders of train and test data in the Client attributes + + Arguments: + client : The client object to modify + row_exp : The current experiment's global parameters + """ + + def to_device_tensor(data, device, data_dtype): + data = torch.tensor(data, dtype=data_dtype) + data = data.to(device) + return data - test_dataset = TensorDataset(x_test_tensor, y_test_tensor) - test_loader = DataLoader(test_dataset, batch_size=32) + import torch + from sklearn.model_selection import train_test_split + from torch.utils.data import DataLoader, TensorDataset, Dataset + import torchvision.transforms as transforms + import numpy as np # Import NumPy for transpose operation + + # Define data augmentation transforms + + train_transform = transforms.Compose([ + transforms.RandomHorizontalFlip(), + transforms.RandomRotation(20), # Normalize if needed + transforms.RandomCrop(32, padding=4), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) + ]) + + # Transform for validation and test data (no augmentation, just normalization) + test_val_transform = transforms.Compose([ + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), # Normalize if needed + ]) - setattr(client, 'data_loader', {'train' : train_loader,'test': test_loader}) - setattr(client,'train_test', {'x_train': x_train,'x_test': x_test, 'y_train': y_train, 'y_test': y_test}) + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + # Split into train, validation, and test sets + x_data, x_test, y_data, y_test = train_test_split(client.data['x'], client.data['y'], test_size=0.3, random_state=row_exp['seed'], stratify=client.data['y']) + x_train, x_val, y_train, y_val = train_test_split(x_data, y_data, test_size=0.25, random_state=42) -def get_dataset_heterogeneities(heterogeneity_type): + # Create datasets with transformations + train_dataset = CustomDataset(x_train, y_train, transform=train_transform) + val_dataset = CustomDataset(x_val, y_val, transform=test_val_transform) + test_dataset = CustomDataset(x_test, y_test, transform=test_val_transform) - dict_params = {} + # Create DataLoaders + train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True) + validation_loader = DataLoader(val_dataset, batch_size=128, shuffle=True) + test_loader = DataLoader(test_dataset, batch_size=128, shuffle=True) - #if heterogeneity_type == "labels-distribution-skew": - # dict_params['skews'] = [[1,2],[3,4],[5,6],[7,8]] - # dict_params['ratios'] = [[0.2,0.2],[0.2,0.2],[0.2,0.2],[0.2,0.2]] + # Store DataLoaders in the client object + setattr(client, 'data_loader', {'train': train_loader, 'val': validation_loader, 'test': test_loader}) + setattr(client, 'train_test', {'x_train': x_train, 'x_val': x_val, 'x_test': x_test, 'y_train': y_train, 'y_val': y_val, 'y_test': y_test}) + + return + + + +def get_dataset_heterogeneities(heterogeneity_type: str) -> dict: + + """ + Retrieves the "skew" and "ratio" attributes of a given heterogeneity type + + Arguments: + heterogeneity_type : The label of the heterogeneity scenario (labels-distribution-skew, concept-shift-on-labels, quantity-skew) + Returns: + dict_params: A dictionary of the form {<het>: []} where <het> is the applicable heterogeneity type + """ + dict_params = {} - #elif heterogeneity_type == "labels-distribution-skew-balancing": - # dict_params['skews'] = [[0,1,2,3,4],[5,6,7,8,9],[0,2,4,6,8],[1,3,5,7,9]] - # dict_params['ratios'] = [[0.1,0.1,0.1,0.1,0.1],[0.1,0.1,0.1,0.1,0.1],[0.1,0.1,0.1,0.1,0.1],[0.1,0.1,0.1,0.1,0.1],[0.1,0.1,0.1,0.1,0.1]] - if heterogeneity_type == 'labels-distribution-skew': dict_params['skews'] = [[0,3,4,5,6,7,8,9], [0,1,2,5,6,7,8,9], [0,1,2,3,4,7,8,9], [0,1,2,3,4,5,6,9]] dict_params['ratios'] = [[0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1], [0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1], [0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1], @@ -153,31 +291,73 @@ def get_dataset_heterogeneities(heterogeneity_type): dict_params['skews'] = [0.1,0.2,0.6,1] return dict_params + + +def setup_experiment(row_exp: dict) -> Tuple[Server, list]: + + """ Setup function to create and personalize client's data + + Arguments: + row_exp : The current experiment's global parameters -def setup_experiment(row_exp): + Returns: + model_server, list_clients: a nn model used the server in the FL protocol, a list of Client Objects used as nodes in the FL protocol - from src.models import SimpleLinear + """ + + from src.models import GenericConvModel + from src.utils_fed import init_server_cluster + import torch list_clients = [] - model_server = Server(SimpleLinear()) + + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + + torch.manual_seed(row_exp['seed']) + + imgs_params = {'mnist': (28,1) , 'fashion-mnist': (28,1), 'kmnist': (28,1), 'cifar10': (32,3)} + + if row_exp['nn_model'] == "linear": + + model_server = Server(GenericConvModel(in_size=imgs_params[row_exp['dataset']][0], n_channels=imgs_params[row_exp['dataset']][1])) + + elif row_exp['nn_model'] == "convolutional": + + model_server = Server(GenericConvModel(in_size=imgs_params[row_exp['dataset']][0], n_channels=imgs_params[row_exp['dataset']][1])) + + + model_server.model.to(device) dict_clients = get_clients_data(row_exp['num_clients'], row_exp['num_samples_by_label'], row_exp['dataset'], - row_exp['seed']) + row_exp['nn_model']) for i in range(row_exp['num_clients']): + list_clients.append(Client(i, dict_clients[i])) list_clients = add_clients_heterogeneity(list_clients, row_exp) - + + if row_exp['exp_type'] == "client": + + init_server_cluster(model_server, list_clients, row_exp, imgs_params[row_exp['dataset']]) + return model_server, list_clients -def add_clients_heterogeneity(list_clients, row_exp): +def add_clients_heterogeneity(list_clients: list, row_exp: dict) -> list: + """ Utility function to apply the relevant heterogeneity classes to each client + Arguments: + list_clients : List of Client Objects with specific heterogeneity_class + row_exp : The current experiment's global parameters + Returns: + The updated list of clients + """ + dict_params = get_dataset_heterogeneities(row_exp['heterogeneity_type']) if row_exp['heterogeneity_type'] == "concept-shift-on-features": # rotations @@ -200,9 +380,20 @@ def add_clients_heterogeneity(list_clients, row_exp): -def apply_label_swap(list_clients, row_exp, list_swaps): - +def apply_label_swap(list_clients : list, row_exp : dict, list_swaps : list) -> list: + + """ Utility function to apply label swaps on Client images + + Arguments: + list_clients : List of Client Objects with specific heterogeneity_class + row_exp : The current experiment's global parameters + list_swap : List containing the labels to swap by heterogeneity class + Returns : + Updated list of clients + + """ n_swaps_types = len(list_swaps) + n_clients_by_swaps_type = row_exp['num_clients'] // n_swaps_types for i in range(n_swaps_types): @@ -215,6 +406,7 @@ def apply_label_swap(list_clients, row_exp, list_swaps): for client in list_clients_swapped: client = swap_labels(list_swaps[i],client, str(i)) + data_preparation(client, row_exp) list_clients[start_index:end_index] = list_clients_swapped @@ -226,9 +418,18 @@ def apply_label_swap(list_clients, row_exp, list_swaps): -def apply_rotation(list_clients, row_exp): +def apply_rotation(list_clients : list, row_exp : dict) -> list: + + """ Utility function to apply rotation 0,90,180 and 270 to 1/4 of Clients - # Apply rotation 0,90,180 and 270 to 1/4 of clients each + Arguments: + list_clients : List of Client Objects with specific heterogeneity_class + row_exp : The current experiment's global parameters + + Returns: + Updated list of clients + """ + n_rotation_types = 4 n_clients_by_rotation_type = row_exp['num_clients'] // n_rotation_types #TODO check edge cases where n_clients < n_rotation_types @@ -252,11 +453,22 @@ def apply_rotation(list_clients, row_exp): list_clients[start_index:end_index] = list_clients_rotated list_clients = list_clients[:end_index] + return list_clients -def apply_labels_skew(list_clients, row_exp, list_skews, list_ratios): +def apply_labels_skew(list_clients : list, row_exp : dict, list_skews : list, list_ratios : list) -> list: + + """ Utility function to apply label skew to Clients' data + + Arguments: + list_clients : List of Client Objects with specific heterogeneity_class + row_exp : The current experiment's global parameters + Returns: + Updated list of clients + """ + n_skews = len(list_skews) n_clients_by_skew = row_exp['num_clients'] // n_skews @@ -283,10 +495,20 @@ def apply_labels_skew(list_clients, row_exp, list_skews, list_ratios): -def apply_quantity_skew(list_clients, row_exp, list_skews): +def apply_quantity_skew(list_clients : list, row_exp : dict, list_skews : list) -> list: - # Setup server and clients for quantity skew experiment - # Skew list create for each element an equal subset of clients with the corresponding percentage of the client data + """ Utility function to apply quantity skew to Clients' data + For each element in list_skews, apply the skew to an equal subset of Clients + + + Arguments: + list_clients : List of Client Objects with specific heterogeneity_class + row_exp : The current experiment's global parameters + list_skew : List of float 0 < i < 1 with quantity skews to subsample data + + Returns: + Updated list of clients + """ n_max_samples = 100 # TODO: parameterize by dataset @@ -296,9 +518,8 @@ def apply_quantity_skew(list_clients, row_exp, list_skews): dict_clients = [get_clients_data(n_clients_by_skew, int(n_max_samples * skew), row_exp['dataset'], - seed=row_exp['seed']) + row_exp['nn_model']) for skew in list_skews] - list_clients = [] for c in range(n_clients_by_skew): @@ -313,15 +534,24 @@ def apply_quantity_skew(list_clients, row_exp, list_skews): data_preparation(client, row_exp) - return list_clients -def apply_features_skew(list_clients, row_exp) : - # Setup server and clients for features distribution skew experiments +def apply_features_skew(list_clients : list, row_exp : dict) -> list : + + """ Utility function to apply features skew to Clients' data + + Arguments: + list_clients : List of Client Objects with specific heterogeneity_class + row_exp : The current experiment's global parameters + + Returns: + Updated list of clients + """ + + n_skew_types = 3 #TODO parameterize - n_skew_types = 3 n_clients_by_skew = row_exp['num_clients'] // n_skew_types for i in range(n_skew_types): @@ -353,106 +583,184 @@ def apply_features_skew(list_clients, row_exp) : -def swap_labels(labels, client, heterogeneity_class): +def swap_labels(labels : list, client : Client, heterogeneity_class : int) -> Client: + + """ Utility Function for label swapping used for concept shift on labels. Sets the attribute "heterogeneity class" + + Arguments: + labels : Labels to swap + client : The Client object whose data we want to apply the swap on + Returns: + Client with labels swapped + """ - # Function for label swapping use for concept shift on labels - # labels : tuple of labels to swap newlabellist = client.data['y'] + otherlabelindex = newlabellist==labels[1] + newlabellist[newlabellist==labels[0]]=labels[1] + newlabellist[otherlabelindex] = labels[0] + client.data['y']= newlabellist + setattr(client,'heterogeneity_class', heterogeneity_class) + return client -def centralize_data(clientlist): - # Centralize data of the federated learning setup for central model comparison + +def centralize_data(list_clients: list) -> Tuple[DataLoader, DataLoader]: + """Centralize data of the federated learning setup for central model comparison + + Arguments: + list_clients : The list of Client Objects + + Returns: + Train and test torch DataLoaders with data of all Clients + """ + + from torchvision import transforms + import torch from torch.utils.data import DataLoader,TensorDataset - x_train = np.concatenate([clientlist[id].train_test['x_train'] for id in range(len(clientlist))],axis = 0) - x_test = np.concatenate([clientlist[id].train_test['x_test'] for id in range(len(clientlist))],axis = 0) - y_train = np.concatenate([clientlist[id].train_test['y_train'] for id in range(len(clientlist))],axis = 0) - y_test = np.concatenate([clientlist[id].train_test['y_test'] for id in range(len(clientlist))],axis = 0) - x_train_tensor = torch.tensor(x_train, dtype=torch.float32) - y_train_tensor = torch.tensor(y_train, dtype=torch.long) - x_test_tensor = torch.tensor(x_test, dtype=torch.float32) - y_test_tensor = torch.tensor(y_test, dtype=torch.long) - train_dataset = TensorDataset(x_train_tensor, y_train_tensor) - train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True) - test_dataset = TensorDataset(x_test_tensor, y_test_tensor) - test_loader = DataLoader(test_dataset, batch_size=64) - return train_loader, test_loader - - - -def ratio_func(y, multiplier, minority_class): - # downsample a label by multiplier - target_stats = Counter(y) - return {minority_class: int(multiplier * target_stats[minority_class])} - -def unbalancing(client,labels_list ,ratio_list): - # downsample the dataset of a client with each elements of the labels_list will be downsample with the corresponding ration of ratio_list + import numpy as np + + train_transform = transforms.Compose([ + transforms.RandomHorizontalFlip(), + transforms.RandomRotation(20), # Normalize if needed + transforms.RandomCrop(32, padding=4), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) + ]) + + # Transform for validation and test data (no augmentation, just normalization) + test_val_transform = transforms.Compose([ + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), # Normalize if needed + ]) + + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + + # Concatenate training data from all clients + x_train = np.concatenate([list_clients[id].train_test['x_train'] for id in range(len(list_clients))], axis=0) + y_train = np.concatenate([list_clients[id].train_test['y_train'] for id in range(len(list_clients))], axis=0) + + # Concatenate validation data from all clients + x_val = np.concatenate([list_clients[id].train_test['x_val'] for id in range(len(list_clients))], axis=0) + y_val = np.concatenate([list_clients[id].train_test['y_val'] for id in range(len(list_clients))], axis=0) + + # Concatenate test data from all clients + x_test = np.concatenate([list_clients[id].train_test['x_test'] for id in range(len(list_clients))], axis=0) + y_test = np.concatenate([list_clients[id].train_test['y_test'] for id in range(len(list_clients))], axis=0) + + # Create Custom Datasets + train_dataset = CustomDataset(x_train, y_train, transform=train_transform) + val_dataset = CustomDataset(x_val, y_val, transform=test_val_transform) + test_dataset = CustomDataset(x_test, y_test, transform=test_val_transform) + + # Create DataLoaders + train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True) + val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False) # Validation typically not shuffled + test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False) # Test data typically not shuffled + + return train_loader, val_loader, test_loader + + + + +def unbalancing(client : Client ,labels_list : list ,ratio_list: list) -> Client : + + """ Downsample the dataset of a client with each elements of the labels_list will be downsampled by the corresponding ration of ratio_list + + Arguments: + client : Client whose dataset we want to downsample + labels_list : Labels to downsample in the Client's dataset + ratio_list : Ratios to use for downsampling the labels + """ + + import pandas as pd from imblearn.datasets import make_imbalance + from math import prod + + def ratio_func(y, multiplier, minority_class): + + from collections import Counter + + target_stats = Counter(y) + return {minority_class: int(multiplier * target_stats[minority_class])} + + x_train = client.data['x'] y_train = client.data['y'] - (nsamples, i_dim,j_dim) = x_train.shape - X_resampled = x_train.reshape(-1, i_dim * j_dim) # flatten the images + + orig_shape = x_train.shape + + # flatten the images + X_resampled = x_train.reshape(-1, prod(orig_shape[1:])) y_resampled = y_train for i in range(len(labels_list)): + X = pd.DataFrame(X_resampled) + X_resampled, y_resampled = make_imbalance(X, y_resampled, sampling_strategy=ratio_func, **{"multiplier": ratio_list[i], "minority_class": labels_list[i]}) - ### unflatten the images - client.data['x'] = X_resampled.to_numpy().reshape(-1, i_dim, j_dim) + client.data['x'] = X_resampled.to_numpy().reshape(-1, *orig_shape[1:]) client.data['y'] = y_resampled return client -def dilate_images(x_train, kernel_size=(3, 3)): - import cv2 - """ - Perform dilation operation on a batch of images using a given kernel. + +def dilate_images(x_train : ndarray, kernel_size : tuple = (3, 3)) -> ndarray: + + """ Perform dilation operation on a batch of images using a given kernel. Make image 'bolder' for features distribution skew setup - Parameters: - x_train (ndarray): Input batch of images (3D array with shape (n, height, width)). - kernel_size (tuple): Size of the structuring element/kernel for dilation. + + + Arguments: + x_train : Input batch of images (3D array with shape (n, height, width)). + kernel_size : Size of the structuring element/kernel for dilation. Returns: - ndarray: Dilation results for all images in the batch. + ndarray Dilation results for all images in the batch. """ + import cv2 - n = x_train.shape[0] # Number of images in the batch + import numpy as np + + n = x_train.shape[0] + dilated_images = np.zeros_like(x_train, dtype=np.uint8) # Create the kernel for dilation kernel = np.ones(kernel_size, np.uint8) - # Iterate over each image in the batch for i in range(n): - # Perform dilation on the current image + dilated_image = cv2.dilate(x_train[i], kernel, iterations=1) - # Store the dilated image in the results array + dilated_images[i] = dilated_image return dilated_images -def erode_images(x_train, kernel_size=(3, 3)): + +def erode_images(x_train : ndarray, kernel_size : tuple =(3, 3)) -> ndarray: """ Perform erosion operation on a batch of images using a given kernel. Make image 'finner' for features distribution skew setup - Parameters: - x_train (ndarray): Input batch of images (3D array with shape (n, height, width)). - kernel_size (tuple): Size of the structuring element/kernel for erosion. + Arguments: + x_train : Input batch of images (3D array with shape (n, height, width)). + kernel_size : Size of the structuring element/kernel for erosion. Returns: - ndarray: Erosion results for all images in the batch. + ndarray of Erosion results for all images in the batch. """ + import cv2 - n = x_train.shape[0] # Number of images in the batch + import numpy as np + + n = x_train.shape[0] eroded_images = np.zeros_like(x_train, dtype=np.uint8) # Create the kernel for erosion @@ -468,19 +776,7 @@ def erode_images(x_train, kernel_size=(3, 3)): return eroded_images - -def save_results(model_server, row_exp): - - import torch - - if row_exp['exp_type'] == "client" or "server": - for cluster_id in range(row_exp['num_clusters']): - torch.save(model_server.clusters_models[cluster_id].state_dict(), f"./results/{row_exp['output']}_{row_exp['exp_type']}_model_cluster_{cluster_id}.pth") - - return - - -def get_uid(str_obj): +def get_uid(str_obj: str) -> str: """ Generates an (almost) unique Identifier given a string object. Note: Collision probability is low enough to be functional for the use case desired which is to uniquely identify experiment parameters using an int diff --git a/src/utils_fed.py b/src/utils_fed.py index 7d681f793e528d1385e742621268deb6e4e718dc..5c08df9a4da540774d3162bcbb0b40b90f4a4521 100644 --- a/src/utils_fed.py +++ b/src/utils_fed.py @@ -1,24 +1,39 @@ +from src.fedclass import Server +import torch +import torch.nn as nn +import pandas as pd +from torch.utils.data import DataLoader -def send_server_model_to_client(client_list, my_server): - """ - Function to copy server model to clients in standard FL +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + +def send_server_model_to_client(list_clients : list, my_server : Server) -> None: + + """ Function to copy the Server model to client attributes in a FL protocol + + Arguments: + list_clients : List of Client objects on which to set the parameter `model' + my_server : Server object with the model to copy """ import copy - for client in client_list: + for client in list_clients: setattr(client, 'model', copy.deepcopy(my_server.model)) return -def send_cluster_models_to_clients(client_list , my_server): - """ - Function to distribute cluster models to clients based on attribute client.cluster_id +def send_cluster_models_to_clients(list_clients : list , my_server : Server) -> None: + """ Function to copy Server modelm to clients based on attribute client.cluster_id + + Arguments: + list_clients : List of Clients to update + my_server : Server from which to fetch models """ + import copy - for client in client_list: + for client in list_clients: if client.cluster_id is None: setattr(client, 'model', copy.deepcopy(my_server.model)) else: @@ -26,204 +41,258 @@ def send_cluster_models_to_clients(client_list , my_server): return -def model_avg(client_list): +def model_avg(list_clients : list) -> nn.Module: + + """ Utility function for the fed_avg function which creates a new model + with weights set to the weighted average of + + Arguments: + list_clients : List of Client whose models we want to use to perform the weighted average + + Returns: + New model with weights equal to the weighted average of those in the input Clients list + """ + import copy import torch - # Create a new model with the weight average of clients' weights - new_model = copy.deepcopy(client_list[0].model) - - # Initialize a variable to store the total size of all local training datasets - total_data_size = sum(len(client.data_loader['train'].dataset) for client in client_list) - - # Iterate over the parameters of the new model + new_model = copy.deepcopy(list_clients[0].model) + + total_data_size = sum(len(client.data_loader['train'].dataset) for client in list_clients) + for name, param in new_model.named_parameters(): - # Initialize the weighted averaged parameter with zeros + weighted_avg_param = torch.zeros_like(param) - # Accumulate the parameters across all clients, ponderated by local data size - for client in client_list: - # Calculate the weight based on the local data size + for client in list_clients: + data_size = len(client.data_loader['train'].dataset) - weight = data_size / total_data_size - - # Add the weighted parameters of the current client + + weight = data_size / total_data_size weighted_avg_param += client.model.state_dict()[name] * weight - - # Assign the weighted averaged parameter to the new model - param.data = weighted_avg_param + + param.data = weighted_avg_param #TODO: make more explicit return new_model -def fedavg(my_server,client_list): +def fedavg(my_server : Server, list_clients : list) -> None: """ - Perform a weighted average of model parameters across clients, - where the weight is determined by the size of each client's - local training dataset. Return a new model with the averaged parameters. + Implementation of the (Clustered) federated aggregation algorithm with one model per cluster. + The code modifies the cluster models `my_server.cluster_models[i]' + + + Arguments: + my_server : Server model which contains the cluster models - Args: - client_list (list): List of clients, each containing a PyTorch model and a data loader. + list_clients: List of clients, each containing a PyTorch model and a data loader. - Returns: - torch.nn.Module: A new PyTorch model with the weighted averaged parameters. """ if my_server.num_clusters == None: - # Initialize a new model - my_server.model = model_avg(client_list) + + my_server.model = model_avg(list_clients) else : for cluster_id in range(my_server.num_clusters): - - # Filter clients belonging to the current cluster - - cluster_client_list = [client for client in client_list if client.cluster_id == cluster_id] + + cluster_clients_list = [client for client in list_clients if client.cluster_id == cluster_id] - if len(cluster_client_list)>0 : + if len(cluster_clients_list)>0 : - my_server.clusters_models[cluster_id] = model_avg(cluster_client_list) - + my_server.clusters_models[cluster_id] = model_avg(cluster_clients_list) + return + + +def model_weight_matrix(list_clients : list) -> pd.DataFrame: + + """ Create a weight matrix DataFrame using the weights of local federated models for use in the server-side CFL + + Arguments: + + list_clients: List of Clients with respective models + + Returns: + DataFrame with weights of each model as rows + """ -# FOR SERVER-SIDE CFL -def model_weight_matrix(list_clients): import numpy as np import pandas as pd - """ - Create a weight matrix DataFrame using the weights of local federated models - Parameters - ---------- - list_clients: List of Clients with respective models - all the federated system models - - Returns - ------- - pd.DataFrame - DataFrame with weights of each model as rows - """ + model_dict = {client.id : client.model for client in list_clients} - # Collect the shapes of the model parameters - shapes = [param.data.numpy().shape for param in next(iter(model_dict.values())).parameters()] - # Create an empty NumPy matrix to store the weights + shapes = [param.data.cpu().numpy().shape for param in next(iter(model_dict.values())).parameters()] weight_matrix_np = np.empty((len(model_dict), sum(np.prod(shape) for shape in shapes))) - # Iterate through the keys of model_dict for idx, (_, model) in enumerate(model_dict.items()): - # Extract model weights and flatten - model_weights = np.concatenate([param.data.numpy().flatten() for param in model.parameters()]) + + model_weights = np.concatenate([param.data.cpu().numpy().flatten() for param in model.parameters()]) weight_matrix_np[idx, :] = model_weights - # Convert the NumPy matrix to a DataFrame weight_matrix = pd.DataFrame(weight_matrix_np, columns=[f'w_{i+1}' for i in range(weight_matrix_np.shape[1])]) return weight_matrix -def k_means_cluster_id(weight_matrix, k): - from sklearn.cluster import KMeans +def k_means_cluster_id(weight_matrix : pd.DataFrame, k : int, seed : int) -> pd.Series: - """ - Define cluster identites using k-means - ---------- - weight_matrix: DataFrame - Weight matrix of all federated models - k: Interger - K-means parameter + """ Define cluster identites using k-means + + Args : + weight_matrix: Weight matrix of all federated models + k: K-means parameter + seed : Random seed to allow reproducibility - Returns - ------- - pd.DataFrame + Returns: Pandas Serie with cluster identity of each model """ + from sklearn.cluster import KMeans - # Initialize the KMeans model - kmeans = KMeans(n_clusters=k, random_state=42) - - # Fit the model to the standardized data + kmeans = KMeans(n_clusters=k, random_state=seed) kmeans.fit(weight_matrix) - # Add a new column to the DataFrame indicating the cluster assignment weight_matrix['cluster'] = kmeans.labels_ clusters_identities = weight_matrix['cluster'] + return clusters_identities -def k_means_clustering(client_list,num_clusters): - import pickle - weight_matrix = model_weight_matrix(client_list) - clusters_identities = k_means_cluster_id(weight_matrix, num_clusters) - for client in client_list : - setattr(client, 'cluster_id',clusters_identities[client.id]) +def k_means_clustering(list_clients : list, num_clusters : int, seed : int) -> None: + + """ Performs a k-mean clustering and sets the cluser_id attribute to clients based on the result + + Arguments: + list_clients : List of Clients on which to perform clustering + num_clusters : Parameter to set the number of clusters needed + seed : Random seed to allow reproducibility + """ + weight_matrix = model_weight_matrix(list_clients) -# FOR CLIENT-SIDE CFL + clusters_identities = k_means_cluster_id(weight_matrix, num_clusters, seed) -def init_server_cluster(my_server, client_list, row_exp, p_expert_opinion=None): + for client in list_clients : + + setattr(client, 'cluster_id',clusters_identities[client.id]) - """ - Assign clients to initial clusters using a given distribution or completely at random. + return + + + +def init_server_cluster(my_server : Server, list_clients : list, row_exp : dict, imgs_params: dict, p_expert_opinion : float = 0) -> None: + + """ Function to initialize cluster membership for client-side CFL (sets param cluster id) + using a given distribution or completely at random. + + Arguments: + my_server : Server model containing one model per cluster + + list_clients : List of Clients whose model we want to initialize + + row_exp : Dictionary containing the different global experiement parameters + + p_expert_opintion : Parameter to avoid completly random assignment if neeed (default to 0) """ - from src.models import SimpleLinear + from src.models import GenericLinearModel, GenericConvModel import numpy as np import copy - import torch - torch.manual_seed(row_exp['seed']) + np.random.seed(row_exp['seed']) - list_heterogeneities = list(set([c.heterogeneity_class for c in client_list])) + list_heterogeneities = list(dict.fromkeys([client.heterogeneity_class for client in list_clients])) - if not p_expert_opinion: + if not p_expert_opinion or p_expert_opinion == 0: p_expert_opinion = 1 / row_exp['num_clusters'] p_rest = (1 - p_expert_opinion) / (row_exp['num_clusters'] - 1) my_server.num_clusters = row_exp['num_clusters'] + my_server.clusters_models = {cluster_id: GenericConvModel(in_size=imgs_params[0], n_channels=imgs_params[1]) for cluster_id in range(row_exp['num_clusters'])} - my_server.clusters_models = {cluster_id: SimpleLinear(h1=200) for cluster_id in range(row_exp['num_clusters'])} - - - for client in client_list: + for client in list_clients: probs = [p_rest if x != list_heterogeneities.index(client.heterogeneity_class) % row_exp['num_clusters'] else p_expert_opinion for x in range(row_exp['num_clusters'])] client.cluster_id = np.random.choice(range(row_exp['num_clusters']), p = probs) - + client.model = copy.deepcopy(my_server.clusters_models[client.cluster_id]) + return -def set_client_cluster(my_server, client_list, row_exp): - """ - Use the loss to calculate the cluster membership for client-side CFL - """ +def loss_calculation(model : nn.modules, train_loader : DataLoader) -> float: + + """ Utility function to calculate average_loss across all samples <train_loader> + + Arguments: + + model : the input server model + + train_loader : DataLoader with the dataset to use for loss calculation + """ + import torch + import torch.nn as nn + + criterion = nn.CrossEntropyLoss() + + model.to(device) + model.eval() + + total_loss = 0.0 + total_samples = 0 + + with torch.no_grad(): + + for inputs, targets in train_loader: + inputs, targets = inputs.to(device), targets.to(device).long() + outputs = model(inputs) + + loss = criterion(outputs, targets) + + total_loss += loss.item() * inputs.size(0) + total_samples += inputs.size(0) + + average_loss = total_loss / total_samples + + return average_loss + + + + +def set_client_cluster(my_server : Server, list_clients : list, row_exp : dict) -> None: + """ Function to calculate cluster membership for client-side CFL (sets param cluster id) - from src.utils_training import loss_calculation + Arguments: + my_server : Server model containing one model per cluster + + list_clients : List of Clients whose model we want to initialize + + row_exp : Dictionary containing the different global experiement parameters + """ + import numpy as np import copy - for client in client_list: + for client in list_clients: cluster_losses = [] for cluster_id in range(row_exp['num_clusters']): - cluster_loss = loss_calculation(my_server.clusters_models[cluster_id], client.data_loader['train'], row_exp) + cluster_loss = loss_calculation(my_server.clusters_models[cluster_id], client.data_loader['train']) cluster_losses.append(cluster_loss) index_of_min_loss = np.argmin(cluster_losses) - #print(f"client {client.id} with heterogeneity {client.heterogeneity_class} cluster losses:", cluster_losses) - client.model = copy.deepcopy(my_server.clusters_models[index_of_min_loss]) client.cluster_id = index_of_min_loss diff --git a/src/utils_logging.py b/src/utils_logging.py index 37ef6e0bdf69a1b33f2eb66455002334d3bbe0cb..ba4cd9390b791277b251f0aceae972be96e590b0 100644 --- a/src/utils_logging.py +++ b/src/utils_logging.py @@ -16,7 +16,7 @@ def cprint(msg: str, lvl: str = "info") -> None: """ Print message to the console at the desired logging level. - Args: + Arguments: msg (str): Message to print. lvl (str): Logging level between "debug", "info", "warning", "error" and "critical". The default value is "info". diff --git a/src/utils_results.py b/src/utils_results.py index 4a7bbe50a71e7edb8be2e315d42eefc5389ba80b..5b08515f37a68903bd2af3dc7de37a21abab2748 100644 --- a/src/utils_results.py +++ b/src/utils_results.py @@ -1,12 +1,15 @@ from pandas import DataFrame from pathlib import Path - +from torch import tensor -def save_histograms(): - """ - Read result files and save all histogram plots +def save_histograms() -> None: + + """Read csv files found in 'results/' and generates and saves histogram plots of clients assignemnts + + Raises : + Warning when the csv file is not of the expected format (code generated results csv) """ import pandas as pd @@ -28,13 +31,15 @@ def save_histograms(): print(f"Error: Unable to open result file {file_path}.",e) continue - return -def get_clusters(df_results): +def get_clusters(df_results : DataFrame) -> list: + """ Function to returns a list of clusters ranging from 0 to max_cluster (uses: append_empty_clusters()) + """ + list_clusters = list(df_results['cluster_id'].unique()) list_clusters = append_empty_clusters(list_clusters) @@ -42,9 +47,15 @@ def get_clusters(df_results): return list_clusters -def append_empty_clusters(list_clusters): +def append_empty_clusters(list_clusters : list) -> list: """ - Handle the situation where some clusters are empty by appending the clusters ID + Utility function for ``get_clusters'' to handle the situation where some clusters are empty by appending the clusters ID + + Arguments: + list_clusters: List of clusters with clients + + Returns: + List of clusters with or without clients """ list_clusters_int = [int(x) for x in list_clusters] @@ -61,8 +72,10 @@ def append_empty_clusters(list_clusters): -def get_z_nclients(df_results, x_het, y_clust, labels_heterogeneity): +def get_z_nclients(df_results : dict, x_het : list, y_clust : list, labels_heterogeneity : list) -> list: + """ Returns the number of clients associated with a given heterogeneity class for each cluster""" + z_nclients = [0]* len(x_het) for i in range(len(z_nclients)): @@ -74,12 +87,31 @@ def get_z_nclients(df_results, x_het, y_clust, labels_heterogeneity): -def plot_histogram_clusters(df_results: DataFrame, title): +def plot_img(img : tensor) -> None: + + """Utility function to plot an image of any shape""" + + from torchvision import transforms + import matplotlib.pyplot as plt + + plt.imshow(transforms.ToPILImage()(img)) + + + +def plot_histogram_clusters(df_results: DataFrame, title : str) -> None: + + """ Function to create 3D Histograms of clients to cluster assignments showing client's heterogeneity class + + Arguments: + + df_results : DataFrame containing all parameters from the resulting csv files + + title : The plot title. The image is saved in results/plots/histogram_' + title + '.png' + """ import matplotlib.pyplot as plt import numpy as np - labels_heterogeneity = list(df_results['heterogeneity_class'].unique()) bar_width = bar_depth = 0.5 @@ -122,10 +154,15 @@ def plot_histogram_clusters(df_results: DataFrame, title): plt.title(title, fontdict=None, loc='center', pad=None) plt.savefig('results/plots/histogram_' + title + '.png') + plt.close() + return + + +def normalize_results(results_accuracy : float, results_std : float) -> int: -def normalize_results(results_accuracy, results_std): + """Utility function to convert float accuracy and std to percentage """ if results_accuracy < 1: @@ -136,8 +173,10 @@ def normalize_results(results_accuracy, results_std): return results_accuracy, results_std -def summarize_results(): +def summarize_results() -> None: + """ Creates results summary of all the results files under "results/summarized_results.csv""" + from pathlib import Path import pandas as pd from numpy import mean, std @@ -164,9 +203,9 @@ def summarize_results(): list_params = path.stem.split('_') - dict_exp_results = {"exp_type" : list_params[0], "dataset": list_params[1], "dataset_type": list_params[2], "number_of_clients": list_params[3], - "samples by_client": list_params[4], "num_clusters": list_params[5], "centralized_epochs": list_params[6], - "federated_rounds": list_params[7],"accuracy": accuracy} + dict_exp_results = {"exp_type" : list_params[0], "dataset": list_params[1], "nn_model" : list_params[2], "dataset_type": list_params[3], "number_of_clients": list_params[4], + "samples by_client": list_params[5], "num_clusters": list_params[6], "centralized_epochs": list_params[7], + "federated_rounds": list_params[8],"accuracy": accuracy} try: diff --git a/src/utils_training.py b/src/utils_training.py index c47b32df8493c5cd7e080fdc9782fba700d8c96b..d1cc6d03d6e94316f29a4538d7e2bbc6cfda779f 100644 --- a/src/utils_training.py +++ b/src/utils_training.py @@ -1,184 +1,164 @@ import torch import torch.nn as nn -import torch.optim as optim -from src.models import SimpleLinear +from torch.utils.data import DataLoader -lr = 0.01 +import pandas as pd +from src.models import ImageClassificationBase +from src.fedclass import Server -def lr_schedule(epoch,lr): - decay_factor = 0.1 - if epoch % 10 == 0 and epoch != 0: - return lr * decay_factor - else: - return lr -def run_cfl_server_side(model_server, list_clients, row_exp, output_name): +def run_cfl_server_side(model_server : Server, list_clients : list, row_exp : dict) -> pd.DataFrame: + + """ Driver function for server-side cluster FL algorithm. The algorithm personalize training by clusters obtained + from model weights (k-means). + + Arguments: + + main_model : Type of Server model needed + list_clients : A list of Client Objects used as nodes in the FL protocol + row_exp : The current experiment's global parameters + + Returns: + + df_results : dataframe with the experiment results + """ from src.utils_fed import k_means_clustering - from src.metrics import report_CFL - from src.utils_logging import cprint import copy import torch torch.manual_seed(row_exp['seed']) model_server = train_federated(model_server, list_clients, row_exp, use_cluster_models = False) - - model_server.clusters_models= {cluster_id: copy.deepcopy(model_server.model) for cluster_id in range(row_exp['num_clusters'])} - + model_server.clusters_models= {cluster_id: copy.deepcopy(model_server.model) for cluster_id in range(row_exp['num_clusters'])} setattr(model_server, 'num_clusters', row_exp['num_clusters']) - cprint(f"Preparing to cluster with {len(list_clients)} clients") + k_means_clustering(list_clients, row_exp['num_clusters'], row_exp['seed']) - k_means_clustering(list_clients, row_exp['num_clusters']) - - cprint("Finished clustering") - model_server = train_federated(model_server, list_clients, row_exp, use_cluster_models = True) - cprint('Finished server-side CFL') + for client in list_clients : + + acc = test_model(model_server.clusters_models[client.cluster_id], client.data_loader['test']) + setattr(client, 'accuracy', acc) - list_clients = add_clients_accuracies(model_server, list_clients, row_exp) + df_results = pd.DataFrame.from_records([c.to_dict() for c in list_clients]) - results = report_CFL(list_clients, output_name) + return df_results - return results +def run_cfl_client_side(model_server : Server, list_clients : list, row_exp : dict) -> pd.DataFrame: + + """ Driver function for client-side cluster FL algorithm. The algorithm personalize training by clusters obtained + from model weights (k-means). -def run_cfl_client_side(model_server, list_clients, row_exp, output_name, init_cluster=True): - from src.utils_fed import init_server_cluster, set_client_cluster, fedavg - from src.metrics import report_CFL - from src.utils_logging import cprint + Arguments: + + main_model : Type of Server model needed + list_clients : A list of Client Objects used as nodes in the FL protocol + row_exp : The current experiment's global parameters + """ + + from src.utils_fed import set_client_cluster, fedavg import torch torch.manual_seed(row_exp['seed']) - - if init_cluster == True : - - init_server_cluster(model_server, list_clients, row_exp, p_expert_opinion=0.0) for _ in range(row_exp['federated_rounds']): for client in list_clients: - client.model, _ = train_central(client.model, client.data_loader['train'], row_exp) + client.model, _ = train_central(client.model, client.data_loader['train'], client.data_loader['val'], row_exp) fedavg(model_server, list_clients) - - set_client_cluster(model_server, list_clients, row_exp) - cprint("Finished client-side CFL") + set_client_cluster(model_server, list_clients, row_exp) - list_clients = add_clients_accuracies(model_server, list_clients, row_exp) + for client in list_clients : - results = report_CFL(list_clients, output_name) - return results - + acc = test_model(model_server.clusters_models[client.cluster_id], client.data_loader['test']) + setattr(client, 'accuracy', acc) -def run_benchmark(list_clients, row_exp, output_name, main_model=SimpleLinear()): - - import pandas as pd + df_results = pd.DataFrame.from_records([c.to_dict() for c in list_clients]) - list_exps = ['global-federated', 'pers-centralized'] - list_heterogeneities = list(set(client.heterogeneity_class for client in list_clients)) + return df_results - - for training_type in list_exps: - - curr_model = main_model if 'federated' in training_type else SimpleLinear() - if 'pers' in training_type: +def run_benchmark(main_model : nn.Module, list_clients : list, row_exp : dict) -> pd.DataFrame: - for heterogeneity_class in list_heterogeneities: - - list_clients_filtered = [client for client in list_clients if client.heterogeneity_class == heterogeneity_class] - - model_server, test_loader = train_benchmark(list_clients_filtered, row_exp, curr_model, training_type) + """ Benchmark function to calculate baseline FL results and ``optimal'' personalization results if clusters are known in advance - test_benchmark(model_server, list_clients_filtered, test_loader, row_exp) - - elif 'global' in training_type: - - model_server, test_loader = train_benchmark(list_clients, row_exp, curr_model, training_type) + Arguments: - test_benchmark(model_server, list_clients, test_loader, row_exp) - - df_results = pd.DataFrame.from_records([c.to_dict() for c in list_clients]) - - df_results.to_csv(path_or_buf= "results/" + output_name.replace("benchmark", "benchmark-" + training_type) + ".csv") + main_model : Type of Server model needed + list_clients : A list of Client Objects used as nodes in the FL protocol + row_exp : The current experiment's global parameters + """ - return + import pandas as pd + import torch + import copy -def train_benchmark(list_clients, row_exp, main_model, training_type="centralized"): - - from src.utils_training import train_model - from src.utils_data import centralize_data - import copy + from src.utils_data import centralize_data - train_loader, test_loader = centralize_data(list_clients) + list_heterogeneities = list(dict.fromkeys([client.heterogeneity_class for client in list_clients])) - if "federated" in training_type: - model_server = copy.deepcopy(main_model) - model_trained = train_model(model_server, None, list_clients, row_exp) - - else: - model_trained = train_model(main_model, train_loader, list_clients, row_exp) - - return model_trained, test_loader + torch.manual_seed(row_exp['seed']) + torch.use_deterministic_algorithms(True) + curr_model = main_model if row_exp['exp_type'] == 'global-federated' else main_model.model + if row_exp['exp_type'] == 'pers-centralized': + for heterogeneity_class in list_heterogeneities: + list_clients_filtered = [client for client in list_clients if client.heterogeneity_class == heterogeneity_class] + train_loader, val_loader, test_loader = centralize_data(list_clients_filtered) + model_trained, _ = train_central(curr_model, train_loader, val_loader, row_exp) -def test_benchmark(model_trained, list_clients, test_loader, row_exp): - - from src.utils_training import test_model + global_acc = test_model(model_trained, test_loader) + + for client in list_clients_filtered : - global_acc = test_model(model_trained, test_loader, row_exp) - - for client in list_clients : - - #client_acc = test_model(model_trained, client.data_loader['test'])*100 - - setattr(client, 'accuracy', global_acc) + setattr(client, 'accuracy', global_acc) - return global_acc - - + elif row_exp['exp_type'] == 'global-federated': + + model_server = copy.deepcopy(curr_model) + model_trained = train_federated(model_server, list_clients, row_exp, use_cluster_models = False) -def train_model(model_server, train_loader, list_clients, row_exp): - - if not train_loader: - trained_obj = train_federated(model_server, list_clients, row_exp, use_cluster_models = False) - trained_model = trained_obj.model - - else: - trained_model, _ = train_central(model_server, train_loader, row_exp) + _, _,test_loader = centralize_data(list_clients) + global_acc = test_model(model_trained.model, test_loader) + + for client in list_clients : - return trained_model + setattr(client, 'accuracy', global_acc) + df_results = pd.DataFrame.from_records([c.to_dict() for c in list_clients]) + + return df_results def train_federated(main_model, list_clients, row_exp, use_cluster_models = False): - """ - Controler function to launch federated learning + + """Controler function to launch federated learning - Parameters - ---------- - main_model: - Define the central node model : - """ + Arguments: - from src.utils_fed import send_server_model_to_client, send_cluster_models_to_clients, fedavg + main_model: Server model used in our experiment + list_clients: A list of Client Objects used as nodes in the FL protocol + row_exp: The current experiment's global parameters + use_cluster_models: Boolean to determine whether to use personalization by clustering + """ + from src.utils_fed import send_server_model_to_client, send_cluster_models_to_clients, fedavg for i in range(0, row_exp['federated_rounds']): accs = [] - if use_cluster_models == False: send_server_model_to_client(list_clients, main_model) @@ -188,9 +168,8 @@ def train_federated(main_model, list_clients, row_exp, use_cluster_models = Fals send_cluster_models_to_clients(list_clients, main_model) for client in list_clients: - - client.model, curr_acc = train_central(client.model, client.data_loader['train'], row_exp) - + print(f"Training client {client.id} with dataset of size {client.data['x'].shape}") + client.model, curr_acc = train_central(client.model, client.data_loader['train'], client.data_loader['val'], row_exp) accs.append(curr_acc) fedavg(main_model, list_clients) @@ -198,124 +177,117 @@ def train_federated(main_model, list_clients, row_exp, use_cluster_models = Fals return main_model -def train_central(main_model, train_loader, row_exp, lr_scheduler=None): - - criterion = nn.CrossEntropyLoss() - optimizer=optim.SGD - optimizer = optimizer(main_model.parameters(), lr=0.01) - torch.manual_seed(row_exp['seed']) - - main_model.train() - for epoch in range(row_exp['centralized_epochs']): - - running_loss = total = correct = 0 - - # Apply learning rate decay if lr_scheduler is provided - if lr_scheduler is not None: - for param_group in optimizer.param_groups: - param_group['lr'] = lr_scheduler(epoch, param_group['lr']) - - # Iterate over the training dataset - for inputs, labels in train_loader: - - optimizer.zero_grad() # Zero the gradients - outputs = main_model(inputs) # Forward pass - _, predicted = torch.max(outputs, 1) - - loss = criterion(outputs, labels) # Calculate the loss - loss.backward() # Backward pass - optimizer.step() # Update weights - - running_loss += loss.item() * inputs.size(0) - - total += labels.size(0) - correct += (predicted == labels).sum().item() - - # Calculate accuracy on the test set - accuracy = correct / total - - main_model.eval() # Set the model to evaluation mode - - return main_model, accuracy - -def loss_calculation(model, train_loader, row_exp): - import torch - import torch.nn as nn - - # Assuming you have a PyTorch model named 'model' and its training data loader named 'train_loader' - - # Define your loss function - criterion = nn.CrossEntropyLoss() # Example, adjust based on your task - - # Set the model to evaluation mode +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +@torch.no_grad() +def evaluate(model : nn.Module, val_loader : DataLoader) -> dict: + + """ Returns a dict with loss and accuracy information""" + model.to(device) model.eval() + outputs =[] + for batch in val_loader: + # Move entire batch to the correct device + batch = [item.to(device) for item in batch] + + # Call the validation step and append to outputs + output = model.validation_step(batch,device) + outputs.append(output) + return model.validation_epoch_end(outputs) - # Initialize variables to accumulate loss and total number of samples - total_loss = 0.0 - total_samples = 0 - - torch.manual_seed(row_exp['seed']) - # Iterate through the training data loader - with torch.no_grad(): - for inputs, targets in train_loader: +def train_central(model: ImageClassificationBase, train_loader: DataLoader, val_loader: DataLoader, row_exp: dict): + """ Main training function for centralized learning + + Arguments: + model : Server model used in our experiment + train_loader : DataLoader with the training dataset + val_loader : Dataloader with the validation dataset + row_exp : The current experiment's global parameters + + Returns: + (model, history) : base model with trained weights / results at each training step + """ - # Forward pass - outputs = model(inputs) + # Check if CUDA is available and set the device + + # Move the model to the appropriate device + model.to(device) - # Compute the loss - loss = criterion(outputs, targets) + opt_func = torch.optim.Adam # if row_exp['nn_model'] == "linear" else torch.optim.Adam + lr = 0.001 + history = [] + optimizer = opt_func(model.parameters(), lr) - # Accumulate the loss and the total number of samples - total_loss += loss.item() * inputs.size(0) - total_samples += inputs.size(0) + for epoch in range(row_exp['centralized_epochs']): + + model.train() + train_losses = [] + + for batch in train_loader: + # Move batch to the same device as the model + inputs, labels = [item.to(device) for item in batch] + + # Pass the unpacked inputs and labels to the model's training step + loss = model.training_step((inputs, labels),device) + train_losses.append(loss) + loss.backward() - # Calculate the average loss - average_loss = total_loss / total_samples + optimizer.step() + optimizer.zero_grad() + + result = evaluate(model, val_loader) # Ensure evaluate handles CUDA as needed + result['train_loss'] = torch.stack(train_losses).mean().item() + + model.epoch_end(epoch, result) + + history.append(result) + + return model, history - return average_loss + -def test_model(model, test_loader, row_exp): +def test_model(model: nn.Module, test_loader: DataLoader) -> float: + """ Calculates model accuracy (percentage) on the <test_loader> Dataset + + Arguments: + model : the input server model + test_loader : DataLoader with the dataset to use for testing + """ + criterion = nn.CrossEntropyLoss() - # Set the model to evaluation mode - model.eval() + # Set device to CUDA if available + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Move the model to the device + model.to(device) + + model.eval() # Set the model to evaluation mode - # Initialize variables to track accuracy correct = 0 total = 0 test_loss = 0.0 - torch.manual_seed(row_exp['seed']) - - with torch.no_grad(): - - for inputs, labels in test_loader: + with torch.no_grad(): # No need to track gradients in evaluation + for batch in test_loader: + inputs, labels = [item.to(device) for item in batch] outputs = model(inputs) + # Compute the loss loss = criterion(outputs, labels) - test_loss += loss.item() * inputs.size(0) + # Get predictions _, predicted = torch.max(outputs, 1) + # Calculate total and correct predictions total += labels.size(0) correct += (predicted == labels).sum().item() + # Average test loss over all examples test_loss = test_loss / len(test_loader.dataset) + # Calculate accuracy percentage accuracy = (correct / total) * 100 - formated_accuracy = "{:.2f}".format(accuracy) - - return formated_accuracy - - -def add_clients_accuracies(model_server, list_clients, row_exp): - - - for client in list_clients : - acc = test_model(model_server.clusters_models[client.cluster_id], client.data_loader['test'], row_exp) - setattr(client, 'accuracy', acc) - - return list_clients + return accuracy diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/refs/client_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv b/tests/refs/client_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv new file mode 100644 index 0000000000000000000000000000000000000000..709b3aefdd09492937e94b9c9fb7f686a2c5ecb5 --- /dev/null +++ b/tests/refs/client_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv @@ -0,0 +1,7 @@ +id,cluster_id,heterogeneity_class,accuracy +0,0,none,82.0 +1,0,erosion,56.666666666666664 +2,0,dilatation,82.33333333333334 +3,0,none,83.66666666666667 +4,0,erosion,57.99999999999999 +5,0,dilatation,83.0 diff --git a/tests/refs/global-federated_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv b/tests/refs/global-federated_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv new file mode 100644 index 0000000000000000000000000000000000000000..07d5a6495b5998e98726ee296857516205735cbf --- /dev/null +++ b/tests/refs/global-federated_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv @@ -0,0 +1,7 @@ +id,cluster_id,heterogeneity_class,accuracy +0,,none,74.27777777777777 +1,,erosion,74.27777777777777 +2,,dilatation,74.27777777777777 +3,,none,74.27777777777777 +4,,erosion,74.27777777777777 +5,,dilatation,74.27777777777777 diff --git a/tests/refs/pers-centralized_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv b/tests/refs/pers-centralized_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv new file mode 100644 index 0000000000000000000000000000000000000000..5bcc37ae272a05b2ed913b8036f40c7f9e4a4bee --- /dev/null +++ b/tests/refs/pers-centralized_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv @@ -0,0 +1,7 @@ +id,cluster_id,heterogeneity_class,accuracy +0,,none,65.33333333333333 +1,,erosion,39.5 +2,,dilatation,79.0 +3,,none,65.33333333333333 +4,,erosion,39.5 +5,,dilatation,79.0 diff --git a/tests/refs/server_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv b/tests/refs/server_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv new file mode 100644 index 0000000000000000000000000000000000000000..f3337b1b436a1e00874995dd2ad53d4e4a685e98 --- /dev/null +++ b/tests/refs/server_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv @@ -0,0 +1,7 @@ +id,cluster_id,heterogeneity_class,accuracy +0,2,none,85.66666666666667 +1,1,erosion,66.66666666666666 +2,0,dilatation,86.66666666666667 +3,2,none,88.66666666666667 +4,1,erosion,68.66666666666667 +5,0,dilatation,89.0 diff --git a/tests/test_utils_training.py b/tests/test_utils_training.py new file mode 100644 index 0000000000000000000000000000000000000000..d8e91006fc7b6697d35ca8d20a06c18ea553e6cb --- /dev/null +++ b/tests/test_utils_training.py @@ -0,0 +1,123 @@ +import os +import pytest + +from pathlib import Path + +if os.getenv('_PYTEST_RAISE', "0") != "0": + + @pytest.hookimpl(tryfirst=True) + def pytest_exception_interact(call): + raise call.excinfo.value + + @pytest.hookimpl(tryfirst=True) + def pytest_internalerror(excinfo): + raise excinfo.value + + +def utils_extract_params(file_path: Path): + """ Creates a dictionary row_exp with the parameters for the experiment given a well formated results file path + """ + + with open (file_path, "r") as fp: + + keys = ['exp_type', 'dataset', 'nn_model', 'heterogeneity_type' , 'num_clients', + 'num_samples_by_label' , 'num_clusters', 'centralized_epochs', + 'federated_rounds', 'seed'] + + + parameters = file_path.stem.split('_') + + row_exp = dict( + zip(keys, + parameters[:4] + [int(x) for x in parameters[4:]]) + ) + + return row_exp + + +def test_run_cfl_benchmark_oracle(): + + from pathlib import Path + import numpy as np + import pandas as pd + + from src.utils_data import setup_experiment + from src.utils_training import run_benchmark + + file_path = Path("tests/refs/pers-centralized_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv") + + row_exp = utils_extract_params(file_path) + + model_server, list_clients = setup_experiment(row_exp) + + df_results = run_benchmark(model_server, list_clients, row_exp) + + assert all(np.isclose(df_results['accuracy'], pd.read_csv(file_path)['accuracy'], rtol=0.01)) + + +def test_run_cfl_benchmark_fl(): + + from pathlib import Path + import numpy as np + import pandas as pd + + from src.utils_data import setup_experiment + from src.utils_training import run_benchmark + + file_path = Path("tests/refs/global-federated_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv") + + row_exp = utils_extract_params(file_path) + + model_server, list_clients = setup_experiment(row_exp) + + df_results = run_benchmark(model_server, list_clients, row_exp) + + assert all(np.isclose(df_results['accuracy'], pd.read_csv(file_path)['accuracy'], rtol=0.01)) + + +def test_run_cfl_client_side(): + + from pathlib import Path + import numpy as np + import pandas as pd + + from src.utils_data import setup_experiment + from src.utils_training import run_cfl_client_side + + file_path = Path("tests/refs/client_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv") + + row_exp = utils_extract_params(file_path) + + model_server, list_clients = setup_experiment(row_exp) + + df_results = run_cfl_client_side(model_server, list_clients, row_exp) + + assert all(np.isclose(df_results['accuracy'], pd.read_csv(file_path)['accuracy'], rtol=0.01)) + + +def test_run_cfl_server_side(): + + from pathlib import Path + import numpy as np + import pandas as pd + + from src.utils_data import setup_experiment + from src.utils_training import run_cfl_server_side + + file_path = Path("tests/refs/server_fashion-mnist_linear_features-distribution-skew_8_100_3_5_5_42.csv") + + row_exp = utils_extract_params(file_path) + + model_server, list_clients = setup_experiment(row_exp) + + df_results = run_cfl_server_side(model_server, list_clients, row_exp) + + assert all(np.isclose(df_results['accuracy'], pd.read_csv(file_path)['accuracy'], rtol=0.01)) + + +if __name__ == "__main__": + test_run_cfl_client_side() + test_run_cfl_server_side() + test_run_cfl_benchmark_fl() + test_run_cfl_benchmark_oracle() + \ No newline at end of file