Commit 9c1df202 authored by Berenger Bramas's avatar Berenger Bramas

Make it work

parent 52fc2ad3
Pipeline #77805 canceled with stage
...@@ -13,18 +13,11 @@ ...@@ -13,18 +13,11 @@
#ifndef PARALLELINPLACE_HPP #ifndef PARALLELINPLACE_HPP
#define PARALLELINPLACE_HPP #define PARALLELINPLACE_HPP
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <cstring> #include <cstring>
#include <sys/time.h>
#include <omp.h>
#include <utility> #include <utility>
#include <algorithm> #include <algorithm>
#include <cassert> #include <cassert>
#include <array>
#include <memory> #include <memory>
#include <iostream>// TODO
#include <omp.h> #include <omp.h>
...@@ -154,7 +147,7 @@ struct WorkingInterval{ ...@@ -154,7 +147,7 @@ struct WorkingInterval{
template <class NumType> template <class NumType>
inline void parallelMergeInPlaceCore(NumType array[], int currentStart, int currentMiddle, int currentEnd, inline void parallelMergeInPlaceCore(NumType array[], int currentStart, int currentMiddle, int currentEnd,
int level, const int depthLimite, int level, const int depthLimite,
WorkingInterval<NumType> intervals[], int barrier[]){ volatile WorkingInterval<NumType> intervals[], volatile int barrier[]){
assert(0 <= currentStart); assert(0 <= currentStart);
assert(currentStart <= currentMiddle); assert(currentStart <= currentMiddle);
...@@ -175,11 +168,8 @@ inline void parallelMergeInPlaceCore(NumType array[], int currentStart, int curr ...@@ -175,11 +168,8 @@ inline void parallelMergeInPlaceCore(NumType array[], int currentStart, int curr
const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num(); const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num();
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] Ok for " << targetThread << std::endl;// TODO
// Should be #pragma omp atomic write // Should be #pragma omp atomic write
intervals[targetThread] = WorkingInterval<NumType>{array, ((WorkingInterval<NumType>*)intervals)[targetThread] = WorkingInterval<NumType>{array,
currentStart+middleA+middleB, currentStart+middleA+middleB,
currentStart+middleA+middleB+sizeRestA, currentStart+middleA+middleB+sizeRestA,
currentEnd, currentEnd,
...@@ -203,11 +193,8 @@ std::cout << omp_get_thread_num() << "] Ok for " << targetThread << std::endl;// ...@@ -203,11 +193,8 @@ std::cout << omp_get_thread_num() << "] Ok for " << targetThread << std::endl;//
while(level != depthLimite){ while(level != depthLimite){
const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num(); const int targetThread = (1 << (depthLimite - level - 1)) + omp_get_thread_num();
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] Ok no work for " << targetThread << std::endl;// TODO
// Should be #pragma omp atomic write // Should be #pragma omp atomic write
intervals[targetThread] = WorkingInterval<NumType>{array, ((WorkingInterval<NumType>*)intervals)[targetThread] = WorkingInterval<NumType>{array,
currentEnd, currentEnd,
currentEnd, currentEnd,
currentEnd, currentEnd,
...@@ -222,13 +209,13 @@ std::cout << omp_get_thread_num() << "] Ok no work for " << targetThread << std: ...@@ -222,13 +209,13 @@ std::cout << omp_get_thread_num() << "] Ok no work for " << targetThread << std:
template <class NumType> template <class NumType>
inline void parallelMergeInPlace(NumType array[], const int sizeArray, int centerPosition, inline void parallelMergeInPlace(NumType array[], const int sizeArray, int centerPosition,
const long int numThreadsInvolved, const long int firstThread, const long int numThreadsInvolved, const long int firstThread,
WorkingInterval<NumType> intervals[], int barrier[]){ volatile WorkingInterval<NumType> intervals[], volatile int barrier[]){
const int numThread = omp_get_thread_num(); const int numThread = omp_get_thread_num();
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){ for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){ if(idxThread + firstThread == numThread){
#pragma omp atomic write #pragma omp atomic write
barrier[idxThread + firstThread] = -1; barrier[numThread] = -1;
} }
while(true){ while(true){
int dataAreReady; int dataAreReady;
...@@ -242,14 +229,10 @@ inline void parallelMergeInPlace(NumType array[], const int sizeArray, int cente ...@@ -242,14 +229,10 @@ inline void parallelMergeInPlace(NumType array[], const int sizeArray, int cente
// Already in good shape // Already in good shape
if(centerPosition == 0 || centerPosition == sizeArray || array[centerPosition-1] <= array[centerPosition]){ if(centerPosition == 0 || centerPosition == sizeArray || array[centerPosition-1] <= array[centerPosition]){
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] nothing to do array " << array << " centerPosition " << centerPosition << " " <<
array[centerPosition-1] << " " << array[centerPosition] << std::endl;// TODO
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){ for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){ if(idxThread + firstThread == numThread){
#pragma omp atomic write #pragma omp atomic write
barrier[idxThread + firstThread] = 0; barrier[numThread] = 0;
} }
while(true){ while(true){
int dataAreReady; int dataAreReady;
...@@ -264,14 +247,10 @@ std::cout << omp_get_thread_num() << "] nothing to do array " << array << " cent ...@@ -264,14 +247,10 @@ std::cout << omp_get_thread_num() << "] nothing to do array " << array << " cent
return; return;
} }
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] work to do array " << array << " centerPosition " << centerPosition << " " <<
array[centerPosition-1] << " " << array[centerPosition] << std::endl;// TODO
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){ for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){ if(idxThread + firstThread == numThread){
#pragma omp atomic write #pragma omp atomic write
barrier[idxThread + firstThread] = -2; barrier[numThread] = -2;
} }
while(true){ while(true){
int dataAreReady; int dataAreReady;
...@@ -284,8 +263,6 @@ std::cout << omp_get_thread_num() << "] work to do array " << array << " centerP ...@@ -284,8 +263,6 @@ std::cout << omp_get_thread_num() << "] work to do array " << array << " centerP
} }
if(numThread == firstThread){ if(numThread == firstThread){
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] LEAD" << std::endl;// TODO
const int depthLimite = ffs(numThreadsInvolved) - 1; const int depthLimite = ffs(numThreadsInvolved) - 1;
#pragma omp atomic write #pragma omp atomic write
barrier[numThread] = 1; barrier[numThread] = 1;
...@@ -294,9 +271,6 @@ std::cout << omp_get_thread_num() << "] LEAD" << std::endl;// TODO ...@@ -294,9 +271,6 @@ std::cout << omp_get_thread_num() << "] LEAD" << std::endl;// TODO
intervals, barrier); intervals, barrier);
} }
else{ else{
//#pragma omp critical(out)
//std::cout << omp_get_thread_num() << "] wait my turn" << std::endl;// TODO
while(true){ while(true){
int myDataAreReady; int myDataAreReady;
#pragma omp atomic read #pragma omp atomic read
...@@ -306,21 +280,6 @@ std::cout << omp_get_thread_num() << "] LEAD" << std::endl;// TODO ...@@ -306,21 +280,6 @@ std::cout << omp_get_thread_num() << "] LEAD" << std::endl;// TODO
} }
} }
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] GOO" << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] intervals[numThread].array = " << intervals[numThread].array << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] intervals[numThread].currentStart = " << intervals[numThread].currentStart << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] intervals[numThread].currentMiddle = " << intervals[numThread].currentMiddle << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] intervals[numThread].currentEnd = " << intervals[numThread].currentEnd << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] intervals[numThread].level = " << intervals[numThread].level << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] intervals[numThread].depthLimite = " << intervals[numThread].depthLimite << std::endl;// TODO
parallelMergeInPlaceCore<NumType>(intervals[numThread].array, parallelMergeInPlaceCore<NumType>(intervals[numThread].array,
intervals[numThread].currentStart, intervals[numThread].currentStart,
intervals[numThread].currentMiddle, intervals[numThread].currentMiddle,
...@@ -330,13 +289,10 @@ std::cout << omp_get_thread_num() << "] LEAD" << std::endl;// TODO ...@@ -330,13 +289,10 @@ std::cout << omp_get_thread_num() << "] LEAD" << std::endl;// TODO
intervals, barrier); intervals, barrier);
} }
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] done wait other" << std::endl;// TODO
for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){ for(int idxThread = 0 ; idxThread < numThreadsInvolved ; ++idxThread){
if(idxThread + firstThread == numThread){ if(idxThread + firstThread == numThread){
#pragma omp atomic write #pragma omp atomic write
barrier[idxThread + firstThread] = 0; barrier[numThread] = 0;
} }
while(true){ while(true){
int dataAreReady; int dataAreReady;
...@@ -347,9 +303,6 @@ std::cout << omp_get_thread_num() << "] done wait other" << std::endl;// TODO ...@@ -347,9 +303,6 @@ std::cout << omp_get_thread_num() << "] done wait other" << std::endl;// TODO
} }
} }
} }
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] leave" << std::endl;// TODO
} }
} }
......
...@@ -31,7 +31,6 @@ ...@@ -31,7 +31,6 @@
#include <cfloat> #include <cfloat>
#include <algorithm> #include <algorithm>
#include <cassert> #include <cassert>
#include <iostream>// TODO
#if defined(_OPENMP) #if defined(_OPENMP)
#include <omp.h> #include <omp.h>
...@@ -6144,10 +6143,6 @@ static inline void SortOmpParMerge(SortType array[], const IndexType size){ ...@@ -6144,10 +6143,6 @@ static inline void SortOmpParMerge(SortType array[], const IndexType size){
const IndexType previousPartToWait = (firstThreadPreviousLevel >> (level-1)) + const IndexType previousPartToWait = (firstThreadPreviousLevel >> (level-1)) +
(firstThread == firstThreadPreviousLevel ? 1 : -1); (firstThread == firstThreadPreviousLevel ? 1 : -1);
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] level " << level << " firstThread " << firstThread << " firstThreadPreviousLevel = " << firstThreadPreviousLevel
<< " previousPartToWait " << previousPartToWait << std::endl;// TODO
while(true){ while(true){
int otherIsDone; int otherIsDone;
#pragma omp atomic read #pragma omp atomic read
...@@ -6164,26 +6159,11 @@ static inline void SortOmpParMerge(SortType array[], const IndexType size){ ...@@ -6164,26 +6159,11 @@ static inline void SortOmpParMerge(SortType array[], const IndexType size){
const IndexType middle = std::min(size, first + (nbOriginalPartsToMerge/2)*chunk); const IndexType middle = std::min(size, first + (nbOriginalPartsToMerge/2)*chunk);
const IndexType last = std::min(size, first + nbOriginalPartsToMerge*chunk); const IndexType last = std::min(size, first + nbOriginalPartsToMerge*chunk);
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] nbOriginalPartsToMerge = " << nbOriginalPartsToMerge << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] numThreadsInvolved = " << numThreadsInvolved << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] firstThread = " << firstThread << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] first = " << first << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] middle = " << middle << std::endl;// TODO
// #pragma omp critical(out)
// std::cout << omp_get_thread_num() << "] last = " << last << std::endl;// TODO
ParallelInplace::parallelMergeInPlace(&array[first], last-first, middle-first, ParallelInplace::parallelMergeInPlace(&array[first], last-first, middle-first,
numThreadsInvolved, firstThread, numThreadsInvolved, firstThread,
intervals, barrier); intervals, barrier);
if(threadInCharge){ if(threadInCharge){
#pragma omp critical(out)
std::cout << omp_get_thread_num() << "] level " << level << " set " << (omp_get_thread_num()>>level) << std::endl;// TODO
int& mydone = done[level][(omp_get_thread_num()>>level)]; int& mydone = done[level][(omp_get_thread_num()>>level)];
#pragma omp atomic write #pragma omp atomic write
mydone = 1; mydone = 1;
......
...@@ -1379,7 +1379,28 @@ void testQs512(){ ...@@ -1379,7 +1379,28 @@ void testQs512(){
std::cout << " " << idx << std::endl; std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]); std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx); createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmp<NumType,size_t>(array.get(), idx); Sort512::SortOmpPartition<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, "");
}
for(size_t idx = 1 ; idx <= (1<<10); idx *= 2){
std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmpMerge<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, "");
}
for(size_t idx = 1 ; idx <= (1<<10); idx *= 2){
std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmpMergeDeps<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, "");
}
for(size_t idx = 1 ; idx <= (1<<10); idx *= 2){
std::cout << " " << idx << std::endl;
std::unique_ptr<NumType[]> array(new NumType[idx]);
createRandVec(array.get(), idx); Checker<NumType> checker(array.get(), array.get(), idx);
Sort512::SortOmpParMerge<NumType,size_t>(array.get(), idx);
assertNotSorted(array.get(), idx, ""); assertNotSorted(array.get(), idx, "");
} }
#endif #endif
...@@ -1414,7 +1435,7 @@ void testQs512_pair(){ ...@@ -1414,7 +1435,7 @@ void testQs512_pair(){
for(size_t idxval = 0 ; idxval < idx ; ++idxval){ for(size_t idxval = 0 ; idxval < idx ; ++idxval){
values[idxval] = array[idxval]*100+1; values[idxval] = array[idxval]*100+1;
} }
Sort512kv::SortOmp<NumType,size_t>(array.get(), values.get(), idx); Sort512kv::SortOmpPartition<NumType,size_t>(array.get(), values.get(), idx);
assertNotSorted(array.get(), idx, ""); assertNotSorted(array.get(), idx, "");
for(size_t idxval = 0 ; idxval < idx ; ++idxval){ for(size_t idxval = 0 ; idxval < idx ; ++idxval){
if(values[idxval] != array[idxval]*100+1){ if(values[idxval] != array[idxval]*100+1){
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment