Problem Statement: To transfer data from a device log in real time for analytics.
Approach:
There are multiple streaming and batch platforms for real-time and batch analytics. One challenge most industries face is transferring data from their edge devices to the streaming platform. Fluentd [1] helps us solve this challenge. Fluentd is a cross-platform, open-source data collection project originally developed at Treasure Data, and it is written primarily in Ruby. Fluentd supports multiple architecture patterns for this problem [2]. Because edge devices are resource-constrained, we will look at a lightweight pattern in Fluentd: the forwarder/aggregator architecture.

Fluentd forwarder/aggregator architecture
Before we dive into this architecture, let's start by running Fluentd locally and tailing [3] a log file. Fluentd can be run as a Docker container. As a prerequisite, Docker must be installed on the system [4].
To run Fluentd in Docker, create a Docker Compose file named docker-compose.yaml with the following content.
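version: "3"
services:
  data-forwarder:
    container_name: data-forwarder
    # Run as root, e.g. to avoid permission issues when writing to the mounted folders
    user: root
    # Builds from a Dockerfile in this folder and tags the result with the image name below
    build:
      context: .
    image: fluent/fluentd:latest
    ports:
      - 24224:24224
    volumes:
      # Fluentd configuration files (fluent.conf, fluent_tail.conf)
      - ./Configuration:/fluentd/etc/
      # Folder that holds the tailed device.log and its position file
      - ./input/:/fluentd/log/files/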
forwarder docker-compose.yaml
Two volume mappings are defined here. The Configuration folder is mapped to /fluentd/etc/, which is where the official Fluentd image reads fluent.conf from by default. The folder that holds our log file (./input/) can be mapped to any location in the container. That location must match the path given in the Fluentd configuration file.
Now we need to create the configuration for tailing a log file. For better maintainability, we split it into separate configuration files. First, create a folder named Configuration. Because docker-compose.yaml mounts ./Configuration, this folder must sit in the same folder as docker-compose.yaml, unless you change the volume path to match. Inside it we need two configuration files: fluent.conf and fluent_tail.conf. All of our configuration goes into fluent_tail.conf.
<source>
  @type tail
  path /fluentd/log/files/device.log
  pos_file /fluentd/log/files/device.pos
  tag device.log
  <parse>
    @type none
  </parse>
</source>
<match device.log>
  @type stdout
</match>
Sample Configuration
The configuration above tails the file /fluentd/log/files/device.log inside the container, which is ./input/device.log on the host. The position file (pos_file) records how far Fluentd has read the file, so it can resume from the last read position after a restart. With @type none, each line is kept as a plain string in the message field. The events are tagged device.log. The <match device.log> directive routes events with that tag to the stdout output plugin, which prints them to the container log. Now we need to include this configuration in fluent.conf.
Sample Configuration (fluent.conf):
@include fluent_tail.conf
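If you prefer to script this step, a small helper can create the folder layout and the one-line fluent.conf. This is a sketch. `create_forwarder_layout` is a hypothetical convenience function, not part of Fluentd, and it assumes you call it with the folder that contains docker-compose.yaml. It overwrites fluent.conf but leaves the contents of an existing fluent_tail.conf untouched.

```python
from pathlib import Path


def create_forwarder_layout(base: Path) -> None:
    """Create the forwarder's Configuration and input folders (hypothetical helper)."""
    config_dir = base / "Configuration"  # mounted at /fluentd/etc/
    input_dir = base / "input"           # mounted at /fluentd/log/files/
    config_dir.mkdir(parents=True, exist_ok=True)
    input_dir.mkdir(parents=True, exist_ok=True)
    # fluent.conf only includes the file that holds the tailing configuration
    (config_dir / "fluent.conf").write_text("@include fluent_tail.conf\n")
    # Create fluent_tail.conf if it is missing; existing content is not modified
    (config_dir / "fluent_tail.conf").touch()
```

For example, `create_forwarder_layout(Path("."))` run next to docker-compose.yaml creates ./Configuration and ./input. The <source> and <match> blocks shown earlier still need to be placed in fluent_tail.conf.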
Now we are all set to tail the log file. Start the container with the following command:
docker-compose up
Once the container starts, Fluentd prints the configuration given above in its startup log.

Fluentd Forwarder Docker Log
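To make the role of the position file concrete, here is a simplified Python model of tailing with a saved offset. It is an illustration only and does not reproduce Fluentd's in_tail implementation. The function name `read_new_lines` and the plain-integer position file are hypothetical. in_tail also handles log rotation, and it decides where to start on the first read through its read_from_head parameter, neither of which is modelled here.

```python
from pathlib import Path


def read_new_lines(log_path: Path, pos_path: Path) -> list[str]:
    """Return complete lines appended to log_path since the offset saved in pos_path.

    Hypothetical, simplified model: the position file holds one integer byte
    offset, and rotation or truncation of the log file is not handled.
    """
    # Resume from the saved offset; with no position file, start at the beginning
    offset = int(pos_path.read_text()) if pos_path.exists() else 0
    with open(log_path, "rb") as f:
        f.seek(offset)
        data = f.read()
    # Consume only complete lines; a partially written last line waits for the next call
    end = data.rfind(b"\n") + 1
    lines = data[:end].decode("utf-8").splitlines()
    # Save the new offset so a restart continues here instead of re-reading the file
    pos_path.write_text(str(offset + end))
    return lines
```

The second call after a restart returns only the lines written since the first call. This is the behavior the pos_file gives the forwarder.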
Now that the forwarder is waiting to tail the log file, we need something that writes to it. The following Python [5] script appends sample JSON records, one per line, to device.log (./input/device.log on the host).
import json

# Replace the placeholder with the host-side path of the tailed device.log
with open("<device.log location>", "a") as logfile:
    for x in range(100):
        data = {"temperature_sensor1": x, "temperature_sensor2": x * x, "serial": "033_appu"}
        logfile.write(json.dumps(data))
        logfile.write("\n")
        # Flush after every record so Fluentd sees each line as soon as it is written
        logfile.flush()
Python script for data generation
Sample Data in File:
{"temperature_sensor1": 0, "temperature_sensor2": 0, "serial": "033_appu"}
{"temperature_sensor1": 1, "temperature_sensor2": 1, "serial": "033_appu"}
{"temperature_sensor1": 2, "temperature_sensor2": 4, "serial": "033_appu"}
{"temperature_sensor1": 3, "temperature_sensor2": 9, "serial": "033_appu"}
You should now see the same data in the terminal where the Fluentd forwarder is running. The none parser does not interpret the JSON, so each line appears as a string inside the message field.
Sample Data in terminal:

Fluentd Forwarder docker log tail stdout
Stop the container with:
docker-compose down
Next, we calculate the sum of both sensors. Fluentd processes directives from top to bottom, so the filters must be added before the <match device.log> block. Modify fluent_tail.conf as follows:
<filter device.log>
  @type parser
  key_name message
  <parse>
    @type json
  </parse>
</filter>
<filter device.log>
  @type record_transformer
  enable_ruby true
  <record>
    total_temperature ${record["temperature_sensor1"] + record["temperature_sensor2"]}
  </record>
</filter>
Fluentd data transformation filters
Here the data passes through two filters. The first (parser) parses the JSON string in the message field, and the parsed fields replace the original record. The second, the record_transformer [6] filter plugin, adds total_temperature, computed from the existing fields. enable_ruby true allows Ruby expressions inside ${...} when transforming the data. Fluentd has many plugins [7][8], including input, output and filter plugins, and we can also write our own custom plugins in Ruby.
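The two filters can be mirrored in plain Python to check the expected records before running Fluentd. This is a model of the pipeline only and does not show how Fluentd executes it. `tail_event`, `parse_filter` and `transform_filter` are hypothetical names. The model assumes the parser filter's default behavior (reserve_data false), where the parsed fields replace the record, which is why message is absent from the output.

```python
import json


def tail_event(line: str) -> dict:
    # @type none keeps the raw line as a string under the "message" key
    return {"message": line}


def parse_filter(record: dict, key_name: str = "message") -> dict:
    # Parser filter with reserve_data false: the parsed JSON replaces the record
    return json.loads(record[key_name])


def transform_filter(record: dict) -> dict:
    # record_transformer: add a field computed from the existing fields
    out = dict(record)
    out["total_temperature"] = record["temperature_sensor1"] + record["temperature_sensor2"]
    return out


line = json.dumps({"temperature_sensor1": 3, "temperature_sensor2": 9, "serial": "033_appu"})
print(json.dumps(transform_filter(parse_filter(tail_event(line))), separators=(",", ":")))
# prints {"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}
```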
Start the forwarder and the data generator again. The records printed in the terminal now contain an additional field, total_temperature.
Sample Data:
{"temperature_sensor1":0,"temperature_sensor2":0,"serial":"033_appu","total_temperature":0}
{"temperature_sensor1":1,"temperature_sensor2":1,"serial":"033_appu","total_temperature":2}
{"temperature_sensor1":2,"temperature_sensor2":4,"serial":"033_appu","total_temperature":6}
{"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}

Fluentd Forwarder docker log transformation
We have now tailed a log file and applied a small transformation. Next, we send this data to another Fluentd instance, which we call the aggregator. As with the forwarder, we run it in a Docker container:
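version: "3"
services:
  data-aggregator:
    container_name: data-aggregator
    user: root
    # Builds from a Dockerfile in this folder and tags the result with the image name below
    build:
      context: .
    image: fluent/fluentd:latest
    ports:
      # Port used by the forward input in fluent_agg.conf
      - 24225:24225
    volumes:
      - ./Configuration:/fluentd/etc/
      # Destination folder for the file output
      - ./output/:/tmp/output/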
aggregator docker-compose.yaml
As with the forwarder, the Configuration folder is mounted into the container, and the ./output folder is mounted at /tmp/output/ for the file output used later. We need two configuration files for the aggregator: fluent.conf and fluent_agg.conf.
Sample Configuration of fluent_agg.conf is as follows:
<source>
  @type forward
  port 24225
  bind 0.0.0.0
</source>
<match device.log>
  @type stdout
</match>
Sample Configuration
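As with the forwarder, include this file from the aggregator's fluent.conf (@include fluent_agg.conf).
Here the source is the forward input plugin [9]. It listens on port 24225 for events sent over the forward protocol, for example by another Fluentd instance. bind 0.0.0.0 makes it listen on all interfaces, so the published Docker port can reach it. Forwarded events keep their original tag, so the same device.log match pattern routes them to stdout. Start the aggregator container with the following command: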
docker-compose up

Fluentd Aggregator docker log
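With the aggregator running, it can be worth confirming from the edge device that port 24225 is reachable before reconfiguring the forwarder. The hypothetical helper below uses only Python's standard socket module. A successful TCP connection only shows that something is listening on the port, not that it is a Fluentd forward input.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out or host unresolvable: not reachable
        return False


# Usage, with the aggregator host's address substituted:
# port_is_open("<your_ip>", 24225)
```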
Now we need to configure the forwarder to send its data to the aggregator. Modify the match directive in fluent_tail.conf as follows:
<match device.log>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s
  require_ack_response true
  <server>
    host <your_ip>
    port 24225
  </server>
</match>
Sample forward configuration
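With this configuration, the forwarder sends its data to the aggregator using the forward output plugin [10]. Replace <your_ip> with an address of the aggregator's machine that the forwarder container can reach. localhost inside the forwarder container refers to the container itself. With require_ack_response true, the forwarder waits for the aggregator to acknowledge each chunk and resends chunks that are not acknowledged. Once the setup is complete, start the forwarder and the Python script as described earlier. The output that we previously saw in the forwarder now appears in the aggregator.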
Sample Data:
{"temperature_sensor1":0,"temperature_sensor2":0,"serial":"033_appu","total_temperature":0}
{"temperature_sensor1":1,"temperature_sensor2":1,"serial":"033_appu","total_temperature":2}
{"temperature_sensor1":2,"temperature_sensor2":4,"serial":"033_appu","total_temperature":6}
{"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}

Fluentd Aggregator docker log stdout
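With require_ack_response true, the forward protocol gives at-least-once delivery. The sender attaches a unique chunk ID to each batch, the receiver replies with that ID once it has the data, and unacknowledged chunks are sent again. The sketch below is a conceptual model of that retry logic with a fake transport and is not Fluentd code. `send_chunk` and `flaky_transport` are hypothetical. Real out_forward sends MessagePack over TCP with its own timeouts (such as ack_response_timeout), which are not modelled. Because a chunk may arrive even when its acknowledgement is lost, the aggregator can occasionally receive duplicates.

```python
import base64
import uuid


def new_chunk_id() -> str:
    # Unique, base64-encoded ID sent along with the chunk
    return base64.b64encode(uuid.uuid4().bytes).decode("ascii")


def send_chunk(events, transport, max_attempts=3):
    """Send events until the receiver acknowledges the chunk ID; return the attempt count."""
    chunk_id = new_chunk_id()
    for attempt in range(1, max_attempts + 1):
        response = transport(events, {"chunk": chunk_id})
        # A response echoing the same ID means the receiver has the data
        if response is not None and response.get("ack") == chunk_id:
            return attempt
    # Give up for now so the caller can keep the chunk and retry later
    raise RuntimeError("chunk not acknowledged")


def flaky_transport(fail_times):
    """Hypothetical transport that loses the first fail_times responses, then acknowledges."""
    state = {"calls": 0}

    def transport(events, options):
        state["calls"] += 1
        if state["calls"] <= fail_times:
            return None  # response lost or timed out
        return {"ack": options["chunk"]}

    return transport
```

For example, `send_chunk([{"total_temperature": 12}], flaky_transport(2))` succeeds on the third attempt.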
Now we will send the output of the aggregator to a file. Modify the match directive in fluent_agg.conf as follows:
<match device.log>
  @type file
  path /tmp/output/
</match>
Once the data generator and the forwarder are started, the data appears in the aggregator's output folder (./output on the host, mapped to /tmp/output/). The output file names follow the pattern buffer.<hashstring>.log.
Sample data:
2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":0,"temperature_sensor2":0,"serial":"033_appu","total_temperature":0}
2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":1,"temperature_sensor2":1,"serial":"033_appu","total_temperature":2}
2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":2,"temperature_sensor2":4,"serial":"033_appu","total_temperature":6}
2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}
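To consume the aggregator's output from Python, each line can be split into time, tag and JSON record. This reader is a sketch that assumes the default out_file line layout shown above: time, tag and JSON separated by whitespace (Fluentd uses a tab by default). It also assumes the files sit in the ./output folder. `read_output_records` is a hypothetical name.

```python
import json
from pathlib import Path


def read_output_records(output_dir: Path, pattern: str = "*.log"):
    """Yield (time, tag, record) tuples from Fluentd file-output lines in output_dir."""
    for path in sorted(output_dir.glob(pattern)):
        with open(path, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # skip blank lines
                # Split on the first two whitespace runs: timestamp, tag, then the JSON record
                time_str, tag, payload = line.split(None, 2)
                yield time_str, tag, json.loads(payload)
```

For example, `sum(r["total_temperature"] for _, _, r in read_output_records(Path("output")))` totals the field across all output files.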
The data has now been transferred from the device log to the aggregator. The forward connection can be secured with TLS, including mutual TLS (mTLS). Fluentd's forward output also supports high availability [11], and its buffering behavior can be configured [12].
In this example, the aggregator writes its output to stdout and to a file. It can also be configured for other outputs such as Kafka, Amazon S3, Azure Blob Storage or Elasticsearch.
Advantages
- Lower resource utilization on the edge devices (maximizing throughput)
- Allows processing to scale independently on the aggregator tier
- Easy to add more backends (a configuration change in the aggregator instead of in all forwarders)
Disadvantages
- Dedicated resources required for an aggregation instance
The complete code is available in Git.
References
2. https://fluentbit.io/blog/2020/12/03/common-architecture-patterns-with-fluentd-and-fluent-bit/
3. https://docs.fluentd.org/input/tail
4. https://docs.docker.com/engine/install/ubuntu/
6. https://docs.fluentd.org/filter/record_transformer
7. https://docs.fluentd.org/plugin-development
8. https://www.fluentd.org/plugins
9. https://docs.fluentd.org/input/forward
10. https://docs.fluentd.org/output/forward