Tuesday, February 26, 2019

Filesystem events to Elasticsearch / Kibana through Kafka Connect / Kafka

Filesystem events are useful to monitor. They can indicate a security breach. They can also help  understanding how a complex system works by looking at the files it reads and writes.

When monitoring events, you can expect a lot of data to be generated quickly. The events might be interesting to process for different systems and at a different pace. Also it would be nice if you could replay events from the start or a specific moment. Enter Kafka. In order to put the filesystem events in Kafka (from an output file), the Kafka Connect FileSourceConnector is used. In order to get the data from Kafka to Elasticsearch, the Kafka Connect ElasticsearchSinkConnector is used. Both connectors can be used without Enterprise license.

Filesystem events

In order to obtain filesystem events, I've used inotify-tools. On Ubuntu like systems they can be installed with

sudo apt-get install inotify-tools

inotify-tools contains two CLI utilities. inotifywait which can be used to output events to a file in a specific format. inotify-wait can generate statistics. Since we'll do our analyses in Kibana, we want individual events from inotifywait. Using the following command, the /home/developer directory is watched. Events are put in a JSON format in a file called /tmp/inotify.txt

inotifywait -r -m /home/developer -o /tmp/inotify.txt --timefmt "%FT%T%z" --format '{"time": "%T","watched": "%w","file":"%f","events":"%e"}' 

Events can look like;

{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"OPEN"}
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"ATTRIB"}
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"CLOSE_WRITE,CLOSE"}
{"time": "2019-02-26T13:52:17+0000","watched": "/home/developer/","file":"t","events":"DELETE"}

In the above example I touched /home/developer/t and removed it. When watching your home folder, it is interesting to see what is happening there! I could not add a more specific moment of the event since the format is printf-like and uses strftime for formatting of date/time. strftime does not support anything more fine grained than seconds. The order of events however is correct. I could not add more specific information myself since the allowed replacement variables to be replaced is limited to a specific set. Want to know more? man inotifywait

Filesystem events to Kafka

To install the Confluent platform I did:

wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -
add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"
apt-get update
apt-get -y install confluent-platform-2.11

Starting Kafka and related can be done with

confluent start

Using the Kafka Connect FileStreamSource connector (available without Enterprise license), it is relatively easy to monitor the file which is written by notifywait. Kafka Connect can run in distributed mode and in standalone mode. Since it needs to save information on what it has already processed, storage is required. In standalone mode, this can be a file. In distributed mode these are Kafka topics. I choose to go with the standalone mode since removing the file (to do the loading of events again) is quite easy. Drawback of using standalone mode is that the connector cannot be monitored by the Kafka Control Center. Benefit of running distributed is also that it could easily be run in containers since the connector itself is stateless; state is in Kafka.

I used the following filesource.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/inotify.txt
topic=connect-test

And the following worker.properties

offset.storage.file.filename=/tmp/example.offsets
bootstrap.servers=localhost:9092
offset.flush.interval.ms=10000
rest.port=10082
rest.host.name=localhost
rest.advertised.port=10082
rest.advertised.host.name=localhost
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
plugin.path=/usr/share/java

There is also a JsonConverter available, but that wrapped my JSON into a string (escapes characters) and adds schema information (unless disabled). Both I did not want. I did not find a way to disable the behavior that my Json message became an escaped string. This caused issues on the Elasticsearch side. Kafka Connect should in my use-case be a 'dumb pipe' and the StringConverter does a nice job at that!

To start this connector, I can do:

/usr/bin/connect-standalone worker.properties filesource.properties

Elasticsearch and Kibana

In order to run Elasticsearch, you need to increase vm.max_map_count (see here)

sudo sysctl -w vm.max_map_count=262144

For running Elasticsearch and Kibana, I’ve used the following docker-compose.yml file

version: '3.3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.6.1
    container_name: elasticsearch
    environment:
      - node.name=es01
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      nproc: 65535
      memlock:
        soft: -1
        hard: -1
    cap_add:
      - ALL
    privileged: true
    ports:
      - 9200:9200
      - 9300:9300
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:6.6.1
    container_name: kibana
    environment:
      SERVER_NAME: localhost
      ELASTICSEARCH_URL: http://elasticsearch:9200/
    ports:
      - 5601:5601
    ulimits:
      nproc: 65535
      memlock:
        soft: -1
        hard: -1
    cap_add:
      - ALL

Starting it is easy with

docker-compose up

Next you can access Kibana at http://localhost:5601

Getting data from Kafka to Elasticsearch

In order to get the data from the connect-test topic to Elasticsearch, we can again use a standalone Kafka Connect connector. The ElasticsearchSinkConnector which is also available without Enterprise license. You can configure this as follows:

elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect-test
key.ignore=true
schema.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect

worker2.properties

offset.storage.file.filename=/tmp/example2.offsets
bootstrap.servers=localhost:9092
offset.flush.interval.ms=10000
rest.port=10083
rest.host.name=localhost
rest.advertised.port=10083
rest.advertised.host.name=localhost
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false
plugin.path=/usr/share/java

You can start the connector with:

/usr/bin/connect-standalone worker2.properties elasticsearchsink.properties

Some differences with the FileSourceConnector worker.properties are:
  • a different port for the REST API is used
  • a different offset.storage.file is used
  • the key and value converters are Json instead of string
View the results in Kibana

Next you can view the results in Kibana


Also notice that in order for the data from Elasticsearch to be visible in Kibana, not only data needs to be available but also an index needs to be there. Since a JSON document is offered, a default index is created.

No comments:

Post a Comment