Branchless K-means (or other optimizations)

Note: I'm looking more to learn how to approach and come up with such solutions rather than the solution itself.

I have a very performance-critical function in my system that shows up as the number one hotspot in the profiler in certain contexts. It's in the middle of a k-means iteration (already multithreaded, using a parallel for to process sub-ranges of points in each worker thread).

ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist)     // <-- #1 hotspot
    {
        pt.min_dist = dist;
        pt.min_index = i;
    }
}

Any savings in the time spent processing this section of code count substantially, so I've often been fiddling with it a lot. It might be worth putting the centroid loop on the outside, for example, and iterating through the points in parallel for a given centroid. The number of cluster points here spans in the millions, while the number of centroids spans in the thousands. The algorithm is run for a handful of iterations (often no more than 10). It doesn't aim for perfect convergence/stability, just some "reasonable" approximation.
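For concreteness, a minimal sketch of that loop inversion (illustrative only, reusing the member names from the hotspot above, not code from the real system):

    // Sketch only: centroid loop hoisted to the outside, sweeping the points per centroid.
    // Assumes every pt.min_dist was reset to FLT_MAX (and pt.min_index to -1) beforehand.
    for (int i = 0; i < num_centroids; ++i)
    {
        const ClusterCentroid& cent = centroids[i];
        // This inner sweep is what the existing parallel for would have to split up.
        // Note the threads would be writing pt.min_dist/pt.min_index concurrently,
        // so the partitioning still has to be over points, not centroids.
        for (int j = 0; j < num_points; ++j)
        {
            ClusterPoint& pt = points[j];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
            const float z = pt.pos[2] - cent.pos[2];
            const float dist = x*x + y*y + z*z;
            if (dist < pt.min_dist)
            {
                pt.min_dist = dist;
                pt.min_index = i;
            }
        }
    }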

Any ideas are welcome, but what I'd really love to discover is how to make this code branchless, since that would open the door to a SIMD version. I haven't really developed the kind of mental faculty needed to easily come up with branchless solutions: my brain fails there, much like it did when I was first exposed to recursion in the early days, so a guide on writing branchless code and how to develop the appropriate mindset for it would also be helpful.

In short, I'm looking for any guides, tips, and suggestions (not necessarily solutions) on how to micro-optimize this code. It most likely has room for algorithmic improvements as well, but my blind spot has always been micro-optimization (and I'm curious to learn how to apply it more effectively without going overboard with it). It's already tightly multithreaded with a chunky parallel for over the logic, so I'm pretty much pushed into the corner of micro-optimization as one of the quicker things to try without reaching for a smarter algorithm outright. We're completely free to change the memory layout.

In Response to the Algorithmic Suggestions

I completely agree with this way of looking at it: trying to micro-optimize an O(knm) algorithm that could clearly be improved at the algorithmic level pushes this particular question into somewhat academic and impractical territory. However, if I may offer an anecdote, I come from an original background of high-level programming, with a heavy emphasis on the broad, large-scale point of view and on safety, and very little on the low-level implementation details. I recently switched projects to a very different kind of modern-flavored one, and I'm learning all kinds of new tricks from my peers about cache efficiency, GPGPU, branchless techniques, SIMD, special-purpose memory allocators that actually outperform malloc (but for specific scenarios), etc.

Here I'm trying to catch up with the latest performance trends, and what I've found surprising is that those old data structures I often favored back in the 90s, which were often linked/tree-type structures, are actually being vastly outperformed by much more naive, brutish, micro-optimized, parallelized code applying tuned instructions over contiguous blocks of memory. It's somewhat disappointing at the same time, since I feel like we're now fitting the algorithms more to the machine and narrowing the possibilities that way (especially with GPGPU).

The funny thing is that I find this kind of micro-optimized, fast array-processing code much easier to maintain than the sophisticated algorithms and data structures I was using before. For a start, it's easier to generalize. Furthermore, my peers can often take a customer complaint about a specific slowdown in an area, slap on a parallel for and possibly some SIMD, and call it done with a decent speed-up. Algorithmic improvements can often offer substantially more, but the speed and non-intrusiveness with which these micro-optimizations can be applied has me wanting to learn more in this area, since reading papers on better algorithms can take longer (as well as require more extensive changes). So I've been jumping on that micro-optimization bandwagon a bit more lately, and perhaps a little too much in this specific case, but my curiosity is more about expanding my range of possible solutions for any scenario.

Disassembly

Note: I'm really quite poor at assembly, so I've often tuned things more in a trial-and-error fashion, coming up with somewhat educated guesses about why a hotspot shown in VTune might be the bottleneck and then trying things out to see if the times improve, assuming the guesses have some hint of truth if the times do improve, or that I completely missed the mark if they don't.

000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h)
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h)
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2]
000007FEEE3FB8B0  add         rax,rax
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8]
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx]
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8]
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4]
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8]
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2
000007FEEE3FB8DB  mulss       xmm0,xmm0
000007FEEE3FB8DF  mulss       xmm1,xmm1
000007FEEE3FB8E3  addss       xmm2,xmm0
000007FEEE3FB8E7  addss       xmm2,xmm1
            if (dist < pt.min_dist)    // VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8]
000007FEEE3FB8EF  jae         thread_partition+1E9h (7FEEE3FB8F9h)
            {
                pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2
                pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx
000007FEEE3FB8F9  inc         ecx
000007FEEE3FB8FB  add         r8,30h
000007FEEE3FB8FF  cmp         ecx,r10d
000007FEEE3FB902  jl          thread_partition+1A8h (7FEEE3FB8B8h)
    for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi
000007FEEE3FB906  add         rdx,20h
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4]
000007FEEE3FB90D  jl          thread_partition+31h (7FEEE3FB741h)
000007FEEE3FB913  mov         rbx,qword ptr [irange]
            }
        }
    }
}

We're forced to target SSE 2: a bit behind the times nowadays, but the user base actually tripped us up once when we assumed even SSE 4 was okay as a minimum requirement (one user had a prototype Intel machine).

Update with Standalone Test: ~5.6 secs

I'm very grateful for all the help offered! Because the codebase is quite extensive and the conditions for triggering this code are complex (system events triggered across multiple threads), it's a bit unwieldy to make experimental changes and profile them each time. So I've set up a superficial test on the side as a standalone application that others can also run and try out, so that I can experiment with all these kindly offered solutions.

#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#include <cstdlib>

#if defined(_MSC_VER)
    #define ALIGN16 __declspec(align(16))
#else
    #include <malloc.h>
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);
    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y + z*z;
            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);
    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid for each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << ","
            << centroids[c][1] << "," << centroids[c][2] << endl;
}

I'm aware of the hazards of superficial testing, but since this is already established as the hotspot from previous real-world profiling sessions, I hope it's excusable. I'm also just interested in the general techniques for micro-optimizing this kind of code.

I did get slightly different results profiling this one. The times are a bit more evenly dispersed within the loop here, and I'm not sure why. Perhaps it's because the data is smaller (I omitted members, hoisted out the min_dist member, and made it a local variable). The exact ratio between centroids and points is also a bit different, but hopefully close enough to translate improvements here back to the original code. It's also single-threaded in this superficial test, and the disassembly looks quite different, so I may be risking optimizing the superficial test without the original (a risk I'm willing to take for now, since I'm more interested in expanding my knowledge of techniques that could optimize these cases than in a solution for this exact case).

(VTune profile screenshot of the standalone test.)

Update with Yochai Timmer's Suggestion: ~12.5 secs

Ah, I'm running into the troubles of micro-optimization without understanding assembly very well. I replaced this:

-if (min_dist_squared > dist_squared)
-{
-    min_dist_squared = dist_squared;
-    pt.centroid = c;
-}

With this:

+const bool found_closer = min_dist_squared > dist_squared;
+pt.centroid = bitselect(found_closer, c, pt.centroid);
+min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

... only to find the times increased from ~5.6 secs to ~12.5 secs. That said, this is not his fault, nor does it take away from the value of his solution; it's mine for failing to understand what's really going on at the machine level and taking stabs in the dark. That one apparently missed, and apparently I was not the victim of branch misprediction as I originally thought. Nevertheless, his proposed solution is a wonderful and generalized function to try in such cases, and I'm grateful to add it to my toolbox of tips and tricks. And now for round 2.

Harold's SIMD Solution: 2.496 secs (see caveat)

This solution might be awesome. After converting the cluster representation to SoA, I'm getting times of ~2.5 seconds with it! Unfortunately, there appears to be a glitch of some sort. I'm getting very different results for the final output, which suggests more than slight precision differences, including some centroids towards the end with values of 0 (implying that they were never found in the search). I've been trying to step through the SIMD logic with the debugger to see what might be up; it could merely be a transcription error on my part, but here's the code in case someone can spot the error.

If the error could be corrected without slowing down the results, this speed improvement is more than I ever imagined from a pure micro-optimization!
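(For clarity, the cen_x, cen_y and cen_z members referenced below come from the SoA conversion: separate, contiguous, 16-byte-aligned coordinate arrays, one entry per centroid. Roughly this kind of layout, shown as a simplified sketch rather than the exact test code:)

    // Rough sketch of the SoA centroid layout assumed by the SIMD find_nearest below.
    // Illustrative only; allocation reuses the malloc16_t/free16 helpers from the test above.
    class Centroids
    {
    public:
        Centroids():
            cen_x(malloc16_t<float>(num_centroids)),
            cen_y(malloc16_t<float>(num_centroids)),
            cen_z(malloc16_t<float>(num_centroids))
        {
        }
        ~Centroids()
        {
            free16(cen_x);
            free16(cen_y);
            free16(cen_z);
        }
        // ... same interface as before, now operating on the three arrays ...
    private:
        float* cen_x;  // all x coordinates, contiguous and 16-byte aligned
        float* cen_y;  // all y coordinates
        float* cen_z;  // all z coordinates
    };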

// New version of Centroids::find_nearest (from harold's solution):
int find_nearest(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;

    for (int i=4; i < num_centroids; i += 4)
    {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask),
                                 _mm_andnot_si128(mask, min_index));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest)
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

Harold's SIMD Solution (Fixed): ~2.5 secs

After applying the fix and testing it, the results are no longer corrupted, and it works correctly with similar improvements in the original codebase!

Since this touches on the holy grail of knowledge I was seeking to understand better (branchless SIMD), I'm going to award the solution some extra bounty for more than doubling the speed of the operation. I have my homework cut out for me trying to understand it, since my goal was not merely to mitigate this hotspot, but to expand my personal understanding of the possible ways to deal with them.

Nevertheless, I'm grateful for all the contributions here, from the algorithmic suggestions to the really cool bitselect trick! I wish I could accept all the answers. I may end up trying them all at some point, but for now I've got my homework cut out for me in understanding some of these non-arithmetic SIMD ops.

int find_nearest_simd(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
    __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
    __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

    __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;

    for (int i=4; i < num_centroids; i += 4)
    {
        xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask),
                                 _mm_andnot_si128(mask, min_index));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest)
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}
+55
c++ performance optimization
May 04 '15 at 5:39
6 answers

Too bad we can't use SSE 4.1, but very well, SSE 2 it is. I haven't tested this, just compiled it to see if there were syntax errors and to see whether the assembly made sense (it's mostly OK, although GCC spills min_index even with some xmm registers left unused, not sure why that happens).

int find_closest(float *x, float *y, float *z,
                 float pt_x, float pt_y, float pt_z, int n)
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i = 4; i < n; i += 4)
    {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)),
                                 _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask),
                                 _mm_andnot_si128(mask, min_index));
    }
    float mdist[4];
    _mm_store_ps(mdist, min_dist);
    uint32_t mindex[4];
    _mm_store_si128((__m128i*)mindex, min_index);
    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i = 1; i < 4; i++)
    {
        if (mdist[i] < closest)
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

As usual, it expects the pointers to be 16-aligned. Also, the padding should be done with points at infinity (so they are never closest to the target).
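A sketch of what that padding could look like (the helper below is illustrative, not part of the routine above; it rounds the count up to a multiple of 4 and fills the tail with +infinity so the padded slots can never win):

    #include <limits>

    // Illustrative padding helper: n is the real centroid count; x, y and z must be
    // allocated with capacity rounded up to a multiple of 4. Padded slots get +infinity
    // so their distances can never become the minimum.
    static void pad_with_infinity(float* x, float* y, float* z, int n)
    {
        const int padded = (n + 3) & ~3;   // round n up to a multiple of 4
        const float inf = std::numeric_limits<float>::infinity();
        for (int i = n; i < padded; ++i)
        {
            x[i] = inf;
            y[i] = inf;
            z[i] = inf;
        }
    }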

SSE 4.1 would let you replace this

 min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index)); 

with this

 min_index = _mm_blendv_epi8(min_index, index, mask); 
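If both targets matter, one way to keep the two variants side by side is a small compile-time switch (a sketch, assuming a compiler that defines __SSE4_1__ when SSE 4.1 code generation is enabled, as GCC and Clang do; MSVC would need its own flag):

    #include <emmintrin.h>      // SSE2
    #if defined(__SSE4_1__)
    #include <smmintrin.h>      // SSE4.1, for _mm_blendv_epi8
    #endif

    // Returns b where the mask bits are set, a elsewhere.
    // Usage in the loop above: min_index = select_epi32(min_index, index, mask);
    static inline __m128i select_epi32(__m128i a, __m128i b, __m128i mask)
    {
    #if defined(__SSE4_1__)
        return _mm_blendv_epi8(a, b, mask);
    #else
        return _mm_or_si128(_mm_and_si128(b, mask), _mm_andnot_si128(mask, a));
    #endif
    }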

Here's an asm version, made for vsyasm, tested a bit (it seems to work):

bits 64

section .data

align 16
centroid_four:
    dd 4, 4, 4, 4
centroid_index:
    dd 0, 1, 2, 3

section .text

global find_closest

proc_frame find_closest
    ;
    ;   arguments:
    ;       ecx: number of points (multiple of 4 and at least 4)
    ;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
    ;       r8 -> array of 3 floats (the reference point)
    ;
    alloc_stack 0x58
    save_xmm128 xmm6, 0
    save_xmm128 xmm7, 16
    save_xmm128 xmm8, 32
    save_xmm128 xmm9, 48
[endprolog]
    movss xmm0, [r8]
    shufps xmm0, xmm0, 0
    movss xmm1, [r8 + 4]
    shufps xmm1, xmm1, 0
    movss xmm2, [r8 + 8]
    shufps xmm2, xmm2, 0
    ; pointers to x, y, z in r8, r9, r10
    mov r8, [rdx]
    mov r9, [rdx + 8]
    mov r10, [rdx + 16]
    ; reference point is in xmm0, xmm1, xmm2 (x, y, z)
    movdqa xmm3, [rel centroid_index]   ; min_index
    movdqa xmm4, xmm3                   ; current index
    movdqa xmm9, [rel centroid_four]    ; index increment
    paddd xmm4, xmm9
    ; calculate initial min_dist, xmm5
    movaps xmm5, [r8]
    subps xmm5, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm5, xmm5
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm5, xmm7
    addps xmm5, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    sub ecx, 4
    jna _tail
_loop:
    movaps xmm6, [r8]
    subps xmm6, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm6, xmm6
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm6, xmm7
    addps xmm6, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    movaps xmm7, xmm6
    cmpps xmm6, xmm5, 1
    minps xmm5, xmm7
    movdqa xmm7, xmm6
    pand xmm6, xmm4
    pandn xmm7, xmm3
    por xmm6, xmm7
    movdqa xmm3, xmm6
    paddd xmm4, xmm9
    sub ecx, 4
    ja _loop
_tail:
    ; calculate horizontal minimum
    pshufd xmm0, xmm5, 0xB1
    minps xmm0, xmm5
    pshufd xmm1, xmm0, 0x4E
    minps xmm0, xmm1
    ; find index of the minimum
    cmpps xmm0, xmm5, 0
    movmskps eax, xmm0
    bsf eax, eax
    ; index into xmm3, sort of
    movaps [rsp + 64], xmm3
    mov eax, [rsp + 64 + rax * 4]
    movaps xmm9, [rsp + 48]
    movaps xmm8, [rsp + 32]
    movaps xmm7, [rsp + 16]
    movaps xmm6, [rsp]
    add rsp, 0x58
    ret
endproc_frame

In C++:

 extern "C" int find_closest(int n, float** points, float* reference_point); 
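For illustration, a call from C++ could look roughly like this (the wrapper and names below are placeholders; x, y and z are the aligned, infinity-padded SoA arrays, and padded_count is a multiple of 4, at least 4):

    extern "C" int find_closest(int n, float** points, float* reference_point);

    // Hypothetical wrapper around the asm routine above.
    static int nearest_centroid(float* x, float* y, float* z, int padded_count,
                                const float* pt_xyz)
    {
        float* coords[3] = { x, y, z };
        float ref[3]     = { pt_xyz[0], pt_xyz[1], pt_xyz[2] };
        return find_closest(padded_count, coords, ref);
    }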
+21
May 04 '15 at 1:07

You could use a branchless ternary operator, sometimes known as bitselect (condition ? true : false).
Just use it for the 2 members, defaulting to doing nothing. Don't worry about the extra operations; they are nothing compared to the branching of the if statement.

bitselect implementation:

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
    return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}

inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
    //Reinterpret floats. Works because it's just a bit select, no matter the actual value
    int& at = reinterpret_cast<int&>(truereturnvalue);
    int& af = reinterpret_cast<int&>(falsereturnvalue);
    int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
    return reinterpret_cast<float&>(res);
}
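To make the trick concrete (a worked example added for illustration): negating an int condition of 1 gives the all-ones mask 0xFFFFFFFF, while negating 0 gives 0, so one operand passes through its AND untouched and the other is zeroed before the OR.

    #include <cassert>

    // Illustration of the mask mechanics behind bitselect:
    //   condition = 1  ->  -condition = 0xFFFFFFFF  ->  keeps truereturnvalue
    //   condition = 0  ->  -condition = 0x00000000  ->  keeps falsereturnvalue
    static void bitselect_demo()
    {
        assert(bitselect(1, 42, 7) == 42);   // (42 & ~0) | (7 & 0)
        assert(bitselect(0, 42, 7) == 7);    // (42 & 0)  | (7 & ~0)
    }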

And your loop should look like this:

for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    const bool isSmaller = dist < pt.min_dist;

    //use same value if not smaller
    pt.min_index = bitselect(isSmaller, i, pt.min_index);
    pt.min_dist = bitselect(isSmaller, dist, pt.min_dist);
}
+21
May 04 '15 at 6:05

C++ compilers can often generate branchless code from this kind of loop on their own, so before hand-optimizing anything, it's worth looking at what the compiler actually emits. As an example, consider:

int g(int, int);

int f(const int *arr)
{
    int min = 10000, minIndex = -1;
    for ( int i = 0; i < 1000; ++i )
    {
        if ( arr[i] < min )
        {
            min = arr[i];
            minIndex = i;
        }
    }
    return g(min, minIndex);
}

Here the undefined "g" keeps the result live, so the loop can't simply be optimized away. Compiled with g++ 4.9.2 using -O3 -S for x86_64 (without any -march option), the interesting part of the inner loop (with a little rearranging) comes out as:

movl    (%rdi,%rax,4), %ecx
movl    %edx, %r8d
cmpl    %edx, %ecx
cmovle  %ecx, %r8d
cmovl   %eax, %esi
addq    $1, %rax

As you can see, the selection is already branchless: the compiler uses conditional moves (cmovle/cmovl), so there is no branch to mispredict in that inner loop. In that case the "hotspot" the profiler pins on the comparison is more likely the compare waiting on loaded data than branch misprediction itself. For bigger gains, the next step would be SIMD, computing several candidate distances per iteration (as the SIMD answers here do).
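As an added illustration (not part of the original answer), the same loop can also be written with explicit selects, which compilers likewise tend to turn into conditional moves; either way, it's worth checking the generated assembly:

    // Select-style variant of f() above; g() is the same external function.
    int f_select(const int *arr)
    {
        int min = 10000, minIndex = -1;
        for (int i = 0; i < 1000; ++i)
        {
            const bool smaller = arr[i] < min;
            minIndex = smaller ? i : minIndex;
            min = smaller ? arr[i] : min;
        }
        return g(min, minIndex);
    }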

+10
May 04 '15 at 10:07

One thing you could try is to separate the distance calculation from the search for the minimum, e.g. something like this:

std::vector<float> centDists(num_centroids); //<-- one for each thread.
for (size_t p=0; p<num_points; ++p)
{
    Point& pt = points[p];
    for (size_t c=0; c<num_centroids; ++c)
    {
        const float dist = ...;
        centDists[c] = dist;
    }
    pt.min_idx = min_element(centDists.begin(), centDists.end()) - centDists.begin();
}

Whether this is actually faster depends on your data layout and your compiler, but with the comparison pulled out of the distance loop, what remains is a straight computation into a contiguous buffer, which the compiler has a much better chance of vectorizing.

It also gets rid of the update inside the hot loop: instead of a compare-and-maybe-write to pt.min_dist on every iteration, the distances are simply stored, and the reduction happens afterwards in min_element.

A Structure-of-Arrays layout for the centroid positions (separate x, y and z arrays) would also fit this naturally and is much friendlier to SIMD. See the other answers for that.

EDIT: some quick measurements (Haswell, clang 3.5):
The separation above gave roughly a 10% improvement on its own.

Comparing the AoS and SoA layouts directly, the SoA version came out roughly 40% faster than the AoS one.

+6
May 04 '15 at 6:43

Firstly, before reaching for SIMD, I'd suggest simplifying what the inner loop has to carry from one iteration to the next. Two observations:

  • You don't need to track the index of the nearest centroid while scanning; the minimum distance and the index that produced it can be found in separate passes.
  • During the scan itself, the only thing that has to survive between iterations is the minimum dist.

That reduces the inner loop to a plain minimum reduction with no data-dependent branch at all, which is exactly the shape SIMD handles well.

With SSE you can take the minimum of four distances at a time using _mm_min_ps, with nothing beyond plain SSE (no SSE 4, no AVX). Something along these lines:

float MinimumDistance(const float *values, int count)
{
    __m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
    int i=0;
    for (; i < count - 3; i+=4)
    {
        __m128 distances = _mm_loadu_ps(&values[i]);
        min = _mm_min_ps(min, distances);
    }
    // Combine the four separate minimums to a single value
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));

    // Deal with the last 0-3 elements the slow way
    float result = FLT_MAX;
    if (count > 3) _mm_store_ss(&result, min);
    for (; i < count; i++)
    {
        result = min(values[i], result);
    }

    return result;
}

That keeps the SSE requirements minimal. Once the minimum distance is known, the index of the centroid that produced it can be recovered with a cheap second pass over the distances.
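A sketch of that second pass (added for illustration; exact float comparison is safe here because the minimum returned by the reduction is one of the stored values):

    // Given the array of squared distances and the minimum found above,
    // recover the index of the centroid that produced it.
    static int IndexOfValue(const float* values, int count, float value)
    {
        for (int i = 0; i < count; ++i)
        {
            if (values[i] == value)
                return i;
        }
        return -1;   // not found (should not happen if value came from this array)
    }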

Finally, take a look at the memory layout: if ClusterCentroid carries members that this loop never reads, you're dragging useless data through the cache, and a cache line is only 64 bytes.
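For illustration (hypothetical member names), a hot/cold split along those lines:

    // Only the data the distance loop reads stays in the hot array, so more
    // centroids fit into each 64-byte cache line.
    struct CentroidHot          // 12 bytes per centroid
    {
        float pos[3];
    };

    struct CentroidCold         // bookkeeping the inner loop never touches
    {
        int count;
        // ... any other members ...
    };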

+6
May 04 '15 at 12:12

A different angle: the min_dist/min_index update itself isn't the fundamental cost, it's that every point is tested against every centroid, even though most centroids have no chance of being the nearest one and could be ruled out early.

If you're willing to change the algorithm rather than just the inner loop, a spatial structure over the centroids, such as a kd-tree (or even a simple grid), would cut the number of candidates per point dramatically, though it's a bigger change.

Short of that, here's a cheap "poor man's" version of the same idea:

Sort the points by one coordinate, e.g. cent.pos[0]
Pick a starting index for the query point (pt)
Iterate forwards through the candidate points until you reach the end, OR when abs(pt.pos[0] - cent.pos[0]) > min_dist
Repeat the previous step going the opposite direction.

The savings depend on how quickly the early-out triggers; if the centroids are well spread out along the sort axis, most of them get skipped for each point.

In code, something like this:

// sort centroids by x coordinate.
min_index = -1;
min_dist = numeric_limits<float>::max();

// pick the start index. This works well if the points are evenly distributed.
float min_x = centroids[0].pos[0];
float max_x = centroids[num_centroids-1].pos[0];
float cur_x = pt.pos[0];
float t = (max_x - cur_x) / (max_x - min_x);
// TODO clamp t between 0 and 1
int start_index = int(t * float(num_centroids));

// Forward search
for (int i=start_index; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    if (fabs(cent.pos[0] - pt.pos[0]) > min_dist)
        // Everything to the right of this must be further than min_dist, so break.
        // This is where the savings comes from!
        break;
    const float dist = ...;
    if (dist < min_dist)
    {
        min_dist = dist;
        min_index = i;
    }
}

// Backwards search
for (int i=start_index; i >= 0; --i)
{
    // same as above
}

pt.min_dist = min_dist;
pt.min_index = min_index;

(Note: I haven't compiled or profiled this, so treat it as a sketch.)

It can of course still be combined with the multithreading over points (as before).

+3
May 04 '15 at 14:30


