Commit 3214589c authored by HuongDM1896

add get_freq

parent 674251e4
from typing import List, Tuple
from flwr.server import ServerApp, ServerConfig
from flwr.server.strategy import FedAvg, FedMedian, FedOpt, FedAdam
from flwr.common import Metrics, Parameters
import argparse
import tensorflow as tf
import flwr
# Create the argument parser
parser = argparse.ArgumentParser(description="Server setting for FLWR")
# Add the argument for num_rounds (-r or --rounds)
parser.add_argument("-r", "--rounds", type=int, default=10, help="Number of rounds")
# Add the argument for strategy (-s or --strategy)
parser.add_argument(
    "-s",
"--strategy",
type=str,
default="fedAvg",
choices=["fedAvg", "fedMedian"], # Add options strategies
help="Select strategy: fedAvg or fedMedian"
choices=["fedAvg", "fedMedian", "fedOpt", "fedAdam"], # Add options strategies
help="Select strategy: fedAvg, fedMedian, fedOpt or fedAdam"
)
# Parse the arguments from the command line
args = parser.parse_args()

# Define metric aggregation function
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    # Multiply accuracy of each client by number of examples used
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]
    # Aggregate and return custom metric (weighted average)
    return {"accuracy": sum(accuracies) / sum(examples)}
# Load model for server-side parameter initialization
model = tf.keras.applications.MobileNetV2(
input_shape=(32, 32, 3), weights=None, classes=10
)
model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])
# Get model weights as a list of NumPy ndarrays
weights = model.get_weights()
# Serialize ndarrays to `Parameters`
parameters = flwr.common.ndarrays_to_parameters(weights)
# Select the strategy based on the input argument
if args.strategy == "fedAvg":
strategy = FedAvg(evaluate_metrics_aggregation_fn=weighted_average)
elif args.strategy == "fedMedian":
strategy = FedMedian(evaluate_metrics_aggregation_fn=weighted_average)
elif args.strategy == "fedOpt":
strategy = FedOpt(evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters)
elif args.strategy == "fedAdam":
strategy = FedAdam(evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters)
# Define config
config = ServerConfig(num_rounds=args.rounds)
# Legacy mode
if __name__ == "__main__":
    from flwr.server import start_server

    start_server(
server_address="0.0.0.0:8080",
config=config,
strategy=strategy,
    )
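A typical way to launch this server (assuming the script is saved as server.py; the listen address 0.0.0.0:8080 is fixed in the code):

python3 server.py -r 25 -s fedAdam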
import csv
import time
import os
from datetime import datetime
from typing import List, Tuple, Dict, Optional
from flwr.common import Parameters, Metrics, FitRes, EvaluateRes
from flwr.server.strategy import Strategy
from flwr.server.client_manager import ClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server import ServerApp, ServerConfig
from flwr.server.strategy import FedAvg, FedMedian, FedOpt, FedAdam
import argparse
import tensorflow as tf
import flwr
# Create the folder and generate the CSV filename dynamically
def create_csv_filepath(strategy_name: str, num_rounds: int) -> str:
# Determine the parent directory of the current script
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Create the "Log_autres" directory if it doesn't exist
log_dir = os.path.join(parent_dir, "Log_autres")
os.makedirs(log_dir, exist_ok=True)
# Generate a timestamp for the file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Generate the CSV file path
csv_filename = f"training_results_{strategy_name}_{num_rounds}_{timestamp}.csv"
csv_filepath = os.path.join(log_dir, csv_filename)
return csv_filepath
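# Example (hypothetical timestamp): create_csv_filepath("fedAvg", 25) returns a
# path such as <parent_dir>/Log_autres/training_results_fedAvg_25_20250101_120000.csv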
class CustomStrategyWrapper(Strategy):
    def __init__(self, strategy: Strategy, csv_filename: str):
        self.strategy = strategy
        self.csv_filename = csv_filename
        # Initialize CSV file and write header
        with open(self.csv_filename, mode="w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["round", "time", "accuracy"])  # Header

    def initialize_parameters(self, client_manager: ClientManager) -> Optional[Parameters]:
        return self.strategy.initialize_parameters(client_manager)

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ):
        # Log the round start time
        self.round_start_time = time.time()
        return self.strategy.configure_fit(server_round, parameters, client_manager)

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[BaseException],
    ):
        # Delegate to the wrapped strategy's aggregate_fit
        return self.strategy.aggregate_fit(server_round, results, failures)

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ):
        return self.strategy.configure_evaluate(server_round, parameters, client_manager)

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[BaseException],
    ):
        # Call the actual strategy's aggregate_evaluate
        aggregated_metrics = self.strategy.aggregate_evaluate(server_round, results, failures)
        # Calculate round duration (configure_fit recorded the start time)
        round_time = time.time() - self.round_start_time
        # Extract accuracy from the (loss, metrics) tuple returned by aggregate_evaluate
        accuracy = aggregated_metrics[1].get("accuracy", 0) if aggregated_metrics and aggregated_metrics[1] else 0
        # Append data to CSV
        with open(self.csv_filename, mode="a", newline="") as f:
            writer = csv.writer(f)
            writer.writerow([server_round, round_time, accuracy])
        return aggregated_metrics

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Metrics]]:
        return self.strategy.evaluate(server_round, parameters)
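# Usage sketch: the wrapper delegates every Strategy method to the wrapped
# instance, so any Flower strategy can be instrumented this way, e.g.
#   strategy = CustomStrategyWrapper(strategy=FedAvg(), csv_filename="results.csv")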
# Create the argument parser
parser = argparse.ArgumentParser(description="Server setting for FLWR")
# Add the argument for num_rounds (-r or --rounds)
parser.add_argument("-r", "--rounds", type=int, default=10, help="Number of rounds")
# Add the argument for strategy (-s or --strategy)
parser.add_argument(
"-s",
"--strategy",
type=str,
default="fedAvg",
choices=["fedAvg", "fedMedian", "fedOpt", "fedAdam", "fedAvg2Clients"], # Add options strategies
help="Select strategy: fedAvg, fedMedian, fedOpt or fedAdam or fedAvg2Clients",
)
# Parse the arguments from the command line
args = parser.parse_args()
# Define metric aggregation function
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
# Multiply accuracy of each client by number of examples used
accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
examples = [num_examples for num_examples, _ in metrics]
# Aggregate and return custom metric (weighted average)
return {"accuracy": sum(accuracies) / sum(examples)}
# Load model for server-side parameter initialization
model = tf.keras.applications.MobileNetV2(
input_shape=(32, 32, 3), weights=None, classes=10
)
model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])
# Get model weights as a list of NumPy ndarrays
weights = model.get_weights()
# Serialize ndarrays to `Parameters`
parameters = flwr.common.ndarrays_to_parameters(weights)
# Select the base strategy
if args.strategy == "fedAvg":
base_strategy = FedAvg(evaluate_metrics_aggregation_fn=weighted_average)
elif args.strategy == "fedAvg2Clients":
base_strategy = FedAvg(
fraction_fit=0.67,
min_fit_clients=2,
min_available_clients=2,
evaluate_metrics_aggregation_fn=weighted_average,
)
elif args.strategy == "fedMedian":
base_strategy = FedMedian(evaluate_metrics_aggregation_fn=weighted_average)
elif args.strategy == "fedOpt":
base_strategy = FedOpt(
evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters
)
elif args.strategy == "fedAdam":
base_strategy = FedAdam(
evaluate_metrics_aggregation_fn=weighted_average, initial_parameters=parameters
)
# Generate the CSV file path based on strategy and number of rounds
csv_filepath = create_csv_filepath(strategy_name=args.strategy, num_rounds=args.rounds)
# Wrap the selected strategy with the logging wrapper
strategy = CustomStrategyWrapper(strategy=base_strategy, csv_filename=csv_filepath)
# Define config
config = ServerConfig(num_rounds=args.rounds)
# Legacy mode
if __name__ == "__main__":
from flwr.server import start_server
start_server(
server_address="0.0.0.0:8080",
config=config,
strategy=strategy,
)
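As with the first script, a typical launch of this logging variant (the configs below invoke it as server_1.py with the same flags):

python3 server_1.py -r 25 -s fedAvg2Clients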
{
"instance": "fedAvg_cifar10",
"output_dir": "/home/mdo/Test/eflwr/Log",
"output_dir": "/home/mdo/Framework/eflwr/Log",
"dvfs": {
"dummy": false,
"baseline": false,
"frequencies": [
1000000,
1400000,
1800000,
2200000
]
},
"server": {
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/server.py",
"-r 1",
"/home/mdo/Framework/eflwr/Flower_v1/server_1.py",
"-r 25",
"-s fedAvg"
],
"additional_env_var": [
""
],
"ip": "172.16.66.26",
"ip": "172.16.66.76",
"port": 8080
},
"clients": [
    {
"name": "client1",
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/client.py",
"cifar10"
"/home/mdo/Framework/eflwr/Flower_v1/client_1.py",
"cifar10",
"1",
"3"
],
"additional_env_var": [
""
],
"ip": "172.16.66.38"
"ip": "172.16.66.77"
},
{
"name": "client2",
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/client.py",
"cifar10"
"/home/mdo/Framework/eflwr/Flower_v1/client_1.py",
"cifar10",
"2",
"3"
],
"additional_env_var": [
""
],
"ip": "172.16.66.4"
"ip": "172.16.66.78"
},
{
"name": "client3",
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/client.py",
"cifar10"
"/home/mdo/Framework/eflwr/Flower_v1/client_1.py",
"cifar10",
"3",
"3"
],
"additional_env_var": [
""
],
"ip": "172.16.66.65"
"ip": "172.16.66.79"
}
]
}
{
"instance": "fedMedian_cifar10",
"output_dir": "/home/mdo/Test/eflwr/Log",
"instance": "fedAvg2Clients_cifar10",
"output_dir": "/home/mdo/Framework/eflwr/Log",
"dvfs": {
"dummy": false,
"baseline": false,
"frequencies": [
1000000,
1400000,
1800000,
2200000
]
},
"server": {
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/server.py",
"-r 1",
"-s fedMedian"
"/home/mdo/Framework/eflwr/Flower_v1/server_1.py",
"-r 25",
"-s fedAvg2Clients"
],
"additional_env_var": [
""
],
"ip": "172.16.66.26",
"ip": "172.16.66.76",
"port": 8080
},
"clients": [
    {
"name": "client1",
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/client.py",
"cifar10"
"/home/mdo/Framework/eflwr/Flower_v1/client_1.py",
"cifar10",
"1",
"3"
],
"additional_env_var": [
""
],
"ip": "172.16.66.38"
"ip": "172.16.66.77"
},
{
"name": "client2",
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/client.py",
"cifar10"
"/home/mdo/Framework/eflwr/Flower_v1/client_1.py",
"cifar10",
"2",
"3"
],
"additional_env_var": [
""
],
"ip": "172.16.66.4"
"ip": "172.16.66.78"
},
{
"name": "client3",
"command": "python3",
"args": [
"/home/mdo/Test/eflwr/Flower_v1/client.py",
"cifar10"
"/home/mdo/Framework/eflwr/Flower_v1/client_1.py",
"cifar10",
"3",
"3"
],
"additional_env_var": [
""
],
"ip": "172.16.66.65"
"ip": "172.16.66.79"
}
]
}
from expetator.leverages import dvfs

# Alternative: initialize DVFS via the helper shell script instead
# import subprocess
# subprocess.run(["bash", "/home/hdomai/.local/lib/python3.10/site-packages/expetator/leverages/dvfs_pct.sh", "init"], check=True)

frequencies, pcts = dvfs.get_dvfs_values()
print("CPU frequencies available (min, max, delta):", frequencies)
f = list(frequencies)
print("List", f)
print("4 frequencies for testing:", f[0], f[len(f)//3], f[len(f)*2//3], f[-1])
@@ -6,7 +6,9 @@ import argparse
import json
import time
import expetator.experiment as experiment
#from expetator.monitors import Mojitos, kwollect
from expetator.monitors import Mojitos
from expetator.leverages import Dvfs
# Determine script directory
current_dir = Path(__file__).resolve().parent
@@ -48,6 +50,12 @@ except json.JSONDecodeError:
log_dir = config["output_dir"]
instance_name = config.get("instance", "default_instance")
# Extract DVFS configuration from the config file
dvfs_config = config.get("dvfs", {})
dvfs_dummy = dvfs_config.get("dummy", False)
dvfs_baseline = dvfs_config.get("baseline", False)
dvfs_frequencies = dvfs_config.get("frequencies", None)
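# The "dvfs" block being read here has the shape used in the configs above, e.g.:
# "dvfs": {"dummy": false, "baseline": false,
#          "frequencies": [1000000, 1400000, 1800000, 2200000]}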
# Set the Flower log directory with the suffix and ensure it exists
flower_log_dir = os.path.join(log_dir, f"Flower_{args.suffix}", f"Flower_instance_{instance_name}", "Expetator")
os.makedirs(flower_log_dir, exist_ok=True)
@@ -55,6 +63,12 @@ os.makedirs(flower_log_dir, exist_ok=True)
# Path to the script that will be executed
script_dir = os.path.join(current_dir, 'run_flwr.py')
# SCP the config file to the destination
scp_command = f"scp {config_path} {flower_log_dir}"
print(f"Executing SCP command: {scp_command}")
os.system(scp_command)
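# Note: with a plain local destination path, as here, scp performs a local
# copy; a remote target of the form user@host:path would also work.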
class DemoBench:
def __init__(self, params=[args.suffix]):
self.names = {"flower"}
@@ -69,15 +83,18 @@ class DemoBench:
# Run the Flower script with the provided suffix argument
executor.local(f"python3 {script_dir} -c {args.config} -x {args.suffix}")
return time.time() - before, "flower"
if __name__ == "__main__":
# Ensure DVFS settings are retrieved from the config file
dvfs = Dvfs(dummy=dvfs_dummy, baseline=dvfs_baseline, frequencies=dvfs_frequencies)
experiment.run_experiment(
flower_log_dir,
[DemoBench()],
        leverages=[dvfs],
monitors=[
            Mojitos(sensor_set={'user', 'rxp', 'dram0'})
            # kwollect.Power(metric=kwollect.get_g5k_target_metric())
],
times=args.repeat
)
# Example: python3 measure.py -c config_instance1.json -x test -r 2
from pathlib import Path
import os
import argparse
import json
import time
import expetator.experiment as experiment
from expetator.monitors import Mojitos, kwollect
from expetator.leverages import Dvfs
# Determine script directory
current_dir = Path(__file__).resolve().parent
parent_dir = current_dir.parent
# Set up argument parser
parser = argparse.ArgumentParser(description="Run a benchmark experiment using a specified config file.")
parser.add_argument(
"-x", "--suffix", type=str, required=True,
help="Suffix for the log directory (e.g., experiment name or timestamp)"
)
parser.add_argument(
"-c", "--config", type=str, required=True,
help="Path to the config file (e.g., config_instance1.json or a glob pattern like config_instance*.json)"
)
parser.add_argument(
    "-r", "--repeat", type=int, default=1,
    help="Number of repetitions (e.g., 2 runs the experiment twice)"
)
# Parse arguments
args = parser.parse_args()
# Dynamically set the path to the config.json file
config_path = os.path.join(current_dir, args.config)
# Read the output directory from config.json
try:
with open(config_path, "r") as file:
config = json.load(file)
except FileNotFoundError:
print(f"Error: Config file {config_path} not found!")
exit(1)
except json.JSONDecodeError:
print(f"Error: Config file {config_path} contains invalid JSON!")
exit(1)
# Base log directory and instance name from config.json
log_dir = config["output_dir"]
instance_name = config.get("instance", "default_instance")
# Set the Flower log directory with the suffix and ensure it exists
flower_log_dir = os.path.join(log_dir, f"Flower_{args.suffix}", f"Flower_instance_{instance_name}", "Expetator")
os.makedirs(flower_log_dir, exist_ok=True)
# Path to the script that will be executed
script_dir = os.path.join(current_dir, 'run_flwr.py')
# SCP the config file to the destination
scp_command = f"scp {config_path} {flower_log_dir}"
print(f"Executing SCP command: {scp_command}")
os.system(scp_command)
class DemoBench:
def __init__(self, params=[args.suffix]):
self.names = {"flower"}
self.params = params
def build(self, executor):
params = {"flower": self.params}
return params
def run(self, bench, param, executor):
before = time.time()
# Run the Flower script with the provided suffix argument
executor.local(f"python3 {script_dir} -c {args.config} -x {args.suffix}")
return time.time() - before, "flower"
if __name__ == "__main__":
experiment.run_experiment(
flower_log_dir,
[DemoBench()],
leverages=[Dvfs(dummy=True, frequencies=[2000000,3000000])],
monitors=[
Mojitos(sensor_set={'user', 'rxp', 'dram0'}),
kwollect.Power(metric=kwollect.get_g5k_target_metric())
],
times=args.repeat
)
@@ -63,81 +63,6 @@ flower_dir = os.path.join(output_dir, f"Flower_{experiment_suffix}")
log_exp_dir = os.path.join(flower_dir, f"Flower_instance_{instance_name}", f"Flwr_{current_time}")
os.makedirs(log_exp_dir, exist_ok=True)
# Define the log-checking function
def check_flower_logs(client_logs, server_log, output_file):
"""
Check Flower client and server logs for success indicators.
Args:
client_logs (list of str): Paths to the client log files.
server_log (str): Path to the server log file.
output_file (str): Path to the output log file to write the result.
"""
client_success_keywords = ["Sent reply", "Received", "Disconnect and shut down"]
server_success_keywords = ["[SUMMARY]", "Run finished", "History (metrics, distributed, evaluate)"]
# Helper function to check if all keywords are in the log content
def check_keywords_in_log(log_path, keywords):
if not os.path.exists(log_path):
return False, f"Log file not found: {log_path}"
try:
with open(log_path, "r") as file:
lines = file.readlines()[-10:] # Read the last 10 lines for efficiency
log_content = "".join(lines)
return all(keyword in log_content for keyword in keywords), log_content
except Exception as e:
return False, f"Error reading log file {log_path}: {e}"
# Check client logs
client_results = []
for log_path in client_logs:
success, log_content = check_keywords_in_log(log_path, client_success_keywords)
client_results.append(success)
if not success:
print(f"Client log check failed: {log_path}\nLog Content:\n{log_content}")
# Check server log
server_success, server_log_content = check_keywords_in_log(server_log, server_success_keywords)
if not server_success:
print(f"Server log check failed: {server_log}\nLog Content:\n{server_log_content}")
# Determine final success status
if all(client_results) and server_success:
result = "Success all"
else:
result = "Failure"
# Write the result to the output file
with open(output_file, "w") as out_file:
out_file.write(f"Result: {result}\n")
if server_success:
out_file.write(f"Server log content:\n{server_log_content}\n")
else:
out_file.write("Server log check failed.\n")
for idx, (success, log_path) in enumerate(zip(client_results, client_logs)):
if success:
out_file.write(f"Client {idx + 1} ({log_path}) log check passed.\n")
else:
out_file.write(f"Client {idx + 1} ({log_path}) log check failed.\n")
print(f"Check completed. Result: {result}. Details written to {output_file}")
return result
# Function to run kill.py on failure
def run_kill_script():
"""
Run the kill.py script if the experiment fails.
"""
kill_script_path = os.path.join(current_dir, "kill.py")
if not os.path.exists(kill_script_path):
print("kill.py script not found.")
return
try:
print("Executing kill.py due to failure...")
subprocess.run(["python3", kill_script_path], check=True)
except subprocess.CalledProcessError as e:
print(f"Error executing kill.py: {e}")
# Run the server and clients
try:
# Start server process and store it
@@ -172,15 +97,7 @@ try:
for client_process in client_processes:
client_process.wait()
print("========== Checking Logs ==========")
output_file = os.path.join(log_exp_dir, "flower_log_summary.txt")
result = check_flower_logs(client_log_paths, server_log_path, output_file)
if result == "Failure":
run_kill_script()
print("========== Experiment Finished ==========")
except Exception as e:
print(f"An error occurred: {e}")
sys.exit(1)