Connect to Aiven for Apache Kafka® with Python

These examples show how to connect to an Aiven for Apache Kafka® service using the kafka-python library, as either a producer or consumer.

Note

The examples given here provide different options for the different authentication methods. For more information on the supported methods, see our article on Kafka authentication types.

Pre-requisites

Install the Python kafka-python library:

pip install kafka-python

Go to the Overview page of your Aiven for Apache Kafka service.

  • If you are going to connect with SSL authentication:

    • In the Connection information section:

      1. If Authentication Method is shown, choose Client Certificate

      2. Next to Access Key, click Download and save the service.key file.

      3. Next to Access Certificate, click Download and save the service.cert file.

      4. Next to CA Certificate, click Download and save the ca.pem file.

  • If you are going to connect using SASL authentication:

    1. Follow the instructions at Use SASL Authentication with Apache Kafka® to enable SASL.

    2. In the Connection Information section

      1. Select SASL as the Authentication Method

      2. Next to CA Certificate, click Download and save the ca.pem file

Note that the CA Certificate ca.pem file has the same contents by either route.

Warning

In the below examples, we just pass the name of the certificate files, but in actual use, the full path should be used.

You can also use the Aiven command line tool to download the files. See the documentation for avn service user-creds-download

Variables

These are the placeholders you will need to replace in the code samples. The values are from the Connection information on the service overview page.

If you are using SSL (remember to choose Client Certificate if Authentication Method is shown):

Variable

Description

HOST

Host name for the connection

SSL_PORT

Port number to use

If you are using SASL (Authentication Method should be SASL):

Variable

Description

HOST

Host name for the connection

SASL_PORT

Port number to use

SASL_USERNAME

User to connect with

SASL_PASSWORD

Password for this user

For consumers you will also need:

Variable

Description

TOPIC_NAME

The name of the topic to read from

START_FROM

The value to use for the auto_offset_reset parameter, which says which message to start consuming from.

Allowed values are:

  • latest - consume from the end of the topic partition. This is the default.

  • earliest - consume from the beginning of the topic partition

For more information on auto_offset_reset, see the Kafka documentation on auto.offset.reset and Consumer Position.

Connect a producer

With SSL authentication

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

With SASL authentication

from kafka import KafkaProducer

# Choose an appropriate SASL mechanism, for instance:
SASL_MECHANISM = 'SCRAM-SHA-256'

producer = KafkaProducer(
   bootstrap_servers=f"{HOST}:{SASL_PORT}",
   sasl_mechanism = SASL_MECHANISM,
   sasl_plain_username = SASL_USERNAME,
   sasl_plain_password = SASL_PASSWORD,
   security_protocol="SASL_SSL",
   ssl_cafile="ca.pem",
)

Connect a consumer

With SSL authentication

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "TOPIC_NAME",
    auto_offset_reset="START_FROM",
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    client_id = CONSUMER_CLIENT_ID,
    group_id = CONSUMER_GROUP_ID,
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

With SASL authentication

from kafka import KafkaConsumer

# Choose an appropriate SASL mechanism, for instance:
SASL_MECHANISM = 'SCRAM-SHA-256'

consumer = KafkaConsumer(
    "TOPIC_NAME",
    auto_offset_reset = "START_FROM",
    bootstrap_servers = f'{HOST}:{SASL_PORT}',
    client_id = CONSUMER_CLIENT_ID,
    group_id = CONSUMER_GROUP_ID,
    sasl_mechanism = SASL_MECHANISM,
    sasl_plain_username = SASL_USERNAME,
    sasl_plain_password = SASL_PASSWORD,
    security_protocol = "SASL_SSL",
    ssl_cafile = "ca.pem"
)