Commit a44aa90f authored by Gabriele Civitarese's avatar Gabriele Civitarese

Aggiunta sincronizzazione lato server

parent c7004263
...@@ -15,8 +15,11 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase { ...@@ -15,8 +15,11 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase {
@Override public StreamObserver<ChatServiceOuterClass.ChatMessage> chat(final StreamObserver<ChatServiceOuterClass.ChatMessage> responseObserver){ @Override public StreamObserver<ChatServiceOuterClass.ChatMessage> chat(final StreamObserver<ChatServiceOuterClass.ChatMessage> responseObserver){
//the stream used to communicate with a specific client is stored in a hash set (avoiding duplicates) //the stream used to communicate with a specific client is stored in a hash set (avoiding duplicates)
synchronized (observers) {
observers.add(responseObserver); observers.add(responseObserver);
}
//it returns the stream that will be used by the clients to send messages. //it returns the stream that will be used by the clients to send messages.
//the client will write on this stream //the client will write on this stream
return new StreamObserver<ChatServiceOuterClass.ChatMessage>() { return new StreamObserver<ChatServiceOuterClass.ChatMessage>() {
...@@ -30,8 +33,17 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase { ...@@ -30,8 +33,17 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase {
System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message); System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message);
HashSet<StreamObserver> copy;
synchronized (observers) {
copy = new HashSet<>(observers);
}
//iterating on all the streams to communicate with all the clients //iterating on all the streams to communicate with all the clients
for(StreamObserver<ChatServiceOuterClass.ChatMessage> observer: observers){ for(StreamObserver<ChatServiceOuterClass.ChatMessage> observer: copy){
//we exclude the one which is sending the message //we exclude the one which is sending the message
if(!observer.equals(responseObserver)) if(!observer.equals(responseObserver))
...@@ -45,13 +57,20 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase { ...@@ -45,13 +57,20 @@ public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase {
//if there is an error (client abruptly disconnect) we remove the client. //if there is an error (client abruptly disconnect) we remove the client.
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
synchronized (observers) {
observers.remove(responseObserver); observers.remove(responseObserver);
} }
}
//if the client explicitly terminated, we remove it from the hashset. //if the client explicitly terminated, we remove it from the hashset.
public void onCompleted() { public void onCompleted() {
synchronized (observers) {
observers.remove(responseObserver); observers.remove(responseObserver);
}
} }
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment