How to speed up processing of large CSV files using Ruby

For a project I need to parse some pretty large CSV files. The contents of some of the records are stored in a MySQL database. I am trying to speed this up using multithreading, but so far it only slows the work down.

I am parsing a CSV file (up to 10 GB), and some of its records (approx. 5M out of 20M+ CSV records) need to be inserted into the MySQL database. To determine which records should be inserted, we use a Redis server with sets that contain the correct ids/references.
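
For context, the Redis part is essentially a set membership test. A simplified sketch of the idea only; the key name "valid_references" and the :reference field are made-up illustrations, not our actual code:

  require 'redis'

  # Connect to the Redis server that holds the sets of valid ids.
  REDIS = Redis.new(:host => "localhost")

  def check_line_with_redis(line)
    # SISMEMBER: true if this reference is in the set of records we want to import.
    REDIS.sismember("valid_references", line[:reference])
  end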

Since we process about 30 of these files at any given time, and there are some dependencies between them, we put each file in a Resque queue and have several servers working through these (prioritized) queues.

In a nutshell:

  class Worker
    def self.perform(file)
      CsvParser.each(file) do |line|
        next unless check_line_with_redis(line)
        a = ObjectA.find_or_initialize_by_reference(line[:reference])
        a.object_bs.destroy_all
        a.update_attributes(line)
      end
    end
  end

This works and scales well horizontally (more CSV files = more servers), but large CSV files are a problem. We currently have files that take more than 75 hours to parse this way. I have already thought of a couple of optimizations:

One is cutting down on the MySQL queries: we currently instantiate AR objects, while an insert with plain SQL, if we know the object ids, is much faster. This way we can probably get rid of most of AR, and maybe even Rails, to remove that overhead. We cannot use a plain MySQL LOAD DATA, since we have to map the CSV records onto other objects that may have different ids by now (we are merging a dozen legacy databases into a new database).
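
Roughly, the plain-SQL idea would look something like the sketch below (running inside the Rails environment, so ActiveRecord is already loaded). The object_as/object_bs table and column names are guesses based on the model names above, not our real schema:

  class RawSqlImporter
    def self.import_line(line)
      conn = ActiveRecord::Base.connection
      ref  = conn.quote(line[:reference])

      # Fetch only the id; skip instantiating a full AR object.
      id = conn.select_value("SELECT id FROM object_as WHERE reference = #{ref}")

      if id
        conn.execute("DELETE FROM object_bs WHERE object_a_id = #{id.to_i}")
        conn.execute("UPDATE object_as SET updated_at = NOW() WHERE id = #{id.to_i}")
      else
        conn.execute("INSERT INTO object_as (reference, created_at, updated_at) VALUES (#{ref}, NOW(), NOW())")
      end
    end
  end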

Another is trying to do more in parallel. There is some IO latency and network latency for Redis and MySQL, and even though MRI uses green threads, this might allow us to schedule our MySQL queries while reading IO, etc. But using the following code:

  class Worker
    def self.perform(file)
      CsvParser.each(file) do |line|
        next unless check_line_with_redis(line)
        create_or_join_thread(line) do |myLine|
          a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
          a.object_bs.destroy_all
          a.update_attributes(myLine)
        end
      end
    end

    def self.create_or_join_thread(line)
      @thread.join if @thread.present?
      @thread = Thread.new(line) do |myLine|
        yield myLine
      end
    end
  end

This gradually slows the process down. When I run ps au, it starts at 100% CPU, but over time it drops to 2-3%. At that point it does not insert new records at all; it just appears to hang.

I straced the process, and at first I can see MySQL queries go by, but after a while it stops executing my Ruby code at all. It may be a deadlock (it hung after parsing the last line of the CSV, but the process kept running at 5% CPU and did not quit), or something like what I read about here: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/

I am using Rails 2.3.8, REE 1.8.7-2010.02, on Ubuntu 10.10. Any insight into how to handle large numbers of threads (or why not to use threads here at all) is greatly appreciated!

2 answers

Do you have indexes on these tables?

Could you temporarily disable these indexes during your bulk inserts?

Before doing the bulk inserts, disable the index keys:

  ALTER TABLE foo DISABLE KEYS

After finishing, re-enable the index keys:

  ALTER TABLE foo ENABLE KEYS

From the docs:

ALTER TABLE ... DISABLE KEYS tells MySQL to stop updating nonunique indexes. ALTER TABLE ... ENABLE KEYS then should be used to re-create the missing indexes. MySQL does this with a special algorithm that is much faster than inserting keys one by one, so disabling keys before performing bulk insert operations should give a considerable speedup. Using ALTER TABLE ... DISABLE KEYS requires the INDEX privilege in addition to the privileges mentioned earlier. While the nonunique indexes are disabled, they are ignored for statements such as SELECT and EXPLAIN that otherwise would use them.
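
If you go this route, you could issue those statements from the worker around the import. A rough sketch only; it assumes the ObjectB records live in an object_bs table, so adjust the table name to your schema:

  class Worker
    def self.perform(file)
      conn = ActiveRecord::Base.connection
      # Stop maintaining nonunique indexes while we churn through the file.
      conn.execute("ALTER TABLE object_bs DISABLE KEYS")
      begin
        CsvParser.each(file) do |line|
          next unless check_line_with_redis(line)
          a = ObjectA.find_or_initialize_by_reference(line[:reference])
          a.object_bs.destroy_all
          a.update_attributes(line)
        end
      ensure
        # Rebuild the indexes even if the import blows up halfway through.
        conn.execute("ALTER TABLE object_bs ENABLE KEYS")
      end
    end
  end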

You can try to wrap it all in one transaction - this will probably make a big difference:

  class Worker
    def self.perform(file)
      ObjectA.transaction do
        CsvParser.each(file) do |line|
          next unless check_line_with_redis(line)
          a = ObjectA.find_or_initialize_by_reference(line[:reference])
          a.object_bs.destroy_all
          a.update_attributes(line)
        end
      end
    end
  end

Otherwise, each save is wrapped in its own transaction. That said, for a 10 GB file you probably want to break it into chunks, e.g. 1000 inserts per transaction or so.
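
A sketch of what that chunking might look like, assuming CsvParser.each yields the same line hashes as above (the BATCH_SIZE constant and import_batch helper are just illustrative names):

  class Worker
    BATCH_SIZE = 1000

    def self.perform(file)
      batch = []
      CsvParser.each(file) do |line|
        next unless check_line_with_redis(line)
        batch << line
        if batch.size >= BATCH_SIZE
          import_batch(batch)
          batch = []
        end
      end
      import_batch(batch) unless batch.empty?
    end

    # Commit every BATCH_SIZE rows in one transaction, instead of one
    # transaction per save or one giant transaction for the whole file.
    def self.import_batch(lines)
      ObjectA.transaction do
        lines.each do |line|
          a = ObjectA.find_or_initialize_by_reference(line[:reference])
          a.object_bs.destroy_all
          a.update_attributes(line)
        end
      end
    end
  end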

