Note: I'd like to learn how to approach and come up with these kinds of solutions myself, not just the solution.

I have a very performance-critical function in my system showing up as the number one profiling hotspot in certain contexts. It's in the middle of a k-means iteration (already multithreaded using a parallel-for to process sub-ranges of points in each worker thread).
```cpp
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist)
    ...
}
```
Any savings in the time spent processing this section of code counts substantially, so I've often been fiddling with it. It might be worth putting the centroid loop on the outside, for example, and iterating through the points in parallel for a given centroid. The number of cluster points here spans in the millions, while the number of centroids spans in the thousands. The algorithm is applied for a handful of iterations (often under 10). It doesn't strive for perfect convergence/stability, just some "reasonable" approximation.
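To make the "centroid loop outside" idea concrete, here's a rough sketch of what I mean (the names `Point`, `Centroid`, `dist_sq`, and `assign_inverted` are illustrative, not from the real codebase):

```cpp
#include <limits>
#include <vector>
#include <cstddef>

// Illustrative stand-ins for the real ClusterPoint/ClusterCentroid types.
struct Point    { float pos[3]; float min_dist; int min_index; };
struct Centroid { float pos[3]; };

static float dist_sq(const float* a, const float* b)
{
    const float x = a[0] - b[0];
    const float y = a[1] - b[1];
    const float z = a[2] - b[2];
    return x*x + y*y + z*z;
}

// Inverted loop order: scan all points once per centroid instead of
// all centroids once per point. The inner loop walks the points
// contiguously and could become a parallel-for over sub-ranges, at the
// cost of re-reading every point once per centroid.
void assign_inverted(std::vector<Point>& points,
                     const std::vector<Centroid>& centroids)
{
    for (size_t p = 0; p < points.size(); ++p)
    {
        points[p].min_index = -1;
        points[p].min_dist = std::numeric_limits<float>::max();
    }
    for (size_t c = 0; c < centroids.size(); ++c)
    {
        for (size_t p = 0; p < points.size(); ++p)
        {
            const float d = dist_sq(points[p].pos, centroids[c].pos);
            if (d < points[p].min_dist)
            {
                points[p].min_dist = d;
                points[p].min_index = static_cast<int>(c);
            }
        }
    }
}
```

Whether this pays off would depend on whether the cheaper inner-loop memory access pattern outweighs making multiple passes over millions of points.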
Any ideas are appreciated, but what I'm really eager to discover is whether this code can be made branchless, since that would allow a SIMD version. I haven't developed the kind of mental faculty that lets me easily grasp how to come up with branchless solutions: my brain fails there much like it did when I was first exposed to recursion in the early days, so a guide on writing branchless code and how to develop the appropriate mindset for it would also be helpful.

In short, I'm looking for any guides, tips, and suggestions (not necessarily solutions) on how to micro-optimize this code. It most likely has room for algorithmic improvements, but my blind spot has always been micro-optimization (and I'm curious to learn how to apply it more effectively without going overboard with it). It's already tightly multithreaded with a chunky parallel-for logic, so I'm pretty much pushed into the micro-optimization corner as one of the quicker things to try without a smarter algorithm. We're completely free to change the memory layout.
In response to the algorithmic suggestions

I completely agree with this way of looking at it: trying to micro-optimize an O(knm) algorithm which could clearly be improved at the algorithmic level. That pushes this specific question into somewhat academic and impractical territory. However, if I could be allowed an anecdote, I come from an original background of high-level programming: a big emphasis on the broad, large-scale point of view and on safety, and very little on the low-level implementation details. I recently switched projects to a very different kind of modern-flavored one, and I'm learning all kinds of new tricks from my peers about cache efficiency, GPGPU, branchless techniques, SIMD, special-purpose memory allocators that actually outperform malloc (but for specific scenarios), etc.
It's where I'm trying to catch up with the latest performance trends, and surprisingly I've found that those old data structures I often favored in the 90s, which were often linked/tree-type structures, are actually being vastly outperformed by much more naive, brute-force, micro-optimized, parallelized code applying tuned instructions over contiguous memory blocks. It's somewhat disappointing at the same time, since I feel like we're fitting the algorithms more to the machine now and narrowing the possibilities that way (especially with GPGPU).

The funniest thing is that I find this type of micro-optimized, fast array-processing code much easier to maintain than the sophisticated algorithms and data structures I was using before. For a start, it's easier to generalize. Furthermore, my peers can often take a customer complaint about a specific slowdown in an area, just slap on a parallel-for and possibly some SIMD, and call it done with a decent speed-up. Algorithmic improvements can often offer substantially more, but the speed and non-intrusiveness with which these micro-optimizations can be applied has me wanting to learn more in that area, as reading papers on better algorithms can take some time (as well as require more extensive changes). So I've been jumping on that micro-optimization bandwagon a bit more lately, and perhaps a little too much in this specific case, but my curiosity is more about expanding my range of possible solutions for any scenario.
Disassembly

Note: I'm really, really bad at assembly, so I've often tuned things more in a trial-and-error fashion, coming up with somewhat-educated guesses about why a hotspot shown in VTune might be the bottleneck and then trying things out to see if the times improve, assuming the guesses have some hint of truth if the times do improve, or that I completely missed the mark if they don't.
```
000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h) 
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h)
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2]
000007FEEE3FB8B0  add         rax,rax
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8]
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx]
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8]
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4]
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8]
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2
000007FEEE3FB8DB  mulss       xmm0,xmm0
000007FEEE3FB8DF  mulss       xmm1,xmm1
000007FEEE3FB8E3  addss       xmm2,xmm0
000007FEEE3FB8E7  addss       xmm2,xmm1
            if (dist < pt.min_dist)
```
We're forced to target SSE 2, a bit behind the times nowadays, but the user base actually tripped up once when we assumed that even SSE 4 was okay as a minimum requirement (one user had a prototype Intel machine).
Standalone test update: ~5.6 secs

I'm very grateful for all the help! Because the codebase is quite extensive and the conditions for triggering that code are complex (system events triggered across multiple threads), it's a bit unwieldy to make experimental changes and profile them each time. So I've set up a superficial test on the side as a standalone application that others can also run and try out so that I can experiment with all these kindly-offered solutions.
```cpp
#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
    #define ALIGN16 __declspec(align(16))
#else
    #include <malloc.h>
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);

    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y + z*z;
            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);

    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid to each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute the new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute the average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}
```
I'm aware of the dangers of superficial testing, but since it's already considered a hotspot from previous real-world sessions, I hope it's excusable. I'm also just interested in general techniques for micro-optimizing such code.

I did get slightly different results in profiling this one. The times are a bit more evenly dispersed within the loop here, and I'm not sure why. Perhaps it's because the data is smaller (I omitted members and hoisted out the min_dist member and made it a local variable). The exact ratio between centroids and points is also a bit different, but hopefully close enough to translate improvements here to the original code. It's also single-threaded in this superficial test, and the disassembly looks quite different, so I may be risking optimizing this superficial test without the original (a risk I'm willing to take right now, as I'm more interested in expanding my knowledge of techniques that could optimize these cases rather than a solution for this exact case).

Update with Yochai Timmer's suggestion: ~12.5 secs

Oh, I face the woes of micro-optimization without understanding assembly very well. I replaced this:
```cpp
if (min_dist_squared > dist_squared)
{
    min_dist_squared = dist_squared;
    pt.centroid = c;
}
```
With this:
```cpp
const bool found_closer = min_dist_squared > dist_squared;
pt.centroid = bitselect(found_closer, c, pt.centroid);
min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);
```
.. only to find the times increased from ~5.6 secs to ~12.5 secs. Nevertheless, that's not his fault, nor does it take away from the value of his solution: it's mine for failing to understand what's really going on at the machine level and taking stabs in the dark. That one apparently missed the mark, and apparently I was not the victim of branch misprediction as I initially thought. Nevertheless, his proposed solution is a wonderful and generalized function to try in such cases, and I'm grateful to add it to my toolbox of tips and tricks. And on to round 2.
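For reference, `bitselect` isn't shown above; here's a minimal sketch of how I understand such a helper (my own reconstruction, assuming the usual mask-and-blend form, where the float overload type-puns through an integer mask):

```cpp
#include <cstdint>
#include <cstring>

// Branchless select: returns a if cond is true, else b.
// The integer version builds an all-ones/all-zeros mask from the
// condition and blends the two values bitwise, with no branch.
static inline int bitselect(bool cond, int a, int b)
{
    const int mask = -static_cast<int>(cond); // 0xffffffff if cond, else 0
    return (a & mask) | (b & ~mask);
}

// The float version reinterprets the bits as integers (via memcpy to
// avoid aliasing issues), blends, and reinterprets back.
static inline float bitselect(bool cond, float a, float b)
{
    const uint32_t mask = cond ? 0xffffffffu : 0u;
    uint32_t ua, ub;
    std::memcpy(&ua, &a, sizeof ua);
    std::memcpy(&ub, &b, sizeof ub);
    const uint32_t ur = (ua & mask) | (ub & ~mask);
    float r;
    std::memcpy(&r, &ur, sizeof r);
    return r;
}
```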
Harold's SIMD solution - 2.496 secs (see caveat)

This solution might be amazing. After converting the cluster representation to SoA, I'm getting times of ~2.5 seconds with this one! Unfortunately, there appears to be a glitch of some sort. I'm getting very different results for the final output, suggesting more than slight precision differences, including some centroids towards the end with values of 0 (implying that they weren't found in the search). I've been trying to step through the SIMD logic with the debugger to see what might be up. It could merely be a transcription error on my part, but here's the code in case someone can spot the error.

If the error could be corrected without slowing down the results, this speed improvement is more than I ever imagined possible from pure micro-optimization!
```cpp
    // New version of Centroids::find_nearest (from harold's solution):
    int find_nearest(const float* pt_xyz) const
    {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
        __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
        __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif));
        __m128i index = min_index;
        for (int i=4; i < num_centroids; i += 4)
        {
            xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
            ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
            zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index));
            index = _mm_add_epi32(index, _mm_set1_epi32(4));
        }

        ALIGN16 float mdist[4];
        ALIGN16 uint32_t mindex[4];
        _mm_store_ps(mdist, min_dist);
        _mm_store_si128((__m128i*)mindex, min_index);

        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i=1; i < 4; i++)
        {
            if (mdist[i] < closest)
            {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }
```
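For context, the `cen_x`/`cen_y`/`cen_z` members referenced above come from the SoA conversion of the centroids. My conversion was roughly like this (a sketch; everything besides the three member names is illustrative):

```cpp
#include <cstdint>
#include <cstdlib>

enum { num_centroids = 512 };

// SoA layout the SIMD find_nearest assumes: one 16-byte-aligned array
// per coordinate instead of an array of interleaved {x, y, z} structs,
// so _mm_load_ps can grab 4 of the same coordinate at once.
struct CentroidsSoA
{
    float* cen_x;
    float* cen_y;
    float* cen_z;

    CentroidsSoA()
    {
        // num_centroids is a multiple of 4 here, so no tail padding
        // is needed for the 4-wide SIMD loop.
        posix_memalign(reinterpret_cast<void**>(&cen_x), 16, num_centroids * sizeof(float));
        posix_memalign(reinterpret_cast<void**>(&cen_y), 16, num_centroids * sizeof(float));
        posix_memalign(reinterpret_cast<void**>(&cen_z), 16, num_centroids * sizeof(float));
    }
    ~CentroidsSoA()
    {
        free(cen_x);
        free(cen_y);
        free(cen_z);
    }
};
```

(The real code uses the `malloc16`/`ALIGN16` wrappers from the standalone test so it also builds under MSVC; `posix_memalign` here is just the POSIX branch of that.)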
Harold's SIMD solution (fixed) - ~2.5 secs

After applying the fixes and testing them out, the results are intact and working correctly with similar improvements to the original codebase!

Since this touches on the holy grail of knowledge I was seeking to understand better (branchless SIMD), I'm going to award the solution with some extra points for more than doubling the speed of the operation. I have my homework cut out for me in trying to understand it, since my goal was not merely to mitigate this hotspot, but to expand my personal understanding of possible solutions to deal with them.

Nevertheless, I'm grateful for all the contributions here, from the algorithmic suggestions to the really cool bitselect trick! I wish I could accept all the answers. I may end up trying all of them at some point, but for now I've got my homework cut out for me in understanding some of these non-arithmetical SIMD ops.
```cpp
    int find_nearest_simd(const float* pt_xyz) const
    {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
        __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
        __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

        __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
        __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
        __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif));

        __m128i index = min_index;
        for (int i=4; i < num_centroids; i += 4)
        {
            xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
            ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
            zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif));
            index = _mm_add_epi32(index, _mm_set1_epi32(4));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index));
        }

        ALIGN16 float mdist[4];
        ALIGN16 uint32_t mindex[4];
        _mm_store_ps(mdist, min_dist);
        _mm_store_si128((__m128i*)mindex, min_index);

        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i=1; i < 4; i++)
        {
            if (mdist[i] < closest)
            {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }
```