Advanced parameters for Aiven for Apache Kafka®
Below is a summary of every configuration option available for the Aiven for Apache Kafka® service:
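All of these options live under the service's user_config object. As a minimal sketch of how they are applied (assuming the requests library, a hypothetical project and service name, and an API token in the AIVEN_TOKEN environment variable), the service update endpoint of the Aiven REST API accepts a partial user_config:

    import os
    import requests

    PROJECT = "my-project"  # hypothetical project name
    SERVICE = "my-kafka"    # hypothetical service name

    resp = requests.put(
        f"https://api.aiven.io/v1/project/{PROJECT}/service/{SERVICE}",
        headers={"Authorization": f"aivenv1 {os.environ['AIVEN_TOKEN']}"},
        json={"user_config": {"ip_filter": ["10.20.0.0/16"], "static_ips": False}},
    )
    resp.raise_for_status()

The Aiven CLI wraps the same endpoint, e.g. avn service update my-kafka -c static_ips=false.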
custom_domain
=> ['string', 'null']: Custom domain. Serve the web frontend using a custom CNAME pointing to the Aiven DNS name.
ip_filter
=> array: IP filter. Allow incoming connections from CIDR address blocks, e.g. '10.20.0.0/16'.
static_ips
=> boolean: Static IP addresses. Use static public IP addresses.
private_access
=> object: Allow access to selected service ports from private networks.
prometheus
=> boolean: Allow clients to connect to prometheus with a DNS name that always resolves to the service's private IP addresses. Only available in certain network locations.
public_access
=> object: Allow access to selected service ports from the public internet.
kafka
=> boolean: Allow clients to connect to kafka from the public internet for service nodes that are in a project VPC or another type of private network.
kafka_connect
=> boolean: Allow clients to connect to kafka_connect from the public internet for service nodes that are in a project VPC or another type of private network.
kafka_rest
=> boolean: Allow clients to connect to kafka_rest from the public internet for service nodes that are in a project VPC or another type of private network.
prometheus
=> boolean: Allow clients to connect to prometheus from the public internet for service nodes that are in a project VPC or another type of private network.
schema_registry
=> boolean: Allow clients to connect to schema_registry from the public internet for service nodes that are in a project VPC or another type of private network.
privatelink_access
=> object: Allow access to selected service components through Privatelink.
jolokia
=> boolean: Enable jolokia.
kafka
=> boolean: Enable kafka.
kafka_connect
=> boolean: Enable kafka_connect.
kafka_rest
=> boolean: Enable kafka_rest.
prometheus
=> boolean: Enable prometheus.
schema_registry
=> boolean: Enable schema_registry.
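The three access objects above nest inside user_config. A minimal sketch of the shape (illustrative values), applied with the same PUT request shown earlier:

    # Illustrative nested access objects inside user_config.
    user_config = {
        "public_access": {"kafka": True, "kafka_rest": False},
        "private_access": {"prometheus": True},
        "privatelink_access": {"kafka": True, "jolokia": False},
    }
    # Apply by sending {"user_config": user_config} to the service update endpoint.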
kafka
=> object: Kafka broker configuration values.
compression_type
=> string: compression.type. Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed', which is equivalent to no compression, and 'producer', which means retain the original compression codec set by the producer.
group_initial_rebalance_delay_ms
=> integer: group.initial.rebalance.delay.ms. The amount of time, in milliseconds, the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins. The default value is 3 seconds. During development and testing it might be desirable to set this to 0 so as not to delay test execution.
group_min_session_timeout_ms
=> integer: group.min.session.timeout.ms. The minimum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages between heartbeats at the cost of a longer time to detect failures.
group_max_session_timeout_ms
=> integer: group.max.session.timeout.ms. The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages between heartbeats at the cost of a longer time to detect failures.
connections_max_idle_ms
=> integer: connections.max.idle.ms. Idle connection timeout: the server socket processor threads close connections that have been idle for longer than this.
max_incremental_fetch_session_cache_slots
=> integer: max.incremental.fetch.session.cache.slots. The maximum number of incremental fetch sessions that the broker will maintain.
message_max_bytes
=> integer: message.max.bytes. The maximum size of a message that the server can receive.
offsets_retention_minutes
=> integer: offsets.retention.minutes. Log retention window, in minutes, for the offsets topic.
log_cleaner_delete_retention_ms
=> integer: log.cleaner.delete.retention.ms. How long delete (tombstone) records are retained.
log_cleaner_min_cleanable_ratio
=> number: log.cleaner.min.cleanable.ratio. Controls log compactor frequency. A larger value means less frequent compactions but also more space wasted for logs. Consider setting log.cleaner.max.compaction.lag.ms to enforce compactions sooner, instead of setting a very high value for this option.
log_cleaner_max_compaction_lag_ms
=> integer: log.cleaner.max.compaction.lag.ms. The maximum amount of time a message will remain uncompacted. Only applicable for logs that are being compacted.
log_cleaner_min_compaction_lag_ms
=> integer: log.cleaner.min.compaction.lag.ms. The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.
log_cleanup_policy
=> string: log.cleanup.policy. The default cleanup policy for segments beyond the retention window.
log_flush_interval_messages
=> integer: log.flush.interval.messages. The number of messages accumulated on a log partition before messages are flushed to disk.
log_flush_interval_ms
=> integer: log.flush.interval.ms. The maximum time, in milliseconds, that a message in any topic is kept in memory before being flushed to disk. If not set, the value in log.flush.scheduler.interval.ms is used.
log_index_interval_bytes
=> integer: log.index.interval.bytes. The interval with which Kafka adds an entry to the offset index.
log_index_size_max_bytes
=> integer: log.index.size.max.bytes. The maximum size in bytes of the offset index.
log_message_downconversion_enable
=> boolean: log.message.downconversion.enable. Controls whether down-conversion of message formats is enabled to satisfy consume requests.
log_message_timestamp_type
=> string: log.message.timestamp.type. Define whether the timestamp in the message is message create time or log append time.
log_message_timestamp_difference_max_ms
=> integer: log.message.timestamp.difference.max.ms. The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message.
log_preallocate
=> boolean: log.preallocate. Whether to preallocate the file when creating a new segment.
log_retention_bytes
=> integer: log.retention.bytes. The maximum size of the log before deleting messages.
log_retention_hours
=> integer: log.retention.hours. The number of hours to keep a log file before deleting it.
log_retention_ms
=> integer: log.retention.ms. The number of milliseconds to keep a log file before deleting it. If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.
log_roll_jitter_ms
=> integer: log.roll.jitter.ms. The maximum jitter to subtract from logRollTimeMillis, in milliseconds. If not set, the value in log.roll.jitter.hours is used.
log_roll_ms
=> integer: log.roll.ms. The maximum time, in milliseconds, before a new log segment is rolled out.
log_segment_bytes
=> integer: log.segment.bytes. The maximum size of a single log file.
log_segment_delete_delay_ms
=> integer: log.segment.delete.delay.ms. The amount of time to wait before deleting a file from the filesystem.
auto_create_topics_enable
=> boolean: auto.create.topics.enable. Enable auto-creation of topics.
min_insync_replicas
=> integer: min.insync.replicas. When a producer sets acks to 'all' (or '-1'), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.
num_partitions
=> integer: num.partitions. Number of partitions for auto-created topics.
default_replication_factor
=> integer: default.replication.factor. Replication factor for auto-created topics.
replica_fetch_max_bytes
=> integer: replica.fetch.max.bytes. The number of bytes of messages to attempt to fetch for each partition (defaults to 1048576). This is not an absolute maximum: if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made.
replica_fetch_response_max_bytes
=> integer: replica.fetch.response.max.bytes. Maximum bytes expected for the entire fetch response (defaults to 10485760). Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. As such, this is not an absolute maximum.
max_connections_per_ip
=> integer: max.connections.per.ip. The maximum number of connections allowed from each IP address (defaults to 2147483647).
producer_purgatory_purge_interval_requests
=> integer: producer.purgatory.purge.interval.requests. The purge interval, in number of requests, of the producer request purgatory (defaults to 1000).
socket_request_max_bytes
=> integer: socket.request.max.bytes. The maximum number of bytes in a socket request (defaults to 104857600).
transaction_state_log_segment_bytes
=> integer: transaction.state.log.segment.bytes. The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads (defaults to 104857600 bytes, i.e. 100 mebibytes).
transaction_remove_expired_transaction_cleanup_interval_ms
=> integer: transaction.remove.expired.transaction.cleanup.interval.ms. The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing (defaults to 3600000 ms, i.e. 1 hour).
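Several of these broker settings surface directly in client behavior: with compression.type left at 'producer' the broker keeps whatever codec the producer chose, and with acks='all' a write only succeeds once min.insync.replicas replicas have it. A minimal sketch with the kafka-python package (hypothetical bootstrap address; a real Aiven service also needs the SSL or SASL settings shown further below):

    from kafka import KafkaProducer  # pip install kafka-python

    producer = KafkaProducer(
        bootstrap_servers="my-kafka.example.com:9092",  # hypothetical address
        acks="all",               # wait for min.insync.replicas acknowledgements
        compression_type="gzip",  # kept as-is when broker compression.type=producer
    )
    future = producer.send("demo-topic", b"hello")
    future.get(timeout=30)  # raises NotEnoughReplicasError if too few in-sync replicas
    producer.flush()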
kafka_authentication_methods
=> object: Kafka authentication methods.
certificate
=> boolean: Enable certificate/SSL authentication.
sasl
=> boolean: Enable SASL authentication.
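What the two methods mean on the client side, as a minimal sketch with kafka-python (hypothetical host and file names; Aiven provides the CA, certificate, and key files per service):

    from kafka import KafkaConsumer

    # Certificate/SSL authentication.
    consumer = KafkaConsumer(
        "demo-topic",
        bootstrap_servers="my-kafka.example.com:9092",  # hypothetical address
        security_protocol="SSL",
        ssl_cafile="ca.pem",
        ssl_certfile="service.cert",
        ssl_keyfile="service.key",
    )

    # SASL authentication would instead use, for example:
    #   security_protocol="SASL_SSL", sasl_mechanism="SCRAM-SHA-256",
    #   sasl_plain_username="avnadmin", sasl_plain_password="..."
    for message in consumer:
        print(message.value)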
kafka_connect
=> boolean: Enable the Kafka Connect service.
kafka_connect_config
=> object: Kafka Connect configuration values.
connector_client_config_override_policy
=> string: Client config override policy. Defines what client configurations can be overridden by the connector. Default is None.
consumer_auto_offset_reset
=> string: Consumer auto offset reset. What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. Default is earliest.
consumer_fetch_max_bytes
=> integer: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum.
consumer_isolation_level
=> string: Consumer isolation level. Transaction read isolation level. read_uncommitted is the default, but read_committed can be used if consume-exactly-once behavior is desired.
consumer_max_partition_fetch_bytes
=> integer: The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
consumer_max_poll_interval_ms
=> integer: The maximum delay, in milliseconds, between invocations of poll() when using consumer group management (defaults to 300000).
consumer_max_poll_records
=> integer: The maximum number of records returned in a single call to poll() (defaults to 500).
offset_flush_interval_ms
=> integer: The interval at which to try committing offsets for tasks (defaults to 60000).
offset_flush_timeout_ms
=> integer: Offset flush timeout. Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt (defaults to 5000).
producer_compression_type
=> string: The default compression type for producers. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'none', which is the default and equivalent to no compression.
producer_max_request_size
=> integer: The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
session_timeout_ms
=> integer: The timeout, in milliseconds, used to detect failures when using Kafka's group management facilities (defaults to 10000).
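These options tune the clients embedded in the Connect workers; the connectors themselves are managed through the standard Kafka Connect REST API. A minimal sketch with requests (hypothetical endpoint, credentials, and connector class; which connector plugins are installed depends on the service):

    import requests

    CONNECT_URI = "https://avnadmin:password@my-connect.example.com:443"  # hypothetical

    resp = requests.post(
        f"{CONNECT_URI}/connectors",
        json={
            "name": "demo-sink",
            "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics": "demo-topic",
                "file": "/tmp/demo.out",
            },
        },
    )
    resp.raise_for_status()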
kafka_rest
=> boolean: Enable the Kafka REST service.
kafka_version
=> ['string', 'null']: Kafka major version.
schema_registry
=> boolean: Enable the Schema Registry service.
kafka_rest_config
=> object: Kafka REST configuration.
producer_acks
=> string: producer.acks. The number of acknowledgments the producer requires the leader to have received before considering a request complete. If set to 'all' or '-1', the leader will wait for the full set of in-sync replicas to acknowledge the record.
producer_linger_ms
=> integer: producer.linger.ms. Wait for up to the given delay to allow batching records together.
consumer_enable_auto_commit
=> boolean: consumer.enable.auto.commit. If true, the consumer's offset will be periodically committed to Kafka in the background.
consumer_request_max_bytes
=> integer: consumer.request.max.bytes. Maximum number of bytes in unencoded message keys and values returned by a single request.
consumer_request_timeout_ms
=> integer: consumer.request.timeout.ms. The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached.
simpleconsumer_pool_size_max
=> integer: simpleconsumer.pool.size.max. Maximum number of SimpleConsumers that can be instantiated per broker.
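For context, these options tune the proxy behind the Confluent-compatible v2 REST API that Karapace implements. A minimal sketch of producing a record through it with requests (hypothetical endpoint, path, and credentials):

    import requests

    REST_URI = "https://avnadmin:password@my-kafka.example.com:443/kafka"  # hypothetical

    resp = requests.post(
        f"{REST_URI}/topics/demo-topic",
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
        json={"records": [{"key": "user-1", "value": {"event": "signup"}}]},
    )
    resp.raise_for_status()
    print(resp.json())  # per-record partition and offset assignments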
schema_registry_config
=> object: Schema Registry configuration.
topic_name
=> string: topic_name. The durable single-partition topic that acts as the durable log for the data. This topic must be compacted to avoid losing data due to the retention policy. Note that changing this configuration in an existing Schema Registry / Karapace setup makes previously registered schemas inaccessible, data encoded with them potentially unreadable, and the schema ID sequence out of order. The switch is only possible while Schema Registry / Karapace is disabled. Defaults to _schemas.
leader_eligibility
=> boolean: leader_eligibility. If true, Karapace / Schema Registry on the service nodes can participate in leader election. It might be needed to disable this when the schemas topic is replicated to a secondary cluster and Karapace / Schema Registry there must not participate in leader election. Defaults to true.
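As a final sketch, registering an Avro schema against the Schema Registry API that these options configure (requests library, hypothetical endpoint and credentials):

    import json
    import requests

    REGISTRY_URI = "https://avnadmin:password@my-kafka.example.com:443"  # hypothetical

    avro_schema = {
        "type": "record",
        "name": "Signup",
        "fields": [{"name": "user", "type": "string"}],
    }
    resp = requests.post(
        f"{REGISTRY_URI}/subjects/demo-topic-value/versions",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        json={"schema": json.dumps(avro_schema)},
    )
    resp.raise_for_status()
    print(resp.json())  # the assigned schema id, stored in the topic_name topic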