2013-09-24 21 views
6

bir seyrek dizi a (çoğunlukla sıfır) sahiptir:Seyrek dizi sıkıştırma

unsigned char a[1000000]; 

ve SIMD talimatlarını kullanarak a sıfır olmayan elemanları dizinlerinin bir dizi b yaratmak istiyoruz AVX2 ile Intel x64 mimarisinde. Nasıl verimli bir şekilde yapılacağına dair ipuçları arıyorum. Spesifik olarak, SIMD kaydında ardışık sıfır olmayan elemanların pozisyonlarını almak için SIMD komutları var mı?

+1

Doğrudan değil, ancak pcp'yi sıfırlayabildiniz, daha sonra 'pmovmskb' bunu normal bir kayıt defterine yazabilir ve ilk indeksi' bsf' ile ayıklayabilir (ve sonra ikinci ve daha fazlası, umarım çok fazla değil) – harold

+1

Sadece 'SIMD'den daha spesifik olmanız gerekecek - hangi mimariyi hedefliyorsunuz?x86, ARM, PowerPC, POWER ve bazı GPGPU'ların hepsi farklı SIMD uzantılarına sahiptir. Ayrıca, x86'nın birden fazla SIMD uzantısı vardır: MMX, SSE, SSE2, SSE3, SSSE3, SSE4, AVX, AVX2, vb. (AVX2'nin bu bağlamda yararlı olabilecek SIMD yönergelerine sahip olduğunu unutmayın). –

+0

@Paul R Bunun için üzgünüm. Sorumu düzenledim - AVX2 kabul edilebilir. –

cevap

0

AVX2 komut setinde birçok GATHER yönergesi olsa da, performansı çok yavaştır. Ve bunu yapmanın en etkili yolu - bir diziyi manuel olarak işlemek. Eğer sıfır olan elemanlarının sayısı (yani çok daha az% 1'den) çok düşük olmasını bekliyoruz, o zaman sadece sıfırdan farklı olduğu için her 16 bayt öbek kontrol edebilirsiniz

1

:

int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
    if (mask != 65535) { 
     //store zero bits of mask with scalar code 
    } 

Eğer iyi elementlerin yüzde yeterince küçüktür, yanlış tahmin edilen şubelerin maliyeti ve 's' içindeki yavaş skaler kodun maliyeti göz ardı edilebilir olur.


İyi bir genel çözüm olarak, önce SSE akış sıkıştırmasının uygulanmasını düşünün. Gördüğünüz gibi

__m128i shuf [65536]; //must be precomputed 
char cnt [65536]; //must be precomputed 

int compress(const char *src, int len, char *dst) { 
    char *ptr = dst; 
    for (int i = 0; i < len; i += 16) { 
     __m128i reg = _mm_load_si128((__m128i*)&src[i]); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]); 
     _mm_storeu_si128((__m128i*)ptr, compressed); 
     ptr += cnt[mask]; //alternative: ptr += 16-_mm_popcnt_u32(mask); 
    } 
    return ptr - dst; 
} 

, (_mm_shuffle_epi8 + arama tablosu) harikalar yaratabilir: Bu bayt dizisinden tüm sıfır elemanlar (here alınan fikri) kaldırır. Akış sıkıştırması gibi yapısal olarak karmaşık kodların vektörünü başka bir şekilde bilmiyorum.


Artık isteğinizle ilgili geriye kalan sorun, endeks almak istediğinizdir. Her indeks 4 bayt değerinde saklanmalıdır, bu nedenle 16 giriş baytından oluşan bir yığın, tek SSE kaydına sığmayan 64 bayta kadar çıktı üretebilir.

Bunu işlemenin bir yolu, çıkışı 64 bayta dürüst bir şekilde açmaktır. Böylece reg kodunu sabit olarak (0,1,2,3,4, ..., 15) kodla değiştirdikten sonra SSE kaydını 4 register'a açınız ve dört adet i değerlerine sahip bir register ekleyiniz. Bu çok daha fazla talimat alır: 6 paket açma talimatı, 4 ek ve 3 mağaza (biri zaten var). Bana gelince, bu büyük bir ek yüktür, özellikle de sıfır olmayan elemanların% 25'inden daha azını beklerseniz.

Alternatif olarak, tek döngü yineleme tarafından işlenen sıfır olmayan bayt sayısını 4 ile sınırlayabilirsiniz, böylece bir kayıt her zaman çıktı için yeterli olur. Bu yaklaşımın

__m128i shufMask [65536]; //must be precomputed 
char srcMove [65536]; //must be precomputed 
char dstMove [65536]; //must be precomputed 

int compress_ids(const char *src, int len, int *dst) { 
    const char *ptrSrc = src; 
    int *ptrDst = dst; 
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15); 
    __m128i base = _mm_setzero_si128(); 
    while (ptrSrc < src + len) { 
     __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]); 
     __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128()); 
     ids32 = _mm_add_epi32(ids32, base); 
     _mm_storeu_si128((__m128i*)ptrDst, ids32); 
     ptrDst += dstMove[mask]; //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4); 
     ptrSrc += srcMove[mask]; //no alternative without LUT 
     base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask])); 
    } 
    return ptrDst - dst; 
} 

bir mahsur ise, hat ptrDst += dstMove[mask]; önceki iterasyondan yürütülür kadar hemen sonraki her döngü tekrarı başlayamaz olmasıdır: Burada örnek kod. Böylece kritik yol çarpıcı bir şekilde arttı. Donanımsal hiper iş parçacıklı veya manuel öykünme bu cezayı kaldırabilir. Gördüğünüz gibi


yüzden, verimlilik farklı derecesi ile sorunu çözmek hepsi bu temel fikrinin çok varyasyon vardır. Beğenmediğiniz takdirde LUT boyutunu da azaltabilirsiniz (yine, çıktı performansının düşürülmesi pahasına). Bu yaklaşım, daha geniş sicillere tam olarak genişletilememektedir (ör.AVX2 ve AVX-512), ancak birkaç ardışık yinelemenin talimatlarını tek bir AVX2 veya AVX-512 komutuna birleştirmeyi deneyebilirsiniz, böylece işlem hacmi biraz artar.

Not: Herhangi bir kodu sınamıyorum (çünkü ön hesaplama LUT'un doğru şekilde dikkate değer bir çaba gerektirmesi gerekir). nonzeros indislerine hesaplamak için

+0

LUT yaklaşımınızın [yanıt] ile nasıl karşılaştırıldığını görmek güzel olurdu (http://stackoverflow.com/a/41958528/2439725) Bit Manipülasyon Talimatlarına (BMI1 ve BMI2) dayanmaktadır. – wim

2

beş yöntem vardır:

  • Yarı vektörleşen döngü: sıfır ile karşılaştırmak, karakter ile bir SIMD vektörü yükleyin ve movemask uygulanır. Hiçbir chars (@stgatilov tarafından da önerilmektedir) sıfırdan küçük bir skaler döngü kullanın. Bu çok seyrek diziler için iyi çalışır. Aşağıdaki koddaki arr2ind_movmsk numaralı işlev, skaler döngü için BMI1 yönergeleri kullanır.

  • Vectorized loop: Intel Haswell işlemcileri ve daha yeni BMI1 ve BMI2 komut kümelerini destekler. BMI2, pext talimatı (Parallel bits extract, see wikipedia link) , belgesini içerir, bu da burada yararlıdır. Aşağıdaki kodda arr2ind_pext'a bakın.

  • If ifadesiyle birlikte klasik skaler döngü: arr2ind_if.

  • Dalsız skaler döngü: arr2ind_cmov.

  • Arama tablosu: @stgatilov, pdep ve diğer tam sayı yönergeleri yerine bir arama tablosunun kullanılabileceğini gösterir. Bu iyi işe yarayabilir, ancak, arama tablosu oldukça büyüktür: L1 önbelleğine sığmaz. Burada test edilmedi. Ayrıca bkz. here.


/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c 

example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros: 
       ./a.out 20000 25 
*/ 

#include <stdio.h> 
#include <immintrin.h> 
#include <stdint.h> 
#include <omp.h> 
#include <string.h> 


__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0, k; 
    __m256i msk; 
    m0=0; 
    for (i=0;i<n;i=i+32){        /* Load 32 bytes and compare with zero:   */ 
     msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256()); 
     k=_mm256_movemask_epi8(msk); 
     k=~k;           /* Search for nonzero bits instead of zero bits. */ 
     while (k){ 
     ind[m0]=i+_tzcnt_u32(k);      /* Count the number of trailing zero bits in k. */ 
     m0++; 
     k=_blsr_u32(k);        /* Clear the lowest set bit in k.     */ 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    uint64_t  cntr_const = 0xFEDCBA; 
    __m256i  shft  = _mm256_set_epi64x(0x04,0x00,0x04,0x00); 
    __m256i  vmsk  = _mm256_set1_epi8(0x0F); 
    __m256i  cnst16  = _mm256_set1_epi32(16); 
    __m256i  shf_lo  = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02, 
              0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00); 
    __m256i  shf_hi  = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06, 
              0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04); 
    __m128i  pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);            

    __m256i  i_vec  = _mm256_setzero_si256(); 
    m0=0; 
    for (i=0;i<n;i=i+16){ 
     __m128i v   = _mm_load_si128((__m128i *)&a[i]);      /* Load 16 bytes.                    */ 
     __m128i msk  = _mm_cmpeq_epi8(v,_mm_setzero_si128());    /* Generate 16x8 bit mask.                  */ 
       msk  = _mm_srli_epi64(msk,4);        /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_shuffle_epi8(msk,pshufbcnst);      /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_xor_si128(msk,_mm_set1_epi32(-1));    /* Invert 16x4 mask.                   */ 
     uint64_t msk64  = _mm_cvtsi128_si64x(msk);        /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/ 
     int  p   = _mm_popcnt_u64(msk64)>>2;        /* p is the number of nonzeros in 16 bytes of a.            */ 
     uint64_t cntr  = _pext_u64(cntr_const,msk64);       /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */ 
                        /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.     */ 
     __m256i cntr256 = _mm256_set1_epi64x(cntr); 
       cntr256 = _mm256_srlv_epi64(cntr256,shft); 
       cntr256 = _mm256_and_si256(cntr256,vmsk); 
     __m256i cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo); 
     __m256i cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi); 
       cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo); 
       cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi); 

          _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);  /* Note that the stores of iteration i and i+16 may overlap.               */ 
          _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi); /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
       m0   = m0+p; 
       i_vec  = _mm256_add_epi32(i_vec,cnst16); 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     if (a[i]!=0){ 
     ind[m0]=i; 
     m0=m0+1; 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     ind[m0]=i; 
     m0=(a[i]==0)? m0 : m0+1; /* Compiles to cmov instruction. */ 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i; 
    for (i=0;i<m;i++) printf("i=%d, ind[i]=%d a[ind[i]]=%u\n",i,ind[i],a[ind[i]]); 
    printf("\n"); fflush(stdout); 
    return 0; 
} 


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i;        /* Compute a hash to compare the results of different methods. */ 
    unsigned int chk=0; 
    for (i=0;i<m;i++){ 
     chk=((chk<<1)|(chk>>31))^(ind[i]); 
    } 
    printf("chk = %10X\n",chk); 
    return 0; 
} 



int main(int argc, char **argv){ 
int n, i, m; 
unsigned int j, k, d; 
unsigned char *a; 
int *ind; 
double t0,t1; 
int meth, nrep; 
char txt[30]; 

sscanf(argv[1],"%d",&n);   /* Length of array a.         */ 
n=n>>5;        /* Adjust n to a multiple of 32.       */ 
n=n<<5; 
sscanf(argv[2],"%u",&d);   /* The approximate fraction of nonzeros in a is: d/1024 */ 
printf("n=%d, d=%u\n",n,d); 

a=_mm_malloc(n*sizeof(char),32); 
ind=_mm_malloc(n*sizeof(int),32);  

            /* Generate a pseudo random array a.      */ 
j=73659343;     
for (i=0;i<n;i++){ 
    j=j*653+1; 
    k=(j & 0x3FF00)>>8;    /* k is a pseudo random number between 0 and 1023  */ 
    if (k<d){ 
     a[i] = (j&0xFE)+1;   /* Set a[i] to nonzero.         */ 
    }else{ 
     a[i] = 0; 
    } 

} 

/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%u\n",i,a[i]);}} printf("\n"); */ /* Uncomment this line to print the nonzeros in a. */ 

char txt0[]="arr2ind_movmsk: "; 
char txt1[]="arr2ind_pext: "; 
char txt2[]="arr2ind_if:  "; 
char txt3[]="arr2ind_cmov: "; 

nrep=10000;         /* Repeat a function nrep times to make relatively accurate timings possible.       */ 
               /* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 519     */ 
               /* With nrep=10000:  ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513     */ 
printf("nrep = \%d \n\n",nrep); 
arr2ind_movmsk(a,n,ind,&m);     /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */ 
for (meth=0;meth<4;meth++){ 
    t0=omp_get_wtime(); 
    switch (meth){ 
     case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);   strcpy(txt,txt0); break; 
     case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);   strcpy(txt,txt1); break; 
     case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);    strcpy(txt,txt2); break; 
     case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);   strcpy(txt,txt3); break; 
     default: ; 
    } 
    t1=omp_get_wtime(); 
    printf("method = %s ",txt); 
    /* print_chk(a,ind,m); */ 
    printf(" elapsed time = %6.2f\n",t1-t0); 
} 
print_nonz(a, ind, 2);           /* Do something with the results     */ 
printf("density = %f %% \n\n",((double)m)/((double)n)*100);  /* Actual nonzero density of array a.   */ 

/* print_nonz(a, ind, m); */ /* Uncomment this line to print the indices of the nonzeros.      */ 

return 0; 
} 

/* 
With nrep=1000000: 
./a.out 10016 4 ; ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 48 ; ./a.out 10016 519 ; ./a.out 10016 519  
With nrep=10000: 
./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513  
*/ 


kod n dizi büyüklüğü ile test edilmiştir = 10016 (veriler L1 önbellek oturur) ve n = 1000000, bir farklı sıfır olmayan yoğunluklarda yaklaşık% 0.5,% 5 ve% 50. Doğru zamanlama için fonksiyonlar sırasıyla 1000000 ve 10000 kez olarak adlandırıldı.

Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500 
        0.53%  5.1%  50.0% 
arr2ind_movmsk:  0.27  0.53  4.89 
arr2ind_pext:   1.44  1.59  1.45 
arr2ind_if:   5.93  8.95  33.82 
arr2ind_cmov:   6.82  6.83  6.82 

Time in seconds, size n=1000000, 1e4 function calls. 

        0.49%  5.1%  50.1% 
arr2ind_movmsk:  0.57  2.03  5.37 
arr2ind_pext:   1.47  1.47  1.46 
arr2ind_if:   5.88  8.98  38.59 
arr2ind_cmov:   6.82  6.81  6.81 
vectorized döngüler hızlı sayıl döngüler daha vardır Bu örneklerde



. arr2ind_movmsk'un performansı, a yoğunluğuna çok bağlıdır. Yoğunluk yeterince küçükse, yalnızca arr2ind_pext'dan daha hızlıdır. Kırılma noktası ayrıca n dizi boyutuna da bağlıdır. Fonksiyon 'arr2ind_if', net olmayan dallanma tahmininden% 50 sıfır olmayan yoğunluğa sahiptir.