From 3dc4cb5a773419ef45600f100bd996bd7f3ae6ab Mon Sep 17 00:00:00 2001
From: Jannis Klinkenberg <j.klinkenberg@itc.rwth-aachen.de>
Date: Sat, 16 Nov 2024 18:13:50 +0100
Subject: [PATCH] removed complexity from examples

---
 .../cifar10_distributed/execution_wrapper.sh  |  3 -
 tensorflow/cifar10_distributed/set_vars.sh    |  4 +
 tensorflow/cifar10_distributed/train_model.py | 87 ++++++-----------
 .../train_model_horovod.py                    | 93 ++++++-------------
 4 files changed, 62 insertions(+), 125 deletions(-)

diff --git a/tensorflow/cifar10_distributed/execution_wrapper.sh b/tensorflow/cifar10_distributed/execution_wrapper.sh
index c7899b3..d12b6a3 100644
--- a/tensorflow/cifar10_distributed/execution_wrapper.sh
+++ b/tensorflow/cifar10_distributed/execution_wrapper.sh
@@ -4,9 +4,6 @@
 ############################################################
 export TF_CONFIG=$(python -W ignore create_tfconfig.py)
 
-# limit visible devices to ensure correct number of replicas in TensorFlow MultiWorkerMirroredStrategy
-export CUDA_VISIBLE_DEVICES=${SLURM_LOCALID}
-
 ############################################################
 ### Execution
 ############################################################

diff --git a/tensorflow/cifar10_distributed/set_vars.sh b/tensorflow/cifar10_distributed/set_vars.sh
index 74ffc02..1decdb2 100644
--- a/tensorflow/cifar10_distributed/set_vars.sh
+++ b/tensorflow/cifar10_distributed/set_vars.sh
@@ -4,11 +4,15 @@
 export RANK=${SLURM_PROCID}
 export LOCAL_RANK=${SLURM_LOCALID}
 export WORLD_SIZE=${SLURM_NTASKS}
 
+# limit visible devices to ensure correct device selection and number of replicas in TensorFlow MultiWorkerMirroredStrategy
+export CUDA_VISIBLE_DEVICES=${SLURM_LOCALID}
+
 # make variables also available inside container
 export APPTAINERENV_RANK=${RANK}
 export APPTAINERENV_LOCAL_RANK=${LOCAL_RANK}
 export APPTAINERENV_WORLD_SIZE=${WORLD_SIZE}
 export APPTAINERENV_TMP="/tmp"
+export APPTAINERENV_CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES}
 export APPTAINERENV_TF_CPP_MIN_LOG_LEVEL=${TF_CPP_MIN_LOG_LEVEL}
 export APPTAINERENV_TF_GPU_THREAD_MODE=${TF_GPU_THREAD_MODE}

diff --git a/tensorflow/cifar10_distributed/train_model.py b/tensorflow/cifar10_distributed/train_model.py
index 8012cc3..5a742e6 100644
--- a/tensorflow/cifar10_distributed/train_model.py
+++ b/tensorflow/cifar10_distributed/train_model.py
@@ -6,7 +6,6 @@ import argparse
 import datetime
 import tensorflow as tf
 from tensorflow.keras.optimizers import Adam
-from tensorflow.keras import backend as K
 from tensorflow.keras.datasets import cifar10
 import tensorflow.keras.applications as applications
 
@@ -15,9 +14,6 @@ def parse_command_line():
     parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda")
     parser.add_argument("--num_epochs", required=False, type=int, default=5)
     parser.add_argument("--batch_size", required=False, type=int, default=128)
-    parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2)
-    parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None)
-    parser.add_argument("--num_interop_threads", required=False, help="Number of inter-op threads", type=int, default=None)
     parser.add_argument("--tensorboard", required=False, help="Whether to use tensorboard callback", action="store_true", default=False)
     parser.add_argument("--profile_batches", required=False, help='Batches to profile with for tensorboard. Format "batch_start,batch_end"', type=str, default="2,5")
     args = parser.parse_args()
@@ -27,10 +23,7 @@ def parse_command_line():
     args.world_rank = int(os.environ["RANK"])
     args.local_rank = int(os.environ["LOCAL_RANK"])
     args.global_batch_size = args.batch_size * args.world_size
-    args.verbosity = 0 if args.world_rank != 0 else args.verbosity  # only use verbose for master process
-
-    # specific to cifar 10 dataset
-    args.num_classes = 10
+    args.verbosity = 2 if args.world_rank == 0 else 0  # only use verbose for master process
 
     if args.world_rank == 0:
         print("Settings:")
@@ -38,72 +31,50 @@ def parse_command_line():
         for name in sorted(settings_map.keys()):
             print("--" + str(name) + ": " + str(settings_map[name]))
         print("")
-        sys.stdout.flush()
 
     return args
 
-def load_dataset(args):
-    K.set_image_data_format("channels_last")
+def preprocess_data(images, labels):
+    images = tf.image.resize(images, (224, 224))  # Resize for ResNet-50
+    images = images / 255.0  # Normalize to [0, 1]
+    return images, labels
 
-    # load the cifar10 data
+def load_dataset(args):
+    # load the cifar10 data and generate input pipelines
     (x_train, y_train), (x_test, y_test) = cifar10.load_data()
 
     # convert class vectors to binary class matrices.
-    y_train = tf.keras.utils.to_categorical(y_train, args.num_classes)
-    y_test = tf.keras.utils.to_categorical(y_test, args.num_classes)
-
-    # normalize base data
-    x_train = x_train.astype("float32") / 255
-    x_test = x_test.astype("float32") / 255
-    x_train_mean = np.mean(x_train, axis=0)
-    x_train -= x_train_mean
-    x_test -= x_train_mean
-
-    # dimensions
-    if args.world_rank == 0:
-        print(f"original train_shape: {x_train.shape}")
-        print(f"original test_shape: {x_test.shape}")
-    n_train, n_test = x_train.shape[0], x_test.shape[0]
-    resize_size = 224  # use bigger images with ResNet
-
-    # Generating input pipelines
-    ds_train = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
-        .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
-        .shuffle(n_train)
-        .cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE)
+    y_train = tf.keras.utils.to_categorical(y_train, 10)
+    y_test = tf.keras.utils.to_categorical(y_test, 10)
+
+    ds_train = (
+        tf.data.Dataset.from_tensor_slices((x_train, y_train))
+        .map(preprocess_data)
+        .shuffle(x_train.shape[0])
+        .cache()
+        .batch(args.global_batch_size)
+        .prefetch(tf.data.experimental.AUTOTUNE)
     )
-    ds_test = (tf.data.Dataset.from_tensor_slices((x_test, y_test))
-        .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
-        .shuffle(n_test).cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE)
+
+    ds_test = (
+        tf.data.Dataset.from_tensor_slices((x_test, y_test))
+        .map(preprocess_data)
+        .cache()
+        .batch(args.global_batch_size)
+        .prefetch(tf.data.experimental.AUTOTUNE)
     )
 
     # get updated shapes
     train_shape, test_shape = ds_train.element_spec[0].shape, ds_test.element_spec[0].shape
     if args.world_rank == 0:
-        print(f"final train_shape: {train_shape}")
-        print(f"final test_shape: {test_shape}")
+        print("train_shape:", x_train.shape, " -> ", train_shape)
+        print("test_shape:", x_test.shape, " -> ", test_shape)
 
     return ds_train, ds_test, train_shape
 
 def setup(args):
-    if args.num_intraop_threads:
-        tf.config.threading.set_intra_op_parallelism_threads(args.num_intraop_threads)
-    if args.num_interop_threads:
-        tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
-
-    gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
-
-    if args.world_rank == 0:
-        print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
-        print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
-
-        print("List of GPU devices found:")
-        for dev in gpu_devices:
-            print(str(dev.device_type) + ": " + dev.name)
-        print("")
-        sys.stdout.flush()
+    print(f"Number of GPU devices found on worker {args.world_rank}:", len(tf.config.list_physical_devices("GPU")))
 
-    tf.config.set_visible_devices(gpu_devices[0], "GPU")
     tf.keras.backend.clear_session()
     tf.config.optimizer.set_jit(True)
@@ -138,8 +109,7 @@ def main():
     callbacks = []
 
     with strategy.scope():
-        model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes)
-        # model.summary()  # display the model architecture
+        model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=10)
 
         # create optimizer and scale learning rate with number of workers
         cur_optimizer = Adam(learning_rate=0.001 * args.world_size)
@@ -161,7 +131,6 @@ def main():
     scores = model.evaluate(ds_test, verbose=args.verbosity)
     if args.world_rank == 0:
         print(f"Test Evaluation: Accuracy: {scores[1]}")
-        sys.stdout.flush()
 
 if __name__ == "__main__":
     main()

diff --git a/tensorflow/cifar10_distributed/train_model_horovod.py b/tensorflow/cifar10_distributed/train_model_horovod.py
index 24dea50..e18aff6 100644
--- a/tensorflow/cifar10_distributed/train_model_horovod.py
+++ b/tensorflow/cifar10_distributed/train_model_horovod.py
@@ -6,7 +6,6 @@ import argparse
 import datetime
 import tensorflow as tf
 from tensorflow.keras.optimizers import Adam
-from tensorflow.keras import backend as K
 from tensorflow.keras.datasets import cifar10
 import tensorflow.keras.applications as applications
 import horovod.keras as hvd
@@ -16,9 +15,6 @@ def parse_command_line():
     parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda")
     parser.add_argument("--num_epochs", required=False, type=int, default=5)
     parser.add_argument("--batch_size", required=False, type=int, default=128)
-    parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2)
-    parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None)
-    parser.add_argument("--num_interop_threads", required=False, help="Number of inter-op threads", type=int, default=None)
     parser.add_argument("--tensorboard", required=False, help="Whether to use tensorboard callback", action="store_true", default=False)
     parser.add_argument("--profile_batches", required=False, help='Batches to profile with for tensorboard. Format "batch_start,batch_end"', type=str, default="2,5")
     args = parser.parse_args()
@@ -28,10 +24,7 @@ def parse_command_line():
     args.world_rank = hvd.rank()
     args.local_rank = hvd.local_rank()
     args.global_batch_size = args.batch_size * hvd.size()
-    args.verbosity = 0 if hvd.rank() != 0 else args.verbosity  # only use verbose for master process
-
-    # specific to cifar 10 dataset
-    args.num_classes = 10
+    args.verbosity = 2 if hvd.rank() == 0 else 0  # only use verbose for master process
 
     if hvd.rank() == 0:
         print("Settings:")
@@ -39,73 +32,52 @@ def parse_command_line():
         for name in sorted(settings_map.keys()):
             print("--" + str(name) + ": " + str(settings_map[name]))
         print("")
-        sys.stdout.flush()
 
     return args
 
-def load_dataset(args):
-    K.set_image_data_format("channels_last")
+def preprocess_data(images, labels):
+    images = tf.image.resize(images, (224, 224))  # Resize for ResNet-50
+    images = images / 255.0  # Normalize to [0, 1]
+    return images, labels
 
-    # load the cifar10 data
+def load_dataset(args):
+    # load the cifar10 data and generate input pipelines
     (x_train, y_train), (x_test, y_test) = cifar10.load_data()
 
     # convert class vectors to binary class matrices.
-    y_train = tf.keras.utils.to_categorical(y_train, args.num_classes)
-    y_test = tf.keras.utils.to_categorical(y_test, args.num_classes)
-
-    # normalize base data
-    x_train = x_train.astype("float32") / 255
-    x_test = x_test.astype("float32") / 255
-    x_train_mean = np.mean(x_train, axis=0)
-    x_train -= x_train_mean
-    x_test -= x_train_mean
-
-    # dimensions
-    if hvd.rank() == 0:
-        print(f"original train_shape: {x_train.shape}")
-        print(f"original test_shape: {x_test.shape}")
-    n_train, n_test = x_train.shape[0], x_test.shape[0]
-    resize_size = 224  # use bigger images with ResNet
-
-    # Generating input pipelines
-    ds_train = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
-        .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
-        .shuffle(n_train).shard(num_shards=hvd.size(), index=hvd.rank())  # Horovod: need to manually shard dataset
-        .cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
+    y_train = tf.keras.utils.to_categorical(y_train, 10)
+    y_test = tf.keras.utils.to_categorical(y_test, 10)
+
+    ds_train = (
+        tf.data.Dataset.from_tensor_slices((x_train, y_train))
+        .map(preprocess_data)
+        .shuffle(x_train.shape[0])
+        .shard(num_shards=hvd.size(), index=hvd.rank())  # Horovod: need to manually shard dataset
+        .cache()
+        .batch(args.batch_size)  # Horovod: per-worker batch size, since the dataset is already sharded across workers
+        .prefetch(tf.data.experimental.AUTOTUNE)
     )
-    # Horovod: dont use sharding for test here. Otherwise reduction of results is necessary
-    ds_test = (tf.data.Dataset.from_tensor_slices((x_test, y_test))
-        .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
-        .shuffle(n_test).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
+
+    ds_test = (
+        tf.data.Dataset.from_tensor_slices((x_test, y_test))
+        .map(preprocess_data)
+        # Horovod: don't use sharding for test here. Otherwise a reduction of prediction results would be necessary
+        .cache()
+        .batch(args.batch_size)
+        .prefetch(tf.data.experimental.AUTOTUNE)
    )
 
     # get updated shapes
     train_shape, test_shape = ds_train.element_spec[0].shape, ds_test.element_spec[0].shape
     if hvd.rank() == 0:
-        print(f"final train_shape: {train_shape}")
-        print(f"final test_shape: {test_shape}")
+        print("train_shape:", x_train.shape, " -> ", train_shape)
+        print("test_shape:", x_test.shape, " -> ", test_shape)
 
     return ds_train, ds_test, train_shape
 
 def setup(args):
-    if args.num_intraop_threads:
-        tf.config.threading.set_intra_op_parallelism_threads(args.num_intraop_threads)
-    if args.num_interop_threads:
-        tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
+    print(f"Number of GPU devices found on worker {args.world_rank}:", len(tf.config.list_physical_devices("GPU")))
 
-    gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
-
-    if hvd.rank() == 0:
-        print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
-        print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
-
-        print("List of GPU devices found:")
-        for dev in gpu_devices:
-            print(str(dev.device_type) + ": " + dev.name)
-        print("")
-        sys.stdout.flush()
-
-    tf.config.set_visible_devices(gpu_devices[hvd.local_rank()], "GPU")
     tf.keras.backend.clear_session()
     tf.config.optimizer.set_jit(True)
@@ -138,11 +110,7 @@ def main():
         hvd.callbacks.BroadcastGlobalVariablesCallback(0),
     ]
 
-    # If desired: save checkpoints only on worker 0 to prevent other workers from corrupting them.
-    # if hvd.rank() == 0:
-    #     callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
-
-    model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes)
+    model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=10)
     # model.summary()  # display the model architecture
 
     # Horovod: create Horovod DistributedOptimizer and scale learning rate with number of workers
@@ -167,7 +135,6 @@ def main():
     scores = model.evaluate(ds_test, verbose=args.verbosity)
     if hvd.rank() == 0:
         print(f"Test Evaluation: Accuracy: {scores[1]}")
-        sys.stdout.flush()
 
     # Horovod: synchronize at the end (replacement for barrier)
     cur_rank = hvd.rank()
--
GitLab