Log Aggregation: Configuring and ingesting your first custom log with ELK and Filebeat

Jason DeLano
11 min read · Nov 11, 2021
PoC2Ops, LLC

This story takes you through configuring Filebeat on a server that runs an application, service, database, or container that generates some type of log. Filebeat is a lightweight log shipper that forwards logs to the ELK server for aggregation and reporting. We will walk through how to set up Filebeat on your server to collect a log, and how to set up an Ingest Node Pipeline that parses the log data into your own custom fields.

This story assumes that you already have an ELK stack installed and configured, with a password defined for the elastic user, and that the firewall is open between your server and the ELK server on ports 9200 and 5601.

Process overview

Identify log that you want to capture → Write your Grok script → Design and Test Ingest Node Pipeline → Install and Configure Filebeat on target server → Validate log is ingested

Tools used in this story

  1. Elasticsearch
  2. Kibana
  3. Grok
  4. Filebeat

Identify log that you want to capture

To start this process we need to find a log on one of your servers to test with. In this example we have a financial web application called Unanet, which logs all user activity generated on the server: every page that is visited and the user who visited it. You can follow the same process with your own log locations and formats. In this example our log is located at the following path

/u01/Apache/apache-tomcat-8.5.9/logs/unanet*.log

We also need a couple of example records from the unanet*.log file to verify the fields that we would like to extract from the data. Filebeat ships with pre-built modules that parse the data for you and put it into specific fields, so check first whether a module already exists for your log. As of the date this article was written, Filebeat version 7.15.1 shipped with the following modules

activemq  apache  auditd  aws  awsfargate  azure  barracuda  bluecoat  cef  checkpoint
cisco  coredns  crowdstrike  cyberark  cyberarkpas  cylance  elasticsearch  envoyproxy  f5  fortinet
gcp  google_workspace  googlecloud  gsuite  haproxy  ibmmq  icinga  iis  imperva  infoblox
iptables  juniper  kafka  kibana  logstash  microsoft  misp  mongodb  mssql  mysql
mysqlenterprise  nats  netflow  netscout  nginx  o365  okta  oracle  osquery  panw
pensando  postgresql  proofpoint  rabbitmq  radware  redis  santa  snort  snyk  sonicwall
sophos  squid  suricata  system  threatintel  tomcat  traefik  zeek  zookeeper  zoom  zscaler

If your application is one of these, start by configuring the Filebeat module (instructions are in the Filebeat documentation). If yours is not listed above, this article applies to you.

The following code block has example log records from our Unanet log file

2021-11-09 17:35:20.340 [81] FINER - [359/470] 34.21.179.14 [19,389] CJIN start /unanet/action/people/dilution
2021-11-09 17:35:21.072 [81] FINER - [138/471] 34.21.179.14 [19,389] CJIN stop /unanet/action/people/dilution 732ms
2021-11-09 17:35:25.676 [45] FINER - [140/471] 34.21.179.14 [19,390] CJIN start /unanet/action/people/dilution/save
2021-11-09 17:35:26.441 [45] FINER - [155/471] 34.21.179.14 [19,390] CJIN stop /unanet/action/people/dilution/save 765ms
2021-11-09 17:35:28.636 [84] FINER - [155/471] 34.21.179.14 [19,391] CJIN start /unanet/action/people/dilution/recalculate
2021-11-09 17:35:29.427 [84] FINER - [170/471] 34.21.179.14 [19,391] CJIN stop /unanet/action/people/dilution/recalculate 790ms
2021-11-09 17:35:36.042 [45] FINER - [170/471] 34.21.179.14 [19,392] CJIN start /unanet/action/reports/project/summary/actuals/report
2021-11-09 17:35:36.271 [45] FINER - [182/471] 34.21.179.14 [19,392] CJIN stop /unanet/action/reports/project/summary/actuals/report 229ms

In the log records you will notice a few kinds of data: the IP address (don’t worry, I altered it to a fake one), the username (again, I replaced the original username with “CJIN” so you could still see the type of data in the log), and, on the stop records, the run time of each process in milliseconds. So now we know the location of the log and have some example data from it. These two pieces will be inputs to the next couple of sections.

Write your Grok script

If you look at the word Grok and think it might be a misspelling, it is not. To learn about the syntax of Grok and its various uses, the following site from Elastic has a good breakdown. In this example I’ll show you a few basics that apply to our records.

First, let’s navigate in the Kibana web application that is part of the ELK stack: select the hamburger menu, scroll down to “Management”, and select “Dev Tools”.

This will take you to the Dev Tools page, which gives you a few options; we are going to select “Grok Debugger”. This is my preferred tool for testing that my Grok pattern is functioning. You are probably wondering how we jump right to the full Grok pattern. Don’t worry, I’ll walk you through a little of the syntax.

Dev Tools → Grok Debugger

Let’s start with the first field. The basic Grok format combines the SYNTAX (the name of the pattern to match) with the ID of the field (the name under which the match is stored): %{SYNTAX:ID}. For example, we have this data in the log

2021-11-09

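We will use the following SYNTAX to extract the date and store it with the ID of transactionDate. The stock DATE pattern is defined for month/day/year and day/month/year orderings, so confirm in the debugger that the whole value, including the four-digit year, is captured; the pipeline pattern later in this story uses DATA for this field instead.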

%{DATE:transactionDate}

Let’s jump to a more complicated parsing of a field that is inside of a bracket.

[81]

We want to extract only the 81 and not the brackets. To do this we will use the following Grok expression. The “\” acts as the escape character so that the “[” is matched literally rather than being treated as regular expression syntax.

\[%{WORD:logNumber}\]

One more example, for handling an IP address stored in the log. In this example we had an IP stored like

34.21.179.14

We would use the SYNTAX of IPORHOST and store this in the ID clientIP

%{IPORHOST:clientIP}
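To make the %{SYNTAX:ID} idea concrete, here is a minimal Python sketch of how such a placeholder can be turned into a named capture group. This is not how Grok itself is implemented; the three sub-patterns and their names in SIMPLE_PATTERNS are simplified stand-ins that I made up for illustration, not the stock Grok definitions.

```python
import re

# Simplified stand-ins for Grok patterns (assumptions, not the stock definitions).
SIMPLE_PATTERNS = {
    "WORD": r"\w+",
    "ISO_DATE": r"\d{4}-\d{2}-\d{2}",
    "IPV4": r"\d{1,3}(?:\.\d{1,3}){3}",
}


def expand(grok_like):
    """Replace every %{SYNTAX:ID} with a named regex group (?P<ID>...)."""
    def to_group(match):
        syntax, field_id = match.group(1), match.group(2)
        return f"(?P<{field_id}>{SIMPLE_PATTERNS[syntax]})"
    return re.sub(r"%\{(\w+):(\w+)\}", to_group, grok_like)


# The escaped brackets pass through untouched; only the placeholders are expanded.
regex = expand(r"\[%{WORD:logNumber}\] %{IPV4:clientIP}")
fields = re.search(regex, "[81] 34.21.179.14").groupdict()
print(fields)
```

The SYNTAX picks which regular expression is used, and the ID becomes the key under which the matched text is stored.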

The following site has a good breakdown of the regular expression dialect that Grok is built on, including alternation with “|” (or conditions) and other more complicated parsing. We can verify the output of the Grok expression by pressing the “Simulate” button on the Grok Debugger screen.

This will populate the “Structured Data” section of the Grok Debugger with the output of the Grok expression, where you can confirm each of the data fields you would like to extract.

The full expression we used for this example is below.

%{DATE:transactionDate} %{TIME:transactionTime} \[%{WORD:logNumber}\] %{WORD:logLevel} (-|%{WORD:fieldname}) \[%{WORD:messageNumber1}/%{WORD:messageNumber2}\] %{IPORHOST:clientIP} \[%{WORD:messageNumber3},%{WORD:messageNumber4}\] %{WORD:userName} %{WORD:processAction} %{GREEDYDATA:processPath}
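If you want to sanity-check the field layout outside of Kibana, the sketch below is a hand-written Python regular expression that approximates the expression above. The sub-patterns are my own simplified assumptions (for example, a plain ISO date and a dotted IPv4 address), so treat it as a rough offline check and not as a replacement for the Grok Debugger.

```python
import re

# Hand-written approximation of the Grok expression; the sub-patterns are
# simplified assumptions, not the stock Grok definitions.
LINE_RE = re.compile(
    r"(?P<transactionDate>\d{4}-\d{2}-\d{2}) "
    r"(?P<transactionTime>\d{2}:\d{2}:\d{2}\.\d{3}) "
    r"\[(?P<logNumber>\w+)\] "
    r"(?P<logLevel>\w+) "
    r"(?:-|(?P<fieldname>\w+)) "
    r"\[(?P<messageNumber1>\w+)/(?P<messageNumber2>\w+)\] "
    r"(?P<clientIP>\d{1,3}(?:\.\d{1,3}){3}) "
    r"\[(?P<messageNumber3>\w+),(?P<messageNumber4>\w+)\] "
    r"(?P<userName>\w+) "
    r"(?P<processAction>\w+) "
    r"(?P<processPath>.*)"
)

# One of the example records from the Unanet log shown earlier.
sample = ("2021-11-09 17:35:21.072 [81] FINER - [138/471] 34.21.179.14 "
          "[19,389] CJIN stop /unanet/action/people/dilution 732ms")

parsed = LINE_RE.match(sample).groupdict()
for name, value in parsed.items():
    print(f"{name}: {value}")
```

Note that on stop records the trailing run time (732ms here) ends up inside processPath, because the last capture takes everything that remains on the line.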

The website below gives a good summary of other Grok commands and how they work and building your own syntax.

Design and Test Ingest Node Pipeline

Now that we have our Grok expression tested against our log file, we need to build an Ingest Node Pipeline. This is done by navigating to “Stack Management” and then selecting “Ingest Node Pipelines”. This will bring you to a screen listing all of the existing pipelines. Here you will press the “Create pipeline” button.

After you press the button, you will be prompted for a series of fields to fill out. Populate a Name and Description.

Create pipeline

You will then need to click the “Add a processor” button. We are going to use the logic that we worked out in the previous section to create the processor. Select the Processor type (for this example, “Grok”) and the field that we will be parsing, in this case the generic “message” field. Then populate the Patterns we are searching for. Please note that in our test in the previous section we escaped the “[” with one backslash “\”; in the configuration of the processor we escape with two backslashes “\\”. Note also that the pattern below captures the date with DATA and names the first two fields date and time, rather than transactionDate and transactionTime as in the debugger test.

%{DATA:date} %{TIME:time} \\[%{WORD:logNumber}\\] %{WORD:logLevel} (-|%{WORD:fieldname}) \\[%{WORD:messageNumber1}/%{WORD:messageNumber2}\\] %{IPORHOST:clientIP} \\[%{WORD:messageNumber3},%{WORD:messageNumber4}\\] %{WORD:userName} %{WORD:processAction} %{GREEDYDATA:processPath}
Configure processor
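One plausible explanation for the doubled backslashes (this is my assumption; I have not confirmed how the form stores the value) is that pipeline definitions are handled as JSON, and JSON requires a literal backslash inside a string to be escaped with another backslash. The short Python sketch below shows that round trip on a shortened version of the pattern.

```python
import json

# The pattern as typed in the Grok Debugger (single backslashes), shortened for brevity.
debugger_pattern = r"\[%{WORD:logNumber}\] %{WORD:logLevel}"

# Serialising to JSON escapes each backslash, which gives the doubled form.
as_json = json.dumps({"patterns": [debugger_pattern]})
print(as_json)

# Reading the JSON back restores the original single-backslash pattern.
restored = json.loads(as_json)["patterns"][0]
print(restored == debugger_pattern)
```

The regular expression itself is the same in both places; only the way the backslash is written down changes.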

Before pressing the “Add” button, scroll down and turn on the “Ignore missing” option. This helps with troubleshooting: if the field being parsed is missing from a document, the processor skips that document instead of failing.

Configure processor

Then we press the “Add” button and this takes us back to the “Create Pipeline” page where we can now press the “Create pipeline” button.

Create pipeline

After the button is pressed you will see a summary of your pipeline verifying that your pipeline is complete with a processor created for it.

Create pipeline summary
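The form fields map onto a small JSON document, and Elasticsearch has a simulate endpoint that lets you try a pipeline against a test document without ingesting anything. The Python sketch below only builds and prints such a request body; the shortened pattern and the test message are assumptions I chose for illustration, and nothing is sent to your server.

```python
import json

# Shortened grok pattern (single backslashes, as in the Grok Debugger).
pattern = r"\[%{WORD:logNumber}\] %{WORD:logLevel}"

simulate_body = {
    "pipeline": {
        "processors": [
            # Mirrors the form: processor type, field, patterns, "Ignore missing".
            {"grok": {"field": "message",
                      "patterns": [pattern],
                      "ignore_missing": True}}
        ]
    },
    # One test document; its "message" field is what the grok processor parses.
    "docs": [{"_source": {"message": "[81] FINER"}}],
}

# Paste the printed JSON after "POST _ingest/pipeline/_simulate" in Dev Tools → Console.
print(json.dumps(simulate_body, indent=2))
```

If the pattern matches, the response should show the test document with logNumber and logLevel added next to message.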

Install and Configure Filebeat on target server

Now that our pipeline and processor are set up, we can install and configure Filebeat on the server. The Elastic website has a detailed guide to installing Filebeat. The link below will have the latest instructions for each OS so you can match your server. For my example I’ll be running through this on a system that uses the Red Hat Package Manager (RPM).

To download the package, run the following command. At the time of this article, version 7.15.1 was the latest on the page. Please note that I am assuming you are running this as a user other than root who has sudo privileges; if you are running as root you will not need sudo.

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.15.1-x86_64.rpm

This will download the package to your server. You will install the package by running the following.

sudo rpm -vi filebeat-7.15.1-x86_64.rpm

We will then need to configure Filebeat by modifying “/etc/filebeat/filebeat.yml”. Open it with your preferred editor; we will need to modify a few sections.

In the Filebeat inputs section we will want to enable the log input type. This is done by changing enabled: false to enabled: true, as shown in the code snippet below. We will also have to add a line for the location of the log file(s) that we want to collect. If the log file name includes a timestamp, you can use * as a wildcard to pick up all the files whose names start with “unanet” in our case; adjust this to match your log file name. Our example log file is listed under the “paths:” section below. Watch the indentation in the YAML file: use spaces, not tabs, because tabs will break the file.

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /u01/Apache/apache-tomcat-8.5.9/logs/unanet*.log
    #- /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*
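To get a feel for what the * wildcard in the path will pick up, here is a small Python sketch that tests a few file names against the same pattern. The file names are hypothetical, and Python’s fnmatch is only an approximation of the glob matching Filebeat performs, so use it as a rough guide.

```python
from fnmatch import fnmatchcase

# The glob from the paths: entry; only the file-name part is checked here.
pattern = "unanet*.log"

# Hypothetical file names, invented for illustration.
candidates = [
    "unanet.log",
    "unanet-2021-11-09.log",
    "catalina.out",
    "unanet.log.gz",
]

# Keep only the names the wildcard pattern accepts.
matched = [name for name in candidates if fnmatchcase(name, pattern)]
print(matched)
```

In this sketch the compressed file is not matched because its name does not end in .log, which is usually what you want for rotated archives.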

Next we will need to update the Kibana section of the filebeat.yml config file. Uncomment the “host:” entry and replace the x.x.x.x with your host name or IP. See the code block below.

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  host: "x.x.x.x:5601"

  # Kibana Space ID
  # ID of the Kibana Space into which the dashboards should be loaded. By default,
  # the Default Space will be used.
  #space.id:

Then we will need to update the Elasticsearch Output section of the filebeat.yml file. Enter your host name or IP in the “hosts:” field and add the name of the pipeline that you created in the “pipeline:” field. You will also need to add your username and password in the “username:” and “password:” fields, as shown below.

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["x.x.x.x:9200"]
  pipeline: "unanet-pipeline-111121"

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: "elastic"
  password: "xxxxxxxxx"
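Since stray tabs are an easy mistake to make while editing these sections, here is a minimal Python sketch that reports which lines of a config are indented with a tab. The function name and the sample fragment are hypothetical; in practice you would read the text from /etc/filebeat/filebeat.yml.

```python
def find_tab_lines(config_text):
    """Return 1-based numbers of lines whose leading whitespace contains a tab."""
    bad = []
    for number, line in enumerate(config_text.splitlines(), start=1):
        # Everything before the first non-whitespace character is the indent.
        indent = line[: len(line) - len(line.lstrip())]
        if "\t" in indent:
            bad.append(number)
    return bad


# Hypothetical fragment: the third line is indented with a tab by mistake.
sample = (
    "output.elasticsearch:\n"
    '  hosts: ["x.x.x.x:9200"]\n'
    '\tpipeline: "unanet-pipeline-111121"\n'
)
print(find_tab_lines(sample))
```

Filebeat also ships the subcommands filebeat test config and filebeat test output, which check the configuration file and the connection to Elasticsearch before you start the service.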

We will then need to run the setup for Filebeat. This is done by executing the following command. It may take close to a minute depending on the response time of your Elasticsearch/Kibana server.

sudo filebeat setup

Then we will need to start the Filebeat service with the following command

sudo service filebeat start

This will start the service and start shipping logs to your ELK server.

Validate log is ingested

We will now go into the Discover section in Kibana to verify that the log was ingested. In my case I’m going to log into our application and navigate between a couple of pages so that the transaction log is updated. We can then just look in the most recent 15 minutes for a log entry. We can get to this page by going to the hamburger menu and selecting Analytics → Discover.

elastic hamburger menu

You can then click on one of the entries and verify that your fields were created. You will see in the screenshot that the logLevel and clientIP fields that we set up in our Grok pattern have been captured.

elastic Discover
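If you prefer to check from Dev Tools instead of Discover, the same test can be expressed as a search. The Python sketch below only builds and prints a query body for documents from the last 15 minutes that contain the parsed clientIP field; the function name is hypothetical, and it assumes Filebeat is writing to its default filebeat-* indices.

```python
import json


def build_recent_query(field="clientIP", minutes=15):
    """Query for recent documents in which the given parsed field exists."""
    return {
        "size": 5,
        "sort": [{"@timestamp": "desc"}],
        "query": {
            "bool": {
                "filter": [
                    # Only documents where the grok processor produced the field.
                    {"exists": {"field": field}},
                    # Only documents from the last N minutes.
                    {"range": {"@timestamp": {"gte": f"now-{minutes}m"}}},
                ]
            }
        },
    }


# Paste the printed JSON after "GET filebeat-*/_search" in Dev Tools → Console.
query = build_recent_query()
print(json.dumps(query, indent=2))
```

If the search returns no hits while Discover shows new entries, the records are arriving but the Grok pattern is probably not matching them.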

Happy Searching

Jason DeLano
