Is it safe to migrate a thread-unsafe value across a thread boundary using fork/join?

I have a class that is not thread safe:

class ThreadUnsafeClass {
  long i;

  long incrementAndGet() { return ++i; }
}

(I've used a long as the field here, but think of the field as being some thread-unsafe type.)

I now have a class that looks like this:

class Foo {
  final ThreadUnsafeClass c;

  Foo(ThreadUnsafeClass c) {
    this.c = c;
  }
}

That is, the thread-unsafe class is a final field of it. Now I am going to do this:

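import java.util.concurrent.ForkJoinTask;

public class JavaMM {
  public static void main(String[] args) {
    // The task returns a Foo, so the task type is ForkJoinTask<Foo>.
    final ForkJoinTask<Foo> work = ForkJoinTask.adapt(() -> {
      ThreadUnsafeClass t = new ThreadUnsafeClass();
      t.incrementAndGet();
      return new Foo(t);
    });

    // Run with -ea so the assertion is actually checked.
    assert (work.fork().join().c.i == 1);
  }
}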

That is, from thread T (main), I invoke some work on T' (fork-join-pool), which creates and mutates an instance of my unsafe class and then returns the result wrapped in a Foo. Please note that all mutation of my thread-unsafe class happens on a single thread, T'.

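Question 1: Am I guaranteed that the end state of the thread-unsafe instance is safely carried across the T' ~> T thread boundary at the join?

Question 2: What if I had done this using parallel streams? For example: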

Map<Long, Foo> results =
  Stream
    .of(new ThreadUnsafeClass())
    .parallel()
    .map(tuc -> {
      tuc.incrementAndGet();
      return new Foo(tuc);
    })
    .collect(
      Collectors.toConcurrentMap(
        foo -> foo.c.i,
        Function.identity()
      )
    );
// The key type is Long, so look up 1L; get(1) would box to Integer and return null.
assert (results.get(1L) != null);

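I think ForkJoinTask.join() has the same memory effects as Future.get() (the join() Javadoc says it is basically get(), differing only in how interruption and exceptions are handled). And Future.get() is specified as:

Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.

In other words, this is basically "safe publication" via Future/FJT. That means anything the executor thread did and published via the FJT result is visible to callers of FJT.join(). Since the example allocates the object and populates its field only within the executor thread, and nothing happens to the object after it is returned from the executor, it stands to reason that we are only allowed to see the values the executor thread produced.

Note that making the field final brings no additional benefit here. Even if you did just plain field stores, you would still be guaranteed this: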

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    Future<MyObject> f = s.submit(() -> new MyObject(42));
    assert (f.get().x == 42); // guaranteed!
    s.shutdown();
}

public class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}
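The same reasoning can be sketched for the ForkJoinTask from the question. This is only an illustration, assuming join() carries the same happens-before edge as Future.get(); PlainFoo is a hypothetical variant of Foo whose field is deliberately not final, and ForkJoinPublication is a made-up class name.

```java
import java.util.concurrent.ForkJoinTask;

public class ForkJoinPublication {
    // Same shape as the question's ThreadUnsafeClass.
    static class ThreadUnsafeClass {
        long i;
        long incrementAndGet() { return ++i; }
    }

    // Hypothetical variant of Foo: the field is a plain, non-final field.
    static class PlainFoo {
        ThreadUnsafeClass c;
        PlainFoo(ThreadUnsafeClass c) { this.c = c; }
    }

    static long run() {
        ForkJoinTask<PlainFoo> work = ForkJoinTask.adapt(() -> {
            // All mutation happens on the pool thread T'.
            ThreadUnsafeClass t = new ThreadUnsafeClass();
            t.incrementAndGet();
            return new PlainFoo(t);
        });
        // fork() from a non-pool thread submits to the common pool;
        // join() is the point where the result crosses back to T.
        return work.fork().join().c.i;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1
    }
}
```

The read of c.i comes after join() in program order on T, which is why the store made on T' is the only value that read is allowed to observe.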

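But notice that in the Stream example (if we assume symmetry between Stream.of.parallel and Executor.submit, and between Stream.collect and FJT.join/Future.get), you create the object in the caller thread and then hand it to the executor to do something with it. This is a subtle difference, but it still does not matter much, because we also have HB on submit, which precludes seeing the old state of the object: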

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    MyObject o = new MyObject(42);
    Future<?> f = s.submit(() -> o.x++); // new --hb--> submit
    f.get(); // get --hb--> read o.x
    assert (o.x == 43); // guaranteed
    s.shutdown();
}

public static class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}

(Formally speaking, that is because all HB paths from read(o.x) go through the action that does store(o.x, 43).)
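Here is a runnable sketch of the parallel stream from the question. It relies on the same assumption the answer makes (that Stream.collect behaves like FJT.join/Future.get for visibility), which is an assumption rather than something this code proves. StreamPublication and run() are hypothetical names added for illustration.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamPublication {
    static class ThreadUnsafeClass {
        long i;
        long incrementAndGet() { return ++i; }
    }

    static class Foo {
        final ThreadUnsafeClass c;
        Foo(ThreadUnsafeClass c) { this.c = c; }
    }

    static Map<Long, Foo> run() {
        // The object is created on the calling thread and mutated inside the pipeline.
        return Stream.of(new ThreadUnsafeClass())
            .parallel()
            .map(tuc -> {
                tuc.incrementAndGet();
                return new Foo(tuc);
            })
            .collect(Collectors.toConcurrentMap(foo -> foo.c.i, Function.identity()));
    }

    public static void main(String[] args) {
        Map<Long, Foo> results = run();
        // Keys are Long, so the lookup must use 1L rather than the int literal 1.
        System.out.println(results.get(1L) != null); // prints true
    }
}
```

With a single element there is only one mutation of the object, so the map ends up with exactly one entry under the key 1L.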
