Preload data from a file using a separate thread

I have a small application that processes a large number of (relatively small) files. It runs sequentially: it loads the data from a file, performs operations on it, and moves on to the next file. I noticed that CPU usage is not at 100% while it runs, and I think this is due to the time spent on hard-disk I/O.

Thus, the idea would be to load the next data into memory in parallel with the processing of the current data, using a separate thread (the data in question is just a sequence of ints, stored in a vector). This seems like a very common problem, but I am having a hard time finding a simple, plain C++ example of how to do it! And now that C++0x is on its way, a simple demo using the new threading facilities, with no external library, would be very welcome.

In addition, although I know it depends on many factors, is it possible to get a reasonable estimate of the benefits (or drawbacks) of such an approach as a function of, for example, the size of the data files to load? I guess that with large files, disk I/O operations are quite infrequent, since the data is already buffered (when using fstream?).

Olivie

+5
4 answers

A toy program showing how to use some of the C++0x threading and synchronization facilities. I have no idea how well it performs (I recommend Matt's answer); my focus is on clarity and correctness for the sake of the example.

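The files are read one at a time on a separate thread. They are not converted to a sequence of int here; each file is simply dumped into a plain std::string.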

#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <deque>
#include <future>
#include <mutex>
#include <condition_variable>
#include <utility>

int
main()
{
    // this is shared
    std::mutex mutex;
    std::condition_variable condition;
    bool more_to_process = true;
    std::deque<std::string> to_process;

    /* Reading the files is done asynchronously */
    std::vector<std::string> filenames; // fill with the paths of the files to read
    auto process = std::async(std::launch::async, [&](std::vector<std::string> filenames)
    {
        typedef std::lock_guard<std::mutex> lock_type;
        for(auto&& filename: filenames) {
            std::ifstream file(filename);
            if(file) {
                std::ostringstream stream;
                stream << file.rdbuf();
                if(stream) {
                    lock_type lock(mutex);
                    to_process.push_back(stream.str());
                    condition.notify_one();
                }
            }
        }
        lock_type lock(mutex);
        more_to_process = false;
        condition.notify_one();
    }, std::move(filenames));

    /* processing is synchronous */
    for(;;) {
        std::string file;
        {
            std::unique_lock<std::mutex> lock(mutex);
            condition.wait(lock, [&]
            { return !more_to_process || !to_process.empty(); });

            if(!more_to_process && to_process.empty())
                break;
            else if(to_process.empty())
                continue;

            file = std::move(to_process.front());
            to_process.pop_front();
        }

        // use file here
    }

    process.get();
}
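
The "use file here" step is where the text would be turned into the int sequence the question describes. A minimal sketch of that conversion, assuming each file holds whitespace-separated decimal integers (the file format is not stated in the question); parse_ints is a hypothetical name, not part of the answer's code:

```cpp
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: parse whitespace-separated integers from the
// contents of one file. Parsing stops at the first token that is not an int.
std::vector<int> parse_ints(const std::string& contents)
{
    std::istringstream in(contents);
    std::vector<int> values;
    int value;
    while (in >> value)
        values.push_back(value);
    return values;
}
```

Inside the processing loop this would be called as parse_ints(file), outside the locked scope, so the reader thread is never blocked while parsing runs.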

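Notes:

  • Each file is read whole into a std::string.
  • std::async is used rather than std::thread.
  • There is no error handling: a file that cannot be opened or read is silently skipped. One option would be to queue a boost::variant<std::string, std::exception_ptr> (or an error_code) instead of a plain std::string, so the consumer can see what went wrong.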
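
The reader loop in the code above silently skips any file that cannot be opened or read. A minimal sketch of one way to hand such failures to the consumer without an external library, using a plain struct as a stand-in for the boost::variant<std::string, std::exception_ptr> mentioned above. The names load_result and load_file are hypothetical, and the error messages are illustrative only:

```cpp
#include <exception>
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical queue element: the file contents, or the error that
// prevented reading them.
struct load_result {
    std::string contents;
    std::exception_ptr error;  // null when the read succeeded
};

// Read one file whole. Failures are captured instead of being dropped.
load_result load_file(const std::string& filename)
{
    load_result result;
    try {
        std::ifstream file(filename);
        if (!file)
            throw std::runtime_error("cannot open " + filename);
        std::ostringstream stream;
        stream << file.rdbuf();
        // As in the original loop, an empty file also sets failbit here.
        if (!stream)
            throw std::runtime_error("cannot read " + filename);
        result.contents = stream.str();
    } catch (...) {
        result.error = std::current_exception();
    }
    return result;
}
```

The consumer would then check result.error and, if it is set, call std::rethrow_exception(result.error) or log and move on to the next file.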
+4

IO , , . "" -, , .

, IO, , , , , ​​ ( ) .

+2

:

  • ,
  • ,

, . , .

, , - . , .

** **

: , . , " " / " ", .

+1

, ,

1. The first thread reads and processes only the files at even positions
in the file listing (ls -l on *nix).
2. The second thread reads and processes the files at odd positions in the listing.

, " , , ", , .

[, ]

, , .

, .

**:**

, , , (, char), , .

As mentioned earlier, the problem is reading from and writing to the same queue, since STL containers are not thread-safe.

Therefore, I recommend managing your shared data structure (the queue, in this case) with locks, or else:

1. Use Boost.Lockfree.
2. Write your own lock-free implementation.

0