Loading all rows from Cassandra using multiple (Python) clients in parallel

With Cassandra's recommended RandomPartitioner (or Murmur3Partitioner), it is impossible to make meaningful range queries on keys, because rows are distributed around the cluster by a hash of the key (MD5 in the case of RandomPartitioner). These hashes are called tokens.
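To make the idea of a token concrete, here is a minimal sketch of how a RandomPartitioner-style token can be derived from a key. It assumes the token is the absolute value of the MD5 digest read as a signed big-endian integer, and that an `int` key is serialized as 4 big-endian bytes; the helper names `random_partitioner_token` and `int_key_bytes` are made up for illustration and are not part of any driver.

```python
import hashlib
import struct


def random_partitioner_token(key_bytes):
    """Approximate a RandomPartitioner token: |MD5(key)| as a signed integer."""
    digest = hashlib.md5(key_bytes).digest()
    return abs(int.from_bytes(digest, byteorder="big", signed=True))


def int_key_bytes(k):
    """Assumed serialization of a CQL int key: 4 bytes, big-endian."""
    return struct.pack(">i", k)


if __name__ == "__main__":
    # Consecutive keys end up at unrelated positions in the token space.
    for k in range(4):
        print(k, random_partitioner_token(int_key_bytes(k)))
```

Because neighbouring keys map to unrelated tokens, a range over keys says nothing about where the rows live, while a range over tokens does.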

Nevertheless, it would be very useful to split a large table among many compute workers by assigning each one a range of tokens. With CQL3 it seems possible to issue queries directly against tokens; however, the following Python does not work... EDIT: it works after switching to testing against the latest Cassandra database (doh!), and after updating the syntax as described in the comments below:

    ## use python cql module
    import cql

    ## If running against an old version of Cassandra, this raises:
    ## TApplicationException: Invalid method name: 'set_cql_version'
    conn = cql.connect('localhost', cql_version='3.0.2')
    cursor = conn.cursor()

    try:
        ## remove the previous attempt to make this work
        cursor.execute('DROP KEYSPACE test;')
    except Exception, exc:
        print exc

    ## make a keyspace and a simple table
    cursor.execute("CREATE KEYSPACE test WITH strategy_class = 'SimpleStrategy' AND strategy_options:replication_factor = 1;")
    cursor.execute("USE test;")
    cursor.execute('CREATE TABLE data (k int PRIMARY KEY, v varchar);')

    ## put some data in the table -- must use single quotes around literals, not double quotes
    cursor.execute("INSERT INTO data (k, v) VALUES (0, 'a');")
    cursor.execute("INSERT INTO data (k, v) VALUES (1, 'b');")
    cursor.execute("INSERT INTO data (k, v) VALUES (2, 'c');")
    cursor.execute("INSERT INTO data (k, v) VALUES (3, 'd');")

    ## split up the full range of tokens.
    ## Suppose there are 2**k workers:
    k = 3  # --> eight workers
    token_sub_range = 2**(127 - k)
    worker_num = 2  # for example
    start_token = worker_num * token_sub_range
    end_token = (1 + worker_num) * token_sub_range

    ## put single quotes around the token strings
    cql3_command = "SELECT k, v FROM data WHERE token(k) >= '%d' AND token(k) < '%d';" % (start_token, end_token)
    print cql3_command

    ## against an old version of Cassandra, this failed with
    ## "ProgrammingError: Bad Request: line 1:28 no viable alternative at input 'token'"
    cursor.execute(cql3_command)
    for row in cursor:
        print row

    cursor.close()
    conn.close()
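The token arithmetic above assumes the number of workers is a power of two. As a hedged generalization, the sketch below splits the token space among any number of workers. It assumes the RandomPartitioner token space starts at 0 and ends at 2**127; `split_token_range` and `range_query` are hypothetical helper names, and the query string simply mirrors the one in the question.

```python
MAX_TOKEN = 2 ** 127  # assumed upper end of the RandomPartitioner token space


def split_token_range(num_workers, max_token=MAX_TOKEN):
    """Return num_workers contiguous (start, end) pairs covering [0, max_token)."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    # Integer division keeps every boundary exact, even for huge integers.
    bounds = [(i * max_token) // num_workers for i in range(num_workers + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


def range_query(start_token, end_token):
    """Format the question's SELECT for one half-open token range."""
    return ("SELECT k, v FROM data WHERE token(k) >= '%d' AND token(k) < '%d';"
            % (start_token, end_token))


if __name__ == "__main__":
    for start, end in split_token_range(3):
        print(range_query(start, end))
```

With eight workers, `split_token_range(8)[2]` gives the same start and end tokens as `worker_num = 2` in the code above. Since each range is half-open, the last worker may need `<=` instead of `<` if a row can land exactly on the top token.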

Ideally, I would like to make this work with pycassa, because I prefer its more pythonic interface.

Is there a better way to do this?

2 answers

I updated the question to contain the answer.


This is not CQL3, but a simple program that reads all (pickled) data belonging to localhost directly through the Thrift interface. It can be used to build a simple map/reduce mechanism with Cassandra as the backend. Each node runs something like this as its map() step on the data it owns, which avoids the network overhead of fetching the data. The results are then sent to the reduce() phase on a separate node.

Obviously this does not work with vnodes in Cassandra 1.2+. I now use an indexing approach instead, which allows map() to run on smaller subsets of the local data and supports vnodes.

    #!/usr/bin/env python2.7
    import sys
    import socket
    import time
    import cPickle as pickle

    from thrift import Thrift
    from thrift.transport import TTransport
    from thrift.transport import TSocket
    from thrift.protocol import TBinaryProtocol
    from pycassa.cassandra import Cassandra
    from pycassa.cassandra.ttypes import *


    def main():
        jobname = sys.argv[1]  # required argument, not used further below
        (client, transport) = connect("localhost")

        # Determine local IP address
        ip = socket.gethostbyname(socket.gethostname())

        # Set up query
        keyspace = "data"
        column_parent = ColumnParent(column_family="foo")  # "foo" is a placeholder name

        # Initialise counters up front so the summary below works even on error
        t0 = time.time()
        ptime = 0
        keycount = 0
        start_token = end_token = None

        try:
            # Find range of tokens for which this node is first replica
            for tokenrange in client.describe_ring(keyspace):
                if tokenrange.endpoints[0] == ip:
                    start_token = tokenrange.start_token
                    end_token = tokenrange.end_token
                    break
            if start_token is None:
                sys.exit("No token range found with %s as first replica" % ip)

            # Set keyspace
            client.set_keyspace(keyspace)

            # Query for all data owned by this node (at most `count` rows)
            slice_range = SliceRange(start="", finish="")
            predicate = SlicePredicate(slice_range=slice_range)
            keyrange = KeyRange(start_token=start_token, end_token=end_token, count=10000)

            t0 = time.time()  # restart the clock just before the query
            for keyslice in client.get_range_slices(column_parent, predicate, keyrange, ConsistencyLevel.ONE):
                keycount += 1
                for col in keyslice.columns:
                    pt0 = time.time()
                    data = pickle.loads(col.column.value)
                    ptime += time.time() - pt0
        except Thrift.TException, tx:
            print 'Thrift: %s' % tx.message
        finally:
            disconnect(transport)

        t1 = time.time() - t0
        print "Read data for %d tasks in: %.2gs" % (keycount, t1)
        print "Job unpickling time: %.2gs" % ptime
        print "Unpickling percentage: %.2f%%" % (ptime / t1 * 100)


    def connect(host):
        """
        Connect to cassandra instance on given host.
        Returns: (Cassandra.Client, transport) tuple
        """
        sock = TSocket.TSocket(host, 9160)
        transport = TTransport.TFramedTransport(sock)
        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
        transport.open()
        client = Cassandra.Client(protocol)
        return (client, transport)


    def disconnect(transport):
        """
        Disconnect from cassandra instance
        """
        transport.close()


    if __name__ == '__main__':
        main()
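The map/reduce arrangement described in this answer can be sketched without a cluster. The example below is only an illustration under stated assumptions: each node's local data is simulated by a plain dict of pickled values, and `map_local` and `reduce_all` are hypothetical names, not part of pycassa or Thrift.

```python
import pickle


def map_local(rows, map_fn):
    """Run map_fn over one node's locally stored (pickled) values."""
    return [map_fn(pickle.loads(blob)) for blob in rows.values()]


def reduce_all(partials, reduce_fn, initial):
    """Fold the per-node partial results together on the reducing node."""
    result = initial
    for partial in partials:
        for item in partial:
            result = reduce_fn(result, item)
    return result


if __name__ == "__main__":
    # Hypothetical stand-in for two nodes' local data: key -> pickled value.
    node_a = {"k1": pickle.dumps({"words": 3}), "k2": pickle.dumps({"words": 5})}
    node_b = {"k3": pickle.dumps({"words": 4})}
    partials = [map_local(node, lambda task: task["words"])
                for node in (node_a, node_b)]
    print(reduce_all(partials, lambda acc, x: acc + x, 0))
```

In a real deployment each `map_local` call would run on the node that owns the rows, and only the small partial results would cross the network to the reducer.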