// ChatServiceImpl.java (2.57 KB), committed by Gabriele Civitarese
package chat;

import com.example.chat.ChatServiceGrpc;
import com.example.chat.ChatServiceOuterClass;
import io.grpc.stub.StreamObserver;

import java.util.HashSet;
import java.util.LinkedHashSet;

public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase {

    //an hashset to store all the streams which the server uses to communicate with each client
    HashSet<StreamObserver> observers = new LinkedHashSet<StreamObserver>();

    @Override public StreamObserver<ChatServiceOuterClass.ChatMessage> chat(final StreamObserver<ChatServiceOuterClass.ChatMessage> responseObserver){

        //the stream used to communicate with a specific client is stored in a hash set (avoiding duplicates)
18
        synchronized (observers) {
Gabriele Civitarese's avatar
Gabriele Civitarese committed
19

20 21 22
            observers.add(responseObserver);

        }
Gabriele Civitarese's avatar
Gabriele Civitarese committed
23 24 25 26 27 28 29 30 31 32 33 34 35
        //it returns the stream that will be used by the clients to send messages.
        //the client will write on this stream
        return new StreamObserver<ChatServiceOuterClass.ChatMessage>() {

            //receiving a message from a specific client
            public void onNext(ChatServiceOuterClass.ChatMessage chatMessage) {

                //unwrapping message
                String message = chatMessage.getMessage();
                String from = chatMessage.getFrom();

                System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message);

36 37 38 39 40 41 42 43 44

                HashSet<StreamObserver> copy;

                synchronized (observers) {

                    copy = new HashSet<>(observers);

                }

Gabriele Civitarese's avatar
Gabriele Civitarese committed
45
                //iterating on all the streams to communicate with all the clients
46
                for(StreamObserver<ChatServiceOuterClass.ChatMessage> observer: copy){
Gabriele Civitarese's avatar
Gabriele Civitarese committed
47 48 49 50 51 52 53 54 55 56 57 58 59

                    //we exclude the one which is sending the message
                    if(!observer.equals(responseObserver))
                        //we simply forward the message
                        observer.onNext(ChatServiceOuterClass.ChatMessage.newBuilder().setMessage(message).setFrom(from).build());

                }

            }

            //if there is an error (client abruptly disconnect) we remove the client.
            public void onError(Throwable throwable) {

60 61 62
                synchronized (observers) {

                    observers.remove(responseObserver);
Gabriele Civitarese's avatar
Gabriele Civitarese committed
63

64
                }
Gabriele Civitarese's avatar
Gabriele Civitarese committed
65 66 67 68
            }

            //if the client explicitly terminated, we remove it from the hashset.
            public void onCompleted() {
69 70 71 72 73
                synchronized (observers) {

                    observers.remove(responseObserver);

                }
Gabriele Civitarese's avatar
Gabriele Civitarese committed
74 75 76 77 78 79
            }
        };
    }


}