Use schema registry in Java with Aiven for Apache Kafka®

Aiven for Apache Kafka® provides the schema registry functionality via Karapace, including the possibility to store, retrieve a schema, and also evolve it without needing to rebuild existing Consumers and Producers code.

Note

To use the schema registry functionality exposed by Karapace, you need to enable the Schema Registry (Karapace) functionality for the Aiven for Apache Kafka instance selected from the Aiven console, service Overview tab.

The following steps are required in Java to make use of schema registry:

  1. Create version 1 of a schema

  2. Use Apache Avro to compile the schema

  3. Create consumer and producer that utilize Aiven for Apache Kafka and Schema Registry

Prerequisites

To be able to produce and consume data in Java from an Aiven for Apache Kafka service you need to create a Keystore and Truststore containing the SSL certificates.

You can do it manually by downloading the certificates and composing the files locally on your machine or using the dedicated Aiven CLI command.

Variables

These are the placeholders you will need to replace in the code sample:

Variable

Description

BOOTSTRAPSERVERS

Service URI for Kafka connection, can be obtained from the Aiven console, service Overview tab

KEYSTORE

Path to the keystore

KEYSTOREPASSWORD

Password for the keystore

TRUSTSTORE

Path to the truststore

TRUSTSTOREPASSWORD

Password for the truststore

SSLKEYPASSWORD

The password of the private key in the key store file

SCHEMAREGISTRYURL

Service Registry URI for Apache Kafka connection

SCHEMAREGISTRYUSER

Service Registry username, can be obtained from the Aiven console, service Overview tab

SCHEMAREGISTRYPASSWORD

Service Registry password, can be obtained from the Aiven console, service Overview tab

TOPIC_NAME

Apache Kafka topic name to use

Create version 1 of the Avro schema

To create an Avro schema, you need a definition file. As example you can use a click record schema defined in JSON and stored in a file named ClickRecord.avsc containing the following:

{"type": "record",
  "name": "ClickRecord",
  "namespace": "io.aiven.avro.example",
  "fields": [
     {"name": "session_id", "type": "string"},
     {"name": "browser", "type": ["string", "null"]},
     {"name": "campaign", "type": ["string", "null"]},
     {"name": "channel", "type": "string"},
     {"name": "referrer", "type": ["string", "null"], "default": "None"},
     {"name": "ip", "type": ["string", "null"]}
  ]
}

The JSON configuration above defines a schema named ClickRecord in the namespace io.aiven.avro.example with fields session_id, browser, campaign, channel, referrer and ip and related data types.

Once the schema is defined, you need to compile it, and it can be done manually or automatically.

Manual schema compilation

In case of manual schema compilation, download avro-tools-1.11.0.jar from https://avro.apache.org/releases.html#Download or via maven using the following:

mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=org.apache.avro:avro-tools:1.11.0:jar -Ddest=avro-tools-1.11.0.jar

The schema defined in the previous step, can be now compiled to produce a Java class ClickRecord.java in the io.aiven.avro.example package (taken from the namespace parameter):

java -jar avro-tools-1.11.0.jar compile schema ClickRecord.avsc .

Note

The package structure will also be generated if missing.

Auto schema compilation

With auto, the schema is compiled during the project build with, for example, maven-avro-plugin or gradle-avro-plugin. The following is a configuration example for maven-avro-plugin when ClickRecord.avsc is stored in the path src/main/avro/ClickRecord.avsc:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro-maven-plugin.version}</version>
    <executions>
        <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <goal>protocol</goal>
                <goal>idl-protocol</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/generated-sources/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

The automatically generated Avro-schema code will be stored under the folder ${project.basedir}/src/main/generated-sources/.

Set consumer and producer properties for schema registry

The full code to create consumer and producers using the Schema Registry in Aiven for Apache Kafka can be found in the Aiven examples GitHub repository. The following contains a list of the properties required.

For producers you need to specify:

props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

For consumers you need to specify:

props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "clickrecord-example-group");

Note

In the above properties replace all the required input parameters (within square brackets) with the appropriate information defined in the Variables section.