From 616e91025791f4becf3fe605711172bfc5cc633d Mon Sep 17 00:00:00 2001 From: Repo Updater <noreply@mpcdf.mpg.de> Date: Mon, 14 Nov 2022 15:25:39 +0100 Subject: [PATCH] d26720ea Adapt jupyter book toc to filenames that changed --- ...roduction.ipynb => 1a--Introduction.ipynb} | 0 ...usion.ipynb => 1b--Teaser_Diffusion.ipynb} | 0 ...-Parallel_Programming_Shared_Memory.ipynb} | 1177 +--------------- ...allel_Programming_Distributed_Memory.ipynb | 1225 +++++++++++++++++ 4 files changed, 1230 insertions(+), 1172 deletions(-) rename notebooks/{1b--Introduction.ipynb => 1a--Introduction.ipynb} (100%) rename notebooks/{1a--Teaser_Diffusion.ipynb => 1b--Teaser_Diffusion.ipynb} (100%) rename notebooks/{4a--Parallel_Programming.ipynb => 4a--Parallel_Programming_Shared_Memory.ipynb} (61%) create mode 100644 notebooks/4c--Parallel_Programming_Distributed_Memory.ipynb diff --git a/notebooks/1b--Introduction.ipynb b/notebooks/1a--Introduction.ipynb similarity index 100% rename from notebooks/1b--Introduction.ipynb rename to notebooks/1a--Introduction.ipynb diff --git a/notebooks/1a--Teaser_Diffusion.ipynb b/notebooks/1b--Teaser_Diffusion.ipynb similarity index 100% rename from notebooks/1a--Teaser_Diffusion.ipynb rename to notebooks/1b--Teaser_Diffusion.ipynb diff --git a/notebooks/4a--Parallel_Programming.ipynb b/notebooks/4a--Parallel_Programming_Shared_Memory.ipynb similarity index 61% rename from notebooks/4a--Parallel_Programming.ipynb rename to notebooks/4a--Parallel_Programming_Shared_Memory.ipynb index c205077..18f0775 100644 --- a/notebooks/4a--Parallel_Programming.ipynb +++ b/notebooks/4a--Parallel_Programming_Shared_Memory.ipynb @@ -8,7 +8,8 @@ } }, "source": [ - "# Parallel programming\n", + "# Parallel programming: shared-memory approaches\n", + "\n", "**Python for HPC course**\n", "\n", "Max Planck Computing and Data Facility, Garching" @@ -26,8 +27,7 @@ "\n", "* What is parallel computing?\n", "* Shared vs. distributed memory\n", - "* Multiprocessing\n", - "* MPI: mpi4py" + "* Multiprocessing" ] }, { @@ -261,7 +261,8 @@ "* MPI (distributed memory)\n", " * `mpi4py`\n", " * more complex\n", - " * can be used over a cluster" + " * can be used over a cluster\n", + " * separate lecture" ] }, { @@ -469,1174 +470,6 @@ "* Parallel image processing\n", "* Monte Carlo $\\pi$ computation\n" ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## Distributed-memory computing with MPI\n", - "* Message passing model\n", - " * Computational problem is decomposed into local parts\n", - " * Multiple processes are launched that work on individual parts\n", - " * Processes communicate via messages to solve the global problem\n", - "* Message passing interface, MPI\n", - " * MPI is a standard defining APIs and semantics for communication\n", - " * Collective communication\n", - " * Peer-to-peer communication\n", - "* Various implementations are available, e.g.\n", - " * OpenMPI (via Linux distributions)\n", - " * IntelMPI (commercial product, on HPC systems)\n", - " * and many others" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "## A quick introduction to MPI\n", - "* Message passing for communication & synchronization\n", - "* For more in-depth coverage on parallel programming, see courses at HLRS (e.g. \"Advanced Parallel Programming with MPI and OpenMP\")" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### General functions:\n", - "\n", - "| Description | C implementation | python implementation |\n", - "| ----------- | ---------------- | --------------------- |\n", - "| Initialization | MPI_Init | - (done automatically) |\n", - "| Standard communicator | MPI_COMM_WORLD | comm = MPI.COMM_WORLD |\n", - "| Get rank of process | MPI_Comm_rank | comm.Get_rank() |\n", - "| Get number of processes | MPI_Comm_size | comm.Get_size() |\n", - "| Wait for all processes | MPI_Barrier | comm.Barrier() |" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Point-to-point communication:\n", - "\n", - "| Description | C implementation | python implementation |\n", - "| ----------- | ---------------- | --------------------- |\n", - "| Send to another process | MPI_Send | comm.Send |\n", - "| Receive from other process | MPI_Recv | comm.Recv |\n", - "| Combined send & receive | MPI_Sendrecv | comm.Sendrecv |" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Collective communication:\n", - "\n", - "| Description | C implementation | python implementation |\n", - "| ----------- | ---------------- | --------------------- |\n", - "| Reduction (sum, max, min, ...) | MPI_Reduce, MPI_Allreduce | comm.Reduce, comm.Allreduce |\n", - "| Gathering data | MPI_Gather, MPI_Allgather | comm.Gather, comm.Allgather |\n", - "| Scattering data | MPI_Scatter | comm.Scatter |\n", - "| Broadcasting data | MPI_Bcast | comm.Bcast |\n" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Non-blocking communication\n", - "* Functions start with I (e.g. `MPI_Isend`, `MPI_Irecv`)\n", - " * Return immediately to caller\n", - " * Additional request object\n", - "* Check if communication has finished with `MPI_Test` or `MPI_Wait`\n", - "* Sometimes necessary to avoid deadlock\n", - "* Can be more efficient: more messages can be processed at once\n", - "* Possibility of overlapping computation and communication (advanced!)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## MPI for Python\n", - "* Most popular Python module `mpi4py`\n", - "* Wrapper around (almost) all MPI functions\n", - "* Communication of\n", - " * python objects\n", - " * Lower-case functions (send, recv, ...)\n", - " * Objects need to be compatible with pickle\n", - " * Numpy arrays\n", - " * Upper-case functions (Send, Recv, ...)\n", - " * More efficient: underlying memory used directly" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### `mpi4py`: simple example" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "from mpi4py import MPI\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "rank = comm.Get_rank()\n", - "size = comm.Get_size()\n", - "print(\"I am rank {:d} of {:d}.\".format(rank, size))\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Point-to-point communication\n", - "* No global synchronization needed\n", - "* Messages to be sent and to be received must match!\n", - "\n", - "| Description | C implementation | python implementation |\n", - "| ----------- | ---------------- | --------------------- |\n", - "| Send to another process | MPI_Send | comm.Send |\n", - "| Receive from other process | MPI_Recv | comm.Recv |\n", - "| Combined send & receive | MPI_Sendrecv | comm.Sendrecv |\n", - "\n", - "* Send and receive take a `tag` argument (to separate different communications)\n", - "* Send needs a `dest` argument\n", - "* Recv needs a `source` argument" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "```python\n", - "# mpi_point_to_point_basic.py\n", - "from mpi4py import MPI\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "rank = comm.Get_rank()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "if rank == 0:\n", - " data = {'a': 7, 'b': 3.14}\n", - " # note: data is packed using 'pickle' internally\n", - " comm.send(data, dest=1, tag=11)\n", - "elif rank == 1:\n", - " # note: data is unpacked using 'pickle' internally\n", - " data = comm.recv(source=0, tag=11)\n", - " print(\"Hello from rank 1. I received: data=\"+str(data))\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "```python\n", - "# mpi_point_to_point_numpy.py\n", - "from mpi4py import MPI\n", - "import numpy as np\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "rank = comm.Get_rank()\n", - "\n", - "n_elem = 4\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "# automatic MPI datatype discovery\n", - "if rank == 0:\n", - " data = np.arange(n_elem, dtype=np.float64)\n", - " comm.Send(data, dest=1, tag=13)\n", - "elif rank == 1:\n", - " data = np.empty(n_elem, dtype=np.float64)\n", - " comm.Recv(data, source=0, tag=13)\n", - " print(\"Hello from rank 1. I received: data=\"+str(data))\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Collective communication\n", - "\n", - "| Description | C implementation | python implementation |\n", - "| ----------- | ---------------- | --------------------- |\n", - "| Reduction (sum, max, min, ...) | MPI_Reduce, MPI_Allreduce | comm.Reduce, comm.Allreduce |\n", - "| Gathering data | MPI_Gather, MPI_Allgather | comm.Gather, comm.Allgather |\n", - "| Scattering data | MPI_Scatter | comm.Scatter |\n", - "| Broadcasting data | MPI_Bcast | comm.Bcast |\n", - "\n", - "* All processes of a communicator participate\n", - "* Useful for managing distributed data objects" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "#### Broadcast and scatter\n", - "\n", - "* Distribute data from one rank to all other ranks\n", - "\n", - "\n", - "\n", - "\n", - "(figure from https://github.com/wesleykendall/mpitutorial)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "```python\n", - "# mpi_collective_bcast_basic.py\n", - "from mpi4py import MPI\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "rank = comm.Get_rank()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "if rank == 0:\n", - " data = {'key1' : [7, 2.72, 2+3j],\n", - " 'key2' : ('abc', 'xyz')}\n", - "else:\n", - " data = None\n", - "# broadcast the data from rank 0 to all the other ranks\n", - "data = comm.bcast(data, root=0)\n", - "\n", - "if rank > 0:\n", - " print(\"Hello from rank \"+str(rank)+\". I received: data=\" + str(data))\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "```python\n", - "# mpi_collective_bcast_numpy.py\n", - "from mpi4py import MPI\n", - "import numpy as np\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "rank = comm.Get_rank()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "if rank == 0:\n", - " data = np.arange(100, dtype=np.int)\n", - "else:\n", - " data = np.empty(100, dtype=np.int)\n", - "\n", - "comm.Bcast(data, root=0)\n", - "\n", - "for i in range(100):\n", - " assert data[i] == i\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "```python\n", - "from mpi4py import MPI\n", - "import numpy as np\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "size = comm.Get_size()\n", - "rank = comm.Get_rank()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "sendbuf = None\n", - "if rank == 0:\n", - " sendbuf = np.arange(size*4, dtype='i')\n", - "recvbuf = np.empty(4, dtype='i')\n", - "comm.Scatter(sendbuf, recvbuf, root=0)\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "#### Gather and Allgather\n", - "\n", - "* Gather data from all ranks to \n", - " * one rank (gather)\n", - " * all ranks (allgather)\n", - "\n", - "<div>\n", - "<img src=\"fig/gather.png\" style=\"float:left\" />\n", - "<img src=\"fig/allgather.png\" />\n", - "</div>\n", - "\n", - "(figures from https://github.com/wesleykendall/mpitutorial)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "```python\n", - "from mpi4py import MPI\n", - "import numpy as np\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "size = comm.Get_size()\n", - "rank = comm.Get_rank()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "sendbuf = np.zeros(100, dtype='i') + rank\n", - "recvbuf = None\n", - "if rank == 0:\n", - " recvbuf = np.empty([size, 100], dtype='i')\n", - " \n", - "comm.Gather(sendbuf, recvbuf, root=0)\n", - "if rank == 0:\n", - " for i in range(size):\n", - " assert np.allclose(recvbuf[i,:], i)\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "#### Reductions\n", - "\n", - "* Combine values from all ranks with a certain operation (e.g. sum, max, min, product)\n", - "* Get result on\n", - " * one rank: Reduce\n", - " * all ranks: Allreduce\n", - "\n", - "<div>\n", - "<img src=\"fig/mpi_reduce_2.png\" style=\"float:left; width:45%\" />\n", - "<img src=\"fig/mpi_allreduce_1.png\" style=\"width:45%\" />\n", - "</div>\n", - "\n", - "(figures from https://github.com/wesleykendall/mpitutorial)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "```python\n", - "from mpi4py import MPI\n", - "import numpy as np\n", - "\n", - "comm = MPI.COMM_WORLD\n", - "rank = comm.Get_rank()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "data = np.arange(100, dtype=np.float64) + rank * 1000\n", - "maximum = data.max()\n", - "\n", - "# use python variable\n", - "global_maximum = comm.allreduce(maximum, op=MPI.MAX)\n", - "\n", - "# use numpy array -> sum for every element of the array across all process\n", - "data_sum = np.zeros_like(data)\n", - "comm.Allreduce(data, data_sum, op=MPI.SUM)\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Distributed-memory programming in python\n", - "* `mpi4py` provides wrappers for almost all MPI functions\n", - "* Use with numpy arrays $\\to$ better performance\n", - "* Interfacing external libraries possible\n", - " * Passing MPI communicators from python to library" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## Design of parallel algorithms\n", - "\n", - "Which steps are necessary to parallelize my application?" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Design of parallel algorithms\n", - "1. Partitioning\n", - " * divide computation and data into pieces\n", - " * use domain and functional decomposition\n", - "2. Communication\n", - " * leads to parallel overhead\n", - " * prefer local communication over global communication\n", - "3. Agglomeration\n", - " * grouping of tasks\n", - " * balance computation and communication\n", - "4. Mapping\n", - " * ensure maximum processor utilization\n", - " * static/dynamic load balancing, task scheduling\n", - " \n", - "(from M. Quinn, *Parallel programming in C with MPI and OpenMP*)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## Design example: embarrassingly parallel data processing\n", - "* Process data depending on an index\n", - "* Processing independent for each index\n", - "* Example code:" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "indices = np.arange(N)\n", - "results = np.zeros_like(indices, dtype=np.float64)\n", - "for index in indices:\n", - " data = load_data(index)\n", - " result = compute_something_difficult(data)\n", - " results[index] = get_result_summary(result)\n", - " save_some_figure()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Data processing: apply design steps\n", - "1. Partitioning:\n", - " * Computation for each index independent\n", - " * Data loaded independently\n", - "2. Communication:\n", - " * Computation embarassingly parallel, no communication needed\n", - " * Results need to be collected at the end\n", - "3. Agglomeration:\n", - " * If `compute_something_difficult` takes long enough, no agglomeration needed\n", - "4. Mapping:\n", - " * Assume that computation takes the same time for each index\n", - " * Thus, use *static load balancing*" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### From serial to parallel\n", - "* Indices\n", - " * Global indices: full index array to be worked on\n", - " * Local indices: indices that the current process works on\n", - "* Mapping: determine local indices\n", - " * Use simple modulus rule\n", - " * For 3 processes:\n", - " * Process 0 works on indices 0, 3, 6, ...\n", - " * Process 1 works on indices 1, 4, 7, ...\n", - " * Process 2 works on indices 2, 5, 8, ...\n", - "* Communication\n", - " * Communicate results array to all process\n", - " * Simplest solution: use `Allreduce` function" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Computational part with mapping" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "from mpi4py import MPI\n", - "comm = MPI.COMM_WORLD\n", - "rank = comm.Get_rank()\n", - "number_ranks = comm.Get_size()\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "fragment" - } - }, - "source": [ - "```python\n", - "indices = np.arange(N)\n", - "results = np.zeros_like(indices, dtype=np.float64)\n", - "\n", - "# get local indices using modulus\n", - "local_selection = indices % number_ranks == rank\n", - "for index in indices[local_selection]:\n", - " # same as above\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Communication" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "results_sum = np.zeros_like(results)\n", - "comm.Allreduce(results, results_sum, op=MPI.SUM)\n", - "results = results_sum\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Data processing example\n", - "* Simple example for a quick parallelization\n", - "* Also in `mpi/parallel_processing.py`\n", - "* Improvements:\n", - " * In principle, `Allreduce` not needed, `Allgather` enough $\\to$ more complicated to program\n", - " * Other selection of local indices possible (e.g. block data decomposition, see later)\n", - " * If computation times differ, *dynamic load balancing* needed" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## Design example: domain decomposition for diffusion equation\n", - "* Toy problem: diffusion on a 2d periodic grid $u_t = \\alpha \\Delta u$\n", - "* Formula for time update of $u$ at position $x_i,y_j$ from time $t_n$ to $t_{n+1}$: \n", - "$$ u_{i,j}^{(n+1)} = u_{i,j}^{(n)} + \\frac{\\alpha \\Delta t}{\\Delta x^2} \\left( \n", - "u_{i-1,j}^{(n)}\n", - "+u_{i+1,j}^{(n)}\n", - "+u_{i,j-1}^{(n)}\n", - "+u_{i,j+1}^{(n)}\n", - "-4u_{i,j}^{(n)}\n", - "\\right) $$\n", - "* Stencil:\n", - "<img src=\"fig/stencil_5pt.svg\" style=\"width:33%\" />" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Apply design steps\n", - "\n", - "1. Partitioning: update for each grid point independent from others (data parallelism)\n", - "2. Communication: for each grid point, 4 neighboring points are needed\n", - "3. Agglomeration: to avoid excessive communication, assign larger parts of the 2D grid to each process\n", - "4. Mapping: update for each point takes the same time\n", - " * distribute grid in equal pieces to processes\n", - " * using square parts minimizes communication" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Domain decomposition\n", - "\n", - "<img src=\"fig/domain_decomposition_2d.svg\" style=\"width:45%\" />" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Diffusion equation: from serial to parallel\n", - "* Different grids: local and global grid with local and global indices\n", - "* Update on local grid: same as before\n", - "* But: communication of halo points needed in each time step!\n", - " * Determine neighboring processes\n", - " * Exchange halo points in each direction (send and receive)\n", - "* For periodic boundaries:\n", - " * Also communication of boundaries needed\n", - " * Treat halo & boundary points the same" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Implementation in python\n", - "* Global variables: define grid and processes" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "from mpi4py import MPI\n", - "n_global = 256 # global grid size\n", - "# number of process in x and y\n", - "np_x = 2\n", - "np_y = 2\n", - "# local grid size in x and y (works only for 4 processes in this case!)\n", - "nx_local = n_global / np_x\n", - "ny_local = n_global / np_y\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Process grid: use Cartesian communicator" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "# create Cartesian communicator\n", - "cart = comm.Create_cart(dims=(np_x, np_y), periods=(True, True), reorder=True)\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "fragment" - } - }, - "source": [ - "```python\n", - "# get position in Cartesian grid\n", - "index_x, index_y = cart.Get_coords(rank)\n", - "print(\"Rank {:d}, indices: {:d},{:d} of grid {:d},{:d}\".format(rank, index_x, index_y, np_x, np_y))\n", - "# get global and local indices (\"edges\" and \"bounds\")\n", - "edges_x = np.arange(np_x+1) * n_global // np_x\n", - "edges_y = np.arange(np_y+1) * n_global // np_y\n", - "bounds_x = (edges_x[index_x], edges_x[index_x+1])\n", - "bounds_y = (edges_y[index_y], edges_y[index_y+1])\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Determine neighbours" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "-" - } - }, - "source": [ - "```python\n", - "# determine neighbors for halo update\n", - "cart_neighbors = {}\n", - "src, dest = cart.Shift(direction=0, disp=1)\n", - "cart_neighbors['left'] = src\n", - "cart_neighbors['right'] = dest\n", - "src, dest = cart.Shift(direction=1, disp=1)\n", - "cart_neighbors['down'] = src\n", - "cart_neighbors['up'] = dest\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Halo exchange" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "```python\n", - "# Initialization of grid at a certain point\n", - "n_ghost = 2\n", - "grid = np.zeros([nx_local+n_ghost, ny_local+n_ghost])\n", - "\n", - "# Halo exchange in +x direction (right)\n", - "# declare send and receive buffers\n", - "sendbuf = np.array(grid[nx_local,1:ny_local+1]) # send rightmost column\n", - "recvbuf = np.zeros_like(sendbuf)\n", - "# call Sendrecv with neighbors determined earlier\n", - "cart.Sendrecv(sendbuf=sendbuf, dest=cart_neighbors['right'],\n", - " recvbuf=recvbuf, source=cart_neighbors['left'])\n", - "# copy from receive buffer\n", - "grid[0,1:ny_local+1] = recvbuf[:] # save to left halo column\n", - "\n", - "# repeat for other directions (left, up, down )\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### Domain decomposition example\n", - "* Part of exercises: complete halo exchange\n", - "* Simple example, several extensions possible\n", - " * Different decomposition for different numbers of processors\n", - " * Handle number of processors more flexible\n", - " * Larger/different stencil (more communication necessary)\n", - " * Different time integration scheme\n", - " * and much more..." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "## Partitioning example: block data decomposition\n", - "* Distribute a vector of size $n$ over $p$ processors\n", - "* **If $n$ not divisible by $p$:** assign $\\lfloor n/p \\rfloor$ or $\\lceil n/p \\rceil$ elements to each processor\n", - "* Easiest way:\n", - " * First element of process $i$: $\\lfloor in/p\\rfloor$\n", - " * Last element of process $i$: $\\lfloor (i+1)n/p\\rfloor -1$\n", - " * Process controlling element $j$: $\\lfloor (p(j+1)-1)/n\\rfloor$\n", - "* In the algorithm: each processor only works on the local elements\n", - "* Often neighbouring elements necessary $\\to$ ghost cells, need to be communicated" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## Parallel Input/Output\n", - "* Different options for I/O in parallel programs\n", - "* For data-parallel processing, often one file per data item\n", - "* For distributed data\n", - " * One file per process\n", - " * Pro: easy to program\n", - " * Con: good performance only up to a few thousand processes, depends on number of processes\n", - " * One file from all processes\n", - " * Pro: independent of number of processes, no problems with file system\n", - " * Con: more difficult to program (correctness, performance)\n", - "* Possible solution: use hdf5 via `h5py` also in parallel" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### External library: `h5py-mpi`\n", - "\n", - "* Write to one hdf5 file in parallel\n", - "* Pass MPI communicator from `mpi4py` to `h5py` (example from `h5py` documentation):\n", - "\n", - "```python\n", - "from mpi4py import MPI\n", - "import h5py\n", - "\n", - "rank = MPI.COMM_WORLD.Get_rank()\n", - "N = MPI.COMM_WORLD.Get_size()\n", - "\n", - "f = h5py.File('parallel.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)\n", - "dset = f.create_dataset('test', (N,), dtype='i')\n", - "dset[rank] = rank\n", - "f.close()\n", - "```\n", - "* This writes the numbers from 0 to N to the same dataset in the same file in parallel" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## Summary: parallel programming with python\n", - "\n", - "* **Shared-memory programming** with processes: `multiprocessing` module\n", - " * Use `Pool` class for simple parallelization schemes\n", - " * *Use on one node only*\n", - "* **Distributed-memory programming** with MPI: `mpi4py` module\n", - " * Provides wrappers around all MPI-1/2/3 functions\n", - " * Communication of picklable objects or numpy arrays\n", - " * Complex programs possible\n", - " * *Scaling to several nodes possible*\n", - " \n", - "$\\to$ **Parallelization necessary to leverage modern HPC resources**" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## How to use python on an HPC cluster\n", - "\n", - "* Usual workflow:\n", - " * Copy code to login node\n", - " * Compile extensions, if necessary\n", - " * Submit batch job to scheduler\n", - " * After the job has been run automatically, analyze the results\n", - "* Scheduler that is often used: slurm\n", - " * Job script depends on type of parallelization used\n", - " * Shared memory (`multiprocessing`): single-node job\n", - " * Distributed memory (`mpi4py`): single and multi-node jobs using MPI" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### An example slurm script for a shared-memory program\n", - "```bash\n", - "#!/bin/bash -l\n", - "#SBATCH -o ./job.out.%j\n", - "#SBATCH -e ./job.err.%j\n", - "#SBATCH -D ./\n", - "#SBATCH -J PY_MULTIPROCESSING\n", - "#SBATCH --ntasks=1 # launch job\n", - "#SBATCH --cpus-per-task=16 # allocating 16 cores of a shared node\n", - "#SBATCH --mem=60800M # memory limit (16 x 3800M)\n", - "#SBATCH --time=00:10:00\n", - "\n", - "module purge\n", - "module load gcc/10 impi/2019.8 anaconda/3/2020.02\n", - "\n", - "# Limit the number of OMP threads to the available resources per process ( 1 core)\n", - "export OMP_NUM_THREADS=1\n", - "\n", - "# The program 'python_multiprocessing.py' should internally fork '$SLURM_CPUS_PER_TASK' (e.g. passed as command line argument or read from 'os.environ') worker processes via multiprocessing.\n", - "srun python3 ./python_multiprocessing.py\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "source": [ - "### An example slurm script for a distributed-memory program\n", - "```bash\n", - "#!/bin/bash -l\n", - "#SBATCH -o ./job.out.%j\n", - "#SBATCH -e ./job.err.%j\n", - "#SBATCH -D ./\n", - "#SBATCH -J MPI4PY\n", - "#SBATCH --nodes=2 # launch 144 MPI tasks\n", - "#SBATCH --ntasks-per-node=72 # on two full nodes (i.e. on 2 x 72 physical cores)\n", - "#SBATCH --cpus-per-task=1\n", - "#SBATCH --time=00:10:00\n", - "\n", - "module purge\n", - "module load gcc/10 impi/2019.8 anaconda/3/2020.02 mpi4py/3.0.3\n", - "\n", - "# Limit the number of OMP threads to the available resources per MPI task (here: 1 core)\n", - "export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK\n", - "\n", - "srun python3 ./python_mpi4py.py\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "source": [ - "## Conclusion\n", - "\n", - "**python for HPC**: fast development of code + high performance is possible!" - ] } ], "metadata": { diff --git a/notebooks/4c--Parallel_Programming_Distributed_Memory.ipynb b/notebooks/4c--Parallel_Programming_Distributed_Memory.ipynb new file mode 100644 index 0000000..56e092c --- /dev/null +++ b/notebooks/4c--Parallel_Programming_Distributed_Memory.ipynb @@ -0,0 +1,1225 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "# Parallel programming: distributed-memory approaches\n", + "\n", + "**Python for HPC course**\n", + "\n", + "Max Planck Computing and Data Facility, Garching" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Outline\n", + "* Introduction to MPI\n", + "* Using MPI from python (mpi4py)\n", + "* Design of parallel algorithms\n", + "* Example\n", + "* Using python on an HPC cluster" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Distributed-memory computing with MPI\n", + "* Message passing model\n", + " * Computational problem is decomposed into local parts\n", + " * Multiple processes are launched that work on individual parts\n", + " * Processes communicate via messages to solve the global problem\n", + "* Message passing interface, MPI\n", + " * MPI is a standard defining APIs and semantics for communication\n", + " * Collective communication\n", + " * Peer-to-peer communication\n", + "* Various implementations are available, e.g.\n", + " * OpenMPI (via Linux distributions)\n", + " * IntelMPI (commercial product, on HPC systems)\n", + " * and many others" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "## A quick introduction to MPI\n", + "* Message passing for communication & synchronization\n", + "* For more in-depth coverage on parallel programming, see courses at HLRS (e.g. \"Advanced Parallel Programming with MPI and OpenMP\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### General functions:\n", + "\n", + "| Description | C implementation | python implementation |\n", + "| ----------- | ---------------- | --------------------- |\n", + "| Initialization | MPI_Init | - (done automatically) |\n", + "| Standard communicator | MPI_COMM_WORLD | comm = MPI.COMM_WORLD |\n", + "| Get rank of process | MPI_Comm_rank | comm.Get_rank() |\n", + "| Get number of processes | MPI_Comm_size | comm.Get_size() |\n", + "| Wait for all processes | MPI_Barrier | comm.Barrier() |" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Point-to-point communication:\n", + "\n", + "| Description | C implementation | python implementation |\n", + "| ----------- | ---------------- | --------------------- |\n", + "| Send to another process | MPI_Send | comm.Send |\n", + "| Receive from other process | MPI_Recv | comm.Recv |\n", + "| Combined send & receive | MPI_Sendrecv | comm.Sendrecv |" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Collective communication:\n", + "\n", + "| Description | C implementation | python implementation |\n", + "| ----------- | ---------------- | --------------------- |\n", + "| Reduction (sum, max, min, ...) | MPI_Reduce, MPI_Allreduce | comm.Reduce, comm.Allreduce |\n", + "| Gathering data | MPI_Gather, MPI_Allgather | comm.Gather, comm.Allgather |\n", + "| Scattering data | MPI_Scatter | comm.Scatter |\n", + "| Broadcasting data | MPI_Bcast | comm.Bcast |\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Non-blocking communication\n", + "* Functions start with I (e.g. `MPI_Isend`, `MPI_Irecv`)\n", + " * Return immediately to caller\n", + " * Additional request object\n", + "* Check if communication has finished with `MPI_Test` or `MPI_Wait`\n", + "* Sometimes necessary to avoid deadlock\n", + "* Can be more efficient: more messages can be processed at once\n", + "* Possibility of overlapping computation and communication (advanced!)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## MPI for Python\n", + "* Most popular Python module `mpi4py`\n", + "* Wrapper around (almost) all MPI functions\n", + "* Communication of\n", + " * python objects\n", + " * Lower-case functions (send, recv, ...)\n", + " * Objects need to be compatible with pickle\n", + " * Numpy arrays\n", + " * Upper-case functions (Send, Recv, ...)\n", + " * More efficient: underlying memory used directly" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### `mpi4py`: simple example" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "from mpi4py import MPI\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "rank = comm.Get_rank()\n", + "size = comm.Get_size()\n", + "print(\"I am rank {:d} of {:d}.\".format(rank, size))\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Point-to-point communication\n", + "* No global synchronization needed\n", + "* Messages to be sent and to be received must match!\n", + "\n", + "| Description | C implementation | python implementation |\n", + "| ----------- | ---------------- | --------------------- |\n", + "| Send to another process | MPI_Send | comm.Send |\n", + "| Receive from other process | MPI_Recv | comm.Recv |\n", + "| Combined send & receive | MPI_Sendrecv | comm.Sendrecv |\n", + "\n", + "* Send and receive take a `tag` argument (to separate different communications)\n", + "* Send needs a `dest` argument\n", + "* Recv needs a `source` argument" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "```python\n", + "# mpi_point_to_point_basic.py\n", + "from mpi4py import MPI\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "rank = comm.Get_rank()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "if rank == 0:\n", + " data = {'a': 7, 'b': 3.14}\n", + " # note: data is packed using 'pickle' internally\n", + " comm.send(data, dest=1, tag=11)\n", + "elif rank == 1:\n", + " # note: data is unpacked using 'pickle' internally\n", + " data = comm.recv(source=0, tag=11)\n", + " print(\"Hello from rank 1. I received: data=\"+str(data))\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "```python\n", + "# mpi_point_to_point_numpy.py\n", + "from mpi4py import MPI\n", + "import numpy as np\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "rank = comm.Get_rank()\n", + "\n", + "n_elem = 4\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "# automatic MPI datatype discovery\n", + "if rank == 0:\n", + " data = np.arange(n_elem, dtype=np.float64)\n", + " comm.Send(data, dest=1, tag=13)\n", + "elif rank == 1:\n", + " data = np.empty(n_elem, dtype=np.float64)\n", + " comm.Recv(data, source=0, tag=13)\n", + " print(\"Hello from rank 1. I received: data=\"+str(data))\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Collective communication\n", + "\n", + "| Description | C implementation | python implementation |\n", + "| ----------- | ---------------- | --------------------- |\n", + "| Reduction (sum, max, min, ...) | MPI_Reduce, MPI_Allreduce | comm.Reduce, comm.Allreduce |\n", + "| Gathering data | MPI_Gather, MPI_Allgather | comm.Gather, comm.Allgather |\n", + "| Scattering data | MPI_Scatter | comm.Scatter |\n", + "| Broadcasting data | MPI_Bcast | comm.Bcast |\n", + "\n", + "* All processes of a communicator participate\n", + "* Useful for managing distributed data objects" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "#### Broadcast and scatter\n", + "\n", + "* Distribute data from one rank to all other ranks\n", + "\n", + "\n", + "\n", + "\n", + "(figure from https://github.com/wesleykendall/mpitutorial)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "```python\n", + "# mpi_collective_bcast_basic.py\n", + "from mpi4py import MPI\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "rank = comm.Get_rank()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "if rank == 0:\n", + " data = {'key1' : [7, 2.72, 2+3j],\n", + " 'key2' : ('abc', 'xyz')}\n", + "else:\n", + " data = None\n", + "# broadcast the data from rank 0 to all the other ranks\n", + "data = comm.bcast(data, root=0)\n", + "\n", + "if rank > 0:\n", + " print(\"Hello from rank \"+str(rank)+\". I received: data=\" + str(data))\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "```python\n", + "# mpi_collective_bcast_numpy.py\n", + "from mpi4py import MPI\n", + "import numpy as np\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "rank = comm.Get_rank()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "if rank == 0:\n", + " data = np.arange(100, dtype=np.int)\n", + "else:\n", + " data = np.empty(100, dtype=np.int)\n", + "\n", + "comm.Bcast(data, root=0)\n", + "\n", + "for i in range(100):\n", + " assert data[i] == i\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "```python\n", + "from mpi4py import MPI\n", + "import numpy as np\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "size = comm.Get_size()\n", + "rank = comm.Get_rank()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "sendbuf = None\n", + "if rank == 0:\n", + " sendbuf = np.arange(size*4, dtype='i')\n", + "recvbuf = np.empty(4, dtype='i')\n", + "comm.Scatter(sendbuf, recvbuf, root=0)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "#### Gather and Allgather\n", + "\n", + "* Gather data from all ranks to \n", + " * one rank (gather)\n", + " * all ranks (allgather)\n", + "\n", + "<div>\n", + "<img src=\"fig/gather.png\" style=\"float:left\" />\n", + "<img src=\"fig/allgather.png\" />\n", + "</div>\n", + "\n", + "(figures from https://github.com/wesleykendall/mpitutorial)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "```python\n", + "from mpi4py import MPI\n", + "import numpy as np\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "size = comm.Get_size()\n", + "rank = comm.Get_rank()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "sendbuf = np.zeros(100, dtype='i') + rank\n", + "recvbuf = None\n", + "if rank == 0:\n", + " recvbuf = np.empty([size, 100], dtype='i')\n", + " \n", + "comm.Gather(sendbuf, recvbuf, root=0)\n", + "if rank == 0:\n", + " for i in range(size):\n", + " assert np.allclose(recvbuf[i,:], i)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "#### Reductions\n", + "\n", + "* Combine values from all ranks with a certain operation (e.g. sum, max, min, product)\n", + "* Get result on\n", + " * one rank: Reduce\n", + " * all ranks: Allreduce\n", + "\n", + "<div>\n", + "<img src=\"fig/mpi_reduce_2.png\" style=\"float:left; width:45%\" />\n", + "<img src=\"fig/mpi_allreduce_1.png\" style=\"width:45%\" />\n", + "</div>\n", + "\n", + "(figures from https://github.com/wesleykendall/mpitutorial)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "```python\n", + "from mpi4py import MPI\n", + "import numpy as np\n", + "\n", + "comm = MPI.COMM_WORLD\n", + "rank = comm.Get_rank()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "data = np.arange(100, dtype=np.float64) + rank * 1000\n", + "maximum = data.max()\n", + "\n", + "# use python variable\n", + "global_maximum = comm.allreduce(maximum, op=MPI.MAX)\n", + "\n", + "# use numpy array -> sum for every element of the array across all process\n", + "data_sum = np.zeros_like(data)\n", + "comm.Allreduce(data, data_sum, op=MPI.SUM)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Distributed-memory programming in python\n", + "* `mpi4py` provides wrappers for almost all MPI functions\n", + "* Use with numpy arrays $\\to$ better performance\n", + "* Interfacing external libraries possible\n", + " * Passing MPI communicators from python to library" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Design of parallel algorithms\n", + "\n", + "Which steps are necessary to parallelize my application?" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Design of parallel algorithms\n", + "1. Partitioning\n", + " * divide computation and data into pieces\n", + " * use domain and functional decomposition\n", + "2. Communication\n", + " * leads to parallel overhead\n", + " * prefer local communication over global communication\n", + "3. Agglomeration\n", + " * grouping of tasks\n", + " * balance computation and communication\n", + "4. Mapping\n", + " * ensure maximum processor utilization\n", + " * static/dynamic load balancing, task scheduling\n", + " \n", + "(from M. Quinn, *Parallel programming in C with MPI and OpenMP*)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Design example: embarrassingly parallel data processing\n", + "* Process data depending on an index\n", + "* Processing independent for each index\n", + "* Example code:" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "indices = np.arange(N)\n", + "results = np.zeros_like(indices, dtype=np.float64)\n", + "for index in indices:\n", + " data = load_data(index)\n", + " result = compute_something_difficult(data)\n", + " results[index] = get_result_summary(result)\n", + " save_some_figure()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Data processing: apply design steps\n", + "1. Partitioning:\n", + " * Computation for each index independent\n", + " * Data loaded independently\n", + "2. Communication:\n", + " * Computation embarassingly parallel, no communication needed\n", + " * Results need to be collected at the end\n", + "3. Agglomeration:\n", + " * If `compute_something_difficult` takes long enough, no agglomeration needed\n", + "4. Mapping:\n", + " * Assume that computation takes the same time for each index\n", + " * Thus, use *static load balancing*" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### From serial to parallel\n", + "* Indices\n", + " * Global indices: full index array to be worked on\n", + " * Local indices: indices that the current process works on\n", + "* Mapping: determine local indices\n", + " * Use simple modulus rule\n", + " * For 3 processes:\n", + " * Process 0 works on indices 0, 3, 6, ...\n", + " * Process 1 works on indices 1, 4, 7, ...\n", + " * Process 2 works on indices 2, 5, 8, ...\n", + "* Communication\n", + " * Communicate results array to all process\n", + " * Simplest solution: use `Allreduce` function" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Computational part with mapping" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "from mpi4py import MPI\n", + "comm = MPI.COMM_WORLD\n", + "rank = comm.Get_rank()\n", + "number_ranks = comm.Get_size()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "fragment" + } + }, + "source": [ + "```python\n", + "indices = np.arange(N)\n", + "results = np.zeros_like(indices, dtype=np.float64)\n", + "\n", + "# get local indices using modulus\n", + "local_selection = indices % number_ranks == rank\n", + "for index in indices[local_selection]:\n", + " # same as above\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Communication" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "results_sum = np.zeros_like(results)\n", + "comm.Allreduce(results, results_sum, op=MPI.SUM)\n", + "results = results_sum\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Data processing example\n", + "* Simple example for a quick parallelization\n", + "* Also in `mpi/parallel_processing.py`\n", + "* Improvements:\n", + " * In principle, `Allreduce` not needed, `Allgather` enough $\\to$ more complicated to program\n", + " * Other selection of local indices possible (e.g. block data decomposition, see later)\n", + " * If computation times differ, *dynamic load balancing* needed" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Design example: domain decomposition for diffusion equation\n", + "* Toy problem: diffusion on a 2d periodic grid $u_t = \\alpha \\Delta u$\n", + "* Formula for time update of $u$ at position $x_i,y_j$ from time $t_n$ to $t_{n+1}$: \n", + "$$ u_{i,j}^{(n+1)} = u_{i,j}^{(n)} + \\frac{\\alpha \\Delta t}{\\Delta x^2} \\left( \n", + "u_{i-1,j}^{(n)}\n", + "+u_{i+1,j}^{(n)}\n", + "+u_{i,j-1}^{(n)}\n", + "+u_{i,j+1}^{(n)}\n", + "-4u_{i,j}^{(n)}\n", + "\\right) $$\n", + "* Stencil:\n", + "<img src=\"fig/stencil_5pt.svg\" style=\"width:33%\" />" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Apply design steps\n", + "\n", + "1. Partitioning: update for each grid point independent from others (data parallelism)\n", + "2. Communication: for each grid point, 4 neighboring points are needed\n", + "3. Agglomeration: to avoid excessive communication, assign larger parts of the 2D grid to each process\n", + "4. Mapping: update for each point takes the same time\n", + " * distribute grid in equal pieces to processes\n", + " * using square parts minimizes communication" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Domain decomposition\n", + "\n", + "<img src=\"fig/domain_decomposition_2d.svg\" style=\"width:45%\" />" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Diffusion equation: from serial to parallel\n", + "* Different grids: local and global grid with local and global indices\n", + "* Update on local grid: same as before\n", + "* But: communication of halo points needed in each time step!\n", + " * Determine neighboring processes\n", + " * Exchange halo points in each direction (send and receive)\n", + "* For periodic boundaries:\n", + " * Also communication of boundaries needed\n", + " * Treat halo & boundary points the same" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Implementation in python\n", + "* Global variables: define grid and processes" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "from mpi4py import MPI\n", + "n_global = 256 # global grid size\n", + "# number of process in x and y\n", + "np_x = 2\n", + "np_y = 2\n", + "# local grid size in x and y (works only for 4 processes in this case!)\n", + "nx_local = n_global / np_x\n", + "ny_local = n_global / np_y\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Process grid: use Cartesian communicator" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "# create Cartesian communicator\n", + "cart = comm.Create_cart(dims=(np_x, np_y), periods=(True, True), reorder=True)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "fragment" + } + }, + "source": [ + "```python\n", + "# get position in Cartesian grid\n", + "index_x, index_y = cart.Get_coords(rank)\n", + "print(\"Rank {:d}, indices: {:d},{:d} of grid {:d},{:d}\".format(rank, index_x, index_y, np_x, np_y))\n", + "# get global and local indices (\"edges\" and \"bounds\")\n", + "edges_x = np.arange(np_x+1) * n_global // np_x\n", + "edges_y = np.arange(np_y+1) * n_global // np_y\n", + "bounds_x = (edges_x[index_x], edges_x[index_x+1])\n", + "bounds_y = (edges_y[index_y], edges_y[index_y+1])\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Determine neighbours" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "-" + } + }, + "source": [ + "```python\n", + "# determine neighbors for halo update\n", + "cart_neighbors = {}\n", + "src, dest = cart.Shift(direction=0, disp=1)\n", + "cart_neighbors['left'] = src\n", + "cart_neighbors['right'] = dest\n", + "src, dest = cart.Shift(direction=1, disp=1)\n", + "cart_neighbors['down'] = src\n", + "cart_neighbors['up'] = dest\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Halo exchange" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```python\n", + "# Initialization of grid at a certain point\n", + "n_ghost = 2\n", + "grid = np.zeros([nx_local+n_ghost, ny_local+n_ghost])\n", + "\n", + "# Halo exchange in +x direction (right)\n", + "# declare send and receive buffers\n", + "sendbuf = np.array(grid[nx_local,1:ny_local+1]) # send rightmost column\n", + "recvbuf = np.zeros_like(sendbuf)\n", + "# call Sendrecv with neighbors determined earlier\n", + "cart.Sendrecv(sendbuf=sendbuf, dest=cart_neighbors['right'],\n", + " recvbuf=recvbuf, source=cart_neighbors['left'])\n", + "# copy from receive buffer\n", + "grid[0,1:ny_local+1] = recvbuf[:] # save to left halo column\n", + "\n", + "# repeat for other directions (left, up, down )\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### Domain decomposition example\n", + "* Part of exercises: complete halo exchange\n", + "* Simple example, several extensions possible\n", + " * Different decomposition for different numbers of processors\n", + " * Handle number of processors more flexible\n", + " * Larger/different stencil (more communication necessary)\n", + " * Different time integration scheme\n", + " * and much more..." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "## Partitioning example: block data decomposition\n", + "* Distribute a vector of size $n$ over $p$ processors\n", + "* **If $n$ not divisible by $p$:** assign $\\lfloor n/p \\rfloor$ or $\\lceil n/p \\rceil$ elements to each processor\n", + "* Easiest way:\n", + " * First element of process $i$: $\\lfloor in/p\\rfloor$\n", + " * Last element of process $i$: $\\lfloor (i+1)n/p\\rfloor -1$\n", + " * Process controlling element $j$: $\\lfloor (p(j+1)-1)/n\\rfloor$\n", + "* In the algorithm: each processor only works on the local elements\n", + "* Often neighbouring elements necessary $\\to$ ghost cells, need to be communicated" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Parallel Input/Output\n", + "* Different options for I/O in parallel programs\n", + "* For data-parallel processing, often one file per data item\n", + "* For distributed data\n", + " * One file per process\n", + " * Pro: easy to program\n", + " * Con: good performance only up to a few thousand processes, depends on number of processes\n", + " * One file from all processes\n", + " * Pro: independent of number of processes, no problems with file system\n", + " * Con: more difficult to program (correctness, performance)\n", + "* Possible solution: use hdf5 via `h5py` also in parallel" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### External library: `h5py-mpi`\n", + "\n", + "* Write to one hdf5 file in parallel\n", + "* Pass MPI communicator from `mpi4py` to `h5py` (example from `h5py` documentation):\n", + "\n", + "```python\n", + "from mpi4py import MPI\n", + "import h5py\n", + "\n", + "rank = MPI.COMM_WORLD.Get_rank()\n", + "N = MPI.COMM_WORLD.Get_size()\n", + "\n", + "f = h5py.File('parallel.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)\n", + "dset = f.create_dataset('test', (N,), dtype='i')\n", + "dset[rank] = rank\n", + "f.close()\n", + "```\n", + "* This writes the numbers from 0 to N to the same dataset in the same file in parallel" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Summary: parallel programming with python\n", + "\n", + "* **Shared-memory programming** with processes: `multiprocessing` module\n", + " * Use `Pool` class for simple parallelization schemes\n", + " * *Use on one node only*\n", + "* **Distributed-memory programming** with MPI: `mpi4py` module\n", + " * Provides wrappers around all MPI-1/2/3 functions\n", + " * Communication of picklable objects or numpy arrays\n", + " * Complex programs possible\n", + " * *Scaling to several nodes possible*\n", + " \n", + "$\\to$ **Parallelization necessary to leverage modern HPC resources**" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## How to use python on an HPC cluster\n", + "\n", + "* Usual workflow:\n", + " * Copy code to login node\n", + " * Compile extensions, if necessary\n", + " * Submit batch job to scheduler\n", + " * After the job has been run automatically, analyze the results\n", + "* Scheduler that is often used: slurm\n", + " * Job script depends on type of parallelization used\n", + " * Shared memory (`multiprocessing`): single-node job\n", + " * Distributed memory (`mpi4py`): single and multi-node jobs using MPI" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### An example slurm script for a shared-memory program\n", + "```bash\n", + "#!/bin/bash -l\n", + "#SBATCH -o ./job.out.%j\n", + "#SBATCH -e ./job.err.%j\n", + "#SBATCH -D ./\n", + "#SBATCH -J PY_MULTIPROCESSING\n", + "#SBATCH --ntasks=1 # launch job\n", + "#SBATCH --cpus-per-task=16 # allocating 16 cores of a shared node\n", + "#SBATCH --mem=60800M # memory limit (16 x 3800M)\n", + "#SBATCH --time=00:10:00\n", + "\n", + "module purge\n", + "module load gcc/10 impi/2019.8 anaconda/3/2020.02\n", + "\n", + "# Limit the number of OMP threads to the available resources per process ( 1 core)\n", + "export OMP_NUM_THREADS=1\n", + "\n", + "# The program 'python_multiprocessing.py' should internally fork '$SLURM_CPUS_PER_TASK' (e.g. passed as command line argument or read from 'os.environ') worker processes via multiprocessing.\n", + "srun python3 ./python_multiprocessing.py\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "subslide" + } + }, + "source": [ + "### An example slurm script for a distributed-memory program\n", + "```bash\n", + "#!/bin/bash -l\n", + "#SBATCH -o ./job.out.%j\n", + "#SBATCH -e ./job.err.%j\n", + "#SBATCH -D ./\n", + "#SBATCH -J MPI4PY\n", + "#SBATCH --nodes=2 # launch 144 MPI tasks\n", + "#SBATCH --ntasks-per-node=72 # on two full nodes (i.e. on 2 x 72 physical cores)\n", + "#SBATCH --cpus-per-task=1\n", + "#SBATCH --time=00:10:00\n", + "\n", + "module purge\n", + "module load gcc/10 impi/2019.8 anaconda/3/2020.02 mpi4py/3.0.3\n", + "\n", + "# Limit the number of OMP threads to the available resources per MPI task (here: 1 core)\n", + "export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK\n", + "\n", + "srun python3 ./python_mpi4py.py\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "slideshow": { + "slide_type": "slide" + } + }, + "source": [ + "## Conclusion\n", + "\n", + "**python for HPC**: fast development of code + high performance is possible!" + ] + } + ], + "metadata": { + "celltoolbar": "Slideshow", + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} -- GitLab