Optimization: dumping JSON from a streaming API into Mongo

Background: I have a Python module designed to capture JSON objects from a streaming API and store them (bulk insert of 25 at a time) in MongoDB using pymongo. For comparison, I also have a bash command that curls the same streaming API and pipes it to mongoimport. Both approaches store the data in separate collections.

I periodically check the count() of each collection to see how they compare.
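Roughly, the check I run looks like this (a minimal sketch; the mongoimport collection name is a placeholder, and DB_HOST / DB_NAME are the same configuration constants used by the module below):

    import pymongo

    # Same configuration constants as in the module below.
    db = pymongo.Connection(DB_HOST)[DB_NAME]

    # "raw_tweets_gnip" is the pymongo collection from my module; the
    # mongoimport collection name here is just a placeholder.
    pymongo_count = db["raw_tweets_gnip"].count()
    mongoimport_count = db["raw_tweets_mongoimport"].count()
    print "pymongo: %d, mongoimport: %d, behind by: %d" % (
        pymongo_count, mongoimport_count, mongoimport_count - pymongo_count)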

So far I see that the Python module is about 1000 JSON objects behind curl | mongoimport.

Problem: how can I optimize my Python module so that it stays in sync with curl | mongoimport?

I cannot use tweetstream, since I am not using the Twitter API but a third-party streaming service.

Can someone please help me here?

Python module:

    import json
    import pycurl
    import pymongo
    import cStringIO

    class StreamReader:
        def __init__(self):
            try:
                self.buff = ""
                self.tweet = ""
                self.chunk_count = 0
                self.tweet_list = []
                self.string_buffer = cStringIO.StringIO()
                # MongoDB connection and target collection
                self.mongo = pymongo.Connection(DB_HOST)
                self.db = self.mongo[DB_NAME]
                self.raw_tweets = self.db["raw_tweets_gnip"]
                # pycurl streaming connection; handle_data is called for each chunk
                self.conn = pycurl.Curl()
                self.conn.setopt(pycurl.ENCODING, 'gzip')
                self.conn.setopt(pycurl.URL, STREAM_URL)
                self.conn.setopt(pycurl.USERPWD, AUTH)
                self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
                self.conn.perform()
            except Exception as ex:
                print "error occurred: %s" % str(ex)

        def handle_data(self, data):
            try:
                self.string_buffer = cStringIO.StringIO(data)
                for line in self.string_buffer:
                    try:
                        self.tweet = json.loads(line)
                    except Exception as json_ex:
                        print "JSON Exception occurred: %s" % str(json_ex)
                        continue
                    if self.tweet:
                        try:
                            self.tweet_list.append(self.tweet)
                            self.chunk_count += 1
                            # flush the batch every 1000 tweets
                            if self.chunk_count % 1000 == 0:
                                self.raw_tweets.insert(self.tweet_list)
                                self.chunk_count = 0
                                self.tweet_list = []
                        except Exception as insert_ex:
                            print "Error inserting tweet: %s" % str(insert_ex)
                            continue
            except Exception as ex:
                print "Exception occurred: %s" % str(ex)
                print repr(self.buff)

        def __del__(self):
            self.string_buffer.close()

Thanks for reading.

+8
json python mongodb pymongo
2 answers

Got rid of the StringIO library. Since the WRITEFUNCTION callback handle_data is invoked for every line in this case, just load the JSON directly. Sometimes, however, there can be two JSON objects in one chunk of data. Sorry, I cannot post the curl command that I use, because it contains our credentials. But, as I said, this is a general problem that applies to any streaming API.

    def handle_data(self, buf):
        try:
            self.tweet = json.loads(buf)
        except Exception as json_ex:
            # more than one JSON object in the chunk: split it and parse each line
            self.data_list = buf.split('\r\n')
            for data in self.data_list:
                self.tweet_list.append(json.loads(data))
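A slightly more defensive variant of the same idea, as a sketch - skipping empty keep-alive lines and inserting each object immediately are my additions, not part of the snippet above:

    def handle_data(self, buf):
        # The write callback may receive one or several '\r\n'-separated JSON
        # objects; parse each non-empty line and insert it straight away.
        for line in buf.split('\r\n'):
            line = line.strip()
            if not line:
                continue  # skip blank / keep-alive lines
            try:
                self.raw_tweets.insert(json.loads(line))
            except Exception as ex:
                print "Could not parse or insert line: %s" % str(ex)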
+1

There was a bug in your original code.

    if self.chunk_count % 50 == 0:
        self.raw_tweets.insert(self.tweet_list)
        self.chunk_count = 0

You reset chunk_count, but you do not reset tweet_list. So the second time around you try to insert 100 items (50 new ones plus 50 that had already been sent to the database). You have fixed this, but you still see a difference in performance.
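To make the fix concrete, here is a sketch of how the flush could be factored out so that the counter and the list are always cleared together (the helper name is mine, not from the original code):

    def _flush_batch(self):
        # Hypothetical helper: insert the pending batch, then clear BOTH the
        # counter and the list; forgetting the list is the duplicate-insert bug.
        if self.tweet_list:
            self.raw_tweets.insert(self.tweet_list)
            self.chunk_count = 0
            self.tweet_list = []

Calling self._flush_batch() whenever chunk_count reaches the batch size keeps the two resets from drifting apart.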

The whole batch-size issue turns out to be a red herring, though. I tried taking a large JSON file and loading it via Python versus loading it via mongoimport, and Python was always faster (even in safe mode - see below).
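For reference, the Python side of such a comparison can be as simple as the following sketch (the file name, batch size and collection name are placeholders of mine; the mongoimport run is timed separately against the same file):

    import json
    import time
    import pymongo

    # Time a bulk load of a newline-delimited JSON file through pymongo.
    coll = pymongo.Connection("localhost")["test"]["python_load_test"]

    start = time.time()
    batch = []
    for line in open("big_dump.json"):
        batch.append(json.loads(line))
        if len(batch) >= 1000:
            coll.insert(batch)
            batch = []
    if batch:
        coll.insert(batch)  # flush the remainder
    print "pymongo load took %.2f seconds" % (time.time() - start)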

After taking a closer look at your code, I realized that the problem is that the streaming API is actually handing you data in chunks. You are expected to just take those chunks and put them in the database (which is what mongoimport does). The extra work your Python does - splitting up the stream, appending to a list, and then periodically sending batches to Mongo - is probably the difference between what I see and what you see.

Try this snippet for your handle_data():

    def handle_data(self, data):
        try:
            string_buffer = StringIO(data)
            tweets = json.load(string_buffer)
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
        try:
            self.raw_tweets.insert(tweets)
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)

One thing to note is that your Python inserts are not running in "safe mode" - you might want to change that by adding the safe=True argument to your insert call. You will then get an exception for any insert that fails, and your try/except will print an error describing the problem.
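For example, with the old pymongo driver used here, the second try block in the snippet above would become something like:

    try:
        # safe=True makes the driver wait for the server's acknowledgement,
        # so a failed insert raises an exception instead of failing silently.
        self.raw_tweets.insert(tweets, safe=True)
    except Exception as ex:
        print "Insert failed: %s" % str(ex)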

This is not very expensive performance-wise either: I am testing it right now, and after about five minutes the sizes of the two collections are 14120 and 14113.

+3
