When monitoring filesystem events, you can expect a lot of data to be generated quickly. Different systems might want to process those events at a different pace, and it would also be nice to be able to replay events from the start or from a specific moment. Enter Kafka. In order to put the filesystem events in Kafka (from an output file), the Kafka Connect FileStreamSource connector is used. In order to get the data from Kafka to Elasticsearch, the Kafka Connect ElasticsearchSinkConnector is used. Both connectors can be used without an Enterprise license.
Filesystem events
In order to obtain filesystem events, I've used inotify-tools. On Ubuntu-like systems it can be installed with:
sudo apt-get install inotify-tools
inotify-tools contains two CLI utilities: inotifywait, which can be used to output events to a file in a specific format, and inotifywatch, which can generate statistics. Since we'll do our analyses in Kibana, we want individual events from inotifywait. Using the following command, the /home/developer directory is watched recursively. Events are written in JSON format to a file called /tmp/inotify.txt:
inotifywait -r -m /home/developer -o /tmp/inotify.txt --timefmt "%FT%T%z" --format '{"time": "%T","watched": "%w","file":"%f","events":"%e"}'
Events can look like this:
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"OPEN"}
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"ATTRIB"}
{"time": "2019-02-26T13:52:15+0000","watched": "/home/developer/","file":"t","events":"CLOSE_WRITE,CLOSE"}
{"time": "2019-02-26T13:52:17+0000","watched": "/home/developer/","file":"t","events":"DELETE"}
Filesystem events to Kafka
To install the Confluent platform I did:
wget -qO - https://packages.confluent.io/deb/5.1/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"
sudo apt-get update
sudo apt-get -y install confluent-platform-2.11
Starting Kafka and related services can be done with:
confluent start
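Assuming the Confluent CLI that ships with this version, you can check that the local services (ZooKeeper, Kafka, Schema Registry, Connect, etc.) are up with:
confluent status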
Using the Kafka Connect FileStreamSource connector (available without an Enterprise license), it is relatively easy to monitor the file written by inotifywait. Kafka Connect can run in distributed mode and in standalone mode. Since it needs to save information on what it has already processed, storage is required: in standalone mode this can be a file, in distributed mode these are Kafka topics. I chose standalone mode, since removing the offsets file (to load the events again) is quite easy. A drawback of standalone mode is that the connector cannot be monitored from the Confluent Control Center. A benefit of running distributed is that it could easily be run in containers, since the connector itself is stateless; the state is in Kafka.
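As a sketch of that replay scenario (offsets file and start command as configured below): stop the standalone worker, remove the offsets file and start the worker again, and the connector re-reads /tmp/inotify.txt from the beginning.
# after stopping the standalone worker (Ctrl-C):
rm /tmp/example.offsets
/usr/bin/connect-standalone worker.properties filesource.properties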
I used the following filesource.properties for the connector:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/inotify.txt
topic=connect-test
The accompanying worker.properties contains:
offset.storage.file.filename=/tmp/example.offsets
bootstrap.servers=localhost:9092
offset.flush.interval.ms=10000
rest.port=10082
rest.host.name=localhost
rest.advertised.port=10082
rest.advertised.host.name=localhost
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
plugin.path=/usr/share/java
To start this connector, I can do:
/usr/bin/connect-standalone worker.properties filesource.properties
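To verify that the connector is running and that the inotify events actually reach the connect-test topic, you can query the Connect REST API on the port configured above and consume the topic with the console consumer that ships with the Confluent Platform:
curl http://localhost:10082/connectors/local-file-source/status
kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning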
Elasticsearch and Kibana
In order to run Elasticsearch, you need to increase vm.max_map_count (see the Elasticsearch documentation on virtual memory):
sudo sysctl -w vm.max_map_count=262144
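This setting does not survive a reboot; to make it persistent you can for example add it to /etc/sysctl.conf:
echo "vm.max_map_count=262144" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p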
For running Elasticsearch and Kibana, I’ve used the following docker-compose.yml file
version: '3.3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.6.1
    container_name: elasticsearch
    environment:
      - node.name=es01
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      nproc: 65535
      memlock:
        soft: -1
        hard: -1
    cap_add:
      - ALL
    privileged: true
    ports:
      - 9200:9200
      - 9300:9300
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:6.6.1
    container_name: kibana
    environment:
      SERVER_NAME: localhost
      ELASTICSEARCH_URL: http://elasticsearch:9200/
    ports:
      - 5601:5601
    ulimits:
      nproc: 65535
      memlock:
        soft: -1
        hard: -1
    cap_add:
      - ALL
The containers can be started with:
docker-compose up
Next you can access Kibana at http://localhost:5601
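You can also check that Elasticsearch is up with a simple health check before continuing:
curl "http://localhost:9200/_cluster/health?pretty"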
Getting data from Kafka to Elasticsearch
In order to get the data from the connect-test topic to Elasticsearch, we can again use a standalone Kafka Connect connector: the ElasticsearchSinkConnector, which is also available without an Enterprise license. You can configure it as follows:
elasticsearchsink.properties:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect-test
key.ignore=true
schema.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
The accompanying worker2.properties contains:
offset.storage.file.filename=/tmp/example2.offsets
bootstrap.servers=localhost:9092
offset.flush.interval.ms=10000
rest.port=10083
rest.host.name=localhost
rest.advertised.port=10083
rest.advertised.host.name=localhost
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false
plugin.path=/usr/share/java
You can start the connector with:
/usr/bin/connect-standalone worker2.properties elasticsearchsink.properties
Some differences with the FileStreamSource worker.properties are:
- a different port is used for the REST API
- a different offset storage file is used
- the key and value converters are JSON instead of String
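Once the sink connector is running, you can check whether documents arrive in Elasticsearch. By default the ElasticsearchSinkConnector writes to an index named after the topic, so in this case connect-test:
curl "http://localhost:9200/connect-test/_search?pretty&size=3"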
View the results in Kibana
Next you can view the results in Kibana
Also notice that in order for the data from Elasticsearch to be visible in Kibana, not only does data need to be available, but an index pattern also needs to be created in Kibana. Since the sink connector offers JSON documents, Elasticsearch creates the index automatically with default mappings.
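The index pattern itself can be created in the Kibana UI (Management, Index Patterns) or, as a sketch, via Kibana's saved objects API; the pattern title below is an assumption that matches the connect-test index:
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" \
  -H "kbn-xsrf: true" -H "Content-Type: application/json" \
  -d '{"attributes": {"title": "connect-test*"}}'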