How to efficiently calculate the inner product of two dictionaries

Suppose I represent a feature vector using a dictionary (why? Because I know the vectors are sparse, but more on that later).

How would I implement the inner product of two such dictionaries (denoted A and B)?

I tried the naive approach:

 for k in A:
     if k in B:
         sum += A[k] * B[k]

but it turns out to be slow.

Additional Information:

  • I use a dictionary to represent feature vectors because

    • The keys are strings (feature names)
    • There are ~20K possible keys
    • Each vector is sparse (say, about 1000 non-zero elements).
  • I’m really interested in calculating the pairwise scalar products of N = 2000 different dictionaries (that is, their linear kernel, i.e. Gram matrix).
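To make the workload concrete, here is a minimal sketch (the dicts and sizes are illustrative, not from the question): each vector is a dict mapping feature key to value, and the goal is all pairwise inner products of a list of such dicts.

```python
# Naive approach from the question: iterate over one dict, look up in the other.
def dot(A, B):
    return sum(A[k] * B[k] for k in A if k in B)

# Toy sparse vectors (real ones have ~1000 of ~20K possible keys).
vectors = [{0: 1.0, 3: 2.0}, {3: 4.0, 7: 1.0}, {1: 5.0}]

# The N x N linear-kernel (Gram) matrix the question asks about.
gram = [[dot(u, v) for v in vectors] for u in vectors]
```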

+7
python linear-algebra
5 answers

Here is my answer (following @valentin-clement's suggestion):

First I wrap a scipy.sparse dok_matrix. The idea is to assign an index to each of the possible feature keys.

 import scipy.sparse as sps
 import numpy as np

 class MSK:
     # DD is a dict of dicts whose values are of type float.
     # features is the set of possible feature keys.
     def __init__(self, DD, features):
         self.features = {k: j for (j, k) in enumerate(features)}
         self.strings = DD.keys()
         n = len(self.strings)
         d = len(self.features)
         self.M = sps.dok_matrix((n, d), dtype=np.float64)
         for (i, s) in enumerate(self.strings):
             v = DD[s]
             for k in v:
                 j = self.features[k]
                 self.M[i, j] = v[k]

And we test using the following code: 800 vectors of dimension 800, each with exactly 200 non-zero elements.

 np.random.seed(1)
 N = 800
 DD = dict()
 R = range(N)
 for i in xrange(N):
     DD[i] = dict()
     S = np.random.permutation(R)
     S = S[:N / 4]
     for j in S:
         DD[i][j] = np.random.randn(1)[0]
 K = MSK(DD, R)
 import cProfile
 cProfile.runctx("A = K.M * K.M.T", globals(), locals())
 print A.todense()

Output:

 2080520 function calls (2080519 primitive calls) in 2.884 seconds 

Call it 3 seconds. My naive implementation (with @Joowani's if-statement trick) takes about 19 seconds.

(MSK stands for MatrixSparseKeys)

+1

Hmm, it seems your approach is actually the best for dense vectors:

 >>> # Eric's answer
 >>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000)
 0.4360210521285808
 >>> # My comment
 >>> timeit.timeit('for k,v in A.iteritems(): sum += v*B.get(k,0)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000)
 0.4082838999682963
 >>> # My comment, more compact
 >>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000)
 0.38053266868496394
 >>> # Your approach
 >>> timeit.timeit('for k in A: sum += A[k]*B[k] if k in B else 0.', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000)
 0.35574231962510794
 >>> # Your approach, more compact
 >>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000)
 0.3400850549682559

For sparser vectors, Eric's does better, but yours is still the fastest:

 >>> # Mine
 >>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000)
 0.1390782696843189
 >>> # Eric's
 >>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000)
 0.11702822992151596
 >>> # Yours
 >>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000)
 0.07878250570843193

EDIT

After fiddling around a bit more, it seems that sum([x for x in ...]) is much faster than sum(x for x in ...). Re-benchmarking with that, and with Janne's remark about the keys in Eric's answer, yours is still on top (with Joowani's giving a slight improvement):

 >>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.items()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000)
 1.1604375791416714
 >>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000)
 0.9234189571552633
 >>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000)
 0.5411289579401455
 >>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000)
 0.5198972138696263

Scaling to very large sizes, you see exactly the same pattern:

 >>> # Mine
 >>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.iteritems()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000)
 45.328807250833506
 >>> # Eric's
 >>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000)
 28.042937058640973
 >>> # Yours
 >>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000)
 16.55080344861699
 >>> # Joowani's
 >>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000)
 15.485236119691308

I think Joowani's trick doesn't help here because the vectors are about the same size, but depending on your problem (if some vectors are much smaller than others) it could matter more...

EDIT AGAIN

Oops, looks like I should have had some more coffee before posting... As Eric noted (and I completely missed), defining the dicts in the setup keeps them identical across all runs, which is not the best way to benchmark. With properly random vectors the results do not differ significantly, but for completeness:

 >>> timeit.timeit('mine(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000)
 6.294158102577967
 >>> timeit.timeit('erics(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000)
 6.068052507449011
 >>> timeit.timeit('yours(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000)
 5.745110704570834
 >>> timeit.timeit('joowanis(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000)
 5.737499445367575

At scale:

 >>> timeit.timeit('mine(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000)
 5.0510995368395015
 >>> timeit.timeit('erics(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000)
 4.350612399185138
 >>> timeit.timeit('yours(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000)
 4.15619379016789
 >>> timeit.timeit('joowanis(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000)
 4.185129374341159

I think the bottom line is that you cannot expect significant speedups by cleverly reordering your expressions for this kind of thing... Perhaps you could move the number-crunching part to C/Cython, or use the scipy.sparse package?
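For the scipy.sparse route, here is a minimal sketch (the vectors and dimension are illustrative, not from the thread): stack the dict vectors as rows of one sparse matrix, and all pairwise inner products then come from a single sparse matrix product instead of a Python-level loop.

```python
import numpy as np
import scipy.sparse as sps

# Toy dict vectors; d is the number of possible feature keys.
vectors = [{0: 1.0, 3: 2.0}, {3: 4.0, 7: 1.0}]
d = 8

# dok_matrix is convenient for incremental construction...
M = sps.dok_matrix((len(vectors), d), dtype=np.float64)
for i, v in enumerate(vectors):
    for j, x in v.items():
        M[i, j] = x

# ...then convert to CSR for a fast product: G[i, j] = <vectors[i], vectors[j]>.
G = M.tocsr().dot(M.tocsr().T).toarray()
```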

+7

Not sure if faster, but here's a different approach:

 keys = A.viewkeys() & B.viewkeys()
 the_sum = sum(A[k] * B[k] for k in keys)
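A side note not in the original answer: viewkeys() is Python 2 only. In Python 3, dict.keys() already returns a set-like view, so the same intersection idea reads (values here are made up for illustration):

```python
# Python 3 version of the key-view intersection approach.
A = {1: 2.0, 3: 4.0}
B = {3: 5.0, 7: 1.0}
keys = A.keys() & B.keys()          # set-like intersection of the views
the_sum = sum(A[k] * B[k] for k in keys)
```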
+6

In the case where A is much longer than B, can this help?

 if len(A) > len(B):
     A, B = B, A
 the_sum = 0.0
 for k in A:
     if k in B:
         the_sum += A[k] * B[k]
+2

You could try using namedtuples instead of dicts.

 from collections import namedtuple

 # A and B are the original dicts; note this only works if their keys
 # are strings that are valid Python identifiers.
 _A = namedtuple('_A', A.keys())
 _B = namedtuple('_B', B.keys())
 DictA = _A(**A)
 DictB = _B(**B)

and then use them like dicts. More information about namedtuples here: What are "named tuples" in Python?

+1
