Create an Apache Kafka®-based Apache Flink® table

To build data pipelines, Apache Flink® requires source and target data structures to be mapped as Flink tables. This functionality can be achieved via the Aiven console or Aiven CLI.

A Flink table can be defined over an existing or new Aiven for Apache Kafka® topic to source or sink streaming data. To define a table over an Apache Kafka® topic, you need to specify the topic name, the column data formats and the connector type, together with the Flink table name to use as a reference when building data pipelines.

Warning

To define Flink tables, an existing integration needs to be available between the Aiven for Flink service and one or more Aiven for Apache Kafka services.

Create an Apache Kafka-based Apache Flink table with Aiven Console

To create a Flink table based on an Aiven for Apache Kafka topic via the Aiven console:

  1. Navigate to the Aiven for Apache Flink service page, and open the Jobs and Data tab.

  2. Select the Data Tables sub-tab and select the Aiven for Apache Kafka integration to use.

  3. Select the Aiven for Apache Kafka service and the topic to be used as source or target for the data pipeline. If you want to use a new topic that doesn't exist yet, write the topic name.

Warning

By default, Flink is not able to create Apache Kafka topics automatically when pushing the first record. To change this behaviour, enable the kafka.auto_create_topics_enable option in the Advanced configuration section of the target Aiven for Apache Kafka service.

  4. Select the Kafka connector type: either the Apache Kafka SQL Connector for standard topic reads/writes, or the Upsert Kafka SQL Connector for changelog-style integrations based on the message key.

Note

For more information on the connector types and the requirements for each of them, see the Standard and upsert Apache Kafka® connectors and Requirements for Apache Kafka® connectors articles.

  5. Select the Key Data Format. If a value other than Key not used is selected, specify the fields from the SQL schema to be used as the key. This setting is needed to set message keys for topics acting as the target of data pipelines.

  6. Select the Value Data Format based on the message format in the Apache Kafka topic.

Note

For the Key and Value data formats, the following options are available:

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium Avro

  • Debezium JSON

  7. Define the Flink table name; this name represents the Flink reference to the topic and will be used during the data pipeline definition.

  8. Define the SQL schema: the SQL schema defines the fields retrieved from each message in a topic, additional transformations such as format casting or timestamp extraction, and watermark settings.

Example: Define a Flink table using the standard connector over a topic in JSON format

The Aiven for Apache Kafka service named demo-kafka contains a topic named metric-topic holding a stream of service metrics in JSON format like:

{"hostname": "sleepy", "cpu": "cpu3", "usage": 93.30629927475789, "occurred_at": 1637775077782}
{"hostname": "dopey", "cpu": "cpu4", "usage": 88.39531418706092, "occurred_at": 1637775078369}
{"hostname": "happy", "cpu": "cpu2", "usage": 77.90860728236156, "occurred_at": 1637775078964}
{"hostname": "dopey", "cpu": "cpu4", "usage": 81.17372993952847, "occurred_at": 1637775079054}

We can define a metrics_in Flink table with:

  • demo-kafka as integration service

  • metric-topic as Apache Kafka topic name

  • Apache Kafka SQL Connector, since we want to treat every entry as a unique event

  • Key not used as Key data format

  • JSON as Value data format

  • metrics_in as Flink table name

  • The following as SQL schema

cpu VARCHAR,
hostname VARCHAR,
usage DOUBLE,
occurred_at BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND

Note

The SQL schema includes:

  • the message fields cpu, hostname, usage, occurred_at and the related data types. The order of fields in the SQL definition doesn’t need to follow the order presented in the payload.

  • the definition of the field time_ltz as a transformation to TIMESTAMP(3) from the occurred_at Unix timestamp (in milliseconds).

  • the WATERMARK definition
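
For comparison, the metrics_in table above corresponds roughly to the following standalone Flink SQL definition. This is a minimal sketch assuming a self-managed Flink cluster: the bootstrap server address is a placeholder, and with Aiven for Apache Flink the connection properties are supplied by the Aiven for Apache Kafka integration, so only the schema and the settings listed above are entered in the console.

CREATE TABLE metrics_in (
    cpu VARCHAR,
    hostname VARCHAR,
    `usage` DOUBLE,    -- backticked in case usage clashes with a reserved keyword
    occurred_at BIGINT,
    time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',                               -- Apache Kafka SQL Connector
    'topic' = 'metric-topic',                            -- topic in the demo-kafka service
    'properties.bootstrap.servers' = 'kafka-host:9092',  -- placeholder address
    'value.format' = 'json',                             -- Value data format: JSON
    'scan.startup.mode' = 'earliest-offset'              -- read the topic from the beginning
);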

Example: Define a Flink table using the standard connector over a topic in Avro format

This example shows how to define a Flink table when the target of the Flink data pipeline needs to write in Avro format to a topic named metric-topic-tgt within the Aiven for Apache Kafka service named demo-kafka.

We can define a metrics-out Flink table with:

  • demo-kafka as integration service

  • metric-topic-tgt as Apache Kafka topic name

  • Apache Kafka SQL Connector for the standard connection mode

  • Confluent Avro as Key data format

  • hostname as the field to be used as key; in Apache Kafka the key is used by default for partition selection

  • Confluent Avro as Value data format

  • metrics-out as Flink table name

  • The following as SQL schema

cpu VARCHAR,
hostname VARCHAR,
usage DOUBLE

Note

The SQL schema includes the output message fields cpu, hostname, usage and the related data types.
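
For comparison, a minimal sketch of an equivalent standalone Flink SQL definition is shown below. The bootstrap server and schema registry addresses are placeholders, the schema registry option name follows Flink 1.14 (it differs in later versions), and with Aiven for Apache Flink these connection details are supplied by the integration.

CREATE TABLE `metrics-out` (
    cpu VARCHAR,
    hostname VARCHAR,
    `usage` DOUBLE
) WITH (
    'connector' = 'kafka',                               -- standard Apache Kafka SQL Connector
    'topic' = 'metric-topic-tgt',
    'properties.bootstrap.servers' = 'kafka-host:9092',  -- placeholder address
    'key.format' = 'avro-confluent',                     -- Key data format: Confluent Avro
    'key.fields' = 'hostname',                           -- field used as the message key
    'key.avro-confluent.schema-registry.url' = 'https://registry-host:443',    -- placeholder
    'value.format' = 'avro-confluent',                   -- Value data format: Confluent Avro
    'value.avro-confluent.schema-registry.url' = 'https://registry-host:443'   -- placeholder
);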

Example: Define a Flink table using the upsert connector over a topic in Avro format

This example shows how to define a Flink table when the target of the Flink pipeline needs to write in Avro format and upsert mode to a compacted topic named metric-topic-tgt within the Aiven for Apache Kafka service named demo-kafka.

We can define a metrics-out Flink table with:

  • demo-kafka as integration service

  • metric-topic-tgt as Apache Kafka topic name

  • Upsert Kafka SQL Connector for the changelog mode

  • Confluent Avro as Key data format

Note

Unlike the standard Apache Kafka SQL connector, when using the Upsert Kafka SQL connector the key fields are not defined explicitly; they are derived from the PRIMARY KEY definition in the SQL schema.

  • Confluent Avro as Value data format

  • metrics-out as Flink table name

  • The following as SQL schema

cpu VARCHAR,
hostname VARCHAR,
max_usage DOUBLE,
PRIMARY KEY (cpu, hostname) NOT ENFORCED

Note

The SQL schema includes:

  • the output message fields cpu, hostname, max_usage and the related data types.

  • the PRIMARY KEY definition, driving the key part of the Apache Kafka message
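
For comparison, a minimal sketch of an equivalent standalone Flink SQL definition using the Upsert Kafka SQL Connector, with the same placeholder addresses and Flink 1.14 option names as in the previous sketch:

CREATE TABLE `metrics-out` (
    cpu VARCHAR,
    hostname VARCHAR,
    max_usage DOUBLE,
    PRIMARY KEY (cpu, hostname) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',                        -- Upsert Kafka SQL Connector
    'topic' = 'metric-topic-tgt',                        -- compacted target topic
    'properties.bootstrap.servers' = 'kafka-host:9092',  -- placeholder address
    'key.format' = 'avro-confluent',                     -- key derived from the PRIMARY KEY
    'key.avro-confluent.schema-registry.url' = 'https://registry-host:443',    -- placeholder
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'https://registry-host:443'   -- placeholder
);

Such a table could be kept up to date, for example, by a Flink job aggregating the metrics_in table defined in the first example:

INSERT INTO `metrics-out`
SELECT cpu, hostname, MAX(`usage`) AS max_usage
FROM metrics_in
GROUP BY cpu, hostname;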
