diff --git a/tensorflow/cifar10/train_model.py b/tensorflow/cifar10/train_model.py index b505879cec21f184be4c7c0629c1f0e4dab21362..05648ebd6bc1d56aef515370b35a0d52d0e0447c 100644 --- a/tensorflow/cifar10/train_model.py +++ b/tensorflow/cifar10/train_model.py @@ -6,7 +6,6 @@ import argparse import datetime import tensorflow as tf from tensorflow.keras.optimizers import Adam -from tensorflow.keras import backend as K from tensorflow.keras.datasets import cifar10 import tensorflow.keras.applications as applications @@ -15,82 +14,57 @@ def parse_command_line(): parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda") parser.add_argument("--num_epochs", required=False, type=int, default=5) parser.add_argument("--batch_size", required=False, type=int, default=128) - parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2) - parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None) - parser.add_argument("--num_interop_threads", required=False, help="Number of inter-op threads", type=int, default=None) parser.add_argument("--tensorboard", required=False, help="Whether to use tensorboard callback", action="store_true", default=False) parser.add_argument("--profile_batches", required=False, help='Batches to profile with for tensorboard. Format "batch_start,batch_end"', type=str, default="2,5") args = parser.parse_args() - # specific to cifar 10 dataset - args.num_classes = 10 - print("Settings:") settings_map = vars(args) for name in sorted(settings_map.keys()): print("--" + str(name) + ": " + str(settings_map[name])) print("") - sys.stdout.flush() return args -def load_dataset(args): - K.set_image_data_format("channels_last") +def preprocess_data(images, labels): + images = tf.image.resize(images, (224, 224)) # Resize for ResNet-50 + images = images / 255.0 # Normalize to [0, 1] + return images, labels - # load the cifar10 data +def load_dataset(args): + # load the cifar10 data and generate input pipelines (x_train, y_train), (x_test, y_test) = cifar10.load_data() # convert class vectors to binary class matrices. - y_train = tf.keras.utils.to_categorical(y_train, args.num_classes) - y_test = tf.keras.utils.to_categorical(y_test, args.num_classes) - - # normalize base data - x_train = x_train.astype("float32") / 255 - x_test = x_test.astype("float32") / 255 - x_train_mean = np.mean(x_train, axis=0) - x_train -= x_train_mean - x_test -= x_train_mean - - # dimensions - print(f"original train_shape: {x_train.shape}") - print(f"original test_shape: {x_test.shape}") - n_train, n_test = x_train.shape[0], x_test.shape[0] - resize_size = 224 # use bigger images with ResNet - - # Generating input pipelines - ds_train = (tf.data.Dataset.from_tensor_slices((x_train, y_train)) - .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label)) - .shuffle(n_train).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE) + y_train = tf.keras.utils.to_categorical(y_train, 10) + y_test = tf.keras.utils.to_categorical(y_test, 10) + + ds_train = ( + tf.data.Dataset.from_tensor_slices((x_train, y_train)) + .map(preprocess_data) + .shuffle(x_train.shape[0]) + .cache() + .batch(args.batch_size) + .prefetch(tf.data.experimental.AUTOTUNE) ) - ds_test = (tf.data.Dataset.from_tensor_slices((x_test, y_test)) - .map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label)) - .shuffle(n_test).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE) + + ds_test = ( + tf.data.Dataset.from_tensor_slices((x_test, y_test)) + .map(preprocess_data) + .cache() + .batch(args.batch_size) + .prefetch(tf.data.experimental.AUTOTUNE) ) # get updated shapes train_shape, test_shape = ds_train.element_spec[0].shape, ds_test.element_spec[0].shape - print(f"final train_shape: {train_shape}") - print(f"final test_shape: {test_shape}") + print(f"train_shape:", x_train.shape, " -> ", train_shape) + print(f"test_shape:", x_test.shape, " -> ", test_shape) return ds_train, ds_test, train_shape def setup(args): - if args.num_intraop_threads: - tf.config.threading.set_intra_op_parallelism_threads(args.num_intraop_threads) - if args.num_interop_threads: - tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads) - - gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU") - - print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}") - print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}") - - - print("List of GPU devices found:") - for dev in gpu_devices: - print(str(dev.device_type) + ": " + dev.name) - print("") - sys.stdout.flush() + print(f"Number of GPU devices found on worker {args.world_rank}", len(tf.config.list_physical_devices("GPU"))) tf.keras.backend.clear_session() tf.config.optimizer.set_jit(True) @@ -113,9 +87,8 @@ def main(): # callbacks to register callbacks = [] - model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes) - # model.summary() # display the model architecture - + # create and compile the model + model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=10) cur_optimizer = Adam(0.001) model.compile(loss="categorical_crossentropy", optimizer=cur_optimizer, metrics=["accuracy"]) @@ -134,7 +107,6 @@ def main(): # evaluate model scores = model.evaluate(ds_test, verbose=args.verbosity) print(f"Test Evaluation: Accuracy: {scores[1]}") - sys.stdout.flush() if __name__ == "__main__": main() diff --git a/tensorflow/cifar10_distributed/train_model.py b/tensorflow/cifar10_distributed/train_model.py index 5a742e608ca2fbee421bb3c31c3356fd4cafb60c..c80076de977d8f4bfda3ef75bcb3c858d0513ce3 100644 --- a/tensorflow/cifar10_distributed/train_model.py +++ b/tensorflow/cifar10_distributed/train_model.py @@ -108,11 +108,10 @@ def main(): # callbacks to register callbacks = [] + # create and compile the model with strategy.scope(): model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=10) - - # create optimizer and scale learning rate with number of workers - cur_optimizer = Adam(learning_rate=0.001 * args.world_size) + cur_optimizer = Adam(learning_rate=0.001 * args.world_size) # scale learning rate with number of workers model.compile(loss="categorical_crossentropy", optimizer=cur_optimizer, metrics=["accuracy"]) # callbacks to register diff --git a/tensorflow/cifar10_distributed/train_model_horovod.py b/tensorflow/cifar10_distributed/train_model_horovod.py index e18aff6c01802439040c7555e72af5af94c8f2e7..6cae19eadd6e622a6e4fedb915190f340d4f053b 100644 --- a/tensorflow/cifar10_distributed/train_model_horovod.py +++ b/tensorflow/cifar10_distributed/train_model_horovod.py @@ -110,13 +110,11 @@ def main(): hvd.callbacks.BroadcastGlobalVariablesCallback(0), ] + # create and compile the model model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=10) - # model.summary() # display the model architecture - # Horovod: create Horovod DistributedOptimizer and scale learning rate with number of workers cur_optimizer = Adam(learning_rate=0.001 * hvd.size()) opt = hvd.DistributedOptimizer(cur_optimizer, compression=compression) - model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"]) # callbacks to register