Commit aca18ffa authored by BRAMAS Berenger's avatar BRAMAS Berenger

Merge branch 'update/omp-sort-strategies' into 'master'

Update/omp sort strategies

See merge request bramas/avx-512-sort!1
parents ef5ddf21 eee8a547
Pipeline #77809 failed with stage
//////////////////////////////////////////////////////////
/// Code to sort merge two arrays in parallel taken
/// from an existing study.
///
/// By berenger.bramas@inria.fr 2020.
/// Licence is MIT.
/// Comes without any warranty.
///
/// Please refer to:
/// https://gitlab.inria.fr/bramas/inplace-merge
/// https://hal.inria.fr/hal-02613668
//////////////////////////////////////////////////////////
#ifndef PARALLELINPLACE_HPP
#define PARALLELINPLACE_HPP
#include <cstring>
#include <utility>
#include <algorithm>
#include <cassert>
#include <memory>
#include <omp.h>
namespace ParallelInplace {
// Reorder an array in place with extra moves :
// if we have [0 1 2 3 4 5 ; A B C]
// it creates [A B C ; 0 1 2 3 4 5]
template <class NumType>
inline void reorderShifting(NumType array[], const int lengthLeftPart, const int totalSize){
const int lengthRightPart = totalSize - lengthLeftPart;
// if one of the size is zero just return
if(lengthLeftPart == 0 || lengthRightPart == 0){
// do nothing
return;
}
// size of the partitions at first iteration
int workingLeftLength = lengthLeftPart;
int workingRightLength = lengthRightPart;
// while the partitions have different sizes and none of them are null
while(workingLeftLength != workingRightLength && workingLeftLength && workingRightLength){
// if the left partition is the smallest
if(workingLeftLength < workingRightLength){
// move the left parition in the correct place
for(int idx = 0 ; idx < workingLeftLength ; ++idx){
std::swap(array[idx], array[idx + workingRightLength]);
}
// the new left partition is now the values that have been swaped
workingRightLength = workingRightLength - workingLeftLength;
//workingLeftLength = workingLeftLength;
}
// if right partition is the smallest
else{
// move the right partition in the correct place
for(int idx = 0 ; idx < workingRightLength ; ++idx){
std::swap(array[idx], array[idx + workingLeftLength]);
}
// shift the pointer to skip the correct values
array = (array + workingRightLength);
// the new left partition is the previous right minus the swaped values
//workingRightLength = workingRightLength;
workingLeftLength = workingLeftLength - workingRightLength;
}
}
// if partitions have the same size
for(int idx = 0 ; idx < workingLeftLength ; ++idx){
std::swap(array[idx], array[idx + workingLeftLength]);
}
}
////////////////////////////////////////////////////////////////
// Merge functions
////////////////////////////////////////////////////////////////
template <class NumType>
void FindMedian(NumType array[], int centerPosition, const int sizeArray,
int* middleA, int* middleB){
if(centerPosition == 0 || centerPosition == sizeArray || array[centerPosition-1] <= array[centerPosition]){
*middleA = centerPosition;
*middleB = 0;
return;
}
if(!(array[0] <= array[sizeArray-1])){
*middleA = 0;
*middleB = sizeArray-centerPosition;
}
int leftStart = 0;
int leftLimite = centerPosition;
int leftPivot = (leftLimite-leftStart)/2 + leftStart;
int rightStart = centerPosition;
int rightLimte = sizeArray;
int rightPivot = (rightLimte-rightStart)/2 + rightStart;
while(leftStart < leftLimite && rightStart < rightLimte
&& !(array[leftPivot] == array[rightPivot])){
assert( leftPivot < leftLimite);
assert( rightPivot < rightLimte);
assert( leftPivot < centerPosition);
assert( rightPivot < sizeArray);
const int A0 = leftPivot-0;
const int A1 = centerPosition-leftPivot;
const int B0 = rightPivot-centerPosition;
const int B1 = sizeArray-rightPivot;
if(array[leftPivot] < array[rightPivot]){
if(A0+B0 < A1+B1){
leftStart = leftPivot+1;
leftPivot = (leftLimite-leftStart)/2 + leftStart;
}
else{
rightLimte = rightPivot;
rightPivot = (rightLimte-rightStart)/2 + rightStart;
}
}
else{
if(A0+B0 < A1+B1){
rightStart = rightPivot+1;
rightPivot = (rightLimte-rightStart)/2 + rightStart;
}
else{
leftLimite = leftPivot;
leftPivot = (leftLimite-leftStart)/2 + leftStart;
}
}
}
*middleA = leftPivot;
*middleB = rightPivot-centerPosition;
}
template <class NumType>
struct WorkingInterval{
NumType* array;
int currentStart;
int currentMiddle;
int currentEnd;
int level;
int depthLimite;
};
template <class NumType>
inline void parallelMergeInPlaceCore(NumType array[], int currentStart, int currentMiddle, int currentEnd,
int level, const int depthLimite,
volatile WorkingInterval<NumType> intervals[], volatile int barrier[]){
assert(0 <= currentStart);
assert(currentStart <= currentMiddle);
assert(currentMiddle <= currentEnd);
if(currentStart != currentMiddle && currentMiddle != currentEnd){
while(level != depthLimite && (currentEnd-currentStart) > 256){
int middleA = 0;
int middleB = 0;
FindMedian(array + currentStart, currentMiddle - currentStart, currentEnd-currentStart,
&middleA, &middleB);
const int sizeRestA = currentMiddle-currentStart-middleA;
const int sizeRestB = currentEnd-currentMiddle-middleB;
reorderShifting(array + middleA + currentStart, sizeRestA, middleB+sizeRestA);
const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num();
// Should be #pragma omp atomic write
((WorkingInterval<NumType>*)intervals)[targetThread] = WorkingInterval<NumType>{array,
currentStart+middleA+middleB,
currentStart+middleA+middleB+sizeRestA,
currentEnd,
level+1, depthLimite};
#pragma omp atomic write
barrier[targetThread] = 1;
currentEnd = currentStart+middleA+middleB;
currentMiddle = currentStart+middleA;
assert(0 <= currentStart);
assert(currentStart <= currentMiddle);
assert(currentMiddle <= currentEnd);
level += 1;
}
std::inplace_merge(array + currentStart, array+currentMiddle, array+currentEnd);
}
while(level != depthLimite){
const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num();
// Should be #pragma omp atomic write
((WorkingInterval<NumType>*)intervals)[targetThread] = WorkingInterval<NumType>{array,
currentEnd,
currentEnd,
currentEnd,
level+1, depthLimite};
#pragma omp atomic write
barrier[targetThread] = 1;
level += 1;
}
}
template <class NumType>
inline void parallelMergeInPlace(NumType array[], const int sizeArray, int centerPosition,
const long int numThreadsInvolved, const long int firstThread,
volatile WorkingInterval<NumType> intervals[], volatile int barrier[]){
const int numThread = omp_get_thread_num();
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){
#pragma omp atomic write
barrier[numThread] = -1;
}
while(true){
int dataAreReady;
#pragma omp atomic read
dataAreReady = barrier[idxThread + firstThread];
if(dataAreReady == -1){
break;
}
}
}
// Already in good shape
if(centerPosition == 0 || centerPosition == sizeArray || array[centerPosition-1] <= array[centerPosition]){
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){
#pragma omp atomic write
barrier[numThread] = 0;
}
while(true){
int dataAreReady;
#pragma omp atomic read
dataAreReady = barrier[idxThread + firstThread];
if(dataAreReady == 0){
break;
}
}
}
return;
}
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){
#pragma omp atomic write
barrier[numThread] = -2;
}
while(true){
int dataAreReady;
#pragma omp atomic read
dataAreReady = barrier[idxThread + firstThread];
if(dataAreReady == -2){
break;
}
}
}
if(numThread == firstThread){
const int depthLimite = ffs(numThreadsInvolved) - 1;
#pragma omp atomic write
barrier[numThread] = 1;
parallelMergeInPlaceCore<NumType>(array, 0, centerPosition, sizeArray, 0, depthLimite,
intervals, barrier);
}
else{
while(true){
int myDataAreReady;
#pragma omp atomic read
myDataAreReady = barrier[numThread];
if(myDataAreReady == 1){
break;
}
}
parallelMergeInPlaceCore<NumType>(intervals[numThread].array,
intervals[numThread].currentStart,
intervals[numThread].currentMiddle,
intervals[numThread].currentEnd,
intervals[numThread].level,
intervals[numThread].depthLimite,
intervals, barrier);
}
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){
#pragma omp atomic write
barrier[numThread] = 0;
}
while(true){
int dataAreReady;
#pragma omp atomic read
dataAreReady = barrier[idxThread + firstThread];
if(dataAreReady == 0){
break;
}
}
}
}
}
#endif
......@@ -30,9 +30,11 @@
#include <climits>
#include <cfloat>
#include <algorithm>
#include <cassert>
#if defined(_OPENMP)
#include <omp.h>
#include "parallelInplace.hpp"
#endif
namespace Sort512 {
......@@ -5964,7 +5966,7 @@ static inline void Sort(SortType array[], const IndexType size){
#if defined(_OPENMP)
template <class SortType, class IndexType = size_t>
static inline void CoreSortTask(SortType array[], const IndexType left, const IndexType right, const int deep){
static inline void CoreSortTaskPartition(SortType array[], const IndexType left, const IndexType right, const int deep){
static const int SortLimite = 16*64/sizeof(SortType);
if(right-left < SortLimite){
SmallSort16V(array+left, right-left+1);
......@@ -5975,10 +5977,10 @@ static inline void CoreSortTask(SortType array[], const IndexType left, const In
// default(none) has been removed for clang compatibility
if(part+1 < right){
#pragma omp task default(shared) firstprivate(array, part, right, deep)
CoreSortTask<SortType,IndexType>(array,part+1,right, deep - 1);
CoreSortTaskPartition<SortType,IndexType>(array,part+1,right, deep - 1);
}
// not task needed, let the current thread compute it
if(part && left < part-1) CoreSortTask<SortType,IndexType>(array,left,part - 1, deep - 1);
if(part && left < part-1) CoreSortTaskPartition<SortType,IndexType>(array,left,part - 1, deep - 1);
}
else {
if(part+1 < right) CoreSort<SortType,IndexType>(array,part+1,right);
......@@ -5988,10 +5990,7 @@ static inline void CoreSortTask(SortType array[], const IndexType left, const In
}
template <class SortType, class IndexType = size_t>
static inline void SortOmp(SortType array[], const IndexType size){
// const int nbTasksRequiere = (omp_get_max_threads() * 5);
// int deep = 0;
// while( (1 << deep) < nbTasksRequiere ) deep += 1;
static inline void SortOmpPartition(SortType array[], const IndexType size){
int deep = 0;
while( (IndexType(1) << deep) < size ) deep += 1;
......@@ -5999,10 +5998,181 @@ static inline void SortOmp(SortType array[], const IndexType size){
{
#pragma omp master
{
CoreSortTask<SortType,IndexType>(array, 0, size - 1 , deep);
CoreSortTaskPartition<SortType,IndexType>(array, 0, size - 1 , deep);
}
}
}
template <class SortType, class IndexType = size_t>
static inline void SortOmpMerge(SortType array[], const IndexType size){
const long int MAX_THREADS = 128;
const long int LOG2_MAX_THREADS = 7;
int done[LOG2_MAX_THREADS][MAX_THREADS] = {0};
assert(((omp_get_num_threads()-1) & omp_get_num_threads()) == 0); // Must be power of 2
#pragma omp parallel
{
const IndexType chunk = (size + omp_get_num_threads() - 1)/omp_get_num_threads();
{
const IndexType first = std::min(IndexType(size), chunk * omp_get_thread_num());
const IndexType last = std::min(IndexType(size), chunk * (omp_get_thread_num() + 1));
if(first < last) CoreSort<SortType,IndexType>(array,first,last-1);
}
{
int& mydone = done[0][omp_get_thread_num()];
#pragma omp atomic write
mydone = 1;
}
int level = 1;
while(!(omp_get_thread_num() & (1<<(level-1))) && (1<<level) <= omp_get_num_threads()){
while(true){
int otherIsDone;
#pragma omp atomic read
otherIsDone = done[level-1][(omp_get_thread_num()>>(level-1))+1];
if(otherIsDone){
break;
}
}
const IndexType nbOriginalPartsToMerge = (1 << level);
const IndexType first = std::min(size, (omp_get_thread_num())*chunk);
const IndexType middle = std::min(size, first + (nbOriginalPartsToMerge/2)*chunk);
const IndexType last = std::min(size, first + nbOriginalPartsToMerge*chunk);
std::inplace_merge(&array[first],
&array[middle],
&array[last]);
{
int& mydone = done[level][(omp_get_thread_num()>>level)];
#pragma omp atomic write
mydone = 1;
}
level += 1;
}
}
}
template <class SortType, class IndexType = size_t>
static inline void SortOmpMergeDeps(SortType array[], const IndexType size){
int nbParts = 1;
while(nbParts < omp_get_max_threads()){
nbParts <<= 1;
}
#pragma omp parallel
{
#pragma omp master
{
const IndexType chunk = (size + nbParts - 1)/nbParts;
for(long int idxPart = 0 ; idxPart < nbParts ; ++idxPart){
const IndexType first = std::min(IndexType(size), chunk * idxPart);
const IndexType last = std::min(IndexType(size), chunk * (idxPart + 1));
if(first < last){
#pragma omp task depend(inout:array[first]) firstprivate(first, last)
CoreSort<SortType,IndexType>(array,first,last-1);
}
}
int level = 1;
while((1 << level) <= nbParts){
const IndexType nbPartsAtLevel = nbParts/(1<<level);
const IndexType nbOriginalPartsToMerge = (1 << level);
for(IndexType idxPart = 0 ; idxPart < nbPartsAtLevel ; ++idxPart){
const IndexType first = std::min(size, (idxPart * nbOriginalPartsToMerge)*chunk);
const IndexType middle = std::min(size, first + (nbOriginalPartsToMerge/2)*chunk);
const IndexType last = std::min(size, first + nbOriginalPartsToMerge*chunk);
#pragma omp task depend(inout:array[first],array[middle]) firstprivate(first, middle,last)
std::inplace_merge(&array[first],
&array[middle],
&array[last]);
}
level += 1;
}
#pragma omp taskwait
}
}
}
template <class SortType, class IndexType = size_t>
static inline void SortOmpParMerge(SortType array[], const IndexType size){
if(size < omp_get_max_threads()){
CoreSort<SortType,IndexType>(array,0,size-1);
return;
}
const long int MAX_THREADS = 128;
const long int LOG2_MAX_THREADS = 7;
int done[LOG2_MAX_THREADS][MAX_THREADS] = {0};
ParallelInplace::WorkingInterval<SortType> intervals[MAX_THREADS] = {0};
int barrier[MAX_THREADS] = {0};
assert(((omp_get_num_threads()-1) & omp_get_num_threads()) == 0); // Must be power of 2
#pragma omp parallel
{
const IndexType chunk = (size + omp_get_num_threads() - 1)/omp_get_num_threads();
{
const IndexType first = std::min(IndexType(size), chunk * omp_get_thread_num());
const IndexType last = std::min(IndexType(size), chunk * (omp_get_thread_num() + 1));
if(first < last) CoreSort<SortType,IndexType>(array,first,last-1);
}
{
int& mydone = done[0][omp_get_thread_num()];
#pragma omp atomic write
mydone = 1;
}
int level = 1;
while((1<<level) <= omp_get_num_threads()){
const bool threadInCharge = (((omp_get_thread_num()>>level)<<level) == omp_get_thread_num());
const IndexType firstThread = ((omp_get_thread_num() >> level) << level);
assert(threadInCharge == false || firstThread == omp_get_thread_num());
{
const IndexType firstThreadPreviousLevel = ((omp_get_thread_num() >> (level-1)) << (level-1));
const IndexType previousPartToWait = (firstThreadPreviousLevel >> (level-1)) +
(firstThread == firstThreadPreviousLevel ? 1 : -1);
while(true){
int otherIsDone;
#pragma omp atomic read
otherIsDone = done[level-1][previousPartToWait];
if(otherIsDone){
break;
}
}
}
const IndexType nbOriginalPartsToMerge = (1 << level);
const IndexType numThreadsInvolved = nbOriginalPartsToMerge;
const IndexType first = std::min(size, (firstThread)*chunk);
const IndexType middle = std::min(size, first + (nbOriginalPartsToMerge/2)*chunk);
const IndexType last = std::min(size, first + nbOriginalPartsToMerge*chunk);
ParallelInplace::parallelMergeInPlace(&array[first], last-first, middle-first,
numThreadsInvolved, firstThread,
intervals, barrier);
if(threadInCharge){
int& mydone = done[level][(omp_get_thread_num()>>level)];
#pragma omp atomic write
mydone = 1;
}
level += 1;
}
}
}
#endif
}
......
......@@ -6138,7 +6138,7 @@ static inline void Sort(SortType array[], SortType values[], const IndexType siz
#if defined(_OPENMP)
template <class SortType, class IndexType = size_t>
static inline void CoreSortTask(SortType array[], SortType values[], const IndexType left, const IndexType right, const int deep){
static inline void CoreSortTaskPartition(SortType array[], SortType values[], const IndexType left, const IndexType right, const int deep){
static const int SortLimite = 16*64/sizeof(SortType);
if(right-left < SortLimite){
SmallSort16V(array+left, values+left, right-left+1);
......@@ -6149,10 +6149,10 @@ static inline void CoreSortTask(SortType array[], SortType values[], const Index
// default(none) has been removed for clang compatibility
if(part+1 < right){
#pragma omp task default(shared) firstprivate(array, values, part, right, deep)
CoreSortTask<SortType,IndexType>(array,values, part+1,right, deep - 1);
CoreSortTaskPartition<SortType,IndexType>(array,values, part+1,right, deep - 1);
}
// not task needed, let the current thread compute it
if(part && left < part-1) CoreSortTask<SortType,IndexType>(array,values, left,part - 1, deep - 1);
if(part && left < part-1) CoreSortTaskPartition<SortType,IndexType>(array,values, left,part - 1, deep - 1);
}
else {
if(part+1 < right) CoreSort<SortType,IndexType>(array,values, part+1,right);
......@@ -6162,7 +6162,7 @@ static inline void CoreSortTask(SortType array[], SortType values[], const Index
}
template <class SortType, class IndexType = size_t>
static inline void SortOmp(SortType array[], SortType values[], const IndexType size){
static inline void SortOmpPartition(SortType array[], SortType values[], const IndexType size){
// const int nbTasksRequiere = (omp_get_max_threads() * 5);
// int deep = 0;
// while( (1 << deep) < nbTasksRequiere ) deep += 1;
......@@ -6173,7 +6173,7 @@ static inline void SortOmp(SortType array[], SortType values[], const IndexType
{
#pragma omp master
{
CoreSortTask<SortType,IndexType>(array, values, 0, size - 1 , deep);
CoreSortTaskPartition<SortType,IndexType>(array, values, 0, size - 1 , deep);
}
}
}
......
This diff is collapsed.
......@@ -1379,7 +1379,28 @@ void testQs512(){
std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmp<NumType,size_t>(array.get(), idx);
Sort512::SortOmpPartition<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, "");
}
for(size_t idx = 1 ; idx <= (1<<10); idx *= 2){
std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmpMerge<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, "");
}
for(size_t idx = 1 ; idx <= (1<<10); idx *= 2){
std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmpMergeDeps<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, "");
}
for(size_t idx = 1 ; idx <= (1<<10); idx *= 2){
std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmpParMerge<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, "");
}
#endif