For a project I need to parse some pretty large CSV files. Some of the records they contain get stored in a MySQL database. I am trying to speed this up with multithreading, but so far it only slows things down.
I am parsing CSV files of up to 10 GB, and some of the records (approx. 5M out of 20M+ per file) need to be inserted into a MySQL database. To decide which records to insert, we use a Redis server with sets that contain the correct identifiers/links.
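The filter step itself is simple set membership. A minimal sketch of what `check_line_with_redis` does, with a plain Ruby `Set` standing in for the Redis set so it runs without a server (in production this would be a `redis.sismember` call against the set of valid identifiers; the set name and contents here are made up):

```ruby
require 'set'

# Stand-in for the Redis set of valid references; in production this data
# lives server-side and is queried with SISMEMBER.
VALID_REFERENCES = Set.new(%w[ref-1 ref-2 ref-3])

def check_line_with_redis(line)
  # Keep the line only if its reference appears in the whitelist set
  VALID_REFERENCES.include?(line[:reference])
end
```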
Since we process about 30 of these files at any given time, and there are some dependencies between them, we put each file on a Resque queue and have several servers working through these (prioritized) queues.
In a nutshell:
```ruby
class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      a = ObjectA.find_or_initialize_by_reference(line[:reference])
      a.object_bs.destroy_all
      a.update_attributes(line)
    end
  end
end
```
This works and scales well horizontally (more CSV files = more servers), but large CSV files are a problem. We currently have files that take more than 75 hours to process this way. I have already thought of several optimizations:
One is to shorten the MySQL path: instead of instantiating ActiveRecord objects, insert with plain SQL once we know the object IDs, which is much faster. That way we can probably drop most of ActiveRecord, and possibly even Rails, to remove that overhead. We cannot use a plain MySQL LOAD DATA, since we must map the CSV records to other objects that may now have different identifiers (we are merging a dozen legacy databases into one new database).
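A sketch of that idea: collect the already-mapped rows and emit one multi-row INSERT instead of one AR save per record. Table and column names here are illustrative, not the real schema, and real code should use the driver's own quoting rather than this naive escaping:

```ruby
# Build a single multi-row INSERT statement from pre-mapped rows.
# In the worker this string would be sent via the MySQL connection's
# execute; the crude quote-doubling below is only for the sketch.
def bulk_insert_sql(table, columns, rows)
  values = rows.map do |row|
    "(" + columns.map { |c| "'#{row[c].to_s.gsub("'", "''")}'" }.join(", ") + ")"
  end
  "INSERT INTO #{table} (#{columns.join(', ')}) VALUES #{values.join(', ')}"
end
```

Batching a few hundred rows per statement cuts both network round-trips and per-query parsing overhead on the MySQL side.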
Another is to do more at the same time. There is some I/O latency, and network latency for Redis and MySQL, and even though MRI uses green threads, threading might allow us to run our MySQL queries while waiting on file reads, etc. But using the following code:
```ruby
class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      create_or_join_thread(line) do |myLine|
        a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
        a.object_bs.destroy_all
        a.update_attributes(myLine)
      end
    end
  end

  def self.create_or_join_thread(line)
    @thread.join if @thread.present?
    @thread = Thread.new(line) do |myLine|
      yield myLine
    end
  end
end
```
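Note that `create_or_join_thread` joins the previous thread before starting the next one, so at most one worker thread ever runs: effectively serial execution plus thread overhead. An alternative pattern is a small fixed pool fed by a `Queue`, so several lines are in flight at once (only I/O waits overlap under MRI green threads). A minimal sketch, with the per-line work passed in as a block since the real Redis/MySQL calls are not shown here:

```ruby
require 'thread'

# Run a fixed pool of worker threads over a list of lines.
# Each worker pops lines from a shared queue until it sees a :stop pill.
def run_pool(lines, size = 4)
  queue   = Queue.new
  results = Queue.new
  lines.each { |l| queue << l }
  size.times { queue << :stop }          # one poison pill per worker

  workers = Array.new(size) do
    Thread.new do
      while (line = queue.pop) != :stop
        results << yield(line)           # per-line work supplied by caller
      end
    end
  end
  workers.each(&:join)
  Array.new(results.size) { results.pop }
end
```

The pool size bounds memory and MySQL connection usage, unlike spawning one thread per line.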
This actually slows the process down. Watching it with ps au, it starts at 100% CPU, but over time drops to 2-3%. At that point it does not insert new records at all; it just hangs.
I have strace'd the process, and at first I see MySQL queries going by, but after a while it stops executing my Ruby code altogether. It may be a deadlock (it hung after parsing the last line of the CSV, but the process kept running at 5% CPU and never quit), or something like what I read here: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/
I am using Rails 2.3.8 on REE 1.8.7-2010.02 on Ubuntu 10.10. Any insight into how to handle a large number of threads (or why not to use threads here at all) is greatly appreciated!