Exchange data between multiple Java threads and get updated value

I want to create a java application where we want to make vacationers for several users using an access token. I use 1 thread for each user. The access token that I use is valid for 1 hour. After the expiration, I get error 401 and should update the token for all threads and continue. I am thinking about using a mutable variable that I made static to update all threads. My requirement is the moment when I find out in one of the flows that the token has expired, I want all the threads to stop processing and wait until a new token is generated (this takes a couple of seconds). the token should be updated automatically, without failure of each stream due to the expired token.

The following is an example of the code I wrote:

import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class Sample { public static void main(String[] args) { String[] myStrings = { "User1" , "User2" , "User3" }; ScheduledExecutorService scheduledExecutorService = Executors .newScheduledThreadPool(myStrings.length); TokenGenerator.getToken(); for(String str : myStrings){ scheduledExecutorService.scheduleAtFixedRate(new Task(str), 0, 5, TimeUnit.SECONDS); } } } class Task implements Runnable{ private String name; public Task(String name){ this.name = name; } @Override public void run() { getResponse(TokenGenerator.token); } private void getResponse(String token) { // Make http calls // if token expire , call getToken again. Pause all the running threads , and // update the token for all threads TokenGenerator.getToken(); } } class TokenGenerator { public static volatile String token; public static void getToken() { token = "new Token everytime"; } } 

Is there a better approach to this problem? The above code does not satisfy my use case, because as soon as a thread starts generating a new token, all other threads are not suspended. Please suggest some improvements.

+7
java multithreading thread-safety
source share
3 answers

You can put the token in AtomicReference and use Semaphore to pause threads:

 public class TokenWrapper { private final AtomicReference<Token> tokenRef = new AtomicReference<>(null); private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); public TokenWrapper() { Token newToken = // refresh token tokenRef.set(newToken); } public Token getToken() { Token token = null; while((token = tokenRef.get()) == null) { semaphore.acquire(); } return token; } public Token refreshToken(Token oldToken) { if(tokenRef.compareAndSet(oldToken, null)) { semaphore.drainPermits(); Token newToken = // refresh token tokenRef.set(newToken); semaphore.release(Integer.MAX_VALUE); return newToken; } else return getToken(); } } public class RESTService { private static final TokenWrapper tokenWrapper = new TokenWrapper(); public void run() { Token token = tokenWrapper.getToken(); Response response = // call service with token if(response.getStatus == 401) { tokenWrapper.refreshToken(token); } } } 

refreshToken() uses the atomic compareAndSet on tokenRef to ensure that only one thread updates the token and then calls drainPermits() in semaphore so that other threads wait for the token to refresh. getToken() returns a token if it is not null , otherwise it waits for semaphore - this is done in a loop, because it is possible that the thread will have to rotate for several cycles between tokenRef null and drainPermits() is called on semaphore .


Edit: The signature of refreshToken(Token oldToken) so that the old token is transmitted and not read inside the method - this prevents the situation when RESTService_A updates the token, RESTService_B receives 401 with the old expired token, and then RESTService_B calls refreshToken after the call ends RESTService_A on refreshToken , as a result of which the token is updated twice. With a new signature, RESTService_B will take place in the old expired token, and therefore the call to compareAndSet will fail when the old token cannot match the new token, resulting in refreshToken only once.

+5
source share

You can use the following template, accessing the token only with the help of its recipient and the calling loadToken when you receive an error response.

 class TokenGenerator { private String token = null; public synchronized String getToken() { if (token == null) { loadToken(); } return token; } public synchronized void loadToken() { token = "load here"; } } 

To solve your thread suspension problem, you can simply call getToken() if you want to stop Thread , which will automatically block if the token is currently active.

 class Task implements Runnable{ private String name; private TokenGenerator tokenGenerator; public Task(String name, TokenGenerator tokenGenerator) { this.name = name; this.tokenGenerator = tokenGenerator; } @Override public void run() { getResponse(tokenGenerator.getToken()); } private void getResponse(String token) { // Make http calls // if token expire , call getToken again. Pause all the running threads , and // update the token for all threads tokenGenerator.loadToken(); } } 
+1
source share

Since you need to do two things (http calls and update token), you can try two methods of verification.

to check if the token is expired or not , and the other to check if some other stream is trying to update the token .

to demonstrate the idea here, this is a little code (its a dirty way to do this so that it can clear up a bit).

 ... private string token private volatile static isTokenExpired=false //checking if the token is expired or not private volatile static waitingForTokenRefresher=false; //checking if we should wait for update. @Override public void run(){ while(tokenisExpired){ //wait } //http calls find out if token is good to go //check if no one else uses the token: if( token is actually expired){ if(!waitingForTokenRefresher){ isTokenExpired=true; waitingForTokenRefresher=true; //refresh token waitingForTokenRefresher=false isTokenExpired=false; } } while(!waitingForTokenRefresher){ //wait... } } 
+1
source share

All Articles