Connect to Aiven for Apache Kafka® with Java

These examples show how to connect to an Aiven for Apache Kafka® service using the Java client library for Apache Kafka.

Note

The examples in this article provide two options for authentication: SSL and SASL-SSL. For more information on these authentication methods, read our article on Kafka authentication types.

Prerequisites

Add the kafka-clients dependency to your Java project from your preferred artifact repository, for example the Maven repository.

Go to the Overview page of your Aiven for Apache Kafka service.

  • If you are going to connect with SSL authentication:

    • In the Connection information section:

      1. If Authentication Method is shown, choose Client Certificate.

      2. Next to Access Key, click Download and save the service.key file.

      3. Next to Access Certificate, click Download and save the service.cert file.

      4. Next to CA Certificate, click Download and save the ca.pem file.

  • If you are going to connect using SASL authentication:

    1. Follow the instructions at Use SASL Authentication with Apache Kafka® to enable SASL.

    2. In the Connection information section:

      1. Select SASL as the Authentication Method.

      2. Next to CA Certificate, click Download and save the ca.pem file.

      3. Note the Password required for SASL; you need it for authentication.

  • Create the keystore client.keystore.p12 and the truststore client.truststore.jks by following our article on configuring Java SSL to access Kafka.
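
Before wiring the keystore and truststore into a client, it can help to confirm that each file opens with the password you chose. The sketch below does this with the JDK's own java.security.KeyStore API. The class name StoreCheck and its method are hypothetical helpers written for this illustration; they are not part of the Kafka client or of Aiven tooling.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Hypothetical helper for illustration only.
final class StoreCheck {

    // Loads a store of the given type ("PKCS12" for client.keystore.p12,
    // "JKS" for client.truststore.jks) and returns its number of entries.
    // Throws IllegalStateException if the file cannot be read or opened.
    static int countEntries(Path file, String type, char[] password) {
        try (InputStream in = Files.newInputStream(file)) {
            KeyStore store = KeyStore.getInstance(type);
            store.load(in, password);
            return store.size();
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Cannot open " + file + " as " + type, e);
        }
    }

    public static void main(String[] args) {
        // Expected arguments: <file> <type> <password>
        if (args.length != 3) {
            System.err.println("usage: StoreCheck <file> <type> <password>");
            return;
        }
        int entries = countEntries(Paths.get(args[0]), args[1], args[2].toCharArray());
        System.out.println(args[0] + " holds " + entries + " entries");
    }
}
```

A failure at this stage usually points to a wrong password, a wrong store type, or a wrong path, which is easier to diagnose here than from a failed TLS handshake.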

Warning

The examples below pass only the names of the keystore and truststore files. In actual use, provide the full path to each file.
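
One way to follow this advice is to resolve each file name against a known directory and fail early if the file is missing. The sketch below uses only java.nio.file. The class StorePaths and the directory passed to it are assumptions made for illustration.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration only.
final class StorePaths {

    // Resolves fileName inside baseDir and returns the absolute, normalized
    // path as a string. Throws IllegalArgumentException if the file is
    // missing or not readable.
    static String absolute(String baseDir, String fileName) {
        Path path = Paths.get(baseDir).resolve(fileName).toAbsolutePath().normalize();
        if (!Files.isReadable(path)) {
            throw new IllegalArgumentException("File not found or not readable: " + path);
        }
        return path.toString();
    }
}
```

The returned string can then be used as the value of ssl.truststore.location or ssl.keystore.location in place of a bare file name.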

Variables

Variable              Description
--------------------  ------------------------------------------------------------
HOST                  Host name for the connection
USER_NAME             Name of the user for the connection
SSL_PORT              Port number to use for SSL
SASL_PORT             Port number to use for SASL
SASL_PASSWORD         Password required to connect using SASL
TRUSTSTORE_LOCATION   Location of your truststore (named client.truststore.jks by default)
TRUSTSTORE_PASSWORD   Password you used when creating the truststore
KEYSTORE_LOCATION     Location of your keystore (named client.keystore.p12 by default)
KEYSTORE_PASSWORD     Password you used when creating the keystore
KEY_PASSWORD          Password for the key in the keystore, if you chose a different password than the one for the keystore
SERIALIZER            How to serialize data; available options are listed in the Apache Kafka documentation
DESERIALIZER          How to deserialize data; available options are listed in the Apache Kafka documentation
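
Rather than hard-coding these values, you might collect them from the environment and stop with a clear message when one is missing. The following is a minimal sketch under that assumption. The class ConnectionVariables is hypothetical, and the host and port in main are sample values, not real service details.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
final class ConnectionVariables {

    // Returns the requested variables from source, in the order requested.
    // Throws IllegalStateException naming every variable that is absent or blank.
    static Map<String, String> require(Map<String, String> source, String... names) {
        Map<String, String> values = new LinkedHashMap<>();
        List<String> missing = new ArrayList<>();
        for (String name : names) {
            String value = source.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            } else {
                values.put(name, value);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing connection variables: " + missing);
        }
        return values;
    }

    public static void main(String[] args) {
        // Sample values; in real use, pass System.getenv() as the source.
        Map<String, String> sample = new LinkedHashMap<>();
        sample.put("HOST", "kafka.example.com");
        sample.put("SSL_PORT", "12345");
        Map<String, String> vars = require(sample, "HOST", "SSL_PORT");
        System.out.println("bootstrap.servers=" + vars.get("HOST") + ":" + vars.get("SSL_PORT"));
    }
}
```

Keeping passwords in environment variables or a secrets store, rather than in source code, also avoids committing them to version control.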

With SSL authentication

Set up properties to connect to the cluster and create a producer and a consumer:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties properties = new Properties();
// connection and SSL settings
properties.put("bootstrap.servers", "{HOST}:{SSL_PORT}");
properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "{KEYSTORE_LOCATION}");
properties.put("ssl.keystore.password", "{KEYSTORE_PASSWORD}");
properties.put("ssl.key.password", "{KEY_PASSWORD}");
// serializers are used by the producer, deserializers by the consumer
properties.put("key.serializer", "{SERIALIZER}");
properties.put("value.serializer", "{SERIALIZER}");
properties.put("key.deserializer", "{DESERIALIZER}");
properties.put("value.deserializer", "{DESERIALIZER}");

// create a producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
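
The example above shares one Properties object between the producer and the consumer. That works, but each client only uses its own half of the serializer settings and typically logs a warning about the rest. A consumer that calls subscribe() also needs a group.id. A possible way to split the settings is sketched below using only java.util.Properties. The class ClientProperties and the group name example-group are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class ClientProperties {

    // Copies the shared connection settings and adds producer-only keys.
    static Properties forProducer(Properties common, String serializer) {
        Properties props = new Properties();
        props.putAll(common);
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        return props;
    }

    // Copies the shared connection settings and adds consumer-only keys.
    static Properties forConsumer(Properties common, String deserializer, String groupId) {
        Properties props = new Properties();
        props.putAll(common);
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties common = new Properties();
        common.put("bootstrap.servers", "{HOST}:{SSL_PORT}");
        common.put("security.protocol", "SSL");

        Properties producerProps = forProducer(common,
                "org.apache.kafka.common.serialization.StringSerializer");
        Properties consumerProps = forConsumer(common,
                "org.apache.kafka.common.serialization.StringDeserializer", "example-group");

        System.out.println(producerProps.containsKey("key.deserializer")); // prints false
        System.out.println(consumerProps.getProperty("group.id"));         // prints example-group
    }
}
```

The two resulting objects can be passed to the KafkaProducer and KafkaConsumer constructors in place of the shared properties object.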

With SASL-SSL authentication

Set up properties to connect to the cluster and create a producer and a consumer:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties properties = new Properties();

String saslUsername = "{USER_NAME}";
String saslPassword = "{SASL_PASSWORD}";

// JAAS configuration for the SCRAM login module
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate, saslUsername, saslPassword);

properties.put("bootstrap.servers", "{HOST}:{SASL_PORT}");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("sasl.jaas.config", jaasConfig);
// an empty value disables server host name verification
properties.put("ssl.endpoint.identification.algorithm", "");

// truststore holding the CA certificate
properties.put("ssl.truststore.type", "jks");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");

// serializers are used by the producer, deserializers by the consumer
properties.put("key.serializer", "{SERIALIZER}");
properties.put("value.serializer", "{SERIALIZER}");
properties.put("key.deserializer", "{DESERIALIZER}");
properties.put("value.deserializer", "{DESERIALIZER}");

// create a producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
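
The JAAS template above places the user name and password inside double quotes, so a password that itself contains a double quote or a backslash would produce a malformed value. The sketch below builds the same string with those two characters escaped. It assumes the JAAS configuration parser honors backslash escapes inside quoted values, which you should confirm against the client version you use. The class ScramJaasConfig and the sample credentials are hypothetical.

```java
// Hypothetical helper for illustration only.
final class ScramJaasConfig {

    // Escapes backslashes first, then double quotes, so the value can sit
    // inside a double-quoted JAAS option.
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the sasl.jaas.config value for the SCRAM login module.
    static String build(String username, String password) {
        return String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required"
                        + " username=\"%s\" password=\"%s\";",
                escape(username), escape(password));
    }

    public static void main(String[] args) {
        // Sample credentials, not real ones.
        System.out.println(build("example-user", "example-password"));
    }
}
```

The result of build(...) can be used as the value of sasl.jaas.config in place of the String.format call shown earlier.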