Phil Marius

Data Scientist, Data Engineer, Linux, and OSS Fan

21 Nov 2020

Sending Fluentd Logs to Azure EventHubs using the Kafka Streaming Protocol

As part of the Microsoft Partner Hack in November 2020, I decided to use this opportunity to try out a new method of ingesting Fluentd logs.

What is Fluentd?

Fluentd is a log collector which takes a declarative config file containing input (or “source”) and output information. Wikipedia defines it as:

a cross platform open-source data collection software project

The main idea is that it lets developers collect log information and send it to an endpoint of their choice without having to worry about the implementation of the log collection service itself.

At my current company, we ingest clickstream data to perform advanced analytics on it and we have a few ways of doing this. Our most recent method is a pixel server that serves a tiny, 1x1 pixel on a client’s webpage and collects various information about the customers browsing the client’s website. Nginx handles the requests and deals with cookies, and Fluentd takes Nginx’s access log and sends it to our data stores. Currently, it simply dumps line-delimited JSON objects into Azure Blob Storage.

However, I wanted to see if we could utilise more of Azure’s services to our advantage. Having recently become Azure qualified, I wanted to see what alternatives I could develop.

Azure EventHubs

Azure EventHubs (AEH) is Azure’s “real time data ingestion tool” and integrates easily with other Azure services. It also features the ability to easily monitor data streams and configure multiple event consumers to then ingest the incoming data. One thing that we use daily is Databricks and, through Azure certification revision, I learned that Spark Streaming integrates with AEH easily. So naturally it was AEH that I wanted to try out.

Plan

The plan was as follows:

  • Create an AEH instance to ingest clickstream data
  • Change the Fluentd output plugin to send data to a Kafka endpoint instead of Azure Blob Storage
  • Insert AEH connection information into the output plugin for Fluentd
  • Create a Databricks test cluster with a notebook attached to see whether we could ingest the clickstream events from AEH

TLDR: it works a treat

Why Kafka?

Yes, I know an AEH Fluentd output plugin exists. However, I’m a big fan of open source software and protocols so I wanted to try it with Kafka. We also have a RabbitMQ instance running internally, so I initially wanted to try out AMQP, but seeing as AEH has native Kafka support, I thought I’d roll with that first.

Implementation

So what does this look like in practice?

Azure EventHubs Setup

First, I have to set up the AEH instance.

An AEH topic also needs to be created (yes I know they’re not called topics, they’re called EventHubs but that’s just confusing if you ask me so I’m going to call them topics from now on).

Fluentd Setup

Then, I have to configure Fluentd to utilise the out_kafka2 plugin. It comes built in with the td-agent installation of Fluentd; however, the pixel server uses Docker Compose with a Fluentd Docker image. Luckily, the official Docker image for Fluentd offers instructions for how to install plugins on their supplied image. This requires creating your own image, so I followed the supplied instructions, added a gem install fluent-plugin-kafka (the gem that provides out_kafka2) in it, and built the image locally (to be uploaded to Azure Container Registry once fully productionised). This local image was then used in the docker-compose.yml in the pixel server:


services:
  fkafka:
    image: custom-fluentd:latest
    restart: unless-stopped
    volumes:
      - ./fluentd/fluent.conf:/fluentd/etc/fluent.conf
      - ./logs:/var/logs
    env_file:
      - .env

Then, in the mounted fluent.conf file, I added the new output plugin with AEH endpoint and credentials added under a <match> statement. I found this repo that helped.


<match nginx.access>
  @type copy

  <store>
    @type kafka2

    # list of seed brokers
    # AEH exposes its Kafka endpoint on port 9093
    brokers <AEH_NAME>.servicebus.windows.net:9093
    use_event_time true

    # buffer settings (kafka2 takes these in a <buffer> section)
    <buffer topic>
      @type file
      path /var/log/td-agent/buffer/kafka
      flush_interval 3s
    </buffer>

    # topic settings
    default_topic <TOPIC_NAME>

    <format>
      @type json
    </format>

    # producer settings
    max_send_retries 1
    required_acks -1

    # using default OS certs for SSL
    ssl_ca_certs_from_system true

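    # SASL PLAIN: the username is the literal string $ConnectionString and
    # the password is the connection string of the shared access policy
    username $ConnectionString
    password "<SHARED_ACCESS_KEY>"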
  </store>

  <store>
    @type stdout
  </store>
</match>

A few credentials need to be obtained from AEH. Firstly, the broker address ( <AEH_NAME>.servicebus.windows.net:9093 ) is the host name of the AEH namespace followed by port 9093.

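Add in the topic name ( <TOPIC_NAME> ) from earlier. Then, create a shared access key ( <SHARED_ACCESS_KEY> ) from the Shared Access Policies tab in AEH and make sure it has Send permissions. Note that the value the Kafka endpoint expects as the password is the policy's full connection string, not just the key on its own.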
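To make the mapping between the AEH credentials and the Kafka settings explicit, here is a minimal Python sketch. The function names and the example values are hypothetical. It assumes the usual Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=... connection string layout, the literal username $ConnectionString, and port 9093 as used in the config above.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_string):
    """Split 'Key=Value;Key=Value' pairs into a dict."""
    parts = {}
    for pair in conn_string.strip().strip(";").split(";"):
        # Split on the first "=" only: base64 keys may end with "=".
        key, _, value = pair.partition("=")
        parts[key.strip()] = value.strip()
    return parts


def kafka_settings_from_connection_string(conn_string, port=9093):
    """Derive the three values the kafka2 <store> block needs (hypothetical helper)."""
    parts = parse_connection_string(conn_string)
    host = urlparse(parts["Endpoint"]).hostname
    return {
        "brokers": f"{host}:{port}",
        "username": "$ConnectionString",  # literal string, not a variable
        "password": conn_string,          # the whole connection string
    }


# Made-up example values, for illustration only.
example = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=fluentd-send;SharedAccessKey=abc123="
)
settings = kafka_settings_from_connection_string(example)
print(settings["brokers"])
```

The point of the sketch is simply that everything Fluentd needs can be read off the one connection string copied from the portal.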

Databricks Setup

Now for Databricks, I followed this tutorial. For this you will need to create another shared access key, this time with Listen permissions, in the same way as in the previous step. Make sure the connection string specifies the topic name as its EntityPath. Also, newer versions of the Event Hubs connector for Spark require the connection string to be encrypted; use sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt to achieve that:


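# Connection string of the Listen policy, including EntityPath=<TOPIC_NAME>
conn_string = "<SHARED_ACCESS_KEY>"

# sc is the SparkContext that Databricks notebooks provide
conf = {
  "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_string)
}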

(It’s honestly this simple)
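If the connection string was copied from the namespace level, it will not name the topic. A small sketch of appending it is below. The helper name with_entity_path and the example values are hypothetical, and it assumes the semicolon-separated Key=Value layout of AEH connection strings.

```python
def with_entity_path(conn_string, topic):
    """Return conn_string with EntityPath set to topic, replacing any existing one."""
    pairs = [
        pair
        for pair in conn_string.strip().strip(";").split(";")
        if pair and not pair.startswith("EntityPath=")
    ]
    pairs.append(f"EntityPath={topic}")
    return ";".join(pairs)


# Made-up example values, for illustration only.
namespace_level = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=databricks-listen;SharedAccessKey=abc123="
)
print(with_entity_path(namespace_level, "clickstream"))
```

The result is what would go into conn_string before encrypting it.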

Testing

As mentioned before, I ran a local instance of the pixel server and then used curl to spam it with requests. Then, we monitored the message count in both AEH and Databricks and extracted the message body in Databricks to see what we were getting.

The commands were as follows:


curl --cacert <path_to_cert> "https://localhost:443/i?param=testing" --output -

Please note that I have an HTTPS setup for the pixel server including a local configuration for self-signed certificates, hence the --cacert argument with port 443. Another blog to follow.
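For anyone who would rather script the repeated requests than re-run curl by hand, here is a rough standard-library equivalent. This is a sketch and not what I ran: the function names are hypothetical, and the URL and certificate path are the same placeholders as in the curl command.

```python
import ssl
import urllib.request


def build_context(ca_cert_path):
    """TLS context that trusts the given CA file, similar to curl --cacert."""
    return ssl.create_default_context(cafile=ca_cert_path)


def send_requests(url, count, context=None):
    """Request url count times and return the list of HTTP status codes."""
    statuses = []
    for _ in range(count):
        with urllib.request.urlopen(url, context=context) as response:
            response.read()  # drain the 1x1 pixel body
            statuses.append(response.status)
    return statuses


# Example usage against the local pixel server (not executed here):
# context = build_context("<path_to_cert>")
# print(send_requests("https://localhost:443/i?param=testing", 10, context))
```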

Results

This honestly worked better than I ever expected.

Firstly, let’s see what it looked like from Fluentd. I used a @type copy to send the logs both to the AEH Kafka endpoint and to stdout so that I could monitor when Fluentd picked up requests. After curling a few times, we received the below:


fkafka_1     | 2020-11-16 08:31:47.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:47+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"3dfd6da7ee3f1d1f5f9700cc248f62bf"}
fkafka_1     | 2020-11-16 08:31:48.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:48+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"67264926a924f882ae083e701f8398ac"}
fkafka_1     | 2020-11-16 08:31:49.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:49+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"77e9463ffad775fe60c4be9b68fcccb2"}
fkafka_1     | 2020-11-16 08:31:50.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:50+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"954ab20affac1abe36fc36a724cb0247"}
fkafka_1     | 2020-11-16 08:31:52.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:52+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"35af98d190620bcf7fea8364663af81a"}
fkafka_1     | 2020-11-16 08:31:53.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:53+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"d0a2f0d131908bf742619c89b066b775"}
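Each of these lines is the container prefix from Docker Compose, then the event time and tag, then the JSON record that gets sent on to AEH. As a quick illustration of that structure, here is a small Python sketch that splits one line apart. The function name parse_stdout_line is hypothetical, and the sketch assumes the exact layout shown above.

```python
import json


def parse_stdout_line(line):
    """Split a docker-compose Fluentd stdout line into (time, tag, record)."""
    # Drop the "fkafka_1     | " container prefix added by docker-compose.
    _, _, event = line.partition("| ")
    # The JSON record starts at the first "{"; before it is "<time> <tag>: ".
    header, brace, body = event.partition("{")
    timestamp, _, tag = header.strip().rstrip(":").rpartition(" ")
    return timestamp, tag, json.loads(brace + body)


line = (
    'fkafka_1     | 2020-11-16 08:31:47.000000000 +0000 nginx.access: '
    '{"remote":"172.21.0.1","host":"-","user":"-",'
    '"time":"2020-11-16T08:31:47+00:00","method":"GET",'
    '"path":"/i?param=testing","code":"200","size":"43","referer":"-",'
    '"agent":"curl/7.58.0","cookie":"3dfd6da7ee3f1d1f5f9700cc248f62bf"}'
)
timestamp, tag, record = parse_stdout_line(line)
print(tag, record["method"], record["path"], record["code"])
```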

Now that I’m sure Fluentd picked up the requests, let’s look at the message count metric for AEH in the past 30 minutes:

Sure enough, messages were coming through on the AEH topic. Now let’s look at Databricks and see if they’re coming through there:

And extracting the body information:
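The message body arrives as bytes holding the JSON record that Fluentd sent. The notebook did this step with Spark, so the following is only a plain-Python sketch of the same idea for a single message. The function name decode_event_body and the added params field are hypothetical.

```python
import json
from urllib.parse import parse_qs, urlsplit


def decode_event_body(body):
    """Decode one message body (bytes) into a dict and pull out the query parameters."""
    record = json.loads(body.decode("utf-8"))
    query = parse_qs(urlsplit(record["path"]).query)
    # parse_qs returns lists; keep the first value of each parameter.
    record["params"] = {key: values[0] for key, values in query.items()}
    return record


body = b'{"method":"GET","path":"/i?param=testing","code":"200"}'
print(decode_event_body(body)["params"])
```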

And would you look at that, the messages are coming through perfectly!

Conclusion

I set out in the Microsoft Partner Hackathon to trial a new method for ingesting clickstream data and came out with a working proof of concept after only three days of research.

So what’s the benefit of this new data ingestion method?

I’m glad you asked. There are a few positive takeaways from a streaming pipeline like this:

  • We now get live data and can offer this to clients
  • Thanks to Spark Streaming, we don’t need to change our PySpark processing code to handle this new form of data
  • We can now monitor traffic as and when it comes in rather than building our own queries for batch data, further powering the metrics we can offer to client teams
  • Failures surface immediately rather than at the next scheduled run: we know straight away when something fails, and integrating this pipeline with Azure Monitor will give us alerting possibilities on top of this
  • This new ingestion method also encourages separation of concerns within our pipeline, which is what excites me most about it

Closing Notes

Thank you to Microsoft and the team who organised the Partner Hackathon. Virtual hackathons are difficult to organise, and I was very impressed by this one. The engineers and cloud solution architects were always on hand to offer help at a moment’s notice and I couldn’t have done it without them.