# adapted from https://www.tensorflow.org/guide/keras/distributed_training
import click
import tensorflow as tf
from pathlib import Path
import logging
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
import numpy as np
import os
from tensorflow import keras
from time import perf_counter
import pandas as pd
import shutil
from dataclasses import dataclass
import json
from typing import List
import pickle
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def set_seed(seed=5):
logger.info(f"Setting random seed to {seed}")
tf.random.set_seed(seed=seed)
np.random.seed(seed)
def sys_info(**kwargs):
logging.info(f"TensorFlow version:{tf.__version__}")
logging.info(f"Num GPUs Available: {len(tf.config.list_physical_devices('GPU'))}")
logging.info(f"Running parameters: ")
for key, value in kwargs.items():
logging.info(f"{key}:{value}")
@dataclass
class TrainingOptions():
task: str = "train"
resume: bool = False
batch_size: int = 500
num_epochs: int = 100
val_fraction: float = 0.2
model_name: str = "default"
project_path: str = Path(__file__).parent.resolve().parent
train_size = 60000 * (1 - val_fraction)
def __init__(self, **kwargs):
super().__init__()
self.__dict__.update(**{key: item for key, item in kwargs.items()})
self.experiment_path = self.project_path / 'models' / self.model_name
self.model_weights_path = self.experiment_path / 'weights'
self.data_path = self.project_path / 'data' / "mnist.npz"
self.logs_path = self.experiment_path / "logs"
if (self.task=="train" and self.resume) or (self.task =="test"):
self.load_opts()
self.experiment_path.mkdir(parents=True, exist_ok=True)
self.model_weights_path.mkdir(parents=True, exist_ok=True)
self.data_path.resolve().parent.mkdir(parents=True, exist_ok=True)
self.logs_path.mkdir(parents=True, exist_ok=True)
self.num_gpu = len(tf.config.list_physical_devices('GPU'))
self.distributed = self.num_gpu>1
        self.global_batch_size = self.batch_size * max(self.num_gpu, 1)  # avoid a zero global batch size on CPU-only runs
self.steps_per_epoch = self.train_size // self.global_batch_size
if self.task == "train" and not self.resume:
shutil.rmtree(self.experiment_path, ignore_errors=False, onerror=None)
self.experiment_path.mkdir(parents=True, exist_ok=True)
self.save_opts()
def save_opts(self, ):
        serializable_dict = self.get_serializable_params()
with open(str(self.experiment_path / 'opts.json'), 'w') as f:
json.dump(serializable_dict, f)
    def get_serializable_params(self):
serializable_dict = self.__dict__.copy()
for key, item in serializable_dict.items():
if isinstance(item, Path):
serializable_dict[key] = str(item)
return serializable_dict
def load_opts(self):
with open(self.experiment_path / 'opts.json', 'r') as f:
opts_dict = json.load(f)
for key, item in opts_dict.items():
if key not in self.get_skipped_eval_params:
setattr(self, key, item)
self.convert_path_params()
def convert_path_params(self):
for key, item in self.__dict__.items():
if isinstance(item, str) and "path" in key:
setattr(self, key, Path(item))
@property
def get_skipped_eval_params(self) -> List[str]:
return ["batch_size", "distributed", "num_gpu", "task"]
class MNIST_classifier():
def __init__(self, opts: TrainingOptions):
self.opts = opts
def get_compiled_model(self):
model = keras.Sequential([
keras.layers.Flatten(input_shape=(28, 28)),
keras.layers.Dense(128, activation='relu'),
keras.layers.Dense(10)
])
model.compile(optimizer='adam',
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                      # from_logits=True because the final layer does not apply a softmax
metrics=[keras.metrics.SparseCategoricalAccuracy()])
return model
def make_or_restore_model(self, resume=False):
# Set up the model
if resume:
latest_checkpoint = self.find_checkpoint(which_checkpoint="last")
if latest_checkpoint is not None:
logger.info(f"Restoring from {latest_checkpoint}")
return keras.models.load_model(str(latest_checkpoint))
else:
logger.info("Creating a new model")
return self.get_compiled_model()
def prepare_dataset(self):
logging.info(f"Dataset will be saved under: {self.opts.data_path}")
(train_images, train_labels), (test_images, test_labels) = keras.datasets.mnist.load_data(
path=self.opts.data_path)
train_images = train_images.astype("float32") / 255.0
test_images = test_images.astype("float32") / 255.0
train_labels = train_labels.astype("float32")
test_labels = test_labels.astype("float32")
# Reserve num_val_samples samples for validation
num_val_samples = int(train_images.shape[0] * self.opts.val_fraction)
val_images = train_images[-num_val_samples:]
val_labels = train_labels[-num_val_samples:]
train_images = train_images[:-num_val_samples]
train_labels = train_labels[:-num_val_samples]
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).batch(
self.opts.global_batch_size, drop_remainder=True,
)
val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels)).batch(self.opts.global_batch_size, drop_remainder=True,)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(self.opts.global_batch_size, drop_remainder=True,)
return train_dataset, val_dataset, test_dataset
def init_callbacks(self):
filepath_best_checkpoint = str(self.opts.model_weights_path / "best_model.h5")
checkpoint_best = ModelCheckpoint(filepath_best_checkpoint, monitor='val_loss',
verbose=0, save_best_only=True, mode='min',
save_weights_only=False)
filepath_last_checkpoint = str(self.opts.model_weights_path / "last_model.h5")
checkpoint_last = ModelCheckpoint(filepath_last_checkpoint, verbose=0,
save_best_only=False, save_freq='epoch',
save_weights_only=False)
tensorboard_callback = TensorBoard(log_dir=str(self.opts.logs_path), histogram_freq=1,
)
callbacks_list = [checkpoint_best, checkpoint_last, tensorboard_callback]
return callbacks_list
def train(self, resume=False):
self.init_time = perf_counter()
self.train_dataset, self.val_dataset, self.test_dataset = self.prepare_dataset()
self.callbacks_list = self.init_callbacks()
# Create a MirroredStrategy.
if self.opts.distributed:
self.strategy = tf.distribute.MirroredStrategy()
assert self.strategy.num_replicas_in_sync == self.opts.num_gpu, "Distributed training requires number of GPUs available to be the same as number of replicas in sync"
with self.strategy.scope():
self.model = self.make_or_restore_model(resume=resume)
else:
self.model = self.make_or_restore_model(resume=resume)
self.model.fit(x=self.train_dataset,
validation_data=self.val_dataset,
epochs=self.opts.num_epochs,
verbose=2,
steps_per_epoch=self.opts.steps_per_epoch,
callbacks=self.callbacks_list,
)
def evaluate(self, test_dataset, which_checkpoint="best"):
checkpoint = self.find_checkpoint(which_checkpoint=which_checkpoint)
assert checkpoint is not None
model = keras.models.load_model(checkpoint)
test_loss, test_acc = model.evaluate(test_dataset, verbose=0)
return test_loss, test_acc
def find_checkpoint(self, which_checkpoint="best"):
if which_checkpoint == "best":
checkpoints = list(self.opts.model_weights_path.glob('best_model.h5'))
if which_checkpoint == "last":
checkpoints = list(self.opts.model_weights_path.glob('last_model.h5'))
if checkpoints:
latest_checkpoint = max(checkpoints, key=os.path.getctime)
return latest_checkpoint
else:
return None
@click.command(no_args_is_help=True)
@click.option('--task', required=True, type=click.Choice(['train', 'test']), help='Task to perform')
@click.option('--model_name', type=str, required=True, default="default", help='Name of your experiment')
@click.option('--batch_size', type=int, default=1000, help='Batch size', )
@click.option('--num_epochs', type=int, default=100, help='Number of epochs')
@click.option('--resume', is_flag=True, default=False, help='Whether to resume training')
@click.option('--val_fraction', type=click.FloatRange(0, 1), default=0.2, help='Fraction of validation subset')
def train_or_evaluate(**kwargs):
sys_info(**kwargs)
set_seed()
if kwargs["task"] == 'train':
training_opts = TrainingOptions(**kwargs)
mnist_classifier = MNIST_classifier(opts=training_opts)
mnist_classifier.train(resume=training_opts.resume)
elif kwargs["task"] == 'test':
eval_opts = TrainingOptions(**kwargs)
eval_opts.load_opts()
mnist_classifier = MNIST_classifier(opts=eval_opts)
mnist_classifier.train_dataset, mnist_classifier.val_dataset, mnist_classifier.test_dataset = mnist_classifier.prepare_dataset()
train_loss, train_acc = mnist_classifier.evaluate(mnist_classifier.train_dataset)
logger.info(f'Train accuracy: {train_acc}, train loss: {train_loss}')
val_loss, val_acc = mnist_classifier.evaluate(mnist_classifier.val_dataset)
        logger.info(f'Val accuracy: {val_acc}, val loss: {val_loss}')
test_loss, test_acc = mnist_classifier.evaluate(mnist_classifier.test_dataset)
        logger.info(f'Test accuracy: {test_acc}, test loss: {test_loss}')
if __name__ == "__main__":
train_or_evaluate()
## Run locally
1. Build your container
```shell
apptainer build --fakeroot nvidia_tensorflow.sif nvidia_tensorflow.def
```
2. Run
- Run a quick test to check that the correct TensorFlow version is installed:
```shell
apptainer exec nvidia_tensorflow.sif python -c "import tensorflow as tf; print('TensorFlow version:', tf.__version__)"
```
- Or run MNIST training:
```shell
apptainer exec --fakeroot nvidia_tensorflow.sif python MNIST_example/src/main.py --task train --model_name model_distr --num_epochs 5 --batch_size 500
```
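After training finishes, the same entry point can evaluate the saved checkpoints by switching the task. A minimal sketch, reusing the `model_distr` experiment name from the command above:
```shell
apptainer exec --fakeroot nvidia_tensorflow.sif python MNIST_example/src/main.py --task test --model_name model_distr --batch_size 500
```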
## Run on RAVEN
1. Load the apptainer modules:
```shell
module purge  # this unloads all previously loaded modules
module load apptainer/1.2.2
```
2. Build your container:
```shell
apptainer build --fakeroot nvidia_tensorflow.sif nvidia_tensorflow.def
```
3. Run
3.1. You can now test that the container provides the correct TensorFlow version:
```shell
apptainer exec --nv nvidia_tensorflow.sif python -c "import tensorflow as tf; print(f'TensorFlow version:{tf.__version__}')"
```
Alternatively, you can open a shell inside the container and check that everything within the container is visible and runs correctly:
```shell
apptainer shell nvidia_tensorflow.sif #This will switch you to the container instance
python -c "import tensorflow as tf; print(f'TensorFlow version:{tf.__version__}')"
```
3.2. You can run MNIST training with:
```shell
apptainer exec --nv nvidia_tensorflow.sif python MNIST_example/src/main.py --task train --model_name test --num_epochs 5 --batch_size 100
```
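If a training run is interrupted, the `--resume` flag defined in `main.py` reloads the saved options and the last checkpoint of an existing experiment and continues training, for example:
```shell
apptainer exec --nv nvidia_tensorflow.sif python MNIST_example/src/main.py --task train --model_name test --num_epochs 5 --batch_size 100 --resume
```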
See the examples of submitting a job to the RAVEN cluster (a submission sketch follows the list):
- **For undistributed training**: `scripts/run_undistributed.slurm`
- **For distributed training** using 1 node and 4 GPUs: `scripts/run_distributed_4_gpu.slurm`
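For instance, assuming the Slurm scripts are submitted from the directory that contains them (adjust the path to your checkout layout), a job can be queued and watched with:
```shell
sbatch run_distributed_4_gpu.slurm
# check the state of your queued and running jobs
squeue -u $USER
```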
## Monitor training
You can monitor training with TensorBoard.
In order to do so:
1. On your local machine, set up SSH port forwarding:
```shell
ssh -N -f -L localhost:6006:localhost:6006 user.name@raven.mpcdf.mpg.de
```
2. On the cluster, load the TensorBoard module:
```shell
module load anaconda/3/2021.11 tensorflow/cpu/2.11.0 tensorboard/2.11.0
```
3. Start TensorBoard on your log directory:
```shell
tensorboard --logdir MNIST_example/models/test/logs --port 6006
```
4. Open http://localhost:6006 in your browser.
BootStrap: docker
From: nvcr.io/nvidia/tensorflow:23.12-tf2-py3
%post
%environment
%files
%runscript
%labels
author Nastassya Horlava
version 1.0
%help
The converted Docker image for NVIDIA's TensorFlow (version 2.14).
%test
#!/bin/bash -l
#
#SBATCH -o logs/%j_gpu4.out.distr
#SBATCH -e logs/%j_gpu4.err.distr
#SBATCH -D ./
#SBATCH -J mnist
#
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=72
#SBATCH --mem=0
#
#SBATCH --constraint="gpu"
#SBATCH --gres=gpu:a100:4
#
#SBATCH --mail-type=end
#SBATCH --mail-user=g.nasta.work@gmail.com
#
# Wall clock limit (max. is 24 hours):
#SBATCH --time=00:59:00
cd ../
EXPERIMENT_NAME=MNIST_example
NUM_EPOCH=20
BATCH_SIZE=120
MODEL_NAME="${EXPERIMENT_NAME}_distributed_4gpu"
echo "Use containers"
source /etc/profile.d/modules.sh
module purge
module load apptainer/1.2.2
nvidia-smi --query-gpu=timestamp,utilization.gpu,utilization.memory,pci.bus_id --format=csv -l 2 > "nvidia_smi_monitoring_$MODEL_NAME.csv" &
NVIDIASMI_PID=$!
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task train --model_name $MODEL_NAME \
--num_epochs $NUM_EPOCH --batch_size $BATCH_SIZE
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task test --model_name $MODEL_NAME \
--batch_size $BATCH_SIZE
kill $NVIDIASMI_PID
#!/bin/bash -l
#
#SBATCH -o logs/%j_gpu1_undistr.out.undistr
#SBATCH -e logs/%j_gpu1_undistr.err.undistr
#SBATCH -D ./
#SBATCH -J mnist
#
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=18
#SBATCH --mem=64GB
#
#SBATCH --constraint="gpu"
#SBATCH --gres=gpu:a100:1
#
#SBATCH --mail-type=end
#SBATCH --mail-user=g.nasta.work@gmail.com
#
# Wall clock limit (max. is 24 hours):
#SBATCH --time=00:59:00
cd ../
EXPERIMENT_NAME=MNIST_example
NUM_EPOCH=20
BATCH_SIZE=120
MODEL_NAME="${EXPERIMENT_NAME}_undistributed"
source /etc/profile.d/modules.sh
module purge
module load apptainer/1.2.2
nvidia-smi --query-gpu=timestamp,utilization.gpu,utilization.memory,pci.bus_id --format=csv -l 2 > "nvidia_smi_monitoring_$MODEL_NAME.csv" &
NVIDIASMI_PID=$!
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task train \
--model_name $MODEL_NAME --num_epochs $NUM_EPOCH --batch_size $BATCH_SIZE
srun apptainer exec \
--nv -B .:"$HOME" \
nvidia_tensorflow.sif python $EXPERIMENT_NAME/src/main.py --task test \
--model_name $MODEL_NAME --batch_size $BATCH_SIZE
kill $NVIDIASMI_PID