2013-07-24 19 views
5

Kodumda, websocket paketlerinin "unmasking" özelliğini kullanmam gerekiyor. SO (Websocket data unmasking/multi byte xor) sayesinde SSE2/AVX2 uzantıları kullanarak bunu nasıl (umarım) hızlandıracağımızı (şimdi umduğum) buldum, ancak şimdi baktığımızda, bağlantısız verilerin işlenmesi tamamen sub-optimal. Kodumu optimize etmenin veya en azından aynı performansla daha basitleştirmenin bir yolu var mı, yoksa kodum zaten en iyi performans mı?en uygun hale getirilmiş SSE2/AVX2'yi optimize edin XOR

Kodun önemli kısmı şu şekildedir (soru için, her zaman AVX2 döngüsünü bir kez çalıştırmak için yeterli olacak, ancak aynı anda çoğunlukla yalnızca birkaç kez çalıştırılacaktır) :

// circular shift left for uint32 
int cshiftl_u32(uint32_t num, uint8_t shift) { 
    return (num << shift) | (num >> (32 - shift));                  
}                              

// circular shift right for uint32 
int cshiftr_u32(uint32_t num, uint8_t shift) { 
    return (num >> shift) | (num << (32 - shift));                  
}                              

void optimized_xor_32(uint32_t mask, uint8_t *ds, uint8_t *de) { 
    if (ds == de) return; // zero data len -> nothing to do 

    uint8_t maskOffset = 0; 

// process single bytes till 4 byte alignment (<= 3) 
    for (; ds < de && ((uint64_t)ds & (uint64_t)3); ds++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

    if (ds == de) return; // done, return 

    if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions 
     mask = cshiftl_u32(mask, maskOffset); 

     maskOffset = 0; 
    } 

// process 4 byte block till 8 byte alignment (<= 1) 
    uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31)); 

    if (ds < de32 && ((uint64_t)de & (uint64_t)7)) { 
     *(uint32_t *)ds ^= mask; // mask is uint32_t 

     if (++ds == de) return; 
    } 

// process 8 byte block till 16 byte alignment (<= 1) 
    uint64_t mask64 = mask | (mask << 4); 
    uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63)); 

    if (ds < de64 && ((uint64_t)ds & (uint64_t)15)) { 
     *(uint64_t *)ds ^= mask64; 

     if (++ds == de) return; // done, return 
    } 


// process 16 byte block till 32 byte alignment (<= 1) (if supported) 
#ifdef CPU_SSE2 
    __m128i v128, v128_mask; 
    v128_mask = _mm_set1_epi32(mask); 

    uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127)); 

    if (ds < de128 && ((uint64_t)ds & (uint64_t)31)) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 

     if (++ds == de) return; // done, return 
    } 

#endif 
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards) 
    __m256i v256, v256_mask; 
    v256_mask = _mm256_set1_epi32(mask); 

    uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255)); 

    for (; ds < de256; ds+=32) { 
     v256 = _mm256_load_si256((__m256i *)ds); 
     v256 = _mm256_xor_si256(v256, v256_mask); 
     _mm256_store_si256((__m256i *)ds, v256); 
    } 

    if (ds == de) return; // done, return 
#endif 
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported) 
    for (; ds < de128; ds+=16) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 
    } 

    if (ds == de) return; // done, return 

#endif 
    // process remaining 8 byte blocks 
    // this should always be supported, so remaining can be assumed to be executed <= 1 times 
    for (; ds < de64; ds += 8) { 
     *(uint64_t *)ds ^= mask64; 
    } 

    if (ds == de) return; // done, return 

    // process remaining 4 byte blocks (<= 1) 
    if (ds < de32) { 
     *(uint32_t *)ds ^= mask; 

     if (++ds == de) return; // done, return 
    } 


    // process remaining bytes (<= 3) 

    for (; ds < de; ds ++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

} 

PS: #ifdef yerine cpuid veya cpu bayrak tespiti için benzeri kullanımını görmezden gelin.

+0

Kodunuzu zamanlamaya çalıştınız mı? (Ayrıca, koşullarınızda & quot; parantezleri ile bit ve bitlerini sarmak isteyebilirsiniz) –

+1

Zamanlama gerçekten yardımcı olmaz, çünkü sadece girdi olarak alacağım verilerden varsayımlar yapabilirim, fakat gerçek olmayacaktır. Gelecek birkaç ay için giriş. Ayrıca zamanlama ile sadece mutlak bir sayı elde edecektim, bu da benim sorunumun bu kodun xy girişiyle ne kadar uzun sürdüğünü değil, daha hızlı bir şekilde nasıl yapılacağını bulması olarak bana yardım etmiyor. Neyin değişeceğine dair bir fikrim yok. S.S .: Bit için sarıldı ve daha kolay anlaşılması için ipucu için teşekkürler! – griffin

+1

Veri bağımlılık tezgahlarının hizalanmış/hizalanmamış faydaya ağır basacağını göreceksiniz. Eğer döngülerinizi 2x kadar açabiliyorsanız, önemli bir gelişme görmelisiniz. – BitBank

cevap

2

Elkitabında yazılanların aksine, çoğu Intel işlemcisi, hizalanmamış verileri işlemede gerçekten oldukça iyidir. Intel'in derleyicileri vektör işleme için kullandığınız için, icc'un makul bir son sürümüne erişiminiz olduğunu kabul ediyorum.

Verilerinizi doğal olarak hizalayamazsanız, yaptığınız şey maksimum performansa ulaşabileceğiniz kadar yakın olduğunuzdan korkuyorum. Xeon Phi (64 bayt vektör kayıtları)/Gelecekte daha uzun vektör işlemcilerde kodu daha okunabilir ve kullanılabilir hale getirme açısından, Intel Cilk Plus'u kullanmaya başlamanızı öneririm.

Örnek:

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) { 
    while (length & 0x3) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    // switch to 4 bytes per block 
    uint32_t _d = d; 
    length >>= 2; 

    // Intel Cilk Plus Array Notation 
    // Should expand automatically to the best possible SIMD instructions 
    // you are compiling for 
    _d[0:length] ^= mask; 
} 

Şu anda Intel derleyici erişimi yok gibi ben bu kodu test etmedi unutmayınız. Eğer problemlerle karşılaşırsanız, önümüzdeki hafta ofisime döndüğümde bunun üstesinden gelebilirim.

Bunun yerine intrinsics tercih ederse o zaman önemli ölçüde hayatınızı kolaylaştırmak için önişlemci makro doğru kullanımı: Başka bir kayda göre

#if defined(__MIC__) 
// intel Xeon Phi 
#define VECTOR_BLOCKSIZE 64 
// I do not remember the correct types/instructions right now 
#error "TODO: MIC handling" 
#elif defined(CPU_AVX2) 
#define VECTOR_BLOCKSIZE 32 
typedef __m256i my_vector_t; 
#define VECTOR_LOAD_MASK _mm256_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask)) 
#elif defined(CPU_SSE2) 
#define VECTOR_BLOCKSIZE 16 
typedef __m128i my_vector_t; 
#define VECTOR_LOAD_MASK _mm128_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask)) 
#else 
#define VECTOR_BLOCKSIZE 8 
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask)) 
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask) 
typedef uint64_t my_vector_t; 
#fi 

void optimized_xor_32(uint32_t mask, uint8_t *d, size_t length) { 
    size_t i; 

    // there really is no point in having extra 
    // branches for different vector lengths if they are 
    // executed at most once 
    // branch prediction is your friend here 
    // so we do one byte at a time until the block size 
    // is reached 

    while (length && (d & (VECTOR_BLOCKSIZE - 1))) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    my_vector_t * d_vector = (my_vector_t *)d; 
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask); 

    size_t vector_legth = length/VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift 
    length &= VECTOR_BLOCKSIZE -1; // remaining length 

    for (i = 0; i < vector_legth; i++) { 
     VECTOR_XOR(d_vector + i, vector_mask); 
    } 

    // process the tail 
    d = (uint8_t*)(d_vector + i); 
    for (i = 0; i < length; i++) { 
     d[i] ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); 
    } 

} 

: Sen x86 kullanmak isteyebilirsiniz yerine bitin talimat mask döndürmek geçer döndürmek:

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc") 
+0

icc'yi kullanmam ama gcc kullanmam ve icc'e herhangi bir özel erişimim yok. Döndürme talimatını bilmiyordum, tam olarak ne yaptığını araştırmak zorundayım, thx! – griffin

+0

@griffin Tamam, '_mm_load_si128' ve ailesinin bir' icc 'yerleşik olduğu izlenimi altındaydım. Bu durumda, ikinci kod snippet'ini, sadece MIC'in parçası olmadan almalısınız. Ne yazık ki, döndürme yönergeleri için içsel bir şey yoktur, örneğin 'htons' 2 bayt döndürmeyi kullandığını biliyorum. –

+0

İstedim, ama zamanımın olduğu zaman bunu denemem gerekecek, muhtemelen bu erken olmayacak, ama ben bunu test ettiğimde ve iyi performans gösterdiğimde bunu kabul ettiğimden emin olacağım. Bu arada teşekkürler! – griffin

İlgili konular