Sparse array compression using SIMD (AVX2)

I have a sparse array a (mostly zeros):

 unsigned char a[1000000]; 

and I would like to create an array b of the indices of the nonzero elements of a, using SIMD instructions for the Intel x64 architecture with AVX2. I am looking for tips on how to do this efficiently. In particular, is there a SIMD instruction to obtain the positions of consecutive nonzero elements within a SIMD register, in order?
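
For clarity, this is the scalar baseline I would like to speed up (just a sketch; the names b and count are only illustrative):

 /* Scalar reference: b receives the indices of the nonzero bytes of a, in increasing order. */
 int b[1000000];
 int count = 0;
 for (int i = 0; i < 1000000; i++) {
     if (a[i] != 0)
         b[count++] = i;
 }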

+6

3 answers

Five methods to compute the indices of the nonzero elements:

  • Semi-vectorized loop: Load a SIMD vector with chars, compare with zero and apply a movemask. Use a small scalar loop if any of the chars is nonzero (also suggested by @stgatilov). This works well for very sparse arrays. The function arr2ind_movmsk in the code below uses BMI1 instructions for the scalar loop.

  • Vectorized loop: Intel Haswell processors and newer support the BMI1 and BMI2 instruction sets. BMI2 contains the pext instruction (parallel bits extract, see the wikipedia link), which turns out to be useful here. See arr2ind_pext in the code below.

  • Classic scalar loop with an if statement: arr2ind_if.

  • Branchless scalar loop: arr2ind_cmov.

  • Lookup table: @stgatilov shows that instead of the pdep and other integer instructions a lookup table can be used. This may work well, but the lookup table is quite large: it does not fit in the L1 cache. Not tested here. See also the discussion here.


/*
   gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c
   example: Test different methods with an array a of size 20000 and approximately 25/1024*100%=2.4% nonzeros:
              ./a.out 20000 25
*/
#include <stdio.h>
#include <immintrin.h>
#include <stdint.h>
#include <omp.h>
#include <string.h>

__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0, k;
   __m256i msk;
   m0=0;
   for (i=0;i<n;i=i+32){
      /* Load 32 bytes and compare with zero: */
      msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256());
      k=_mm256_movemask_epi8(msk);
      k=~k;                             /* Search for nonzero bits instead of zero bits. */
      while (k){
         ind[m0]=i+_tzcnt_u32(k);       /* Count the number of trailing zero bits in k.  */
         m0++;
         k=_blsr_u32(k);                /* Clear the lowest set bit in k.                */
      }
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   uint64_t   cntr_const = 0xFEDCBA9876543210;
   __m256i    shft       = _mm256_set_epi64x(0x04,0x00,0x04,0x00);
   __m256i    vmsk       = _mm256_set1_epi8(0x0F);
   __m256i    cnst16     = _mm256_set1_epi32(16);
   __m256i    shf_lo     = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02,
                                           0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00);
   __m256i    shf_hi     = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06,
                                           0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04);
   __m128i    pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);

   __m256i    i_vec      = _mm256_setzero_si256();
   m0=0;
   for (i=0;i<n;i=i+16){
      __m128i  v          = _mm_load_si128((__m128i *)&a[i]);        /* Load 16 bytes.                        */
      __m128i  msk        = _mm_cmpeq_epi8(v,_mm_setzero_si128());   /* Generate 16x8 bit mask.               */
               msk        = _mm_srli_epi64(msk,4);                   /* Pack 16x8 bit mask to 16x4 bit mask.  */
               msk        = _mm_shuffle_epi8(msk,pshufbcnst);        /* Pack 16x8 bit mask to 16x4 bit mask.  */
               msk        = _mm_xor_si128(msk,_mm_set1_epi32(-1));   /* Invert 16x4 mask.                     */
      uint64_t msk64      = _mm_cvtsi128_si64x(msk);                 /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers. */
      int      p          = _mm_popcnt_u64(msk64)>>2;                /* p is the number of nonzeros in 16 bytes of a. */
      uint64_t cntr       = _pext_u64(cntr_const,msk64);             /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want. */
                                                                     /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers. */
      __m256i  cntr256    = _mm256_set1_epi64x(cntr);
               cntr256    = _mm256_srlv_epi64(cntr256,shft);
               cntr256    = _mm256_and_si256(cntr256,vmsk);
      __m256i  cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo);
      __m256i  cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi);
               cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo);
               cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi);

               _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);    /* Note that the stores of iteration i and i+16 may overlap.    */
               _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi);  /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary. */
               m0         = m0+p;
               i_vec      = _mm256_add_epi32(i_vec,cnst16);
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      if (a[i]!=0){
         ind[m0]=i;
         m0=m0+1;
      }
   }
   *m=m0;
   return 0;
}


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      ind[m0]=i;
      m0=(a[i]==0)? m0 : m0+1;          /* Compiles to a cmov instruction. */
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;
   for (i=0;i<m;i++) printf("i=%d,  ind[i]=%d   a[ind[i]]=%u\n",i,ind[i],a[ind[i]]);
   printf("\n");
   fflush( stdout );
   return 0;
}


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;
   unsigned int chk=0;                  /* Compute a hash to compare the results of different methods. */
   for (i=0;i<m;i++){
      chk=((chk<<1)|(chk>>31))^(ind[i]);
   }
   printf("chk = %10X\n",chk);
   return 0;
}


int main(int argc, char **argv){
   int n, i, m;
   unsigned int j, k, d;
   unsigned char *a;
   int *ind;
   double t0,t1;
   int meth, nrep;
   char txt[30];

   sscanf(argv[1],"%d",&n);             /* Length of array a.                                    */
   n=n>>5;                              /* Adjust n to a multiple of 32.                         */
   n=n<<5;
   sscanf(argv[2],"%u",&d);             /* The approximate fraction of nonzeros in a is: d/1024  */
   printf("n=%d, d=%u\n",n,d);

   a=_mm_malloc(n*sizeof(char),32);
   ind=_mm_malloc(n*sizeof(int),32);

   /* Generate a pseudo random array a. */
   j=73659343;
   for (i=0;i<n;i++){
      j=j*653+1;
      k=(j & 0x3FF00)>>8;               /* k is a pseudo random number between 0 and 1023        */
      if (k<d){
         a[i] = (j&0xFE)+1;             /* Set a[i] to nonzero.                                  */
      }else{
         a[i] = 0;
      }
   }

   /* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%u\n",i,a[i]);}} printf("\n"); */   /* Uncomment this line to print the nonzeros in a. */

   char txt0[]="arr2ind_movmsk: ";
   char txt1[]="arr2ind_pext:   ";
   char txt2[]="arr2ind_if:     ";
   char txt3[]="arr2ind_cmov:   ";

   nrep=10000;                          /* Repeat a function nrep times to make relatively accurate timings possible.   */
                                        /* With nrep=1000000:  ./a.out 10016 4 ;   ./a.out 10016 48 ;   ./a.out 10016 519   */
                                        /* With nrep=10000:    ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513 */
   printf("nrep = %d \n\n",nrep);
   arr2ind_movmsk(a,n,ind,&m);          /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */
   for (meth=0;meth<4;meth++){
      t0=omp_get_wtime();
      switch (meth){
         case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m); strcpy(txt,txt0); break;
         case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);   strcpy(txt,txt1); break;
         case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);     strcpy(txt,txt2); break;
         case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);   strcpy(txt,txt3); break;
         default: ;
      }
      t1=omp_get_wtime();
      printf("method = %s ",txt);
      /* print_chk(a,ind,m); */
      printf(" elapsed time = %6.2f\n",t1-t0);
   }
   print_nonz(a, ind, 2);                                        /* Do something with the results       */
   printf("density = %f %% \n\n",((double)m)/((double)n)*100);   /* Actual nonzero density of array a.  */

   /* print_nonz(a, ind, m); */    /* Uncomment this line to print the indices of the nonzeros. */

   return 0;
}

/*
   With nrep=1000000:
   ./a.out 10016 4 ;   ./a.out 10016 4 ;   ./a.out 10016 48 ;   ./a.out 10016 48 ;   ./a.out 10016 519 ;   ./a.out 10016 519
   With nrep=10000:
   ./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513
*/


The code was tested with array size n = 10016 (the data fits in the L1 cache) and n = 1,000,000, with nonzero densities of approximately 0.5%, 5% and 50%. For accurate timing, the functions were called 1,000,000 and 10,000 times, respectively.


 Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500.
                      0.53%     5.1%    50.0%
 arr2ind_movmsk:       0.27     0.53     4.89
 arr2ind_pext:         1.44     1.59     1.45
 arr2ind_if:           5.93     8.95    33.82
 arr2ind_cmov:         6.82     6.83     6.82

 Time in seconds, size n=1000000, 1e4 function calls.
                      0.49%     5.1%    50.1%
 arr2ind_movmsk:       0.57     2.03     5.37
 arr2ind_pext:         1.47     1.47     1.46
 arr2ind_if:           5.88     8.98    38.59
 arr2ind_cmov:         6.82     6.81     6.81


In these examples the vectorized loops are faster than the scalar loops. The performance of arr2ind_movmsk depends strongly on the density of a: it is only faster than arr2ind_pext if the density is sufficiently low. The break-even point also depends on the array size n. The function arr2ind_if clearly suffers from branch misprediction at 50% nonzero density.

+2

If you expect the number of nonzero elements to be very low (i.e. much less than 1%), you can simply check each 16-byte chunk for a nonzero value:

 int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()));
 if (mask != 65535) {
     //store zero bits of mask with scalar code
 }

If the percentage of nonzero elements is small enough, the cost of the mispredicted branches and the cost of the slow scalar code inside the if will be negligible.
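
For example, wrapped in a full loop this might look as follows (my own sketch, not tested; it borrows the _tzcnt_u32/_blsr_u32 bit-scan trick from the first answer, so it needs BMI1, and it assumes n is a multiple of 16 and a is 16-byte aligned):

#include <immintrin.h>

/* Very sparse case: test 16 bytes at a time, drop to scalar bit scanning
   only when the chunk contains at least one nonzero byte. Returns the
   number of indices written to ind. */
int indices_sparse(const unsigned char *a, int n, int *ind)
{
    int m = 0;
    for (int i = 0; i < n; i += 16) {
        __m128i reg  = _mm_load_si128((const __m128i *)&a[i]);
        int     mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()));
        if (mask != 65535) {                    /* at least one nonzero byte here        */
            unsigned int nz = ~mask & 0xFFFF;   /* bit set = position of a nonzero byte  */
            while (nz) {
                ind[m++] = i + _tzcnt_u32(nz);  /* position of the lowest set bit (BMI1) */
                nz = _blsr_u32(nz);             /* clear the lowest set bit (BMI1)       */
            }
        }
    }
    return m;
}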


For a good general solution, first consider the SSE implementation of stream compaction. It removes all zero elements from a byte array (idea taken from here):

__m128i shuf[65536];  //must be precomputed
char cnt[65536];      //must be precomputed

int compress(const char *src, int len, char *dst) {
    char *ptr = dst;
    for (int i = 0; i < len; i += 16) {
        __m128i reg = _mm_load_si128((__m128i*)&src[i]);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]);
        _mm_storeu_si128((__m128i*)ptr, compressed);
        ptr += cnt[mask];   //alternative: ptr += 16-_mm_popcnt_u32(mask);
    }
    return ptr - dst;
}

As you can see, _mm_shuffle_epi8 plus a lookup table can work wonders. I don't know of any other way to vectorize structurally complex code such as stream compaction.
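
The snippet above leaves shuf and cnt unspecified. One possible way to precompute them (my own untested sketch, assuming the convention that shuf[mask] moves the bytes whose mask bit is 0, i.e. the nonzero bytes, to the front, and that 0x80 entries make pshufb write zeros):

/* Fill the shuf/cnt tables declared above: for every 16-bit zero-mask,
   gather the nonzero byte positions to the front of the shuffle control. */
void init_compress_tables(void)
{
    for (int mask = 0; mask < 65536; mask++) {
        char perm[16];
        int  k = 0;
        for (int j = 0; j < 16; j++)
            if (!(mask & (1 << j)))         /* mask bit 0 => source byte j is nonzero */
                perm[k++] = (char)j;
        cnt[mask] = (char)k;                /* number of nonzero bytes for this mask  */
        while (k < 16)
            perm[k++] = (char)0x80;         /* high bit set => pshufb outputs zero    */
        shuf[mask] = _mm_loadu_si128((const __m128i *)perm);
    }
}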


Now, the only remaining problem is that you want indices. Each index has to be stored as a 4-byte value, so a block of 16 input bytes may produce up to 64 bytes of output, which do not fit into a single SSE register.

One way to handle this is to honestly unpack the output to 64 bytes. So you replace reg with the constant (0, 1, 2, 3, ..., 15) in the code, then unpack the SSE register into 4 registers, and add a register holding four copies of i. This takes many more instructions: 6 unpack instructions, 4 adds and 3 extra stores (one is already there). For me that is a huge overhead, especially if you expect less than 25% nonzero elements.
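
A sketch of this full-unpack variant (my own untested code, reusing the shuf table from the compress() snippet above; dst needs at least 16 ints of slack past the last valid output, and len is assumed to be a multiple of 16):

/* Emit the indices of nonzero bytes, 16 input bytes per iteration, using
   6 unpacks, 4 adds and 4 stores as described above. */
int compress_ids_full(const char *src, int len, int *dst)
{
    int *ptr = dst;
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
    __m128i zero    = _mm_setzero_si128();
    for (int i = 0; i < len; i += 16) {
        __m128i reg  = _mm_loadu_si128((const __m128i*)&src[i]);
        int     mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, zero));
        __m128i ids8 = _mm_shuffle_epi8(offsets, shuf[mask]);   /* nonzero positions 0..15, packed to the front */
        __m128i lo16 = _mm_unpacklo_epi8(ids8, zero);           /* widen the byte indices to 16 bits ...        */
        __m128i hi16 = _mm_unpackhi_epi8(ids8, zero);
        __m128i base = _mm_set1_epi32(i);
        _mm_storeu_si128((__m128i*)(ptr +  0), _mm_add_epi32(_mm_unpacklo_epi16(lo16, zero), base));
        _mm_storeu_si128((__m128i*)(ptr +  4), _mm_add_epi32(_mm_unpackhi_epi16(lo16, zero), base));
        _mm_storeu_si128((__m128i*)(ptr +  8), _mm_add_epi32(_mm_unpacklo_epi16(hi16, zero), base));
        _mm_storeu_si128((__m128i*)(ptr + 12), _mm_add_epi32(_mm_unpackhi_epi16(hi16, zero), base));
        ptr += 16 - _mm_popcnt_u32(mask);                       /* advance only past the valid indices          */
    }
    return ptr - dst;
}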

Alternatively, you can limit the number of nonzero bytes processed per iteration to 4, so that a single register is always enough for the output. Here is sample code:

__m128i shufMask[65536];  //must be precomputed
char srcMove[65536];      //must be precomputed
char dstMove[65536];      //must be precomputed

int compress_ids(const char *src, int len, int *dst) {
    const char *ptrSrc = src;
    int *ptrDst = dst;
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15);
    __m128i base = _mm_setzero_si128();
    while (ptrSrc < src + len) {
        __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc);
        __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128());
        int mask = _mm_movemask_epi8(zeroMask);
        __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]);
        __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128());
        ids32 = _mm_add_epi32(ids32, base);
        _mm_storeu_si128((__m128i*)ptrDst, ids32);
        ptrDst += dstMove[mask];   //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4);
        ptrSrc += srcMove[mask];   //no alternative without LUT
        base = _mm_add_epi32(base, _mm_set1_epi32(srcMove[mask]));
    }
    return ptrDst - dst;
}

One of the drawbacks of this approach is that each subsequent loop iteration cannot start until the line ptrDst += dstMove[mask]; has executed in the previous iteration, so the critical path grows dramatically. Hardware hyper-threading, or its manual emulation, can hide this penalty.


So, as you can see, there are many variations of this basic idea, all of which solve your problem with different degrees of efficiency. You can also reduce the size of the LUT if you don't like it (again, at the cost of throughput).

This approach cannot be fully extended to wider registers (i.e. AVX2 and AVX-512), but you can try to fuse the instructions of several consecutive iterations into single AVX2 or AVX-512 instructions, thereby slightly increasing throughput.

Note: I have not tested any of this code (correctly precomputing the LUTs takes noticeable effort).
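
For completeness, here is one way the compress_ids tables could be precomputed (again my own untested sketch, following the conventions implied above: at most the first 4 nonzero bytes of a window are emitted, the destination advances by that count, and the source advances past the last consumed byte, or by the full 16 bytes if the window held at most 4 nonzero bytes):

/* Fill shufMask/srcMove/dstMove declared above compress_ids(). */
void init_compress_ids_tables(void)
{
    for (int mask = 0; mask < 65536; mask++) {
        char perm[16];
        int  taken = 0, lastPos = -1;
        for (int j = 0; j < 16 && taken < 4; j++) {
            if (!(mask & (1 << j))) {           /* mask bit 0 => byte j is nonzero   */
                perm[taken++] = (char)j;
                lastPos = j;
            }
        }
        for (int k = taken; k < 16; k++)
            perm[k] = (char)0x80;               /* unused lanes: pshufb writes zero  */
        shufMask[mask] = _mm_loadu_si128((const __m128i *)perm);
        dstMove[mask]  = (char)taken;
        int nonzeros   = 16 - _mm_popcnt_u32(mask);
        srcMove[mask]  = (char)(nonzeros > 4 ? lastPos + 1 : 16);
    }
}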

+1

Although the AVX2 instruction set does contain gather instructions, their performance is too slow here. The most efficient way to do this is to process the array manually.

0

Source: https://habr.com/ru/post/926005/

