ChatServiceImpl.java 2.24 KB
Newer Older
Gabriele Civitarese's avatar
Gabriele Civitarese committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
package chat;

import com.example.chat.ChatServiceGrpc;
import com.example.chat.ChatServiceOuterClass;
import io.grpc.stub.StreamObserver;

import java.util.HashSet;
import java.util.LinkedHashSet;

public class ChatServiceImpl extends ChatServiceGrpc.ChatServiceImplBase {

    //an hashset to store all the streams which the server uses to communicate with each client
    HashSet<StreamObserver> observers = new LinkedHashSet<StreamObserver>();

    @Override public StreamObserver<ChatServiceOuterClass.ChatMessage> chat(final StreamObserver<ChatServiceOuterClass.ChatMessage> responseObserver){

        //the stream used to communicate with a specific client is stored in a hash set (avoiding duplicates)
        observers.add(responseObserver);

        //it returns the stream that will be used by the clients to send messages.
        //the client will write on this stream
        return new StreamObserver<ChatServiceOuterClass.ChatMessage>() {

            //receiving a message from a specific client
            public void onNext(ChatServiceOuterClass.ChatMessage chatMessage) {

                //unwrapping message
                String message = chatMessage.getMessage();
                String from = chatMessage.getFrom();

                System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message);

                //iterating on all the streams to communicate with all the clients
                for(StreamObserver<ChatServiceOuterClass.ChatMessage> observer: observers){

                    //we exclude the one which is sending the message
                    if(!observer.equals(responseObserver))
                        //we simply forward the message
                        observer.onNext(ChatServiceOuterClass.ChatMessage.newBuilder().setMessage(message).setFrom(from).build());

                }

            }

            //if there is an error (client abruptly disconnect) we remove the client.
            public void onError(Throwable throwable) {

                observers.remove(responseObserver);

            }

            //if the client explicitly terminated, we remove it from the hashset.
            public void onCompleted() {
                observers.remove(responseObserver);
            }
        };
    }


}