Create a real-time alerting solution - Aiven CLI

This Aiven CLI tutorial shows you an example of how to combine Aiven for Apache Flink with Aiven for Apache Kafka and Aiven for PostgreSQL services to create a solution that provides real-time alerting data for CPU loads.

Architecture overview

This example involves creating an Apache Kafka source topic that provides a stream of metrics data, a PostgreSQL database that contains data on the alerting thresholds, and an Apache Flink service that combines these two services and pushes the filtered data to a separate Apache Kafka topic or PostgreSQL table.

Diagram: Kafka provides the metrics stream and PostgreSQL provides the threshold data to Flink, which pushes the filtered data to a separate Kafka topic or a PostgreSQL table.

This article walks you through the required steps using the Aiven CLI, along with a few different examples of how you can set thresholds for alerts. For connecting to your PostgreSQL service, this example uses the Aiven CLI calling psql, but you can also use other tools if you prefer.

In addition, the instructions show you how to use a separate Python-based tool, the Dockerized fake data producer for Aiven for Apache Kafka, to create sample records for the Apache Kafka topic that provides the streamed data.

Requirements

To follow this tutorial, you need an Aiven project in which you can create services, the Aiven CLI installed and authenticated, and Docker for running the sample data generator described below.

Set up Aiven services

Note

The commands given in this example use business-4 service plans, but you can use any other service plan instead if you prefer.
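If you want to check which clouds and service plans are available before creating the services, you can list them with the Aiven CLI. This is a minimal sketch; the exact flag names may differ between CLI versions, so check avn service plans -h on your system:

    # List the available clouds and regions for your project
    avn cloud list --project PROJECT_NAME

    # List the available plans for a given service type in a given cloud
    avn service plans                        \
        --service-type flink                 \
        --cloud CLOUD_AND_REGION             \
        --project PROJECT_NAME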

  1. Using the Aiven CLI, run the following command to create an Aiven for Apache Kafka service named demo-kafka:

    avn service create demo-kafka               \
        --service-type kafka                    \
        --cloud CLOUD_AND_REGION                \
        --plan business-4                       \
        -c kafka.auto_create_topics_enable=true \
        -c kafka_rest=true                      \
        -c schema_registry=true                 \
        --project PROJECT_NAME
    
  2. Run the following command to create an Aiven for PostgreSQL service named demo-postgresql:

    avn service create demo-postgresql          \
        --service-type pg                       \
        --cloud CLOUD_AND_REGION                \
        --plan business-4                       \
        --project PROJECT_NAME
    
  3. Run the following command to create an Aiven for Apache Flink service named demo-flink:

    avn service create demo-flink               \
        --service-type flink                    \
        --cloud CLOUD_AND_REGION                \
        --plan business-4                       \
        --project PROJECT_NAME
    
  4. Add the demo-kafka and demo-postgresql integrations to the demo-flink service.

    1. Enter the following command to add the demo-kafka service integration:

      avn service integration-create           \
          --project PROJECT_NAME               \
          --service-type flink                 \
          -s demo-kafka                        \
          -d demo-flink
      
    2. Enter the following command to add the demo-postgresql service integration:

      avn service integration-create           \
          --project PROJECT_NAME               \
          --service-type flink                 \
          -s demo-postgresql                   \
          -d demo-flink
      
    3. Enter the following command to list the integrations:

      avn service integration-list demo-flink
      

      The output shows that both the demo-kafka and demo-postgresql integrations are enabled, along with the corresponding integration_id values that you need later when creating data tables.
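If you prefer to capture the integration IDs programmatically instead of copying them from the command output, a sketch like the following can help. It assumes that jq is installed and that the JSON output includes service_integration_id and source_service fields; these field names are assumptions, so print the raw JSON first and adjust the filter to match what you actually see:

    # Print the raw JSON output to confirm the field names
    avn service integration-list demo-flink --project PROJECT_NAME --json

    # Store the demo-kafka integration ID in a shell variable (field names are assumptions)
    KAFKA_INTEGRATION_ID=$(
        avn service integration-list demo-flink --project PROJECT_NAME --json \
            | jq -r '.[] | select(.source_service == "demo-kafka") | .service_integration_id'
    )
    echo "$KAFKA_INTEGRATION_ID"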

Set up sample data

These steps show you how to create sample records that provide the streamed data processed by the data pipelines in this tutorial. You can also use other existing data, but most of the examples in this tutorial assume the use of this sample data.

Before you start, clone the Dockerized fake data producer for Aiven for Apache Kafka Git repository to your computer.
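For example, assuming the repository is hosted under the Aiven organization on GitHub, cloning it looks like this (use the link in the tool's documentation if the URL differs):

    # Clone the fake data producer repository and move into its directory
    git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker.git
    cd fake-data-producer-for-apache-kafka-docker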

  1. Create an authentication token for your Aiven account. For instructions, see the Aiven documentation on authentication tokens.

    This is required to allow the tool to connect to a service in your Aiven account.

  2. Go to the data producer tool directory and copy the conf/env.conf.sample file to conf/env.conf.

  3. Edit the conf/env.conf file and update the parameters with your Aiven account information and the authentication token that you created.

    See the tool's documentation for details on the parameters.

    Note

    The NR_MESSAGES option defines the number of messages that the tool creates when you run it. Setting this parameter to 0 creates a continuous flow of messages that never stops.

  4. Run the following command to build the Docker image:

    docker build -t fake-data-producer-for-apache-kafka-docker .
    
  5. Run the following command to run the Docker image:

    docker run fake-data-producer-for-apache-kafka-docker
    

    This command pushes the following type of events to the cpu_load_stats_real topic in your Kafka service:

    {"hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198, "occurred_at": 1633956789277}
    {"hostname": "sleepy", "cpu": "cpu2", "usage": 87.28240549074823, "occurred_at": 1633956783483}
    {"hostname": "sleepy", "cpu": "cpu1", "usage": 85.3384018012967, "occurred_at": 1633956788484}
    {"hostname": "sneezy", "cpu": "cpu1", "usage": 89.11518629380006, "occurred_at": 1633956781891}
    {"hostname": "sneezy", "cpu": "cpu2", "usage": 89.69951046388306, "occurred_at": 1633956788294}
    

Create a pipeline for basic filtering

This setup uses a fixed threshold to filter any instances of high CPU load to a separate Kafka topic.

Diagram: the Kafka source streams metrics to the Flink job, which sends high-CPU events to the Kafka sink.

For this setup, you need to configure a source table to read the metrics data from your Kafka topic, a sink table to send the processed messages to a separate Kafka topic, and a Flink job to process the data.

  1. Using the Aiven CLI, create a Kafka table named CPU_IN.

    Variable: KAFKA_INTEGRATION_ID
    Value: The ID for your demo-kafka service integration.

    Variable: TABLE_SQL
    Value:

    hostname STRING,
    cpu STRING,
    usage DOUBLE,
    occurred_at BIGINT,
    proctime AS PROCTIME(),
    time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
    

    Run the following command, replacing the variables with the values listed above (a fully expanded example appears after this list):

    avn service flink table create demo-flink KAFKA_INTEGRATION_ID \
        --table-name CPU_IN                                        \
        --kafka-topic cpu_load_stats_real                          \
        --schema-sql "TABLE_SQL"
    
  2. Create an output table named CPU_OUT_FILTER.

    Variable: TABLE_SQL
    Value:

    time_ltz TIMESTAMP(3),
    hostname STRING,
    cpu STRING,
    usage DOUBLE
    

    Run the following command, replacing the variables listed in the above table with the corresponding values:

    avn service flink table create demo-flink KAFKA_INTEGRATION_ID \
        --table-name CPU_OUT_FILTER                                \
        --kafka-topic cpu_load_stats_real_filter                   \
        --schema-sql "TABLE_SQL"
    
  3. Run the following command to list the tables for the demo-flink service:

    avn service flink table list demo-flink
    

    The output for this command shows you the table IDs, which you need in the command that you use to create Flink jobs:

    INTEGRATION_ID                        TABLE_ID                              TABLE_NAME
    ====================================  ====================================  ==========
    917bbec0-0f34-4a31-b910-c585feb95d09  305c44d9-22d5-4be8-987f-57c7642e8a89  CPU_IN
    917bbec0-0f34-4a31-b910-c585feb95d09  3d33a7c5-3716-4b21-9739-f79228f9f28f  CPU_OUT_FILTER
    
  4. Create a data pipeline job named simple_filter.

    Variable: CPU_IN_ID
    Value: The table ID for your CPU_IN table.

    Variable: CPU_OUT_FILTER_ID
    Value: The table ID for your CPU_OUT_FILTER table.

    Variable: JOB_SQL
    Value:

    INSERT INTO CPU_OUT_FILTER
    SELECT
        time_ltz,
        hostname,
        cpu,
        usage
    FROM CPU_IN
    WHERE usage > 80
    

    Run the following command, replacing the variables listed in the above table with the corresponding values:

    avn service flink job create demo-flink simple_filter     \
        --table-ids CPU_IN_ID CPU_OUT_FILTER_ID               \
        --statement "JOB_SQL"
    

    The new job is added and starts automatically once a task slot is available.

    When the job is running, you should start to see messages indicating hosts with high CPU loads in the cpu_load_stats_real_filter topic of your demo-kafka service.
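For reference, this is what the CPU_IN table creation from step 1 looks like with the schema SQL passed inline. The integration ID shown is only the placeholder value from the example output above, so replace it with the ID from your own service:

    avn service flink table create demo-flink 917bbec0-0f34-4a31-b910-c585feb95d09 \
        --table-name CPU_IN                                                        \
        --kafka-topic cpu_load_stats_real                                          \
        --schema-sql "hostname STRING,
                      cpu STRING,
                      usage DOUBLE,
                      occurred_at BIGINT,
                      proctime AS PROCTIME(),
                      time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
                      WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND"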

Create a pipeline with windowing

This setup measures CPU load over a configured time using windows and event time.

Diagram: the Kafka source streams timestamped metrics to the Flink job, which sends 30-second average CPU values to the Kafka sink.

This uses the same CPU_IN Kafka source table that you created in the previous section. In addition, you need a new sink table to send the processed messages to a separate Kafka topic and a new Flink job to process the data.

  1. Using the Aiven CLI, create a Kafka table named CPU_OUT_AGG.

    Variable: KAFKA_INTEGRATION_ID
    Value: The ID for your demo-kafka service integration.

    Variable: TABLE_SQL
    Value:

    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    hostname STRING,
    cpu STRING,
    usage_avg DOUBLE,
    usage_max DOUBLE
    

    Run the following command, replacing the variables listed in the above table with the corresponding values:

    avn service flink table create demo-flink KAFKA_INTEGRATION_ID  \
        --table-name CPU_OUT_AGG                                    \
        --kafka-topic cpu_load_stats_agg                            \
        --schema-sql "TABLE_SQL"
    
  2. Run the following command to list the tables for the demo-flink service and get the IDs for the CPU_IN and CPU_OUT_AGG tables:

    avn service flink table list demo-flink
    
  3. Create a data pipeline job named simple_agg.

    Variable: CPU_IN_ID
    Value: The table ID for your CPU_IN table.

    Variable: CPU_OUT_AGG_ID
    Value: The table ID for your CPU_OUT_AGG table.

    Variable: JOB_SQL
    Value:

    INSERT INTO CPU_OUT_AGG
    SELECT
        window_start,
        window_end,
        hostname,
        cpu,
        AVG(usage),
        MAX(usage)
    FROM
        TABLE( TUMBLE( TABLE CPU_IN, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
    GROUP BY
        window_start,
        window_end,
        hostname,
        cpu
    

    Run the following command, replacing the variables listed in the above table with the corresponding values:

    avn service flink job create demo-flink simple_agg        \
        --table-ids CPU_IN_ID CPU_OUT_AGG_ID                  \
        --statement "JOB_SQL"
    

    The new job is added and starts automatically once a task slot is available.

    When the job is running, you should start to see messages indicating hosts with high CPU loads in the cpu_load_stats_agg topic of your demo-kafka service.
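To check that the simple_filter and simple_agg jobs are running, you can list the jobs for the service. This is a sketch based on the same CLI command family used above; the exact subcommand and output format may differ between Aiven CLI versions, so check avn service flink -h if it is not available:

    # List the Flink jobs for the demo-flink service and check their status
    avn service flink job list demo-flink --project PROJECT_NAME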

Create an aggregated data pipeline with Kafka and PostgreSQL

This setup highlights the instances where the average CPU load over a windowed interval exceeds the threshold and stores the results in PostgreSQL.

Diagram: the Kafka source streams timestamped metrics and the PostgreSQL source provides host-specific thresholds to the Flink job, which sends the high 30-second average CPU results to the PostgreSQL sink.

This uses the same CPU_IN Kafka source table and SOURCE_THRESHOLDS PostgreSQL source table that you created earlier. In addition, you need a new sink PostgreSQL table to store the processed data and a new Flink job to process the data.

  1. In the Aiven CLI, run the following command to connect to the demo-postgresql service:

    avn service cli demo-postgresql --project PROJECT_NAME
    
  2. Enter the following command to set up the PostgreSQL table for storing the results:

    CREATE TABLE CPU_LOAD_STATS_AGG_PG (
        time_ltz TIMESTAMP(3) PRIMARY KEY, 
        nr_cpus_over_threshold INT
    );
    
  3. Create a PostgreSQL table named CPU_OUT_AGG_PG.

    Variable: POSTGRESQL_INTEGRATION_ID
    Value: The ID for your demo-postgresql service integration.

    Variable: TABLE_SQL
    Value:

    time_ltz TIMESTAMP(3),
    nr_cpus_over_threshold BIGINT,
    PRIMARY KEY (time_ltz) NOT ENFORCED
    

    Run the following command, replacing the variables listed in the above table with the corresponding values:

    avn service flink table create demo-flink POSTGRESQL_INTEGRATION_ID  \
        --table-name CPU_OUT_AGG_PG                                      \
        --jdbc-table cpu_load_stats_agg_pg                               \
        --schema-sql "TABLE_SQL"
    
  4. Run the following command to list the tables for the demo-flink service and get the IDs for the CPU_IN, CPU_OUT_AGG_PG, and SOURCE_THRESHOLDS tables:

    avn service flink table list demo-flink
    
  5. Create a data pipeline job named simple_filter_pg_agg.

    Variable: CPU_IN_ID
    Value: The table ID for your CPU_IN table.

    Variable: CPU_OUT_AGG_PG_ID
    Value: The table ID for your CPU_OUT_AGG_PG table.

    Variable: SOURCE_THRESHOLDS_ID
    Value: The table ID for your SOURCE_THRESHOLDS table.

    Variable: JOB_SQL
    Value:

    INSERT INTO CPU_OUT_AGG_PG
    WITH JOINING_INFO AS (
        SELECT time_ltz,
            CPU.hostname,
            cpu,
            usage,
            allowed_top
        FROM CPU_IN CPU INNER JOIN SOURCE_THRESHOLDS
            FOR SYSTEM_TIME AS OF proctime AS ST
            ON CPU.hostname = ST.hostname
    ),
    WINDOWING AS (
        SELECT
            window_start,
            window_end,
            hostname,
            cpu,
            AVG(usage) USAGE,
            allowed_top
        FROM TABLE(TUMBLE(TABLE JOINING_INFO, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
        GROUP BY
            window_start,
            window_end,
            hostname,
            cpu,
            allowed_top
    )
    SELECT
        window_start,
        COUNT(*)
    FROM WINDOWING
    WHERE USAGE > ALLOWED_TOP
    GROUP BY
        window_start
    

    Run the following command, replacing the variables listed in the above table with the corresponding values:

    avn service flink job create demo-flink simple_filter_pg_agg     \
        --table-ids CPU_IN_ID CPU_OUT_AGG_PG_ID SOURCE_THRESHOLDS_ID \
        --statement "JOB_SQL"
    

    The new job is added and starts automatically once a task slot is available.

    When the job is running, you should start to see entries indicating hosts with high CPU loads in the cpu_load_stats_agg_pg table of your demo-postgresql database.
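To confirm that the aggregated results are being written, you can connect to the database again with avn service cli demo-postgresql and run a quick query. A minimal check, using only the table defined earlier in this section, could look like this:

    -- Run inside the psql session opened with: avn service cli demo-postgresql --project PROJECT_NAME
    SELECT *
    FROM cpu_load_stats_agg_pg
    ORDER BY time_ltz DESC
    LIMIT 10;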