Executing asynchronously, but writing the output synchronously

I have this situation: the video needs to be processed frame by frame, but as frames are processed, the output should be written to the file sequentially.

I want to run asynchronous blocks using dispatch_async on a parallel (concurrent) queue to speed up the process, but since this queue is asynchronous, I don’t see how to coordinate writing the frames sequentially to the output.

Suppose this situation: frames 1, 2, 3, 4, and 5 are sent to parallel queues for processing. Since any block can be completed at any time, frame 4 may be the first, followed by 5, 3, 1, 2. So, how do I manage to write frames in sequential order to the output?

I have a code like this:

// Global default-priority queue. This is the "parallel queue" the question
// refers to: blocks submitted to it may run concurrently and finish in any order.
dispatch_queue_t aQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

// Frame loop: each iteration reads one frame and hands it off for processing.
// NOTE(review): no exit condition is shown; `video`, `frame` and `outputFrame`
// are declared outside this snippet.
while (true) {

    video >> frame;  // read the next frame from the video

    // Process the frame asynchronously so several frames can be in flight at once.
    dispatch_async(aQueue, ^{
         processVideo(frame, outputFrame);
         // Placeholder only: because blocks on this queue can complete in any
         // order, writing from here would emit frames out of sequence. This is
         // the problem the question asks how to solve.
         writeToVideo(outputFrame); // this is here just to show what needs to be done
    });

    // ... rest of the loop body omitted ...

}

any clues?

Thanks.

+4
2

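Use NSCondition. For each frame, create an NSCondition and use it to make the write of that frame wait until the frame has been processed.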

NSCondition docs:

. , , . , , . , .

- ...

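For each frame, create a BOOL (initially NO) that records whether the frame has been processed, a variable for the output frame, and an NSCondition. Then dispatch_async two blocks: one that processes the frame on the background queue, and one that writes it on a serial write queue.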

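In the write block, lock the NSCondition, then check the BOOL to see whether the frame has been processed. If it has, go ahead and write the frame. If it has not, wait for a signal from the NSCondition and check again when it arrives. When you are done, unlock the NSCondition.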

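The processing block processes the frame and, with the NSCondition locked, sets the BOOL to indicate that the frame has been processed. It then sends signal and unlock to the NSCondition.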

. , BOOL, , , outputFrame NSCondition; , .

// Process frames in parallel but write them strictly in submission order.
//
// Each frame gets two blocks: a processing block on the concurrent global
// queue, and a write block on a serial queue. Because the serial queue runs
// its blocks one at a time in FIFO order, the writes happen in frame order;
// each write block simply waits until its own frame has been processed.

// Create the background and serial queues
dispatch_queue_t backgroundQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_queue_t writeQueue = dispatch_queue_create("writeQueue", DISPATCH_QUEUE_SERIAL);

while (true) { // I'm assuming you have some way to break out of this...
    // One condition per frame: it guards this frame's `frameProcessed` flag
    // and lets the write block sleep until the processing block is done.
    NSCondition *condition = [[NSCondition alloc] init];

    // These need the __block attribute so they can be changed inside the blocks
    __block BOOL frameProcessed = NO;
    __block FrameType outputFrame = nil;

    // video >> frame;  // read a frame from the video

    // dispatch the frame for processing
    dispatch_async(backgroundQueue, ^{
        // Do the expensive work without holding the lock; the lock only needs
        // to protect the flag that the write block tests.
        processVideo(frame, outputFrame);

        [condition lock];
        frameProcessed = YES;
        [condition signal];
        [condition unlock];
    });

    // dispatch the write
    dispatch_async(writeQueue, ^{
        [condition lock];
        // Loop rather than `if`: a wait can return without the predicate
        // having become true, so it must be re-tested after every wakeup.
        while (!frameProcessed) {
            [condition wait]; // this will block the current thread until it gets a signal
        }
        [condition unlock];

        // Once frameProcessed is YES nothing else touches outputFrame, so the
        // write itself does not need the lock. The lock/unlock pair above
        // orders this read after the processing block's writes.
        writeToVideo(outputFrame);
    });
}

: BOOL frameProcessed. , , .


: NSCondition .

, , .

, NSCondition, , writeQueue , . NSCondition, ; int, , .

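Create a readCondition, a writeQueueSize and a maxWriteQueueSize. Before reading a frame, lock readCondition, then wait for as long as writeQueueSize >= maxWriteQueueSize. Once there is room, read the frame and dispatch it for processing. Just before dispatching to the writeQueue, increment writeQueueSize. Finally, unlock readCondition.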

Then, in the block dispatched to the writeQueue, once the frame has been written, lock readCondition, decrement writeQueueSize, and send signal and unlock to readCondition.

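This ensures that the writeQueue never has more than maxWriteQueueSize blocks pending at once. If it is full, the reading loop blocks until the writeQueue has made room.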

// Process frames in parallel, write them in order, and throttle the reader so
// that at most maxWriteQueueSize frames are in flight at any time.
//
// Ordering comes from the serial writeQueue (FIFO); each write block waits on
// its own per-frame condition until that frame has been processed. Throttling
// comes from readCondition, which guards the writeQueueSize counter.

// Create the background and serial queues
dispatch_queue_t backgroundQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_queue_t writeQueue = dispatch_queue_create("writeQueue", DISPATCH_QUEUE_SERIAL);

// readCondition protects writeQueueSize, the number of frames that have been
// dispatched but not yet written.
NSCondition *readCondition = [[NSCondition alloc] init];
__block int writeQueueSize = 0;
const int maxWriteQueueSize = 10;

while (true) { // I'm assuming you have some way to break out of this...
    // Per-frame condition guarding this frame's `frameProcessed` flag.
    NSCondition *writeCondition = [[NSCondition alloc] init];

    // These need the __block attribute so they can be changed inside the blocks
    __block BOOL frameProcessed = NO;
    __block FrameType outputFrame = nil;

    // Block here while too many frames are pending; the write block signals
    // readCondition each time it finishes a frame.
    [readCondition lock];
    while (writeQueueSize >= maxWriteQueueSize) {
        [readCondition wait];
    }

    // video >> frame;  // read a frame from the video

    // dispatch the frame for processing
    dispatch_async(backgroundQueue, ^{
        // Do the expensive work without holding the lock; the lock only needs
        // to protect the flag that the write block tests.
        processVideo(frame, outputFrame);

        [writeCondition lock];
        frameProcessed = YES;
        [writeCondition signal];
        [writeCondition unlock];
    });

    // dispatch the write
    writeQueueSize++; // Increment the write queue size here, before the actual dispatch
    dispatch_async(writeQueue, ^{
        [writeCondition lock];
        // Loop rather than `if`: the predicate must be re-tested after every wakeup.
        while (!frameProcessed) {
            [writeCondition wait]; // this will block the current thread until it gets a signal
        }
        [writeCondition unlock];

        // Once frameProcessed is YES nothing else touches outputFrame, so the
        // write itself does not need the lock.
        writeToVideo(outputFrame);

        // Decrement the write queue size and signal the readCondition that it changed
        [readCondition lock];
        writeQueueSize--;
        [readCondition signal];
        [readCondition unlock];
    });

    [readCondition unlock];
}
+3

, . , . , , . , , , .

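Here is an example; the sleep just simulates a random amount of work. The dispatch_group is only used so that the program can wait for all of the work to finish before exiting. You may not need it in your own code.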

// Demo: run five units of work concurrently, but print their results strictly
// in order 1..5.
//
// Each unit of work is given its own serial "gate" queue. The first unit's
// gate is the initial (running) serial queue; every later gate is created
// suspended and is resumed only from inside the previous unit's print block.
// A unit that finishes early therefore blocks in dispatch_sync until all of
// its predecessors have printed.
int main(int argc, const char * argv[])
{
  @autoreleasepool {
    // Concurrent queue on which the simulated work runs in parallel.
    dispatch_queue_t workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_CONCURRENT);
    dispatch_group_t group = dispatch_group_create();

    // Gate for the current iteration; reassigned at the end of each pass.
    dispatch_queue_t myQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);

    for (unsigned x = 1; x <= 5; x++ ) {

      // Chain the queues together in order; suspend all but the first.
      dispatch_queue_t subQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
      dispatch_set_target_queue(subQueue, myQueue);
      dispatch_suspend(subQueue);

      // The block captures the current values of myQueue (this unit's gate)
      // and subQueue (the next unit's gate) at the time it is created.
      dispatch_group_async(group, workQueue,^{

        // Perform a random amount of work
        uint32_t sleepTime = arc4random_uniform(10);
        // x and sleepTime are unsigned, so they are formatted with %u.
        NSLog(@"Sleeping for thread %u (%u)", x, sleepTime);
        sleep(sleepTime);

        // OK, done with our work, queue our printing, and tell the next guy he can print
        dispatch_sync(myQueue, ^{
          printf("%u ", x);
          dispatch_resume(subQueue);
        });
      });

      // The next unit must wait on the gate that this unit will resume.
      myQueue = subQueue;
    }

    // Wait for the whole group to finish before terminating
    dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
  }

  return 0;
}
0

All Articles