Create a real-time alerting solution - Aiven console

This tutorial shows you how to combine Aiven for Apache Flink with Aiven for Apache Kafka and Aiven for PostgreSQL to build a solution that provides real-time alerting data for CPU loads.

Architecture overview

This example involves creating an Apache Kafka source topic that provides a stream of metrics data, a PostgreSQL database that contains the alerting thresholds, and an Apache Flink service that combines these two data sources and pushes the filtered data to a separate Apache Kafka topic or PostgreSQL table.

    graph LR;
        id1(Kafka)-- metrics stream -->id3(Flink);
        id2(PostgreSQL)-- threshold data -->id3;
        id3-. filtered data .->id4(Kafka);
        id3-. filtered data .->id5(PostgreSQL);

This article covers the steps for the Aiven web console, along with a few different examples of how you can set alerting thresholds. For connecting to your PostgreSQL service, the examples use the Aiven CLI to call psql, but you can use other tools if you prefer.

In addition, the instructions show you how to use a separate Python-based tool, Dockerized fake data producer for Aiven for Apache Kafka, to create sample records for your Apache Kafka topic that provides the streamed data.

Requirements

To follow this tutorial, you need:

  • An Aiven account
  • The Aiven CLI, which this tutorial uses to connect to the PostgreSQL service with psql
  • Docker and Git, which are needed to run the fake data producer tool

Set up Aiven services

  1. Follow the steps in this article to create these services (if you prefer the command line, a rough Aiven CLI equivalent is sketched after this list):

    • An Aiven for Apache Kafka service with the Business-4 service plan, named demo-kafka (this streams the CPU load)

    • An Aiven for PostgreSQL service with the Business-4 service plan, named demo-postgresql (this defines the alerting threshold values)

    • An Aiven for Apache Flink service with the Business-4 service plan, named demo-flink (this analyzes the data stream to find CPUs where the average load exceeds the threshold values)

  2. Select the demo-kafka service and change the following settings on the Overview page:

    • Kafka REST API (Karapace) > Enable

      This setting allows you to integrate your Aiven for Apache Kafka service with Aiven for Apache Flink, and you can also use the API to view the data in your Apache Kafka topics.

    • Advanced configuration > Add configuration option > kafka.auto_create_topics_enable, switch the setting on and then click Save advanced configuration

      This setting allows you to create new Apache Kafka topics as you configure your Apache Flink data tables, so that you do not need to create the topics in advance.

  3. Select the demo-flink service and add the service integrations:

    1. Click Get started on the banner at the top of the Overview page.

    2. Select Aiven for Apache Kafka and then select the demo-kafka service.

    3. Click Integrate.

    4. Click the + icon under Data Flow.

    5. Select Aiven for PostgreSQL and then select the demo-postgresql service.

    6. Click Integrate.
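
If you prefer to script the service setup, you can create the same services and settings with the Aiven CLI. The commands below are a rough sketch: the cloud region is only an example, and you should double-check the plan and configuration names against what you would select in the console.

    # Create the three services (example cloud region; adjust to your project)
    avn service create demo-kafka -t kafka --plan business-4 --cloud google-europe-west1
    avn service create demo-postgresql -t pg --plan business-4 --cloud google-europe-west1
    avn service create demo-flink -t flink --plan business-4 --cloud google-europe-west1

    # Enable the Kafka REST API (Karapace) and automatic topic creation
    avn service update demo-kafka -c kafka_rest=true -c kafka.auto_create_topics_enable=true

The Flink service integrations in step 3 are easiest to add in the console as described above.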

Set up sample data

These steps show you how to create sample records that provide the streamed data processed by the data pipelines in this tutorial. You can also use other existing data, but many of the examples in this tutorial are based on this sample data.

Before you start, clone the Dockerized fake data producer for Aiven for Apache Kafka Git repository to your computer.
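
For example, assuming the repository is hosted at the GitHub location below (check the tool's documentation for the current URL):

    git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker.git
    cd fake-data-producer-for-apache-kafka-docker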

  1. Follow these instructions to create an authentication token for your Aiven account.

    This is required to allow the tool to connect to a service in your Aiven account.

  2. Go to the data producer tool directory and copy the conf/env.conf.sample file to conf/env.conf.

  3. Edit the conf/env.conf file and update the parameters with your Aiven account information and the authentication token that you created (an illustrative example follows this list).

    See the instructions for the tool for details on the parameters.

    Note

    The NR_MESSAGES option defines the number of messages that the tool creates when you run it. Setting this parameter to 0 creates a continuous flow of messages that never stops.

  4. Run the following command to build the Docker image:

    docker build -t fake-data-producer-for-apache-kafka-docker .
    
  5. Run the following command to run the Docker image:

    docker run fake-data-producer-for-apache-kafka-docker
    

    This command pushes the following type of events to the cpu_load_stats_real topic in your Kafka service:

    {"hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198, "occurred_at": 1633956789277}
    {"hostname": "sleepy", "cpu": "cpu2", "usage": 87.28240549074823, "occurred_at": 1633956783483}
    {"hostname": "sleepy", "cpu": "cpu1", "usage": 85.3384018012967, "occurred_at": 1633956788484}
    {"hostname": "sneezy", "cpu": "cpu1", "usage": 89.11518629380006, "occurred_at": 1633956781891}
    {"hostname": "sneezy", "cpu": "cpu2", "usage": 89.69951046388306, "occurred_at": 1633956788294}
    
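As a reference, a filled-in conf/env.conf could look roughly like the following. The parameter names and values here are illustrative assumptions only; conf/env.conf.sample in the repository is the authoritative list of parameters:

    PROJECT_NAME="my-project"
    SERVICE_NAME="demo-kafka"
    TOPIC="cpu_load_stats_real"
    NR_MESSAGES=0
    USERNAME="you@example.com"
    TOKEN="your-authentication-token"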

Create a pipeline for basic filtering

This setup uses a fixed threshold to filter any instances of high CPU load to a separate Kafka topic.

    graph LR;
        id1(Kafka source)-- metrics stream -->id2(Flink job);
        id2-- high CPU -->id3(Kafka sink);

For this setup, you need to configure a source table to read the metrics data from your Kafka topic, a sink table to send the processed messages to a separate Kafka topic, and a Flink job to process the data.

  1. In the Aiven web console, select the Jobs & Data tab in your Aiven for Apache Flink service.

  2. Go to the Data Tables subtab.

  3. Create the source Kafka table:

    1. Select your Kafka service.

    2. Select cpu_load_stats_real as the topic.

    3. Select Apache Kafka SQL Connector as the connector type.

    4. Select Key not used as the key.

    5. Select JSON as the value data format.

    6. Enter CPU_IN as the name.

    7. Enter the following as the SQL schema (the computed columns derive an event-time timestamp from occurred_at, and the watermark allows events to arrive up to 10 seconds out of order):

      hostname STRING,
      cpu STRING,
      usage DOUBLE,
      occurred_at BIGINT,
      proctime AS PROCTIME(),
      time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
      WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
      
    8. Click Create Table.

  4. Create the sink Kafka table:

    1. Select your Kafka service.

    2. Enter cpu_load_stats_real_filter as the topic.

    3. Select Apache Kafka SQL Connector as the connector type.

    4. Select Key not used as the key.

    5. Select JSON as the value data format.

    6. Enter CPU_OUT_FILTER as the name.

    7. Enter the following as the SQL schema:

      time_ltz TIMESTAMP(3),
      hostname STRING,
      cpu STRING,
      usage DOUBLE
      
    8. Click Create Table.

  5. Go to the Create SQL Job subtab.

  6. Enter simple_filter as the job name, select CPU_IN and CPU_OUT_FILTER as the tables, and enter the following as the SQL statement, then click Execute job:

    INSERT INTO CPU_OUT_FILTER 
    SELECT 
        time_ltz, 
        hostname, 
        cpu, 
        usage 
    FROM CPU_IN 
    WHERE usage > 80
    

    The new job is added to the list on the Jobs subtab and starts automatically once a task slot is available. The status changes to RUNNING once the job starts.

    When the job is running, you should start to see messages indicating hosts with high CPU loads in the cpu_load_stats_real_filter topic of your demo-kafka service.
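
Based on the CPU_OUT_FILTER schema, each filtered record should look roughly like the following (the exact rendering of the TIMESTAMP(3) value depends on the JSON encoding and the configured time zone):

    {"time_ltz": "2021-10-11 12:53:09.277", "hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198}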

Create a pipeline with windowing

This setup measures CPU load over a configured time using windows and event time.

    graph LR;
        id1(Kafka source)-- timestamped metrics -->id3(Flink job);
        id3-- 30-second average CPU -->id4(Kafka sink);

This uses the same CPU_IN Kafka source table that you created in the previous section. In addition, you need a new sink table to send the processed messages to a separate Kafka topic and a new Flink job to process the data.

  1. Go to the Data Tables subtab.

  2. Create the sink Kafka table:

    1. Select your Kafka service.

    2. Enter cpu_load_stats_agg as the topic.

    3. Select Apache Kafka SQL Connector as the connector type.

    4. Select Key not used as the key.

    5. Select JSON as the value data format.

    6. Enter CPU_OUT_AGG as the name.

    7. Enter the following as the SQL schema:

      window_start TIMESTAMP(3),
      window_end TIMESTAMP(3),
      hostname STRING,
      cpu STRING,
      usage_avg DOUBLE,
      usage_max DOUBLE
      
    8. Click Create Table.

  3. Go to the Create SQL Job subtab.

  4. Enter simple_agg as the job name, select CPU_OUT_AGG and CPU_IN as the tables, and enter the following as the SQL statement, then click Execute job:

    INSERT INTO CPU_OUT_AGG
    SELECT 
        window_start,
        window_end, 
        hostname, 
        cpu, 
        AVG(usage), 
        MAX(usage)
    FROM 
        TABLE( TUMBLE( TABLE CPU_IN, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
    GROUP BY 
        window_start,
        window_end, 
        hostname, 
        cpu
    

    The new job is added to the list on the Jobs subtab and starts automatically once a task slot is available. The status changes to RUNNING once the job starts.

    When the job is running, you should start to see messages with the 30-second average and maximum CPU loads for each host and CPU in the cpu_load_stats_agg topic of your demo-kafka service.
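
The tumbling window above emits one row per host and CPU every 30 seconds. If you want overlapping windows instead, Flink's windowing table-valued functions follow the same pattern; the sketch below (not part of the tutorial pipeline) uses a hopping window that emits a 30-second average every 10 seconds:

    INSERT INTO CPU_OUT_AGG
    SELECT 
        window_start,
        window_end, 
        hostname, 
        cpu, 
        AVG(usage), 
        MAX(usage)
    FROM 
        TABLE( HOP( TABLE CPU_IN, DESCRIPTOR(time_ltz), INTERVAL '10' SECONDS, INTERVAL '30' SECONDS))
    GROUP BY 
        window_start,
        window_end, 
        hostname, 
        cpu

If you try this, run it as a separate job, ideally with its own sink table, so that it does not write to the same topic as simple_agg.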

Create an aggregated data pipeline with Kafka and PostgreSQL

This setup highlights the instances where the average CPU load over a windowed interval exceeds the threshold and stores the results in PostgreSQL.

    graph LR;
        id1(Kafka source)-- timestamped metrics -->id3(Flink job);
        id2(PostgreSQL source)-- host-specific thresholds -->id3;
        id3-- high 30-second average CPU -->id4(PostgreSQL sink);

This uses the same CPU_IN Kafka source table that you created earlier, together with a SOURCE_THRESHOLDS PostgreSQL source table that maps each hostname to its allowed CPU threshold (if you have not created SOURCE_THRESHOLDS yet, a sketch follows the note below). In addition, you need a new sink table to store the processed data in PostgreSQL and a new Flink job to process the data.

Note

For creating and configuring the tables in your PostgreSQL service, these steps use the Aiven CLI to call psql. You can instead use other tools to complete these steps if you prefer.
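
If your project does not yet contain the SOURCE_THRESHOLDS table, the sketch below shows one way to set it up, based on the columns that the job SQL in this section expects (hostname and allowed_top). The PostgreSQL table name cpu_thresholds and the threshold values are assumptions, not part of this tutorial:

    -- Run in psql against the demo-postgresql service (illustrative only)
    CREATE TABLE CPU_THRESHOLDS (
        hostname VARCHAR PRIMARY KEY,
        allowed_top INT
    );

    -- Example thresholds for the sample hostnames used by the data generator
    INSERT INTO CPU_THRESHOLDS VALUES
        ('dopey', 80),
        ('sleepy', 85),
        ('sneezy', 90);

Then register the table in your Flink service as a PostgreSQL data table: on the Jobs & Data > Data Tables tab, select the demo-postgresql service, enter SOURCE_THRESHOLDS as the name, select cpu_thresholds as the table, and use hostname STRING, allowed_top INT as the SQL schema.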

  1. In the Aiven CLI, run the following command to connect to the demo-postgresql service:

    avn service cli demo-postgresql --project PROJECT_NAME
    
  2. Enter the following command to set up the PostgreSQL table for storing the results:

    CREATE TABLE CPU_LOAD_STATS_AGG_PG (
        time_ltz TIMESTAMP(3) PRIMARY KEY, 
        nr_cpus_over_threshold INT
    );
    
  3. In the Aiven web console, go to the Jobs & Data > Data Tables tab for your Flink service.

  4. Select your PostgreSQL service, enter CPU_OUT_AGG_PG as the name, select cpu_load_stats_agg_pg as the table, and enter the following as the SQL schema, then click Create Table:

    time_ltz TIMESTAMP(3),
    nr_cpus_over_threshold BIGINT,
    PRIMARY KEY (time_ltz) NOT ENFORCED
    
  5. Go to the Create SQL Job subtab.

  6. Enter simple_filter_pg_agg as the name, select the CPU_OUT_AGG_PG, CPU_IN, and SOURCE_THRESHOLDS tables, and enter the following as the SQL statement, then click Execute job:

    INSERT INTO CPU_OUT_AGG_PG 
    WITH JOINING_INFO AS(
        SELECT time_ltz, 
            CPU.hostname, 
            cpu, 
            usage, 
            allowed_top 
        FROM CPU_IN CPU INNER JOIN SOURCE_THRESHOLDS 
            FOR SYSTEM_TIME AS OF proctime AS ST 
            ON CPU.hostname = ST.hostname
    ),
    WINDOWING AS (
        SELECT 
            window_start,
            window_end, 
            hostname, 
            cpu, 
            AVG(usage) USAGE, 
            allowed_top
        FROM TABLE(TUMBLE(TABLE JOINING_INFO, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
        GROUP BY 
            window_start,
            window_end, 
            hostname, 
            cpu, 
            allowed_top
    )
    SELECT 
        window_start, 
        COUNT(*) 
    FROM WINDOWING
    WHERE USAGE > ALLOWED_TOP
    GROUP BY 
        window_start
    

    The new job is added to the list on the Jobs subtab and starts automatically once a task slot is available. The status changes to RUNNING once the job starts.

    When the job is running, you should start to see entries in the cpu_load_stats_agg_pg table of your demo-postgresql database showing, for each 30-second window, the number of CPUs whose average load exceeded the threshold.
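
To check the results, you can query the table from the same psql session that you opened through the Aiven CLI, for example:

    SELECT * FROM cpu_load_stats_agg_pg ORDER BY time_ltz DESC LIMIT 10;

Each row shows the start of a 30-second window and the number of CPUs whose average load exceeded the threshold during that window.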