I subscribe to a signal created as follows:
// Outer signal: emits 100 inner signals (one per index), then completes.
RACSignal *signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    for (int i = 0; i < 100; i++) {
        // Inner signal: produces the single string computed for index i, then completes.
        // The slow call sits directly inside the createSignal: block, i.e. it is performed
        // as part of subscribing to this inner signal.
        RACSignal *inner = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber2) {
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            // The RACSubscriber completion selector is -sendCompleted (not -sendComplete).
            [subscriber2 sendCompleted];
            return nil; // nothing to dispose of
        }];
        [subscriber sendNext:[inner setNameWithFormat:@"inside signal"]];
    }
    [subscriber sendCompleted];
    return nil; // nothing to dispose of
}] setNameWithFormat:@"outside signal"];

// Flatten the signal of signals, keeping at most n inner signals subscribed at a time.
int n = 4;
[[signal flatten:n] subscribeNext:^(NSString *string) { ... }];
I want -flatten: to subscribe to n signals in parallel. I tried -startLazilyWithScheduler:block: with [RACScheduler scheduler] for the "inside signal", but it brought my computer to a halt. In Instruments, it looks like a new thread is created for each signal.
The previous version of this code added the work as NSOperations to an NSOperationQueue configured to run at most n operations in parallel. That works, but I think I could simplify it by using RAC.
How do I -flatten: n signals at a time from my signal of signals, so that the inner signals all run on the same n threads?
=====================================
: ; . , , . , , - , - RAC. , , :
:
// Flatten the drawRects signal-of-signals with self.maxProcesses as the concurrency
// limit, then cache each finished tile and invalidate the area it covers.
RACSignal *drawResults = [[self drawRects] flatten:self.maxProcesses];
[drawResults subscribeNext:^(NSDictionary *drawResult) {
    @strongify(self);
    // Every result dictionary carries the location key, the image, its rep and the rect.
    NSString *locationKey = drawResult[kDrawRectsResultsKeyKey];
    self.imagesByLocation[locationKey] = drawResult[kDrawRectsResultsImageKey];
    self.repsByLocation[locationKey] = drawResult[kDrawRectsResultsRepKey];
    NSValue *boxedRect = drawResult[kDrawRectsResultsRectKey];
    [self setNeedsDisplayInRect:[boxedRect rectValue]];
}];
RAC ( ):
// Reactive drawing pipeline: recomputes the drawing signals whenever the zoomed
// bounds or the provider's images change, flattens them with a concurrency limit
// taken from maxProcesses, caches each result, and invalidates the drawn rect.

// Values of self.zoomedDrawingBounds, with the first emitted value dropped
// (presumably the initial value sent on subscription -- confirm).
RACSignal *zoomedDrawingBounds = [RACChannelTo(self, zoomedDrawingBounds) skip:1];
// For each imageProvider value after the first, follow that provider's
// imagesByLocation; switchToLatest keeps only the most recent provider's channel.
RACSignal *imagesFromImageProvider = [[[RACChannelTo(self, imageProvider) skip:1]
map:^(id<PTWImageProvider> imageProvider) {
return RACChannelTo(imageProvider, imagesByLocation);
}]
switchToLatest];
// Invokes -drawingSignalsForRect:givenImagesByLocations: with the values of the two
// signals above; the method's return value is itself treated as a signal, and only
// the latest one is followed.
RACSignal *drawingSignals = [[self rac_liftSelector:@selector(drawingSignalsForRect:givenImagesByLocations:)
withSignalsFromArray:@[ zoomedDrawingBounds, imagesFromImageProvider, ]]
switchToLatest];
@weakify(self);
// Invokes -flatten: on drawingSignals with each maxProcesses value, so the
// flattening is redone when maxProcesses changes; switchToLatest keeps only the
// newest flattened signal.
RACSignal *drawnRectanglesZoomed = [[[[drawingSignals
rac_liftSelector:@selector(flatten:) withSignalsFromArray:@[ RACChannelTo(self, maxProcesses) ]]
switchToLatest]
// Side effect: store the result's image and rep under its location key.
doNext:^(NSDictionary *result) {
@strongify(self);
NSString *keyString = result[kDrawRectsResultsKeyKey];
self.imagesByLocation[keyString] = result[kDrawRectsResultsImageKey];
self.repsByLocation[keyString] = result[kDrawRectsResultsRepKey];
}]
// Pass on only the boxed rect of each result.
map:^(NSDictionary *result) {
return result[kDrawRectsResultsRectKey];
}];
// Pair each drawn rect with the current zoomLevel and scale the rect by that
// factor on both axes.
RACSignal *drawnRectangles = [[drawnRectanglesZoomed
combineLatestWith:RACChannelTo(self, zoomLevel)]
map:^(RACTuple *tuple) {
CGRect zoomedRect = [[tuple first] rectValue];
CGFloat zoomLevel = [[tuple second] floatValue];
CGAffineTransform zoomTransform = CGAffineTransformMakeScale(zoomLevel, zoomLevel);
return [NSValue valueWithRect:CGRectApplyAffineTransform(zoomedRect, zoomTransform)];
}];
// Call -setNeedsDisplayInRect: for every scaled rect, delivered on the main-thread
// scheduler.
[self rac_liftSelector:@selector(setNeedsDisplayInRect:)
withSignalsFromArray:@[ [drawnRectangles deliverOn:[RACScheduler mainThreadScheduler]] ]];
, , , flatten: , :
// Outer signal: emits 100 inner signals (one per index), then completes.
RACSignal *signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    for (int i = 0; i < 100; i++) {
        // Inner signal: its block is handed to a scheduler instead of being run inline.
        // Note that a fresh [RACScheduler scheduler] is requested for every inner signal.
        RACSignal *inner = [RACSignal startLazilyWithScheduler:[RACScheduler scheduler]
                                                         block:^(id<RACSubscriber> subscriber2) {
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            // The RACSubscriber completion selector is -sendCompleted (not -sendComplete).
            [subscriber2 sendCompleted];
        }];
        [subscriber sendNext:[inner setNameWithFormat:@"inside signal"]];
    }
    [subscriber sendCompleted];
    return nil; // nothing to dispose of
}] setNameWithFormat:@"outside signal"];