Realtime Temperature Analytics using Kafka Streams

Life is a series of natural and spontaneous changes. Don’t resist them -that only creates sorrow. Let reality be reality. Let things flow naturally forward.

— Lao-Tzu, 6th–5th century BCE

While playing around with Kafka, an interesting use case came up: tracking and storing temperature readings from electronic sensor devices in real time. After evaluating a couple of different approaches and directions, Kafka Streams emerged as the most suitable framework.

Running Apache Kafka ourselves, i.e. deploying ZooKeeper, maintaining a Schema Registry and so on, proved to be a hassle. The better alternative was Confluent Kafka, which offers a subscription-based model on all the major cloud providers. With the exactly-once semantics provided by Kafka Streams, it became the de facto choice on the server side, while Spring Boot was leveraged to support the user interface.
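As a sketch of what enabling exactly-once semantics looks like, it comes down to a single Kafka Streams configuration property (the application id and broker address below are illustrative, not this project's actual values):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Hypothetical application id and broker address, for illustration only
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-temperature-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Enables exactly-once processing (EOS v2, available since Kafka 2.8)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```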

Architecture Diagram

Sensors deployed in the devices generate temperature readings in Avro format, which are pushed to a Kafka topic. Multiple sensors send these readings, and we wanted to maintain metadata for them. Schema Registry is an excellent tool for solving this challenge: it acts as a serving layer for metadata and a centralized repository for schemas. Leveraging Schema Registry, producers and consumers have more flexibility to interact and exchange data without the challenge of managing and sharing schemas between themselves. In the future, the sensors will be changed and the corresponding schemas will evolve (schema evolution), which can be easily handled using Schema Registry.

{
  "namespace": "com.appu",
  "type": "record",
  "name": "equipmentvalue",
  "fields": [
    {
      "name": "serial",
      "type": "string",
      "doc": "Serial Number of the equipment"
    },
    {
      "name": "owner",
      "type": "string",
      "doc": "Owner name of the equipment"
    },
    {
      "name": "temp",
      "type": "string",
      "doc": "Temperature in degree Celsius of the equipment"
    }
  ]
}

These streams of data are saved as a KTable, which represents the latest state of the data at a particular point in time. This data is tracked in the Web UI.
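A minimal sketch of materializing the readings topic as a KTable keyed by serial number (string serdes are used here for illustration; the real application would use Avro serdes backed by Schema Registry):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

public class ReadingsTable {
    public static KTable<String, String> latestReadings(StreamsBuilder builder) {
        // A KTable keeps only the latest value per key, so each equipment
        // serial number maps to its most recent temperature reading.
        return builder.table("equipment", Consumed.with(Serdes.String(), Serdes.String()));
    }
}
```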

There is also static data, such as names and phone numbers, that does not change in real time. This data usually resides in databases or file systems, so we need to implement Change Data Capture (CDC) to capture changes in these fields. Kafka Connect helps us tackle this: a source connector can pull the data from a file or table into a topic, which removes the overhead of writing a producer/consumer pair to do the same. A KTable join based on the key then captures these static changes.
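The join itself is a one-liner in Kafka Streams. Here is a hedged sketch, with both value types simplified to strings and a hypothetical concatenating joiner standing in for the real enrichment logic:

```java
import org.apache.kafka.streams.kstream.KTable;

public class MetadataJoin {
    public static KTable<String, String> enrich(KTable<String, String> readings,
                                                KTable<String, String> metadata) {
        // KTable-KTable inner join on the shared key (the equipment serial
        // number); whenever either side updates, the joined result is re-emitted.
        return readings.join(metadata, (reading, meta) -> reading + " | " + meta);
    }
}
```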

One of the challenges was writing unit tests for the application. We did not want to touch the existing cluster and wanted a solution that could run the tests without a Kafka installation. kafka-streams-test-utils helps us achieve that.

public void setup() {
    // Build the topology under test without connecting to a real cluster
    StreamsBuilder builder = new StreamsBuilder();
    EquipmentAnalytics.runAnalytics(builder);
    Topology topology = builder.build();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-temperature-analytics-test-001");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    topologyTestDriver = new TopologyTestDriver(topology, props);
}
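With the driver in place, a test can pipe records in and read results out without any broker. A sketch of that usage, with illustrative topic names and a hypothetical reading (the assertion is left commented because the expected output depends on the topology under test):

```java
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class DriverUsage {
    void exampleTest(TopologyTestDriver topologyTestDriver) {
        TestInputTopic<String, String> input = topologyTestDriver.createInputTopic(
                "equipment", new StringSerializer(), new StringSerializer());
        TestOutputTopic<String, String> output = topologyTestDriver.createOutputTopic(
                "equipment-out", new StringDeserializer(), new StringDeserializer());
        input.pipeInput("SN-001", "25.3");           // hypothetical sensor reading
        // assertEquals("25.3", output.readValue()); // depends on the topology under test
    }
}
```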

As we were dealing only with the server side for now, we developed a Python-based framework for producing mock data from both the sensors and Kafka Connect. The framework can be configured to simulate various scenarios such as metadata updates, schema evolution, etc.

[kafka]
bootstrap.servers = localhost:9092
schema.registry.url = http://localhost:8081
topic = equipment
equipment_topic_json = equipmentJson
equipment_meta_json = equipmentMeta
mock_data = resources/data/equipment/equipment-mock
equipment_json_mock_data = resources/data/equipment/equipment-json-mock
equipment_meta_mock_data = resources/data/equipment/equipment-meta-mock


[avro]
equipment-key-schema = resources/schemas/avro/equipment/equipment-key.avsc
equipment-value-schema = resources/schemas/avro/equipment/equipment-value.avsc

The flow of temperature readings is displayed in the Web UI as a line graph, using WebSocket and Spring Boot. It also displays the latest temperature readings.
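One common way to wire this up (a sketch of the pattern, not necessarily this project's exact code; the topic and destination names are assumptions) is a Spring Kafka listener that forwards each reading to a STOMP destination the browser subscribes to:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class TemperatureForwarder {
    private final SimpMessagingTemplate template;

    public TemperatureForwarder(SimpMessagingTemplate template) {
        this.template = template;
    }

    // Hypothetical listener: pushes each consumed reading over the WebSocket
    // so subscribed browsers can append it to the live line graph.
    @KafkaListener(topics = "equipment", groupId = "web-ui")
    public void onReading(String reading) {
        template.convertAndSend("/topic/temperature", reading);
    }
}
```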

Live Web UI

In the future this framework could be extended to cover more use cases, such as finding the average temperature over a window period, storing readings in a database, or sending alerts when the temperature goes above or below a threshold, and much more.
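As an example of the first extension, a windowed average could be sketched as follows. The five-minute window, the "sum,count" string encoding, and the output topic name are all assumptions made for illustration:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedAverage {
    public static void build(StreamsBuilder builder) {
        KStream<String, String> readings =
                builder.stream("equipment", Consumed.with(Serdes.String(), Serdes.String()));
        readings
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            // Accumulate a "sum,count" pair per serial number and window
            .aggregate(
                () -> "0,0",
                (serial, value, agg) -> {
                    String[] p = agg.split(",");
                    double sum = Double.parseDouble(p[0]) + Double.parseDouble(value);
                    long count = Long.parseLong(p[1]) + 1;
                    return sum + "," + count;
                },
                Materialized.with(Serdes.String(), Serdes.String()))
            // Drop the window bounds from the key and emit the average
            .toStream((windowedKey, value) -> windowedKey.key())
            .mapValues(agg -> {
                String[] p = agg.split(",");
                return String.valueOf(Double.parseDouble(p[0]) / Long.parseLong(p[1]));
            })
            .to("equipment-avg-5m", Produced.with(Serdes.String(), Serdes.String()));
    }
}
```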

Links :

Data Generation : https://github.com/appuv/KafkaDataGen

Temperature Analytics : https://github.com/appuv/KafkaTemperatureAnalytics

Web UI : https://github.com/appuv/Live-Dashboard-using-Kafka-and-Spring-Websocket

There is a recording of the working application on my YouTube channel: https://youtu.be/Cj3BeA4bV1c
