# run_flwr.py - launch a Flower server and its clients over oarsh from a JSON config.
# python3 run_flwr_1.py -c config_instances.json -i 1 -x 1

import os
import sys
import json
import subprocess
from pathlib import Path
from datetime import datetime
import argparse

# structure def main to call in other python file
def main(config_path, instance, suffix):
    print(f"Running Flower instance with instance: {instance} and suffix: {suffix}")

    config_path = os.path.abspath(config_path)
    config_dir = os.path.dirname(config_path)
    
    # read json
    try:
        with open(config_path, "r") as file:
            config_data = json.load(file)
    except FileNotFoundError:
        print(f"Error: Config file {config_path} not found!")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: Config file {config_path} contains invalid JSON!")
        sys.exit(1)

    # get instance info
    instance_config = config_data.get("instances", {}).get(str(instance)) 
    if not instance_config:
        print(f"Error: Instance {instance} not found in config.json!")
        sys.exit(1)

    # get server/client info
    output_dir = os.path.abspath(os.path.join(config_dir, instance_config["output_dir"])) if instance_config["output_dir"].startswith("./") else instance_config["output_dir"]
    instance_name = instance_config.get("instance", f"default_instance_{instance}")
    server_ip = instance_config["server"]["ip"]
    server_port = instance_config["server"]["port"]

    # Convert server args to absolute paths if needed
    server_args = [
        os.path.abspath(os.path.join(config_dir, arg)) if arg.startswith("./") else arg
        for arg in instance_config["server"]["args"]
    ]
    server_command = [instance_config["server"]["command"], *server_args]
    
    # Convert client args to absolute paths if needed
    clients = instance_config["clients"]
    client_commands = []
    for client in clients:
        client_ip = client["ip"]
        client_args = [
            os.path.abspath(os.path.join(config_dir, arg)) if arg.startswith("./") else arg
            for arg in client["args"]
        ]
        client_command = [client["command"], *client_args, f'"{server_ip}:{server_port}"']
        client_commands.append({"ip": client_ip, "command": client_command})

    # print(server_command, client_commands)
    # print(output_dir)
    # return
    # Create log for experiment Flower
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    flower_dir = os.path.join(output_dir, f"Flower_{suffix}")
    log_exp_dir = os.path.join(flower_dir, f"Flower_instance_{instance_name}", f"Flwr_{current_time}")
    os.makedirs(log_exp_dir, exist_ok=True)

    # Run server and clients
    try:
        print(f"========== Run Server on {server_ip} ==========")
        server_log_path = os.path.join(log_exp_dir, f"Server_{server_ip}")
        with open(server_log_path, "w") as log_file:
            server_command_with_log = server_command.copy()  
            if "logger" in instance_config["server"]["modules"]:  
                server_command_with_log.extend(["--log_dir", log_exp_dir])  # Nếu có, thêm log_dir

            server_process = subprocess.Popen(
                ["oarsh", server_ip, *server_command_with_log],  # Chạy lệnh server (có hoặc không có log_dir)
                stdout=log_file, stderr=subprocess.STDOUT
            )

            # server_process = subprocess.Popen(
            #     ["oarsh", server_ip, *server_command],
            #     stdout=log_file, stderr=subprocess.STDOUT
            # )

        client_processes = []
        for client in client_commands:
            client_ip = client["ip"]
            client_command = client["command"]
            if client_ip:
                print(f"========== Run Client on {client_ip} ==========")
                client_log_path = os.path.join(log_exp_dir, f"Client_{client_ip}")
                with open(client_log_path, "w") as log_file:
                    client_process = subprocess.Popen(
                        ["oarsh", client_ip, *client_command],
                        stdout=log_file, stderr=subprocess.STDOUT
                    )
                    client_processes.append(client_process)

        print("========== Waiting for processes to complete ==========")
        server_process.wait()
        for client_process in client_processes:
            client_process.wait()
            
    except Exception as e:
        print(f"An error occurred: {e}")
        sys.exit(1)

if __name__ == "__main__":
    # Command-line entry: all three options are mandatory strings that are
    # forwarded unchanged to main().
    cli = argparse.ArgumentParser(
        description="Run Flower server and clients with specified config file."
    )
    for short_flag, long_flag in (
        ("-c", "--config"),
        ("-i", "--instance"),
        ("-x", "--suffix"),
    ):
        cli.add_argument(short_flag, long_flag, type=str, required=True)
    options = cli.parse_args()

    main(options.config, options.instance, options.suffix)