Property | C/P | Range | Default | Importance | Description |
---|---|---|---|---|---|
builtin.features | * | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | 指示librdkafka这个版本的内置特性。应用程序既可以查询此值,也可以尝试使用所需功能列表设置该值,以检查库支持。 类型:CSV标志 |
|
client.id | * | rdkafka | low | 客户端标识符。 类型:字符串 |
|
metadata.broker.list | * | high | 作为代理主机或host:port的CSV列表的代理的初始列表。应用程序也可以使用rd_kafka_brokers_add() 在运行时添加代理。类型:字符串 |
||
bootstrap.servers | * | high | mate.broker.list 的别名。:代理的初始列表,作为代理主机或host:port的CSV列表。应用程序也可以使用rd_kafka_brokers_add() 在运行时添加代理。< br > 类型:字符串 |
||
message.max.bytes | * | 1000 .. 1000000000 | 1000000 | medium | 最大Kafka协议请求消息大小。由于协议版本之间的帧开销不同,生产者无法在生产时可靠地强制执行严格的最大消息限制,并且可能在协议producerrequests中超过最大消息大小,代理将强制执行主题的 max.message 。字节的限制(参见Apache Kafka文档)。< br > 类型:整数 |
message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | low | 要复制到缓冲区的消息的最大大小。大于这个值的消息将通过引用(零拷贝)传递,代价是更大的iovec。< br > 类型:整数 |
receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | 最大Kafka协议响应消息大小。这可以作为一种安全预防措施,以避免在协议中断的情况下内存耗尽。此值必须至少为 fetch.max.Bytes + 512 字节,允许协议开销;除非显式设置了configuration属性,否则该值将自动调整。< br > 类型:整数 |
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | 每个代理连接的最大在线请求数。这是一个应用于所有代理通信的通用属性,但是它主要与生成请求相关。特别要注意的是,其他机制将每个代理的未完成的消费者获取请求数量限制为一个。< br > 类型:整数 |
max.in.flight | * | 1 .. 1000000 | 1000000 | low | max.in.flight.requests.per.connection 的别名,每个代理连接的最大在线请求数。这是一个应用于所有代理通信的通用属性,但是它主要与生成请求相关。特别要注意的是,其他机制将每个代理的未完成的消费者获取请求数量限制为一个。< br > 类型:整数 |
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | 刷新主题和代理元数据的时间段(以毫秒为单位),以便主动发现任何新的代理、主题、分区或分区leader更改。使用-1关闭间隔刷新功能(不推荐)。如果没有本地引用的主题(没有创建主题对象、没有生成消息、没有订阅或没有分配),那么每隔一段时间只刷新代理列表,但不会超过每隔10秒刷新一次。< br > 类型:整数 |
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | 元数据缓存最大年龄。默认为:topic.metadata.refresh.interval.ms * 3 类型:integer |
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 | low | 当一个主题失去它的领导者时,一个新的元数据请求将立即进入队列,然后在这个初始间隔内呈指数增长,直到 retry.backoff.max.ms ,直到主题元数据被刷新。如果没有显式设置,它将默认为 retry.backoff.ms 。这用于从转换的领导代理中快速恢复。< br > 类型:整数 |
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | 弃用 No longer used. Type: integer |
topic.metadata.refresh.sparse | * | true, false | true | low | 稀疏元数据请求(占用较少网络带宽) 类型:boolean |
topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka的主题创建是异步的,一个新主题在整个集群中传播到所有代理需要一些时间。如果客户端在手动创建主题之后请求主题元数据,但在主题已完全传播到客户端正在请求元数据的代理之前,该主题将看起来不存在,并且客户端将标记该主题,失败的队列生成消息为ERR__UNKNOWN_TOPIC 。此设置延迟将主题标记为不存在,直到已配置的传播最大时间过去。最大传播时间从主题首次在客户机中被引用的时间开始计算,例如,在produce()上。< br > 类型:整数 |
topic.blacklist | * | low | 主题黑名单,一个逗号分隔的正则表达式列表,用于匹配主题名称,在代理元数据信息中应该忽略这些名称,就好像主题不存在一样。 类型:pattern list |
||
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all | medium | 要启用的调试上下文的逗号分隔列表。详细的生产者调试:broker,topic,msg。Consumer: consumer,cgrp,topic,fetch 类型:CSV flags |
|
socket.timeout.ms | * | 10 .. 300000 | 60000 | low | 网络请求的默认超时时间。Producer: producerrequests 将使用较小的 socket.timeout.ms 值和剩余的 message.timeout.ms 值来处理批处理中的第一条消息。Consumer:FetchRequests 将使用 fetch.wait.max.ms + socket.timeout.ms 。Admin:管理员请求将使用socket.timeout.ms 或显式设置 rd_kafka_AdminOptions_set_operation_timeout() 的值。Type: integer |
socket.blocking.max.ms | * | 1 .. 60000 | 1000 | low | 弃用 No longer used. Type: integer |
socket.send.buffer.bytes | * | 0 .. 100000000 | 0 | low | 代理套接字发送缓冲区大小。如果为 0,则使用系统默认值。 Type: integer |
socket.receive.buffer.bytes | * | 0 .. 100000000 | 0 | low | 代理套接字接收缓冲区大小。如果为 0,则使用系统默认值. Type: integer |
socket.keepalive.enable | * | true, false | false | low | 在代理套接字上启用TCP保持存活 (SO_KEEPALIVE) Type: boolean |
socket.nagle.disable | * | true, false | false | low | 在代理套接字上禁用Nagle算法 (TCP_NODELAY)。 Type: boolean |
socket.max.fails | * | 0 .. 1000000 | 1 | low | 当达到此发送失败数(例如,超时请求)时,断开与代理的连接。禁用 0。警告:强烈建议将此设置保留为默认值1,以避免在请求超时的情况下客户端和代理变得不同步。注意:连接将自动重新建立。 Type: integer |
broker.address.ttl | * | 0 .. 86400000 | 1000 | low | 缓存代理地址解析结果的时间(毫秒)。 Type: integer |
broker.address.family | * | any, v4, v6 | any | low | 允许的代理IP地址族:any、v4、v6 Type: enum value |
socket.connection.setup.timeout.ms | * | 1000 .. 2147483647 | 30000 | medium | 代理连接设置(TCP连接设置以及SSL和SASL握手)所允许的最大时间。如果到代理的连接在此之后不能完全正常工作,该连接将被关闭并重试。 Type: integer |
connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | 在指定的不活动时间之后关闭代理连接。禁用0。如果此属性保持默认值,则执行一些启发式方法来确定合适的默认值,目前仅限于识别Azure上的代理(有关更多信息,请参阅librdkafka问题#3109)。 Type: integer |
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | DEPRECATED No longer used. See reconnect.backoff.ms and reconnect.backoff.max.ms . Type: integer |
reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | 连接关闭后重新连接到代理之前等待的初始时间。时间呈指数增长,直到达到 reconnect.backoff.max.ms 设置的值。-25%到+50%的抖动应用于每个重新连接回退。值为0禁用回退并立即重新连接。 Type: integer |
reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | 在连接关闭后重新连接到代理之前等待的最长时间。 Type: integer |
statistics.interval.ms | * | 0 .. 86400000 | 0 | high | Librdkafka统计数据发出间隔。应用程序还需要使用 rd_kafka_conf_set_stats_cb() 注册一个stats回调。粒度为1000ms。值为0禁用统计信息。 Type: integer |
enabled_events | * | 0 .. 2147483647 | 0 | low | 查看 rd_kafka_conf_set_events() Type: integer |
error_cb | * | low | 错误回调(使用 rd_kafka_conf_set_error_cb() 设置) Type: see dedicated API |
||
throttle_cb | * | low | 节流回调(使用 rd_kafka_conf_set_throttle_cb() 设置) Type: see dedicated API |
||
stats_cb | * | low | 统计回调(设置rd_kafka_conf_set_stats_cb()) Type: see dedicated API |
||
log_cb | * | low | 日志回调(使用rd_kafka_conf_set_log_cb()设置) Type: see dedicated API |
||
log_level | * | 0 .. 7 | 6 | low | 日志级别(syslog(3) 级别) Type: integer |
log.queue | * | true, false | false | low | 禁用librdkafka内部线程的自发log_cb,而是使用 rd_kafka_set_log_queue() 在队列集上对日志消息进行排队,并通过标准轮询api提供日志回调或事件。注意:日志消息将停留在临时队列中,直到设置日志队列为止。 Type: boolean |
log.thread.name | * | true, false | true | low | 在日志消息中打印内部线程名(用于调试librdkafka内部) Type: boolean |
enable.random.seed | * | true, false | true | low | 如果启用,librdkafka将在rd_kafka_new()的第一次调用时使用srand(current_time.milliseconds)初始化PRNG(只有当你的平台上没有rand_r()时才需要)。如果禁用,应用程序必须在调用 rd_kafka_new()之前调用srand()。 Type: boolean |
log.connection.close | * | true, false | true | low | 日志代理断开连接。当与具有侵略性 connections.max.idle.ms 值的0.9代理交互时,关闭此选项可能会很有用。 Type: boolean |
background_event_cb | * | low | 后台队列事件回调(设置rd_kafka_conf_set_background_event_cb()) Type: see dedicated API |
||
socket_cb | * | low | 套接字创建回调以提供无竞争的CLOEXEC Type: see dedicated API |
||
connect_cb | * | low | Socket连接回调 Type: see dedicated API |
||
closesocket_cb | * | low | Socket关闭回调 Type: see dedicated API |
||
open_cb | * | low | 文件打开回调以提供无竞争的CLOEXEC Type: see dedicated API |
||
resolve_cb | * | low | 地址解析回调(通过rd_kafka_conf_set_resolve_cb()设置)。. Type: see dedicated API |
||
opaque | * | low | 应用程序opaque(通过rd_kafka_conf_set_opaque()设置) Type: see dedicated API |
||
default_topic_conf | * | low | 自动订阅主题的默认主题配置 Type: see dedicated API |
||
internal.termination.signal | * | 0 .. 128 | 0 | low | librdkafka将使用该信号快速终止rd_kafka_destroy()。如果这个信号没有设置,那么在rd_kafka_wait_destroyed()返回true之前会有一个延迟,因为内部线程正在超时它们的系统调用。如果这个信号被设置,延迟将是最小的。当安装了内部信号处理程序时,应用程序应该屏蔽这个信号。 Type: integer |
api.version.request | * | true, false | true | high | 请求代理支持的API版本来调整功能以适应可用的协议特性。如果设置为false,或者ApiVersionRequest失败,则会使用回退版本broker.version.fallback。注意:取决于代理版本>=0.10.0。如果请求不被(旧的)代理支持,则使用 broker.version.fallback 回退。Type: boolean |
api.version.request.timeout.ms | * | 1 .. 300000 | 10000 | low | 代理API版本请求超时时间。 Type: integer |
api.version.fallback.ms | * | 0 .. 604800000 | 0 | medium | 指定在ApiVersionRequest失败的情况下使用broker.version.fallback回退的时间。注意:ApiVersionRequest仅在建立到代理的新连接时发出(例如在升级之后)。 Type: integer |
broker.version.fallback | * | 0.10.0 | medium | 较旧的代理版本(0.10.0之前)无法让客户端查询支持的协议特性(ApiVersionRequest,参见api.version.request),因此客户端不可能知道它可能使用哪些特性。作为一种解决方案,用户可以将此属性设置为预期的代理版本,如果ApiVersionRequest失败(或被禁用),客户端将自动相应地调整其功能集。回退代理版本将用于 api.version.rollback.ms 。有效值为:0.9.0,0.8.2,0.8.1,0.8.0。或者 >= 0.10的其他值,例如0.10.2.1,表示启用ApiVersionRequests。 Type: string |
|
allow.auto.create.topics | * | true, false | false | low | 当订阅或分配不存在的主题时,允许在代理上自动创建主题。代理还必须配置 auto.create.topics.enable=true 此配置生效。注意:生产者的默认值(true)与消费者的默认值(false)不同。此外,消费者默认值与Java消费者不同(true),并且Java生产者不支持此属性。需要代理版本>= 0.11.0.0,对于较旧的代理版本,只应用代理配置。 Type: boolean |
security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | high | 用于与代理通信的协议。 Type: enum value |
ssl.cipher.suites | * | low | 密码套件是认证、加密、MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。参见手册页的密码(1)和SSL_CTX_set_cipher_list(3)。 Type: string |
||
ssl.curves.list | * | low | TLS ClientHello消息中的支持曲线扩展指定了客户端愿意让服务器使用的曲线(标准/命名,或“显式”GF(2^k)或GF(p))。参见SSL_CTX_set1_curves_list(3)的手册页。需要OpenSSL >= 1.0.2。 Type: string |
||
ssl.sigalgs.list | * | low | 客户端使用TLS ClientHello signature_algorithms扩展向服务器指示在数字签名中可以使用哪些签名/哈希算法对。参见SSL_CTX_set1_sigalgs_list(3)的手册页。需要OpenSSL >= 1.0.2。 Type: string |
||
ssl.key.location | * | low | 用于身份验证的客户端私钥(PEM)的路径 Type: string |
||
ssl.key.password | * | low | 私钥口令(用于ssl.key.location和set_ssl_cert()) Type: string |
||
ssl.key.pem | * | low | 用于认证的客户端私钥字符串(PEM格式)。 Type: string |
||
ssl_key | * | low | rd_kafka_conf_set_ssl_cert()设置的客户端私钥 Type: see dedicated API |
||
ssl.certificate.location | * | low | 用于身份验证的客户端公钥(PEM)路径 Type: string |
||
ssl.certificate.pem | * | low | 用于认证的客户端公钥字符串(PEM格式). Type: string |
||
ssl_certificate | * | low | rd_kafka_conf_set_ssl_cert()设置的客户端公钥 Type: see dedicated API |
||
ssl.ca.location | * | low | 用于验证代理密钥的CA证书的文件或目录路径。默认值:在Windows上,系统的CA证书自动在Windows根证书存储中查找。在Mac OSX上,此配置默认为probe。建议使用Homebrew安装openssl,以提供CA证书。在Linux上安装发行版的ca-certificates包。如果OpenSSL是静态链接的,或者ssl.ca.location设置为探测,则将探测一系列标准路径,并将发现的第一个路径用作默认的CA证书位置路径。如果OpenSSL是动态链接的,将使用OpenSSL库的默认路径(参见OpenSSL version -a中的OPENSSLDIR)。 Type: string |
||
ssl.ca.pem | * | low | 用于验证代理密钥的CA证书字符串(PEM格式). Type: string |
||
ssl_ca | * | low | 通过rd_kafka_conf_set_ssl_cert()设置的CA证书 Type: see dedicated API |
||
ssl.ca.certificate.stores | * | Root | low | 要从中加载CA证书的Windows证书存储区列表,以逗号分隔。证书将按照指定存储区的顺序加载。如果不能从任何指定的存储中加载证书,则会记录错误,并使用OpenSSL库的默认CA位置。存储名称通常是以下一个或多个:MY、Root、Trust、CA。 Type: string |
|
ssl.crl.location | * | low | 用于验证代理证书有效性的CRL路径。. Type: string |
||
ssl.keystore.location | * | low | 用于身份验证的客户机密钥存储库(pkcs# 12)的路径. Type: string |
||
ssl.keystore.password | * | low | 客户端的密钥存储库(pkcs# 12)密码。. Type: string |
||
ssl.providers | * | low | OpenSSL 3.0.x 的以逗号分隔的生产者列表。例如, "default,legacy". Type: string |
||
ssl.engine.location | * | low | DEPRECATED Path to OpenSSL engine library. OpenSSL >= 1.1.x required. DEPRECATED: OpenSSL engine support is deprecated and should be replaced by OpenSSL 3 providers. Type: string |
||
ssl.engine.id | * | dynamic | low | OpenSSL引擎id,用于加载引擎的名称。. Type: string |
|
ssl_engine_callback_data | * | low | OpenSSL引擎回调数据(通过rd_kafka_conf_set_engine_callback_data()设置)。 Type: see dedicated API |
||
enable.ssl.certificate.verification | * | true, false | true | low | 启用OpenSSL的内置代理(服务器)证书验证。应用程序可以通过实现certificate_verify_cb来扩展此验证。 Type: boolean |
ssl.endpoint.identification.algorithm | * | none, https | https | low | 使用代理证书验证代理主机名的端点标识算法。https - RFC2818中指定的服务器(代理)主机名验证。none -没有端点验证。需要OpenSSL >= 1.0.2。 Type: enum value |
ssl.certificate.verify_cb | * | low | 回调以验证代理证书链。 Type: see dedicated API |
||
sasl.mechanisms | * | GSSAPI | high | 用于身份验证的SASL机制。支持:GSSAPI, PLAIN, sam - sha -256, sam - sha -512, OAUTHBEARER。注意:除了名称之外,只能配置一种机制。. Type: string |
|
sasl.mechanism | * | GSSAPI | high | sasl.mechanisms 的别名 Type: string |
|
sasl.kerberos.service.name | * | kafka | low | Kafka运行的Kerberos主体名称,不包括 /hostname@REALM Type: string |
|
sasl.kerberos.principal | * | kafkaclient | low | 此客户机的Kerberos主体名称。(Windows上不支持,将使用登录用户的主体)。 Type: string |
|
sasl.kerberos.kinit.cmd | * | kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} \ | \ | kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} | |
sasl.kerberos.keytab | * | low | Kerberos keytab文件路径. 此配置属性仅作为sasl.kerberos.kinit.cmd中的变量使用,如 …- t”% {sasl.kerberos.keytab} Type: string |
||
sasl.kerberos.min.time.before.relogin | * | 0 .. 86400000 | 60000 | low | M按键刷新尝试之间的最小时间(以毫秒为单位)。通过将此属性设置为0来禁用自动密钥刷新. Type: integer |
sasl.username | * | high | SASL用户名用于 PLAIN 和 SASL- scram -..机制 Type: string |
||
sasl.password | * | high | SASL密码用于 PLAIN 和 SASL-scram -..机制 Type: string |
||
sasl.oauthbearer.config | * | low | SASL / OAUTHBEARER配置. 格式依赖于实现,必须进行相应的解析。默认的不安全令牌实现(参见https://tools.ietf.org/html/rfc7515#appendix-A.5)识别具有有效名称的空格分隔的名称=值对,包括principalClaimName、principal、scopeClaimName、scope和lifesecseconds. principalClaimName默认值为“sub”,scopeClaimName默认值为“scope”,lifesseconds默认值为“3600”。范围值为CSV格式,默认值为no/empty scope。例如:principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifesecseconds =600。此外,SASL扩展可以通过extension_NAME=value传递给代理。例如:principal=admin extension_traceId=123 Type: string |
||
enable.sasl.oauthbearer.unsecure.jwt | * | true, false | false | low | 如果没有设置oauthbearer_refresh_cb,则启用内置的不安全JWT OAUTHBEARER令牌处理程序。这个内置处理程序应该只用于开发或测试,而不是用于生产。 Type: boolean |
oauthbearer_token_refresh_cb | * | low | SASL/OAUTHBEARER令牌刷新回调(由rd_kafka_conf_set_oauthbearer_token_refresh_cb()设置,由rd_kafka_poll()等触发。当刷新客户端的OAUTHBEARER令牌时将触发此回调。另请参见rd_kafka_conf_enable_sasl_queue() Type: see dedicated API |
||
sasl.oauthbearer.method | * | default, oidc | default | low | 设置为“default”或“oidc”以控制使用哪种登录方法。如果设置为"oidc",还必须指定以下属性:sasl.oauthbearer.client.id , sasl.oauthbearer.client.secret , 和 sasl.oauthbearer.token.endpoint.url . Type: enum value |
sasl.oauthbearer.client.id | * | low | 应用程序的公共标识符。在授权服务器处理的所有客户机中必须是唯一的。仅当sasl.oauthbear .method设置为"oidc"时使用。 Type: string |
||
sasl.oauthbearer.client.secret | * | low | 只有应用程序和授权服务器知道的客户端秘密。这应该是一个足够随机的字符串,不能猜测。仅当sasl.oauthbear .method设置为"oidc"时使用。 Type: string |
||
sasl.oauthbearer.scope | * | low | 客户端使用它来指定对代理的访问请求的范围。仅当sasl.oauthbear .method设置为"oidc"时使用. Type: string |
||
sasl.oauthbearer.extensions | * | low | 允许向代理提供额外的信息。键=值对的逗号分隔列表。例如,“supportFeatureX = true, organizationId = sales-emea”。仅当sasl.oauthbear .method设置为"oidc"时使用。. Type: string |
||
sasl.oauthbearer.token.endpoint.url | * | low | OAuth/OIDC发行者令牌端点HTTP(S)用于检索令牌的URI。仅当sasl.oauthbear .method设置为"oidc"时使用。. Type: string |
||
plugin.library.paths | * | low | 加载的插件库列表(;分离)。库搜索路径依赖于平台(参见Unix的dlopen(3)和Windows的LoadLibrary())。如果没有指定文件扩展名,则会自动添加特定于平台的扩展名(如.dll或.so). Type: string |
||
interceptors | * | low | 通过rd_kafka_conf_interceptor_add_..()添加的拦截器以及由拦截器处理的任何配置。. Type: see dedicated API |
||
group.id | C | high | 客户端组id字符串。所有共享同一个 group.id 的客户端属于同一组. Type: string |
||
group.instance.id | C | medium | 启用静态组成员。静态组成员能够在配置的session.timeout.ms中离开和重新加入组,而不会提示组重新平衡。这应该与更大的session.timeout.ms结合使用,以避免由暂时不可用(例如进程重启)引起的组重新平衡。需要代理版本>= 2.3.0. Type: string |
||
partition.assignment.strategy | C | range,roundrobin | medium | 一个或多个分区分配策略的名称。当选的组领导将使用组中所有成员都支持的策略将分区分配给组成员。如果有多个符合条件的策略,则优先级由该列表的顺序决定(列表中较早的策略具有较高的优先级)。合作和非合作(急切)策略不能混合使用。可用策略:范围、轮合、合作粘滞。. Type: string |
|
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | 客户端组会话和故障检测超时。消费者发送周期性心跳(heartbeat.interval.ms),以向代理指示其活动状态。如果代理在会话超时时间内没有收到组成员的心脏,则代理将从组中删除消费者并触发再平衡。允许的范围由代理配置属性group.min.session.timeout.ms和group.max.session.timeout.ms配置。另请参见max.poll.interval.ms。. Type: integer |
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | 组会话保持心跳间隔. Type: integer |
group.protocol.type | C | consumer | low | 组协议类型。注意:目前,唯一支持的组协议类型是 consumer . Type: string |
|
coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | 查询当前客户端组协调器的频率。如果当前分配的协调器停止工作,则配置的查询间隔将除以10,以便在重新分配协调器时更快地恢复。. Type: integer |
max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | 对于高级消费者,使用消息的调用之间的最大允许时间(例如,rd_kafka_consumer_poll())。如果超过这个间隔,则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个消费者组成.警告:此时可能无法进行偏移提交。注意:建议设置为enable.auto.offset。对于长时间处理的应用程序,Store =false,然后在消息处理后显式地存储偏移量(使用offsets_store()),以确保在处理完成之前不会自动提交偏移量。间隔每秒检查两次。请参阅KIP-62了解更多信息。 Type: integer |
enable.auto.commit | C | true, false | true | high | 在后台自动定期提交偏移量。注意:将此设置为false不会阻止消费者获取先前提交的开始偏移量。为了避免这种行为,在调用assign()时为每个分区设置特定的开始偏移. Type: boolean |
auto.commit.interval.ms | C | 0 .. 86400000 | 5000 | medium | 用户偏移量提交(写入)到偏移量存储的频率(以毫秒为单位)。(0 =禁用)。此设置由高级使用者使. Type: integer |
enable.auto.offset.store | C | true, false | true | high | 自动存储上次提供给应用程序的消息的偏移量。偏移量存储是每个分区下一个要(自动)提交的偏移量的内存存储. Type: boolean |
queued.min.messages | C | 1 .. 10000000 | 100000 | medium | librdkafka试图在本地消费者队列中维护的每个主题+分区的最小消息数。. Type: integer |
queued.max.messages.kbytes | C | 1 .. 2097151 | 65536 | medium | 本地使用者队列中排队预取消息的最大千字节数。如果使用高级使用者,则此设置适用于单个使用者队列,而不考虑分区的数量。当使用遗留的简单使用者或使用单独的分区队列时,此设置适用于每个分区。fetch.message.max.bytes可能会超过此值。此属性的优先级高于queue .min.message. Type: integer |
fetch.wait.max.ms | C | 0 .. 300000 | 500 | low | 代理可能等待用Fetch .min.bytes的消息填充Fetch响应的最大时间. Type: integer |
fetch.queue.backoff.ms | C | 0 .. 300000 | 1000 | medium | 在当前读取队列阈值(queue .min.)达到的情况下,主题+分区的下一个读取请求需要延迟多长时间?消息(或queue .max. Messages .kbytes)已被超过。如果队列阈值设置较低,并且应用程序正在经历消息之间的长(~1s)延迟,则可能需要减少此属性。较低的值可能会增加CPU利用率. Type: integer |
fetch.message.max.bytes | C | 1 .. 1000000000 | 1048576 | medium | 从代理获取消息时请求的每个主题+分区的初始最大字节数。如果客户端遇到一个大于此值的消息,它将逐渐尝试增加该值,直到可以获取整个消息. Type: integer |
max.partition.fetch.bytes | C | 1 .. 1000000000 | 1048576 | medium | fetch.message.max.bytes 的别名. Type: integer |
fetch.max.bytes | C | 0 .. 2147483135 | 52428800 | medium | 代理应该为Fetch请求返回的最大数据量。消息是由消费者批量获取的,如果Fetch请求的第一个非空分区中的第一个消息批处理大于此值,则消息批处理仍将被返回,以确保消费者可以进行处理.代理接受的最大消息批大小是通过message.max.bytes(代理配置)或max.message.bytes(代理主题配置)定义的。Fetch.max.bytes会自动向上调整为至少为message.max.bytes(消费者配置). Type: integer |
fetch.min.bytes | C | 1 .. 100000000 | 1 | low | 代理响应的最小字节数。如果fetch.wait.max.ms过期,不管这个设置如何,累积的数据将被发送到客户端. Type: integer |
fetch.error.backoff.ms | C | 0 .. 300000 | 500 | medium | 在获取错误的情况下,对主题+分区的下一个获取请求延迟多长时间。. Type: integer |
offset.store.method | C | none, file, broker | broker | low | DEPRECATED Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker). Type: enum value |
isolation.level | C | read_uncommitted, read_committed | read_committed | high | 控制如何读取事务性写入的消息:read_committed -只返回已提交的事务性消息。Read_uncommitted—返回所有消息,甚至是已经中止的事务性消息. Type: enum value |
consume_cb | C | low | 消息消费回调(设置rd_kafka_conf_set_consume_cb()) Type: see dedicated API |
||
rebalance_cb | C | low | 在消费者组被重新平衡后调用(使用rd_kafka_conf_set_rebalance_cb()设置) Type: see dedicated API |
||
offset_commit_cb | C | low | 偏移量提交结果传播回调 (通过rd_kafka_conf_set_offset_commit_cb()设置) Type: see dedicated API |
||
enable.partition.eof | C | true, false | false | low | 每当消费者到达分区的末端时,触发 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件. Type: boolean |
check.crcs | C | true, false | false | medium | 验证已使用消息的CRC32,确保消息没有发生在线上或磁盘上的损坏。此检查是在CPU使用率略微增加时进行的. Type: boolean |
client.rack | * | low | 此客户端的机架标识符。这可以是任何字符串值,表示该客户端物理位置。它与代理配置 broker.rack 相对应. Type: string |
||
transactional.id | P | high | 启用事务性生成程序。事务。Id用于跨流程重启标识相同的事务生成器实例。它允许生产者保证与同一生产者的早期实例相对应的事务在开始任何新事务之前已经完成,并且任何僵尸实例都被隔离.如果没有事务性。如果提供了Id,则生产者被限制为幂等交付(如果启用)。幂等是集合)。需要代理版本>= 0.11.0 Type: string |
||
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | 事务协调器在主动终止正在进行的事务之前等待生产者更新事务状态的最大时间(以毫秒为单位)。如果此值大于代理中的transaction.max.timeout.ms设置,则init_transactions()调用将以ERR_INVALID_TRANSACTION_TIMEOUT失败.事务超时会自动调整message.timeout.ms和socket.timeout。除非显式配置,否则它们不能超过事务超时(socket.timeout)。Ms必须至少比transaction.timeout.ms低100ms)。如果没有向事务API方法提供timeout(-1),这也是默认超时值 Type: integer |
enable.idempotence | P | true, false | false | high | 当设置为true时,生产者将确保消息仅成功地按照原始的生产顺序生产一次。当幂等性被启用时,以下配置属性会自动调整(如果用户没有修改):connection=5(必须小于或等于5),retries=INT32_MAX(必须大于0),acks=all, queue .strategy=fifo。如果用户提供的配置不兼容,生产者实例化将失败. Type: boolean |
enable.gapless.guarantee | P | true, false | false | low | EXPERIMENTAL: subject to change or removal. When set to true , any error that could result in a gap in the produced message series when a batch of messages fails, will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop the producer. Messages failing due to message.timeout.ms are not covered by this guarantee. Requires enable.idempotence=true . Type: boolean |
queue.buffering.max.messages | P | 0 .. 2147483647 | 100000 | high | 生产者队列上允许的最大消息数。该队列由所有主题和分区共享。值为0禁用此限制. Type: integer |
queue.buffering.max.kbytes | P | 1 .. 2147483647 | 1048576 | high | 生产者队列上允许的最大消息总大小总和。该队列由所有主题和分区共享。此属性的优先级高于queue.buffer .max.messages。. Type: integer |
queue.buffering.max.ms | P | 0 .. 900000 | 5 | high | 在构造消息批次(MessageSets)传输到代理之前,延迟以毫秒为单位,等待生产者队列中的消息累积。更高的值允许更大、更有效(更少的开销、改进的压缩)的消息批次累积,但代价是增加消息传递延迟. Type: float |
linger.ms | P | 0 .. 900000 | 5 | high | queue.buffering.max.ms 的别名 . Type: float |
message.send.max.retries | P | 0 .. 2147483647 | 2147483647 | high | 重试发送失败消息的次数。注意:重试可能导致重新排序,除非 enable.idempotence 设为 true. Type: integer |
retries | P | 0 .. 2147483647 | 2147483647 | high | Alias for message.send.max.retries . Type: integer |
retry.backoff.ms | P | 1 .. 300000 | 100 | medium | 在重新尝试协议请求之前的退回时间(以毫秒为单位),这是第一次退回时间,并且将以指数方式退回,直到重试次数用尽,并且它的上限为retry.backoff.max.ms Type: integer |
retry.backoff.max.ms | P | 1 .. 300000 | 1000 | medium | 在重新尝试协议请求之前的最大退回时间(以毫秒为单位),这是指数级退回请求所允许的最大退回时间. Type: integer |
queue.buffering.backpressure.threshold | P | 1 .. 1000000 | 1 | low | 尚未传输的未完成代理请求的阈值需要对生产者的消息累加器进行反压。如果尚未传输的请求数量等于或超过这个数量,那么原本会触发的生成请求创建(例如,根据linger.ms)将被延迟。数量越少,批次越大,效果越好。当在速度较慢的机器上使用压缩时,较高的值可以改善延迟 Type: integer |
compression.codec | P | none, gzip, snappy, lz4, zstd | none | medium | 用于压缩消息集的压缩编解码器。这是所有主题的默认值,可以被主题配置属性compression.codec覆盖。 Type: enum value |
compression.type | P | none, gzip, snappy, lz4, zstd | none | medium | compression.codec 的别名 Type: enum value |
batch.num.messages | P | 1 .. 1000000 | 10000 | medium | 在一个MessageSet中批处理的最大消息数。MessageSet的总大小也受到批处理的限制。Size和message.max.bytes。. Type: integer |
batch.size | P | 1 .. 2147483647 | 1000000 | medium | 在一个MessageSet中批处理的所有消息的最大大小(以字节为单位),包括协议帧开销。此限制在将第一条消息添加到批处理之后应用,无论第一条消息的大小如何,这是为了确保超过批处理的消息。尺寸是生产出来的。MessageSet的总大小也受到batch.num.messages和message.max.bytes的限制. Type: integer |
delivery.report.only.error | P | true, false | false | low | 仅为失败的消息提供传递报告 Type: boolean |
dr_cb | P | low | 发送报告回调(使用rd_kafka_conf_set_dr_cb()设置) Type: see dedicated API |
||
dr_msg_cb | P | low | 发送报告回调(使用rd_kafka_conf_set_dr_msg_cb()设置) Type: see dedicated API |
||
sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | 以毫秒为单位等待为每个主题分配新的粘接分区。缺省情况下,将linger.ms的时间设置为两倍。要禁用粘性行为,请将其设置为0。在所有情况下,该行为都会影响键为NULL的消息,以及在使用consistent_random分区时键长度为零的消息。否则,这些消息将被随机分配。较高的值允许更有效地批处理这些消息 Type: integer |
client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | 控制客户端如何使用DNS查找。默认情况下,当查找返回一个主机名的多个IP地址时,将在认为连接失败之前尝试所有这些IP地址进行连接.这既适用于引导服务器,也适用于广告服务器。如果将该值设置为resolve_canonical_bootstrap_servers_only,则将解析每个条目并将其展开为规范名称列表。注意:这里的默认行为不同于Java客户机的默认行为,后者仅连接到为主机名返回的第一个IP地址 Type: enum value |
本文为Larwas原创文章,转载无需和我联系,但请注明来自larwas博客 https://larwas.com
最新评论