I need to store about 250 numerical values per second per client, which is about 900 thousand numbers per hour. This will probably not be an all-day recording (more likely 5 to 10 hours a day), but I will partition my data by client ID and the date of the reading in any case. That puts the maximum row length at about 22-23 million columns, which should still be manageable. Nevertheless, my schema looks like this:
CREATE TABLE measurement (
    clientid text,
    date text,
    event_time timestamp,
    value int,
    PRIMARY KEY ((clientid, date), event_time)
);
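With this partitioning, a slice of one client's day sits in a single partition and can be read back with a query along these lines (the literal values are just placeholders):

SELECT event_time, value
FROM measurement
WHERE clientid = '1'
  AND date = '2014-01-01'
  AND event_time >= '2014-01-01 12:00:00'
  AND event_time < '2014-01-01 13:00:00';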
The keyspace has a replication factor of 2, just for testing, using GossipingPropertyFileSnitch and NetworkTopologyStrategy. I know that a replication factor of 3 is more standard.
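For completeness, the keyspace was created along these lines (the keyspace and datacenter names are placeholders; the datacenter name has to match whatever the snitch reports):

CREATE KEYSPACE measurements  -- placeholder name
  WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 2};  -- 'DC1' is assumed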
Then I created a small cluster on company servers: three virtualized machines on bare metal, each with 2 x 2-core processors, 16 GB of RAM, and plenty of disk space. I'm on the same gigabit LAN as them. The cluster is up and running according to nodetool.
Here is the code I'm using to test my installation:
Cluster cluster = Cluster.builder()
        .addContactPoint("192.168.1.100")
        .addContactPoint("192.168.1.102")
        .build();
Session session = cluster.connect();
DateTime time = DateTime.now();
BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue<>(50, true);

try {
    ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts
    String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
    PreparedStatement preparedStatement = session.prepare(insertQuery);
    BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also

    //generating the entries
    for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
        time = time.plus(4); //4ms between each entry
        BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
        batch.add(bound);
        //The batch statement must have 65535 statements at most
        if (batch.size() >= 65534) {
            queryQueue.put(batch);
            batch = new BatchStatement();
        }
    }
    queryQueue.put(batch); //the last batch, perhaps shorter than 65535

    //storing the data
    System.out.println("Starting storing");
    while (!queryQueue.isEmpty()) {
        pool.execute(() -> {
            try {
                long threadId = Thread.currentThread().getId();
                System.out.println("Started: " + threadId);
                BatchStatement statement = queryQueue.take();
                long start2 = System.currentTimeMillis();
                session.execute(statement);
                System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
            } catch (Exception ex) {
                System.out.println(ex.toString());
            }
        });
    }
    pool.shutdown();
    pool.awaitTermination(120, TimeUnit.SECONDS);
} catch (Exception ex) {
    System.out.println(ex.toString());
} finally {
    session.close();
    cluster.close();
}
I came up with this code by reading posts here and on other blogs and sites. As I understand it, it's important for the client to use multiple threads, so I did that. I also tried using asynchronous operations.
The end result is that no matter which approach I use, one batch completes in 5-6 seconds, and sometimes takes up to 10. It takes the same amount of time whether I insert just a single batch (so only ~65k columns) or use a plain single-threaded application. Honestly, I expected a bit more. Moreover, I get more or less the same performance on my laptop against a local instance.
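To put that in perspective: 65,534 inserts in 5-6 seconds works out to roughly 11,000-13,000 writes per second, so loading a single hour of data (900,000 points) takes well over a minute.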
The second, perhaps more important, issue is the exceptions I run into unpredictably. These two:
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
and
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection closed), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Connection closed), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] Connection closed))
Bottom line: am I doing something wrong? Should I reorganize the way I load the data, or change the schema? I tried reducing the row length (so I have 12-hour rows instead), but that didn't make much of a difference.
================================ Update:
It was rude of me to forget to post the sample code I used after the question was answered. It works reasonably well, but I'm continuing my research with KairosDB and with binary transfers using Astyanax. It looks like I can get much better performance with them than with CQL, although KairosDB can have problems when it's overloaded (but I'm working on that), and Astyanax is a bit verbose for my taste. Nevertheless, here is the code; maybe I made a mistake somewhere.
The number of permits on the semaphore doesn't affect performance once it goes above 5000; it stays almost constant.
String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
Semaphore semaphore = new Semaphore(15000);

System.out.println("Starting " + Thread.currentThread().getId());
DateTime time = DateTime.parse("2015-01-05T12:00:00");

//generating the entries
long start = System.currentTimeMillis();
for (int i = 0; i < 900000; i++) {
    BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
    semaphore.acquire();
    ResultSetFuture resultSetFuture = session.executeAsync(statement);
    Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {
            semaphore.release();
        }

        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("Error: " + throwable.toString());
            semaphore.release();
        }
    });
    time = time.plus(4); //4ms between each entry
}
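The idea here is that the semaphore caps the number of in-flight asynchronous inserts: acquire() blocks once 15,000 requests are outstanding, and each callback releases a permit when its write completes, so the client throttles itself instead of relying on oversized batches.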