gpu排序

单机版的双调排序可以参考 http://blog.****.net/sunmenggmail/article/details/42869235

还是这张图片

gpu排序


基于cuda的双调排序的思路是:

为每一个元素提供一个线程,如果大于1024个元素,还是提供1024个线程,这是因为__syncthreads只能作为block内的线程同步,而一个block最多有1024个线程,如果元素个数大于1024则每个线程可能就要负责一个以上的元素的比较


就上图而言,一个矩形代表一次多线程的比较,那么此图仅需要6次比较,就可以有右边的输出。


  1. #include <vector>  
  2. #include <algorithm>  
  3. #include <iostream>  
  4. #include <time.h>  
  5. #include <sys/time.h>  
  6. #include <string.h>  
  7. #include <math.h>  
  8. #include <stdlib.h>  
  9. #include <stdio.h>  
  10.   
  11. using namespace std;  
  12.   
  13. #define CHECK_EQ1(a,b) do { \  
  14.     if ((a) != (b)) { \   
  15.         cout <<__FILE__<<" : "<< __LINE__<<" : check failed because "<<a<<"!="<<b<<endl;\  
  16.         cout << cudaGetErrorString(a) <<endl;\  
  17.         exit(1);\  
  18.     }\  
  19. while(0)  
  20.   
  21. #define CUDA_CHECK(condition)\  
  22. do {\  
  23.     cudaError_t error = condition;\  
  24.     CHECK_EQ1(error, cudaSuccess);\  
  25. while(0)  
  26.   
  27. static __device__ __forceinline__ unsigned int __btflo(unsigned int word)  
  28. {  
  29.     unsigned int ret;  
  30.     asm volatile("bfind.u32 %0, %1;" : "=r"(ret) : "r"(word));  
  31.     //return the index of highest non-zero bit in a word; for example, 00000110, return 2  
  32.     return ret;  
  33. }  
  34.   
  35. //for > 1024  
  36. __global__ void bigBinoticSort(unsigned int *arr, int len, unsigned int *buf) {  
  37.     unsigned len2 = 1 << (__btflo(len-1u) + 1);//  
  38.       
  39.     unsigned int MAX = 0xffffffffu;  
  40.     unsigned id = threadIdx.x;  
  41.     if (id >= len2) return;  
  42.     unsigned iter = blockDim.x;  
  43.     for (unsigned i = id; i < len2; i += iter) {  
  44.         if (i >= len) {  
  45.             buf[i-len] = MAX;  
  46.         }  
  47.     }  
  48.       
  49.     __syncthreads();  
  50.       
  51.     int count = 0;  
  52.     for (unsigned k = 2; k <= len2; k*=2) {  
  53.         for (unsigned j = k >> 1; j > 0; j >>= 1) {  
  54.             for (unsigned i = id; i < len2; i += iter) {  
  55.                 unsigned swapIdx = i ^ j;  
  56.                   
  57.                 if (swapIdx > i) {  
  58.                     unsigned myelem, other;  
  59.                     if (i < len) myelem = arr[i];  
  60.                     else myelem = buf[i-len];  
  61.                       
  62.                     if (swapIdx < len) other = arr[swapIdx];  
  63.                     else other = buf[swapIdx-len];  
  64.                       
  65.                     bool swap = false;  
  66.                       
  67.                     if ((i & k)==0 && myelem > other) swap = true;  
  68.                     if ((i & k) == k && myelem < other) swap = true;  
  69.                       
  70.                     if (swap) {  
  71.                         if (swapIdx < len) arr[swapIdx] = myelem;   
  72.                         else buf[swapIdx-len] = myelem;  
  73.                           
  74.                         if (i < len) arr[i] = other;  
  75.                         else buf[i-len] = other;  
  76.                     }  
  77.                 }  
  78.             }  
  79.             __syncthreads();  
  80.               
  81.               
  82.         }  
  83.     }  
  84. }  
  85.   
  86. //for <= 1024  
  87. __global__ void binoticSort(unsigned int *arr, int len) {  
  88.     __shared__  unsigned int buf[1024];  
  89.     buf[threadIdx.x] = (threadIdx.x < len ? arr[threadIdx.x] : 0xffffffffu);  
  90.     __syncthreads();  
  91.       
  92.     for (unsigned k = 2; k <= blockDim.x; k*=2) {//buid k elements ascend or descend  
  93.         for (unsigned j = k >> 1; j > 0; j >>= 1) {//merge longer binotic into shorter binotic  
  94.             unsigned swapIdx = threadIdx.x ^ j;  
  95.             unsigned myelem = buf[threadIdx.x];  
  96.             unsigned other = buf[swapIdx];  
  97.               
  98.             __syncthreads();  
  99.               
  100.             unsigned ascend = k * (swapIdx < threadIdx.x);  
  101.             unsigned descend = k * (swapIdx > threadIdx.x);  
  102.             //if I is front, swap is back; ascend = 0, descend = k  
  103.             //if I is back, swap is front; ascend = k, descend = 0;  
  104.             bool swap = false;  
  105.             if ((threadIdx.x & k) == ascend) {  
  106.                 if (myelem > other) swap = true;  
  107.             }  
  108.             if ((threadIdx.x & k) == descend) {  
  109.                 if (myelem < other) swap = true;  
  110.             }  
  111.               
  112.             if (swap) buf[swapIdx] = myelem;  
  113.             __syncthreads();  
  114.         }  
  115.     }  
  116.       
  117.     if (threadIdx.x < len) arr[threadIdx.x] = buf[threadIdx.x];  
  118. }  
  119.   
  120. template<class T>  
  121. inline void printVec(T *vec, int len) {  
  122.     for (int i = 0; i < len; ++i) cout <<vec[i] << "\t";  
  123.     cout << endl;  
  124. }  
  125.   
  126. template<class T>  
  127. inline void printVecg(T *gvec, int len) {  
  128.     T *vec = (T*)malloc(sizeof(T)*len);  
  129.     CUDA_CHECK(cudaMemcpy(vec,gvec,sizeof(T)*len,cudaMemcpyDeviceToHost));  
  130.     printVec(vec,len);  
  131.     free(vec);  
  132. }  
  133.   
  134. void lineSize(int N, int &nblocks, int &nthreads) {  
  135.     if (N <= 1024) {  
  136.         nthreads = (N + 32 - 1)/32*32;//  
  137.     }  
  138.     else {  
  139.         nblocks = (N + 1024 -1)/1024;  
  140.     }  
  141. }  
  142.   
  143. bool validate(unsigned *gvec, int len) {  
  144.     unsigned *vec = (unsigned*)malloc(sizeof(unsigned)*len);  
  145.     CUDA_CHECK(cudaMemcpy(vec,gvec,sizeof(unsigned)*len,cudaMemcpyDeviceToHost));  
  146.     for(int i = 1; i < len; ++i) {  
  147.         if (vec[i] <= vec[i-1]) return false;  
  148.     }  
  149.     return true;  
  150. }  
  151.   
  152. inline int roundUpPower2(int v) {  
  153.     v--;  
  154.     v |= v >> 1;  
  155.     v |= v >> 2;  
  156.     v |= v >> 4;  
  157.     v |= v >> 8;  
  158.     v |= v >> 16;  
  159.   
  160.     v++;  
  161.     return v;  
  162. }  
  163.   
  164.   
  165.   
  166. int main(int argc, char *argv[]) {  
  167.     if (argc != 2) {  
  168.         cout << "len \n";  
  169.         return;  
  170.     }  
  171.     int len = atoi(argv[1]);  
  172.     unsigned int *arr = (unsigned int*)malloc(sizeof(unsigned int)*len);  
  173.     for (int i = 0; i < len; ++i) arr[i] = i;  
  174.     srand((unsigned int)time(NULL));  
  175.     for (int i = len; i >= 2; --i) {  
  176.         int j = rand() % i;  
  177.         swap(arr[i-1], arr[j]);  
  178.     }  
  179.       
  180.       
  181.       
  182.     unsigned* debug;  
  183.     CUDA_CHECK(cudaMalloc((void**)&debug, sizeof(unsigned)*1000));  
  184.       
  185.     unsigned int* darr, *buf;  
  186.     CUDA_CHECK(cudaMalloc((void**)&darr, sizeof(unsigned int)*len));  
  187.     CUDA_CHECK(cudaMalloc((void**)&buf, sizeof(unsigned int)*len));  
  188.     CUDA_CHECK(cudaMemcpy(darr, arr, sizeof(unsigned int)*len, cudaMemcpyHostToDevice));  
  189.       
  190.       
  191.     bigBinoticSort<<<1,1024>>>(darr,len, buf);  
  192.     CUDA_CHECK(cudaPeekAtLastError());  
  193.     CUDA_CHECK(cudaDeviceSynchronize());  
  194.       
  195.     if (validate(darr, len))  
  196.         cout << "yes\n";  
  197.     else  
  198.         cout << "no\n";  
  199.       
  200.       
  201.     return 1;  
  202. }  


算法有两个双调排序实现,一个用于小于1024个元素,用到了共享内存加快访问速度,但是如果真要排序1024以下的元素,建议还是用cpu版本的快排吧,gpu的在速度上并没有明显的优势,甚至还比cpu慢


如果大于1024元素,就采用另一种方法。这种方法的缺点也是很明显的,就是不管再多的元素,只能用一个block进行计算,而一个block最多只能用1024个线程,估计在一万个元素以内的话,这个方法是gpu上最快的。


经过本人测试,包括thrust的sort(基数排序), 只有元素数量超过5000个,gpu上的排序算法才有明显的优势。10万左右的元素,gpu上的排序算法比cpu有一百倍的提速。


下面会介绍在gpu上进行快速排序。gpu快速排序可以处理非常大的数据,但是会有递归深度的限制,当超过递归深度时,就可以调用上面所讲的双调排序进行处理。测试表明,速度比thrust还是快一点


gpu上的快排主要参考样例 NVIDIA_CUDA-6.5_Samples/6_Advanced/cdpAdvancedQuicksort

快排只处理大于1024个元素的数组,然后将其分隔为左右两个子数组,如果子数组长度大于1024则继续动态递归调用快排,如果小于1024则动态调用双调排序。如果快排的递归深度已经超过最大递归深度(cuda最大嵌套深度64,但是还受限于每一级所使用的内存大小),则直接调用双调排序。


这段程序的最精彩的地方在于分隔函数

将数组按照warp大小进行分隔,每个warp处理32个元素,通过全局的atomicAdd函数,分别获得warp内的小于和大于pivot数在数组的偏移地址,注意在同一个warp内,这个偏移地址是一样的,然后每个线程将自己的元素放到偏移地址,这样就完成了分割


需要注意的是,这个快排不是in-place的,又涉及到递归调用,所以还得处理原数组和缓冲区的调换


由于cuda没有显式的锁,此方法采用了一种特殊的循环队列,本人认为在极端情况下,可能会出现问题

(这里的代码有错,没有处理原数组和缓冲区的调换,只是帮助理解。正确的代码请参考Samples里的)

  1. #define QSORT_BLOCKSIZE_SHIFT   9  
  2. #define QSORT_BLOCKSIZE         (1 << QSORT_BLOCKSIZE_SHIFT)  
  3. #define BITONICSORT_LEN         1024            // Must be power of 2!  
  4. #define QSORT_MAXDEPTH          16              // Will force final bitonic stage at depth QSORT_MAXDEPTH+1  
  5. #define QSORT_STACK_ELEMS   1*1024*1024 // One million stack elements is a HUGE number.  
  6.   
  7. typedef struct __align__(128) qsortAtomicData_t  
  8. {  
  9.     volatile unsigned int lt_offset;    // Current output offset for <pivot  
  10.     volatile unsigned int gt_offset;    // Current output offset for >pivot  
  11.     volatile unsigned int sorted_count; // Total count sorted, for deciding when to launch next wave  
  12.     volatile unsigned int index;        // Ringbuf tracking index. Can be ignored if not using ringbuf.  
  13. } qsortAtomicData;  
  14.   
  15. ////////////////////////////////////////////////////////////////////////////////  
  16. // A ring-buffer for rapid stack allocation  
  17. ////////////////////////////////////////////////////////////////////////////////  
  18. typedef struct qsortRingbuf_t  
  19. {  
  20.     volatile unsigned int head; //1        // Head pointer - we allocate from here  
  21.     volatile unsigned int tail; //0        // Tail pointer - indicates last still-in-use element  
  22.     volatile unsigned int count;//0        // Total count allocated  
  23.     volatile unsigned int max;  //0        // Max index allocated  
  24.     unsigned int stacksize;  //           // Wrap-around size of buffer (must be power of 2)  
  25.     volatile void *stackbase;           // Pointer to the stack we're allocating from  
  26. } qsortRingbuf;  
  27.   
  28.   
  29. /* 
  30. for cuda has no lock, so we have to do like this: 
  31. if alloc , ++head 
  32. if free , ++tail 
  33. so [tail, head) contains alloced chunks; head point to the next free chunk 
  34. count record the number of chunks had free 
  35. we have n chunks, but the index of a chunk is increase when re-alloc 
  36. max record the maximum index of the free chunks 
  37. only if the chunks before max are all free, aka, max == count, we can alter tail value  
  38. */  
  39. template<class T>  
  40. static __device__ void ringbufFree(qsortRingbuf *ringbuf, T *data) {  
  41.     unsigned index = data->index;  
  42.     unsigned count = atomicAdd((unsigned*)&(ringbuf->count), 1) + 1;  
  43.     unsigned max = atomicMax((unsigned*)&(ringbuf->max), index + 1);  
  44.       
  45.     if (max < (index + 1)) max = index + 1;  
  46.     if (max == count) {  
  47.         atomicMax((unsigned*)&(ringbuf->tail), count);  
  48.     }  
  49. }  
  50.   
  51. template<class T>  
  52. static __device__ T* ringbufAlloc(qsortRingbuf *ringbuf) {  
  53.     unsigned int loop = 10000;  
  54.     while (((ringbuf->head - ringbuf->tail) >= ringbuf->stacksize) && (loop-- > 0));  
  55.     if (loop == 0) return NULL;  
  56.       
  57.     unsigned index = atomicAdd((unsigned*)&ringbuf->head, 1);  
  58.     T *ret = (T*)(ringbuf->stackbase) + (index & (ringbuf->stacksize - 1));  
  59.     ret->index = index;  
  60.       
  61.     return ret;  
  62. }  
  63.   
  64.   
  65.   
  66. __global__ void qsort_warp(unsigned *indata,  
  67.                            unsigned *outdata,  
  68.                            unsigned int offset,//0  
  69.                            unsigned int len,//  
  70.                            qsortAtomicData *atomicData,//stack  
  71.                            qsortRingbuf *atomicDataStack,//ringbuf  
  72.                            unsigned int source_is_indata,//true  
  73.                            unsigned int depth)   
  74. {  
  75.     //printf("depth = %d", depth);  
  76.         // Find my data offset, based on warp ID  
  77.     unsigned int thread_id = threadIdx.x + (blockIdx.x << QSORT_BLOCKSIZE_SHIFT);  
  78.     //unsigned int warp_id = threadIdx.x >> 5;   // Used for debug only  
  79.     unsigned int lane_id = threadIdx.x & (warpSize-1);// %32  
  80.   
  81.     // Exit if I'm outside the range of sort to be done  
  82.     if (thread_id >= len)  
  83.         return;  
  84.   
  85.     //  
  86.     // First part of the algorithm. Each warp counts the number of elements that are  
  87.     // greater/less than the pivot.  
  88.     //  
  89.     // When a warp knows its count, it updates an atomic counter.  
  90.     //  
  91.   
  92.     // Read in the data and the pivot. Arbitrary pivot selection for now.  
  93.     unsigned pivot = indata[offset + len/2];  
  94.     unsigned data  = indata[offset + thread_id];  
  95.   
  96.     // Count how many are <= and how many are > pivot.  
  97.     // If all are <= pivot then we adjust the comparison  
  98.     // because otherwise the sort will move nothing and  
  99.     // we'll iterate forever.  
  100.     unsigned int greater = (data > pivot);  
  101.     unsigned int gt_mask = __ballot(greater);//Evaluate predicate for all active threads of the warp and return an integer whose Nth bit is set if and only if predicate evaluates to non-zero for the Nth thread of the warp and the Nth thread is active.  
  102.       
  103.     if (gt_mask == 0) {  
  104.         greater = (data >= pivot);  
  105.         gt_mask = __ballot(greater);  
  106.     }  
  107.       
  108.     unsigned lt_mask = __ballot(!greater);  
  109.     unsigned gt_count = __popc(gt_mask);//count number of 1 in a warp;  
  110.     unsigned lt_count = __popc(lt_mask);  
  111.       
  112.     //only thread 0 in warp calc  
  113.     //find 2 new positions for this warp  
  114.     unsigned lt_oft, gt_oft;  
  115.     if (lane_id == 0) {  
  116.         if (lt_count > 0)  
  117.             lt_oft = atomicAdd((unsigned*)&atomicData->lt_offset, lt_count);//atomicAdd return old value, not the newer//all the warps will syn call this  
  118.         if (gt_count > 0)  
  119.             gt_oft = len - (atomicAdd((unsigned*) &atomicData->gt_offset, gt_count) + gt_count);  
  120.           
  121.         //printf("depth = %d\n", depth);  
  122.         //printf("pivot = %u\n", pivot);  
  123.         //printf("lt_count %u lt_oft %u gt_count %u gt_oft %u  atomicDataGtOffset %u\n", lt_count,lt_oft, gt_count,gt_oft, atomicData->gt_offset);  
  124.     }  
  125.       
  126.     lt_oft = __shfl((int)lt_oft, 0);  
  127.     gt_oft = __shfl((int)gt_oft, 0);//Everyone pulls the offsets from lane 0  
  128.       
  129.     __syncthreads();  
  130.      // Now compute my own personal offset within this. I need to know how many  
  131.     // threads with a lane ID less than mine are going to write to the same buffer  
  132.     // as me. We can use popc to implement a single-operation warp scan in this case.  
  133.     unsigned lane_mask_lt;  
  134.     asm("mov.u32 %0, %%lanemask_lt;" : "=r"(lane_mask_lt));//bits set in positions less than the thread's lane number the warp  
  135.       
  136.     unsigned my_mask = greater ? gt_mask : lt_mask;  
  137.     unsigned my_oft = __popc(my_mask & lane_mask_lt);//  
  138.       
  139.     //move data  
  140.     my_oft += greater ? gt_oft : lt_oft;  
  141.     outdata[offset + my_oft] = data;  
  142.     __syncthreads();  
  143.       
  144.     //if (lane_id == 0) printf("pivot = %d", pivot);  
  145.     if (lane_id == 0) {  
  146.         /*if (blockIdx.x == 0) { 
  147.         printf("depth = %d\n", depth); 
  148.         for (int i = 0; i < len; ++i) 
  149.             printf("%u ", outdata[offset+i]); 
  150.         printf("\n"); 
  151.         }*/  
  152.           
  153.           
  154.         unsigned mycount = lt_count + gt_count;  
  155.         //we are the last warp   
  156.         if (atomicAdd((unsigned*)&atomicData->sorted_count, mycount) + mycount == len) {  
  157.             unsigned lt_len = atomicData->lt_offset;  
  158.             unsigned gt_len = atomicData->gt_offset;  
  159.                   
  160.             cudaStream_t lstream, rstream;  
  161.             cudaStreamCreateWithFlags(&lstream, cudaStreamNonBlocking);  
  162.             cudaStreamCreateWithFlags(&rstream, cudaStreamNonBlocking);  
  163.               
  164.             ringbufFree<qsortAtomicData>(atomicDataStack, atomicData);  
  165.               
  166.             if (lt_len == 0) return;  
  167.             //////////////////////////// left  
  168.             if (lt_len > BITONICSORT_LEN) {  
  169.                 if (depth >= QSORT_MAXDEPTH) {  
  170.                     bigBinoticSort<<<1, BITONICSORT_LEN,0, rstream>>>(outdata + offset, lt_len, indata + offset);  
  171.                 }  
  172.                 else {  
  173.                       
  174.                       
  175.                     if ((atomicData = ringbufAlloc<qsortAtomicData>(atomicDataStack)) == NULL)  
  176.                         printf("Stack-allocation error. Failing left child launch.\n");  
  177.                     else {  
  178.                         atomicData->lt_offset = atomicData->gt_offset = atomicData->sorted_count = 0;  
  179.                         unsigned int numblocks = (unsigned int)(lt_len+(QSORT_BLOCKSIZE-1)) >> QSORT_BLOCKSIZE_SHIFT;  
  180.                         qsort_warp<<< numblocks, QSORT_BLOCKSIZE, 0, lstream >>>(outdata, indata, offset, lt_len, atomicData, atomicDataStack, true, depth+1);  
  181.                     }  
  182.                       
  183.                       
  184.                 }  
  185.             }  
  186.             else if (lt_len > 1) {  
  187.                 unsigned int bitonic_len = 1 << (__btflo(lt_len-1U)+1);  
  188.                 binoticSort<<< 1, bitonic_len, 0, lstream >>>(outdata + offset,lt_len);  
  189.             }  
  190.             ////////////////////////// right  
  191.             if (gt_len > BITONICSORT_LEN) {  
  192.                 if (depth >= QSORT_MAXDEPTH)  
  193.                     bigBinoticSort<<<1, BITONICSORT_LEN,0, rstream>>>(outdata + offset + lt_len, gt_len, indata + offset + lt_len);  
  194.                 else {  
  195.                       
  196.                     if ((atomicData = ringbufAlloc<qsortAtomicData>(atomicDataStack)) == NULL)  
  197.                         printf("Stack allocation error! Failing right-side launch.\n");  
  198.                     else {  
  199.                         atomicData->lt_offset = atomicData->gt_offset = atomicData->sorted_count = 0;  
  200.                         unsigned int numblocks = (unsigned int)(gt_len+(QSORT_BLOCKSIZE-1)) >> QSORT_BLOCKSIZE_SHIFT;  
  201.                         qsort_warp<<< numblocks, QSORT_BLOCKSIZE, 0, rstream >>>(outdata, indata, offset+lt_len, gt_len, atomicData, atomicDataStack, true, depth+1);  
  202.                     }  
  203.                       
  204.                 }  
  205.             }  
  206.             else if (gt_len > 1) {  
  207.                 unsigned int bitonic_len = 1 << (__btflo(gt_len-1U)+1);  
  208.                 binoticSort<<< 1, bitonic_len, 0, rstream >>>(outdata + offset + lt_len,gt_len);  
  209.             }  
  210.               
  211.         }  
  212.     }  
  213. }  
  214.   
  215. void runqsort(unsigned *gpudata, unsigned *scratchdata, unsigned int count, cudaStream_t stream) {  
  216.     unsigned int stacksize = QSORT_STACK_ELEMS;//1*1024*1024  
  217.   
  218.     // This is the stack, for atomic tracking of each sort's status  
  219.     qsortAtomicData *gpustack;  
  220.     CUDA_CHECK(cudaMalloc((void **)&gpustack, stacksize * sizeof(qsortAtomicData)));  
  221.     CUDA_CHECK(cudaMemset(gpustack, 0, sizeof(qsortAtomicData)));     // Only need set first entry to 0  
  222.   
  223.     // Create the memory ringbuffer used for handling the stack.  
  224.     // Initialise everything to where it needs to be.  
  225.     qsortRingbuf buf;  
  226.     qsortRingbuf *ringbuf;  
  227.     CUDA_CHECK(cudaMalloc((void **)&ringbuf, sizeof(qsortRingbuf)));  
  228.     buf.head = 1;           // We start with one allocation  
  229.     buf.tail = 0;  
  230.     buf.count = 0;  
  231.     buf.max = 0;  
  232.     buf.stacksize = stacksize;  
  233.     buf.stackbase = gpustack;  
  234.     CUDA_CHECK(cudaMemcpy(ringbuf, &buf, sizeof(buf), cudaMemcpyHostToDevice));  
  235.       
  236.     if (count > BITONICSORT_LEN)//1024  
  237.     {//QSORT_BLOCKSIZE = 2^9 = 512  
  238.           
  239.         unsigned int numblocks = (unsigned int)(count+(QSORT_BLOCKSIZE-1)) >> QSORT_BLOCKSIZE_SHIFT;  
  240.         qsort_warp<<< numblocks, QSORT_BLOCKSIZE, 0, stream >>>(gpudata, scratchdata, 0U, count, gpustack, ringbuf, true, 0);  
  241.           
  242.    }  
  243.     else  
  244.     {  
  245.         binoticSort<<< 1, BITONICSORT_LEN >>>(gpudata, count);  
  246.         CUDA_CHECK(cudaMemcpy(scratchdata, gpudata, sizeof(unsigned)*count, cudaMemcpyDeviceToDevice));  
  247.     }  
  248.       
  249.     cudaDeviceSynchronize();