Kafka学习笔记


Kafka学习笔记

记录Kafka与消息队列基本概念的学习笔记

什么是消息队列?

消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上, 队列存储消息直到它们被应用程序读走。通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的

Kafka的通讯模型

在消息队列中,有多种通讯的模型,Kafka选择使用了一种发布订阅模式的通讯模型: 发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发

Kafka专用术语

Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker.通常来说,一个机器(虚拟或者物理上的)就是一个Broker

Topic:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。

Partition:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition。

Producer:负责发布消息到 Kafka broker。

Consumer:消息消费者,向 Kafka broker 读取消息的客户端

Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

部署基本命令

下载安装

Kafka是由Apache基金会进行维护的,所以我们可以在其官方网页上找到二进制版本进行下载.

下载好了之后,我们将 kafka-2.3.1-src.tgz 解压放到本地中,注意,在这里我们使用linux服务器作为演示.并以单机版本作为示例.

首先,我们需要安装Kafka的注册环境,通常来说就是zookeeper

启动zookeeper

这里下载好zookeeper并解压后,将config目录下的zoo-simple.cfg复制一下改名为zoo.cfg 就可以运行其目录下的zkServer.sh运行了.

sh zkServer.sh start

我们可以使用telnet localhost 2181 并发送srvr来验证其是否启动成功

建立zookeeper集群

同样地,我们可以轻松地于上面的步骤相似,来建立一个zookeeper的集群

首先,假设我们分别有3台物理机,其IP地址分别为192.168.163.129,192.168.163.131,192.168.163.132

那么,我们在这3台机子上分别下载并解压好对应的zookeeper,并在每一个zookeeper的zoo.cfg文件中加入如下语句

server.0=192.168.163.129:2888:3888
server.1=192.168.163.131:2888:3888
server.2=192.168.163.132:2888:3888

这个配置的模式是这样的: server.A=B:C:D,A是以整数为例的id,代表着服务器的编号,这个编号是对应的,B是服务器的IP地址,C为服务器与集群中leader服务器交换信息的端口.D为选举时服务器相应通信的端口.

然后我们还需要在每一机子上面的zookeeper/data目录中,创建一个myid文件,里面只放置一个数字,就是本机对应的id.

然后就可以一个个启动,创建集群了.

在启动之后,我们可以使用sh zkServer.sh status命令来查看当前服务器上zookeeper节点的状态

/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/zookeeper/apache-zookeeper-3.5.5-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

启动kafka

在启动之前,我们可以去看一下kafka的配置是怎么样的.

./config/server.properties

其中有几个比较关键的地方是:

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

zookeeper.connect=localhost:2181

这边是指定zookeeper的连接地址,我们的zookeeper在本地,所以直接填写localhost就好了,如果有多个zookeeper形成了集群,我们可以根据上面的注释知道,用逗号分割不同机器的地址与端口就可以以集群模式连接.zookeeper的连接是以hostname:port/path作为的一个列表,我们不仅可以指定地址,端口号,还可以指定zookeeper的放置路径,默认使用根路径

# A comma separated list of directories under which to store log files
log.dirs=../data

Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 log.dirs 指定的.在这里的log files,并不是我们常说的日志文件,在kafka中,log files就是我们传递消息的时候存放在磁盘的消息本身

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

前面我们说过了broker的概念,在这里可以指定broker的id,一定是一个不重复的整数

 # The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

这个的意思是消息文件会存放多少天,默认是7天.

如果kafka需要使用外网进行访问的话,我们还需要对其作一些额外的配置

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.163.129:9092

主要是对这两个属性进行配置,让kafka在外网进行监听.

我们在确定好这些消息之后(其实如果是单机版本的话,使用默认的就可以了),就可以开始启动kafka了.

我们进入bin目录下,运行以下命令

sh kafka-server-start.sh -daemon ../config/server.properties 

其中,-daemon是指定kafka以守护进程的方式运行,如果不指定这个方式,那么就会以阻塞线程的方式运行.

建立kafka集群

创建kafka集群同样很简单,我们只需要在配置文件中,将broker.id=0修改成自己对应的id,(不冲突即可),然后将advertised.listeners中的ip地址即可.

修改了这两个配置之后,我们就可以直接启动.

创建topic

运行成功后,我们可以尝试创建一个topic.关于topic上面也已经讲过了,在kafka中是用于区分消息的类别的.

sh kafka-topics.sh --create --zookeeper localhost:2181 --topic first --partitions 1 --replication-factor 1

在这个命令中,我们使用--create来声明创建一条topic,以--zookeeper来声明zookeeper的位置,以--topic来指定topic 的名字,partitions与replication-factor涉及到kafka的备份问题,一般来说,replication-factor不能超过brokers的个数,在这里我们使用了单机的版本,所以两个都直接设为1即可.

一个简单的kafka集群如下

如上所示,我们可以看到,每个主题都有至少一个分区和一个复制集,并且每个分区从属于一个broker,每个分区通过复制机制,来将同样的数据复制到不同的broker中,这个时候就会发生分区复制了.

一个主题有多个分区,是可以根据broker在不同机器的特性,提高一个主题同时有多个消息写入时的性能,而一个主题的同一个分区会复制到不同的broker中,成为副本,是为了备份(备份的机制比较复杂,涉及到如何备份,保持数据的一致性等等),容灾和可用性.

在运行完了上面的步骤后,我们就已经建立起来了一个简单的kafka服务器了.

Kafka的消息传递应用

Kafka的生产者

下面是Kafka的一张生产者将消息发送到Kafka中的图片

在这里,我们有一个ProducerRecord作为生产者,在其中,记录了Topic,也就是主题,Value,也就是值,还可以指定键或者分区然后我们调用Send()函数将其发送到Kafka中,无论是什么类型的消息,在传输时,都会被序列化成字节数组.然后数据传输到分区器后,就会涉及到我们之前所说的数据的分区策略了.此时Kafka会选择对象中的键,来选择一个分区,然后这些数据作为一条记录,被添加到一个记录的批次里面,有一个单独的线程来处理这些记录,并将其发送到相应的broker上.如果成功了,就返回一个RecordMetaData对象,包含记录插入的主题与分区信息,以及offset偏移量.

生产者客户端的更详细的基本架构如下

每个生产者由两个线程协调运行,主线程从生产者类中创建一个消息,经过拦截器,序列化器和分区器之后,缓存到消息累加器,然后由发送者线程将其发送到Kafka集群中. 消息累加器的作用就是将要发送的消息缓存起来,让发送者一次发送,提高效率.默认大小为32MB,如果空间不足,send()方法调用就被会暂时阻塞起来,不让消息进入累加器.

创建一个生产者

我们可以使用Kafka中给定的脚本创建一个生产者,其中有几个属性是需要了解的

  • bootstrap.servers 指定broker的地址.
  • key.serializer 键的序列化器
  • value.serializer 值的序列化器

然后我们新建一个Java项目,引入maven依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.0</version>
        </dependency>

然后我们根据上面说到的配置,来进行一个简单的消息发送

public class Producer1 {
    private Properties kafkaProps = new Properties();

    public static void main(String[] args) {
        Producer1 producer1 = new Producer1();
        producer1.kafkaProps.put("bootstrap.servers", "192.168.163.129:9092");
        producer1.kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer1.kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> stringStringKafkaProducer = new KafkaProducer<>(producer1.kafkaProps);
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("first", "testKey", "testValue");
        stringStringKafkaProducer.send(stringStringProducerRecord);
    }
}

在这里,我们这样就可以实现一个简单的消息发送了.在这个Send函数中,我们还可以实现消息的回调,在这里,我们实现一个简单的回调类,将Kafka返回的消息打印出来

    static class CallBack1 implements Callback{
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println(metadata);
        }
    }
    
    
    //方法的调用如下
      try {
            stringStringKafkaProducer.send(stringStringProducerRecord,new CallBack1()).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

这样就可以使回调函数打印出我们发送消息后的消息topic,分区和偏移量位置了

其完整代码如下

package producer.KafkaProducer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

/**
 * @author Boyn
 * @date 2019/11/29
 */
public class KafkaProducerAnalysis {
    public static final String brokerList = "192.168.163.129:9092";
    public static final String topic = "testTopic";

    public static Properties initConfig() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return configProperties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        Scanner scanner = new Scanner(System.in);
        try {
            while (true) {
                String s = scanner.nextLine();
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, s);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.out.println(exception.getMessage());
                    }
                    System.out.printf("Send Complete.offset:%d, topic:%s, partition:%d \n", metadata.offset(), metadata.topic(), metadata.partition());
                });
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            producer.close();
        }
    }
}

Kafka的消费者

在Kafka中,消费者的概念比较宽泛,消费者通常来说,多个消费者从属于一个消费者的群组,一个群组里面的消费者只能够订阅同一个主题,每一个消费者接收主题中的部分分区信息.

值得注意的是,同一个消费者群组里面的多个消费者不能够接收同一个分区的消息,而一个消费者可以接收多个分区的消息.这样说有一些拗口,我们来用图解释一下

这里,一个消费者对应着4个分区,它可以同时消费多个分区

这里,一共有两个消费者,而每个消费者对应2个分区

这里,消费者和分区是一对一的关系

这里消费者的数量多于分区的数量,所以有一部分的消费者就会空闲下来

在上面所说的对应关系,只是对于同一个群组中的消费者来说的.如果我们有另外一个消费者群组,那么其关系跟消费者群组1没有任何关系.简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组, 然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息

创建一个消费者

package consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author Boyn
 * @date 2019/11/29
 */
public class KafkaConsumerAnalysis {
    private static final String brokerList = "192.168.163.129:9092";
    private static final String topic = "testTopic";
    private static final String groupId = "testGroup";
    private static final AtomicBoolean isRunning = new AtomicBoolean(true);
    private KafkaConsumer<String, String> consumer;

    public static Properties initConfig(){
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client");
        return configProperties;
    }

    public List<PartitionInfo> getPartitionInfo(){
        return consumer.partitionsFor(topic);
    }

    public KafkaConsumerAnalysis(){
        Properties properties = initConfig();
        consumer = new KafkaConsumer<String, String>(properties);
    }

    public void startReceive(){
        consumer.subscribe(Collections.singletonList(topic));
        try{
            while (isRunning.get()) {
                ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord record : consumerRecords) {
                    System.out.printf("topic:%s partition:%d offset:%s key:%s value:%s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerAnalysis analysis = new KafkaConsumerAnalysis();
        List<PartitionInfo> partitionInfos = analysis.getPartitionInfo();
        System.out.println(partitionInfos.size());
        for (PartitionInfo info : partitionInfos) {
            System.out.printf("topic:%s partition:%s replicas_number:%d leader_node:%s\n",info.topic(),info.partition(),info.replicas().length,info.leader().toString());
        }
        analysis.startReceive();
    }
}


自定义序列化器与反序列器

自定义字节数组的格式

在上面,我们实现了一个简单的传递字符串消息的生产者与消费者组合.但是在日常的生产环境中,不会完全由字符串来构成消息,更多的是各式各样的对象,这个时候我们可能就需要自定义序列化器了.假设我们定义了这么一个POJO,作为公司的实体,包含名字与地址

package consumer.CompanyConsumer;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author Boyn
 * @date 2019/11/29
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {
    private String name;
    private String address;
}

然后我们可以自定义其序列化器与反序列化器,注意,序列化器是给生产者使用的,而反序列化器是给消费者使用的.

先给出序列化器

package producer.CompanyProducer;

import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * @author Boyn
 * @date 2019/11/29
 */
public class CompanySerializer implements Serializer<Company> {
    @Override
    public byte[] serialize(String topic, Company data) {
        if(data==null){
            return null;
        }else {
            byte[] name,address;
            if (data.getName() != null) {
                name = data.getName().getBytes(StandardCharsets.UTF_8);
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes(StandardCharsets.UTF_8);
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        }
    }
}

其实现了KafkaAPI中的Serializer接口,实现了serialize方法

再给出反序列化器的实现

package consumer.CompanyConsumer;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * @author Boyn
 * @date 2019/11/29
 */
public class CompanyDeserializer implements Deserializer<Company> {

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("数据收到的长度为:" + data.length + " 小于最小长度");
        }
        String name,address;

        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen,addressLen;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();

        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);

        name = new String(nameBytes, StandardCharsets.UTF_8);
        address = new String(addressBytes, StandardCharsets.UTF_8);
        return new Company(name, address);
    }
}

同样,它实现了KafkaAPI的Deserializer接口里面的deserialize方法

然后我们给出这两者的生产者与消费者的实现,由于与前面的十分相似,就不再进行赘述了

package consumer.CompanyConsumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import producer.CompanyProducer.CompanySerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author Boyn
 * @date 2019/11/29
 */
public class CompanyConsumer {
    private static final String brokerList = "192.168.163.129:9092";
    private static final String topic = "company";
    private static final String groupId = "company-consumer";
    private static final AtomicBoolean isRunning = new AtomicBoolean(true);
    private KafkaConsumer<String, String> consumer;

    public static Properties initConfig(){
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client");
        return configProperties;
    }

    public List<PartitionInfo> getPartitionInfo(){
        return consumer.partitionsFor(topic);
    }

    public CompanyConsumer(){
        Properties properties = initConfig();
        consumer = new KafkaConsumer<String, String>(properties);
    }

    public void startReceive(){
        consumer.subscribe(Collections.singletonList(topic));
        try{
            while (isRunning.get()) {
                ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord record : consumerRecords) {
                    System.out.printf("topic:%s partition:%d offset:%s key:%s value:%s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        CompanyConsumer consumer = new CompanyConsumer();
        List<PartitionInfo> partitionInfos = consumer.getPartitionInfo();
        System.out.println(partitionInfos.size());
        for (PartitionInfo info : partitionInfos) {
            System.out.printf("topic:%s partition:%s replicas_number:%d leader_node:%s\n",info.topic(),info.partition(),info.replicas().length,info.leader().toString());
        }
        consumer.startReceive();
    }
}

package producer.CompanyProducer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;

/**
 * @author Boyn
 * @date 2019/11/29
 */
public class CompanyProducer {
    private static final String brokerList = "192.168.163.129:9092";
    private static final String topic = "company";
    private KafkaProducer<String, Company> producer;
    public static Properties initConfig() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,CompanySerializer.class.getName());
        return configProperties;
    }

    public CompanyProducer(){
        Properties properties = initConfig();
        producer = new KafkaProducer<>(properties);
    }

    public RecordMetadata send(Company company) {
        ProducerRecord<String, Company> record = new ProducerRecord<>(topic, company);
        try {
            return producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public void close(){
        producer.close();
    }

    public static void main(String[] args) {
        CompanyProducer producer = new CompanyProducer();
        Company company = Company.builder().address("ABCD").name("123").build();
        RecordMetadata send = producer.send(company);
        System.out.printf("Send Complete. time:%s topic:%s partition:%d offset:%d \n", new Date(send.timestamp()), send.topic(), send.partition(), send.offset());
        producer.close();
    }
}

使用序列化工具

在前面,我们的序列化与反序列化用的是自己定义的数组结构,这样就会带来一个问题,其通用性不够广,我们如果想要对其他的类,或者Company要增加字段,都要重新去写这个方法,无疑十分不方便,在这里,我们使用Protostuff这个序列化工具来帮助我们进行通用的序列化与反序列化

首先,在pom.xml中加入依赖

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.6.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.protostuff/protostuff-core -->
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.6.0</version>
        </dependency>

然后将两个方法进行改写

@Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(Company.class);
        Company ans = new Company();
        ProtostuffIOUtil.mergeFrom(data, ans, schema);
        return ans;
    }
    
   @Override
    public byte[] serialize(String topic, Company data) {
        if(data==null){
            return null;
        }else {
            Schema schema = RuntimeSchema.getSchema(data.getClass());
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            byte[] protostuff;
            try {
                protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }finally {
                buffer.clear();
            }
            return protostuff;
        }
    }

SpringBoot整合Kafka

设置一个简单的(发布/订阅)

在实际的用途中,我们通常要跟SpringBoot结合起来一起用,所以我们需要将Kafka和SpringBoot整合起来一起用.在Spring的官网上,我们可以看到几个实际的例子,那么我们就参照这些例子来看看如何将两者整合起来.

首先我们需要引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

假设我们有一个消息类

package top.boyn.kafkaboot.pojo;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author Boyn
 * @date 2019/11/29
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Message {
    private String message;
    private long sendTime;
    private long msgId;
}

消息类是作为消息队列中传输的对象.

接下来我们讲解一下Kafka在SpringBoot中的配置,为了方便,我们这次先使用默认的配置,在application.yml中,作如下配置

spring:
  kafka:
    bootstrap-servers: 192.168.163.129:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-con
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

并在作一个配置类,来配置消费者容器的工厂Bean和消息转换器的Bean,在这里简要说说明一下这个配置,ConcurrentKafkaListenerContainer负责管理我们的消费者,是一个容器的类型,自动分配对应的消费者来接收消息.

并且配置了一个错误重试策略,在失败3次之后才会最终报错

    @Bean
    @Autowired
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2))); // dead-letter after 3 tries
        return factory;
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

然后我们以接收网络消息为例,创建一个生产者的流程,

SendController

package top.boyn.kafkaboot.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import top.boyn.kafkaboot.producer.KafkaSender;

import javax.annotation.Resource;

/**
 * @author Boyn
 * @date 2019/11/29
 */
@RestController
public class SendController {
    @Resource
    KafkaSender sender;
    @RequestMapping("/api/send")
    public void send(@RequestParam("msg") String msg){
        sender.send(msg);
    }
}

KafkaSender

package top.boyn.kafkaboot.producer;

import com.alibaba.fastjson.JSON;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import top.boyn.kafkaboot.pojo.Message;

import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Boyn
 * @date 2019/11/29
 */
@Component
public class KafkaSender {
    AtomicLong atomicLong = new AtomicLong(1L);
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String msg){
        Message message = Message.builder().message(msg).msgId(atomicLong.getAndIncrement()).sendTime(System.currentTimeMillis()).build();
        kafkaTemplate.send("msg-test", JSON.toJSONString(message));
    }
}

在这里,我们首先开放了一个接口用于接收消息,然后调用Sender类来进行消息发送.

Sender类中,我们根据消息构造一个Message类,然后序列化为JSON字符串通过Kafka送到msg-test这个topic中.

但是还不算完,我们有一个生产者,还需要一个消费者.

在SpringBoot中,我们可以通过一个@KafkaListener来创建一个消费者

package top.boyn.kafkaboot.consumer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import top.boyn.kafkaboot.pojo.Message;

import java.util.Optional;

/**
 * @author Boyn
 * @date 2019/11/29
 */
@Component
public class Consumer {
    @KafkaListener(topics = "msg-test")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Message message = JSON.parseObject(String.valueOf(kafkaMessage.get()), Message.class);
            System.out.println(message);
        }
    }
}

有了上面这几个类,我们就可以实现简单的消息发送了.

使用Json进行序列化与反序列化

接下来我们可以开始对其进行更"Spring"的改进.

如果我们以JSON为默认的序列化格式,那么我们可以使用Spring给我们提供的JsonSerializer

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

然后将Sender中的Template的值改为Object类型,然后我们就可以直接通过对象进行发送了,Spring会为我们自动序列化成JSON字符串.

@Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String msg){
        Message message = Message.builder().message(msg).msgId(atomicLong.getAndIncrement()).sendTime(System.currentTimeMillis()).build();
        kafkaTemplate.send("msg-test", message);
    }

同时,消费者这边也需要用这样的JSON反序列化

我们更改一下消费者的配置就可以了

consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: test-con
      properties:
        spring.json.trusted.packages: top.boyn.kafkaboot.pojo

多个Topic进行发布订阅

在生产环境中,我们肯定不止一个Topic进行发布订阅的,这一节我们就将设置多个topic进行同时发布和订阅

我们先创建两个topic,使用Bean的方式

    @Bean
    public NewTopic msg_test1(){
        return new NewTopic("msg-test-01", 1, (short) 1);
    }
    
    @Bean
    public NewTopic msg_test2(){
        return new NewTopic("msg-test-02", 1, (short) 1);
    }

其中,第一个参数是topic的名字,第二个是分区数量,第三个是复制集数量

然后我们复制出两个完全一样的类,成为MessageType[12].

然后在配置中,我们需要对收到的消息进行类型映射,改一下我们上面说到converter方法里面的设置,

在这里,我们将设置以默认以Jackson为JSON库的类型映射器,然后需要添加一个包地址作为序列化时信任的包.

接下来在类型映射器中我们将type1,type2设置为对应的类嵌入到类型映射器中.

@Bean
    public RecordMessageConverter converter() {
        StringJsonMessageConverter converter = new StringJsonMessageConverter();
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
        typeMapper.addTrustedPackages("top.boyn.kafkaboot.pojo");
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("type1", MessageType1.class);
        mappings.put("type2", MessageType2.class);
        typeMapper.setIdClassMapping(mappings);
        converter.setTypeMapper(typeMapper);
        return converter;
    }

我们在配置好了之后,就做一个多发的控制器

package top.boyn.kafkaboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import top.boyn.kafkaboot.pojo.Message;
import top.boyn.kafkaboot.pojo.MessageType1;
import top.boyn.kafkaboot.pojo.MessageType2;

import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Boyn
 * @date 2019/11/30
 */
@RestController
public class MultiTypeSendController {
    @Resource
    private KafkaTemplate<Object, Object> template;
    private AtomicLong atomicLong = new AtomicLong(1L);

    @PostMapping(path = "/send/type1/{msg}")
    public void sendFoo(@PathVariable String msg) {
        this.template.send("msg-test-01", MessageType1.builder().message(msg).msgId(atomicLong.getAndIncrement()).sendTime(System.currentTimeMillis()).build());
    }

    @PostMapping(path = "/send/type2/{msg}")
    public void sendBar(@PathVariable String msg) {
        this.template.send("msg-test-02", MessageType2.builder().message(msg).msgId(atomicLong.getAndIncrement()).sendTime(System.currentTimeMillis()).build());
    }

    @PostMapping(path = "/send/unknown/{msg}")
    public void sendUnknown(@PathVariable String msg) {
        this.template.send("msg-test", Message.builder().message(msg).msgId(atomicLong.getAndIncrement()).sendTime(System.currentTimeMillis()).build());
    }
}

定义了几个不同的路径作为不同topic的映射.为了方便,我们把上面说到了Sender的功能直接放在了Controller中,实际生产上最好对其作分离.

然后我们再定义几个消费者

package top.boyn.kafkaboot.consumer;

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import top.boyn.kafkaboot.pojo.Message;
import top.boyn.kafkaboot.pojo.MessageType1;
import top.boyn.kafkaboot.pojo.MessageType2;

/**
 * @author Boyn
 * @date 2019/11/30
 */
@Component
@KafkaListener(id = "multiGroup",topics = {"msg-test-01","msg-test-02","msg-test"})
public class MultiConsumer {
    @KafkaHandler
    public void type1(MessageType1 type1){
        System.out.println(type1);
    }
    @KafkaHandler
    public void type2(MessageType2 type2){
        System.out.println(type2);
    }
    @KafkaHandler
    public void unknownType(Message msg){
        System.out.println(msg);
    }
}

在之前,我们的@KafkaListener注解是放在方法上的,在这里我们为了要进行多个映射,我们将其放在类的注解上,并且为其声明需要监听的topic,而后在类中定义几个方法,作为各个类的映射.

再谈分区

在Kafka中,生产者将消息发送到分区中,消费者以消费组的名义来订阅主题,实际上也是从分区中读取消息.

分区与生产者

一个主题的分区数量可以在server.properties中指定默认值,默认为1,也可以在创建主题时自己指定.

生产者将消息投递到分区是遵循一定的策略的,默认的分区策略是

  • 如果在发消息的时候指定了分区,则消息投递到指定的分区
  • 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
  • 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区

分区与消费者

消费者从分区中获取消息是以消费组为单位进行的,那么一个主题中有很多分区,具体是怎么分配的呢,这里也有关于消费者的分区策略

range策略

range策略是基于每个主题的

对于每个主题,我们以数字顺序排列可用分区,以字典顺序排列消费者。然后,将分区数量除以消费者总数,以确定分配给每个消费者的分区数量。如果没有平均划分(PS:除不尽),那么最初的几个消费者将有一个额外的分区。

简而言之,就是,

  1. range分配策略针对的是主题(PS:也就是说,这里所说的分区指的某个主题的分区,消费者值的是订阅这个主题的消费者组中的消费者实例)

  2. 首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序

  3. 然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

例如,假设有两个消费者C0和C1,两个主题t0和t1,并且每个主题有3个分区,分区的情况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,基于以上信息,最终消费者分配分区的情况是这样的:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

roundrobin 策略

轮询分配策略是基于所有可用的消费者和所有可用的分区的

与前面的range策略最大的不同就是它不再局限于某个主题

如果所有的消费者实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配

例如,假设有两个消费者C0和C1,两个主题t0和t1,每个主题有3个分区,分别是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,最终分配的结果是这样的:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

一些Kafka的小细节

分区个数的问题

分区个数是不是越多越好呢?我们知道,Kafka通过分区将topic的消息打散,保存到不同的broker中,让吞吐量大大提高.生产者和消费者都是多线程并行操作的,每个线程处理的都是一个分区的数据

理论上来说,分区数越高,那么吞吐量就越高,但是实际上,分区的吞吐量还会收到消息的写入方式,压缩方式和发送方式,分区策略等参数的影响..所以实际上,一直增加分区的数量不仅不能提升吞吐量,还会降低吞吐量,严重的时候还会使Kafka进程崩溃

提交offset的问题

在消费者中,默认的设置为消费时自动提交offset.如果我们设置了不自动提交,那么我们就需要手动提交,不管是什么提交,如果消费者的运行是正常的,那么都不会有问题,而如果消费者出现了问题,比如突然崩溃了之后,如果使用自动提交的方式,那么有可能在恢复之后就会出现重复消费,而如果是手动的话,同样可能出现重复消费的错误,如果我们在poll之后先提交再处理数据,那么效果就会好一些.

再均衡

再均衡的使用是为了解决消费者组中某一个消费者上线或者下线的时候,维持消息推送稳定性的一个机制.使得我们可以安全地增删消费者.不过,如果消费者下线的时候,没有提交数据,那么有可能在再均衡结束之后会发生重复消费的情况.

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×