builtin.features |
* |
|
gzip, snappy, ssl, sasl, regex, lz4 |
标示该librdkafka的支持的内建特性。应用程序可以查看或设置这些值来检查是否支持这些特性。Type: CSV flags |
client.id |
* |
|
rdkafka |
客户端标示。Type: string |
metadata.broker.list |
* |
|
|
初始化的broker列表。应用程序也可以使用 rd_kafka_brokers_add() 在运行时添加 broker。Type: string |
bootstrap.servers |
* |
|
|
参考 metadata.broker.list |
message.max.bytes |
* |
1000 .. 1000000000 |
1000000 |
最大发送消息大小。Type: integer |
message.copy.max.bytes |
* |
0 .. 1000000000 |
65535 |
消息拷贝到缓存的最大大小。如果消息大于这个值,将会消耗更多的iovec而采用引用(零拷贝)方式。Type: integer |
receive.message.max.bytes |
* |
1000 .. 1000000000 |
100000000 |
最大接收消息大小。这是一个安全预防措施,防止协议饱和时内存耗尽。这个值至少为 fetch.message.max.bytes * 消费者分区数 + 消息头大小 (e.g. 200000 bytes). Type: integer |
max.in.flight.requests.per.connection |
* |
1 .. 1000000 |
1000000 |
客户端保持的最大发送请求数。 该配置应用于每一个 broker 连接. Type: integer |
metadata.request.timeout.ms |
* |
10 .. 900000 |
60000 |
无数据请求超时时间,毫秒。 适用于 metadata 请求等。Type: integer |
topic.metadata.refresh.interval.ms |
* |
-1 .. 3600000 |
300000 |
Topic metadata 刷新间隔,毫秒。metadata 自动刷新错误和连接。设置为 -1 关闭刷新间隔。 Type: integer |
metadata.max.age.ms |
* |
|
|
参考 topic.metadata.refresh.interval.ms |
topic.metadata.refresh.fast.cnt |
* |
0..1000 |
10 |
当 topic 丢失 leader, metadata 请求的发送次数,发送间隔是 topic.metadata.refresh.fast.interval.ms而不是 topic.metadata.refresh.interval.ms。 该配置用于快速修复broker leader。Type: integer |
topic.metadata.refresh.fast.interval.ms |
* |
1..60000 |
250 |
参考topic.metadata.refresh.fast.cnt。Type: integer |
topic.metadata.refresh.sparse |
* |
true, false |
true |
极少的 metadata 请求 (消费者的网络带宽很小) Type: boolean |
topic.blacklist |
* |
|
|
Topic 黑名单,逗号分隔的正则表达式列表,匹配topic名字,匹配到的 topic 如果不存在,就在 broker metadata 信息中忽略。Type: pattern list |
debug |
* |
generic, broker, topic, metadata, queue, msg, protocol, cgrp, security, fetch, feature, all |
|
逗号分隔的列表,控制 debug 上下文。调试生产者:broker,topic,msg。调试消费者:cgrp,topic,fetch. Type: CSV flags |
socket.timeout.ms |
* |
10..300000 |
60000 |
网络请求超时时间。Type: integer |
socket.blocking.max.ms |
* |
1 .. 60000 |
100 |
broker 在 socket 操作时最大阻塞时间。值越低,响应越快,但会略微提高CPU使用率。Type: integer |
socket.send.buffer.bytes |
* |
0 .. 100000000 |
0 |
Broker socket 发送缓冲大小。系统默认为 0。Type: integer |
socket.receive.buffer.bytes |
* |
0 .. 100000000 |
0 |
Broker socket 接收缓冲大小。系统默认为 0。Type: integer |
socket.keepalive.enable |
* |
true,false |
false |
Broker sockets 允许 TCP 保持活力 (SO_KEEPALIVE)。Type: boolean |
socket.max.fails |
* |
0 .. 1000000 |
3 |
Broker 关闭连接的最大错误次数(e.g., timed out requests)。0不关闭。提示:连接自动重新建立。Type: integer |
broker.address.ttl |
* |
0 .. 86400000 |
1000 |
保存 broker 地址响应结果的时间 (毫秒)。Type: integer |
broker.address.family |
* |
any, v4, v6 |
any |
允许的 broker IP 地址族:any, v4, v6。Type: enum value |
reconnect.backoff.jitter.ms |
* |
0 .. 3600000 |
500 |
通过这个值调节 broker 重连尝试 +-50%。Type: integer |
statistics.interval.ms |
* |
0 .. 86400000 |
0 |
librdkafka 统计间隔。应用程序需要通过 rd_kafka_conf_set_stats_cb() 设置统计的回调函数。粒度是 1000ms. 0 关闭统计。Type: integer |
enabled_events |
* |
0 .. 2147483647 |
0 |
参考 rd_kafka_conf_set_events()。Type: integer |
error_cb |
* |
|
|
错误回调函数 (参考 rd_kafka_conf_set_error_cb()) Type: pointer |
throttle_cb |
* |
|
|
调节回调函数 (参考 rd_kafka_conf_set_throttle_cb()) Type: pointer |
stats_cb |
* |
|
|
统计回调函数 (参考 rd_kafka_conf_set_stats_cb()) Type: pointer |
log_cb |
* |
|
|
日志回调函数 (参考 rd_kafka_conf_set_log_cb()) Type: pointer |
log_level |
* |
0..7 |
6 |
日志界别 (syslog(3) levels) Type: integer |
log.thread.name |
* |
true, false |
false |
在日志消息中打印内部线程名。(useful for debugging librdkafka internals) Type: boolean |
log.connection.close |
* |
true,false |
true |
记录 broker 断开连接。由于受 0.9 版本 broker 的 connection.max.idle.ms 的影响,最好关闭。Type: boolean |
socket_cb |
* |
|
|
为Socket创建回调函数提供无缝 CLOEXEC Type: pointer |
open_cb |
* |
|
|
为文件打开回调函数提供无缝 CLOEXEC Type: pointer |
opaque |
* |
|
|
对应用程序不开放 (set with rd_kafka_conf_set_opaque()) Type: pointer |
default_topic_conf |
* |
|
|
默认 topic 配置,用于自动订阅 topics Type: pointer |
internal.termination.signal |
* |
0..128 |
0 |
用于 librdkafka 调用 rd_kafka_destroy() 快速终止的信号。如果没有设置信号, 终止过程会延迟直到所有内部线程的系统调用超时返回,且 rd_kafka_wait_destroyed() 返回 true。如果设置了信号,延迟会最小化。应用程序需要屏蔽该信号,而作为内部信号句柄。Type: integer |
api.version.request |
* |
true,false |
false |
请求 broker 支持的API版本,调整可用协议特性的功能。如果设置为false,将使用 broker.version.fallback设置的回退版本。 提示: 依赖的 broker 版本 >=0.10.0。如果 broker(老版本)不支持该请求,使用 broker.version.fallback设置的回退版本。Type: boolean |
api.version.fallback.ms |
* |
0 .. 604800000 |
1200000 |
配置 ApiVersionRequest 失败多长时间后,使用 broker.version.fallback 回退版本。提示: ApiVersionRequest 只用新的 broker 能使用。Type: integer |
broker.version.fallback |
* |
|
0.9.0 |
老版本的 broker(<0.10.0)不支持客户端查询支持协议特性(ApiVersionRequest, see api.version.request),所以要客户端不知道什么特性可以使用。 用户使用本属性指示 broker 版本,如果 ApiVersionRequest 失败(或不可用),客户端据此属性自动调整特性。与api.version.fallback.ms 配合使用。有效值:0.9.0, 0.8.2, 0.8.1, 0.8.0. Type: string |
security.protocol |
* |
plaintext, ssl, sasl_plaintext, sasl_ssl |
plaintext |
与 broker 通讯的协议。Type: enum value |
ssl.cipher.suites |
* |
|
|
密码套件是个组合体,包括鉴权,加密,认证和秘钥交换程序,用于网络连接的安全设置交换,使用 TLS 或 SSL 网络协议。查看手册 ciphers(1) 和 SSL_CTX_set_cipher_list(3)。 Type: string |
ssl.key.location |
* |
|
|
客户端的私钥(PEM)路径,用于鉴权。Type: string |
ssl.key.password |
* |
|
|
私钥密码。Type: string |
ssl.certificate.location |
* |
|
|
客户端的公钥(PEM)路径,用于鉴权。Type: string |
ssl.ca.location |
* |
|
|
CA 证书文件或路径,用于校验 broker key。Type: string |
ssl.crl.location |
* |
|
|
CRL 路径,用于 broker 的证书校验。Type: string |
sasl.mechanisms |
* |
GSSAPI, PLAIN |
GSSAPI |
使用 SASL 机制鉴权。 支持:GSSAPI, PLAIN. 提示: 只能配置一种机制名。Type: string |
sasl.kerberos.service.name |
* |
|
kafka |
Kafka 运行的 Kerberos 首要名。Type: string |
sasl.kerberos.principal |
* |
|
kafkaclient |
客户端的 Kerberos 首要名。Type: string |
sasl.kerberos.kinit.cmd |
* |
|
kinit -S “%{sasl.kerberos.service.name}/%{broker.name}” -k -t “%{sasl.kerberos.keytab}” %{sasl.kerberos.principal} |
完整的 kerberos kinit 命令串,%{config.prop.name} 替换为与配置对象一直的值,%{broker.name} broker 的主机名。Type: string |
sasl.kerberos.keytab |
* |
|
|
Kerberos keytab 文件的路径。如果不设置,则使用系统默认的。提示:不会自动使用,必须在 sasl.kerberos.kinit.cmd 中添加到模板,如 … -t %{sasl.kerberos.keytab}。 Type: string |
sasl.kerberos.min.time.before.relogin |
* |
1 .. 86400000 |
60000 |
Key 恢复尝试的最小时间,毫秒。Type: integer |
sasl.username |
* |
|
|
使用 PLAIN 机制时,SASL 用户名。Type: string |
sasl.password |
* |
|
|
使用 PLAIN 机制时,SASL 密码。Type: string |
group.id |
* |
|
|
客户端分组字符串。同组的客户端使用相同的 group.id。Type: string |
partition.assignment.strategy |
* |
|
range,roundrobin |
partition 分配策略,当选举组 leader 时,分配 partition 给组成员的策略。Type: string |
session.timeout.ms |
* |
1 .. 3600000 |
30000 |
客户端组会话探测失败超市时间。Type: integer |
heartbeat.interval.ms |
* |
1 .. 3600000 |
1000 |
组会话保活心跳间隔。Type: integer |
group.protocol.type |
* |
|
consumer |
组协议类型。Type: string |
coordinator.query.interval.ms |
* |
1 .. 3600000 |
600000 |
多久查询一次当前的客户端组协调人。如果当前的分配协调人挂了,为了更快的恢复协调人,探测时间间隔会除以 10。Type: integer |
enable.auto.commit |
C |
true, false |
true |
在后台周期性的自动提交偏移量。Type: boolean |
auto.commit.interval.ms |
C |
0 .. 86400000 |
5000 |
消费者偏移量提交(写入)到存储的频率,毫秒。(0 = 不可用) Type: integer |
enable.auto.offset.store |
C |
true, false |
true |
为应用程序自动保存最后消息的偏移量。Type: boolean |
queued.min.messages |
C |
1 .. 10000000 |
100000 |
每一个 topic+partition,本地消费者队列的最小消息数。Type: integer |
queued.max.messages.kbytes |
C |
1 .. 1000000000 |
1000000 |
每一个 topic+partition,本地消费者队列的最大大小,单位kilobytes。该值应该大于 fetch.message.max.bytes。Type: integer |
fetch.wait.max.ms |
C |
0 .. 300000 |
100 |
为写满fetch.min.bytes,broker 的最大等待时间。Type: integer |
fetch.message.max.bytes |
C |
1 .. 1000000000 |
1048576 |
每一个 topic+partition 初始化的最大大小(bytes)用于从 broker 读消息。如果客户端遇到消息大于这个值,会逐步扩大直到塞下这个消息。Type: integer |
max.partition.fetch.bytes |
C |
|
|
参考 fetch.message.max.bytes |
fetch.min.bytes |
C |
1 .. 100000000 |
1 |
broker 请求的最小数据大小,单位bytes。如果达到 fetch.wait.max.ms 时间,则不管这个配置,将已收到的数据发送给客户端。Type: integer |
fetch.error.backoff.ms |
C |
0 .. 300000 |
500 |
对于 topic+partition,如果接受错误,下一个接受请求间隔多长时间。Type: integer |
offset.store.method |
C |
none, file, broker |
broker |
偏移量存储方式:’file’ - 本地文件存储 (offset.store.path, et.al), ‘broker’ - 在 broker 上提交存储 (要求 Apache Kafka 0.8.2 或以后版本)。Type: enum value |
consume_cb |
C |
|
|
消息消费回调函数 (参考 rd_kafka_conf_set_consume_cb()) Type: pointer |
rebalance_cb |
C |
|
|
消费者组重新分配后调用 (参考 rd_kafka_conf_set_rebalance_cb()) Type: pointer |
offset_commit_cb |
C |
|
|
偏移量提交结果回调函数 (参考 rd_kafka_conf_set_offset_commit_cb()) Type: pointer |
enable.partition.eof |
C |
true, false |
true |
当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件。Type: boolean |
queue.buffering.max.messages |
P |
1 .. 10000000 |
100000 |
生产者队列允许的最大消息数。Type: integer |
queue.buffering.max.kbytes |
P |
1 .. 2147483647 |
4000000 |
生产者队列允许的最大大小,单位kb。Type: integer |
queue.buffering.max.ms |
P |
1 .. 900000 |
1000 |
生产者队列缓存数据的最大时间,毫秒。Type: integer |
message.send.max.retries |
P |
0 .. 10000000 |
2 |
消息集发送失败重试次数。提示 重试会导致重排。Type: integer |
retries |
P |
|
|
参考 message.send.max.retries |
retry.backoff.ms |
P |
1 .. 300000 |
100 |
重试消息发送前的补偿时间。Type: integer |
compression.codec |
P |
none, gzip, snappy, lz4 |
none |
压缩消息集使用的压缩编解码器。这里配置的是所有 topic 的默认值,可能会被 topic 上的 compression.codec 属性覆盖。Type: enum value |
batch.num.messages |
P |
1 .. 1000000 |
10000 |
一个消息集最大打包消息数量。整个消息集的大小仍受限于 message.max.bytes。 Type: integer |
delivery.report.only.error |
P |
true, false |
false |
只对失败的消息提供分发报告。Type: boolean |
dr_cb |
P |
|
|
分发报告回调函数 (参考 rd_kafka_conf_set_dr_cb()) Type: pointer |
dr_msg_cb |
P |
|
|
分发报告回调函数 (参考 rd_kafka_conf_set_dr_msg_cb()) Type: pointer |