Controller.java 3.9 KB
Newer Older
Michele Fiori's avatar
Michele Fiori committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
package average;

import org.eclipse.paho.client.mqttv3.*;

import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;

public class Controller {

    private static double temperaturesCurrentSum;
    private static int temperaturesCurrent;

    private static int subQos = 2;
    private static int pubQos = 2;

    private static String clientId;

    private static MqttClient client;
    private static String broker;
    private static String subTopic;
    private static String pubTopic;

    public static void main(String[] argv) {
        broker = "tcp://localhost:1883";
        clientId = MqttClient.generateClientId();
        subTopic = "home/sensors/temp";
        pubTopic = "home/controllers/temp";

        temperaturesCurrentSum = 0D;
        temperaturesCurrent = 0;

        try {
            client = new MqttClient(broker, clientId);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            System.out.println(clientId + " Connecting Broker " + broker);
            client.connect(connOpts);
            System.out.println(clientId + " Connected " + Thread.currentThread().getId());

            client.setCallback(new MqttCallback() {

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String time = new Timestamp(System.currentTimeMillis()).toString();
                    String receivedMessage = new String(message.getPayload());
                    System.out.println(clientId + " Received a Message! - Callback - Thread PID: " + Thread.currentThread().getId() +
                            "\n\tTime:    " + time +
                            "\n\tTopic:   " + topic +
                            "\n\tMessage: " + receivedMessage);
                    if (topic.equals("home/sensors/temp")) {
                        updateTemperatureAverage(Double.parseDouble(receivedMessage));
                    }
                }

                public void connectionLost(Throwable cause) {
                    System.out.println(clientId + " Connectionlost! cause:" + cause.getMessage() + "-  Thread PID: " + Thread.currentThread().getId());
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    //Not used here
                }
            });

            System.out.println(clientId + " Subscribing ... - Thread PID: " + Thread.currentThread().getId());
            client.subscribe(subTopic,subQos);
            System.out.println(clientId + " Subscribed to topics : " + subTopic);


        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }


    }

    private static void updateTemperatureAverage(double newTemperature) {
        synchronized (Controller.class) {
            temperaturesCurrent += 1;
            temperaturesCurrentSum += newTemperature;
            if (temperaturesCurrent == 5) {
                temperaturesCurrent = 0;
                double avg = temperaturesCurrentSum / 5.0;
                String payload;
                if (avg >= 20) {
                    payload = "off";
                } else {
                    payload = "on";
                }
                MqttMessage message = new MqttMessage(payload.getBytes());
                message.setQos(pubQos);
                System.out.println(clientId + " Average: " + avg + "; Publishing message about what to do with the heater: " + payload);
                try {
                    client.publish(pubTopic, message);
                } catch (MqttException e) {
                    e.printStackTrace();
                }

                temperaturesCurrentSum = 0D;
            }
        }
    }
}