Skip to content
Snippets Groups Projects
Commit 3dc4cb5a authored by Jannis Klinkenberg
Browse files

removed complexity from examples

parent 6f041b4f
No related branches found
No related tags found
No related merge requests found
......@@ -4,9 +4,6 @@
############################################################
export TF_CONFIG=$(python -W ignore create_tfconfig.py)
# limit visible devices to ensure correct number of replicas in TensorFlow MultiWorkerMirroredStrategy
export CUDA_VISIBLE_DEVICES=${SLURM_LOCALID}
############################################################
### Execution
############################################################
......
......@@ -4,11 +4,15 @@ export RANK=${SLURM_PROCID}
export LOCAL_RANK=${SLURM_LOCALID}
export WORLD_SIZE=${SLURM_NTASKS}
# limit visible devices to ensure correct device selection and number of replicas in TensorFlow MultiWorkerMirroredStrategy
export CUDA_VISIBLE_DEVICES=${SLURM_LOCALID}
# make variables also available inside container
export APPTAINERENV_RANK=${RANK}
export APPTAINERENV_LOCAL_RANK=${LOCAL_RANK}
export APPTAINERENV_WORLD_SIZE=${WORLD_SIZE}
export APPTAINERENV_TMP="/tmp"
export APPTAINERENV_CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES}
export APPTAINERENV_TF_CPP_MIN_LOG_LEVEL=${TF_CPP_MIN_LOG_LEVEL}
export APPTAINERENV_TF_GPU_THREAD_MODE=${TF_GPU_THREAD_MODE}
......
......@@ -6,7 +6,6 @@ import argparse
import datetime
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import cifar10
import tensorflow.keras.applications as applications
......@@ -15,9 +14,6 @@ def parse_command_line():
parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda")
parser.add_argument("--num_epochs", required=False, type=int, default=5)
parser.add_argument("--batch_size", required=False, type=int, default=128)
parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2)
parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None)
parser.add_argument("--num_interop_threads", required=False, help="Number of inter-op threads", type=int, default=None)
parser.add_argument("--tensorboard", required=False, help="Whether to use tensorboard callback", action="store_true", default=False)
parser.add_argument("--profile_batches", required=False, help='Batches to profile with for tensorboard. Format "batch_start,batch_end"', type=str, default="2,5")
args = parser.parse_args()
......@@ -27,10 +23,7 @@ def parse_command_line():
args.world_rank = int(os.environ["RANK"])
args.local_rank = int(os.environ["LOCAL_RANK"])
args.global_batch_size = args.batch_size * args.world_size
args.verbosity = 0 if args.world_rank != 0 else args.verbosity # only use verbose for master process
# specific to cifar 10 dataset
args.num_classes = 10
args.verbosity = 2 if args.world_rank == 0 else 0 # only use verbose for master process
if args.world_rank == 0:
print("Settings:")
......@@ -38,72 +31,50 @@ def parse_command_line():
for name in sorted(settings_map.keys()):
print("--" + str(name) + ": " + str(settings_map[name]))
print("")
sys.stdout.flush()
return args
def load_dataset(args):
K.set_image_data_format("channels_last")
def preprocess_data(images, labels):
    """Resize images to 224x224 and scale their values by 1/255.

    Args:
        images: image data handed over by the input pipeline.
        labels: labels belonging to ``images``; passed through untouched.

    Returns:
        Tuple ``(images, labels)`` holding the resized, rescaled images and
        the unchanged labels.
    """
    # ResNet-50 is fed 224x224 inputs here, so bring the images to that size first
    resized = tf.image.resize(images, (224, 224))
    # scale to [0, 1] -- assumes the raw pixel values lie in [0, 255]
    return resized / 255.0, labels
# load the cifar10 data
def load_dataset(args):
# load the cifar10 data and generate input pipelines
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
# convert class vectors to binary class matrices.
y_train = tf.keras.utils.to_categorical(y_train, args.num_classes)
y_test = tf.keras.utils.to_categorical(y_test, args.num_classes)
# normalize base data
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
x_train_mean = np.mean(x_train, axis=0)
x_train -= x_train_mean
x_test -= x_train_mean
# dimensions
if args.world_rank == 0:
print(f"original train_shape: {x_train.shape}")
print(f"original test_shape: {x_test.shape}")
n_train, n_test = x_train.shape[0], x_test.shape[0]
resize_size = 224 # use bigger images with ResNet
# Generating input pipelines
ds_train = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
.map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
.shuffle(n_train)
.cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE)
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)
ds_train = (
tf.data.Dataset.from_tensor_slices((x_train, y_train))
.map(preprocess_data)
.shuffle(x_train.shape[0])
.cache()
.batch(args.global_batch_size)
.prefetch(tf.data.experimental.AUTOTUNE)
)
ds_test = (tf.data.Dataset.from_tensor_slices((x_test, y_test))
.map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
.shuffle(n_test).cache().batch(args.global_batch_size).prefetch(tf.data.AUTOTUNE)
ds_test = (
tf.data.Dataset.from_tensor_slices((x_test, y_test))
.map(preprocess_data)
.cache()
.batch(args.global_batch_size)
.prefetch(tf.data.experimental.AUTOTUNE)
)
# get updated shapes
train_shape, test_shape = ds_train.element_spec[0].shape, ds_test.element_spec[0].shape
if args.world_rank == 0:
print(f"final train_shape: {train_shape}")
print(f"final test_shape: {test_shape}")
print(f"train_shape:", x_train.shape, " -> ", train_shape)
print(f"test_shape:", x_test.shape, " -> ", test_shape)
return ds_train, ds_test, train_shape
def setup(args):
if args.num_intraop_threads:
tf.config.threading.set_intra_op_parallelism_threads(args.num_intraop_threads)
if args.num_interop_threads:
tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
if args.world_rank == 0:
print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
print("List of GPU devices found:")
for dev in gpu_devices:
print(str(dev.device_type) + ": " + dev.name)
print("")
sys.stdout.flush()
print(f"Number of GPU devices found on worker {args.world_rank}", len(tf.config.list_physical_devices("GPU")))
tf.config.set_visible_devices(gpu_devices[0], "GPU")
tf.keras.backend.clear_session()
tf.config.optimizer.set_jit(True)
......@@ -138,8 +109,7 @@ def main():
callbacks = []
with strategy.scope():
model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes)
# model.summary() # display the model architecture
model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=10)
# create optimizer and scale learning rate with number of workers
cur_optimizer = Adam(learning_rate=0.001 * args.world_size)
......@@ -161,7 +131,6 @@ def main():
scores = model.evaluate(ds_test, verbose=args.verbosity)
if args.world_rank == 0:
print(f"Test Evaluation: Accuracy: {scores[1]}")
sys.stdout.flush()
if __name__ == "__main__":
main()
......@@ -6,7 +6,6 @@ import argparse
import datetime
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import cifar10
import tensorflow.keras.applications as applications
import horovod.keras as hvd
......@@ -16,9 +15,6 @@ def parse_command_line():
parser.add_argument("--device", required=False, type=str, choices=["cpu", "cuda"], default="cuda")
parser.add_argument("--num_epochs", required=False, type=int, default=5)
parser.add_argument("--batch_size", required=False, type=int, default=128)
parser.add_argument("--verbosity", required=False, help="Keras verbosity level for training/evaluation", type=int, default=2)
parser.add_argument("--num_intraop_threads", required=False, help="Number of intra-op threads", type=int, default=None)
parser.add_argument("--num_interop_threads", required=False, help="Number of inter-op threads", type=int, default=None)
parser.add_argument("--tensorboard", required=False, help="Whether to use tensorboard callback", action="store_true", default=False)
parser.add_argument("--profile_batches", required=False, help='Batches to profile with for tensorboard. Format "batch_start,batch_end"', type=str, default="2,5")
args = parser.parse_args()
......@@ -28,10 +24,7 @@ def parse_command_line():
args.world_rank = hvd.rank()
args.local_rank = hvd.local_rank()
args.global_batch_size = args.batch_size * hvd.size()
args.verbosity = 0 if hvd.rank() != 0 else args.verbosity # only use verbose for master process
# specific to cifar 10 dataset
args.num_classes = 10
args.verbosity = 2 if hvd.rank() == 0 else 0 # only use verbose for master process
if hvd.rank() == 0:
print("Settings:")
......@@ -39,73 +32,52 @@ def parse_command_line():
for name in sorted(settings_map.keys()):
print("--" + str(name) + ": " + str(settings_map[name]))
print("")
sys.stdout.flush()
return args
def load_dataset(args):
K.set_image_data_format("channels_last")
def preprocess_data(images, labels):
    """Prepare one dataset element for ResNet-50.

    The images are resized to 224x224 and then divided by 255.0; the labels
    are returned exactly as received.

    Args:
        images: image data coming from the dataset.
        labels: labels that accompany ``images``.

    Returns:
        ``(images, labels)`` with the transformed images.
    """
    target_size = (224, 224)  # input resolution used for ResNet-50 in this example
    scaled = tf.image.resize(images, target_size) / 255.0  # [0, 1], assuming 8-bit pixel input
    return scaled, labels
# load the cifar10 data
def load_dataset(args):
# load the cifar10 data and generate input pipelines
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
# convert class vectors to binary class matrices.
y_train = tf.keras.utils.to_categorical(y_train, args.num_classes)
y_test = tf.keras.utils.to_categorical(y_test, args.num_classes)
# normalize base data
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
x_train_mean = np.mean(x_train, axis=0)
x_train -= x_train_mean
x_test -= x_train_mean
# dimensions
if hvd.rank() == 0:
print(f"original train_shape: {x_train.shape}")
print(f"original test_shape: {x_test.shape}")
n_train, n_test = x_train.shape[0], x_test.shape[0]
resize_size = 224 # use bigger images with ResNet
# Generating input pipelines
ds_train = (tf.data.Dataset.from_tensor_slices((x_train, y_train))
.map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
.shuffle(n_train).shard(num_shards=hvd.size(), index=hvd.rank()) # Horovod: need to manually shard dataset
.cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)
ds_train = (
tf.data.Dataset.from_tensor_slices((x_train, y_train))
.map(preprocess_data)
.shuffle(x_train.shape[0])
.shard(num_shards=hvd.size(), index=hvd.rank()) # Horovod: need to manually shard dataset
.cache()
.batch(args.global_batch_size)
.prefetch(tf.data.experimental.AUTOTUNE)
)
# Horovod: don't shard the test dataset here; otherwise a reduction of the results would be necessary
ds_test = (tf.data.Dataset.from_tensor_slices((x_test, y_test))
.map(lambda image, label: (tf.image.resize(image, [resize_size, resize_size]), label))
.shuffle(n_test).cache().batch(args.batch_size).prefetch(tf.data.AUTOTUNE)
ds_test = (
tf.data.Dataset.from_tensor_slices((x_test, y_test))
.map(preprocess_data)
# Horovod: don't shard the test dataset here; otherwise a reduction of the prediction results would be necessary
.cache()
.batch(args.global_batch_size)
.prefetch(tf.data.experimental.AUTOTUNE)
)
# get updated shapes
train_shape, test_shape = ds_train.element_spec[0].shape, ds_test.element_spec[0].shape
if hvd.rank() == 0:
print(f"final train_shape: {train_shape}")
print(f"final test_shape: {test_shape}")
print(f"train_shape:", x_train.shape, " -> ", train_shape)
print(f"test_shape:", x_test.shape, " -> ", test_shape)
return ds_train, ds_test, train_shape
def setup(args):
if args.num_intraop_threads:
tf.config.threading.set_intra_op_parallelism_threads(args.num_intraop_threads)
if args.num_interop_threads:
tf.config.threading.set_inter_op_parallelism_threads(args.num_interop_threads)
print(f"Number of GPU devices found on worker {args.world_rank}", len(tf.config.list_physical_devices("GPU")))
gpu_devices = [] if args.device == "cpu" else tf.config.list_physical_devices("GPU")
if hvd.rank() == 0:
print(f"Tensorflow get_intra_op_parallelism_threads: {tf.config.threading.get_intra_op_parallelism_threads()}")
print(f"Tensorflow get_inter_op_parallelism_threads: {tf.config.threading.get_inter_op_parallelism_threads()}")
print("List of GPU devices found:")
for dev in gpu_devices:
print(str(dev.device_type) + ": " + dev.name)
print("")
sys.stdout.flush()
tf.config.set_visible_devices(gpu_devices[hvd.local_rank()], "GPU")
tf.keras.backend.clear_session()
tf.config.optimizer.set_jit(True)
......@@ -138,11 +110,7 @@ def main():
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# If desired: save checkpoints only on worker 0 to prevent other workers from corrupting them.
# if hvd.rank() == 0:
# callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=args.num_classes)
model = applications.ResNet50(weights=None, input_shape=train_shape[1:], classes=10)
# model.summary() # display the model architecture
# Horovod: create Horovod DistributedOptimizer and scale learning rate with number of workers
......@@ -167,7 +135,6 @@ def main():
scores = model.evaluate(ds_test, verbose=args.verbosity)
if hvd.rank() == 0:
print(f"Test Evaluation: Accuracy: {scores[1]}")
sys.stdout.flush()
# Horovod: synchronize at the end (replacement for barrier)
cur_rank = hvd.rank()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment