From b410268711b949fecebcb8b45181d0b173b03dc2 Mon Sep 17 00:00:00 2001
From: Jannis Klinkenberg <j.klinkenberg@itc.rwth-aachen.de>
Date: Fri, 8 Nov 2024 16:31:28 +0100
Subject: [PATCH] added simple cifar examples

---
 tensorflow/cifar10/set_vars.sh                |  23 ++++
 tensorflow/cifar10/submit_job_container.sh    |  46 +++++++
 tensorflow/cifar10/submit_job_venv.sh         |  49 +++++++
 tensorflow/cifar10/train_model.py             | 122 ++++++++++++++++++
 .../cifar10_distributed/execution_wrapper.sh  |   8 --
 tensorflow/cifar10_distributed/set_vars.sh    |  11 +-
 .../submit_job_container.sh                   |   3 +
 .../cifar10_distributed/submit_job_venv.sh    |  51 ++++++++
 tensorflow/cifar10_distributed/train_model.py |  45 +++----
 9 files changed, 325 insertions(+), 33 deletions(-)
 create mode 100644 tensorflow/cifar10/set_vars.sh
 create mode 100644 tensorflow/cifar10/submit_job_container.sh
 create mode 100644 tensorflow/cifar10/submit_job_venv.sh
 create mode 100644 tensorflow/cifar10/train_model.py
 create mode 100644 tensorflow/cifar10_distributed/submit_job_venv.sh

diff --git a/tensorflow/cifar10/set_vars.sh b/tensorflow/cifar10/set_vars.sh
new file mode 100644
index 0000000..6333adb
--- /dev/null
+++ b/tensorflow/cifar10/set_vars.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/zsh
+
+export RANK=${SLURM_PROCID}
+export LOCAL_RANK=${SLURM_LOCALID}
+export WORLD_SIZE=${SLURM_NTASKS}
+
+# make variables also available inside container
+export APPTAINERENV_RANK=${RANK}
+export APPTAINERENV_LOCAL_RANK=${LOCAL_RANK}
+export APPTAINERENV_WORLD_SIZE=${WORLD_SIZE}
+export APPTAINERENV_TMP="/tmp"
+
+export APPTAINERENV_TF_CPP_MIN_LOG_LEVEL=${TF_CPP_MIN_LOG_LEVEL}
+export APPTAINERENV_TF_GPU_THREAD_MODE=${TF_GPU_THREAD_MODE}
+export APPTAINERENV_NCCL_SOCKET_NTHREADS=${NCCL_SOCKET_NTHREADS}
+export APPTAINERENV_NCCL_DEBUG=${NCCL_DEBUG}
+
+# make additional SLURM variables available inside container
+export APPTAINERENV_SLURM_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK}
+export APPTAINERENV_SLURM_NTASKS_PER_NODE=${SLURM_NTASKS_PER_NODE}
+export APPTAINERENV_SLURM_NNODES=${SLURM_NNODES}
+export APPTAINERENV_SLURM_JOB_NODELIST=${SLURM_JOB_NODELIST}
+export APPTAINERENV_R_WLM_ABAQUSHOSTLIST="${R_WLM_ABAQUSHOSTLIST}"
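The RANK, LOCAL_RANK, and WORLD_SIZE variables exported above are the usual hooks for per-process device selection. A minimal Python sketch of how a training script could consume them to pin one GPU per SLURM task (illustrative only; the scripts in this patch leave device placement to TensorFlow):

    import os
    import tensorflow as tf

    # process coordinates exported by set_vars.sh
    rank = int(os.environ.get("RANK", "0"))
    local_rank = int(os.environ.get("LOCAL_RANK", "0"))
    world_size = int(os.environ.get("WORLD_SIZE", "1"))

    # pin this process to "its" GPU so several ranks on one node do not collide
    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        tf.config.set_visible_devices(gpus[local_rank % len(gpus)], "GPU")
    print(f"rank {rank}/{world_size} pinned to local GPU {local_rank}")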
diff --git a/tensorflow/cifar10/submit_job_container.sh b/tensorflow/cifar10/submit_job_container.sh
new file mode 100644
index 0000000..fa9b7b5
--- /dev/null
+++ b/tensorflow/cifar10/submit_job_container.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/zsh
+############################################################
+### Slurm flags
+############################################################
+
+#SBATCH --time=00:15:00
+#SBATCH --partition=c23g
+#SBATCH --nodes=1
+#SBATCH --ntasks-per-node=1
+#SBATCH --cpus-per-task=24
+#SBATCH --gres=gpu:1
+
+############################################################
+### Load modules or software
+############################################################
+
+# load module for TensorFlow container
+module load TensorFlow/nvcr-24.01-tf2-py3
+module list
+
+############################################################
+### Parameters and Settings
+############################################################
+
+# print some information about current system
+echo "Job nodes: ${SLURM_JOB_NODELIST}"
+echo "Current machine: $(hostname)"
+nvidia-smi
+
+export NCCL_DEBUG=INFO
+export TF_CPP_MIN_LOG_LEVEL=1 # disable info messages
+export TF_GPU_THREAD_MODE='gpu_private'
+export NCCL_SOCKET_NTHREADS=8 # multi-threading for NCCL communication
+
+############################################################
+### Execution (Model Training)
+############################################################
+
+# tensorflow in container often needs a tmp directory
+NEWTMP=$(pwd)/tmp
+mkdir -p ${NEWTMP}
+
+# run the python script inside the container
+source set_vars.sh
+apptainer exec -e --nv -B ${NEWTMP}:/tmp ${TENSORFLOW_IMAGE} \
+    bash -c "python -W ignore train_model.py"
diff --git a/tensorflow/cifar10/submit_job_venv.sh b/tensorflow/cifar10/submit_job_venv.sh
new file mode 100644
index 0000000..cc27dd6
--- /dev/null
+++ b/tensorflow/cifar10/submit_job_venv.sh
@@ -0,0 +1,49 @@
+#!/usr/bin/zsh
+############################################################
+### Slurm flags
+############################################################
+
+#SBATCH --time=00:15:00
+#SBATCH --partition=c23g
+#SBATCH --nodes=1
+#SBATCH --ntasks-per-node=1
+#SBATCH --cpus-per-task=24
+#SBATCH --gres=gpu:1
+#SBATCH --account=supp0001
+
+############################################################
+### Load modules or software
+############################################################
+
+# TODO: activate your desired virtual environment
+module purge
+module load GCC/11.3.0
+module load OpenMPI/4.1.4
+module load CMake/3.21.1
+module load Python/3.9.6
+module load NCCL/2.20.5-CUDA-12.4.0
+module load cuDNN/8.9.7.29-CUDA-12.3.0
+
+source /work/jk869269/venvs/tensorflow-2.17_CUDA-12.3/bin/activate
+
+############################################################
+### Parameters and Settings
+############################################################
+
+# print some information about current system
+echo "Job nodes: ${SLURM_JOB_NODELIST}"
+echo "Current machine: $(hostname)"
+nvidia-smi
+
+export NCCL_DEBUG=INFO
+export TF_CPP_MIN_LOG_LEVEL=1 # disable info messages
+export TF_GPU_THREAD_MODE='gpu_private'
+export NCCL_SOCKET_NTHREADS=8 # multi-threading for NCCL communication
+
+############################################################
+### Execution (Model Training)
+############################################################
+
+# run the python script
+source set_vars.sh
+python -W ignore train_model.py
\ No newline at end of file
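One detail both submit scripts rely on: TF_CPP_MIN_LOG_LEVEL is read when TensorFlow is imported, which is why it is exported in the job script before Python even starts. A small illustrative sketch of the equivalent in-process ordering and a quick check that the job sees the requested GPU:

    import os

    # must be set before the tensorflow import; afterwards it has no effect
    os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "1")  # 1 = filter INFO messages

    import tensorflow as tf

    # quick sanity check that the allocation matches the #SBATCH request
    print(tf.config.list_physical_devices("GPU"))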
diff --git a/tensorflow/cifar10/train_model.py b/tensorflow/cifar10/train_model.py
new file mode 100644
index 0000000..1251e61
--- /dev/null
+++ b/tensorflow/cifar10/train_model.py
@@ -0,0 +1,122 @@
+from __future__ import print_function
+import numpy as np
+import os, sys
+import argparse
+import datetime
+import tensorflow as tf
+from tensorflow.keras.optimizers import Adam
+from tensorflow.keras import backend as K
+from tensorflow.keras.datasets import cifar10
+import tensorflow.keras.applications as applications
+
+def parse_command_line():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda")
+    parser.add_argument("--num_epochs", required=False, type=int, default=5)
+    parser.add_argument("--batch_size", required=False, type=int, default=128)
+    parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2)
+    parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None)
+    parser.add_argument("--num_interop_threads", required=False, help="Number of inter-op threads", type=int, default=None)
+    parser.add_argument("--tensorboard", required=False, help="Whether to use tensorboard callback", action="store_true", default=False)
+    parser.add_argument("--profile_batches", required=False, help='Batches to profile for tensorboard. Format "batch_start,batch_end"', type=str, default="2,5")
+    args = parser.parse_args()
+
+    # specific to cifar 10 dataset
+    args.num_classes = 10
+
+    print("Settings:")
+    settings_map = vars(args)
+    for name in sorted(settings_map.keys()):
+        print("--" + str(name) + ": " + str(settings_map[name]))
+    print("")
+    sys.stdout.flush()
+
+    return args
+
+def load_dataset(args):
+    K.set_image_data_format("channels_last")
+
+    # load the cifar10 data
+    (x_train, y_train), (x_test, y_test) = cifar10.load_data()
+
+    # convert class vectors to binary class matrices.
+    y_train = tf.keras.utils.to_categorical(y_train, args.num_classes)
+    y_test = tf.keras.utils.to_categorical(y_test, args.num_classes)
+
+    # normalize base data
+    x_train = x_train.astype("float32") / 255
+    x_test = x_test.astype("float32") / 255
+    x_train_mean = np.mean(x_train, axis=0)
+    x_train -= x_train_mean
+    x_test -= x_train_mean
+
+    print("x_train shape:", x_train.shape)
+    print("y_train shape:", y_train.shape)
+    print(x_train.shape[0], "train samples")
+    print(x_test.shape[0], "test samples")
+    sys.stdout.flush()
+
+    return (x_train, y_train), (x_test, y_test)
+
+def setup(args):
+    if args.num_intraop_threads:
+        tf.config.threading.set_intra_op_parallelism_threads(args.num_intraop_threads)
+    if args.num_interop_threads:
+        tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
+
+    print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
+    print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
+
+    l_gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
+    print("List of GPU devices found:")
+    for dev in l_gpu_devices:
+        print(str(dev.device_type) + ": " + dev.name)
+    print("")
+    sys.stdout.flush()
+
+    tf.keras.backend.clear_session()
+    tf.config.optimizer.set_jit(True)
+
+def main():
+    # parse command line arguments
+    args = parse_command_line()
+
+    # run setup (e.g., create distributed environment if desired)
+    setup(args)
+
+    # data set loading
+    (x_train, y_train), (x_test, y_test) = load_dataset(args)
+    n_train, n_test = x_train.shape[0], x_test.shape[0]
+    input_shape = x_train.shape[1:]
+
+    # Generating input pipelines
+    ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(n_train).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
+    ds_test = tf.data.Dataset.from_tensor_slices((x_test, y_test)).shuffle(n_test).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
+
+    # callbacks to register
+    callbacks = []
+
+    model = applications.ResNet50(weights=None, input_shape=input_shape, classes=args.num_classes)
+    # model.summary() # display the model architecture
+    cur_optimizer = Adam(0.001)
+    model.compile(loss="categorical_crossentropy", optimizer=cur_optimizer, metrics=["accuracy"])
+
+    if args.tensorboard:
+        tensorboard_callback = tf.keras.callbacks.TensorBoard(
+            log_dir=os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S")),
+            histogram_freq=1,
+            profile_batch=args.profile_batches,
+        )
+        callbacks.append(tensorboard_callback)
+
+    # train the model
+    model.fit(ds_train, epochs=args.num_epochs, verbose=args.verbosity, callbacks=callbacks)
+
+    # evaluate model
+    scores = model.evaluate(ds_test, verbose=args.verbosity)
+    print(f"Test Evaluation: Accuracy: {scores[1]}")
+    sys.stdout.flush()
+
+if __name__ == "__main__":
+    main()
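For reference, the pipeline built in train_model.py yields image batches and one-hot labels; a quick self-contained way to check the shapes it produces (illustrative sketch using the same tf.data calls):

    import tensorflow as tf
    from tensorflow.keras.datasets import cifar10

    (x_train, y_train), _ = cifar10.load_data()
    y_train = tf.keras.utils.to_categorical(y_train, 10)
    ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(128)

    for images, labels in ds.take(1):
        print(images.shape)  # (128, 32, 32, 3)
        print(labels.shape)  # (128, 10)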
diff --git a/tensorflow/cifar10_distributed/execution_wrapper.sh b/tensorflow/cifar10_distributed/execution_wrapper.sh
index ae1df9b..997737c 100644
--- a/tensorflow/cifar10_distributed/execution_wrapper.sh
+++ b/tensorflow/cifar10_distributed/execution_wrapper.sh
@@ -1,12 +1,4 @@
 #!/usr/bin/zsh
-############################################################
-### Parameters & Directories
-############################################################
-
-export TF_CPP_MIN_LOG_LEVEL=1 # disable info messages
-export TF_GPU_THREAD_MODE='gpu_private'
-export NCCL_SOCKET_NTHREADS=8 # multi-threading for NCCL communication
-
 ############################################################
 ### Set TF_CONFIG
 ############################################################
diff --git a/tensorflow/cifar10_distributed/set_vars.sh b/tensorflow/cifar10_distributed/set_vars.sh
index 19405a4..6333adb 100644
--- a/tensorflow/cifar10_distributed/set_vars.sh
+++ b/tensorflow/cifar10_distributed/set_vars.sh
@@ -1,16 +1,21 @@
-#!/usr/local_rwth/bin/zsh
+#!/usr/bin/zsh
 
 export RANK=${SLURM_PROCID}
 export LOCAL_RANK=${SLURM_LOCALID}
 export WORLD_SIZE=${SLURM_NTASKS}
 
-# make variables also available inside singularity container
+# make variables also available inside container
 export APPTAINERENV_RANK=${RANK}
 export APPTAINERENV_LOCAL_RANK=${LOCAL_RANK}
 export APPTAINERENV_WORLD_SIZE=${WORLD_SIZE}
 export APPTAINERENV_TMP="/tmp"
 
-# make additional SLURM variables available to container
+export APPTAINERENV_TF_CPP_MIN_LOG_LEVEL=${TF_CPP_MIN_LOG_LEVEL}
+export APPTAINERENV_TF_GPU_THREAD_MODE=${TF_GPU_THREAD_MODE}
+export APPTAINERENV_NCCL_SOCKET_NTHREADS=${NCCL_SOCKET_NTHREADS}
+export APPTAINERENV_NCCL_DEBUG=${NCCL_DEBUG}
+
+# make additional SLURM variables available inside container
 export APPTAINERENV_SLURM_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK}
 export APPTAINERENV_SLURM_NTASKS_PER_NODE=${SLURM_NTASKS_PER_NODE}
 export APPTAINERENV_SLURM_NNODES=${SLURM_NNODES}
diff --git a/tensorflow/cifar10_distributed/submit_job_container.sh b/tensorflow/cifar10_distributed/submit_job_container.sh
index 3ffe0d1..8fc8291 100644
--- a/tensorflow/cifar10_distributed/submit_job_container.sh
+++ b/tensorflow/cifar10_distributed/submit_job_container.sh
@@ -28,6 +28,9 @@ echo "Current machine: $(hostname)"
 nvidia-smi
 
 export NCCL_DEBUG=INFO
+export TF_CPP_MIN_LOG_LEVEL=1 # disable info messages
+export TF_GPU_THREAD_MODE='gpu_private'
+export NCCL_SOCKET_NTHREADS=8 # multi-threading for NCCL communication
 
 ############################################################
 ### Execution (Model Training)
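The "### Set TF_CONFIG" step that execution_wrapper.sh keeps (its body lies outside this patch) is what MultiWorkerMirroredStrategy reads to discover its peers. A hedged sketch of what such a step typically computes from the SLURM variables exported by set_vars.sh; the port number and the assembly logic are assumptions for illustration, not taken from the wrapper itself:

    import json
    import os
    import subprocess

    def build_tf_config(port: int = 29500) -> dict:
        # expand the compressed nodelist (e.g. "node[01-02]") into hostnames
        hosts = subprocess.run(
            ["scontrol", "show", "hostnames", os.environ["SLURM_JOB_NODELIST"]],
            capture_output=True, text=True, check=True,
        ).stdout.split()
        tasks_per_node = int(os.environ.get("SLURM_NTASKS_PER_NODE", "1"))
        workers = [f"{h}:{port + i}" for h in hosts for i in range(tasks_per_node)]
        return {
            "cluster": {"worker": workers},
            "task": {"type": "worker", "index": int(os.environ["RANK"])},
        }

    os.environ["TF_CONFIG"] = json.dumps(build_tf_config())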
diff --git a/tensorflow/cifar10_distributed/submit_job_venv.sh b/tensorflow/cifar10_distributed/submit_job_venv.sh
new file mode 100644
index 0000000..be3ff76
--- /dev/null
+++ b/tensorflow/cifar10_distributed/submit_job_venv.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/zsh
+############################################################
+### Slurm flags
+############################################################
+
+#SBATCH --time=00:15:00
+#SBATCH --partition=c23g
+#SBATCH --nodes=1
+#SBATCH --ntasks-per-node=2
+#SBATCH --cpus-per-task=24
+#SBATCH --gres=gpu:2
+#SBATCH --account=supp0001
+
+############################################################
+### Load modules or software
+############################################################
+
+# TODO: activate your desired virtual environment
+module purge
+module load GCC/11.3.0
+module load OpenMPI/4.1.4
+module load CMake/3.21.1
+module load Python/3.9.6
+module load NCCL/2.20.5-CUDA-12.4.0
+module load cuDNN/8.9.7.29-CUDA-12.3.0
+
+source /work/jk869269/venvs/tensorflow-2.17_CUDA-12.3/bin/activate
+
+############################################################
+### Parameters and Settings
+############################################################
+
+# print some information about current system
+echo "Job nodes: ${SLURM_JOB_NODELIST}"
+echo "Current machine: $(hostname)"
+nvidia-smi
+
+export NCCL_DEBUG=INFO
+export TF_CPP_MIN_LOG_LEVEL=1 # disable info messages
+export TF_GPU_THREAD_MODE='gpu_private'
+export NCCL_SOCKET_NTHREADS=8 # multi-threading for NCCL communication
+
+############################################################
+### Execution (Model Training)
+############################################################
+
+# each process sets required environment variables and
+# runs the python script
+srun zsh -c '\
+    source set_vars.sh && \
+    zsh ./execution_wrapper.sh'
\ No newline at end of file
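With MultiWorkerMirroredStrategy, the batch size used when building the tf.data pipeline is the global batch that gets split across all workers. If a fixed per-worker batch is intended, it has to be scaled by the number of workers; a sketch under that assumption (the patched script keeps args.batch_size unscaled):

    import os

    def global_batch_size(per_worker_batch: int) -> int:
        # WORLD_SIZE is exported by set_vars.sh, one count per SLURM task
        world_size = int(os.environ.get("WORLD_SIZE", "1"))
        return per_worker_batch * world_size

    print(global_batch_size(128))  # e.g. 256 with 2 workers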
diff --git a/tensorflow/cifar10_distributed/train_model.py b/tensorflow/cifar10_distributed/train_model.py
index 41c29e8..ab0c3c0 100644
--- a/tensorflow/cifar10_distributed/train_model.py
+++ b/tensorflow/cifar10_distributed/train_model.py
@@ -14,7 +14,6 @@ def parse_command_line():
     parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda")
     parser.add_argument("--num_epochs", required=False, type=int, default=5)
     parser.add_argument("--batch_size", required=False, type=int, default=128)
-    parser.add_argument("--num_workers", required=False, type=int, default=1)
     parser.add_argument("--distributed", required=False, action="store_true", default=False)
     parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2)
     parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None)
@@ -78,19 +77,18 @@ def load_dataset(args):
 
     return (x_train, y_train), (x_test, y_test)
 
-def setup(args) -> None:
+def setup(args):
     if args.num_intraop_threads:
         tf.config.threading.set_intra_op_parallelism_threads(args.num_intraop_threads)
     if args.num_interop_threads:
         tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
 
+    l_gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
+
     if args.world_rank == 0:
         print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
         print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
-        sys.stdout.flush()
 
-    l_gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
-    if args.world_rank == 0:
         print("List of GPU devices found:")
         for dev in l_gpu_devices:
             print(str(dev.device_type) + ": " + dev.name)
@@ -101,13 +99,6 @@ def setup(args) -> None:
     tf.keras.backend.clear_session()
     tf.config.optimizer.set_jit(True)
 
-def main():
-    # parse command line arguments
-    args = parse_command_line()
-
-    # run setup (e.g., create distributed environment if desired)
-    setup(args)
-
     # define data parallel strategy for distributed training
     strategy = tf.distribute.MultiWorkerMirroredStrategy(
         communication_options=tf.distribute.experimental.CommunicationOptions(
@@ -115,6 +106,15 @@ def main():
         )
     )
 
+    return strategy
+
+def main():
+    # parse command line arguments
+    args = parse_command_line()
+
+    # run setup (e.g., create distributed environment if desired)
+    strategy = setup(args)
+
     # data set loading
     (x_train, y_train), (x_test, y_test) = load_dataset(args)
     n_train, n_test = x_train.shape[0], x_test.shape[0]
@@ -133,22 +133,23 @@ def main():
     cur_optimizer = Adam(0.001)
     model.compile(loss="categorical_crossentropy", optimizer=cur_optimizer, metrics=["accuracy"])
 
-    # callbacks to register
-    if args.tensorboard:
-        tensorboard_callback = tf.keras.callbacks.TensorBoard(
-            log_dir=os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S")),
-            histogram_freq=1,
-            profile_batch=args.profile_batches,
-        )
-        callbacks.append(tensorboard_callback)
+    # callbacks to register
+    if args.tensorboard:
+        tensorboard_callback = tf.keras.callbacks.TensorBoard(
+            log_dir=os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S")),
+            histogram_freq=1,
+            profile_batch=args.profile_batches,
+        )
+        callbacks.append(tensorboard_callback)
 
     # train the model
     model.fit(ds_train, epochs=args.num_epochs, verbose=args.verbosity, callbacks=callbacks)
 
     # evaluate model
     scores = model.evaluate(ds_test, verbose=args.verbosity)
-    print(f"Test Evaluation: Accuracy: {scores[1]}")
-    sys.stdout.flush()
+    if args.world_rank == 0:
+        print(f"Test Evaluation: Accuracy: {scores[1]}")
+        sys.stdout.flush()
 
 if __name__ == "__main__":
     main()
-- 
GitLab
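The hunks above move strategy creation into setup() and return it to main(); the unchanged middle of the file is not shown, but with MultiWorkerMirroredStrategy the Keras model and optimizer must be created under the strategy's scope so their variables are mirrored across workers. A minimal self-contained sketch of that pattern, reusing the model settings visible in the patch (not the patched file itself):

    import tensorflow as tf
    from tensorflow.keras.optimizers import Adam

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    with strategy.scope():
        # variables created here are mirrored and kept in sync across workers
        model = tf.keras.applications.ResNet50(weights=None, input_shape=(32, 32, 3), classes=10)
        model.compile(loss="categorical_crossentropy", optimizer=Adam(0.001), metrics=["accuracy"])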