Ingest Azure Redis Cache messages into an Elasticsearch, Logstash and Kibana (ELK) cluster deployed in Azure Kubernetes Service (AKS)

This is the third article in the series on deploying Elasticsearch, Logstash and Kibana (ELK) to an Azure Kubernetes Service (AKS) cluster. The first article covered deploying non-SSL ELK to AKS and consuming messages from Azure Event Hub. The second article described how to secure communications in ELK and use Azure AD SAML-based SSO for Kibana and Elasticsearch. In this article I am going to share the steps needed to ingest Azure Redis Cache messages into Elasticsearch using Logstash's Redis input plugin.

Azure Redis Cache is based on the popular open-source Redis cache. It is typically used as a cache to improve the performance and scalability of systems that rely heavily on backend data stores. Logstash's Redis input plugin reads events from a Redis instance. I will create a Logstash event processing pipeline that defines Redis as the input and Elasticsearch as the output. The component diagram has been updated to add the Azure Redis Cache integration.

The dev tools used to develop these components are Visual Studio for Mac/VS Code, the AKS Dashboard, kubectl, bash and openssl. The code snippets in this article are mostly yaml snippets and are included for reference only, as formatting may get distorted; please refer to the GitHub repository for the formatted resources.

Create Azure Redis Cache

Create an Azure Redis Cache instance using the Azure Portal or the Azure CLI command az redis create.
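A minimal CLI sketch is shown below; the resource group and cache names are placeholders, and you should adjust the location and SKU to your needs:

az group create --name elk-rg --location westus2
az redis create --name sample-elk-redis --resource-group elk-rg --location westus2 --sku Basic --vm-size c0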

Navigate to this resource and note the settings listed below, as you are going to need them in subsequent sections:

  1. Host Name: You will need to specify the host name in the Logstash pipeline.
  2. Secondary Access Key: You will need to specify this as the password in the Logstash pipeline.
  3. StackExchange.Redis connection string: You will need to specify this connection string in AzureRedisCacheSample so that it can publish messages to Redis.
  4. Port: The default non-SSL port is 6379. By default, the non-SSL port is disabled, so the port to which the Logstash pipeline will connect is the SSL port 6380.
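You can also retrieve these values from the CLI; a small sketch, assuming the placeholder names from the earlier az redis create example:

az redis show --name sample-elk-redis --resource-group elk-rg --query hostName
az redis list-keys --name sample-elk-redis --resource-group elk-rg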

Logstash Redis input plugin

As mentioned previously, I will create a Logstash pipeline which uses the Logstash Redis input plugin. This input reads events from a Redis instance; it supports both Redis channels and lists. You can read more about the Redis input plugin in the Logstash documentation. You can list the installed plugins by following the steps below (a combined example follows the list):

  • Run command kubectl exec -ti {Logstash_Pod_Name} -- bash to connect to the Logstash pod.
  • Run command bin/logstash-plugin list to see the installed plugins.
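Put together, a quick check that the Redis input plugin is available; the app label here is an assumption, so use whatever label or pod name your deployment actually has:

kubectl get pods -l app=sample-logstash
kubectl exec -ti {Logstash_Pod_Name} -- bin/logstash-plugin list | grep redis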

Deploy Logstash to Azure Kubernetes Service

Logstash is a data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to Elasticsearch. Logstash will use the Azure Event Hub plugin and the Redis input plugin to ingest data into Elasticsearch. In this article, I am going to share the main pointers about the changes needed to the Logstash resources, i.e. the ConfigMap and Deployment, in order to subscribe to Azure Redis Cache only. You can refer to the first two parts of this series for more details.

The steps needed to deploy Logstash to AKS are listed below

Create a Kubernetes ConfigMap

Create a new pipeline in Logstash for the Azure Redis Cache integration. The main pointers about the changes needed to this resource, as compared to earlier parts of this series, are:

  • A new pipeline has been defined: azureredis is the pipeline id and azureredis.cfg is the path of the configuration file for the Redis Cache integration. The azureredis.cfg file will be mounted from the ConfigMap. The Logstash event processing pipeline has three stages: inputs → filters → outputs. This file defines the Logstash pipeline for Redis Cache.
    • Specify host => "{YOUR_REDIS_HOST_NAME}" based on your Redis instance host name.
    • The sample Redis client publishes messages to a channel, thus the data_type is channel: data_type => "channel".
    • The channel name I have specified is ‘messages’. Update the channel name key => "messages" based on your specified channel name.
    • Specify the password as per your Redis Cache secondary key value: password => "{YOUR_REDIS_SECONDARY_KEY}".
    • Since SSL is enabled, I have specified the port as 6380: port => 6380. The default setting is the non-SSL port, i.e. 6379.
    • SSL is enabled: ssl => true.
    • The output is Elasticsearch and the index name is defined as index => "azureredis-%{+YYYY.MM.dd}".

The yaml snippet to create this resource is displayed below:
apiVersion: v1
kind: ConfigMap
metadata:
  name: sample-logstash-configmap
  namespace: default
data:
  logstash.yml: |
    xpack.monitoring.elasticsearch.url: https://sample-elasticsearch:9200
    dead_letter_queue.enable: true
    xpack.monitoring.enabled: true
    xpack.monitoring.elasticsearch.username: logstash_system 
    xpack.monitoring.elasticsearch.password: Password1$
    xpack.monitoring.elasticsearch.ssl.ca: "/usr/share/logstash/config/elastic-stack-ca.pem"
  pipelines.yml: |
    - pipeline.id: azureeventhubs
      path.config: "/usr/share/logstash/azureeventhubs.cfg"
    - pipeline.id: azureredis
      path.config: "/usr/share/logstash/azureredis.cfg"
  azureeventhubs.cfg: |
    input {
      azure_event_hubs {
        event_hub_connections => ["{AZURE_EVENT_HUB_CONNECTION_STRING};EntityPath=logstash"]
        threads => 2
        decorate_events => true
        consumer_group => "$Default"
        storage_connection => "{STORAGE_ACCOUNT_CONNECTION_STRING}"
        storage_container => "logstash"
      }
    }
    filter {
    }
    output {
      elasticsearch {
        hosts => ["sample-elasticsearch:9200" ]
        user => "elastic"
        password => "Password1$"
        index => "azureeventhub-%{+YYYY.MM.dd}"
        ssl => true
        cacert => "/usr/share/logstash/config/elastic-stack-ca.pem"
      }
    }
  azureredis.cfg: |
    input {
      redis {
        host => "{YOUR_REDIS_HOST_NAME}"
        key => "messages"
        data_type => "channel"
        password => "{YOUR_REDIS_SECONDARY_KEY}"
        port => 6380
        ssl => true
      }
    }
    filter {
    }
    output {
      elasticsearch {
        hosts => ["sample-elasticsearch:9200" ]
        user => "elastic"
        password => "Password1$"
        index => "azureredis-%{+YYYY.MM.dd}"
        ssl => true
        cacert => "/usr/share/logstash/config/elastic-stack-ca.pem"
      }
    }
  logstash.conf: |

Create a Kubernetes Deployment

The only change needed to Logstash_Deployment.yaml is to mount the new pipeline configuration for Redis, i.e. azureredis.cfg. You can refer to the previous articles of this series for further details about this resource.

volumeMounts:
- name: sample-logstash-configmap
  mountPath: /usr/share/logstash/azureredis.cfg
  subPath: azureredis.cfg

Deploy Logstash resources to AKS

Deploy the Logstash resources to AKS. Once deployed, run AzureRedisCacheSample to publish messages to Redis. This is a sample client (.NET Core 2.1) that sends messages to a Redis channel. The sample also subscribes to the Redis channel locally and prints the messages. You need to update the StackExchange.Redis connection string and the channel name.
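A minimal sketch of the deployment commands (the file names are assumptions; use the resource files from the GitHub repository), followed by an alternative way to publish a test message with redis-cli (version 6 or later, which supports TLS) instead of the .NET sample:

kubectl apply -f Logstash_ConfigMap.yaml
kubectl apply -f Logstash_Deployment.yaml

redis-cli -h {YOUR_REDIS_HOST_NAME} -p 6380 --tls -a {YOUR_REDIS_SECONDARY_KEY} PUBLISH messages "hello from redis-cli"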

After sending a few messages, navigate to the Kibana endpoint and you will see the events received and events emitted in the Monitoring section.

You can also create an index pattern in Kibana (e.g. azureredis-*) and the messages sent to Redis will be displayed.

This completes the article on ingesting Azure Redis Cache messages into Elasticsearch using Logstash’s Redis plugin. The sample code for this article can be downloaded from GitHub.
