Commit aa95d86d authored by Nastassya Horlava

Docs tensorflow

parent 6272cef2
1 merge request: !4 Docs tensorflow
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.11.2
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
# adapted from https://www.tensorflow.org/guide/keras/distributed_training
import click
import tensorflow as tf
from pathlib import Path
import logging
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
import numpy as np
import os
from tensorflow import keras
from time import perf_counter
import pandas as pd
import shutil
from dataclasses import dataclass
import json
from typing import List
import pickle
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def set_seed(seed=5):
logger.info(f"Setting random seed to {seed}")
tf.random.set_seed(seed=seed)
np.random.seed(seed)
def sys_info(**kwargs):
logging.info(f"TensorFlow version:{tf.__version__}")
logging.info(f"Num GPUs Available: {len(tf.config.list_physical_devices('GPU'))}")
    logging.info("Running parameters:")
for key, value in kwargs.items():
logging.info(f"{key}:{value}")
@dataclass
class TrainingOptions():
task: str = "train"
resume: bool = False
batch_size: int = 500
num_epochs: int = 100
val_fraction: float = 0.2
model_name: str = "default"
project_path: str = Path(__file__).parent.resolve().parent
train_size = 60000 * (1 - val_fraction)
def __init__(self, **kwargs):
super().__init__()
self.__dict__.update(**{key: item for key, item in kwargs.items()})
self.experiment_path = self.project_path / 'models' / self.model_name
self.model_weights_path = self.experiment_path / 'weights'
self.data_path = self.project_path / 'data' / "mnist.npz"
self.logs_path = self.experiment_path / "logs"
if (self.task=="train" and self.resume) or (self.task =="test"):
self.load_opts()
self.experiment_path.mkdir(parents=True, exist_ok=True)
self.model_weights_path.mkdir(parents=True, exist_ok=True)
self.data_path.resolve().parent.mkdir(parents=True, exist_ok=True)
self.logs_path.mkdir(parents=True, exist_ok=True)
self.num_gpu = len(tf.config.list_physical_devices('GPU'))
self.distributed = self.num_gpu>1
self.global_batch_size = self.batch_size * self.num_gpu
self.steps_per_epoch = self.train_size // self.global_batch_size
if self.task == "train" and not self.resume:
shutil.rmtree(self.experiment_path, ignore_errors=False, onerror=None)
self.experiment_path.mkdir(parents=True, exist_ok=True)
self.save_opts()
def save_opts(self, ):
        serializable_dict = self.get_serializable_params()
with open(str(self.experiment_path / 'opts.json'), 'w') as f:
json.dump(serializable_dict, f)
    def get_serializable_params(self):
serializable_dict = self.__dict__.copy()
for key, item in serializable_dict.items():
if isinstance(item, Path):
serializable_dict[key] = str(item)
return serializable_dict
def load_opts(self):
with open(self.experiment_path / 'opts.json', 'r') as f:
opts_dict = json.load(f)
for key, item in opts_dict.items():
if key not in self.get_skipped_eval_params:
setattr(self, key, item)
self.convert_path_params()
def convert_path_params(self):
for key, item in self.__dict__.items():
if isinstance(item, str) and "path" in key:
setattr(self, key, Path(item))
@property
def get_skipped_eval_params(self) -> List[str]:
return ["batch_size", "distributed", "num_gpu", "task"]
class MNIST_classifier():
def __init__(self, opts: TrainingOptions):
self.opts = opts
def get_compiled_model(self):
model = keras.Sequential([
keras.layers.Flatten(input_shape=(28, 28)),
keras.layers.Dense(128, activation='relu'),
keras.layers.Dense(10)
])
model.compile(optimizer='adam',
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
# setting from_logits=True bc didn’t use a SoftMax layer in the final layer
metrics=[keras.metrics.SparseCategoricalAccuracy()])
return model
def make_or_restore_model(self, resume=False):
# Set up the model
if resume:
latest_checkpoint = self.find_checkpoint(which_checkpoint="last")
if latest_checkpoint is not None:
logger.info(f"Restoring from {latest_checkpoint}")
return keras.models.load_model(str(latest_checkpoint))
else:
logger.info("Creating a new model")
return self.get_compiled_model()
def prepare_dataset(self):
logging.info(f"Dataset will be saved under: {self.opts.data_path}")
(train_images, train_labels), (test_images, test_labels) = keras.datasets.mnist.load_data(
path=self.opts.data_path)
train_images = train_images.astype("float32") / 255.0
test_images = test_images.astype("float32") / 255.0
train_labels = train_labels.astype("float32")
test_labels = test_labels.astype("float32")
# Reserve num_val_samples samples for validation
num_val_samples = int(train_images.shape[0] * self.opts.val_fraction)
val_images = train_images[-num_val_samples:]
val_labels = train_labels[-num_val_samples:]
train_images = train_images[:-num_val_samples]
train_labels = train_labels[:-num_val_samples]
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).batch(
self.opts.global_batch_size, drop_remainder=True,
)
val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels)).batch(self.opts.global_batch_size, drop_remainder=True,)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(self.opts.global_batch_size, drop_remainder=True,)
return train_dataset, val_dataset, test_dataset
def init_callbacks(self):
filepath_best_checkpoint = str(self.opts.model_weights_path / "best_model.h5")
checkpoint_best = ModelCheckpoint(filepath_best_checkpoint, monitor='val_loss',
verbose=0, save_best_only=True, mode='min',
save_weights_only=False)
filepath_last_checkpoint = str(self.opts.model_weights_path / "last_model.h5")
checkpoint_last = ModelCheckpoint(filepath_last_checkpoint, verbose=0,
save_best_only=False, save_freq='epoch',
save_weights_only=False)
tensorboard_callback = TensorBoard(log_dir=str(self.opts.logs_path), histogram_freq=1,
)
callbacks_list = [checkpoint_best, checkpoint_last, tensorboard_callback]
return callbacks_list
def train(self, resume=False):
self.init_time = perf_counter()
self.train_dataset, self.val_dataset, self.test_dataset = self.prepare_dataset()
self.callbacks_list = self.init_callbacks()
# Create a MirroredStrategy.
if self.opts.distributed:
self.strategy = tf.distribute.MirroredStrategy()
assert self.strategy.num_replicas_in_sync == self.opts.num_gpu, "Distributed training requires number of GPUs available to be the same as number of replicas in sync"
with self.strategy.scope():
self.model = self.make_or_restore_model(resume=resume)
else:
self.model = self.make_or_restore_model(resume=resume)
self.model.fit(x=self.train_dataset,
validation_data=self.val_dataset,
epochs=self.opts.num_epochs,
verbose=2,
steps_per_epoch=self.opts.steps_per_epoch,
callbacks=self.callbacks_list,
)
def evaluate(self, test_dataset, which_checkpoint="best"):
checkpoint = self.find_checkpoint(which_checkpoint=which_checkpoint)
assert checkpoint is not None
model = keras.models.load_model(checkpoint)
test_loss, test_acc = model.evaluate(test_dataset, verbose=0)
return test_loss, test_acc
    def find_checkpoint(self, which_checkpoint="best"):
        if which_checkpoint == "best":
            checkpoints = list(self.opts.model_weights_path.glob('best_model.h5'))
        elif which_checkpoint == "last":
            checkpoints = list(self.opts.model_weights_path.glob('last_model.h5'))
        else:
            raise ValueError(f"Unknown checkpoint type: {which_checkpoint}")
if checkpoints:
latest_checkpoint = max(checkpoints, key=os.path.getctime)
return latest_checkpoint
else:
return None
@click.command(no_args_is_help=True)
@click.option('--task', required=True, type=click.Choice(['train', 'test']), help='Task to perform')
@click.option('--model_name', type=str, required=True, default="default", help='Name of your experiment')
@click.option('--batch_size', type=int, default=1000, help='Batch size', )
@click.option('--num_epochs', type=int, default=100, help='Number of epochs')
@click.option('--resume', is_flag=True, default=False, help='Whether to resume training')
@click.option('--val_fraction', type=click.FloatRange(0, 1), default=0.2, help='Fraction of validation subset')
def train_or_evaluate(**kwargs):
sys_info(**kwargs)
set_seed()
if kwargs["task"] == 'train':
training_opts = TrainingOptions(**kwargs)
mnist_classifier = MNIST_classifier(opts=training_opts)
mnist_classifier.train(resume=training_opts.resume)
elif kwargs["task"] == 'test':
eval_opts = TrainingOptions(**kwargs)
eval_opts.load_opts()
mnist_classifier = MNIST_classifier(opts=eval_opts)
mnist_classifier.train_dataset, mnist_classifier.val_dataset, mnist_classifier.test_dataset = mnist_classifier.prepare_dataset()
train_loss, train_acc = mnist_classifier.evaluate(mnist_classifier.train_dataset)
logger.info(f'Train accuracy: {train_acc}, train loss: {train_loss}')
val_loss, val_acc = mnist_classifier.evaluate(mnist_classifier.val_dataset)
        logger.info(f'Val accuracy: {val_acc}, val loss: {val_loss}')
        test_loss, test_acc = mnist_classifier.evaluate(mnist_classifier.test_dataset)
        logger.info(f'Test accuracy: {test_acc}, test loss: {test_loss}')
if __name__ == "__main__":
train_or_evaluate()
## Run locally
1. Build your container
```shell
apptainer build --fakeroot nvidia_tensorflow.sif nvidia_tensorflow.def
```
2. Run
- Run a test command to check that the correct TensorFlow version is installed:
```shell
apptainer exec nvidia_tensorflow.sif python -c "import tensorflow as tf; print('TensorFlow version:', tf.__version__)"
```
- Or run training of MNIST:
```shell
apptainer exec --fakeroot nvidia_tensorflow.sif python MNIST_example/src/main.py --task train --model_name model_distr --num_epochs 5 --batch_size 500
```
## Run on RAVEN
1. Load the Apptainer module:
```shell
module purge #this will unload all previous modules
module load apptainer/1.2.2
```
2. Build your container:
```shell
apptainer build --fakeroot nvidia_tensorflow.sif nvidia_tensorflow.def
```
3. Run
3.1. You can now check that the correct TensorFlow version is available inside the container:
```shell
apptainer exec --nv nvidia_tensorflow.sif python -c "import tensorflow as tf; print(f'TensorFlow version:{tf.__version__}')"
```
Alternatively, you can open a shell inside the container and check that everything within it is visible and runs correctly:
```shell
apptainer shell nvidia_tensorflow.sif #This will switch you to the container instance
python -c "import tensorflow as tf; print(f'TensorFlow version:{tf.__version__}')"
```
3.2. You can run MNIST training with:
```shell
apptainer exec --nv nvidia_tensorflow.sif python MNIST_example/src/main.py --task train --model_name test --num_epochs 5 --batch_size 100
```
See examples of submitting a job to the RAVEN cluster:
- **For undistributed training**: `scripts/run_undistributed.slurm`
- **For distributed training** using 1 node and 4 GPUs: `scripts/run_distributed_4_gpu.slurm`
## Monitor training
You can monitor training with TensorBoard. To do so:
1. On your local machine, open an SSH tunnel to the cluster:
```shell
ssh -N -f -L localhost:6006:localhost:6006 user.name@raven.mpcdf.mpg.de
```
2. On the cluster, load the TensorBoard module and its dependencies:
```shell
module load anaconda/3/2021.11 tensorflow/cpu/2.11.0 tensorboard/2.11.0
```
3. Point TensorBoard at your log directory:
```shell
tensorboard --logdir MNIST_example/models/test/logs --port 6006
```
4. Now in a browser, open http://localhost:6006
#!/bin/bash -l
#
#SBATCH -o logs/%j_gpu4.out.distr
#SBATCH -e logs/%j_gpu4.err.distr
#SBATCH -D ./
#SBATCH -J mnist
#
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=72
#SBATCH --mem=0
#
#SBATCH --constraint="gpu"
#SBATCH --gres=gpu:a100:4
#
#SBATCH --mail-type=end
#SBATCH --mail-user=g.nasta.work@gmail.com
#
# Wall clock limit (max. is 24 hours):
#SBATCH --time=00:59:00
cd ../
EXPERIMENT_NAME=MNIST_example
NUM_EPOCH=20
BATCH_SIZE=120
MODEL_NAME="${EXPERIMENT_NAME}_distributed_4gpu"
echo "Use containers"
source /etc/profile.d/modules.sh
module purge
module load apptainer/1.2.2
nvidia-smi --query-gpu=timestamp,utilization.gpu,utilization.memory,pci.bus_id --format=csv -l 2 > "nvidia_smi_monitoring_$MODEL_NAME.csv" &
NVIDIASMI_PID=$!
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task train --model_name $MODEL_NAME \
--num_epochs $NUM_EPOCH --batch_size $BATCH_SIZE
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task test --model_name $MODEL_NAME \
--batch_size $BATCH_SIZE
kill $NVIDIASMI_PID
#!/bin/bash -l
#
#SBATCH -o logs/%j_gpu1_undistr.out.undistr
#SBATCH -e logs/%j_gpu1_undistr.err.undistr
#SBATCH -D ./
#SBATCH -J mnist
#
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=18
#SBATCH --mem=64GB
#
#SBATCH --constraint="gpu"
#SBATCH --gres=gpu:a100:1
#
#SBATCH --mail-type=end
#SBATCH --mail-user=g.nasta.work@gmail.com
#
# Wall clock limit (max. is 24 hours):
#SBATCH --time=00:59:00
cd ../
EXPERIMENT_NAME=MNIST_example
NUM_EPOCH=20
BATCH_SIZE=120
MODEL_NAME="${EXPERIMENT_NAME}_undistributed"
source /etc/profile.d/modules.sh
module purge
module load apptainer/1.2.2
nvidia-smi --query-gpu=timestamp,utilization.gpu,utilization.memory,pci.bus_id --format=csv -l 2 > "nvidia_smi_monitoring_$MODEL_NAME.csv" &
NVIDIASMI_PID=$!
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task train \
--model_name $MODEL_NAME --num_epochs $NUM_EPOCH --batch_size $BATCH_SIZE
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task test \
--model_name $MODEL_NAME --batch_size $BATCH_SIZE
kill $NVIDIASMI_PID
[tool.ruff]
line-length = 88
[tool.ruff.lint.pycodestyle]
max-doc-length = 88
max-line-length = 88
[tool.ruff.lint]
extend-select = ["I", "W505"]
# TensorFlow example
This directory provides a lightweight example of training a ResNet50 model on synthetic data using TensorFlow, covering both single-GPU (undistributed) and multi-GPU / multi-node (distributed) setups.
## :file_folder: Folder structure
- **`src/`**:
- Contains a script that trains ResNet50 on synthetic data while printing images-per-second for basic performance monitoring.
- **System-specific subfolders** (**`viper/`**, **`raven/`** )
- Include machine-specific container recipes and SLURM submission scripts.
To build and run on a system, cd into the relevant folder and follow the instructions in README.md.
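The images-per-second (IPS) figure the script prints is simply the global batch size divided by the wall-clock time of one training batch, measured with `perf_counter` as in the `TimingCallback` shown further below. A minimal sketch of that metric, with a `sleep` standing in for the training step (the batch size is just an illustrative value):

```python
from time import perf_counter, sleep

global_batch_size = 256  # illustrative value; matches the per-device batch size used in the results

start = perf_counter()
sleep(0.3)  # placeholder for running one training step on a batch
elapsed = perf_counter() - start
print(f"batch_images_per_sec = {global_batch_size / elapsed:.1f}")
```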
# TensorFlow on Raven
Use NVIDIA containers: https://catalog.ngc.nvidia.com/orgs/nvidia/containers/tensorflow
## BUILD
1. Load the latest apptainer module, e.g.
```bash
module load apptainer/1.4.1
```
2. Build your container:
```bash
apptainer build nvidia-tensorflow.sif nvidia-tensorflow.def
```
## RUN
* To run the training script on synthetic data using ResNet50 in an **undistributed** fashion, execute:
```bash
sbatch run_undistributed.slurm
```
* To run it in a **distributed** fashion on a **single node with multiple GPUs**, execute:
```bash
sbatch run_distributed_1_node_multi_gpu.slurm
```
* To run it in a **distributed** fashion across **multiple nodes**, execute:
```bash
sbatch run_distributed_multi_node_multi_gpu.slurm
```
### Results
Container: nvcr.io/nvidia/tensorflow:24.10-tf2-py3
#### (local) batch_size=256
> `CONF` = 1.96 * std
|NNODES|NGPUS|IPS|CONF|
|-|-|-|-|
|1|1|855.1|4.3|
|1|2|1664.7|10.9|
|1|4|3278.1|30.5|
|2|8|5764.7|57.4|
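The `CONF` column is presumably 1.96 times the standard deviation of the per-batch IPS samples that `TimingCallback` (in `src/train_synthetic.py`) records after its warmup batches. A minimal sketch of that aggregation, using hypothetical per-batch records in the callback's format (real values come from a training run):

```python
import pandas as pd

# Hypothetical per-batch records shaped like TimingCallback.tracked_time,
# with warmup batches already excluded.
tracked_time = [
    {"step": 20, "batch_images_per_sec-train": 851.4},
    {"step": 30, "batch_images_per_sec-train": 858.2},
    {"step": 40, "batch_images_per_sec-train": 855.7},
]
ips = pd.DataFrame.from_records(tracked_time)["batch_images_per_sec-train"]
print(f"IPS = {ips.mean():.1f}, CONF = {1.96 * ips.std():.1f}")
```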
BootStrap: docker
From: nvcr.io/nvidia/tensorflow:24.10-tf2-py3

%post

%environment

%files

%runscript

%labels
    author AI group
    version 1.0

%help
    The converted Docker image for NVIDIA's TensorFlow (version 2.16).

%test
#!/bin/bash -l
#SBATCH -o logs/%j_multigpu.log
#SBATCH -e logs/%j_multigpu.log
#SBATCH -J tf_synth
#SBATCH --nodes=1 # request a full node
#SBATCH --ntasks-per-node=1 # only start 1 task via srun because Python multiprocessing starts more tasks internally
#SBATCH --ntasks-per-socket=1
#SBATCH --cpus-per-task=72 # assign all the cores to that first task to make room for Python's multiprocessing tasks
#SBATCH --constraint="gpu"
#SBATCH --gres=gpu:a100:4
#SBATCH --mem=500000
#SBATCH --time=02:15:00
module purge
module load apptainer/1.4.1
###### Variables ######
CONTAINER="nvidia-tensorflow.sif"
export TF_FORCE_GPU_ALLOW_GROWTH=true
srun apptainer exec --nv -B ../src/:/workspace/ ${CONTAINER} bash -c """
export RANK=\${SLURM_PROCID}
python /workspace/train_synthetic.py train
"""
#!/bin/bash -l
#SBATCH -o logs/%j_multinode.log
#SBATCH -e logs/%j_multinode.log
#SBATCH -J tf_synth
#SBATCH --nodes=2 # request multiple nodes
#SBATCH --ntasks-per-node=1 # only start 1 task via srun because Python multiprocessing starts more tasks internally
#SBATCH --ntasks-per-socket=1
#SBATCH --cpus-per-task=72 # assign all the cores to that first task to make room for Python's multiprocessing tasks
#SBATCH --constraint="gpu"
#SBATCH --gres=gpu:a100:4
#SBATCH --mem=500000
#SBATCH --time=02:15:00
module purge
module load apptainer/1.4.1
###### Variables ######
CONTAINER="nvidia-tensorflow.sif"
export TF_FORCE_GPU_ALLOW_GROWTH=true
PRE_RUN="source ../src/set_tf_config_multiple_nodes.sh && echo \$TF_CONFIG && export RANK=\${SLURM_PROCID}"
srun bash -c """
${PRE_RUN} &&
export RANK=\${SLURM_PROCID} &&
apptainer exec --nv -B ../src/:/workspace/ ${CONTAINER} python /workspace/train_synthetic.py train
"""
#!/bin/bash -l
#SBATCH -o logs/%j_undistributed.log
#SBATCH -e logs/%j_undistributed.log
#SBATCH -J tf_synth
#SBATCH --nodes=1 # request a full node
#SBATCH --ntasks-per-node=1 # only start 1 task via srun because Python multiprocessing starts more tasks internally
#SBATCH --ntasks-per-socket=1
#SBATCH --cpus-per-task=18 # assign all the cores to that first task to make room for Python's multiprocessing tasks
#SBATCH --constraint="gpu"
#SBATCH --gres=gpu:a100:1
#SBATCH --mem=125000
#SBATCH --time=02:15:00
module purge
module load apptainer/1.4.1
###### Variables ######
CONTAINER="nvidia-tensorflow.sif"
export TF_FORCE_GPU_ALLOW_GROWTH=true
srun apptainer exec --nv -B ../src/:/workspace/ ${CONTAINER} bash -c """
export RANK=\${SLURM_PROCID}
python /workspace/train_synthetic.py train
"""
#!/bin/bash -l
# Build the TF_CONFIG environment variable expected by
# tf.distribute.MultiWorkerMirroredStrategy: every node of the SLURM allocation
# becomes a "worker" listening on TF_PORT, and the task index is SLURM_NODEID.
export TF_PORT="12321"
# Turn the allocated hostnames into a comma-separated list of quoted "host:port" entries.
export WORKERS_LIST=$(scontrol show hostnames | sed "s/$/:${TF_PORT}\"/g" | \
    sed 's/^/"/g' | paste -s -d ',' )
export TF_CONFIG="{\"cluster\": {\"worker\": [${WORKERS_LIST}]}, \"task\": {\"type\": \"worker\", \"index\": $SLURM_NODEID} }"
echo $TF_CONFIG
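For illustration, on a hypothetical two-node allocation (placeholder hostnames `node001` and `node002`) the script above would produce a `TF_CONFIG` like the one below, which `tf.distribute.MultiWorkerMirroredStrategy` reads from the environment of each worker. A minimal Python sketch, with the hostnames and task index as assumptions:

```python
import json
import os

# Hypothetical TF_CONFIG as emitted by set_tf_config_multiple_nodes.sh on the
# first node (SLURM_NODEID=0) of a two-node job; hostnames are placeholders.
os.environ["TF_CONFIG"] = json.dumps(
    {
        "cluster": {"worker": ["node001:12321", "node002:12321"]},
        "task": {"type": "worker", "index": 0},
    }
)
cfg = json.loads(os.environ["TF_CONFIG"])
print(cfg["cluster"]["worker"])  # ['node001:12321', 'node002:12321']
print(cfg["task"])               # {'type': 'worker', 'index': 0}
```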
# adapted from https://www.tensorflow.org/guide/keras/distributed_training
import logging
import os
from contextlib import nullcontext
from dataclasses import dataclass, field
from pathlib import Path
from time import perf_counter
from typing import Any, Dict, Optional, Union
import click
import pandas as pd
import tensorflow as tf
logger = logging.getLogger(__name__)
class TimingCallback(tf.keras.callbacks.Callback):
def __init__(
self,
batch_size: int,
log_freq: Union[None, str, int] = "batch",
num_warmup_batches: int = 10,
rank: int = 0,
):
super().__init__()
self.batch_size = batch_size
self._chief_worker_only = True
self.rank = rank
self.num_warmup_batches = num_warmup_batches
self.tracked_time = []
if log_freq == "batch":
self.log_every_epoch = False
self.log_every_n_steps = 1
elif isinstance(log_freq, int):
self.log_every_epoch = False
self.log_every_n_steps = log_freq
else:
raise ValueError(
"Expected `log_freq` argument to be an integer or 'batch'. "
f"Received: log_freq={log_freq} (of type {type(log_freq)})"
)
logger.info(str(self.__dict__))
def on_train_batch_begin(self, batch, logs=None):
if self.log_every_n_steps is None or logs is None:
return
current_iteration = int(self.model.optimizer.iterations.numpy())
if (current_iteration + 1) % self.log_every_n_steps == 0 and self.rank == 0:
self.batch_begin_time_train = perf_counter()
def on_train_batch_end(self, batch, logs=None):
if self.log_every_n_steps is None or logs is None:
return
current_iteration = int(self.model.optimizer.iterations.numpy())
if current_iteration % self.log_every_n_steps == 0 and self.rank == 0:
batch_end_time = perf_counter()
time_elapsed_for_batch = batch_end_time - self.batch_begin_time_train
logger.info(
f"Current IPS: epoch_idx = {self.current_epoch}; batch_idx = {batch}; batch_time_elapsed = {time_elapsed_for_batch}; batch_images_per_sec = {self.batch_size / time_elapsed_for_batch}"
)
self.tracked_time.append(
{
"type": "batch",
"epoch_idx": self.current_epoch,
"batch_idx": batch,
"begin_time": self.batch_begin_time_train,
"end_time": batch_end_time,
"batch_time_elapsed-train": time_elapsed_for_batch,
"batch_images_per_sec-train": self.batch_size
/ time_elapsed_for_batch,
"rank": self.rank,
"step": current_iteration,
}
)
def on_epoch_begin(self, epoch, logs=None):
self.current_epoch = epoch
def on_train_end(self, logs=None):
if self.rank == 0:
# print(self.tracked_time, flush = True)
tracked_time_pd = pd.DataFrame.from_records(self.tracked_time)
# print(tracked_time_pd.head(), flush = True)
tracked_time_pd = tracked_time_pd[
(tracked_time_pd["rank"] == 0)
& (tracked_time_pd["step"] >= self.num_warmup_batches)
]
logger.info(
f"Average IPS = {tracked_time_pd['batch_images_per_sec-train'].mean()}(+-{tracked_time_pd['batch_images_per_sec-train'].std()})"
)
def set_seed(seed: int = 5):
import random
import numpy as np
if not isinstance(seed, int):
raise ValueError(
"Expected `seed` argument to be an integer. "
f"Received: seed={seed} (of type {type(seed)})"
)
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
class NullStrategy:
@staticmethod
def scope():
return nullcontext()
@property
def num_replicas_in_sync(self):
return 1
class NullCommunication:
@property
def name(self):
return "null"
@dataclass
class TrainingStrategy:
"""
Determine and initialize the appropriate TensorFlow distribution strategy
based on hardware configuration and environment.
"""
device_type: str = field(init=False)
num_nodes: int = field(init=False)
num_gpus_per_node: int = field(init=False)
strategy_type: str = field(init=False)
communication_type: Union[
str, tf.distribute.experimental.CommunicationImplementation
] = field(init=False)
cross_device_communication_type: Union[str, tf.distribute.CrossDeviceOps] = field(
init=False
)
communication_options: Optional[tf.distribute.experimental.CommunicationOptions] = (
field(default=None, init=False)
)
strategy: Union[
tf.distribute.MultiWorkerMirroredStrategy,
tf.distribute.MirroredStrategy,
NullStrategy,
] = field(init=False)
def __post_init__(self):
self._gather_cluster_info()
self.device_type = self._detect_device_type()
self._select_strategy()
self._log_strategy_params()
def _gather_cluster_info(self) -> None:
"""Read environment variables related to cluster configuration."""
self.num_nodes = int(os.environ.get("SLURM_NNODES", -1))
self.num_gpus_per_node = int(os.environ.get("SLURM_GPUS_ON_NODE", -1))
def _detect_device_type(self) -> str:
"""
Determine the GPU type from available hardware.
Returns:
str: "NVIDIA", "AMD", or raises NotImplementedError if unknown.
"""
gpus = tf.config.list_physical_devices("GPU")
device_names = [
tf.config.experimental.get_device_details(gpu).get("device_name", "")
for gpu in gpus
]
if not device_names:
return "NONE"
if all("NVIDIA" in name for name in device_names):
return "NVIDIA"
elif all("AMD" in name for name in device_names):
return "AMD"
raise NotImplementedError(f"Unsupported GPU types detected: {device_names}")
def _select_strategy(self) -> None:
"""Select and initialize the appropriate distribution strategy."""
if self.num_nodes == 1 and self.num_gpus_per_node == 1:
self._use_null_strategy()
elif self.num_nodes == 1:
self._use_single_node_multi_gpu_strategy()
elif self.num_nodes > 1:
self._use_multi_node_strategy()
else:
raise NotImplementedError(
f"Unsupported combination of num_nodes={self.num_nodes} and num_gpus_per_node={self.num_gpus_per_node}"
)
def _use_null_strategy(self) -> None:
self.strategy_type = "null"
self.communication_type = NullCommunication()
self.cross_device_communication_type = NullCommunication()
self.strategy = NullStrategy()
def _use_single_node_multi_gpu_strategy(self) -> None:
self.strategy_type = "MirroredStrategy"
self.communication_type = NullCommunication()
self.cross_device_communication_type = (
self._get_cross_device_ops_implementation(self.device_type)
)
self.strategy = tf.distribute.MirroredStrategy(
cross_device_ops=self.cross_device_communication_type
)
def _use_multi_node_strategy(self) -> None:
self.cross_device_communication_type = NullCommunication()
self.communication_type = self._get_communication_implementation(
self.device_type
)
self.communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=self.communication_type
)
self.strategy_type = "MultiWorkerMirroredStrategy"
self.strategy = tf.distribute.MultiWorkerMirroredStrategy(
communication_options=self.communication_options
)
def _get_cross_device_ops_implementation(self, device_type: str):
"""Map device type to appropriate communication implementation."""
if device_type == "NVIDIA":
return tf.distribute.NcclAllReduce()
elif device_type == "AMD":
return tf.distribute.ReductionToOneDevice()
else:
raise NotImplementedError(
f"Unsupported device type for communication: {device_type}"
)
def _get_communication_implementation(self, device_type: str):
"""Map device type to appropriate communication implementation."""
if device_type == "NVIDIA":
return tf.distribute.experimental.CommunicationImplementation.NCCL
elif device_type == "AMD":
return tf.distribute.experimental.CommunicationImplementation.RING
else:
raise NotImplementedError(
f"Unsupported device type for communication: {device_type}"
)
def _log_strategy_params(self) -> None:
"""Log key strategy configuration"""
logger.info(f"num_replicas_in_sync = {self.strategy.num_replicas_in_sync}")
logger.info(f"strategy_type = {self.strategy_type}")
logger.info(f"communication_type = {self.communication_type.name}")
logger.info(
f"cross_device_communication_type = {type(self.cross_device_communication_type)}"
)
@dataclass
class TrainingOptions:
batch_size_per_device: int = 256
num_epochs: int = 100
lr: float = 1e-3
model_type: str = "ResNet50"
training_strategy: TrainingStrategy = field(init=False)
momentum: float = 0.9
weight_decay: float = 5e-4
optimizer_name: str = "SGD"
loss: str = "SparseCategoricalCrossentropy"
seed: int = 228
timing_log_freq: Union[int, str] = 10
timing_warmup_batches: int = 10
dataset_size: int = 20000
global_batch_size: int = field(init=False)
@classmethod
def from_yaml(
cls, cfg_path: Union[str, Path], cli_kwargs: Dict[Any, Any]
) -> "TrainingOptions":
import yaml
with open(cfg_path, "r") as file:
data = yaml.safe_load(file)
for key, item in cli_kwargs.items():
logger.info(f"{key}: {item}")
if item is not None:
data[key] = item
return cls(**data)
def __post_init__(self):
self.training_strategy = TrainingStrategy()
self.distributed = self.training_strategy.strategy_type != "null"
self.num_replicas_in_sync = (
self.training_strategy.strategy.num_replicas_in_sync
if hasattr(self.training_strategy.strategy, "num_replicas_in_sync")
else 1
)
        logger.info("Scaling the global batch size by the number of replicas in sync")
self.global_batch_size = (
int(self.batch_size_per_device * self.num_replicas_in_sync)
if self.distributed
else self.batch_size_per_device
)
self.steps_per_epoch = self.dataset_size // self.global_batch_size
class SYNTH_classifier:
def __init__(self, opts: TrainingOptions):
self.opts = opts
def get_compiled_model(self):
model = tf.keras.applications.ResNet50(weights=None)
model.compile(
optimizer=tf.keras.optimizers.SGD(
learning_rate=self.opts.lr,
momentum=self.opts.momentum,
weight_decay=self.opts.weight_decay,
),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)
return model
def prepare_dataset(self):
@tf.function
def gen_fn(_):
image = tf.random.uniform([224, 224, 3])
label = tf.random.uniform([], minval=0, maxval=999, dtype=tf.int64)
return image, label
        dataset = tf.data.Dataset.range(10**9)  # effectively infinite
dataset = dataset.map(gen_fn, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.batch(self.opts.global_batch_size, drop_remainder=True)
dataset = dataset.prefetch(tf.data.AUTOTUNE)
return dataset
def train(self):
self.train_dataset = self.prepare_dataset()
logger.info(f"train_dataset: {type(self.train_dataset)}")
# Define distributed strategy
if self.opts.distributed:
self.train_dataset = (
self.opts.training_strategy.strategy.experimental_distribute_dataset(
self.train_dataset
)
)
# Create a MirroredStrategy or MultiWorkerMirroredStrategy in case of
# distributed training, or just NullStrategy instead.
with self.opts.training_strategy.strategy.scope():
self.model = self.get_compiled_model()
callbacks = self.configure_callbacks()
logger.info(f"element_spec: {self.train_dataset.element_spec}")
self.model.fit(
x=self.train_dataset,
epochs=self.opts.num_epochs,
steps_per_epoch=self.opts.steps_per_epoch,
callbacks=callbacks,
)
def configure_callbacks(self):
callbacks = [
TimingCallback(
batch_size=self.opts.global_batch_size,
log_freq=self.opts.timing_log_freq,
rank=int(os.environ.get("RANK", 0)),
num_warmup_batches=self.opts.timing_warmup_batches,
),
]
return callbacks
def evaluate(self, test_dataset):
test_loss, test_acc = self.model.evaluate(test_dataset, verbose=0)
return test_loss, test_acc
@click.group()
def cli():
pass
@cli.command(no_args_is_help=False)
@click.option("--batch_size_per_device", type=int, default=256)
@click.option("--run_cfg", type=click.Path(exists=True), default=None)
def train(
batch_size_per_device,
run_cfg,
):
if run_cfg is not None:
training_options = TrainingOptions.from_yaml(
cfg_path=run_cfg,
cli_kwargs=dict(
batch_size_per_device=batch_size_per_device,
),
)
else:
training_options = TrainingOptions(
batch_size_per_device=batch_size_per_device,
)
set_seed(training_options.seed)
    synth_classifier = SYNTH_classifier(opts=training_options)
    synth_classifier.train()
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)  # Configure the root logger
urllib3_logger = logging.getLogger("urllib3")
urllib3_logger.setLevel(logging.WARNING)
simple_parsing_logger = logging.getLogger("simple_parsing")
simple_parsing_logger.setLevel(logging.INFO)
cli()
# TensorFlow on Viper
Use ROCm containers: https://hub.docker.com/r/rocm/tensorflow
## BUILD
1. Load the latest apptainer module, e.g.
```bash
module load apptainer/1.4.1
```
2. Build your container:
```bash
apptainer build amd-tensorflow.sif amd-tensorflow.def
```
## RUN
* To run the training script on synthetic data using ResNet50 in an **undistributed** fashion, execute:
```bash
sbatch run_undistributed.slurm
```
* To run it in a **distributed** fashion on a **single node with multiple GPUs**, execute:
```bash
sbatch run_distributed_1_node_multi_gpu.slurm
```
* To run it in a **distributed** fashion across **multiple nodes**, execute:
```bash
sbatch run_distributed_multi_node_multi_gpu.slurm
```
### Results
Container: rocm/tensorflow:rocm6.3.3-py3.12-tf2.16-dev
#### (local) batch_size=256
> `CONF` = 1.96 * std
|NNODES|NGPUS|IPS|CONF|
|-|-|-|-|
|1|1|1360.7|34.5|
|1|2|1204.8|32.7|
|2|4|2263.4|100.7|
BootStrap: docker
From: rocm/tensorflow:rocm6.3.3-py3.12-tf2.16-dev
%post
python -m pip install --upgrade pip
pip install ipython ipykernel
pip install pandas
pip install click
%environment
export ROCBLAS_TENSILE_LIBPATH=/opt/rocm-6.3.3/lib/rocblas/library
export MIOPEN_SYSTEM_DB_PATH=/opt/rocm-6.3.3/share/miopen/db
%labels
author AI group
version 1.0
%help
The converted Docker image for AMD Tensorflow (version 2.16).
#!/bin/bash -l
#SBATCH -o logs/%j_multigpu.log
#SBATCH -e logs/%j_multigpu.log
#SBATCH -J tf_synth
#SBATCH --nodes=1 # request a full node
#SBATCH --ntasks-per-node=1 # only start 1 task via srun because Python multiprocessing starts more tasks internally
#SBATCH --ntasks-per-socket=1
#SBATCH --cpus-per-task=48 # assign all the cores to that first task to make room for Python's multiprocessing tasks
#SBATCH --constraint="apu"
#SBATCH --gres=gpu:2
#SBATCH --mem=0
#SBATCH --time=02:15:00
module purge
module load apptainer/1.4.1
###### Variables ######
CONTAINER="amd-tensorflow.sif"
export TF_FORCE_GPU_ALLOW_GROWTH=true
srun apptainer exec -B ../src/:/workspace/ ${CONTAINER} bash -c """
export RANK=\${SLURM_PROCID}
python /workspace/train_synthetic.py train
"""
#!/bin/bash -l
#SBATCH -o logs/%j_multinode.log
#SBATCH -e logs/%j_multinode.log
#SBATCH -J tf_synth
#SBATCH --nodes=2 # request multiple nodes
#SBATCH --ntasks-per-node=1 # only start 1 task via srun because Python multiprocessing starts more tasks internally
#SBATCH --ntasks-per-socket=1
#SBATCH --cpus-per-task=48 # assign all the cores to that first task to make room for Python's multiprocessing tasks
#SBATCH --constraint="apu"
#SBATCH --gres=gpu:2
#SBATCH --mem=0
#SBATCH --time=02:15:00
module purge
module load apptainer/1.4.1
###### Variables ######
CONTAINER="amd-tensorflow.sif"
export TF_FORCE_GPU_ALLOW_GROWTH=true
PRE_RUN="source ../src/set_tf_config_multiple_nodes.sh && echo \$TF_CONFIG && export RANK=\${SLURM_PROCID}"
srun bash -c """
${PRE_RUN} &&
export RANK=\${SLURM_PROCID} &&
apptainer exec --rocm -B ../src/:/workspace/ ${CONTAINER} python /workspace/train_synthetic.py train
"""
#!/bin/bash -l
#SBATCH -o logs/%j_undistributed.log
#SBATCH -e logs/%j_undistributed.log
#SBATCH -J tf_synth
#SBATCH --nodes=1 # request a full node
#SBATCH --ntasks-per-node=1 # only start 1 task via srun because Python multiprocessing starts more tasks internally
#SBATCH --ntasks-per-socket=1
#SBATCH --cpus-per-task=24 # assign all the cores to that first task to make room for Python's multiprocessing tasks
#SBATCH --constraint="apu"
#SBATCH --gres=gpu:1
#SBATCH --mem=120000
#SBATCH --time=02:15:00
module purge
module load apptainer/1.4.1
###### Variables ######
CONTAINER="amd-tensorflow.sif"
export TF_FORCE_GPU_ALLOW_GROWTH=true
srun apptainer exec -B ../src/:/workspace/ ${CONTAINER} bash -c """
export RANK=\${SLURM_PROCID}
python /workspace/train_synthetic.py train
"""