Commit fabbd911 authored by Jannis Klinkenberg

updated scripts

parent 71d9a5de
@@ -4,12 +4,15 @@
 ############################################################
 export TF_CONFIG=$(python -W ignore create_tfconfig.py)
 
+# limit visible devices to ensure correct number of replicas in TensorFlow MultiWorkerMirroredStrategy
+export CUDA_VISIBLE_DEVICES=${SLURM_LOCALID}
+
 ############################################################
 ### Execution
 ############################################################
 
 # start model training
-python -W ignore train_model.py --distributed
+python -W ignore train_model.py
 
 # execute with XLA JIT
-# TF_XLA_FLAGS="--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit" python -W ignore train_model.py --distributed
+# TF_XLA_FLAGS="--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit" python -W ignore train_model.py
\ No newline at end of file
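MultiWorkerMirroredStrategy derives the cluster layout from the TF_CONFIG environment variable, and exporting CUDA_VISIBLE_DEVICES=${SLURM_LOCALID} pins each Slurm task to exactly one GPU, so every task contributes exactly one replica. The repository's create_tfconfig.py is not shown in this diff; the following is only a rough, hypothetical sketch of what such a generator typically emits (file name, port, and helper function are assumptions):

    # sketch_tf_config.py -- illustrative only; the real create_tfconfig.py may differ.
    # Builds a TF_CONFIG JSON for MultiWorkerMirroredStrategy: one worker per Slurm task.
    import json
    import os
    import subprocess

    def build_tf_config(base_port=29500):
        rank = int(os.environ["SLURM_PROCID"])            # global task index -> worker index
        tasks_per_node = int(os.environ["SLURM_NTASKS_PER_NODE"])
        # expand the compressed node list (e.g. "node[01-02]") into individual host names
        hosts = subprocess.run(
            ["scontrol", "show", "hostnames", os.environ["SLURM_JOB_NODELIST"]],
            capture_output=True, text=True, check=True,
        ).stdout.split()
        # give every task on a node its own port so co-located workers do not collide
        workers = [f"{host}:{base_port + i}" for host in hosts for i in range(tasks_per_node)]
        return json.dumps({"cluster": {"worker": workers},
                           "task": {"type": "worker", "index": rank}})

    if __name__ == "__main__":
        print(build_tf_config())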
@@ -16,6 +16,9 @@ export APPTAINERENV_NCCL_SOCKET_NTHREADS=${NCCL_SOCKET_NTHREADS}
 export APPTAINERENV_NCCL_DEBUG=${NCCL_DEBUG}
 
 # make additional SLURM variables available inside container
+export APPTAINERENV_SLURM_PROCID=${SLURM_PROCID}
+export APPTAINERENV_SLURM_LOCALID=${SLURM_LOCALID}
+export APPTAINERENV_SLURM_NTASKS=${SLURM_NTASKS}
 export APPTAINERENV_SLURM_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK}
 export APPTAINERENV_SLURM_NTASKS_PER_NODE=${SLURM_NTASKS_PER_NODE}
 export APPTAINERENV_SLURM_NNODES=${SLURM_NNODES}
...
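These extra exports matter because the training scripts derive their rank information from the Slurm environment inside the container. A minimal, hypothetical sketch of that mapping (the field names match args.world_rank, args.local_rank, and args.world_size referenced elsewhere in this diff, but the actual parsing code is not shown and may differ):

    # Illustrative sketch: deriving distributed-training ranks from Slurm variables.
    import os

    def slurm_ranks():
        world_rank = int(os.environ.get("SLURM_PROCID", 0))   # global rank across all tasks
        local_rank = int(os.environ.get("SLURM_LOCALID", 0))  # rank within the current node
        world_size = int(os.environ.get("SLURM_NTASKS", 1))   # total number of tasks / replicas
        return world_rank, local_rank, world_size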
@@ -45,4 +45,4 @@ mkdir -p ${NEWTMP}
 srun zsh -c '\
 source set_vars.sh && \
 apptainer exec -e --nv -B ${NEWTMP}:/tmp ${TENSORFLOW_IMAGE} \
-bash -c "python -W ignore train_model_horovod.py --distributed"'
+bash -c "python -W ignore train_model_horovod.py"'
@@ -36,6 +36,6 @@ export NCCL_SOCKET_NTHREADS=8 # multi-threading for NCCL communication
 
 # each process sets required environment variables and
 # runs the python script
-srun zsh -c '\
+srun zsh -c "\
 source set_vars.sh && \
-zsh ./execution_wrapper.sh'
+zsh ./execution_wrapper.sh"
\ No newline at end of file
#!/usr/bin/zsh
############################################################
### Slurm flags
############################################################
#SBATCH --time=00:15:00
#SBATCH --partition=c23g
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=24
#SBATCH --gres=gpu:2
############################################################
### Load modules or software
############################################################
# TODO: load/activate your desired modules and virtual environment
############################################################
### Parameters and Settings
############################################################
# print some information about current system
echo "Job nodes: ${SLURM_JOB_NODELIST}"
echo "Current machine: $(hostname)"
nvidia-smi
export NCCL_DEBUG=INFO
export TF_CPP_MIN_LOG_LEVEL=1 # disable info messages
export TF_GPU_THREAD_MODE='gpu_private'
export NCCL_SOCKET_NTHREADS=8 # multi-threading for NCCL communication
############################################################
### Execution (Model Training)
############################################################
# each process sets required environment variables and
# runs the python script
srun zsh -c "\
source set_vars.sh && \
python -W ignore train_model_horovod.py"
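The Horovod script started here is expected to initialize Horovod and pin one GPU per process, in line with the setup() changes further down in this commit. A minimal sketch of that initialization (illustrative only, not the full train_model_horovod.py):

    # Minimal Horovod + TensorFlow initialization sketch (illustrative only).
    import tensorflow as tf
    import horovod.tensorflow.keras as hvd

    hvd.init()  # one Horovod process per Slurm task

    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        # pin each process to the GPU that matches its node-local rank
        tf.config.set_visible_devices(gpus[hvd.local_rank()], "GPU")

    print(f"Horovod: rank {hvd.rank()} / {hvd.size()}, local rank {hvd.local_rank()}")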
@@ -10,24 +10,10 @@ from tensorflow.keras import backend as K
 from tensorflow.keras.datasets import cifar10
 import tensorflow.keras.applications as applications
 
-class TrainLoggerModel(tf.keras.Model):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-    def train_step(self, data):
-        # # if hvd.rank() == 0:
-        # x, y = data
-        # tf.print('new batch:')
-        # #tf.print(x,summarize=-1)
-        # tf.print(y,summarize=-1)
-        # Return a dict mapping metric names to current value
-        return {m.name: m.result() for m in self.metrics}
-
-
 def parse_command_line():
     parser = argparse.ArgumentParser()
     parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda")
-    parser.add_argument("--num_epochs", required=False, type=int, default=3)
+    parser.add_argument("--num_epochs", required=False, type=int, default=5)
     parser.add_argument("--batch_size", required=False, type=int, default=128)
     parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2)
     parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None)
@@ -46,7 +32,7 @@ def parse_command_line():
     # specific to cifar 10 dataset
     args.num_classes = 10
 
-    # if args.world_rank == 0:
+    if args.world_rank == 0:
         print("Settings:")
         settings_map = vars(args)
         for name in sorted(settings_map.keys()):
@@ -74,26 +60,30 @@ def load_dataset(args):
     x_test -= x_train_mean
 
     # dimensions
-    # if args.world_rank == 0:
+    if args.world_rank == 0:
         print(f"original train_shape: {x_train.shape}")
         print(f"original test_shape: {x_test.shape}")
 
     n_train, n_test = x_train.shape[0], x_test.shape[0]
     resize_size = 224 # use bigger images with ResNet
 
+    # disable any automatic data sharding in TensorFlow as we handle that manually here
+    # options = tf.data.Options()
+    # options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
+
     # Generating input pipelines
     ds_train = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
                 .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
-                .shuffle(n_train)
-                .cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE)
+                .shuffle(n_train) # .shard(num_shards=args.world_size, index=args.world_rank)
+                .cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE) #.with_options(options)
     )
     ds_test = (tf.data.Dataset.from_tensor_slices((x_test, y_test))
                .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
-               .shuffle(n_test).cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE)
+               .shuffle(n_test).cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE) #.with_options(options)
    )
 
     # get updated shapes
     train_shape, test_shape = ds_train.element_spec[0].shape, ds_test.element_spec[0].shape
-    # if args.world_rank == 0:
+    if args.world_rank == 0:
         print(f"final train_shape: {train_shape}")
         print(f"final test_shape: {test_shape}")
@@ -105,19 +95,19 @@ def setup(args):
     if args.num_interop_threads:
         tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
 
-    l_gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
+    gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
 
-    # if args.world_rank == 0:
+    if args.world_rank == 0:
         print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
         print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
         print("List of GPU devices found:")
-        for dev in l_gpu_devices:
+        for dev in gpu_devices:
             print(str(dev.device_type) + ": " + dev.name)
         print("")
         sys.stdout.flush()
 
-    tf.config.set_visible_devices(l_gpu_devices[args.local_rank], "GPU")
+    tf.config.set_visible_devices(gpu_devices[args.local_rank], "GPU")
     tf.keras.backend.clear_session()
     tf.config.optimizer.set_jit(True)
@@ -145,20 +135,14 @@ def main():
     # loading desired dataset
     ds_train, ds_test, train_shape = load_dataset(args)
 
-    # options = tf.data.Options()
-    # options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
-    # ds_train = ds_train.with_options(options)
-
     # callbacks to register
     callbacks = []
 
     with strategy.scope():
-        # ds_train = strategy.experimental_distribute_dataset(ds_train)
-        # model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes)
-        model = TrainLoggerModel()
+        model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes)
         # model.summary() # display the model architecture
 
+        # create optimizer and scale learning rate with number of workers
         cur_optimizer = Adam(learning_rate=0.001 * args.world_size)
         model.compile(loss="categorical_crossentropy", optimizer=cur_optimizer, metrics=["accuracy"])
@@ -171,26 +155,14 @@
     )
     callbacks.append(tensorboard_callback)
 
-    class PrintLabelsCallback(tf.keras.callbacks.Callback):
-        def on_train_batch_begin(self, batch, logs=None):
-            # Use strategy.run to access labels data on each worker
-            def print_labels(features, labels):
-                # Print the actual labels processed by each worker
-                tf.print(f"Worker labels for batch {batch}:", labels, summarize=-1)
-
-            # Iterate through dataset and extract labels only
-            strategy.run(lambda x: print_labels(*x), args=(next(iter(ds_train)),))
-
     # train the model
-    model.fit(ds_train, epochs=args.num_epochs, verbose=args.verbosity, callbacks=[PrintLabelsCallback()])
+    model.fit(ds_train, epochs=args.num_epochs, verbose=args.verbosity, callbacks=callbacks)
 
     # evaluate model
-    # scores = model.evaluate(ds_test, verbose=args.verbosity)
-    # if args.world_rank == 0:
-    #     print(f"Test Evaluation: Accuracy: {scores[1]}")
-    #     sys.stdout.flush()
+    scores = model.evaluate(ds_test, verbose=args.verbosity)
+    if args.world_rank == 0:
+        print(f"Test Evaluation: Accuracy: {scores[1]}")
+        sys.stdout.flush()
 
 if __name__ == "__main__":
     main()
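The strategy object that scopes model creation above is built outside the hunks shown here; presumably it is a MultiWorkerMirroredStrategy constructed from the TF_CONFIG exported by the execution wrapper. A hedged sketch of that construction (the NCCL communication option is an assumption, not taken from the diff):

    # Sketch: building the MultiWorkerMirroredStrategy assumed by main() (illustrative only).
    import tensorflow as tf

    # NCCL collectives for GPU clusters; this choice is an assumption, not shown in the diff
    comm_options = tf.distribute.experimental.CommunicationOptions(
        implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
    strategy = tf.distribute.MultiWorkerMirroredStrategy(communication_options=comm_options)

    print("Number of replicas in sync:", strategy.num_replicas_in_sync)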
@@ -67,16 +67,20 @@ def load_dataset(args):
     n_train, n_test = x_train.shape[0], x_test.shape[0]
     resize_size = 224 # use bigger images with ResNet
 
+    # disable any automatic data sharding in TensorFlow as we handle that manually here
+    # options = tf.data.Options()
+    # options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
+
     # Generating input pipelines
     ds_train = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
                 .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
                 .shuffle(n_train).shard(num_shards=hvd.size(), index=hvd.rank()) # Horovod: need to manually shard dataset
-                .cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
+                .cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE) #.with_options(options)
     )
 
     # Horovod: dont use sharding for test here. Otherwise reduction of results is necessary
     ds_test = (tf.data.Dataset.from_tensor_slices((x_test, y_test))
                .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
-               .shuffle(n_test).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
+               .shuffle(n_test).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE) #.with_options(options)
     )
 
     # get updated shapes
@@ -93,19 +97,19 @@ def setup(args):
     if args.num_interop_threads:
         tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
 
-    l_gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
+    gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
 
     if hvd.rank() == 0:
         print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
         print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
         print("List of GPU devices found:")
-        for dev in l_gpu_devices:
+        for dev in gpu_devices:
             print(str(dev.device_type) + ": " + dev.name)
         print("")
         sys.stdout.flush()
 
-    tf.config.set_visible_devices(l_gpu_devices[hvd.local_rank()], "GPU")
+    tf.config.set_visible_devices(gpu_devices[hvd.local_rank()], "GPU")
     tf.keras.backend.clear_session()
     tf.config.optimizer.set_jit(True)
@@ -145,7 +149,7 @@ def main():
     model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes)
     # model.summary() # display the model architecture
 
-    # Horovod: add Horovod Distributed Optimizer and scale learning rate with number of workers
+    # Horovod: create Horovod DistributedOptimizer and scale learning rate with number of workers
     cur_optimizer = Adam(learning_rate=0.001 * hvd.size())
     opt = hvd.DistributedOptimizer(cur_optimizer, compression=compression)
...
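For context, a Horovod Keras setup around the DistributedOptimizer shown above usually also defines a gradient compression scheme and a callback that broadcasts the initial weights from rank 0. A short sketch of those surrounding pieces (the fp16 compression choice is an assumption; the repository's own values are not visible in this hunk):

    # Sketch of typical Horovod Keras glue around hvd.DistributedOptimizer (illustrative only).
    import horovod.tensorflow.keras as hvd
    from tensorflow.keras.optimizers import Adam

    compression = hvd.Compression.fp16  # assumed; hvd.Compression.none is the conservative default
    opt = hvd.DistributedOptimizer(Adam(learning_rate=0.001 * hvd.size()), compression=compression)

    callbacks = [
        # ensure every worker starts training from the same initial weights
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]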