image

Kafka学习笔记

  • WORDS 36324

Kafka学习笔记

Kafka简介

Kafka是一个发布与订阅消息系统,Kafka中的数据单元被称为消息消息有一个可选的元数据,就是用于 Kafka将消息写入不同的分区。为了提高效率,Kafka会将属于属于同一 Topic和分区的消息进行批次写入。

Kafka中的消息通过 Topic进行分类,一个 Topic又可以分为若干分区

Kafka的客户端就是 Kafka系统的用户,被分为两种基本类型:

  • 生产者:创建消息。默认情况下,生产者会把消息均匀的发布到对应 Topic的所有分区中
  • 消费者:读取消息。消费者通过检查消息的偏移量来区分已经读取过的消息。消费者可以是消费者群组的一部分,属于同一群组的多个消费者共同读取一个 Topic,群组可以保证每个分区只被一个消费者读取。

一台单独的 Kafka服务器成为 brokerbroker会接受消息并设置偏移量保存到硬盘,同时对读取分区的请求做出响应,返回已经发布的消息。

多个 broker组成了集群。每个集群中有一个充当集群控制器角色的 border(自动选举)。控制器负责管理工作,包括为 broker分配分区和监控 broker。在集群中,一个分区从属于一个 broker被称为分区的首领,分配给其他 broker的分区副本被称为追随者

保留消息Kafka的重要特性。broker的默认消息保留策略是:

  1. 要么保留一段时间
  2. 要么保留消息总量达到一定的字节数

安装Kafka

Kafka生产者

创建生产者

要向 Kafka发送消息,首先需要创建一个生产者对象,并设置一些属性。其中有3个必须配置的属性:

  • bootstrap.serversbroker的地址,可以由多个 host:port组成
  • key.serializer:类名,消息 key的序列化器
  • value.serializer:类名,消息 value的序列化器

序列化器必须被设置为一个实现了 org.apache.kafka.common.serializetion.Serializer接口的类,客户端默认提供了 ByteArraySerializerStringSerializerIntergerSerializer等序列化器

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "10.10.10.10:9092");
// 使用内置的String序列化器
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

发送消息

发送消息先从创建一个 ProducerRecord对象开始,其中需要包含目标 Topic和需要发送的内容。此外,还可以指定 key、分区、时间戳和标头。

如果没有显式指定分区,那么数据将被传给分区器通过 key选择一个分区。然后,该消息会被添加到一个消息批次里,一条独立的线程负责将消息发送到对应的 broker

broker在收到消息时会返回一个响应。如果写入成功,就返回一个 RecordMetaData对象,其中包含了主题和分区信息,以及消息在分区中的偏移量;如果写入失败,则会返回一个错误。

发送消息主要有三种方式:

  • 发送并忘记:将消息发送给服务器,但并不关心是否成功送达,如果发送了不可重试的错误或超时,那么消息将会丢失

    // 创建连接
    KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
    // 创建消息 三个参数分别为 Topic value key
    ProducerRecord<String, String> record = new ProducerRecord<>("demo", "hello world", "test");
    try {
        producer.send(record);
    } catch (Exception e) {
        e.printStackTrace();
    }
    
  • 同步发送:生产者是异步的,调用 send()方法发送消息后,会返回一个 Future对象,可以调用 get()方法等待执行完成,这样就可以知道消息的发送结果

    KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
    ProducerRecord<String, String> record = new ProducerRecord<>("demo", "hello world", "test");
    try {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(metadata);
    } catch (Exception e) {
        e.printStackTrace();
    }
    
  • 异步发送:调用 send()函数,并传入一个回调函数,当服务器响应后,这个函数会被触发

    KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
    ProducerRecord<String, String> record = new ProducerRecord<>("demo", "hello world", "test");
    try {
        // 回调函数,data:返回的信息  e:异常
        producer.send(record, (data, e) -> {
            if (e == null) {
                System.out.println(data);
            } else {
                e.printStackTrace();
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
    

生产者配置

  • client.id:客户端标识符,可以是任意字符串。

  • acksacks指定了生产者在多少个分区副本收到消息后才会认为消息写入成功

    • acks=0:生产者不会等待任何 broker的回应,如果没有收到消息,生产者也从得知,那么消息就丢失了。因为不需要等待响应,所有吞吐量高,性能最好。
    • acks=1:只要集群的首领收到消息,就会认为消息成功写入
    • acks=all:当所有副本全部收到消息时,才会收到写入成功的回应。这种方式最安全,但是延迟最高

    acks值设置的越小,生产者发送消息的速度就越快

  • max.block.ms:控制在调用 send()或通过 partitionsFor()显式请求元数据时可以发生阻塞的时间。当生产者的发送缓冲区被填满或元数据不可用时,这些方法就可能阻塞,达到配置的时间后就会抛出一个超时异常。

  • delivery.timeout.ms:从消息准备好发送到 broker响应或客户端放弃发送所花费的时间,应该大于 linger.msrequest.timeout.ms

  • request.timeout.ms:控制生产者发送消息时等待服务器响应的时间。如果设置的值已经触发,但服务器没有响应,那么生产者将重试发送或者执行回调。

  • retriesretry.backoff.ms:生产者接收到服务器的错误消息时,retries参数控制生产者在宣布失败前可以重试多少次,retry.backoff.ms控制重试时间间隔。

  • linger.ms:指定生产者在发送消息批次之前等待更多消息加入批次的时间。

  • buffer.memory:设置生产者发送给服务器的消息内存缓冲区大小,如果调用 send()方式的速度超过将消息发送给服务器的速度,那么缓存空间可能会被耗尽,在 max.block.ms后还没有可用空间,就会抛出异常。

  • compression.type:默认情况下,生产者发送的消息是未经压缩的,可以被设置为以下几个参数:

    • snappy:占用较少的 CPU时间,提供可观的压缩比
    • gzip:占用较多的 CPU时间,提供更高的压缩比
    • lz4
    • zstd
  • batch.size:指定一个消息批次可以使用的内存大小,按照字节数计算。

  • max.in.flight.requests.per.connection:指定生产者在收到服务器响应之前可以发送多少个消息批次。

  • max.request.size:限制可发送的单条最大消息的大小和单个请求的消息总量的大小。

  • receive.buffer.bytessend.buffer.bytes:分别指定 TCP socket接收和发送数据包的缓冲区大小。如果设置为 -1,则使用操作系统默认值。

  • enable.idempotence:开启幂等生产者,开启后生产者会给发送的每一条消息加上序列号。如果 broker接收到相同序列号的消息,那么会拒绝第二条。

序列化器

自定义序列化器

使用 Gson将类型序列化为 Json的序列化器

// Object支持所有类
public class GsonSerializer implements Serializer<Object> {
    private final Gson gson = new Gson();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String s, Object o) {
        try {
            // 使用Gson的序列化
            String json = gson.toJson(o);
            return json.getBytes(StandardCharsets.UTF_8);
        } catch (JsonParseException e) {
            // 序列化失败,抛出异常
            throw new SerializationException("gson serializer error");
        }
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

使用Avro序列化器

Avro使用得较少,就略过了

使用Protobuf序列化器

使用 protoc生成的 Java类自带 toByteArray()方法,可以非常方便的将类转为二进制数据

先编写 .proto文件用于生成 Java

syntax = "proto3";

// 为每个消息生产类
option java_multiple_files = true;
// 指定包名
option java_package = "com.zeroxn.entity";
// 指定类名
option java_outer_classname = "student";

message Student {
  int32 id = 1;
  string name = 2;
  int32 age = 3;
}

通过命名编译为 .java文件

protoc --java_out=./java/ demo.proto

项目添加 protobuf依赖

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <!-- 一定要使用3.22以上版本 不然类中有个方法缺失 -->
    <version>3.22.0</version>
</dependency>

编写自定义序列化器

// 使用泛型
public class ProtobufSerializer<T extends GeneratedMessageV3> implements Serializer<T> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String s, T t) {
        // 调用类中自带的方法即可
        return t.toByteArray();
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

测试

Student student = Student.newBuilder()
    .setId(1)
    .setAge(18)
    .setName("小明")
    .build();
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "10.10.10.10:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 使用ProtobufSerializer序列化器
kafkaProps.put("value.serializer", "ProtobufSerializer");
KafkaProducer<String, Student> producer = new KafkaProducer<>(kafkaProps);
ProducerRecord<String, Student> record = new ProducerRecord<>("demo", "test student", student);
try {
    RecordMetadata metadata = producer.send(record).get();
    // 查看响应结果
    System.out.println(metadata);
} catch (Exception e) {
    e.printStackTrace();
}

分区

如果一条消息的 keynull,并且使用了默认的分区器,那么记录将随机发送给 Topic的分区。分区器默认使用轮询调度将消息均衡分布。

如果 key不为空,并且使用了默认的分区器,那么 Kafka会对 key进行 hash运算,然后通过哈希值将消息映射到特定的分区。

除了默认的分区器,Kafka客户端还提供了 RoundRobinPartitionerUniformStickyPartitioner。在消息包含 key的情况下,可以使用它们来实现随机分区分配和粘性随机分配。

自定义分区策略

如果需要自定义分区策略,例如为某些 key使用单独的分区,则需要使用自定义分区器

public class CustomPartitioner implements Partitioner {
    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int size = partitionInfos.size();
        if (keyBytes == null || (!(key instanceof String))) {
            throw new InvalidRecordException("不受支持的key");
        }
        // 如果key为student  则保存到最后一个分区
        if (key.equals("student")) {
            return size - 1;
        }
        // 否则使用默认的hash算法进行分区
        return Math.abs(Utils.murmur2(keyBytes) % (size - 1));
    }
}

标头

除了 keyvalue,消息还可以包含标头。可以在标头中添加有关记录的元数据。标头由键值对组成

ProducerRecord<String, Student> record = new ProducerRecord<>("demo", "test student", student);
record.headers().add("auth", "token".getBytes(StandardCharsets.UTF_8));

拦截器

KafkaProducerInterceptor拦截器包含两个重要方法:

  • ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):这个方法会在记录被发送给 Kafka前(序列化前)调用,方法的返回记录将被序列化并发送给 Kafka
  • void onAcknowledgement(RecordMetadata metadata, Exception e):这个方法会在收到 Kafka确认响应时调用。

常见的生产者拦截器应用场景包括:捕获监控和跟踪信息、为信息添加标头以及敏感信息脱敏

一个简单的统计消息发送数量的拦截器

public class CountProducerInterceptor implements ProducerInterceptor<String, String> {
    // 使用原子类确保线程安全
    private static final AtomicInteger sendCount = new AtomicInteger();
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // 发送消息时就自加
        sendCount.incrementAndGet();
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("接收到消息");
    }

    @Override
    public void close() {
        // 生产者关闭时,输入发送的消息总数
        System.out.println("发送的消息总数:" + sendCount.get());
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

生产者代码

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "10.10.10.10:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置拦截器
kafkaProps.put("interceptor.classes", "CountProducerInterceptor");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
// 循环发送10条信息
for (int i = 0; i < 10; i++) {
    try {
        String value = "student value " + i;
        ProducerRecord<String, String> record = new ProducerRecord<>("demo", "student", value);
        producer.send(record, (data, e) -> {
            if (e == null) {
                System.out.println("消息发送成功:" +  data);
            } else {
                e.printStackTrace();
            }
        });
        Thread.sleep(2000);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
// 显式调用close()方法,不然无法触发close()事件
producer.close();

配额和节流

配额机制可以限制生产和消费信息的速率,Kafka提供了三种配额类型:

  • 生产:限制客户端发送和接收数据的速率(bit/s)
  • 消费:同生产
  • 请求:限制 broker用于处理客户端请求的时间百分比

通过 quota.producer.default配置配额

# 设置每个生产者平均发送的信息不超过2MBps
quota.producer.default=2M
# 单独为客户端设置  clientA 为客户端Id
quota.producer.override="clientA:4M,ClientB:2M"

节流行为通过以下几个参数暴露给客户端

  • producer-throttle-time-avg
  • producer-throttle-time-avg
  • fetch-throttle-time-avg
  • fetch-throttle-time-avg

生产请求和消费请求因节流而被延迟的平均时间和最长时间

Kafka消费者

消费者通过 KafkaConsumerKafka Topic中读取消息。消费者从属于消费者群组,一个群组里的消费者订阅的是同一个 Topic,每个消费者负责读取这个 Topic的部分消息。

  • 只包含一个消费者的群组会接收 Topic中所有分区的消息
  • 群组中消费者数量小于 Topic分区数量,则会尽可能平均分配每个消费者接收的分区数
  • 消费者数量大于 Topic分区数量,一个消费者接收一个分区消息,多出来的消费者会空闲

Topic中的消息可以被多个消费者群组消费,每个群组都能接收到所有消息

当一个新消费者加入群组时,它将开始读取一部分原由其它消费者读取的消息。一个消费者关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由其它消费者读取。Topic发生变化会导致分区重分配。

分区的所有权从一个消费者转移到另一个消费者的行为称为再平衡。再平衡可以分为两种类型:

  • 主动再平衡:在进行主动再平衡期间,所有消费者都会停止读取消息,放弃分区所有权,重新加入消费者群组并获得重新分配到的分区。
  • 协作再平衡:将一个消费者的部分分区重新分配给另一个消费者。

默认情况下,消费者的群组成员身份标识是临时的。当一个消费者离开群组时,分配给它的分区所有权将被撤销。

可以给消费者分配一个固定的 group.instance.id,让它成为群组的固定成员。固定成员离开群组再次加入时,会继续持有之前的身份和分配的分区。

如果两个相同的 group.instance.id加入同一群组,第二个消费者会收到错误。

创建消费者

创建消费者的流程和创建生产者类似

Properties kafkaProps = new Properties();
// 指定地址和key,value反序列化器
kafkaProps.put("bootstrap.servers", "10.10.10.10:9092");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 指定消费者群组id
kafkaProps.put("group.id", "demo");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
// 订阅Topic 接受一个集合或者正则表达式
consumer.subscribe(Collections.singletonList("demo"));

轮询

消费者 API最核心的东西就是通过轮询向服务器请求数据

while (true) {
    // poll方法会返回一个记录列表,每一条记录都包含了主题和分区的信息
    // 参数用于设置poll方法的阻塞时间,如果为0,那么会立即返回
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(System.out::println);
}

线程安全

不能在同一线程中运行多个同属一个群组的消费者,一个消费者使用一个线程

消费者配置

  • fetch.min.bytes:指定消费者从服务器获取记录的最小字节数,默认1字节

  • fetch.max.wait.ms:指定 broker的等待时间,默认500ms

  • fetch.max.bytes:指定 kafka返回数据的最大字节数,默认 50MB

  • max.poll.records:控制单次调用 poll()方法返回的记录条数

  • max.partition.fetch.bytes:指定服务器从每个分区里返回给消费者的最大字节数,默认 1MB

  • session.timeout.ms:指定消费者多长时间内不传输数据任被认为在线,默认10s

  • heartbeat.interval.ms:指定消费者向服务器发送心跳的频率,通常是 session.timeout.ms1/3

  • max.poll.interval.ms:指定消费者被认定离线之前可以在多长时间内不发起轮询

  • default.api.timeout.ms:如果在调用消费者API时,没有显式指定超市时间,那么就会使用这个属性的值

  • request.timeout.ms:指定消费者收到 broker响应之前可以等待的最长时间

  • auto.offset.reset:指定消费者在读取没有偏移量或偏移量无效的分区时该如何处理

    • lastest:默认,从最新的记录开始读取
  • enable.auto.commit:指定消费者是否自动提交偏移量,默认true

  • partition.assignment.strategy:分区的分配策略

    • range(区间):会把每一个 Topic的若干连续分区分配给消费者
    • roundRobin(轮询):会被分区按顺序逐个分配给消费者
    • stikcy(粘性):粘性分配器的分配比例比轮询更加均衡,主要体现在重新分配
    • cooperative sticky(协作粘性):分配策略和粘性一致,支持协作(增量式)再平衡
  • client.id:客户端标识符

  • client.rack:默认情况下,消费者会从分区的首领副本获取消息,如果需要从最近的副本获取消息,那么需要设置 client.rack参数

  • group.instance.id:唯一的任意字符串,用于消费者群组的固定名称

  • offsets.retention.minutesbroker端的配置属性。群组提交的每一个分区的最后一个偏移量会被保留下来。如果群组失去了所有成员并超过了这个属性设置的时间,那么偏移量会被删除。

提交和偏移量

偏移量提交就是更新当前分区的读取位置。如果消费者一直处理运行状态,那么偏移量没有什么实际作用,但是,如果触发再均衡,消费者需要读取每个分区最后一次提交的偏移量,然后从指定位置继续读取消息。

如果最后一次提交的偏移量小于客户端处理的最后一条消息的偏移量,那么处于两个消息偏移量之间的消息就会被重复处理。

如果最后一次体将的偏移量大于客户端处理的最后体条消息的偏移量,那么处于两个消息偏移量之间的消息就会丢失。

自动提交

如果 enable.auto.commit被设置为 true,那么消费者会每过 5s自动提交 poll()返回的最大偏移量。自动提交的间隔时间由 auto.commit.interval.ms设定,默认 5s

提交当前偏移量

使用 commitSync()方法可以提交 poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败则抛出异常。

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "10.10.10.10:9092");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("enable.auto.commit", "false");
kafkaProps.put("group.id", "demo");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
consumer.subscribe(Collections.singletonList("demo"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(System.out::println);
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        e.printStackTrace();
    }
}

异步提交

手动提交如果在 broker对响应做出回应之前,程序会一直阻塞。异步提交则只管发送请求,无须等待 broker做出响应。

// 直接进行异步提交
consumer.commitAsync();

// 带回调的异步提交  回调方法会在broker响应后执行
consumer.commitAsync((map, e) -> {
    if (e != null) {
        System.out.println("提交错误");
    } else {
        System.out.println(map);
    }
});

同步和异步组合提交

try {
    consumer.commitAsync();
    consumer.commitSync();
} catch (CommitFailedException e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

提交特定的偏移量

commitAsync()commitSync()允许提交特定的分区和偏移量

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
int count = 0;
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record);
    // 每处理完两条记录提交一次
    if (count % 2 == 0) {
        offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(offsets, null);
    }
    count++;
}

再均衡监听器

Kafka提供了接口让消费者在分配到新分区或旧分区被移除时执行一些操作

public class CustomConsumerRebalanceListener implements ConsumerRebalanceListener {
    // 消费者放弃对分区的所有权时调用,可能是再均衡或者消费者被关闭
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
      
    }
    // 重新分配分区之后,开始读取信息前调用
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {

    }
    // 只会在使用了协助再平衡算法并且之前不是通过再均衡获得的分区被重新分配时调用
    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        ConsumerRebalanceListener.super.onPartitionsLost(partitions);
    }
}
// 订阅Topic时需要传入再均衡监听器
consumer.subscribe(Collections.singletonList("demo"), new CustomConsumerRebalanceListener());

从特定偏移量位置读取记录

poll()方法默认会从分区的最新偏移量位置开始读取消息,Kafka还提供了多种方法从分区的起始位置、结束位置和指定偏移量读取消息。

  • seekToBeginning():从分区的起始位置读取所有信息
  • seekToEnd():从分区的末尾开始读取所有信息

如何退出

如果需要马上关闭消费者,可以在另一个线程中调用 consumer.wakeup()方法,调用此方法会导致 poll()抛出 WakeupException异常。退出轮询线程之前一定要调用 consumer.close()方法

反序列化器

反序列化器和序列化器使用一致,只不过是将字节数组转换为 Java对象

使用Protobuf反序列化器

// 使用泛型确保类是Protobuf生成的类
public class ProtobufDeserializer<T extends GeneratedMessageV3> implements Deserializer<T> {
    // 类的反序列器,需要手动指定
    private final Parser<T> parser;

    public ProtobufDeserializer(Parser<T> parser) {
        this.parser = parser;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        try {
            // 将字节数组反序列化为实体类
            return parser.parseFrom(bytes);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("消息反序列化失败");
        }
    }
    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "10.10.10.10:9092");
kafkaProps.put("group.id", "group");
ProtobufDeserializer<Student> valueDeserializer = new ProtobufDeserializer<>(Student.parser());
// key和value的反序列器使用手动注入,因为value反序列器需要手动创建
KafkaConsumer<String, Student> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), valueDeserializer);
consumer.subscribe(Collections.singletonList("kafka_protobuf_demo"));
while (true) {
    ConsumerRecords<String, Student> records = consumer.poll(Duration.ofMillis(200));
    for (ConsumerRecord<String, Student> record : records) {
        logger.info("接收到消息,Topic:{},分区:{},消息:{}", record.topic(), record.partition(), record.value());
    }
}

独立消费者

当一些简单场景不需要使用消费者群组时,比如只需要一个消费者读取一个 Topic的所有分区或某个分区。就不再需要消费者群组和再均衡了。

如果知道需要读取哪些分区,就不需要订阅 Topic了,可以直接将目标分区分配给消费者。

// 直接分配Topic的指定分区给消费者  读取单分区的消息
KafkaConsumer<String, Student> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), valueDeserializer);
consumer.assign(Collections.singletonList(new TopicPartition("kafka_protobuf_demo", 0)));

// 读取一个Topic中所有分区的消息
KafkaConsumer<String, Student> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), valueDeserializer);
List<PartitionInfo> partitionInfos = consumer.partitionsFor("kafka_protobuf_demo");
List<TopicPartition> partitionList = new ArrayList<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
    partitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
consumer.assign(partitionList);

编程式管理Kafka

AdminClient

AdminClient为之前只能通过命令行完成的管理功能提供了编程式 API。查看、创建和删除 Topic,描述集群,管理 ACL和修改配置。

AdminClient是异步的,每一个方法向集群控制器发出请求后会立即返回一个或多个 Future对象。在 Kafka中,从控制器到 broker元数据传播是异步的,当控制器状态被完全更新时,返回的 Future将被视为已完成。这叫做最终一致性

Kafka协议支持的所有管理操作都可以使用 KafkaAdminClient来实现,每一个方法都会接受一个特定于该方法的 Options对象做为参数。所有方法都有一个 timeout配置参数,限定等待时间。

生命周期:创建、配置和关闭

Properties kafkaProps = new Properties();
// 指定集群地址 多个地址使用,号分隔
kafkaProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.10.10:9092");
// 创建
AdminClient adminClient = AdminClient.create(kafkaProps);
// 进行操作

// 关闭
adminClient.close();

AdminClient的配置参数

  • client.dns.lookup
    • 使用DNS别名:通过域名连接而不是IP地址
    • 单个DNS映射多个IP地址
  • request.timeout.ms:等待返回响应的时间

Topic管理

获取所有 Topic

ListTopicsResult topicsResult = adminClient.listTopics();
try {
    // 使用get等待结果,然后输出名称
    topicsResult.names().get().forEach(System.out::println);
} catch (InterruptedException | ExecutionException ex) {
    ex.printStackTrace();
}

查询 Topic是否存在

try {
    // 接收一个集合参数 需要获取的Topic名称
    DescribeTopicsResult topicsResult = adminClient.describeTopics(List.of("demo", "kafka_test"));
    // 从返回的集合里面通过name获取一个Topic
    TopicDescription kafkaDemo = topicsResult.topicNameValues().get("demo").get();
    // 输入Topic的分区数量
    System.out.println(kafkaDemo.partitions().size());
} catch (Exception ex) {
    // 如果错误类型未UnknownTopicOrPartitionException则说明Topic不存在
    if (ex.getCause() instanceof UnknownTopicOrPartitionException) {
        System.out.println("Topic未创建");
    } else {
        ex.printStackTrace();
    }
}

创建 Topic

// 创建Topic接收3个参数,名称 分区数 副本数量,也可以只指定名称
CreateTopicsResult createResult = adminClient.createTopics(Collections.singletonList(new NewTopic("kafka_test", 1, (short) -1)));
// 通过分区数判断Topic是否创建成功
if (createResult.numPartitions("kafka_test").get() != 1) {
    System.out.println("Topic创建失败");
}

删除 Topic

List<String> kafkaList = Collections.singletonList("kafka_test");
try {
    // 删除Topic并调用get()方法等待操作完成,不然删除操作是异步的 后续可能还存在
    adminClient.deleteTopics(kafkaList).all().get();
    // 查询Topic是否还存在
    DescribeTopicsResult topicsResult = adminClient.describeTopics(kafkaList);
    topicsResult.topicNameValues().get("kafka_test").get();
} catch (Exception e) {
    e.printStackTrace();
}

配置管理

配置管理是通过描述和更新一系列配套资源来实现的

// 指定需要获取配置的对象
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "kafka_demo");
// 获取配置并等待
DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singletonList(configResource));
Config config = configsResult.all().get().get(configResource);
// 打印所有配置信息
config.entries().forEach(System.out::println);
// 创建一个配置项
ConfigEntry entry = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
// 判断配置项是否存在
if (!config.entries().contains(entry)) {
    Collection<AlterConfigOp> configOps = new ArrayList<>();
    // SET:设置值  DELETE:删除值
    configOps.add(new AlterConfigOp(entry, AlterConfigOp.OpType.SET));
    Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
    // 向Map中添加目标对象和配置项
    alterConf.put(configResource, configOps);
    // 更新配置并等待执行完成
    adminClient.incrementalAlterConfigs(alterConf).all().get();
}

消费者群组管理

查看消费者群组

获取所有消费者群组

  • valid():忽略错误,只返回正常获取到的消费者群组
  • errors():获取所有异常
  • all():返回的第一个错误会做为异常抛出
Collection<ConsumerGroupListing> groupListings = adminClient.listConsumerGroups().valid().get();
groupListings.forEach(System.out::println);

获取消费者群组的详细信息,信息包括群组成员、它们的标识符和主机地址、分配给它们的分区、分配分区的算法和群组协调器的主机地址。

// 获取指定的消费者群组列表信息
DescribeConsumerGroupsResult result = adminClient.describeConsumerGroups(Collections.singletonList("group"));
// 从列表中获取指定的消费者群组信息
ConsumerGroupDescription description = result.describedGroups().get("group").get();
System.out.println(description);

获取消费者群组消费的每个分区偏移量

// 先获取指定消费者群组读取分区的提交偏移量
Map<TopicPartition, OffsetAndMetadata> groupMap = adminClient.listConsumerGroupOffsets("group").partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<>(groupMap.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : groupMap.entrySet()) {
    // OffsetSpec有三种实现  latest():最新 earliest(): 最早  forTimestemp():指定时间或指定时间之后
    offsetSpecMap.put(entry.getKey(), OffsetSpec.latest());
}
// 再通过TopicPartition和latest()获取所有分区最新的偏移量
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> infoMap = adminClient.listOffsets(offsetSpecMap).all().get();
// 最后循环遍历 拿到偏移量差值
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : groupMap.entrySet()) {
    String topic = entry.getKey().topic();
    int partition = entry.getKey().partition();
    long commitOffset = entry.getValue().offset();
    long latestOffset = infoMap.get(entry.getKey()).offset();
    System.out.println("topic:" + topic + "分区:" + partition + "offset:" + commitOffset + "latestOffset:" + latestOffset);
}

修改消费者群组

AdminClient提供了删除群组、移除成员、删除提交偏移量和修改偏移量等用于修改消费者群组的操作。

修改偏移量是最有用的,显式的将提交的偏移量修改为最早的偏移量,可以强制消费者从 Topic开始位置进行消费。

Topic中的偏移量发生变化时,消费者群组并不会收到通知,为了防止在消费者无法知晓的情况下修改偏移量,Kafka不允许在消费者群组处于活动状态时修改偏移量

集群元数据

客户端在绝大部分情况下并不需要知道它所连接的集群信息,只需要关心 Topic和分区即可

// 获取链接的集群信息
DescribeClusterResult clusterResult = adminClient.describeCluster();
// 集群Id
System.out.println(clusterResult.clusterId().get());
// 集群内所有的节点信息
clusterResult.nodes().get().forEach(System.out::println);
// 集群的控制器节点信息
System.out.println(clusterResult.controller().get());

高级管理操作

为Topic添加分区

通常,分区数量是在创建 Topic时就确定好的,很少需要为 Topic添加分区,而且添加分区可能存在风险

Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
// 指定Topic名称和新的分区总数(不是需要添加的分区数量)
newPartitionsMap.put("kafka_demo", NewPartitions.increaseTo(4));
// 执行操作并等待完成 不推荐一次为多个Topic进行添加分区操作
adminClient.createPartitions(newPartitionsMap).all().get();

从Topic中删除信息

Kafka提供了 Topic数据保留策略,但并不支持按照合规性来保留数据。某些情况下,Topic可能会保留超过允许时间的数据。

使用 deleteRecord方法可以删除早于指定偏移量的数据,返回被删除消息的最大偏移量

Map<TopicPartition, OffsetSpec> requestMap = new HashMap<>();
requestMap.put(new TopicPartition("kafka_demo", 1), OffsetSpec.latest());
// 先获取指定Topic和分区的最新偏移量
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> infoMap = adminClient.listOffsets(requestMap).all().get();
Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
// 添加到待删除的Map中
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry : infoMap.entrySet()) {
    deleteMap.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue().offset()));
}
// 删除最新偏移量之前的所有消息并等待执行完成
adminClient.deleteRecords(deleteMap).all().get();

首领选举

首领选举有两种类型:

  • 首选首领选举

    kafka会每5分钟检查一次首领是否是首选首领副本,如果不是,但它有成为首领的资格,就会选取首选首领副本作为首领

  • 不彻底的首领选举

    如果一个分区的首领副本变得不可用,而其他副本没有资格成为首领。那么这个分区将没有首领,也就不可用了

// 指定Topic和分区进行首领选举
Set<TopicPartition> partitionSet = new HashSet<>();
// 分区参数如果为null则会对所有分区进行操作
partitionSet.add(new TopicPartition("kafka_demo", 0));
try {
    adminClient.electLeaders(ElectionType.PREFERRED, partitionSet).all().get();
} catch (ExecutionException e) {
    e.printStackTrace();
}

如果集群的状态是健康的,那么这个方法不会执行任何操作

重新分配分区副本

没有集群环境,后面补上

测试

Kafka提供了测试客户端 MockAdminClient,模拟了一部分 AdminClient的API,可以用来初始化 broker并执行管理操作。

深入Kafka

KRaft

Kafka的新控制器 KRaft取代了传统基于 Zookeeper的控制器。

在传统架构中,ZooKeeper起到了两个重要作用:

  • 选举控制器
  • 保存集群元数据

在新架构中,控制器节点形成了一个 Raft仲裁,管理中元数据事件日志。日志中包含集群元数据的每一个变更。因为使用了 Raft算法,所有控制器节点可以在不依赖外部系统的情况下选举首领。首领节点被称为主控制器,负载处理所有来自 brokerRPC调用。跟随着控制器会从主控制器复制数据,并作为主控制器的热备。

处理请求

broker会在监听的每一个端口上运行一个接收器线程,线程会创建连接,并把它交给处理器线程(网络线程)。处理器线程负责从客户端处理请求,把它们放进请求队列,然后从响应队列取出响应,把它们发送给客户端。

请求消息被放入请求队列后,请求处理线程会处理它们,请求类型常见的有一下几种:

  • 生产请求:生产者发送的请求
  • 获取请求:消费者和跟随者副本发送的请求(使用零复制技术发送消息)
  • 管理请求:管理客户端发送的请求

物理存储

Kafka的基本存储单元是分区。

分层存储

在分层存储架构中,Kafka集群配置了两个存储层:

  • 本地存储层:使用 broker的本地存储保存日志片段
  • 远程存储层:使用 HDFSS3等专用存储系统保存日志片段

Kafka用户可以单独为每一层配置保留策略。

可靠的数据传递

消息的可靠性

  • Kafka可以保证分区中的消息是有序的
  • 一条消息只有在被写入分区的所有同步副本时才会被任务是已提交
  • 只要还有一个副本可用,已提交的消息就不会丢失
  • 消费者只能读取已提交的消息

复制

KafkaTopic会被分为多个分区,分区是最基本的数据构建块同时事件是有序的。一个分区可以在线(可用)也可以离线(不可用)。每个分区可以有多个副本,其中有一个副本是首领。所有事件都会发送给首领,其他副本只需要保持与首领同步即可。当首领副本不可用时,其中一个同步副本会成为新首领。

broker配置

broker中有3个配置参数会影响 Kafka的消息存储可靠性。它们既可以配置在 broker级别,也可以配置在 Topic级别。

复制系数

Topic级别的配置参数是 replication.factor,在 broker级别可以通过 default.replication.factor来配置。

如果复制系数为 3,代表每个分区会被 3个不同的 broker复制。更高的复制系数会带来更高的可用性、可靠性和更少的灾难性,同时也会占用更高倍数的磁盘空间。

确定 Topic副本需要从一下几个因素考虑:

  • 可用性:副本数量越多,可用性越高
  • 持久性:副本数量越多,丢失副本数据的可能性就越低,同时占用空间越大
  • 吞吐量:每增加一个副本都会增加 broker内的复制流量
  • 端到端延迟:每一条记录必须同步到所有副本后才能被消费者消费
  • 成本:副本数越多,存储和网络成本就越高

不彻底的首领选举

如果一个分区的首领不可用时,该分区的其他副本存在未同步的数据,那么就会发生不彻底的首领选举。由参数 unclean.leader.election.enable控制

默认情况下,该参数的值未 false,不允许不同步副本成为首领,这是最安全的选项可以保证数据不丢失。

如果允许数据丢失让分区可用,可以设置为 true

最少同步副本

min.insync.replicas参数控制需要的最少同步副本数量,如果设置为 3却只有两个可用同步副本时,broker会停止接受生产者的请求,尝试发送数据的生产者会收到异常。

保持副本同步

replica.lag.time.max.ms参数控制 broker从首领同步数据的最大延迟时间,如果超出时间数据还未能和分区首领保持同步,则将变成不可用副本。

精确一次性语义

幂等生产者

如果一个操作被执行多次的结果与执行一次相同,那么这个操作就是幂等的。一个典型的场景就是消息重复问题。

幂等生产者的工作原理

如果启用了幂等生产者,每条消息都将包含生产者ID和序列号,将它们越目标 Topic和分区组合在一起,得到消息的唯一标识。broker会通过 max.inflight.requests参数控制跟踪最后写入分区的消息数量,如果消息的唯一标识和被跟踪消息中的重复,那么会拒绝这条消息并返回错误。

幂等生产者的局限性

幂等生产者只能防止由生产者内部重试逻辑引发的消息重复。如果生产者调用 send()方法发送了两次一样的消息那么也会导致消息重复。

事务

Kafka的事务机制是专门为了流式处理应用程序添加的,可以保证流式处理应用程序生成准确的结果,适用于流式处理应用程序的基础模式,即消费-处理-生产

以流式处理应用程序为例,从一个 Topic读取数据,对数据进行处理后,再将结果写入另一个 Topic,添加了事务意味着消费、处理和生产都是原子操作,要么所有操作都成功,要么都不成功。为了支持这种行为,Kafka事务引入了原子多分区写入的概念。

如果需要使用事务和原子多分区写入,需要使用事务性生产者,本质上就是一个配置了 transational.idinitTransations()方法初始化的 Kafka生产者。

可以通过设置事务的隔离级别 isolation.level参数来控制消费者如何读取以事务方式写入的信息:

  • read_committed:返回已成功提交的事务或以非事务方式提交的消息
  • read_uncommitted:返回所有记录,包括执行中或者已中止的事务消息

事务的应用场景

如果流式处理应用程序需要对消息进行聚合,一些消息被统计了不止一次,那么就很难知道结果是不是正确的

事务可以解决的问题

  1. 应用程序崩溃导致的重复处理
  2. 离线应用程序导致的重复处理

事务不能解决的问题

  1. 在流式处理中执行外部操作
  2. Kafka中读取数据并写入数据库
  3. 从数据库中读取数据写入 Kafka,再从 Kafka将数据写入另一个数据库
  4. 将数据从一个集群复制到另一个集群
  5. 发布和订阅模式

使用事务

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.10.10:9092");
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
// 唯一的事务ID
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo_transaction");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.10.10:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// 初始化事务
producer.initTransactions();
consumer.subscribe(Collections.singletonList("kafka_demo"));
while (true) {
    try {
        // 获取消息
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        if (records.count() > 0) {
            // 开启事务
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(records.count());
            for (ConsumerRecord<String, String> record : records) {
                // 业务处理逻辑
                System.out.println(record);
                offsetMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
            }
            // 发送偏移量
            producer.sendOffsetsToTransaction(offsetMap, consumer.groupMetadata());
            // 提交事务
            producer.commitTransaction();
        }
    } catch (Exception ex) {
        // 发生异常则回滚事务
        producer.abortTransaction();
        ex.printStackTrace();
    }
}

事务ID和隔离

应用程序的事务ID在重启前后必须保持一致,并且不能重复,否则 broker无法隔离僵尸实例

数据管道

Kafka的数据管道可以作为各个数据阶段之间的缓冲区,解耦数据的生产者和消费者。例如把 PostgreSQL中的数据同步到 Elasticsearch

构建数据管道通常需要考虑以下问题:

  • 及时性
  • 可靠性
  • 高吞吐量和动态吞吐量
  • 数据格式
  • 转换
  • 安全性
  • 故障处理
  • 耦合性和灵活性

Kafka Connect

ConnectKafka的一部分,提供了一组API和一个运行时,可以用于开发和运行连接器插件

运行Connect

# 下载Kafka
wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz

# 解压并进入目录
tar -zxvf kafka_2.13-3.6.1.tgz && cd kafka_2.13-3.6.1

# 修改配置文件
vim config/connect-distributed.properties
# 一个运行正常的broker地址
bootstrap.servers=localhost:9092

# 集群ID
group.id=connect-cluster

# key和value序列化器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 配置插件目录
#plugin.path=

配置文件修改完成后直接启动即可

# 启动connect服务
bin/connect-distributed.sh config/connect-distributed.properties

# 启动成功后访问本地地址,会返回kafka-connect的版本信息
curl http://localhost:8083

连接器:MySQL到Elasticsearch

暂时忽略 Flink是更好的选择

跨集群数据镜像

暂时忽略 没有集群环境

管理Kafka

Topic操作

# 列出所有Topic
kafka-topics.sh --bootstrap-server localhost:9092 --list

# 创建新Topic
# --create --topic 表示创建的是一个Topic
# --partitions 指定Topic的分区数量
# --replication-factor 指定Topic的分区副本数量
# 可选参数 --if-not-exists 如果要创建的Topic已存在也不会抛出异常
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic custom-topic --partitions 2

# 获取Topic详情
# --describe --topic 表示获取Topic详情
# 可选参数
# --topics-with-overrides 列出配置与集群默认值不同的Topic
# --exclude-internal 排除双下划线开头的Topic
# --under-replicated-partitions 找出一个或多个副本与首领不同步的分区
# --at-min-isr-partitions 找出副本数量等于最少同步副本数的分区
# --under-min-isr-partitions 超出ISR低于配置的最小值的分区
# --unavailable-partitions 找出没有首领的分区
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic custom-topic

# 增加分区
# 使用--alter修改 --partitions输入新的分区数量
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic custom-topic --partitions 4

# 删除Topic
# 删除Topic broker的delete.topic.enable配置必须为true
# 删除Topic的操作时异步的 会将待删除的Topic打上标记 删除时间取决于清理策略
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic custom-topic

消费者群组操作

# 列出所有消费者群组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 删除消费者群组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group custom-consumer

# 偏移量管理
# 导出偏移量 将custom-consumer消费者群组读取的custom-topic Topic的偏移量导出为csv文件
# --dry-run 必须添加这个参数 否则偏移量会被重置
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --export --group custom-consumer --topic custom-topic --reset-offsets --to-current --dry-run > offsets.csv

# 导入偏移量 通过导入备份的csv文件来重置消费者群组的偏移量
# 导入偏移量之前 必须先关闭所有的消费者
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group custom-consumer --from-file offsets.csv --execute

动态配置变更

Topicbroker、用户和客户端都支持配置参数运行时动态更新。

# 修改Topic配置
# --entity-type 指定修改的类型是Topic
# --entity-name 指定待修改主体的名称
# --add-config 指定需要修改的配置 格式为key=value格式 可以使用[]一次添加多条
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name custom-topic --add-config <key>=<value>

# 修改客户端和用户配置
kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config "key=value" --entity-type clients --entity-name <Client Id> --entity-type users --entity-name <user Id>

# 修改broker配置
# broker的集群级别的配置主要放在配置文件中 但还是有参数可以在运行时修改
# min.insync.replicas
# max.connections
# unclean.leader.election.enable

# 查看被覆盖的配置
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name custom-topic

# 移除被覆盖的配置
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name custom-topic --delete-config <config-name>

生产和消费

控制台生产者

可以使用控制台向 kafkatopic中写入消息,消息的键和值以 Tab字符分隔,使用默认的序列化器生成原始字节。

# 一行输入是一条消息
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic custom-topic

控制台消费者

控制台消费者读取的消息会使用换行符分隔,打印到标准输出中

# --topic 读取指定Topic
# --whitelist 指定正则表达式 读取表达式所匹配的所有Topic的消息
# --from-beginning 指定从最旧的偏移量开始读取
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic custom-topic

分区管理

首选首领选举

# 为所有Topic启动首选首领选举
# 如果需要指定Topic或分区进行选举 可以使用 --topic 和 --partitions 参数
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --all-topic-partitions

# 也可以指定一个文件,其中包含所有参数选举的分区
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --path-to-json-file partitions.json
// partitions.json
{
    "partitions": [
        {
            "partitions": 1,
            "topic": "custom-topic"
        },
        {
            "partitions": 2,
            "topic": "demo-topic"
        }
    ]
}

修改分区副本

在一下集中场景中,可能需要手动修改分区的副本

  • broker的负载分布不均衡,自动首领选举无法解决
  • broker离线,分区不同步
  • 新增了 broker,需要快速分配分区
  • 修改 Topic的复制系数

修改分区副本需要先基于 brokerTopic生成迁移清单,然后再执行调整。假设集群中添加了两个新的 broker,需要将两个 Topic移到新的 broker

// topics.json
{
    "topics": [
        {
            "topic": "topic1"
        },
        {
            "topic": "topic2"
        }
    ],
    "version": 1
}

使用上面的 json文件生成移动清单

# --broker-list 需要转移到的broker序号
# --generate 会生成两个json片段 第一个是原来的分区配置 第二个用于执行分区调整
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics.json --broker-list 3,4 --generate

将生成的 json片段保存为 json文件

# 将上面生成的第二个json片段保存为expand.json
# 命令执行后会开始重新分配 输出结果和上面的输出一致
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand.json --execute

不安全的操作

  • 移动集群控制器
  • 移除待删除的Topic
  • 手动删除Topic

流式处理

Kafka可靠的传递能力成了流式处理系统完美的数据来源,从 0.10.1版本开始,Kafka提供了流式处理开发库 Kafka Stream,可以直接再应用程序中读取、处理和生成事件。

数据流是无边界数据集的抽象表示。无边界意味着无限和持续增长。除了无边界,事件流模型还有其他属性

  • 事件流是有序的
  • 不可变的数据记录
  • 事件流是可重放的
  • 请求和响应
  • 批处理
  • 流式处理

流式处理相关概念

拓扑

一个流式处理应用程序包含一个或多个处理拓扑。处理拓扑从一个或多个源数据流开始,经过满是流处理器的图路径,直到结果被写入一个或多个目标数据流。

时间

时间可能是流式处理中最为重要的概念,流式处理系统包含以下几种时间:

  • 事件时间:指事件的发生时间和消息的创建时间
  • 日志追加时间:指事件到达并保存到 broker的时间
  • 处理时间:指应用程序收到事件后对其进行处理的时间

状态

如果流处理流程中包含多个事件,就把这些信息叫做状态。状态通常会保存在应用程序的本地变量里。

  • 本地状态或内部状态:只能被单个程序实例访问,优点是速度快,缺点是受可用内存的限制。
  • 外部状态:使用外部数据存储来维护和管理,通常使用 NoSQL系统

流和表

表是记录的集合,每条记录都有一个主键标识,并包含了一组由模式定义的属性。

流是一系列事件,每个事件就是一个变更。

要将表转化为流,需要捕获所有对表做出的更改。

要将流转化为表,需要应用流里面所有的变更。也叫流的物化

时间窗口

大部分针对流的操作是基于时间窗口的,比如移动平均数,一周内销量最好的产品。两个流的连接操作也是基于时间窗口的,会连接发生在相同时间片段内的事件。

  • 窗口大小:窗口大小表示时间范围
  • 窗口移动频率:5分钟平均数可以每分钟或每秒变化一次,或者在新事件到达时发生变化。时间间隔固定的窗口叫做跳跃窗口,移动间隔与窗口大小相等的窗口叫做滚动窗口
  • 窗口可更新时间:

处理保证

流式处理应用程序无论是否出现故障,都需要能够一次且仅一次处理每一条记录。Streams借助 Kafka的事务特性为流式处理应用程序提供精确一次性保证。

processing.guarantee=exactly_once启用

流式处理设计模式

单事件处理

处理单个事件是流式处理最基本的模式。也叫做 映射(map)模式过滤器(filter)模式。在这种模式中,应用程序会读取流中的事件,修改它们,再把它们生成到另一个流中。一个例子是,从流中读取日志信息,将ERROR日志写入到高优先级流,其它消息写入到低优先级流。

使用本地状态

大部分流式应用程序要用到聚合信息,特别是基于时间窗口的聚合。实现这些聚合操作,需要维护流的状态。可以通过本地状态实现

关联文章

0 条评论