Subscribing to ReactiveCocoa signals in parallel, on a limited number of threads

I subscribe to a signal created as follows:

// Outer signal: when subscribed to, sends 100 inner signals one after another from a
// plain loop and then completes.
RACSignal *signal = [[RACSignal createSignal:^(... subscriber) {
    for (int i = 0; i < 100; i++) {
        // Each inner signal wraps one slow call; `i` is captured by the inner block.
        [subscriber sendNext:[[RACSignal createSignal:^(... subscriber2) {
            // The slow call is made directly inside this block; no scheduler is named
            // in this snippet.
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            [subscriber2 sendComplete];
            // No disposable is returned for the inner signal.
            return nil;
        }] setNameWithFormat:@"inside signal"]];
    }

    [subscriber sendComplete];
    // No disposable is returned for the outer signal either.
    return nil;
}] setNameWithFormat:@"outside signal"];

// Limit handed to -flatten: below; string results arrive in the subscribeNext: block.
int n = 4;
[[signal flatten:n] subscribeNext:^(NSString *string) { ... }];

I want to -flatten:subscribe to signals nin parallel. I tried -startLazilyWithScheduler:block:with [RACScheduler scheduler]for the "internal signal", but it stopped my computer. In the "Tools" it seems that for each signal a new stream is created.

A previous version of this code added NSOperations to an NSOperationQueue that was configured to run up to n operations in parallel. It works, but I think I could make it simpler by using RAC.

How do I -flatten: n signals at a time from my signal of signals, so that the inner signals all run on the same n threads?

=====================================

: ; . , , . , , - , - RAC. , , :

:

// Flatten the signal of signals returned by -drawRects, passing self.maxProcesses as the
// limit, and handle each result dictionary as it arrives.
[[[self drawRects] flatten:self.maxProcesses] subscribeNext:^(NSDictionary *result) {
    // NOTE(review): @strongify(self) needs a matching @weakify(self) before this
    // statement; it is not shown in this snippet — confirm.
    @strongify(self);

    // Store the image and the rep from the result under the result's key string.
    NSString *keyString = result[kDrawRectsResultsKeyKey];
    self.imagesByLocation[keyString] = result[kDrawRectsResultsImageKey];
    self.repsByLocation[keyString] = result[kDrawRectsResultsRepKey];

    // Invalidate only the rect carried in this result.
    [self setNeedsDisplayInRect:[result[kDrawRectsResultsRectKey] rectValue]];
}];

RAC ( ):

// Two input signals: the latest zoomedDrawingBounds, and the latest imagesByLocation of
// whichever imageProvider is currently set.
// Skip the first value of the zoomedDrawingBounds and imageProvider channels to avoid
// firing immediately.
RACSignal *zoomedDrawingBounds = [RACChannelTo(self, zoomedDrawingBounds) skip:1];
RACSignal *imagesFromImageProvider = [[[RACChannelTo(self, imageProvider) skip:1]
                                       map:^(id<PTWImageProvider> imageProvider) {
                                           return RACChannelTo(imageProvider, imagesByLocation);
                                       }]
                                      switchToLatest];

// Lift the drawing method over the two input signals, getting a signal of signals on each
// call; -switchToLatest then follows only the most recent call's result.
RACSignal *drawingSignals = [[self rac_liftSelector:@selector(drawingSignalsForRect:givenImagesByLocations:)
                               withSignalsFromArray:@[ zoomedDrawingBounds, imagesFromImageProvider, ]]
                             switchToLatest];

// Weak reference for the @strongify(self) inside the doNext: block below.
@weakify(self);

// Lift flatten: using maxProcesses so that if maxProcesses changes, the number of signals being
// flatten:ed can change almost immediately.
// The chain then stores each result (doNext:) and maps it to its drawn rect (map:).
RACSignal *drawnRectanglesZoomed = [[[[drawingSignals
                                       rac_liftSelector:@selector(flatten:) withSignalsFromArray:@[ RACChannelTo(self, maxProcesses) ]]
                                      switchToLatest]
                                     doNext:^(NSDictionary *result) {
                                         @strongify(self);

                                         // Side effects: store the rendered image and its associated image rep
                                         // under the result's key string.
                                         NSString *keyString = result[kDrawRectsResultsKeyKey];
                                         self.imagesByLocation[keyString] = result[kDrawRectsResultsImageKey];
                                         self.repsByLocation[keyString] = result[kDrawRectsResultsRepKey];
                                     }]
                                    map:^(NSDictionary *result) {
                                        // Extract the drawn rect from the results
                                        return result[kDrawRectsResultsRectKey];
                                    }];

// Pair each drawn (zoomed) rect with the latest zoomLevel and scale the rect by it.
RACSignal *drawnRectangles = [[drawnRectanglesZoomed
                               combineLatestWith:RACChannelTo(self, zoomLevel)]
                              map:^(RACTuple *tuple) {
                                  // Convert between zoomed and unzoomed coordinates
                                  CGRect zoomedRect = [[tuple first] rectValue];
                                  // NOTE(review): -floatValue narrows to float before widening to CGFloat;
                                  // if zoomLevel is declared as CGFloat, -doubleValue may be intended — confirm.
                                  CGFloat zoomLevel = [[tuple second] floatValue];
                                  // Uniform scale: the same factor is applied on both axes.
                                  CGAffineTransform zoomTransform = CGAffineTransformMakeScale(zoomLevel, zoomLevel);
                                  return [NSValue valueWithRect:CGRectApplyAffineTransform(zoomedRect, zoomTransform)];
                              }];

// Lift setNeedsDisplayInRect: with results from the drawing signals, so setNeedsDisplayInRect: is called
// as tiles are rendered. The rects are delivered on the main-thread scheduler first.
[self rac_liftSelector:@selector(setNeedsDisplayInRect:)
  withSignalsFromArray:@[ [drawnRectangles deliverOn:[RACScheduler mainThreadScheduler]] ]];

, , , flatten: , :

// Variant of the outer signal in which every inner signal is built with
// +startLazilyWithScheduler:block: instead of +createSignal:.
RACSignal *signal = [[RACSignal createSignal:^(... subscriber) {
    for (int i = 0; i < 100; i++) {
        // [RACScheduler scheduler] is evaluated on every loop iteration, so each of the
        // 100 inner signals is given the result of its own +scheduler call.
        [subscriber sendNext:[[RACSignal startLazilyWithScheduler:[RACScheduler scheduler] block:^(... subscriber2) {
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            [subscriber2 sendComplete];
        }] setNameWithFormat:@"inside signal"]];
    }

    [subscriber sendComplete];
    return nil;
}] setNameWithFormat:@"outside signal"];
+4
1

+[RACScheduler scheduler] GCD , , GCD .

, , , +flatten: , (.. , ).

, :

// Inner work signal: emit a unit value, delay it by 0, then switch to a signal that makes
// the slow call, subscribed on a scheduler obtained from [RACScheduler scheduler].
// NOTE(review): `i` and `subscriber` are not defined in this snippet — presumably it
// belongs inside the loop of the outer createSignal: block; confirm.
RACSignal *workSignal = [[[[RACSignal
    // Wait for one scheduler iteration,
    return:RACUnit.defaultUnit]
    delay:0]
    // then actually do the work.
    then:^{
        return [[RACSignal
            createSignal:^(id<RACSubscriber> subscriber2) {
                NSString *string = someFunctionThatTakesALongTime(i);
                [subscriber2 sendNext:string];
                [subscriber2 sendComplete];
                return nil;
            }]
            // Invokes the above block on a new background scheduler.
            subscribeOn:[RACScheduler scheduler]];
    }]
    setNameWithFormat:@"inside signal"];

// Hand the inner signal to the outer subscriber.
[subscriber sendNext:workSignal];

. GCD , , , .

+5

All Articles