- 发送数据
- Producer通过producer.send()函数发送数据
- send发送的是record,record包含了message的topic和value,可选的还有key和partition等
- 发送之前需要先把key和value序列化,序列化就是将其变成字节流,对于一般类型kafka已经内置了序列化和反序列化函数,复杂类型的可能需要自己实现
- 获取partition的值,producer发送消息的时候需要确定发送到哪个topic的哪个partition
- 指明partition的情况下,直接发送到指明的partition
- 没指明partition的情况下,将key的hash值与partition个数取余作为partition值
- 如果连key都没有的话,则随机一个整数对partition个数取余作为partition值,之后每次都自增,即round-robin算法
- 向accumulator写数据
- 每个topic-partition都会有一个队列,队列的单元是recordbatch
- producer要发送的record先追加到recordbatch中,如果不存在或者空间不足则新建一个recordbatch
- 写入成功后,如果发现有recordbatch满足发送条件,就会唤醒sender线程发送recordbatch
- 将broker相同的recordbatch放在一起发送
kafka中的Producer
Welcome to my other publishing channels