ChatServiceImpl.java 2.79 KB
Newer Older
Luca Arrotta's avatar
Luca Arrotta committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
package chat;

import com.example.chat.ChatServiceGrpc.*;
import com.example.chat.ChatServiceOuterClass.*;
import io.grpc.stub.StreamObserver;

import java.util.HashSet;
import java.util.LinkedHashSet;

/**
 * Chat service implementation that relays every message received from one client
 * to all the other currently registered clients.
 *
 * <p>Each call to {@link #chat(StreamObserver)} registers the caller's response stream.
 * The stream is unregistered when the client's request stream reports an error or
 * completes, or when forwarding a message to it fails.
 *
 * <p>All access to the set of registered streams is guarded by the set's own monitor.
 */
public class ChatServiceImpl extends ChatServiceImplBase {

    // Response streams the server uses to write to each connected client.
    // A set avoids duplicates; the linked variant keeps registration order.
    // Declared final because it doubles as the lock object.
    final HashSet<StreamObserver<ChatMessage>> observers = new LinkedHashSet<StreamObserver<ChatMessage>>();

    /**
     * Opens a chat stream for one client.
     *
     * @param responseObserver stream the server writes to in order to reach this client
     * @return the stream the client writes its outgoing messages to
     */
    @Override
    public StreamObserver<ChatMessage> chat(final StreamObserver<ChatMessage> responseObserver) {

        register(responseObserver);

        // The returned observer receives the events of the client's request stream.
        return new StreamObserver<ChatMessage>() {

            /** Receives a message from this client and forwards it to every other client. */
            @Override
            public void onNext(ChatMessage chatMessage) {

                // Unwrapping the message.
                String message = chatMessage.getMessage();
                String from = chatMessage.getFrom();

                System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message);

                // Built once: the same payload is sent to every recipient.
                ChatMessage outgoing = ChatMessage.newBuilder().setMessage(message).setFrom(from).build();

                // Iterate over a snapshot so the shared lock is not held while writing to peers.
                for (StreamObserver<ChatMessage> observer : snapshot()) {

                    // The sender does not get its own message back.
                    if (observer.equals(responseObserver)) {
                        continue;
                    }

                    try {
                        // Several clients may broadcast at the same time from different
                        // threads; writes to a single recipient stream are serialized.
                        synchronized (observer) {
                            observer.onNext(outgoing);
                        }
                    }
                    catch (io.grpc.StatusRuntimeException e) {
                        // The recipient is no longer available: drop the recipient
                        // (not the sender) so later broadcasts skip it.
                        unregister(observer);
                    }
                }
            }

            /** On a stream error (e.g. the client disconnected abruptly) the client is removed. */
            @Override
            public void onError(Throwable throwable) {
                unregister(responseObserver);
            }

            /** When the client explicitly terminates its stream it is removed. */
            @Override
            public void onCompleted() {
                unregister(responseObserver);
            }
        };
    }

    /** Adds a client's response stream to the registered set. */
    private void register(StreamObserver<ChatMessage> observer) {
        synchronized (observers) {
            observers.add(observer);
        }
    }

    /** Removes a client's response stream from the registered set; a no-op if absent. */
    private void unregister(StreamObserver<ChatMessage> observer) {
        synchronized (observers) {
            observers.remove(observer);
        }
    }

    /** Returns a copy of the registered streams, in registration order, taken under the lock. */
    private HashSet<StreamObserver<ChatMessage>> snapshot() {
        synchronized (observers) {
            return new LinkedHashSet<StreamObserver<ChatMessage>>(observers);
        }
    }

}