
Create an OpenSearch®-based Apache Flink® table

To build data pipelines, Apache Flink® requires source and target data structures to be mapped as Flink tables. You can create these mappings through either the Aiven console or the Aiven CLI.

A Flink table can be defined over an existing or new Aiven for OpenSearch® index to sink streaming data. To define a table over an OpenSearch® index, you need to specify the index name and the column data formats, together with the Flink table name to use as a reference when building data pipelines.

Warning

Aiven for OpenSearch® can only be used as the target of a data pipeline: you can create jobs that write data to an OpenSearch® index, but reading data from an OpenSearch® index is currently not possible.

Create an OpenSearch®-based Apache Flink® table with Aiven Console

To create an Apache Flink® table based on an Aiven for OpenSearch® index via the Aiven console:

  1. Navigate to the Aiven for Apache Flink® service page, and open the Jobs and Data tab.

    Warning

    To define Flink tables, an existing integration must be available between the Aiven for Apache Flink® service and one or more Aiven for OpenSearch® services.

  2. Select the Data Tables sub-tab and select the Aiven for OpenSearch® integration to use.

  3. Select the Aiven for OpenSearch® service and the index to be used as the target for the data pipeline. If you want to use a new index that does not yet exist, enter the index name and the index will be created automatically.

  4. Define the Flink table name; this name is the Flink reference to the index and will be used during the data pipeline definition.

  5. Define the SQL schema: the SQL schema specifies the fields for each document pushed to the index.

Example: Define an Apache Flink® table to OpenSearch®

We want to push the result of an Apache Flink® job to an index named metrics in an Aiven for OpenSearch® service named demo-opensearch. The job produces data like the following:

{'hostname': 'sleepy', 'cpu': 'cpu3', 'usage': 93.30629927475789, 'occurred_at': 1637775077782}
{'hostname': 'dopey', 'cpu': 'cpu4', 'usage': 88.39531418706092, 'occurred_at': 1637775078369}
{'hostname': 'happy', 'cpu': 'cpu2', 'usage': 77.90860728236156, 'occurred_at': 1637775078964}
{'hostname': 'dopey', 'cpu': 'cpu4', 'usage': 81.17372993952847, 'occurred_at': 1637775079054}
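The occurred_at field carries the event time as a Unix epoch timestamp in milliseconds. As a quick illustration (not part of the pipeline itself), this is how such a value maps to a human-readable UTC timestamp in Python:

```python
from datetime import datetime, timezone

# occurred_at values are Unix epoch timestamps in milliseconds
occurred_at = 1637775077782

# Convert milliseconds to seconds, then to a timezone-aware UTC datetime
event_time = datetime.fromtimestamp(occurred_at / 1000, tz=timezone.utc)
print(event_time.isoformat())  # 2021-11-24T17:31:17.782000+00:00
```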

We can define a metrics_out Apache Flink® table with:

  • demo-opensearch as integration service

  • metrics as OpenSearch® index name

  • metrics_out as Flink table name

  • the following as SQL schema

cpu VARCHAR,
hostname VARCHAR,
usage DOUBLE,
occurred_at BIGINT
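Behind the scenes, this definition corresponds to a Flink SQL CREATE TABLE statement along the lines of the following sketch. The connector options shown are illustrative assumptions (Flink's Elasticsearch-compatible connector, a hypothetical endpoint): when you create the table through the Aiven console integration, the connection details are generated and managed for you.

```sql
-- Illustrative sketch only: the Aiven integration fills in the actual
-- connector options (host, credentials) automatically.
CREATE TABLE metrics_out (
    cpu VARCHAR,
    hostname VARCHAR,
    usage DOUBLE,
    occurred_at BIGINT
) WITH (
    'connector' = 'elasticsearch-7',  -- assumption: the Elasticsearch-compatible connector
    'hosts' = 'https://demo-opensearch.example.com:443',  -- hypothetical endpoint
    'index' = 'metrics'
);
```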

After clicking the Create button, the metrics_out table should be visible in the table browser.

Apache, Apache Kafka, Kafka, Apache Flink, Flink, Apache Cassandra, and Cassandra are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. M3, M3 Aggregator, M3 Coordinator, OpenSearch, PostgreSQL, MySQL, InfluxDB, Grafana, Terraform, and Kubernetes are trademarks and property of their respective owners. *Redis is a trademark of Redis Ltd. Any rights therein are reserved to Redis Ltd. Any use by Aiven is for referential purposes only and does not indicate any sponsorship, endorsement or affiliation between Redis and Aiven. All product and service names used in this website are for identification purposes only and do not imply endorsement.

Copyright © 2022, Aiven Team | Last updated: February 2022