ChatServiceImpl.java 2.78 KB
Newer Older
Luca Arrotta's avatar
Luca Arrotta committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
package chat;

import com.example.chat.ChatServiceGrpc.*;
import com.example.chat.ChatServiceOuterClass.*;
import io.grpc.stub.StreamObserver;

import java.util.HashSet;
import java.util.LinkedHashSet;

public class ChatServiceImpl extends ChatServiceImplBase {

    //an hashset to store all the streams which the server uses to communicate with each client
    HashSet<StreamObserver> observers = new LinkedHashSet<StreamObserver>();

    @Override public StreamObserver<ChatMessage> chat(final StreamObserver<ChatMessage> responseObserver){

        //the stream used to communicate with a specific client is stored in a hash set (avoiding duplicates)
        synchronized (observers) {

            observers.add(responseObserver);

        }
        //it returns the stream that will be used by the clients to send messages.
        //the client will write on this stream
        return new StreamObserver<ChatMessage>() {

            //receiving a message from a specific client
            public void onNext(ChatMessage chatMessage) {

                //unwrapping message
                String message = chatMessage.getMessage();
                String from = chatMessage.getFrom();

                System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message);


                HashSet<StreamObserver> copy;

                synchronized (observers) {

                    copy = new HashSet<>(observers);

                }

                //iterating on all the streams to communicate with all the clients
                for(StreamObserver<ChatMessage> observer: copy){

                    //we exclude the one which is sending the message
                    if(!observer.equals(responseObserver))
                        //we simply forward the message
                        try {

                            observer.onNext(ChatMessage.newBuilder().setMessage(message).setFrom(from).build());

                        }
                        catch(io.grpc.StatusRuntimeException e){
                            //peer no longer available
                            synchronized (observers){

Luca Arrotta's avatar
Luca Arrotta committed
60
                                observers.remove(observer);
Luca Arrotta's avatar
Luca Arrotta committed
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

                            }
                        }
                }

            }

            //if there is an error (client abruptly disconnect) we remove the client.
            public void onError(Throwable throwable) {

                synchronized (observers) {

                    observers.remove(responseObserver);

                }
            }

            //if the client explicitly terminated, we remove it from the hashset.
            public void onCompleted() {
                synchronized (observers) {

                    observers.remove(responseObserver);

                }
            }
        };
    }


}