1.概述

Kafka是一个分布表示实时数据流平台,可独立部署在单台服务器上,也可部署在多台服务器上构成集群。它提供了发布与订阅的功能,用户可以发送数据到Kafka集群中,也可以从Kafka集群中读取数据。之前在Kafka 2.8.0版本时,Kafka社区提出了KRaft协议的概念,现在社区发布了Kafka 3.0,里面涉及优化和新增了很多功能,其中就包含KRaft协议的改机。今天,笔者就给大家介绍一下Kafka 3.0新增了哪些特性以及优化了哪些功能。

2.内容

在 Kafka 3.0 中包含了许多重要的新功能,其中比较显著的变化如下所示:

  • 弃用对Java 8 和Scala 2.12 的支持;
  • Kafka Raft 支持元数据主题的快照以及自动管理仲裁中的其他改进;
  • 默认情况下为Kafka 生产者提供更加强大的交付保证;
  • 弃用消息格式 v0 和 v1;
  • OffsetFetch 和 FindCoordinator 请求中的优化;
  • 更灵活的 Mirror Maker 2 配置和 Mirror Maker 1 的弃用;
  • 能够在 Kafka Connect 中的单个调用中重新其中连接器的任务;
  • 现在默认启用连接器日志上下文和连接器客户单覆盖;
  • Kafka Streams 中时间戳同步的增强语义;
  • 改进了 Stream 和 TaskId 的公共 API;
  • Kafka 中的默认 serde 变为 null。

2.1 关于升级到 Kafka 3.0

在Kafka 3.0中,社区对于Zookeeper的版本已经升级到3.6.3了,其中我们可以预览 KRaft 模式,但是无法从 2.8 或者更早的版本升级到该模式。许多实现依赖 jar 现在在运行时类路劲中可用,而不是在编译和运行时类路劲中。升级后的编译错误可以通过显示添加缺少的依赖 jar 或更新应用程序以不使用内部类来修复。

消费者配置的默认值 session.timeout.ms 从10 秒增加到了45 秒,而Broker配置 log.message.format.version 和 Topic 配置 message.format.version 已经被启用。两种配置的值始终假定为 3.0 或者更高,通过 inter.broker.protocol.version 来配置。如果设置了 log.message.format.version 或者 message.format.version 建议在升级到 3.0的同时清理掉这两个属性,同时设置 inter.broker.protocol.version 值为 3.0 。

Streams API 删除了在 2.5.0 或者更早版本中弃用的所有弃用 API,Kafka Streams 不再对“connect:json”模块有编译时的依赖,依赖此传递依赖项的项目必须明确声明它。

现在,通过指定的自定义主体构建起实现 principal.builder.class 现在必须实现 KafkaPrincipalSerde 接口以允许Broker 之间的转发。另外,一些过时的类,方法和工具以及从clients、connect、core、和tools模块进行了删除。

该Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被弃用。请使用 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)来替换,ConsumerGroupMetadata 可以通过检索KafkaConsumer#groupMetadata()更强大的语义。需要注意的是,完整的消费者组元数据集只有 Brokers 或 2.5 或更高版本才能支持,因此你必须升级你的 Kafka 集群以获得更强的语义。否则,你可以通过new ConsumerGroupMetadata(consumerGroupId)与较老版本的Broker进行交互。

连接器中 internal.key.converter 和 internal.value.converter 属性已被完全删除。自版本 2.0.0 起,不推荐使用这些 Connect 工作器属性。现在被硬编码为使用 schemas.enable 设置为的 JSON 转换器false。如果你的集群一直在使用不同的内部键或值转换器,你可以按照官网文档中概述的迁移步骤,将你的 Connect 集群安全地升级到 3.0。 基于 Connect 的 MirrorMaker (MM2) 包括对支持的更改IdentityReplicationPolicy,无需重命名 Topic 即可启用复制。DefaultReplicationPolicy默认情况下仍然使用现有的,但可以通过 replication.policy 配置属性启用身份复制 。这对于从旧版 MirrorMaker (MM1) 迁移的用户,或者对于不希望 Topic 重命名的具有简单单向复制拓扑的用例特别有用。请注意IdentityReplicationPolicy与 DefaultReplicationPolicy 不同,无法根据 Topic 名称阻止复制循环,因此在构建复制拓扑时要注意避免循环。

2.1.1 目的

虽然 internal.key.converter 和 internal.value.converter 中 Connect 工作器属性,以及以这些名称为前缀的所有属性都已弃用,但是有时候用户仍会尝试使用这些属性进行调试,在与未弃用的Key 和 Value转化器相关的属性意外混淆后,或者只是对其进行盲目的配置后,进行调试。这些实验的结果可能会产生不好的后果,配置了新内保转换器却无法读取具有较旧内部转换器的内保 Topic 数据,这最多会导致偏移量和连机器配置的丢失。

以下连接属性会将被删除:

  • internal.key.converter
  • internal.value.converter 
  • internal.key.converter.   # 以工作器内部密钥转换器为前缀的属性
  • internal.value.converter.   # 以工作线程的内部值转换器为前缀的属性

Connect 的行为就好像上面没有提供一样。具体来说,对于它的键和值转换器,它将使用开箱即用的 JsonConverter,配置为 schemas.enable 属性值为 false 。

2.1.2 升级步骤

运行未使用JsonConverter 并对 schemas.enable 设置 false 的 Connect 集群用户,可以按照以下步骤将其 Connect 集群升级到 3.0:

  1. 停止集群上的所有工作线程
  2. 对于每个内部主题(配置、偏移量和状态):
    1. 创建一个新主题来代替现有主题
    2. 对于现有主题中的每条消息:
      1. 使用 Connect 集群的旧内部键和值转换器反序列化消息的键和值
      2. 使用 禁用模式的JSON 转换器序列化消息的键和值(通过将schemas.enable属性设置为false)
      3. 用新的键和值向新的内部主题写一条消息
  3. 重新配置每个 Connect worker 以使用步骤 2 中新创建的内部主题
  4. 启动集群上的所有worker

2.2 新功能

在本次 Kafka 3.0  版本中新增了以下功能:

  • 添加了InsertHeader 和 DropHeader 连接转换 
  • 在 KRaft 模式中实现 createPartitions
  • 如果分区从 fetcher 中删除,副本 fetcher 不应在发散时期更新分区状态

2.2.1 添加 InsertHeader 和 DropHeader

之前在核心 Kafka 产品中引入了 Headers,在 Kafka Connect Framework 中公开它们将是有利的。Kafka 的 Header 是带有二进制值的简单名称,而 Connect API 已经有一个非常有用的层来处理不同类型的数据。Connect 的 Header 支持应该使用像 Kafka 这样的字符串名称,但使用与 Connect 记录键和值相同的类型来表示值。这将提供与 Connect 框架的其余部分的一致性,并使连接器和转换能够轻松地访问、修改和创建记录上的 Header。

Kafka 将 Header 定义为具有字符串名称和二进制值,但 Connect 将使用用于记录键和值的相同机制来表示 Header 值。每个 Header 值可能有一个对应的 Schema,允许连接器和转换以一致的方式处理 Header 值、记录键和记录值。Connect 将定义一种 HeaderConverter 机制以类似于Converter框架的方式序列化和反序列化标头值 ,这样现有的 Converter实现也可以实现 HeaderConverter. 由于来自不同供应商的连接器和转换可能被组合到单个管道中,因此不同的连接器和转换可以轻松地将 Header 值从原始形式转换为连接器和/或转换期望的类型,这一点很重要。

注意:
为了简洁和清晰,显示的代码不包括 JavaDoc,但提议的更改确实包括所有公共 API 和方法的 JavaDoc。

1.Connect Header 和 Header API

org.apache.kafka.connect.Header 将添加一个新接口并用作记录上单个标头的公共 API。该接口为键、值和值的模式定义了简单的 getter。这些是不可变对象,还有一些方法可以创建Header具有不同名称或值的新对象。代码片段如下所示:

package org.apache.kafka.connect.header;
public interface Header {
 
    // Access the key and value
    String key(); // never null
    Schema schema(); // may be null
    Object value(); // may be null
 
    // Methods to create a copy
    Header with(Schema schema, Object value);
    Header rename(String key);
}

org.apache.kafka.connect.Headers 还将添加一个新接口并用作记录标题有序列表的公共 API。这是在 Kafka 客户端的 org.apache.kafka.common.header.Headers接口之后作为标题的有序列表进行模式化的,其中允许多个具有相同名称的标题。Connect Headers接口定义了Header按顺序和/或按名称访问各个 对象以及获取有关Header对象数量的信息的方法 。它还定义了Header使用各种签名来添加、删除和保留 对象的方法,这些签名将易于连接器和转换使用。由于多个Header对象可以具有相同的名称,因此转换需要一种简单的方法来修改和/或删除现有Header对象, apply(HeaderTransform) 并且apply(String, HeaderTransform) 方法可以轻松使用自定义 lambda 函数来执行此操作。代码片段如下所示:

package org.apache.kafka.connect.header;
public interface Headers extends Iterable<Header> {
 
    // Information about the Header instances
    int size();
    boolean isEmpty();
    Iterator<Header> allWithName(String key);
    Header lastWithName(String key);
 
    // Add Header instances to this object
    Headers add(Header header);
    Headers add(String key, SchemaAndValue schemaAndValue);
    Headers add(String key, Object value, Schema schema);
    Headers addString(String key, String value);
    Headers addBoolean(String key, boolean value);
    Headers addByte(String key, byte value);
    Headers addShort(String key, short value);
    Headers addInt(String key, int value);
    Headers addLong(String key, long value);
    Headers addFloat(String key, float value);
    Headers addDouble(String key, double value);
    Headers addBytes(String key, byte[] value);
    Headers addList(String key, List<?> value, Schema schema);
    Headers addMap(String key, Map<?, ?> value, Schema schema);
    Headers addStruct(String key, Struct value);
    Headers addDecimal(String key, BigDecimal value);
    Headers addDate(String key, java.util.Date value);
    Headers addTime(String key, java.util.Date value);
    Headers addTimestamp(String key, java.util.Date value);
 
    // Remove and/or retain the latest Header
    Headers clear();
    Headers remove(String key);
    Headers retainLatest(String key);
    Headers retainLatest();
 
    // Create a copy of this Headers object
    Headers duplicate();
 
    // Apply transformations to named or all Header objects
    Headers apply(HeaderTransform transform);
    Headers apply(String key, HeaderTransform transform);
  
    interface HeaderTransform {
        Header apply(Header header);
    }
}

2.Connect Records

每条 Kafka 消息都包含零个或多个标头名称-值对,因此 Connect 记录类将被修改为具有Headers可以就地修改的非空对象。现有的 ConnectRecord 抽象类是两个基类 SourceRecord和 SinkRecord,并且将被改变为具有新的 headers填充字段 ConnectHeaders对象。所有现有构造函数和方法的签名都将保持不变以保持后向兼容性,但现有构造函数将headers使用ConnectHeaders对象填充新字段。而且, toString(), hashCode()和 equalTo(Object)方法将改为使用新的 headers领域。
一个新的构造函数和几个新方法将被添加到这个现有的类中,代码片段如下所示:

package org.apache.kafka.connect.connector;
public abstract class ConnectRecord<R extends ConnectRecord<R>> {
 
 
   /* The following will be added to this class */
    
   private final Headers headers;
   public ConnectRecord(String topic, Integer kafkaPartition,
                     Schema keySchema, Object key,
                     Schema valueSchema, Object value,
                     Long timestamp, Iterable<Header> headers) {
       this(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp);
       if (headers == null) {
           this.headers = new ConnectHeaders();
       } else if (headers instanceof ConnectHeaders) {
           this.headers = (ConnectHeaders)headers;
       } else {
           this.headers = new ConnectHeaders(headers);
       }
   }
 
   public Headers headers() {
       return headers;
   }
 
   public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema,
                               Object key, Schema valueSchema, Object value, Long timestamp,
                               Iterable<Header> headers);
}

现有的 SourceRecord类将被修改以添加一个新的构造函数并实现附加 newRecord(...)方法。同样,所有现有构造函数和方法的签名将保持不变以保持向后兼容性。代码片段如下所示:

package org.apache.kafka.connect.source;
public class SourceRecord extends ConnectRecord<SourceRecord> {
      
    /* The following will be added to this class */
 
    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                        String topic, Integer partition,
                        Schema keySchema, Object key,
                        Schema valueSchema, Object value,
                        Long timestamp, Iterable<Header> headers) {
        super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
    }
 
 
    @Override
    public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
                                  Long timestamp, Iterable<Header> headers) {
        return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers);
    }
}

同样,SinkRecord 将修改现有 类以添加新的构造函数并实现附加 newRecord(...) 方法。同样,所有现有构造函数和方法的签名将保持不变以保持向后兼容性。代码片段如下所示:

package org.apache.kafka.connect.sink;
public class SinkRecord extends ConnectRecord<SinkRecord> {
      
    /* The following will be added to this class */
 
    public SinkRecord(String topic, int partition,
                      Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                      Long timestamp, TimestampType timestampType, Iterable<Header> headers) {
        super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
        this.kafkaOffset = kafkaOffset;
        this.timestampType = timestampType;
    }
 
    @Override
    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
                                Long timestamp, Iterable<Header> headers) {
        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
    }
}

3.序列化与反序列化

本次更新中添加了一个新 org.apache.kafka.connect.storage.HeaderConverter 接口,该org.apache.kafka.connect.storage.Converter接口在现有接口的基础上进行了模式化, 但具有特定于 Header 的方法名称和签名。代码片段如下所示:

package org.apache.kafka.connect.storage;
public interface HeaderConverter extends Configurable, Closeable {
 
    /**
     * Convert the header name and byte array value into a {@link Header} object.
     * @param topic the name of the topic for the record containing the header
     * @param headerKey the header's key; may not be null
     * @param value the header's raw value; may be null
     * @return the {@link SchemaAndValue}; may not be null
     */
    SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value);
 
    /**
     * Convert the {@link Header}'s {@link Header#valueAsBytes() value} into its byte array representation.
     * @param topic the name of the topic for the record containing the header
     * @param headerKey the header's key; may not be null
     * @param schema the schema for the header's value; may be null
     * @param value the header's value to convert; may be null
     * @return the byte array form of the Header's value; may be null if the value is null
     */
    byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value);
 
    /**
     * Configuration specification for this set of header converters.
     * @return the configuration specification; may not be null
     */
    ConfigDef config();
}

需要注意的是,不同的是 Converter,新 HeaderConverter接口扩展了 Configurable 现在对于可能具有附加配置属性的 Connect 接口通用的接口。

现有实现 Converter 也可能实现 HeaderConverter,并且ConverterConnect 中的所有三个现有 实现都将相应地更改以通过序列化/反序列化 Header 值来实现这个新接口,类似于它们序列化/反序列化键和值的方式:

  • StringConverter
  • ByteArrayConverter
  • JsonConverter

HeaderConverter 将添加一个新实现来将所有内置原语、数组、映射和结构与字符串表示形式相互转换。与StringConverter使用 toString()方法的不同 SimpleHeaderConverter,除了不带引号的简单字符串值之外, 使用类似 JSON 的表示形式表示基本类型、数组、映射和结构。这种形式直接对应于许多开发人员认为将值序列化为字符串的方式,并且可以 SimpleHeaderConverter解析这些任何和所有这样的值,并且大部分时间来推断正确的模式。因此,这将用于HeaderConverterConnect 工作程序中使用的默认值 。

下表描述了SimpleHeaderConverter将如何持久化这些值,表格如下:

类型 描述 例子
BOOLEAN true或者false  
 BYTE_ARRAY 字节数组的Base64编码字符串  
INT8 Java字节的字符串表示形式  
INT16 Java Short的字符串表示形式  
INT32 Java Int的字符串表示形式  
INT64 Java Long的字符串表示形式  
FLOAT32 Java 浮点数的字符串表示形式  
FLOAT64 Java Double的字符串表示形式  
STRING 字符串的UTF-8表示  
ARRAY 数组的类似 JSON 的表示形式。数组值可以是任何类型,包括基本类型和非基本类型。  
MAP 类似 JSON 的表示形式。尽管大多数正确创建的映射都具有相同类型的键和值,但也支持具有任何键和值的映射。映射值可以是任何类型,包括基本类型和非基本类型。 { "foo": "value", "bar": "strValue", "baz": "other" }
STRUCT 类似 JSON 的表示形式。Struct 对象可以序列化,但反序列化时将始终解析为映射,因为模式不包含在序列化形式中。 { "foo": true, "bar": "strValue", "baz": 1234 }
DECIMAL 对应的字符串表示java.math.BigDecimal。  
TIME IOS-8601 时间表示,格式为“HH:mm:ss.SSS'Z'”。 16:31:05.387UTC
DATE 日期的 ISO-8601 表示,格式为“YYYY-MM-DD”。 2021-09-25
TIMESTAMP 时间戳的 ISO-8601 表示,格式为“YYYY-MM-DD'T'HH:mm:ss.SSS'Z'”。

2021-09-25T 16:31:05.387UTC

4.属性配置

Connect 工作器需要配置为使用 HeaderConverter 实现,因此header.converter 将定义一个名为的附加工作器配置 ,默认为 SimpleHeaderConverter. 具有相同名称和默认值的类似配置属性将添加到连接器配置中,允许连接器覆盖工作程序的 Header 转换器。请注意,每个连接器任务都有自己的标头转换器实例,就像键和值转换器一样。

5.转换 Header 值

每个 Header 都有一个可由接收器连接器和简单消息转换使用的值。但是,标头值的类型首先取决于标头的创建方式以及它们的序列化和反序列化方式。将添加一组新的转换实用程序方法,使 SMT 和接收器连接器可以轻松地将标头值转换为易于使用的类型。这些转换可能需要原始架构和值。与字符串之间的转换使用与上述相同的机制SimpleHeaderConverter。
例如,SMT 或接收器连接器可能期望标头值为 long,并且可以使用这些实用方法来转换任何数值(例如,int、short、String、BigDecimal 等)。或者,接收器连接器可能需要 Timestamp 逻辑数据类型,因此它可以使用该 Values.convertToTimestamp(s,v) 方法从时间戳或日期的任何 ISO-8601 格式字符串表示转换,或表示为 long 或字符串的过去纪元的毫秒数。
这些实用方法可用于 Header 值或键、值或结构、数组和映射中的任何值。代码片段如下所示:

package org.apache.kafka.connect.data;
public class Values {
 
    // All methods return null when value is null, and throw a DataException
    // if the value cannot be converted to the desired type.
    // If the value is already the desired type, these methods simply return it.
    public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {...}
    public static Byte convertToByte(Schema schema, Object value) throws DataException {...}
    public static Short convertToShort(Schema schema, Object value) throws DataException {...}
    public static Integer convertToInteger(Schema schema, Object value) throws DataException {...}
    public static Long convertToLong(Schema schema, Object value) throws DataException {...}
    public static Float convertToFloat(Schema schema, Object value) throws DataException {...}
    public static Double convertToDouble(Schema schema, Object value) throws DataException {...}
    public static String convertToString(Schema schema, Object value) {...}
    public static java.util.Date convertToTime(Schema schema, Object value) throws DataException {...}
    public static java.util.Date convertToDate(Schema schema, Object value) throws DataException {...}
    public static java.util.Date convertToTimestamp(Schema schema, Object value) throws DataException {...}
    public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) throws DataException {...}
  
    // These only support converting from a compatible string form, which is the same
    // format used in the SimpleHeaderConverter described above
    public static List<?> convertToList(Object value) {...}
    public static Map<?, ?> convertToMap(Object value) {...}
  
    // Only supports returning the value if it already is a Struct.
    public static Struct convertToStruct(Object value) {...}
}

3.优化与调整

在 Kafka 3.0 中优化和调整了以下内容:

  • [ KAFKA-3745 ] - 考虑向 ValueJoiner 接口添加连接键
  • [ KAFKA-4793 ] - Kafka Connect: POST /connectors/(string: name)/restart 不会启动失败的任务
  • [ KAFKA-5235 ] - GetOffsetShell:支持多个主题和消费者配置覆盖
  • [ KAFKA-6987 ] - 用 CompletableFuture 重新实现 KafkaFuture
  • [ KAFKA-7458 ] - 在引导阶段避免强制处理
  • [ KAFKA-8326 ] - 添加 Serde> 支持
  • [ KAFKA-8372 ] - 删除不推荐使用的 RocksDB#compactRange API
  • [ KAFKA-8478 ] - 在强制处理之前轮询更多记录
  • [ KAFKA-8531 ] - 更改默认复制因子配置
  • [ KAFKA-8613 ] -对流中的窗口操作强制使用宽限期
  • [ KAFKA-8897 ] - RocksDB 增加版本
  • [ KAFKA-9559 ] - 将默认的“默认 serde”从 ByteArraySerde 更改为 null
  • [ KAFKA-9726 ] - MM2 模仿 MM1 的 IdentityReplicationPolicy
  • [ KAFKA-10062 ] - 添加一种方法来检索 Streams 应用程序已知的当前时间戳
  • [ KAFKA-10201 ] - 更新代码库以使用更具包容性的术语
  • [ KAFKA-10449 ] - Connect-distributed 示例配置文件没有针对侦听器的说明
  • [ KAFKA-10585 ] - Kafka Streams 应该从清理中清理状态存储目录
  • [ KAFKA-10619 ] - Producer 将默认启用 EOS
  • [ KAFKA-10675 ] - 来自 ConnectSchema.validateValue() 的错误消息应包括架构的名称。
  • [ KAFKA-10697 ] - 删除 ProduceResponse.responses
  • [ KAFKA-10746 ] - 消费者轮询超时到期应记录为警告而不是信息。
  • [ KAFKA-10767 ] - 为 ThreadCacheTest 中缺少的方法添加单元测试用例
  • [ KAFKA-10769 ] - 删除 JoinGroupRequest#containsValidPattern 因为它与 Topic#containsValidPattern 重复
  • [ KAFKA-10885 ] - 重构 MemoryRecordsBuilderTest/MemoryRecordsTest 以避免大量(不必要的)被忽略的测试用例
  • [ KAFKA-12177 ] - 保留不是幂等的
  • [ KAFKA-12234 ] - 扩展 OffsetFetch 请求以接受多个组 ID。
  • [ KAFKA-12287 ] - 当按时间戳或持续时间重置偏移量找不到偏移量并默认为最新时,在消费者组上添加警告日志记录。
  • [ KAFKA-12288 ] - 删除任务级文件系统锁
  • [ KAFKA-12294 ] - 考虑使用转发机制来创建元数据自动主题
  • [ KAFKA-12313 ] - 考虑弃用 default.windowed.serde.inner.class 配置
  • [ KAFKA-12329 ] - 当主题不存在时,kafka-reassign-partitions 命令应该给出更好的错误信息
  • [ KAFKA-12335 ] - 将 junit 从 5.7.0 升级到 5.7.1
  • [ KAFKA-12344 ] - 在 Scala API 中支持 SlidingWindows
  • [ KAFKA-12347 ] - 提高 Kafka Streams 跟踪进度的能力
  • [ KAFKA-12349 ] - 跟进 KIP-500 中的 PartitionEpoch
  • [ KAFKA-12362 ] - 确定任务是否空闲
  • [ KAFKA-12379 ] - KIP-716:允许使用 MirrorMaker2 配置 offsetsync 主题的位置
  • [ KAFKA-12396 ] - 收到空密钥时kstreams 的专用异常
  • [ KAFKA-12398 ] - 修复脆弱的测试 `ConsumerBounceTest.testClose`
  • [ KAFKA-12408 ] - 文档省略了 ReplicaManager 指标
  • [ KAFKA-12409 ] - ReplicaManager 中的计量器泄漏
  • [ KAFKA-12415 ] - 为 Gradle 7.0 做准备并限制非 api 依赖项的传递范围
  • [ KAFKA-12419 ] - 删除 3.0 中弃用的 Kafka Streams API
  • [ KAFKA-12436 ] - 弃用 MirrorMaker v1
  • [ KAFKA-12439 ] - 在 KIP-500 模式下,我们应该能够为被围栏的节点分配新的分区
  • [ KAFKA-12442 ] - 将 ZSTD JNI 从 1.4.8-4 升级到 1.4.9-1
  • [ KAFKA-12454 ] - 当当前 kafka 集群中不存在给定的 brokerIds 时,在 kafka-log-dirs 上添加错误日志记录
  • [ KAFKA-12464 ] - 增强约束粘性分配算法
  • [ KAFKA-12479 ] - 在 ConsumerGroupCommand中将分区偏移请求合并为单个请求
  • [ KAFKA-12483 ] - 默认情况下在连接器配置中启用客户端覆盖
  • [ KAFKA-12484 ] - 默认情况下启用 Connect 的连接器日志上下文
  • [ KAFKA-12499 ] - 根据 Streams EOS 上的提交间隔调整事务超时
  • [ KAFKA-12509 ] - 加强 StateDirectory 线程锁定
  • [ KAFKA-12541 ] - 扩展 ListOffset 以获取具有最大时间戳的偏移量 (KIP-734)
  • [ KAFKA-12573 ] - 删除了不推荐使用的`Metric#value`
  • [ KAFKA-12574 ] - 弃用 eos-alpha
  • [ KAFKA-12577 ] - 删除不推荐使用的 `ConfigEntry` 构造函数
  • [ KAFKA-12584 ] - 删除不推荐使用的 `Sum` 和 `Total` 类
  • [ KAFKA-12591 ] - 删除不推荐使用的 `quota.producer.default` 和 `quota.consumer.default` 配置
  • [ KAFKA-12612 ] - 从 3.0 中的 ConsumerRecord/RecordMetadata 中删除校验和
  • [ KAFKA-12614 ] - 使用 Jenkinsfile 进行主干和发布分支构建
  • [ KAFKA-12620 ] - 控制器生成的生产者 ID
  • [ KAFKA-12637 ] - 删除不推荐使用的 PartitionAssignor 接口
  • [ KAFKA-12662 ] - 为 ProducerPerformance 添加单元测试
  • [ KAFKA-12663 ] - 更新 FindCoordinator 以一次解析多个 Coordinator
  • [ KAFKA-12675 ] - 提高粘性通用分配器的可扩展性和性能
  • [ KAFKA-12779 ] - TaskMetadata 应该返回实际的 TaskId 而不是纯字符串
  • [ KAFKA-12788 ] - 改进 KRaft 副本放置
  • [ KAFKA-12803 ] - 支持在 KRaft 模式下重新分配分区
  • [ KAFKA-12819 ] - 测试的生活质量改进
  • [ KAFKA-12849 ] - 考虑将 TaskMetadata 迁移到与内部实现的接口
  • [ KAFKA-12874 ] - 将默认消费者会话超时增加到 45 秒 (KIP-735)
  • [ KAFKA-12906 ] - 消费者应在反序列化异常中包含分区和偏移量
  • [ KAFKA-12909 ] - 允许用户选择加入虚假的左/外流流加入改进
  • [ KAFKA-12921 ] - 将 ZSTD JNI 从 1.4.9-1 升级到 1.5.0-1
  • [ KAFKA-12922 ] - MirrorCheckpointTask 应该关闭主题过滤器
  • [ KAFKA-12931 ] - KIP-746:修改 KRaft 元数据记录
  • [ KAFKA-12934 ] - 将一些控制器类移动到元数据包
  • [ KAFKA-12981 ] - 确保同步读取/更新 LogSegment.maxTimestampSoFar 和 LogSegment.offsetOfMaxTimestampSoFar
  • [ KAFKA-13000 ] - 改进 MockClient 中 UnsupportedVersionException 的处理
  • [ KAFKA-13021 ] - 从 KIP-633 改进 API 更改和地址跟进的 Javadocs
  • [ KAFKA-13026 ] - 幂等生产者 (KAFKA-10619) 后续测试
  • [ KAFKA-13041 ] - 支持使用 ducker-ak 调试系统测试
  • [ KAFKA-13209 ] - 升级码头服务器以修复 CVE-2021-34429
  • [ KAFKA-13258 ] - AlterClientQuotas 响应失败时不包含错误
  • [ KAFKA-13259 ] - DescribeProducers 响应在失败时不包含错误
  • [ KAFKA-13260 ] - FindCoordinator errorCounts 不处理 v4

4.BUG修复

在 Kafka 3.0 中修复了如下BUG:

  • [ KAFKA-3968 ] - 将新 FileMessageSet 刷新到磁盘时,不会在父目录上调用 fsync()
  • [ KAFKA-5146 ] - Kafka Streams:删除对 connect-json 的编译依赖
  • [ KAFKA-6435 ] - 应用程序重置工具可能会删除不正确的内部主题
  • [ KAFKA-7421 ] - 类加载期间 Kafka Connect 中的死锁
  • [ KAFKA-8315 ] - 历史连接问题
  • [ KAFKA-8562 ] - 尽管 KAFKA-5051,SASL_SSL 仍然执行反向 DNS 查找
  • [ KAFKA-8784 ] - 删除 RocksDBConfigSetter#close 的默认实现
  • [ KAFKA-8940 ] - 片状测试 SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
  • [ KAFKA-9186 ] - Kafka Connect 用可能来自 DelegatingClassLoader 的错误消息淹没日志
  • [ KAFKA-9189 ] - 如果与 Zookeeper 的连接丢失,则会阻止关闭
  • [ KAFKA-9295 ] - KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
  • [ KAFKA-9527 ] - 当 --to-datetime 或 --by-duration 在具有空分区的 --input-topics 上运行时,应用程序重置工具返回 NPE
  • [ KAFKA-9672 ] - ISR 中的死代理导致 isr-expiration 失败并出现异常
  • [ KAFKA-9858 ] - bzip2 1.0.6 中 bzip2recover 中的 CVE-2016-3189 释放后使用漏洞允许远程攻击者通过精心制作的 bzip2 文件导致拒绝服务(崩溃),与设置为之前的块端相关块的开始。
  • [ KAFKA-10046 ] - 弃用的 PartitionGrouper 配置被忽略
  • [ KAFKA-10192 ] - 片状测试 BlockingConnectorTest#testBlockInConnectorStop
  • [ KAFKA-10340 ] - 在尝试为不存在的主题生成记录而不是永远挂起时,源连接器应该报告错误
  • [ KAFKA-10614 ] - 选举/辞职的组协调员应防范领导时代
  • [ KAFKA-12170 ] - Connect Cast 无法正确处理“字节”类型的字段
  • [ KAFKA-12252 ] - 当工人失去领导权时,分布式牧人滴答线程快速循环
  • [ KAFKA-12262 ] - 当拥有密钥的追随者成为领导者时,永远不会分发新的会话密钥
  • [ KAFKA-12297 ] - MockProducer 的实现与异步发送回调的文档相矛盾
  • [ KAFKA-12303 ] - 当存在空值时,Flatten SMT 会删除一些字段
  • [ KAFKA-12308 ] - ConfigDef.parseType 死锁
  • [ KAFKA-12330 ] - 当 FetchResponse 已满时,FetchSessionCache 可能会导致分区饥饿
  • [ KAFKA-12336 ] - 使用命名的 Consumed 参数调用 stream[K, V](topicPattern: Pattern) API 时自定义流命名不起作用
  • [ KAFKA-12350 ] - 关于refresh.topics.interval.seconds默认值不正确的文档
  • [ KAFKA-12393 ] - 记录多租户注意事项
  • [ KAFKA-12426 ] - 缺少在 RaftReplicaManager 中创建 partition.metadata 文件的逻辑
  • [ KAFKA-12427 ] - Broker 不会关闭带有缓冲数据的静音空闲连接
  • [ KAFKA-12474 ] - 如果无法写入新的会话密钥,Worker 可能会死
  • [ KAFKA-12492 ] - 示例 RocksDBConfigSetter 的格式混乱
  • [ KAFKA-12514 ] - SubscriptionState 中的 NPE
  • [ KAFKA-12520 ] - 在启动时不必要地重建生产者状态
  • [ KAFKA-12522 ] - Cast SMT 应该允许空值记录通过
  • [ KAFKA-12548 ] - 无效的记录错误消息未发送到应用程序
  • [ KAFKA-12557 ] - org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse 间歇性地无限期挂起
  • [ KAFKA-12611 ] - 修复了在 ProducerPerformance 中错误地使用随机负载的问题
  • [ KAFKA-12619 ] - 确保在初始化高水印之前提交 LeaderChange 消息
  • [ KAFKA-12650 ] - InternalTopicManager#cleanUpCreatedTopics 中的 NPE
  • [ KAFKA-12655 ] - CVE-2021-28165 - 将码头升级到 9.4.39
  • [ KAFKA-12660 ] - 追加失败后不更新偏移提交传感器
  • [ KAFKA-12661 ] - 当值不为空时,ConfigEntry#equal 不比较其他字段
  • [ KAFKA-12667 ] - StateDirectory 关闭时错误日志不正确
  • [ KAFKA-12672 ] - 运行 test -kraft -server-start 导致错误
  • [ KAFKA-12677 ] - raftCluster 总是发送到错误的活动控制器并且从不更新
  • [ KAFKA-12684 ] - 有效的分区列表被成功选择的分区列表错误地替换
  • [ KAFKA-12686 ] - AlterIsr 响应处理中的竞争条件
  • [ KAFKA-12691 ] - TaskMetadata timeSinceIdlingStarted 未正确报告
  • [ KAFKA-12700 ] - admin.listeners 配置在文档中有不稳定的有效值
  • [ KAFKA-12702 ] - InterBrokerSendThread 中捕获的未处理异常
  • [ KAFKA-12718 ] - SessionWindows 过早关闭
  • [ KAFKA-12730 ] - 单个 Kerberos 登录失败会导致 Java 9 以后的所有连接失败
  • [ KAFKA-12747 ] - 片状测试 RocksDBStoreTest.shouldReturnUUIDsWithStringPrefix
  • [ KAFKA-12749 ] - 被抑制的 KTable 上的更新日志主题配置丢失
  • [ KAFKA-12752 ] - CVE-2021-28168 将球衣升级到 2.34 或 3.02
  • [ KAFKA-12754 ] - 读取偏移量时,TaskMetadata endOffsets 不会更新
  • [ KAFKA-12777 ] - AutoTopicCreationManager 不处理响应错误
  • [ KAFKA-12782 ] - Javadocs 搜索将您发送到一个不存在的 URL
  • [ KAFKA-12792 ] - 修复指标错误并引入 TimelineInteger
  • [ KAFKA-12815 ] - KTable.transformValue 可能有不正确的记录元数据
  • [ KAFKA-12835 ] - 代理上的主题 ID 可能不匹配(代理间协议版本更新后)
  • [ KAFKA-12851 ] - 片状测试 RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
  • [ KAFKA-12856 ] - 将 Jackson 升级到 2.12.3
  • [ KAFKA-12865 ] - 描述 ACL 中管理客户端 API 的文档错误
  • [ KAFKA-12866 ] - 即使使用 chroot Kafka 也需要 ZK root 访问权限
  • [ KAFKA-12867 ] - Trogdor ConsumeBenchWorker 使用 maxMessages 配置提前退出
  • [ KAFKA-12870 ] - RecordAccumulator 卡在刷新状态
  • [ KAFKA-12880 ] - 在 3.0 中删除不推荐使用的 Count 和 SampledTotal
  • [ KAFKA-12889 ] - 日志清理组考虑空日志段以避免留下空日志
  • [ KAFKA-12890 ] - 消费者组陷入“CompletingRebalance”
  • [ KAFKA-12896 ] - 由重复的组长 JoinGroups 引起的组重新平衡循环
  • [ KAFKA-12897 ] - KRaft 控制器无法在单个代理集群上创建具有多个分区的主题
  • [ KAFKA-12898 ] - 订阅中拥有的分区必须排序
  • [ KAFKA-12904 ] - Connect 的验证 REST 端点使用不正确的超时
  • [ KAFKA-12914 ] - StreamSourceNode.toString() 抛出 StreamsBuilder.stream(Pattern) ctor
  • [ KAFKA-12925 ] - 中间接口缺少前缀扫描
  • [ KAFKA-12926 ] - 运行 kafka-consumer-groups.sh 时,ConsumerGroupCommand 的 java.lang.NullPointerException 出现负偏移
  • [ KAFKA-12945 ] - 删除 3.0 中的端口、主机名和相关配置
  • [ KAFKA-12948 ] - 节点处于连接状态的 NetworkClient.close(node) 使 NetworkClient 无法使用
  • [ KAFKA-12949 ] - TestRaftServer 的 scala.MatchError:test-kraft-server-start.sh 上的 null
  • [ KAFKA-12951 ] - 恢复 GlobalKTable 时的无限循环
  • [ KAFKA-12964 ] - 损坏的段恢复可以删除新的生产者状态快照
  • [ KAFKA-12983 ] - 在加入组之前并不总是调用 onJoinPrepare
  • [ KAFKA-12984 ] - 合作粘性分配器可能会因无效的 SubscriptionState 输入元数据而卡住
  • [ KAFKA-12991 ] - 修复对 `AbstractCoordinator.state` 的不安全访问
  • [ KAFKA-12993 ] - Streams“内存管理”文档的格式混乱
  • [ KAFKA-12996 ] - 当获取偏移量小于领导者起始偏移量时,未正确处理 OffsetOutOfRange 以用于发散时期
  • [ KAFKA-13002 ] - 对于非 MAX_TIMESTAMP 规范,listOffsets 必须立即降级
  • [ KAFKA-13003 ] - KafkaBroker 通告套接字端口而不是配置的通告端口
  • [ KAFKA-13007 ] - KafkaAdminClient getListOffsetsCalls 为每个主题分区构建集群快照
  • [ KAFKA-13008 ] - 流将在等待分区延迟时长时间停止处理数据
  • [ KAFKA-13010 ] - 片状测试 org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
  • [ KAFKA-13029 ] - FindCoordinators 批处理可能会在滚动升级期间破坏消费者
  • [ KAFKA-13033 ] - 协调器不可用错误应该导致添加到取消映射列表中进行新的查找
  • [ KAFKA-13037 ] - “线程状态已经是 PENDING_SHUTDOWN” 日志垃圾邮件
  • [ KAFKA-13053 ] - KRaft 记录的凹凸框架版本
  • [ KAFKA-13056 ] - 当控制器共同驻留时,代理不应生成快照
  • [ KAFKA-13057 ] - 许多代理 RPC 在 KRaft 模式下未启用
  • [ KAFKA-13058 ] - `AlterConsumerGroupOffsetsHandler` 不能正确处理分区错误。
  • [ KAFKA-13073 ] - 由于 MockLog 的实现不一致,模拟测试失败
  • [ KAFKA-13078 ] - 过早关闭 FileRawSnapshotWriter
  • [ KAFKA-13080 ] - 获取快照请求未定向到控制器中的kraft
  • [ KAFKA-13092 ] - LISR 请求中的性能回归
  • [ KAFKA-13096 ] - 添加/删除/替换线程时不会更新 QueryableStoreProvider 呈现 IQ 不可能
  • [ KAFKA-13098 ] - 在元数据日志目录中恢复快照时没有此类文件异常
  • [ KAFKA-13099 ] - 使 transactionalIds 过期时消息太大错误
  • [ KAFKA-13100 ] - 控制器无法恢复到内存快照
  • [ KAFKA-13104 ] - 控制器应在 RaftClient 辞职时通知它
  • [ KAFKA-13112 ] - 控制器提交的偏移量与 raft 客户端侦听器上下文不同步
  • [ KAFKA-13119 ] - 在启动时验证 KRaft controllerListener 配置
  • [ KAFKA-13127 ] - 修复杂散分区查找逻辑
  • [ KAFKA-13129 ] - 修复与 ConfigCommand 更改相关的损坏系统测试
  • [ KAFKA-13132 ] - 在 LISR 请求中升级到主题 ID 在 3.0 中引入了差距
  • [ KAFKA-13137 ] - KRaft 控制器指标 MBean 名称被错误引用
  • [ KAFKA-13139 ] - 在没有任务的情况下请求重新启动连接器后的空响应导致 NPE
  • [ KAFKA-13141 ] - 如果存在分歧时期,领导者不应更新追随者获取偏移量
  • [ KAFKA-13143 ] - 禁用 KRaft 控制器的元数据端点
  • [ KAFKA-13160 ] - 修复了在使用 KRaft 时调用代理的配置处理程序以传递预期默认资源名称的代码。
  • [ KAFKA-13161 ] - 在 KRaft 中分区更改后未更新跟随者领导者和 ISR 状态
  • [ KAFKA-13167 ] - KRaft 代理应在受控关闭期间立即心跳
  • [ KAFKA-13168 ] - KRaft 观察者不应该有副本 ID
  • [ KAFKA-13173 ] - KRaft 控制器不能正确处理同时代理到期
  • [ KAFKA-13198 ] - TopicsDelta 在处理 PartitionChangeRecord 时不会更新已删除的主题
  • [ KAFKA-13214 ] - 消费者在断开连接后不应重置组状态
  • [ KAFKA-13215 ] - 片状测试 org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
  • [ KAFKA-13219 ] - BrokerState 指标不适用于 KRaft 集群
  • [ KAFKA-13262 ] - 模拟客户端现在有最终的 close() 方法
  • [ KAFKA-13266 ] - 从提取器中删除分区后应创建“InitialFetchState”
  • [ KAFKA-13270 ] - Kafka 可能无法连接到 ZooKeeper,永远重试,永远不会启动
  • [ KAFKA-13276 ] - 公共 DescribeConsumerGroupsResult 构造函数指的是 KafkaFutureImpl
  • [ KAFKA-13277 ] - 请求/响应中长标记字符串的序列化抛出 BufferOverflowException

5.任务

在 Kafka 3.0 中的开发任务如下:

  • [ KAFKA-8405 ] - 删除不推荐使用的 `kafka-preferred-replica-election` 命令
  • [ KAFKA-8734 ] - 删除 PartitionAssignorAdapter 和不推荐使用的 PartitionAssignor 接口
  • [ KAFKA-10070 ] - 参数化连接单元测试以删除代码重复
  • [ KAFKA-10091 ] - 改善任务空闲
  • [ KAFKA-12482 ] - 删除不推荐使用的 rest.host.name 和 rest.port Connect worker 配置
  • [ KAFKA-12519 ] - 考虑删除旧的内置指标版本的流
  • [ KAFKA-12578 ] - 删除不推荐使用的安全类/方法
  • [ KAFKA-12579 ] - 从 3.0 的客户端中删除各种不推荐使用的方法
  • [ KAFKA-12581 ] - 删除不推荐使用的 Admin.electPreferredLeaders
  • [ KAFKA-12588 ] - 在 shell 命令中删除不推荐使用的 --zookeeper
  • [ KAFKA-12590 ] - 删除不推荐使用的 SimpleAclAuthorizer
  • [ KAFKA-12592 ] - 删除不推荐使用的 LogConfig.Compact
  • [ KAFKA-12600 ] - 删除客户端配置`client.dns.lookup`的弃用配置值`default`
  • [ KAFKA-12625 ] - 修复通知文件
  • [ KAFKA-12717 ] - 删除内部转换器配置属性
  • [ KAFKA-12724 ] - 将 2.8.0 添加到系统测试和流升级测试
  • [ KAFKA-12794 ] - DescribeProducersRequest.json 中的尾随 JSON 令牌可能会导致某些 JSON 解析器中的解析错误
  • [ KAFKA-12800 ] - 配置 jackson 以拒绝生成器中的尾随输入
  • [ KAFKA-12820 ] - 升级 maven-artifact 依赖以解决 CVE-2021-26291
  • [ KAFKA-12976 ] - 从删除主题调用中删除UNSUPPORTED_VERSION错误
  • [ KAFKA-12985 ] - CVE-2021-28169 - 将码头升级到 9.4.42
  • [ KAFKA-13035 ] - Kafka Connect:更新 POST /connectors/(string: name)/restart 文档以包含任务重启行为
  • [ KAFKA-13051 ] - 需要为 3.0 定义 Principal Serde
  • [ KAFKA-13151 ] - 在 KRaft 中禁止策略配置

6.总结

Kafka 3.0 的发布标志着社区对 Kafka 项目迈向了一个新的里程牌。另外,感谢Kafka PMC对Kafka Eagle监控系统的认可,为了维护Apache社区的商标权益,现在对Kafka Eagle正式改名为EFAKEagle For Apache Kafka),EFAK会持续更新迭代优化,为大家管理Kafka集群和使用Kafka应用提供便利,欢迎大家使用EFAK,也可以到Github或者EAFK官网上关注 EFAK 的最新动态。

7.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。