An optimized way to find the M largest elements in an NxN array using C++

I need a quick way to find the 2D positions and values of the M largest elements in an NxN array.

Right now I am doing this:

struct SourcePoint {
    Point point;
    float value;
};

SourcePoint* maxValues = new SourcePoint[M];

for (int j = 0; j < rows; j++) {
    for (int i = 0; i < cols; i++) {
        float sample = arr[i][j];
        if (sample > maxValues[0].value) {
            int q = 1;
            while (q < M && sample > maxValues[q].value) {
                maxValues[q - 1] = maxValues[q];   // shuffle the values back
                q++;
            }
            maxValues[q - 1].value = sample;
            maxValues[q - 1].point = Point(i, j);
        }
    }
}

The Point structure is just two ints: x and y.

This code basically performs an insertion sort of the values. maxValues[0] always contains the SourcePoint with the lowest value that still qualifies for the top M values encountered so far. This gives us a quick and easy bailout: if sample <= maxValues[0].value, we do nothing. The problem I am facing is the shuffling every time a new better value is found. It works its way along maxValues until it finds its new home, shuffling all the elements of maxValues to make room for itself.

I'm getting to the point where I'm ready to look at SIMD solutions or cache optimizations, since it looks like there is a fair amount of cache thrashing happening. Reducing the cost of this operation will dramatically affect the performance of my overall algorithm, since it is called many times and accounts for 60-80% of my total cost.

I tried using std::vector and make_heap, but I think the overhead of creating the heap outweighed the savings in heap operations. This is probably because M and N are generally small: M is usually 10-20 and N 10-30 (NxN = 100-900). The problem is that this operation is called repeatedly, and it cannot be precomputed.

I just thought of pre-loading maxValues with the first M elements, which may provide a small saving. In the current algorithm, the first M elements are guaranteed to shuffle their way in, just to fill maxValues initially.
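Roughly, that pre-load would look like the sketch below (assumptions: M <= cols, the same loop order as above, and byValue is just a helper comparator; std::sort needs <algorithm>):

bool byValue(const SourcePoint &a, const SourcePoint &b) {
    return a.value < b.value;
}

// Seed maxValues with the first M samples the scan would visit
// (arr[0][0] .. arr[M-1][0]), then sort ascending by value so that
// maxValues[0] is again the smallest of the kept values.
for (int k = 0; k < M; k++) {
    maxValues[k].value = arr[k][0];
    maxValues[k].point = Point(k, 0);
}
std::sort(maxValues, maxValues + M, byValue);
// the main scan can then skip those first M samples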

Any help from an optimization guru would be greatly appreciated :)

+7
10 answers

A few ideas you can try. In some quick tests with N = 100 and M = 15 I was able to get about 25% speedup in VC++ 2010, but test it yourself to see whether any of this helps in your case. Some of these changes may have no effect, or even a negative one, depending on the actual usage/data and compiler optimizations.

  • Don't new the maxValues array every time unless you need to. Using a stack variable instead of dynamic allocation gets me +5%.
  • Changing g_Source[i][j] to g_Source[j][i] gains you a bit (not as much as I thought it would).
  • Using the SourcePoint1 structure shown below gives me a few more percent.
  • The biggest gain, of about +15%, was to replace the local variable sample with g_Source[j][i]. The compiler is probably smart enough to optimize the multiple reads from the array, which it cannot do if you use a local variable.
  • Trying a simple binary search netted me a small loss of a few percent. For larger M/Ns you would likely see a benefit.
  • If you can, try to keep the source data in arr[][] sorted, even if only partially. Ideally, you would generate maxValues[] at the same time the source data is created.
  • Look at how the data is created/stored/organized; it may give you patterns or information that reduce the amount of work needed to build the maxValues[] array. For example, in the best case you might find a formula that gives you the top M coordinates without having to iterate and sort.

Code for the above:

struct SourcePoint1 {
    int x;
    int y;
    float value;
    int test;   // Play with manual/compiler padding if needed
};
+5

If you want to go into micro-optimization at this point, the simple first step should be to get rid of Point and just stuff both dimensions into a single int. That reduces the amount of data you need to move around, and gets SourcePoint down to a power-of-two size, which simplifies indexing into it.
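For illustration, a minimal sketch of that packing (assuming N fits in 16 bits; these names are invented, not from the question):

struct PackedSourcePoint {
    int xy;       // (y << 16) | x: both coordinates in one int
    float value;  // the struct is now 8 bytes, a power of two
};

inline int pack(int x, int y) { return (y << 16) | x; }
inline int unpackX(int xy)    { return xy & 0xFFFF; }
inline int unpackY(int xy)    { return (xy >> 16) & 0xFFFF; }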

Also, are you sure that keeping the list sorted is better than simply recomputing which element is the lowest each time you displace the old lowest one?
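A sketch of that unsorted variant (the function name and layout are mine): maxValues holds M entries in no particular order, and minIndex tracks the smallest one. A replacement is one overwrite plus an O(M) rescan, instead of shuffling up to M elements to keep the array sorted.

struct Point { int x, y; };
struct SourcePoint { Point point; float value; };

void considerSample(SourcePoint *maxValues, int M, int &minIndex,
                    float sample, int i, int j) {
    if (sample <= maxValues[minIndex].value)
        return;                            // quick bailout, as before
    maxValues[minIndex].value = sample;    // displace the old minimum
    maxValues[minIndex].point.x = i;
    maxValues[minIndex].point.y = j;
    minIndex = 0;                          // rescan for the new minimum
    for (int q = 1; q < M; q++)
        if (maxValues[q].value < maxValues[minIndex].value)
            minIndex = q;
}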

+4

(Updated 22:37 UTC 2011-08-20)

I suggest a fixed-size binary min-heap holding the M largest elements (but still in min-heap order!). It probably won't be faster in practice, since I think the OP's insertion sort probably has decent real-world performance (at least when the recommendations of the other posters in this thread are taken into account).

Look-up in the case of failure should be constant, though: if the current element is smaller than the minimum element of the heap (which contains the M largest elements), we can reject it outright.

If it turns out that we have an element larger than the current heap minimum (the Mth largest element so far), we extract (discard) the previous min and insert the new element.

If items are needed in sorted order, the heap can be sorted afterwards.

First attempt at a minimal C++ implementation:

template<unsigned size, typename T>
class m_heap {
private:
    T nodes[size];

    static unsigned parent(unsigned i) { return (i - 1) / 2; }
    static unsigned left(unsigned i)   { return i * 2 + 1; }
    static unsigned right(unsigned i)  { return i * 2 + 2; }

    void bubble_down(unsigned i) {
        for (;;) {
            unsigned j = i;
            if (left(i) < size && nodes[left(i)] < nodes[i])
                j = left(i);
            if (right(i) < size && nodes[right(i)] < nodes[j])
                j = right(i);
            if (i != j) {
                swap(nodes[i], nodes[j]);
                i = j;
            } else {
                break;
            }
        }
    }

    void bubble_up(unsigned i) {
        while (i > 0 && nodes[i] < nodes[parent(i)]) {
            swap(nodes[parent(i)], nodes[i]);
            i = parent(i);
        }
    }

public:
    m_heap() {
        for (unsigned i = 0; i < size; i++) {
            // lowest(), not min(): min() is the smallest *positive* float
            nodes[i] = numeric_limits<T>::lowest();
        }
    }

    void add(const T& x) {
        if (x < nodes[0]) {  // reject outright
            return;
        }
        nodes[0] = x;        // replace the current minimum...
        bubble_down(0);      // ...and restore the heap property
    }

    T* get() { return nodes; }
};

Small test / usage example:

#include <iostream>
#include <limits>
#include <algorithm>
#include <vector>
#include <stdlib.h>
#include <assert.h>
#include <math.h>

using namespace std;

// INCLUDE TEMPLATED CLASS FROM ABOVE

typedef vector<float> vf;

bool compare(float a, float b) { return a > b; }

int main() {
    int N = 2000;
    vf v;
    for (int i = 0; i < N; i++)
        v.push_back(rand() * 1e6 / RAND_MAX);

    static const int M = 50;
    m_heap<M, float> h;
    for (int i = 0; i < N; i++)
        h.add(v[i]);

    sort(v.begin(), v.end(), compare);

    vf heap(h.get(), h.get() + M);   // m_heap::get() exposes the nodes array
    sort(heap.begin(), heap.end(), compare);

    cout << "Real\tFake" << endl;
    for (int i = 0; i < M; i++) {
        cout << v[i] << "\t" << heap[i] << endl;
        if (fabs(v[i] - heap[i]) > 1e-5)
            abort();
    }
}
+4

You are looking for a priority queue:

template <
    class T,
    class Container = vector<T>,
    class Compare = less<typename Container::value_type>
> class priority_queue;

You will need to figure out the best underlying container to use, and probably define a Compare function to deal with your Point type.

If you wanted to optimize it, you could run a queue on each row of your matrix in worker threads, then run an algorithm that picks the largest item off the queue fronts until you have your M elements.
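For illustration, a minimal sketch of the single-queue version (topM and ByValueDesc are invented names; the inverted comparison makes the queue a min-heap on value, so top() is the smallest of the M entries being kept):

#include <queue>
#include <vector>

struct SourcePoint { int x, y; float value; };

struct ByValueDesc {
    bool operator()(const SourcePoint &a, const SourcePoint &b) const {
        return a.value > b.value;   // inverted: smallest value on top
    }
};

std::vector<SourcePoint> topM(const float *arr, int rows, int cols, int M) {
    std::priority_queue<SourcePoint, std::vector<SourcePoint>, ByValueDesc> q;
    for (int j = 0; j < rows; j++) {
        for (int i = 0; i < cols; i++) {
            SourcePoint p = { i, j, arr[j * cols + i] };
            if ((int)q.size() < M) {
                q.push(p);                       // still filling up
            } else if (p.value > q.top().value) {
                q.pop();                         // drop the old minimum
                q.push(p);
            }
        }
    }
    std::vector<SourcePoint> out;
    while (!q.empty()) {
        out.push_back(q.top());                  // ascending by value
        q.pop();
    }
    return out;
}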

+2

A quick optimization would be to add a sentinel value to your maxValues array. If maxValues[M].value is equal to std::numeric_limits<float>::max(), you can eliminate the q < M test in your while loop condition.
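A sketch of the sentinel applied to the question's loop (the function name is invented, and maxValues must now have M + 1 slots):

#include <limits>

struct Point { int x, y; };
struct SourcePoint { Point point; float value; };

// maxValues[M] holds the float maximum, so the while loop can never run
// past the end of the array and the q < M bounds check disappears.
void findTopM(const float *arr, int rows, int cols, int M,
              SourcePoint *maxValues /* M + 1 slots */) {
    for (int q = 0; q < M; q++)
        maxValues[q].value = -std::numeric_limits<float>::max();
    maxValues[M].value = std::numeric_limits<float>::max();  // sentinel
    for (int j = 0; j < rows; j++) {
        for (int i = 0; i < cols; i++) {
            float sample = arr[j * cols + i];
            if (sample > maxValues[0].value) {
                int q = 1;
                while (sample > maxValues[q].value) {  // no q < M needed
                    maxValues[q - 1] = maxValues[q];
                    q++;
                }
                maxValues[q - 1].value = sample;
                maxValues[q - 1].point.x = i;
                maxValues[q - 1].point.y = j;
            }
        }
    }
}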

+2

One idea would be to use the std::partial_sort algorithm on a plain one-dimensional sequence of references into your NxN array. You could probably cache this sequence of references for subsequent calls, too. I don't know how well it performs, but it's worth a try; if it works well enough, you don't have much "magic" going on. In particular, you don't have to resort to micro-optimization.

Consider this showcase:

#include <algorithm>
#include <iostream>
#include <vector>
#include <cstring>
#include <stddef.h>

static const int M = 15;
static const int N = 20;

// Represents a reference to a sample of some two-dimensional array
class Sample {
public:
    Sample(float *arr, size_t row, size_t col)
        : m_arr(arr), m_row(row), m_col(col) { }

    inline operator float() const {
        return m_arr[m_row * N + m_col];
    }

    bool operator<(const Sample &rhs) const {
        return (float)rhs < (float)*this;   // sort descending by value
    }

    int row() const { return m_row; }
    int col() const { return m_col; }

private:
    float *m_arr;
    size_t m_row;
    size_t m_col;
};

int main() {
    // Set up a demo array
    float arr[N][N];
    memset(arr, 0, sizeof(arr));

    // Put in some sample values
    arr[2][1] = 5.0;
    arr[9][11] = 2.0;
    arr[5][4] = 4.0;
    arr[15][7] = 3.0;
    arr[12][19] = 1.0;

    // Set up the sequence of references into this array; you could keep
    // a copy of this sequence around to reuse it later, I think.
    std::vector<Sample> samples;
    samples.reserve(N * N);
    for (size_t row = 0; row < N; ++row) {
        for (size_t col = 0; col < N; ++col) {
            samples.push_back(Sample((float *)arr, row, col));
        }
    }

    // Let partial_sort find the M largest entries
    std::partial_sort(samples.begin(), samples.begin() + M, samples.end());

    // Print out the row/column of the M largest entries.
    for (std::vector<Sample>::size_type i = 0; i < M; ++i) {
        std::cout << "#" << (i + 1) << " is " << (float)samples[i]
                  << " at " << samples[i].row() << "/" << samples[i].col()
                  << std::endl;
    }
}
+1

First of all, you are going through the array in the wrong order!

You always, always, always want to scan through memory linearly. That means the last index of your array should be changing the fastest. So instead of this:

for (int j = 0; j < rows; j++) {
    for (int i = 0; i < cols; i++) {
        float sample = arr[i][j];

Try the following:

for (int i = 0; i < cols; i++) {
    for (int j = 0; j < rows; j++) {
        float sample = arr[i][j];

I predict that this will be more important than any other change.

Next, I would use a heap instead of a sorted array. The standard <algorithm> header already has push_heap and pop_heap functions to use a vector as a heap. (This probably won't help all that much, though, unless M is fairly large. For small M and a randomized array, you do not wind up doing all that many insertions on average... something like O(log N), I believe.)
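A sketch of that heap-based bookkeeping (the function name is mine): std::greater turns the vector into a min-heap, so best.front() is the smallest value currently kept.

#include <algorithm>
#include <functional>
#include <vector>

void keepTopM(const std::vector<float> &samples, size_t M,
              std::vector<float> &best) {
    best.clear();
    for (size_t n = 0; n < samples.size(); ++n) {
        float s = samples[n];
        if (best.size() < M) {
            best.push_back(s);                   // still filling up
            std::push_heap(best.begin(), best.end(), std::greater<float>());
        } else if (s > best.front()) {
            // move the minimum to the back, overwrite it, re-heapify
            std::pop_heap(best.begin(), best.end(), std::greater<float>());
            best.back() = s;
            std::push_heap(best.begin(), best.end(), std::greater<float>());
        }
    }
}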

After that, there is SSE2. But that is peanuts compared to marching through memory in the right order.

+1

You can get almost linear speedup with parallel processing.

With N CPUs, you can process a band of rows/N rows (and all the columns) on each CPU, finding the top M entries within each band, and then do a selection sort over the partial results to find the overall top M.
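A sketch of that banding (all names are mine, it needs C++11 std::thread, and it returns values only, not positions):

#include <algorithm>
#include <functional>
#include <thread>
#include <vector>

// P threads each take a band of rows, find the top M within the band,
// then the P partial results are merged with one final partial_sort.
std::vector<float> parallelTopM(const std::vector<float> &flat, int rows,
                                int cols, int M, int P) {
    std::vector<std::vector<float> > partial(P);
    std::vector<std::thread> threads;
    for (int t = 0; t < P; t++) {
        int r0 = rows * t / P, r1 = rows * (t + 1) / P;
        threads.push_back(std::thread([&partial, &flat, cols, M, t, r0, r1] {
            std::vector<float> &band = partial[t];
            band.assign(flat.begin() + r0 * cols, flat.begin() + r1 * cols);
            int k = std::min(M, (int)band.size());
            std::partial_sort(band.begin(), band.begin() + k, band.end(),
                              std::greater<float>());
            band.resize(k);
        }));
    }
    std::vector<float> merged;
    for (int t = 0; t < P; t++) {
        threads[t].join();
        merged.insert(merged.end(), partial[t].begin(), partial[t].end());
    }
    int k = std::min(M, (int)merged.size());
    std::partial_sort(merged.begin(), merged.begin() + k, merged.end(),
                      std::greater<float>());
    merged.resize(k);
    return merged;
}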

You could probably do this with SIMD as well (but here you would split the task up by interleaving the columns rather than banding the rows). Don't try to make SIMD do your insertion sort faster; make it do more insertion sorts at once, which you then combine at the end using a single very fast step.

Naturally, you could do both multi-threading and SIMD, but on a problem that is only 30x30 that is hardly worth it.

0

I tried replacing float with double, and interestingly that gave me a speed improvement of about 20% (using VC++ 2008). That is a bit counterintuitive, but it seems modern processors or compilers are optimized for double-value processing.

0

Use a linked list to store the best M values seen so far. You will still have to iterate over it to find the right spot, but the insertion is O(1). That would probably even beat binary search plus insertion: O(M) + O(1) vs. O(log M) + O(M). Also interchange the fors, so you don't access every Nth element in memory and thrash the cache.
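A sketch of that list bookkeeping (the names are mine): the list is kept sorted ascending by value, so finding the spot is a linear walk, but the insertion itself is O(1) and nothing gets shuffled.

#include <list>

struct Entry { int x, y; float value; };

void considerEntry(std::list<Entry> &best, int M, const Entry &e) {
    if ((int)best.size() == M && e.value <= best.front().value)
        return;               // cannot make the top M: quick bailout
    std::list<Entry>::iterator it = best.begin();
    while (it != best.end() && it->value < e.value)
        ++it;                 // walk to the insertion spot
    best.insert(it, e);       // O(1) once the spot is known
    if ((int)best.size() > M)
        best.pop_front();     // drop the smallest
}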


LE: Throwing in another idea that can work for uniformly distributed values.
Find min and max in 3/2 * N^2 comparisons.
Create anywhere between N and N^2 uniformly spaced buckets, preferably closer to N^2 than N.
For each element of the NxN matrix, put it in bucket[(int)((value - min) / range * (bucketCount - 1))], where range = max - min.
Finally, build a result set from the highest bucket down, adding whole buckets while |current set| + |next bucket| <= M.
If you end up with exactly M items, you are done. Most likely you will get fewer than M, say P.
Apply your algorithm to the remaining (straddling) bucket and get the largest M - P elements out of it.
If the elements are uniform and you use N^2 buckets, the complexity is about 3.5 * N^2, versus your current solution's roughly O(N^2 * ln(M)).
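A rough sketch of that bucketing over a flattened array of values (names are mine; positions are left out for brevity, and the straddling bucket is handled with a plain sort rather than a recursive pass):

#include <algorithm>
#include <functional>
#include <vector>

std::vector<float> bucketTopM(const std::vector<float> &v, int M, int B) {
    float mn = *std::min_element(v.begin(), v.end());
    float mx = *std::max_element(v.begin(), v.end());
    float range = mx - mn;
    std::vector<std::vector<float> > bucket(B);
    for (size_t n = 0; n < v.size(); n++) {
        int b = range > 0 ? (int)((v[n] - mn) / range * (B - 1)) : 0;
        bucket[b].push_back(v[n]);
    }
    std::vector<float> out;
    for (int b = B - 1; b >= 0 && (int)out.size() < M; b--) {
        if ((int)(out.size() + bucket[b].size()) <= M) {
            out.insert(out.end(), bucket[b].begin(), bucket[b].end());
        } else {
            // straddling bucket: take only its largest M - |out| values
            std::sort(bucket[b].begin(), bucket[b].end(),
                      std::greater<float>());
            out.insert(out.end(), bucket[b].begin(),
                       bucket[b].begin() + (M - out.size()));
        }
    }
    return out;
}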

-1
