C++ vectorization of conditional code with intrinsics

I tried to enable vectorization of a frequently used function to improve performance.

The algorithm should do the following and is called ~ 4,000,000 times!

Input:  double* cellvalue 
Output: int8*   Output (8 bit integer, c++ char) 

Algo:

if (cellvalue > upper_threshold )
    *output = 1;
else if (cellvalue < lower_threshold)
    *output = -1;
else
    *output = 0;

My first vectorization approach, which processes 2 values in parallel, is as follows:

// Broadcast each threshold into both 64-bit lanes of an SSE2 register.
__m128d lowerThresh = _mm_set1_pd(m_lowerThreshold);
__m128d upperThresh = _mm_set1_pd(m_upperThreshold);

// Load two consecutive doubles; _mm_load_pd requires a 16-byte-aligned address.
__m128d vec = _mm_load_pd(cellvalue);
// Each compare produces an all-ones lane where the predicate holds and an
// all-zero lane where it does not.
__m128d maskLower = _mm_cmplt_pd(vec, lowerThresh); // less than
__m128d maskUpper = _mm_cmpgt_pd(vec, upperThresh); // greater than

static const tInt8 negOne = -1;
static const tInt8 posOne =  1;
// Read the first byte of each mask lane (0x00 or 0xFF) through the MSVC-specific
// m128d_f64 member: AND with -1 keeps the byte unchanged, AND with 1 maps 0xFF to 1.
// OR-ing the two terms yields -1, 0 or +1 as long as at most one mask is set.
output[0] = (negOne & *((tInt8*)&maskLower.m128d_f64[0])) | (posOne & *((tInt8*)&maskUpper.m128d_f64[0]));
output[1] = (negOne & *((tInt8*)&maskLower.m128d_f64[1])) | (posOne & *((tInt8*)&maskUpper.m128d_f64[1]));

Does that make sense to you? It works, but I think the last part, which creates the output, is very complicated. Is there a faster way to do this?

I also tried computing 8 values simultaneously with almost the same code. Will this perform better? Does the order of the instructions make sense?

// Broadcast each threshold into both 64-bit lanes of an SSE2 register.
__m128d lowerThresh = _mm_set1_pd(m_lowerThreshold);
__m128d upperThresh = _mm_set1_pd(m_upperThreshold);

// Load 4 registers of 2 doubles each (8 values in total); _mm_load_pd requires
// each address to be 16-byte aligned.
__m128d vec0 = _mm_load_pd(cellValue);
__m128d vec1 = _mm_load_pd(cellValue + 2);
__m128d vec2 = _mm_load_pd(cellValue + 4);
__m128d vec3 = _mm_load_pd(cellValue + 6);
// Each compare produces an all-ones lane where the predicate holds and an
// all-zero lane where it does not.
__m128d maskLower0 = _mm_cmplt_pd(vec0, lowerThresh); // less than
__m128d maskLower1 = _mm_cmplt_pd(vec1, lowerThresh); // less than
__m128d maskLower2 = _mm_cmplt_pd(vec2, lowerThresh); // less than
__m128d maskLower3 = _mm_cmplt_pd(vec3, lowerThresh); // less than
__m128d maskUpper0 = _mm_cmpgt_pd(vec0, upperThresh); // greater than
__m128d maskUpper1 = _mm_cmpgt_pd(vec1, upperThresh); // greater than
__m128d maskUpper2 = _mm_cmpgt_pd(vec2, upperThresh); // greater than
__m128d maskUpper3 = _mm_cmpgt_pd(vec3, upperThresh); // greater than

static const tInt8 negOne = -1;
static const tInt8 posOne =  1;
// For every lane, read the first byte of the mask (0x00 or 0xFF) through the
// MSVC-specific m128d_f64 member: AND with -1 keeps the byte unchanged, AND with 1
// maps 0xFF to 1. The results are written one scalar byte at a time.
output[0] = (negOne & *((tInt8*)&maskLower0.m128d_f64[0])) | (posOne & *((tInt8*)&maskUpper0.m128d_f64[0]));
output[1] = (negOne & *((tInt8*)&maskLower0.m128d_f64[1])) | (posOne & *((tInt8*)&maskUpper0.m128d_f64[1]));
output[2] = (negOne & *((tInt8*)&maskLower1.m128d_f64[0])) | (posOne & *((tInt8*)&maskUpper1.m128d_f64[0]));
output[3] = (negOne & *((tInt8*)&maskLower1.m128d_f64[1])) | (posOne & *((tInt8*)&maskUpper1.m128d_f64[1]));
output[4] = (negOne & *((tInt8*)&maskLower2.m128d_f64[0])) | (posOne & *((tInt8*)&maskUpper2.m128d_f64[0]));
output[5] = (negOne & *((tInt8*)&maskLower2.m128d_f64[1])) | (posOne & *((tInt8*)&maskUpper2.m128d_f64[1]));
output[6] = (negOne & *((tInt8*)&maskLower3.m128d_f64[0])) | (posOne & *((tInt8*)&maskUpper3.m128d_f64[0]));
output[7] = (negOne & *((tInt8*)&maskLower3.m128d_f64[1])) | (posOne & *((tInt8*)&maskUpper3.m128d_f64[1]));

I hope you can help me better understand the subject of vectorization;)

+4
2

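_mm_cmplt_pd and _mm_cmpgt_pd produce a result that is already either 0 or -1 in each lane; ANDing it with -1 does nothing, and ANDing it with 1 is equivalent to negating it. Therefore, if upper_threshold > lower_threshold (so that both conditions are never true at the same time), you can simply write*: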

// The comparison masks are __m128d, while _mm_sub_epi64 operates on __m128i, so
// reinterpret them first (no instruction is generated for the cast).
// maskLower - maskUpper yields -1 / 0 / +1 per 64-bit lane.
// NOTE: this stores two 64-bit results (16 bytes), so it is only valid when
// the output elements are 8-byte integers.
_mm_storeu_si128(reinterpret_cast<__m128i*>(output),
                 _mm_sub_epi64(_mm_castpd_si128(maskLower), _mm_castpd_si128(maskUpper)));

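(*) It is unclear what "int8" is in your code; it is not a standard C++ type. It could be an 8-byte int, which is what the store above assumes. If it is an 8-bit int, that store does not apply as written.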


Edit: since you have stated that int8 is an 8-bit integer, you could do something like this instead:

// The comparison masks are __m128d; reinterpret them as integer vectors so the
// 64-bit subtraction yields -1 / 0 / +1 in each lane.
__m128i result = _mm_sub_epi64(_mm_castpd_si128(maskLower), _mm_castpd_si128(maskUpper));
// Each 64-bit lane is narrowed to the 8-bit output element on assignment.
output[0] = result.m128i_i64[0]; // .m128i_i64 is an oddball MSVC-ism, so
output[1] = result.m128i_i64[1]; // I'm not 100% sure about the syntax here.

.

+3

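If you rewrite the code so that it does not branch, a modern compiler will do the vectorization for you.

Here is the test I ran: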

#include <stdint.h>                                                                             
#include <iostream>                                                                             
#include <random>                                                                               
#include <vector>                                                                               
#include <chrono>                                                                               

using Clock = std::chrono::steady_clock;                                                        
using std::chrono::milliseconds;                                                                

// Element types used by the benchmark: floating-point inputs and
// 8-bit signed outputs.
using Scalar = double;
using Integer = int8_t;

// Thresholds against which every input value is compared.
constexpr Scalar kUpperThreshold = 0.5;
constexpr Scalar kLowerThreshold = 0.2;

void compute_comparisons1(int n, const Scalar* xs, Integer* ys) {                               
#pragma simd                                                                                    
  for (int i=0; i<n; ++i) {                                                                     
    Scalar x   = xs[i];                                                                         
    ys[i] = (x > kUpperThreshold) - (x < kLowerThreshold);                                      
  }                                                                                             
}                                                                                               

void compute_comparisons2(int n, const Scalar* xs, Integer* ys) {                               
  for (int i=0; i<n; ++i) {                                                                     
    Scalar x   = xs[i];                                                                         
    Integer& y = ys[i];                                                                         
    if (x > kUpperThreshold)                                                                    
      y = 1;                                                                                    
    else if(x < kLowerThreshold)                                                                
      y = -1;                                                                                   
    else                                                                                        
      y = 0;                                                                                    
  }                                                                                             
}                                                                                               

// Number of elements processed by the benchmark.
constexpr int N = 4000000;

// Mersenne Twister engine with a fixed seed of 0.
std::mt19937 random_generator{0};

int main() {                                                                                    
  std::vector<Scalar> xs(N);                                                                    
  std::vector<Integer> ys1(N);                                                                  
  std::vector<Integer> ys2(N);                                                                  

  std::uniform_real_distribution<Scalar> dist(0, 1);                                            
  for (int i=0; i<N; ++i)                                                                       
    xs[i] = dist(random_generator);                                                             


  auto time0 = Clock::now();                                                                    
  compute_comparisons1(N, xs.data(), ys1.data());                                               
  auto time1 = Clock::now();                                                                    
  compute_comparisons2(N, xs.data(), ys2.data());                                               
  auto time2 = Clock::now();                                                                    

  std::cout << "v1: " << std::chrono::duration_cast<milliseconds>(time1 - time0).count() << "\n";
  std::cout << "v2: " << std::chrono::duration_cast<milliseconds>(time2 - time1).count() << "\n";

  for (int i=0; i<N; ++i) {                                                                     
    if (ys1[i] != ys2[i]) {                                                                     
      std::cout << "Error!\n";                                                                  
      return -1;                                                                                
    }                                                                                           
  }                                                                                             
  return 0;                                                                                     
} 

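Compiling with gcc (version 4.8.3) and the flags "-O3 -std=c++11 -march=native -S", I verified by looking at the generated assembly that the compiler vectorizes the code automatically, and the branch-free version runs much faster (3 milliseconds versus 16 milliseconds).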

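Also, I am not sure what your requirements are, but if you can live with less precision, using float instead of double improves the speed further (1.8 milliseconds).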

+1

All Articles