Plugin Anatomy of Telegraf Software

cem akpolat
12 min read · Jul 14, 2023


In the previous article, we focused on the features of telegraf and demonstrated a use case scenario in which a number of IIoT devices were connected via telegraf, and telegraf forwarded all the incoming data into influxdb. The data collection can easily be extended to other devices, since telegraf currently supports more than 300 plugins for data collection alone.

Apart from the plugin-based architecture, there are features you may not perceive at first glance: the concurrency and parallelism (goroutines are everywhere in the code :D), the integration with data storage systems (influxdb, elasticsearch, etc.), and the compatibility with containerization and orchestration tools. Not all of these features may be needed for your development use case; in that case, the customizable plugin system can play a crucial role. The purpose of this tutorial is to understand how the plugin structure of telegraf is constructed and to implement a couple of plugins for data inputs, processors, and aggregators. The article continues as follows:

  1. What are Key Components & Plugins of telegraf Framework
  2. How does Input Plugin System work in telegraf?
  3. How to implement an Input Plugin
  4. How to implement a Processor Plugin
  5. How to implement an Aggregator Plugin
  6. Demonstrate Integration of All Plugins via InfluxDB

What are Key Components & Plugins of telegraf Framework

telegraf is built on a plugin-driven architecture and handles its tasks within the related plugins. These plugins are input, output, processor, aggregator, and service plugins. In addition, there is a configuration loader that interprets the telegraf configuration file, and a core engine that acts as the orchestrator managing data collection, processing, aggregation, and delivery to the output plugins. Shortly, we can summarize the functionalities of these key components as follows:

  • Configuration Loader is in charge of reading the telegraf.conf file and interpreting it.
  • Plugin Manager manages the lifecycle of all available plugins.
  • Input Plugin Executor invokes the Gather() method periodically, which we explain below.
  • Processor and Aggregator Manager orchestrates the execution of processor and aggregator plugins.
  • Output Plugin Executor obtains the processed metrics from the previous plugins and forwards them to the output plugins, which may send them to other systems such as influxdb, kafka, elasticsearch, etc.
  • Service Plugin Manager manages service plugins, which can act on metrics, e.g. by filtering or routing them.
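To make the role of the Input Plugin Executor concrete, the following self-contained Go sketch simulates an interval-driven gather loop. This is an illustration of the concept only, not telegraf's actual internals; gatherOnce and the 10 ms interval are invented for the demo:

```go
package main

import (
	"fmt"
	"time"
)

// gatherOnce stands in for calling Gather on one input plugin.
func gatherOnce(results *[]float64) {
	*results = append(*results, 42.0) // a dummy measurement value
}

func main() {
	// Conceptually, the Input Plugin Executor runs a loop like this:
	// on every interval tick it invokes Gather on each input plugin.
	var results []float64
	ticker := time.NewTicker(10 * time.Millisecond) // a short interval for the demo
	defer ticker.Stop()

	for i := 0; i < 3; i++ {
		<-ticker.C
		gatherOnce(&results)
	}
	fmt.Println(len(results)) // prints: 3
}
```

In the real agent, the interval comes from the agent section of telegraf.conf, and each plugin's Gather runs in its own goroutine.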

How does Input Plugin System work in telegraf?

Once you look at the code structure of a telegraf plugin, there are common functions that fulfill the requirements of a telegraf input plugin. These are defined as follows:

  • Description(): A short description explaining the purpose and functionality of the input plugin.
  • SampleConfig(): Shows what a sample config section in telegraf.conf should look like.
  • init(): Called once telegraf starts or the plugin is initialized.
  • Gather(acc telegraf.Accumulator): As its name implies, it collects the defined metrics and sends them to the Accumulator interface. telegraf calls it periodically, based on the defined interval. The Accumulator interface can be thought of as a central point where all data with their labels are collected in the form of key-value pairs, e.g. {input_plugin_name: latest_measurement}. Probably the most important function in the Accumulator is AddFields(), which adds the metrics of the plugin. The Accumulator is responsible for processing and aggregating the added fields along with other metrics collected from various input plugins, and the accumulated metrics can be transferred to the final destination through the output plugins. In short, Gather() is the function that integrates the input plugin into the telegraf plugin system.
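To make the Accumulator idea tangible, here is a self-contained sketch with a minimal in-memory accumulator. The Accumulator interface below is reduced to the single AddFields method used in this article, and memoryAccumulator and gather are illustrative stand-ins, not telegraf code:

```go
package main

import (
	"fmt"
	"time"
)

// Accumulator is a simplified stand-in for telegraf.Accumulator,
// reduced to the single method used in this article.
type Accumulator interface {
	AddFields(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time)
}

// memoryAccumulator buffers metrics in memory, roughly the way the
// real accumulator collects them before processors and outputs run.
type memoryAccumulator struct {
	measurements []string
	fields       []map[string]interface{}
}

func (m *memoryAccumulator) AddFields(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) {
	m.measurements = append(m.measurements, measurement)
	m.fields = append(m.fields, fields)
}

// gather mimics an input plugin's Gather: it pushes one metric into
// whatever accumulator it is handed.
func gather(acc Accumulator) error {
	acc.AddFields("cpu_usage",
		map[string]interface{}{"usage": 42.0},
		map[string]string{"cpu": "cpu-total"},
		time.Now())
	return nil
}

func main() {
	acc := &memoryAccumulator{}
	if err := gather(acc); err != nil {
		panic(err)
	}
	fmt.Println(acc.measurements[0], acc.fields[0]["usage"]) // prints: cpu_usage 42
}
```

Because Gather only depends on the interface, the framework can hand the plugin its real accumulator while tests can hand it a mock like this one.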

How to implement an Input Plugin

0. Fetch telegraf Source Code

git clone https://github.com/influxdata/telegraf.git
  1. Implement Required Plugin Codes

Since telegraf is written in Go, the input plugin must also be written in Go.

package randomcpu

import (
	"math/rand"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"
)

type RandomCPUPlugin struct {
	// Add any configuration fields for your plugin here, if needed
}

func (r *RandomCPUPlugin) Description() string {
	return "Generate random CPU usage metrics"
}

func (r *RandomCPUPlugin) SampleConfig() string {
	return ``
}

func (r *RandomCPUPlugin) Gather(acc telegraf.Accumulator) error {
	tags := map[string]string{
		"cpu": "cpu-total",
	}
	fields := map[string]interface{}{
		// generate a random CPU usage value between 0 and 100
		"usage": rand.Float64() * 100,
	}
	// create the cpu_usage metric and add it to the accumulator
	acc.AddFields("cpu_usage", fields, tags, time.Now())
	return nil
}

func init() {
	// register the plugin under the name "randomcpu"
	inputs.Add("randomcpu", func() telegraf.Input { return &RandomCPUPlugin{} })
}

In this example, we define a RandomCPUPlugin struct that represents our input plugin. It doesn’t require any additional parameters and implements the required methods of the telegraf.Input interface: SampleConfig(), Description(), and Gather().

The SampleConfig() method returns a sample configuration in TOML format, defining the expected configuration options. The Description() method provides a brief description of the plugin.

The Gather() method is where the actual data collection takes place. In this example, we simulate data collection: a random CPU value is generated directly within the Gather() function. You may extend this function as needed; the important point is that Gather() will be called by the framework at a defined interval.

Within Gather(), we create a new metric using the acc.AddFields() method and add it to the accumulator. The metric has the measurement name "cpu_usage", a set of fields, a tag, and the current timestamp (time.Now()). We will see all these values while visualizing the generated data in influxdb.

In the plugin code we encounter the init() function, in which the plugin is registered by calling inputs.Add(), specifying the plugin name randomcpu and a function that returns an instance of the plugin. Notice that the folder name and the package name of the plugin are the same.
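The registration pattern behind inputs.Add() can be sketched in plain Go. The registry map and Add function below are simplified stand-ins for what the telegraf inputs package does internally, invented here for illustration:

```go
package main

import "fmt"

// Input is a trimmed-down stand-in for telegraf.Input.
type Input interface {
	Description() string
}

// registry mirrors the idea behind inputs.Add: a package-level map
// from plugin name to a constructor function.
var registry = map[string]func() Input{}

// Add registers a plugin constructor under a name, like inputs.Add.
func Add(name string, creator func() Input) {
	registry[name] = creator
}

type randomCPU struct{}

func (r *randomCPU) Description() string { return "Generate random CPU usage metrics" }

// init runs at program start, which is exactly when telegraf's plugin
// packages register themselves via their own init functions.
func init() {
	Add("randomcpu", func() Input { return &randomCPU{} })
}

func main() {
	// When telegraf parses [[inputs.randomcpu]] in telegraf.conf, it
	// looks the name up in the registry and instantiates the plugin.
	plugin := registry["randomcpu"]()
	fmt.Println(plugin.Description()) // prints: Generate random CPU usage metrics
}
```

This is why the blank import in the all folder (shown below) matters: importing the package is what triggers its init() and fills the registry.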

Note that this is a simplified example, and you may need to customize it further to suit your specific input source and requirements.

2. Add Codes & Build telegraf Software

The project's plugins reside under the plugins folder. As this example develops an input, we need to create a folder named randomcpu under telegraf/plugins/inputs/ and place a file named randomcpu.go in it, i.e. telegraf/plugins/inputs/randomcpu/randomcpu.go.

This file is now in the required folder; however, our plugin won’t be found by telegraf unless we register it explicitly under the telegraf/plugins/inputs/all/ folder in a file named randomcpu.go. This file contains only the import path of our plugin.

package all

import _ "github.com/influxdata/telegraf/plugins/inputs/randomcpu" // register plugin

After the step above, the last step is to build the whole telegraf project under the main folder (e.g. by running go build ./cmd/telegraf or make).

An executable file named telegraf will be generated after the build process. Apart from all the other default plugins, the new telegraf binary now includes our plugin. In order to see whether it works, the plugin must be defined in the telegraf.conf file as below:

# telegraf.conf
[[inputs.randomcpu]]

The only thing left to do is to start telegraf with the telegraf.conf file and then observe the randomly generated data in influxdb.

./telegraf --config telegraf.conf

The demonstration of influxdb usage along with the newly added plugins is introduced in the final step of this tutorial.

How to implement a Processor Plugin

The processor plugin works similarly to the input plugin, except for the Apply method. To summarize its functions:

  • Description(), SampleConfig(), and init() have the same purposes as in input plugin.
  • Apply(): This is where the processing logic is implemented.

Once the data is collected through the input plugin, the next step is to execute the related processor plugin, if there is any. In order to process an input metric, we need to pass a Field to the AverageProcessor struct. The Description and SampleConfig functions can be left empty at this level. The Apply function receives the field defined in the config, divides its value by 2, and then rounds the result to two decimal places. The processed value is saved under a new field name, {defined_field}_processed. To make this plugin usable in telegraf.conf, it is added to the global processor list in the init() function.


package average

import (
	"math"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/processors"
)

type AverageProcessor struct {
	Field string
}

func (ap *AverageProcessor) SampleConfig() string {
	return ""
}

func (ap *AverageProcessor) Description() string {
	return ""
}

func (ap *AverageProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
	for _, metric := range in {
		if value, ok := metric.GetField(ap.Field); ok {
			if floatValue, ok := value.(float64); ok {
				// Divide the value by 2
				newValue := floatValue / 2.0

				// Round the new value to 2 decimal places
				roundedValue := math.Round(newValue*100) / 100

				// Set the rounded value as a new field in the metric
				metric.AddField(ap.Field+"_processed", roundedValue)
			}
		}
	}

	return in
}

func init() {
	processors.Add("average_processor", func() telegraf.Processor {
		return &AverageProcessor{}
	})
}
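The core transformation inside Apply() can be exercised standalone. The process function below repeats the divide-and-round logic on a plain map instead of a telegraf.Metric; this is a simplification for illustration only:

```go
package main

import (
	"fmt"
	"math"
)

// process repeats the transformation from AverageProcessor.Apply on a
// plain map: halve the chosen field and round it to 2 decimal places.
func process(fields map[string]interface{}, field string) {
	if value, ok := fields[field]; ok {
		if floatValue, ok := value.(float64); ok {
			halved := floatValue / 2.0
			fields[field+"_processed"] = math.Round(halved*100) / 100
		}
	}
}

func main() {
	fields := map[string]interface{}{"usage": 73.456}
	process(fields, "usage")
	fmt.Println(fields["usage_processed"]) // prints: 36.73
}
```

Note the rounding trick: multiplying by 100 before math.Round and dividing afterwards yields two decimal places, since Go's math.Round only rounds to the nearest integer.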

2. Add Codes & Build telegraf Software

Again, the same operations are necessary here: the average.go file should be created under the plugins/processors/average folder as seen below, and the whole code from the previous section should be pasted into the average.go file.

As in the previous step, this is not enough for telegraf to recognize the plugin. The same structure applies: the telegraf/plugins/processors/all/ folder should include a file named average.go that points only to the location of the plugin.

package all

import _ "github.com/influxdata/telegraf/plugins/processors/average" // register plugin

The last step is to add the new plugin, with its name and the metric field it will use, to the telegraf.conf file. We defined the usage field in the previous input plugin.

[[processors.average_processor]]
field = "usage"

You can again build the code as in the input plugin step and observe the processed data in influxdb, which will be demonstrated at the end of this tutorial.

How to implement an Aggregator Plugin

  1. Implement Aggregator Plugin

The last plugin type covered in this tutorial is the aggregator, and the aim is to follow steps similar to those of the processor plugin. The functions differ slightly from the other plugins: there are additional Add, Push, and Reset functions.

  • Description(), SampleConfig(), and init() have the same purposes as in input plugin.
  • Add(): It checks whether the specified field exists in each metric. If so, the aggregation logic based on the given field is applied.
  • Push(): The aggregated data is mapped to output fields. It allows you to extend the fields based on your requirements.
  • Reset(): The aggregation state is reset to its initial values.

The goal here is to report the sum of the usage field defined in the input plugin. The Add function implements the aggregation logic, whereas Push simply writes the accumulated value to the sum field. It is noteworthy that all these operations are driven by telegraf itself: the framework creates the aggregator instance (the ea receiver) and passes each incoming metric to the related function. In the final function, init(), the name of the plugin is defined, which is later used in the telegraf.conf file.

package sum

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/aggregators"
)

type ExampleAggregator struct {
	Field string
	Sum   float64
}

func (ea *ExampleAggregator) SampleConfig() string {
	return ""
}

func (ea *ExampleAggregator) Description() string {
	return ""
}

func (ea *ExampleAggregator) Add(in telegraf.Metric) {
	if value, ok := in.GetField(ea.Field); ok {
		if floatValue, ok := value.(float64); ok {
			ea.Sum += floatValue
		}
	}
}

func (ea *ExampleAggregator) Push(acc telegraf.Accumulator) {
	fields := map[string]interface{}{
		"sum": ea.Sum,
	}

	tags := map[string]string{} // Add any necessary tags if needed

	// Add the aggregated data to the accumulator
	acc.AddFields("aggregated_data", fields, tags)
}

func (ea *ExampleAggregator) Reset() {
	ea.Sum = 0
}

func init() {
	aggregators.Add("example_aggregator", func() telegraf.Aggregator {
		return &ExampleAggregator{}
	})
}
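To see how Add, Push, and Reset interact over an aggregation period, the following self-contained simulation strips the aggregator down to plain floats; no telegraf types are involved, so the method signatures here are simplified stand-ins:

```go
package main

import "fmt"

// sumAggregator mirrors ExampleAggregator without the telegraf types.
type sumAggregator struct {
	sum float64
}

func (a *sumAggregator) Add(value float64) { a.sum += value }
func (a *sumAggregator) Push() float64     { return a.sum }
func (a *sumAggregator) Reset()            { a.sum = 0 }

func main() {
	agg := &sumAggregator{}
	// Within one aggregation period, telegraf calls Add once per metric...
	for _, v := range []float64{10.0, 20.0, 12.5} {
		agg.Add(v)
	}
	// ...then Push to emit the aggregate, then Reset for the next period.
	fmt.Println(agg.Push()) // prints: 42.5
	agg.Reset()
	fmt.Println(agg.Push()) // prints: 0
}
```

The Push-then-Reset cycle is what makes the sum a per-period value rather than a total over the whole lifetime of the process.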

2. Add Codes & Build telegraf Software

The aggregator plugins reside in the ./plugins/aggregators/ folder; therefore we create a folder named sum and a sum.go file directly under it. The whole code above should be copied into this sum.go file.

In order to make telegraf recognize this plugin, we again need to add a file with the same name, sum.go, under the /plugins/aggregators/all/ folder. This file should only contain the location of our newly added sum plugin, as indicated below.

package all

import _ "github.com/influxdata/telegraf/plugins/aggregators/sum" // register plugin

The plugin is now ready to use. To activate it, the only required configuration in the telegraf.conf file is the following two lines. Notice that we use the aggregator plugin name example_aggregator exactly as it is written in the plugin code. The field points to usage, which is defined in the input plugin.

[[aggregators.example_aggregator]]
field = "usage"

Demonstrate Integration of All Plugins via InfluxDB

All plugin types are covered, and now we want to see whether these plugins actually run. influxdb is tightly related to telegraf, since both are developed by InfluxData, and it is a good enough visualization tool to show time series data. For this reason, our goal is to collect the data, process and aggregate it, and then visualize it in influxdb. Here are the steps to be applied:

  1. Build the telegraf software

To build the telegraf code, you can simply execute go build (e.g. go build ./cmd/telegraf) under the telegraf folder.

2. Create a docker-compose file for influxdb and configure it

Until now, we haven’t seen the results of the implemented plugins. To do so, influxdb will be run through a docker-compose configuration. In this step, the initial setup is performed; configuration data such as the token, bucket name, and organization name will be added to the telegraf.conf file in the next step. The docker-compose.yml file is shown below, and the database is accessible in the web browser via http://localhost:8086. To run it, simply execute docker-compose up in the terminal.

version: "3"

services:
  influxdb:
    image: influxdb
    volumes:
      - influxdb_data:/var/lib/influxdb
    ports:
      - "8086:8086"

volumes:
  influxdb_data:

Once it is run, you should see the setup page in the web browser as displayed below:


After filling in the blanks :), the following web page will appear.

Please copy the token, since it will appear only once. Whenever you click Quick Start, it will navigate you to the influxdb dashboard. What we will focus on is the Data Explorer and the metrics bucket, as given below:

3. Update the latest telegraf.conf

We bring all the telegraf configuration together and add the influxdb configuration as below. Please do not forget to insert the generated token in the influxdb_v2 section.

# telegraf.conf
[[inputs.randomcpu]]

[[processors.average_processor]]
field = "usage"

[[aggregators.example_aggregator]]
field = "usage"


[[outputs.influxdb_v2]]
urls = ["http://127.0.0.1:8086"]
token = "add your token"
organization = "test-org"
bucket = "metrics"

4. Run telegraf and observe results

All files are ready to be tested, and I believe your file/folder structure looks as given below:

First the docker-compose.yml file is executed, and then the telegraf binary under the telegraf folder is run with telegraf.conf:

docker-compose up -d 
./telegraf/telegraf --config telegraf.conf

In the following image, the fields of the input plugin and the processor plugin are depicted. The blue line in the graph indicates usage_processed, which is half of the usage field.

The aggregated_data field, which comes from the aggregator plugin, can also be displayed as in the following image.

Conclusion

In this article, the core focus was developing input, processor, and aggregator plugins for the telegraf software. From the first step to the last, how custom plugins can be added to the telegraf software is explained in detail. Since these are only examples and don’t fulfill a specific task, the use cases might not seem equivalent to yours; however, I believe you grasp the underlying concept. There are still many other plugin types that are not mentioned here, but they have a similar structure and wouldn’t pose much more of a challenge. In particular, the output plugin can be interesting for transmitting the processed input data to third-party tools that aren’t available in the telegraf output plugin set. From experience, thanks to the rich plugin ecosystem, you may not even need a custom plugin; nevertheless, it was quite nice to see the structure of the telegraf source code architecture and the plugin logic.

Source Code:

The source code of the project is available at the following link; it doesn’t include the whole telegraf source code. You should place the files in the right locations as described throughout the article.

https://github.com/cemakpolat/telegraf-plugin-extension-example
