parallelInplace.hpp 10.5 KB
Newer Older
Berenger Bramas's avatar
Berenger Bramas committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
//////////////////////////////////////////////////////////
/// Code to sort merge two arrays in parallel taken
/// from an existing study.
///
/// By berenger.bramas@inria.fr 2020.
/// Licence is MIT.
/// Comes without any warranty.
///
/// Please refer to:
/// https://gitlab.inria.fr/bramas/inplace-merge
/// https://hal.inria.fr/hal-02613668
//////////////////////////////////////////////////////////
#ifndef PARALLELINPLACE_HPP
#define PARALLELINPLACE_HPP

#include <cstring>
#include <utility>
#include <algorithm>
#include <cassert>
#include <memory>

#include <omp.h>

namespace ParallelInplace {


// Reorder an array in place with extra moves :
// if we have [0 1 2 3 4 5 ; A B C]
// it creates [A B C ; 0 1 2 3 4 5]
template <class NumType>
inline void reorderShifting(NumType array[], const int lengthLeftPart, const int totalSize){
    const int lengthRightPart = totalSize - lengthLeftPart;
    // if one of the size is zero just return
    if(lengthLeftPart == 0 || lengthRightPart == 0){
        // do nothing
        return;
    }

    // size of the partitions at first iteration
    int workingLeftLength  = lengthLeftPart;
    int workingRightLength = lengthRightPart;
    // while the partitions have different sizes and none of them are null
    while(workingLeftLength != workingRightLength && workingLeftLength && workingRightLength){
        // if the left partition is the smallest
        if(workingLeftLength < workingRightLength){
            // move the left parition in the correct place
            for(int idx = 0 ; idx < workingLeftLength ; ++idx){
                std::swap(array[idx], array[idx + workingRightLength]);
            }
            // the new left partition is now the values that have been swaped
            workingRightLength = workingRightLength - workingLeftLength;
            //workingLeftLength = workingLeftLength;
        }
        // if right partition is the smallest
        else{
            // move the right partition in the correct place
            for(int idx = 0 ; idx < workingRightLength ; ++idx){
                std::swap(array[idx], array[idx + workingLeftLength]);
            }
            // shift the pointer to skip the correct values
            array = (array + workingRightLength);
            // the new left partition is the previous right minus the swaped values
            //workingRightLength = workingRightLength;
            workingLeftLength  = workingLeftLength - workingRightLength;
        }
    }
    // if partitions have the same size
    for(int idx = 0 ; idx < workingLeftLength ; ++idx){
        std::swap(array[idx], array[idx + workingLeftLength]);
    }
}

////////////////////////////////////////////////////////////////
// Merge functions
////////////////////////////////////////////////////////////////

template <class NumType>
void FindMedian(NumType array[],  int centerPosition, const int sizeArray,
                            int* middleA, int* middleB){
    if(centerPosition == 0 || centerPosition == sizeArray || array[centerPosition-1] <= array[centerPosition]){
        *middleA = centerPosition;
        *middleB = 0;
        return;
    }
    if(!(array[0] <= array[sizeArray-1])){
        *middleA = 0;
        *middleB = sizeArray-centerPosition;
    }

    int leftStart = 0;
    int leftLimite = centerPosition;
    int leftPivot = (leftLimite-leftStart)/2 + leftStart;

    int rightStart = centerPosition;
    int rightLimte = sizeArray;
    int rightPivot = (rightLimte-rightStart)/2 + rightStart;

    while(leftStart < leftLimite && rightStart < rightLimte
            && !(array[leftPivot] == array[rightPivot])){
        assert( leftPivot <  leftLimite);
        assert( rightPivot <  rightLimte);
        assert( leftPivot <  centerPosition);
        assert( rightPivot <  sizeArray);

        const int A0 = leftPivot-0;
        const int A1 = centerPosition-leftPivot;
        const int B0 = rightPivot-centerPosition;
        const int B1 = sizeArray-rightPivot;

        if(array[leftPivot] < array[rightPivot]){
            if(A0+B0 < A1+B1){
                leftStart = leftPivot+1;
                leftPivot = (leftLimite-leftStart)/2 + leftStart;
            }
            else{
                rightLimte = rightPivot;
                rightPivot = (rightLimte-rightStart)/2 + rightStart;
            }
        }
        else{
            if(A0+B0 < A1+B1){
                rightStart = rightPivot+1;
                rightPivot = (rightLimte-rightStart)/2 + rightStart;
            }
            else{
                leftLimite = leftPivot;
                leftPivot = (leftLimite-leftStart)/2 + leftStart;
            }
        }
    }

    *middleA = leftPivot;
    *middleB = rightPivot-centerPosition;
}


template <class NumType>
struct WorkingInterval{
    NumType* array;
    int currentStart;
    int currentMiddle;
    int currentEnd;
    int level;
    int depthLimite;
};

template <class NumType>
inline void parallelMergeInPlaceCore(NumType array[], int currentStart, int currentMiddle, int currentEnd,
                                    int level, const int depthLimite,
Berenger Bramas's avatar
Berenger Bramas committed
150
                                    volatile WorkingInterval<NumType> intervals[], volatile int barrier[]){
Berenger Bramas's avatar
Berenger Bramas committed
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

    assert(0 <= currentStart);
    assert(currentStart <= currentMiddle);
    assert(currentMiddle <= currentEnd);

    if(currentStart != currentMiddle && currentMiddle != currentEnd){
        while(level != depthLimite && (currentEnd-currentStart) > 256){
            int middleA = 0;
            int middleB = 0;

            FindMedian(array + currentStart,  currentMiddle - currentStart, currentEnd-currentStart,
                        &middleA, &middleB);

            const int sizeRestA = currentMiddle-currentStart-middleA;
            const int sizeRestB = currentEnd-currentMiddle-middleB;

            reorderShifting(array + middleA + currentStart, sizeRestA, middleB+sizeRestA);

            const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num();

            // Should be #pragma omp atomic write
Berenger Bramas's avatar
Berenger Bramas committed
172
            ((WorkingInterval<NumType>*)intervals)[targetThread] = WorkingInterval<NumType>{array,
Berenger Bramas's avatar
Berenger Bramas committed
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
                                     currentStart+middleA+middleB,
                                     currentStart+middleA+middleB+sizeRestA,
                                     currentEnd,
                                     level+1, depthLimite};
            #pragma omp atomic write
            barrier[targetThread] = 1;

            currentEnd = currentStart+middleA+middleB;
            currentMiddle = currentStart+middleA;

            assert(0 <= currentStart);
            assert(currentStart <= currentMiddle);
            assert(currentMiddle <= currentEnd);

            level += 1;
        }

        std::inplace_merge(array + currentStart, array+currentMiddle, array+currentEnd);
    }

    while(level != depthLimite){
        const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num();

        // Should be #pragma omp atomic write
Berenger Bramas's avatar
Berenger Bramas committed
197
        ((WorkingInterval<NumType>*)intervals)[targetThread] = WorkingInterval<NumType>{array,
Berenger Bramas's avatar
Berenger Bramas committed
198 199 200 201 202 203 204 205 206 207 208 209 210 211
                                 currentEnd,
                                 currentEnd,
                                 currentEnd,
                                 level+1, depthLimite};
        #pragma omp atomic write
        barrier[targetThread] = 1;

        level += 1;
    }
}

template <class NumType>
inline void parallelMergeInPlace(NumType array[], const int sizeArray, int centerPosition,
                                 const long int numThreadsInvolved, const long int firstThread,
Berenger Bramas's avatar
Berenger Bramas committed
212
                                 volatile WorkingInterval<NumType> intervals[], volatile int barrier[]){
Berenger Bramas's avatar
Berenger Bramas committed
213 214 215 216 217
    const int numThread = omp_get_thread_num();

    for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
        if(idxThread + firstThread == numThread){
#pragma omp atomic write
Berenger Bramas's avatar
Berenger Bramas committed
218
            barrier[numThread] = -1;
Berenger Bramas's avatar
Berenger Bramas committed
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
        }
        while(true){
            int dataAreReady;
#pragma omp atomic read
            dataAreReady = barrier[idxThread + firstThread];
            if(dataAreReady == -1){
                break;
            }
        }
    }

    // Already in good shape
    if(centerPosition == 0 || centerPosition == sizeArray || array[centerPosition-1] <= array[centerPosition]){
        for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
            if(idxThread + firstThread == numThread){
        #pragma omp atomic write
Berenger Bramas's avatar
Berenger Bramas committed
235
                barrier[numThread] = 0;
Berenger Bramas's avatar
Berenger Bramas committed
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
            }
            while(true){
                int dataAreReady;
        #pragma omp atomic read
                dataAreReady = barrier[idxThread + firstThread];
                if(dataAreReady == 0){
                    break;
                }
            }
        }

        return;
    }

    for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
        if(idxThread + firstThread == numThread){
    #pragma omp atomic write
Berenger Bramas's avatar
Berenger Bramas committed
253
            barrier[numThread] = -2;
Berenger Bramas's avatar
Berenger Bramas committed
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
        }
        while(true){
            int dataAreReady;
    #pragma omp atomic read
            dataAreReady = barrier[idxThread + firstThread];
            if(dataAreReady == -2){
                break;
            }
        }
    }

    if(numThread == firstThread){
        const int depthLimite = ffs(numThreadsInvolved) - 1;
#pragma omp atomic write
        barrier[numThread] = 1;

        parallelMergeInPlaceCore<NumType>(array, 0, centerPosition, sizeArray, 0, depthLimite,
                                          intervals, barrier);
    }
    else{
        while(true){
            int myDataAreReady;
#pragma omp atomic read
            myDataAreReady = barrier[numThread];
            if(myDataAreReady == 1){
                break;
            }
        }

        parallelMergeInPlaceCore<NumType>(intervals[numThread].array,
                                          intervals[numThread].currentStart,
                                          intervals[numThread].currentMiddle,
                                          intervals[numThread].currentEnd,
                                          intervals[numThread].level,
                                          intervals[numThread].depthLimite,
                                          intervals, barrier);
    }

    for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
        if(idxThread + firstThread == numThread){
#pragma omp atomic write
Berenger Bramas's avatar
Berenger Bramas committed
295
            barrier[numThread] = 0;
Berenger Bramas's avatar
Berenger Bramas committed
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
        }
        while(true){
            int dataAreReady;
#pragma omp atomic read
            dataAreReady = barrier[idxThread + firstThread];
            if(dataAreReady == 0){
                break;
            }
        }
    }
}

}

#endif