Perl Queues and Threads

I am trying to do the following:

  • Have a thread that reads data from a very large file (we're talking about 10 GB) and puts it onto a queue. (I do not want the queue to get very large either.)

  • While the buildQueue thread is pushing data onto the queue, have about 5 worker threads dequeuing and processing that data at the same time.

I have tried, but my other threads never get to run because of the continuous loop in my buildQueue thread.

My approach may be completely wrong. Thanks for any help, this is much appreciated.

Here is the code for buildQueue:

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open DICT_FILE, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<DICT_FILE>) {
            if ($queue->pending() < 100) {
                my $query = <DICT_FILE>;
                chomp($query);
                $queue->enqueue($query);
                my $count = $queue->pending();
                print "Queue Size: $count Query: $query\n";
            }
        }
    }
}

And, as I expected, once this thread is started nothing after it executes, because this thread never finishes.

 my $builder = new Thread(&buildQueue); 

Since the builder thread runs for such a long time, I never get to the point of creating the worker threads.

Here is the whole code:

#!/usr/bin/perl -w
use strict;
use Thread;
use Thread::Queue;

my $queue = new Thread::Queue();
my @threads;

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open dict_file, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<dict_file>) {
            if ($queue->pending() < 100) {
                my $query = <dict_file>;
                chomp($query);
                $queue->enqueue($query);
                my $count = $queue->pending();
                print "Queue Size: $count Query: $query\n";
            }
        }
    }
}

sub processor {
    my $query;
    while (1) {
        if ($query = $queue->dequeue) {
            print "$query\n";
        }
    }
}

my $builder = new Thread(&buildQueue);
push @threads, new Thread(&processor) for 1..5;
4 answers

You will need to mark when you want your threads to exit (via either join or detach). The fact that you have infinite loops with no last statements to break out of them is also a problem.

Edit: I also forgot a very important part! Each worker thread will block, waiting for another item to process from the queue, until it gets an undef from the queue. That is why we deliberately enqueue undef once for each thread after the queue building is done.

Try:

#!/usr/bin/perl -w
use strict;
use threads;
use Thread::Queue;

my $queue = new Thread::Queue();
our @threads;    # Do you really need our instead of my?

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);

    # Three-argument open, please!
    open my $dict_file, "<", $dict_path or die("Sorry, could not open file!");

    while (my $query = <$dict_file>) {
        chomp($query);
        while (1) {
            # Wait to see if our queue has < 100 items...
            if ($queue->pending() < 100) {
                $queue->enqueue($query);
                print "Queue Size: " . $queue->pending . "\n";
                last;    # This breaks out of the infinite loop
            }
        }
    }
    close($dict_file);

    foreach (1..5) {
        $queue->enqueue(undef);
    }
}

sub processor {
    my $query;
    while ($query = $queue->dequeue) {
        print "Thread " . threads->tid . " got $query\n";
    }
}

my $builder = threads->create(\&buildQueue);
push @threads, threads->create(\&processor) for 1..5;

# Waiting for our threads to finish.
$builder->join;
foreach (@threads) {
    $_->join;
}
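As a side note (an editorial addition, not part of the original answer): recent versions of Thread::Queue (3.01 and later) provide a limit attribute that makes enqueue block once the queue holds that many items, so the busy-wait loop checking pending() < 100 can be dropped entirely. A minimal sketch of the producer under that assumption:

use strict;
use warnings;
use threads;
use Thread::Queue 3.01;    # the 'limit' attribute needs Thread::Queue 3.01+

my $queue = Thread::Queue->new();
$queue->limit = 100;       # enqueue() now blocks while 100 items are pending

sub buildQueue {
    print "Enter a file name: ";
    chomp(my $dict_path = <STDIN>);
    open my $dict_file, '<', $dict_path or die "Could not open '$dict_path': $!";
    while (my $query = <$dict_file>) {
        chomp $query;
        $queue->enqueue($query);   # blocks automatically when the queue is full
    }
    close $dict_file;
    $queue->enqueue((undef) x 5);  # one undef per worker, 5 workers as above
}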

The MCE module for Perl loves large files. With MCE one can chunk many lines at a time, slurp a big chunk as a scalar string, or read one line at a time. Chunking many lines at once reduces the overhead of IPC.

MCE 1.504 is out now. It provides MCE::Queue support for child processes, including threads. In addition, the 1.5 release comes with 5 models (MCE::Flow, MCE::Grep, MCE::Loop, MCE::Map, and MCE::Stream) which take care of instantiating the MCE instance and setting max_workers and chunk_size. You can override these options.

Below, MCE::Loop is used for the demonstration.

use MCE::Loop;

print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);

mce_loop_f {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    foreach my $line ( @$chunk_ref ) {
        chomp $line;
        ## add your code here to process $line
    }
} $dict_path;

If you want to specify the number of workers and/or chunk_size, there are two ways to do it.

 use MCE::Loop max_workers => 5, chunk_size => 300000; 

Or...

use MCE::Loop;

MCE::Loop::init {
    max_workers => 5, chunk_size => 300000
};

Although chunking is preferred for large files, you can compare the run time against processing one line at a time. The first line inside the block can be omitted (it is commented out here). Notice how there is no need for an inner loop: $chunk_ref is still an array ref containing 1 line, and the input scalar $_ holds the line when chunk_size equals 1, otherwise it points to $chunk_ref.

use MCE::Loop;

MCE::Loop::init {
    max_workers => 5, chunk_size => 1
};

print "Enter a file name: ";
my $dict_path = <STDIN>;
chomp($dict_path);

mce_loop_f {
  # my ($mce, $chunk_ref, $chunk_id) = @_;
    my $line = $_;
    ## add your code here to process $line or $_
} $dict_path;

I hope this demonstration was helpful for folks wanting to process a file in parallel.

:) mario


It sounds like this is a case that the Parallel::ForkManager module could handle.
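The answer itself gives no code; below is a minimal sketch (an editorial addition, not from the answerer) of how Parallel::ForkManager might be applied here, assuming forking one child per line is acceptable. For a 10 GB file you would more likely hand each child a batch of lines rather than fork per line.

use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(5);   # at most 5 worker processes at a time

print "Enter a file name: ";
chomp(my $dict_path = <STDIN>);
open my $dict_file, '<', $dict_path or die "Could not open '$dict_path': $!";

while (my $line = <$dict_file>) {
    chomp $line;
    $pm->start and next;          # parent: move on to the next line immediately
    # --- child process ---
    print "PID $$ got: $line\n";  # replace with the real per-line processing
    $pm->finish;                  # child exits here
}

close $dict_file;
$pm->wait_all_children;           # wait for any remaining children to finish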


A different approach: you can also use user_tasks in MCE 1.2+ and create two multi-worker tasks, one task for reading (since it is a big file, you could also benefit from parallel reading while preserving the file read seek) and one task for processing, etc.

The code below uses Thread::Queue to manage the buffer queue.

The buildQueue sub does its own queue size management and passes the data directly into the manager process's $R_QUEUE; since we are using threads, it has access to the parent's memory space. If you wanted to use forks instead, you could still access the queue through a callback function, but here I chose simply to push onto the queue.

The processQueue sub simply dequeues whatever is in the queue until there is nothing more pending.

The task_end sub in each task is run only once by the manager process at the end of each task, so we use it to signal a stop to our worker processes.

Obviously there is a lot of freedom in how you want to chunk your data out to the workers; you can decide on the chunk size or even how to slurp in your data.

#!/usr/bin/env perl
use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Queue;
use MCE;

my $R_QUEUE = Thread::Queue->new;
my $queue_workers   = 8;
my $process_workers = 8;
my $chunk_size      = 1;

print "Enter a file name: ";
my $input_file = <STDIN>;
chomp($input_file);

sub buildQueue {
    my ($self, $chunk_ref, $chunk_id) = @_;
    if ($R_QUEUE->pending() < 100) {
        $R_QUEUE->enqueue($chunk_ref);
        $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending . "\n");
    }
}

sub processQueue {
    my $self = shift;
    my $wid  = $self->wid;
    while (my $buff = $R_QUEUE->dequeue) {
        $self->sendto('stdout', "Thread " . $wid . " got $$buff");
    }
}

my $mce = MCE->new(
    # input_data could be a file path, a file handle, or even a scalar to
    # treat like a file; check the documentation for more details.
    input_data  => $input_file,
    chunk_size  => $chunk_size,
    use_slurpio => 1,

    user_tasks => [
        { # queueing task
            max_workers => $queue_workers,
            user_func   => \&buildQueue,
            use_threads => 1, # use threads to access the parent variables in shared memory
            # signal stop to our process workers when they hit the end of the queue. Thanks, Jack Maney!
            task_end    => sub { $R_QUEUE->enqueue( (undef) x $process_workers ) }
        },
        { # process task
            max_workers => $process_workers,
            user_func   => \&processQueue,
            use_threads => 1, # use threads to access the parent variables in shared memory
            task_end    => sub { print "Finished processing!\n"; }
        }
    ]
);

$mce->run();

exit;
