diff --git a/src/main/java/chat/ChatServiceImpl.java b/src/main/java/chat/ChatServiceImpl.java index 4e1de13bb242b1cb48e2313187da54a0acb18372..b851ee09c49660ffd5ce0ea2ac73ea3920cb5321 100644 --- a/src/main/java/chat/ChatServiceImpl.java +++ b/src/main/java/chat/ChatServiceImpl.java @@ -15,8 +15,11 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase { @Override public StreamObserver chat(final StreamObserver responseObserver){ //the stream used to communicate with a specific client is stored in a hash set (avoiding duplicates) - observers.add(responseObserver); + synchronized (observers) { + observers.add(responseObserver); + + } //it returns the stream that will be used by the clients to send messages. //the client will write on this stream return new StreamObserver() { @@ -30,8 +33,17 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase { System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message); + + HashSet copy; + + synchronized (observers) { + + copy = new HashSet<>(observers); + + } + //iterating on all the streams to communicate with all the clients - for(StreamObserver observer: observers){ + for(StreamObserver observer: copy){ //we exclude the one which is sending the message if(!observer.equals(responseObserver)) @@ -45,13 +57,20 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase { //if there is an error (client abruptly disconnect) we remove the client. public void onError(Throwable throwable) { - observers.remove(responseObserver); + synchronized (observers) { + + observers.remove(responseObserver); + } } //if the client explicitly terminated, we remove it from the hashset. public void onCompleted() { - observers.remove(responseObserver); + synchronized (observers) { + + observers.remove(responseObserver); + + } } }; }