ChatServiceImpl.java 2.43 KB
Newer Older
Gabriele Civitarese's avatar
Gabriele Civitarese committed
1 2
package chat;

Gabriele Civitarese's avatar
Gabriele Civitarese committed
3 4
import com.example.chat.ChatServiceGrpc.*;
import com.example.chat.ChatServiceOuterClass.*;
Gabriele Civitarese's avatar
Gabriele Civitarese committed
5 6 7 8 9
import io.grpc.stub.StreamObserver;

import java.util.HashSet;
import java.util.LinkedHashSet;

Gabriele Civitarese's avatar
Gabriele Civitarese committed
10
public class ChatServiceImpl extends ChatServiceImplBase {
Gabriele Civitarese's avatar
Gabriele Civitarese committed
11 12 13 14

    //an hashset to store all the streams which the server uses to communicate with each client
    HashSet<StreamObserver> observers = new LinkedHashSet<StreamObserver>();

Gabriele Civitarese's avatar
Gabriele Civitarese committed
15
    @Override public StreamObserver<ChatMessage> chat(final StreamObserver<ChatMessage> responseObserver){
Gabriele Civitarese's avatar
Gabriele Civitarese committed
16 17

        //the stream used to communicate with a specific client is stored in a hash set (avoiding duplicates)
18
        synchronized (observers) {
Gabriele Civitarese's avatar
Gabriele Civitarese committed
19

20 21 22
            observers.add(responseObserver);

        }
Gabriele Civitarese's avatar
Gabriele Civitarese committed
23 24
        //it returns the stream that will be used by the clients to send messages.
        //the client will write on this stream
Gabriele Civitarese's avatar
Gabriele Civitarese committed
25
        return new StreamObserver<ChatMessage>() {
Gabriele Civitarese's avatar
Gabriele Civitarese committed
26 27

            //receiving a message from a specific client
Gabriele Civitarese's avatar
Gabriele Civitarese committed
28
            public void onNext(ChatMessage chatMessage) {
Gabriele Civitarese's avatar
Gabriele Civitarese committed
29 30 31 32 33 34 35

                //unwrapping message
                String message = chatMessage.getMessage();
                String from = chatMessage.getFrom();

                System.out.println("[MESSAGE RECEIVED] Received a message from "+from+": "+message);

36 37 38 39 40 41 42 43 44

                HashSet<StreamObserver> copy;

                synchronized (observers) {

                    copy = new HashSet<>(observers);

                }

Gabriele Civitarese's avatar
Gabriele Civitarese committed
45
                //iterating on all the streams to communicate with all the clients
Gabriele Civitarese's avatar
Gabriele Civitarese committed
46
                for(StreamObserver<ChatMessage> observer: copy){
Gabriele Civitarese's avatar
Gabriele Civitarese committed
47 48 49 50

                    //we exclude the one which is sending the message
                    if(!observer.equals(responseObserver))
                        //we simply forward the message
Gabriele Civitarese's avatar
Gabriele Civitarese committed
51
                        observer.onNext(ChatMessage.newBuilder().setMessage(message).setFrom(from).build());
Gabriele Civitarese's avatar
Gabriele Civitarese committed
52 53 54 55 56 57 58 59

                }

            }

            //if there is an error (client abruptly disconnect) we remove the client.
            public void onError(Throwable throwable) {

60 61 62
                synchronized (observers) {

                    observers.remove(responseObserver);
Gabriele Civitarese's avatar
Gabriele Civitarese committed
63

64
                }
Gabriele Civitarese's avatar
Gabriele Civitarese committed
65 66 67 68
            }

            //if the client explicitly terminated, we remove it from the hashset.
            public void onCompleted() {
69 70 71 72 73
                synchronized (observers) {

                    observers.remove(responseObserver);

                }
Gabriele Civitarese's avatar
Gabriele Civitarese committed
74 75 76 77 78 79
            }
        };
    }


}