This is not CQL3, but a simple program that reads all (pickled) data belonging to localhost, directly using the Thrift interface. It can be used to build a simple map/reduce mechanism with Cassandra as the backend. Each node runs something like this for the map() phase on the data it owns, thereby avoiding the network overhead of retrieving the data. The results are then sent to the reduce() phase on a separate node.
Obviously, this does not work with vnodes in Cassandra 1.2+. I now use an indexing approach instead, which allows map() to run on smaller subsets of the local data and also supports vnodes.
#!/usr/bin/env python2.7
"""Read and unpickle the rows of one token range owned by the local node.

The script talks to the Cassandra instance on localhost through the raw
Thrift interface, looks up the first token range for which this host is
listed as the first replica, fetches the rows in that range and reports
how much of the elapsed time was spent unpickling column values.
"""
import sys
import socket
import cPickle as pickle
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol
from pycassa.cassandra import Cassandra
from pycassa.cassandra.ttypes import *
import time
import pprint

KEYSPACE = "data"
# NOTE(review): the original passed the undefined name `foo` here (NameError).
# It is assumed to be a placeholder -- set this to the real column family.
COLUMN_FAMILY = "foo"
THRIFT_PORT = 9160
# Upper bound on rows returned by the single get_range_slices call below.
# NOTE(review): there is no paging, so a node owning more rows than this is
# only partially read.
MAX_KEYS = 10000


def main():
    """Fetch the local node's rows and print read/unpickle timing statistics.

    Expects the job name as the first command-line argument.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s JOBNAME\n" % sys.argv[0])
        sys.exit(2)
    # NOTE(review): jobname is read but not used anywhere below.
    jobname = sys.argv[1]

    (client, transport) = connect("localhost")

    # Determine local IP address; the ring description is matched against it.
    ip = socket.gethostbyname(socket.gethostname())

    column_parent = ColumnParent(column_family=COLUMN_FAMILY)

    # Initialise the counters before the try block so the summary below is
    # always printable, even when a Thrift error interrupts the work early.
    keycount = 0
    ptime = 0.0
    t0 = time.time()
    try:
        # Find the first token range for which this node is first replica.
        owned = None
        for tokenrange in client.describe_ring(KEYSPACE):
            if tokenrange.endpoints[0] == ip:
                owned = (tokenrange.start_token, tokenrange.end_token)
                break
        if owned is None:
            # Without a matching range there is nothing to query; the
            # finally clause still closes the transport.
            print("No token range with first replica %s found" % ip)
            return
        start_token, end_token = owned

        client.set_keyspace(KEYSPACE)

        # Empty start/finish selects every column of each row.
        slice_range = SliceRange(start="", finish="")
        predicate = SlicePredicate(slice_range=slice_range)
        keyrange = KeyRange(start_token=start_token, end_token=end_token,
                            count=MAX_KEYS)

        # Restart the clock so only the query and unpickling are timed.
        t0 = time.time()
        for keyslice in client.get_range_slices(column_parent, predicate,
                                                keyrange,
                                                ConsistencyLevel.ONE):
            keycount += 1
            for col in keyslice.columns:
                pt0 = time.time()
                # SECURITY: pickle.loads executes arbitrary code embedded in
                # the payload; only run this against a trusted cluster.
                # The result is discarded -- only the cost is measured.
                pickle.loads(col.column.value)
                ptime += time.time() - pt0
    except Thrift.TException as tx:
        print('Thrift: %s' % tx.message)
    finally:
        disconnect(transport)

    t1 = time.time() - t0
    print("Read data for %d tasks in: %.2gs" % (keycount, t1))
    print("Job unpickling time: %.2gs" % ptime)
    # Guard against a zero elapsed time on coarse clocks / empty results.
    percentage = ptime / t1 * 100 if t1 > 0 else 0.0
    print("Unpickling percentage: %.2f%%" % percentage)


def connect(host):
    """
    Connect to cassandra instance on given host.

    Returns: (client, transport) tuple -- the Cassandra.Client object and
    the opened framed transport, which the caller must pass to disconnect().
    """
    # Named `sock` so the `socket` module is not shadowed.
    sock = TSocket.TSocket(host, THRIFT_PORT)
    transport = TTransport.TFramedTransport(sock)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    transport.open()
    client = Cassandra.Client(protocol)
    return (client, transport)


def disconnect(transport):
    """
    Disconnect from cassandra instance by closing the given transport.
    """
    transport.close()


if __name__ == '__main__':
    main()
source share