From 3214589cb61d57a7cc372ba44605cd31f9edd7ed Mon Sep 17 00:00:00 2001 From: HuongDM1896 <domaihuong1451997@gmail.com> Date: Mon, 3 Feb 2025 08:13:26 +0100 Subject: [PATCH] add get_freq --- Flower_v1/server.py | 30 +++++-- Flower_v1/server_1.py | 175 ++++++++++++++++++++++++++++++++++++++ Run/config_instance1.json | 44 +++++++--- Run/config_instance2.json | 48 +++++++---- Run/get_freq.py | 10 +++ Run/measure.py | 27 ++++-- Run/measure_1.py | 91 ++++++++++++++++++++ Run/run_flwr.py | 85 +----------------- 8 files changed, 384 insertions(+), 126 deletions(-) create mode 100644 Flower_v1/server_1.py create mode 100644 Run/get_freq.py create mode 100644 Run/measure_1.py diff --git a/Flower_v1/server.py b/Flower_v1/server.py index 0c40e10..d736363 100644 --- a/Flower_v1/server.py +++ b/Flower_v1/server.py @@ -1,9 +1,10 @@ from typing import List, Tuple - from flwr.server import ServerApp, ServerConfig -from flwr.server.strategy import FedAvg, FedMedian -from flwr.common import Metrics +from flwr.server.strategy import FedAvg, FedMedian, FedOpt, FedAdam +from flwr.common import Metrics, Parameters import argparse +import tensorflow as tf +import flwr # Create the argument parser parser = argparse.ArgumentParser(description="Server setting for FLWR") @@ -17,8 +18,8 @@ parser.add_argument( "--strategy", type=str, default="fedAvg", - choices=["fedAvg", "fedMedian"], # Add options strategies - help="Select strategy: fedAvg or fedMedian" + choices=["fedAvg", "fedMedian", "fedOpt", "fedAdam"], # Add options strategies + help="Select strategy: fedAvg, fedMedian, fedOpt or fedAdam" ) # Parse the arguments from the command line @@ -33,12 +34,27 @@ def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: # Aggregate and return custom metric (weighted average) return {"accuracy": sum(accuracies) / sum(examples)} +# Load model for server-side parameter initialization +model = tf.keras.applications.MobileNetV2( + input_shape=(32, 32, 3), weights=None, classes=10 +) +model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"]) + +# Get model weights as a list of NumPy ndarray's +weights = model.get_weights() + +# Serialize ndarrays to `Parameters` +parameters = flwr.common.ndarrays_to_parameters(weights) + # Select the strategy based on the input argument if args.strategy == "fedAvg": strategy = FedAvg(evaluate_metrics_aggregation_fn=weighted_average) elif args.strategy == "fedMedian": strategy = FedMedian(evaluate_metrics_aggregation_fn=weighted_average) - +elif args.strategy == "fedOpt": + strategy = FedOpt(evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters) +elif args.strategy == "fedAdam": + strategy = FedAdam(evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters) # Define config config = ServerConfig(num_rounds=args.rounds) @@ -59,4 +75,4 @@ if __name__ == "__main__": server_address="0.0.0.0:8080", config=config, strategy=strategy, - ) \ No newline at end of file + ) diff --git a/Flower_v1/server_1.py b/Flower_v1/server_1.py new file mode 100644 index 0000000..8bf37a0 --- /dev/null +++ b/Flower_v1/server_1.py @@ -0,0 +1,175 @@ +import csv +import time +import os +from datetime import datetime +from typing import List, Tuple, Dict, Optional +from flwr.common import Parameters, Metrics +from flwr.server.strategy import Strategy +from flwr.server.client_manager import ClientManager +from flwr.server.client_proxy import ClientProxy +from flwr.server import ServerApp, ServerConfig +from flwr.server.strategy import FedAvg, FedMedian, FedOpt, FedAdam +import argparse +import tensorflow as tf +import flwr + +# Create the folder and generate the CSV filename dynamically +def create_csv_filepath(strategy_name: str, num_rounds: int) -> str: + # Determine the parent directory of the current script + parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + # Create the "Log_autres" directory if it doesn't exist + log_dir = os.path.join(parent_dir, "Log_autres") + os.makedirs(log_dir, exist_ok=True) + + # Generate a timestamp for the file + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # Generate the CSV file path + csv_filename = f"training_results_{strategy_name}_{num_rounds}_{timestamp}.csv" + csv_filepath = os.path.join(log_dir, csv_filename) + + return csv_filepath + +class CustomStrategyWrapper(Strategy): + def __init__(self, strategy: Strategy, csv_filename: str): + self.strategy = strategy + self.csv_filename = csv_filename + + # Initialize CSV file and write header + with open(self.csv_filename, mode="w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["round", "time", "accuracy"]) # Header + + def initialize_parameters(self, client_manager: ClientManager) -> Optional[Parameters]: + return self.strategy.initialize_parameters(client_manager) + + def configure_fit( + self, server_round: int, parameters: Parameters, client_manager: ClientManager + ): + # Log the round start time + self.round_start_time = time.time() + return self.strategy.configure_fit(server_round, parameters, client_manager) + + def aggregate_fit( + self, + server_round: int, + results: List[Tuple[ClientProxy, Metrics]], + failures: List[BaseException], + ): + # Call the actual strategy's aggregate_fit + return self.strategy.aggregate_fit(server_round, results, failures) + + def configure_evaluate( + self, server_round: int, parameters: Parameters, client_manager: ClientManager + ): + return self.strategy.configure_evaluate(server_round, parameters, client_manager) + + def aggregate_evaluate( + self, + server_round: int, + results: List[Tuple[ClientProxy, Metrics]], + failures: List[BaseException], + ): + # Call the actual strategy's aggregate_evaluate + aggregated_metrics = self.strategy.aggregate_evaluate(server_round, results, failures) + + # Calculate round duration + round_time = time.time() - self.round_start_time + + # Extract accuracy from aggregated_metrics + accuracy = aggregated_metrics[1].get("accuracy", 0) if aggregated_metrics else 0 + + # Append data to CSV + with open(self.csv_filename, mode="a", newline="") as f: + writer = csv.writer(f) + writer.writerow([server_round, round_time, accuracy]) + + return aggregated_metrics + + def evaluate( + self, server_round: int, parameters: Parameters + ) -> Optional[Tuple[float, Metrics]]: + return self.strategy.evaluate(server_round, parameters) + + +# Create the argument parser +parser = argparse.ArgumentParser(description="Server setting for FLWR") + +# Add the argument for num_rounds (-r or --rounds) +parser.add_argument("-r", "--rounds", type=int, default=10, help="Number of rounds") + +# Add the argument for strategy (-s or --strategy) +parser.add_argument( + "-s", + "--strategy", + type=str, + default="fedAvg", + choices=["fedAvg", "fedMedian", "fedOpt", "fedAdam", "fedAvg2Clients"], # Add options strategies + help="Select strategy: fedAvg, fedMedian, fedOpt or fedAdam or fedAvg2Clients", +) + +# Parse the arguments from the command line +args = parser.parse_args() + +# Define metric aggregation function +def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: + # Multiply accuracy of each client by number of examples used + accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics] + examples = [num_examples for num_examples, _ in metrics] + + # Aggregate and return custom metric (weighted average) + return {"accuracy": sum(accuracies) / sum(examples)} + +# Load model for server-side parameter initialization +model = tf.keras.applications.MobileNetV2( + input_shape=(32, 32, 3), weights=None, classes=10 +) +model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"]) + +# Get model weights as a list of NumPy ndarray's +weights = model.get_weights() + +# Serialize ndarrays to `Parameters` +parameters = flwr.common.ndarrays_to_parameters(weights) + +# Select the base strategy +if args.strategy == "fedAvg": + base_strategy = FedAvg(evaluate_metrics_aggregation_fn=weighted_average) +elif args.strategy == "fedAvg2Clients": + base_strategy = FedAvg( + fraction_fit=0.67, + min_fit_clients=2, + min_available_clients=2, + evaluate_metrics_aggregation_fn=weighted_average, + ) +elif args.strategy == "fedMedian": + base_strategy = FedMedian(evaluate_metrics_aggregation_fn=weighted_average) +elif args.strategy == "fedOpt": + base_strategy = FedOpt( + evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters + ) +elif args.strategy == "fedAdam": + base_strategy = FedAdam( + evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters + ) + + +# Generate the CSV file path based on strategy and number of rounds +csv_filepath = create_csv_filepath(strategy_name=args.strategy, num_rounds=args.rounds) + +# Wrap the selected strategy with the logging wrapper +strategy = CustomStrategyWrapper(strategy=base_strategy, csv_filename=csv_filepath) + +# Define config +config = ServerConfig(num_rounds=args.rounds) + +# Legacy mode +if __name__ == "__main__": + from flwr.server import start_server + + start_server( + server_address="0.0.0.0:8080", + config=config, + strategy=strategy, + ) diff --git a/Run/config_instance1.json b/Run/config_instance1.json index 2b4f97c..d69ccd7 100644 --- a/Run/config_instance1.json +++ b/Run/config_instance1.json @@ -1,17 +1,27 @@ { "instance": "fedAvg_cifar10", - "output_dir": "/home/mdo/Test/eflwr/Log", + "output_dir": "/home/mdo/Framework/eflwr/Log", + "dvfs": { + "dummy": false, + "baseline": false, + "frequencies": [ + 1000000, + 1400000, + 1800000, + 2200000 + ] + }, "server": { "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/server.py", - "-r 1", + "/home/mdo/Framework/eflwr/Flower_v1/server_1.py", + "-r 25", "-s fedAvg" ], "additional_env_var": [ "" ], - "ip": "172.16.66.26", + "ip": "172.16.66.76", "port": 8080 }, "clients": [ @@ -19,37 +29,43 @@ "name": "client1", "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/client.py", - "cifar10" + "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", + "cifar10", + "1", + "3" ], "additional_env_var": [ "" ], - "ip": "172.16.66.38" + "ip": "172.16.66.77" }, { "name": "client2", "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/client.py", - "cifar10" + "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", + "cifar10", + "2", + "3" ], "additional_env_var": [ "" ], - "ip": "172.16.66.4" + "ip": "172.16.66.78" }, { "name": "client3", "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/client.py", - "cifar10" + "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", + "cifar10", + "3", + "3" ], "additional_env_var": [ "" ], - "ip": "172.16.66.65" + "ip": "172.16.66.79" } ] -} +} \ No newline at end of file diff --git a/Run/config_instance2.json b/Run/config_instance2.json index 72d7c8b..fdc92f6 100644 --- a/Run/config_instance2.json +++ b/Run/config_instance2.json @@ -1,17 +1,27 @@ { - "instance": "fedMedian_cifar10", - "output_dir": "/home/mdo/Test/eflwr/Log", + "instance": "fedAvg2Clients_cifar10", + "output_dir": "/home/mdo/Framework/eflwr/Log", + "dvfs": { + "dummy": false, + "baseline": false, + "frequencies": [ + 1000000, + 1400000, + 1800000, + 2200000 + ] + }, "server": { "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/server.py", - "-r 1", - "-s fedMedian" + "/home/mdo/Framework/eflwr/Flower_v1/server_1.py", + "-r 25", + "-s fedAvg2Clients" ], "additional_env_var": [ "" ], - "ip": "172.16.66.26", + "ip": "172.16.66.76", "port": 8080 }, "clients": [ @@ -19,37 +29,43 @@ "name": "client1", "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/client.py", - "cifar10" + "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", + "cifar10", + "1", + "3" ], "additional_env_var": [ "" ], - "ip": "172.16.66.38" + "ip": "172.16.66.77" }, { "name": "client2", "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/client.py", - "cifar10" + "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", + "cifar10", + "2", + "3" ], "additional_env_var": [ "" ], - "ip": "172.16.66.4" + "ip": "172.16.66.78" }, { "name": "client3", "command": "python3", "args": [ - "/home/mdo/Test/eflwr/Flower_v1/client.py", - "cifar10" + "/home/mdo/Framework/eflwr/Flower_v1/client_1.py", + "cifar10", + "3", + "3" ], "additional_env_var": [ "" ], - "ip": "172.16.66.65" + "ip": "172.16.66.79" } ] -} +} \ No newline at end of file diff --git a/Run/get_freq.py b/Run/get_freq.py new file mode 100644 index 0000000..f5bccd2 --- /dev/null +++ b/Run/get_freq.py @@ -0,0 +1,10 @@ +from expetator.leverages import dvfs +#import subprocess + +#subprocess.run(["bash","/home/hdomai/.local/lib/python3.10/site-packages/expetator/leverages/dvfs_pct.sh", "init"], check=True) +frequencies, pcts = dvfs.get_dvfs_values() + +print("Cpu freq available (min, max, delta):", frequencies) +f=list(frequencies) +print("List", f) +print("4 freqquencies for testing", f[0], f[len(f)//3], f[len(f)*2//3], f[-1]) diff --git a/Run/measure.py b/Run/measure.py index 6fb40be..9ed5480 100644 --- a/Run/measure.py +++ b/Run/measure.py @@ -6,7 +6,9 @@ import argparse import json import time import expetator.experiment as experiment -from expetator.monitors import Mojitos, kwollect +#from expetator.monitors import Mojitos, kwollect +from expetator.monitors import Mojitos +from expetator.leverages import Dvfs # Determine script directory current_dir = Path(__file__).resolve().parent @@ -48,6 +50,12 @@ except json.JSONDecodeError: log_dir = config["output_dir"] instance_name = config.get("instance", "default_instance") +# Extract DVFS configuration from the config file +dvfs_config = config.get("dvfs", {}) +dvfs_dummy = dvfs_config.get("dummy", False) +dvfs_baseline = dvfs_config.get("baseline", False) +dvfs_frequencies = dvfs_config.get("frequencies", None) + # Set the Flower log directory with the suffix and ensure it exists flower_log_dir = os.path.join(log_dir, f"Flower_{args.suffix}", f"Flower_instance_{instance_name}", "Expetator") os.makedirs(flower_log_dir, exist_ok=True) @@ -55,6 +63,12 @@ os.makedirs(flower_log_dir, exist_ok=True) # Path to the script that will be executed script_dir = os.path.join(current_dir, 'run_flwr.py') +# SCP the config file to the destination + +scp_command = f"scp {config_path} {flower_log_dir}" +print(f"Executing SCP command: {scp_command}") +os.system(scp_command) + class DemoBench: def __init__(self, params=[args.suffix]): self.names = {"flower"} @@ -69,15 +83,18 @@ class DemoBench: # Run the Flower script with the provided suffix argument executor.local(f"python3 {script_dir} -c {args.config} -x {args.suffix}") return time.time() - before, "flower" - + if __name__ == "__main__": +# Ensure DVFS settings are retrieved from the config file + dvfs = Dvfs(dummy=dvfs_dummy, baseline=dvfs_baseline, frequencies=dvfs_frequencies) experiment.run_experiment( flower_log_dir, [DemoBench()], - leverages=[], + leverages=[dvfs], monitors=[ - Mojitos(sensor_set={'user', 'rxp', 'dram0'}), - kwollect.Power(metric=kwollect.get_g5k_target_metric()) + Mojitos(sensor_set={'user', 'rxp', 'dram0'}) + # kwollect.Power(metric=kwollect.get_g5k_target_metric()) ], times=args.repeat ) + diff --git a/Run/measure_1.py b/Run/measure_1.py new file mode 100644 index 0000000..1863c81 --- /dev/null +++ b/Run/measure_1.py @@ -0,0 +1,91 @@ +#python3 measure.py -c config_instance1.json -x test -r repeat + +from pathlib import Path +import os +import argparse +import json +import time +import expetator.experiment as experiment +from expetator.monitors import Mojitos, kwollect +from expetator.leverages import Dvfs + +# Determine script directory +current_dir = Path(__file__).resolve().parent +parent_dir = current_dir.parent + +# Set up argument parser +parser = argparse.ArgumentParser(description="Run a benchmark experiment using a specified config file.") +parser.add_argument( + "-x", "--suffix", type=str, required=True, + help="Suffix for the log directory (e.g., experiment name or timestamp)" +) +parser.add_argument( + "-c", "--config", type=str, required=True, + help="Path to the config file (e.g., config_instance1.json or a glob pattern like config_instance*.json)" +) +parser.add_argument( + "-r", "--repeat", type=int, default=1, required=True, + help="Number of repeatation (e.g., 2, the exp will be repeated in 2 times)" +) + +# Parse arguments +args = parser.parse_args() + +# Dynamically set the path to the config.json file +config_path = os.path.join(current_dir, args.config) + +# Read the output directory from config.json +try: + with open(config_path, "r") as file: + config = json.load(file) +except FileNotFoundError: + print(f"Error: Config file {config_path} not found!") + exit(1) +except json.JSONDecodeError: + print(f"Error: Config file {config_path} contains invalid JSON!") + exit(1) + +# Base log directory and instance name from config.json +log_dir = config["output_dir"] +instance_name = config.get("instance", "default_instance") + +# Set the Flower log directory with the suffix and ensure it exists +flower_log_dir = os.path.join(log_dir, f"Flower_{args.suffix}", f"Flower_instance_{instance_name}", "Expetator") +os.makedirs(flower_log_dir, exist_ok=True) + +# Path to the script that will be executed +script_dir = os.path.join(current_dir, 'run_flwr.py') + +# SCP the config file to the destination + +scp_command = f"scp {config_path} {flower_log_dir}" +print(f"Executing SCP command: {scp_command}") +os.system(scp_command) + +class DemoBench: + def __init__(self, params=[args.suffix]): + self.names = {"flower"} + self.params = params + + def build(self, executor): + params = {"flower": self.params} + return params + + def run(self, bench, param, executor): + before = time.time() + # Run the Flower script with the provided suffix argument + executor.local(f"python3 {script_dir} -c {args.config} -x {args.suffix}") + return time.time() - before, "flower" + +if __name__ == "__main__": + experiment.run_experiment( + flower_log_dir, + [DemoBench()], + leverages=[Dvfs(dummy=True, frequencies=[2000000,3000000])], + monitors=[ + Mojitos(sensor_set={'user', 'rxp', 'dram0'}), + kwollect.Power(metric=kwollect.get_g5k_target_metric()) + ], + times=args.repeat + ) + diff --git a/Run/run_flwr.py b/Run/run_flwr.py index 5b48054..680ba43 100644 --- a/Run/run_flwr.py +++ b/Run/run_flwr.py @@ -63,81 +63,6 @@ flower_dir = os.path.join(output_dir, f"Flower_{experiment_suffix}") log_exp_dir = os.path.join(flower_dir, f"Flower_instance_{instance_name}", f"Flwr_{current_time}") os.makedirs(log_exp_dir, exist_ok=True) -# Define the log-checking function -def check_flower_logs(client_logs, server_log, output_file): - """ - Check Flower client and server logs for success indicators. - - Args: - client_logs (list of str): Paths to the client log files. - server_log (str): Path to the server log file. - output_file (str): Path to the output log file to write the result. - """ - client_success_keywords = ["Sent reply", "Received", "Disconnect and shut down"] - server_success_keywords = ["[SUMMARY]", "Run finished", "History (metrics, distributed, evaluate)"] - - # Helper function to check if all keywords are in the log content - def check_keywords_in_log(log_path, keywords): - if not os.path.exists(log_path): - return False, f"Log file not found: {log_path}" - try: - with open(log_path, "r") as file: - lines = file.readlines()[-10:] # Read the last 10 lines for efficiency - log_content = "".join(lines) - return all(keyword in log_content for keyword in keywords), log_content - except Exception as e: - return False, f"Error reading log file {log_path}: {e}" - - # Check client logs - client_results = [] - for log_path in client_logs: - success, log_content = check_keywords_in_log(log_path, client_success_keywords) - client_results.append(success) - if not success: - print(f"Client log check failed: {log_path}\nLog Content:\n{log_content}") - - # Check server log - server_success, server_log_content = check_keywords_in_log(server_log, server_success_keywords) - if not server_success: - print(f"Server log check failed: {server_log}\nLog Content:\n{server_log_content}") - - # Determine final success status - if all(client_results) and server_success: - result = "Success all" - else: - result = "Failure" - - # Write the result to the output file - with open(output_file, "w") as out_file: - out_file.write(f"Result: {result}\n") - if server_success: - out_file.write(f"Server log content:\n{server_log_content}\n") - else: - out_file.write("Server log check failed.\n") - for idx, (success, log_path) in enumerate(zip(client_results, client_logs)): - if success: - out_file.write(f"Client {idx + 1} ({log_path}) log check passed.\n") - else: - out_file.write(f"Client {idx + 1} ({log_path}) log check failed.\n") - - print(f"Check completed. Result: {result}. Details written to {output_file}") - return result - -# Function to run kill.py on failure -def run_kill_script(): - """ - Run the kill.py script if the experiment fails. - """ - kill_script_path = os.path.join(current_dir, "kill.py") - if not os.path.exists(kill_script_path): - print("kill.py script not found.") - return - try: - print("Executing kill.py due to failure...") - subprocess.run(["python3", kill_script_path], check=True) - except subprocess.CalledProcessError as e: - print(f"Error executing kill.py: {e}") - # Run the server and clients try: # Start server process and store it @@ -172,15 +97,7 @@ try: for client_process in client_processes: client_process.wait() - print("========== Checking Logs ==========") - output_file = os.path.join(log_exp_dir, "flower_log_summary.txt") - result = check_flower_logs(client_log_paths, server_log_path, output_file) - - if result == "Failure": - run_kill_script() - - print("========== Experiment Finished ==========") - except Exception as e: print(f"An error occurred: {e}") sys.exit(1) + -- GitLab