9159金沙游艺场

图片 20
Chrome开发者工具不完全指南(二、进阶篇)
图片 4
IoC 容器的初始化之 BeanDefinition 的载入和解析

Kafka概念入门(一)

  • Spring Boot集成Kafka
    • 前提介绍
    • Kafka
      • 简介
      • Topics & logs
      • Distribution
      • Producers
      • Consumers
      • Guarantees
    • Kafka安装与使用
      • 安装
      • 服务启动
      • Topic
      • 消息发送与消费
    • Spring Boot集成
      • 开始
      • 配置
      • 代码
    • 总结
    • 参考资料

前言:
Kafka是一个轻量级的/分布式的/具备replication能力的日志采集组件,通常被集成到应用系统中,收集”用户行为日志”等,并可以使用各种消费终端(consumer)将消息转存到HDFS等其他结构化数据存储系统中.因为日志消息通常为文本数据,尺寸较小,且对实时性以及数据可靠性要求不严格,但是需要日志存储端具备较高的数据吞吐能力,这种”宽松”的设计要求,非常适合使用kafka。

序:如何保证kafka全局消息有序?

由于公司使用了微服务架构,很多业务拆成了很多小模块。有个场景是这样的A服务主要负责写入或者修改数据库中的数据,B服务主要负责读取,B服务使用缓存技术,当A发生了修改后,需要通知B来清除缓存。中间两个服务之间通知使用了Kafka,这个是本篇文章主要介绍的,关于
缓存技术 我也简单介绍过。

一.入门

  比如,有100条有序数据,生产者发送到kafka集群,kafka的分片有4个,可能的情况就是一个分片保存0-25,一个保存25-50……这样消息在kafka中存储是局部有序了。严格说,kafka是无法保证全局消息有序的,没有这个机制,只能局部有序。

简介

Kafka官网

Kafka is a distributed,partitioned,replicated commit
logservice。它提供了类似于JMS的特性,但是在实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

图片 1image

1.1 简介

1、Kafka是什么

Topics & logs

一个Topic可以认为是一类消息,每个topic将被分成多个partition,每个partition在存储层面是append
log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset,offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。

图片 2image

kafka和JMS(Java Message
Service)实现不同的是:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支.

对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会”线性”的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)

kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.

partitions的目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力..

Kafka是一个”分布式的”/”可分区的(partitioned)”/”基于备份的(replicated)”/”基于commit-log存储”的服务.
它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.

  在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。

Distribution

一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数,每个partition将会被备份到多台机器上,以提高可用性.

基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个为”leader”;leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个”leader”,kafka会将”leader”均衡的分散在每个实例上,来确保整体的性能稳定.

kafka消息是根据Topic进行归类,发送消息者成为Producer,消息接收者成为Consumer;此外kafka集群有多个kafka实例组成,每个实例(server)称为broker.

  Apache
Kafka是一个开源消息系统,由Scala写成。

Producers

Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于”round-robin”方式或者通过其他的一些算法等.

无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性以及保存一些meta信息.

  Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。

Consumers

本质上kafka只支持Topic.每个consumer属于一个consumer
group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.

如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.如果所有的consumer都具有不同的group,那这就是”发布-订阅”;消息将会广播给所有的消费者.在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个”订阅”者,一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.

kafka的原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.

图片 3

  Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群由多个kafka实例组成,每个实例(server)称为broker。

Guarantees

  • 发送到partitions中的消息将会按照它接收的顺序追加到日志中

  • 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.

  • 如果Topic的”replicationfactor”为N,那么允许N-1个kafka实例失效.

(摘自官网)

  题外话:无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

安装

我使用的是Mac,下面介绍如何使用安装。

brew updatebrew install kafka

结果

To have launchd start kafka now and restart at login: brew services start kafkaOr, if you don't want/need a background service you can just run: zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties==> Summary🍺 /usr/local/Cellar/kafka/0.11.0.1: 149 files, 35.5MB

结果显示,需要有2个配置文件

/usr/local/etc/kafka/server.properties/usr/local/etc/kafka/zookeeper.properties

其中client与server的通讯,都是基于TCP,而且消息协议非常轻量级.

原文和作者一起讨论:

服务启动

这里为了简单,直接使用brew services start kafkabrew services start zookeeper来启动服务。

Topics/logs

微信:intsmaze

Topic

首先找到kafka安装目录,可以直接使用brew info kafka,可以看出安装目录为/usr/local/Cellar/kafka/0.11.0.1,然后cd到这个目录下面。

brew info kafkakafka: stable 0.11.0.1 Publish-subscribe messaging rethought as a distributed commit loghttps://kafka.apache.org//usr/local/Cellar/kafka/0.11.0.1 (156 files, 36.0MB) * Poured from bottle on 2017-11-26 at 14:09:18From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/kafka.rb==> DependenciesRequired: zookeeper ✔==> RequirementsRequired: java = 1.8 ✔==> CaveatsTo have launchd start kafka now and restart at login: brew services start kafkaOr, if you don't want/need a background service you can just run: zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

创建一个abc123的topic

/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic abc123

查看创建的topic

./bin/kafka-topics --list --zookeeper localhost:2181

图片 4kafka

一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append
log文件.任何发布到此partition的消息都会直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它唯一的标记一条消息.kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行”随机读-写”,一旦消息写入log日志之后,将不能被修改.

图片 5

消息发送与消费

Kafka提供了一个命令行客户端,它将从文件或标准输入接收输入,并将其作为消息发送到Kafka集群。默认情况下,每行都将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息发送到服务器。

./bin/kafka-console-producer --broker-list localhost:9092 --topic abc123

Kafka还有一个命令行消费者,将消息转储到标准输出。

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic abc123 --from-beginning

图片 6kafka

如图,上面的是生产者,下面的是消费者,依次发送aaa,bbb,….ggg,消费者依次会收到对应的消息。

图片 7

2、Kafka核心组件
  Topic:消息根据Topic进行归类,可以理解为一个队里。
  Producer:消息生产者,就是向kafka
broker发消息的客户端。
  Consumer:消息消费者,向kafka
broker取消息的客户端。
  broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
  Zookeeper:依赖集群保存meta信息。
 
3、Kafka消息有序性

开始

直接使用Idea创建一个Spring Boot项目即可,同时添加LombokKafka库。

图片 8image图片 9image图片 10image图片 11image

也可以接添加依赖库。

Gralde 依赖

dependencies { compile('org.springframework.boot:spring-boot-starter') compile('org.springframework.kafka:spring-kafka') compile('org.projectlombok:lombok')}

Maven 依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId></dependency><dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version></dependency>

(摘自官网)

  生产者是一个独立的集群,和kafka的broker集群,消费者集群没有太直接的干系。比如flume就可以作为生产者,内部调用kafka的客户端代码,确保把收集的数据发到kafka集群中。

配置

配置application.properties文件中kafka属性。

# kafkaspring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=myGroupspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

kafka和JMS实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间.此外,kafka的性能并不会因为日志文件的太多而低下,所以即使保留较多的log文件,也不不会有问题.

  如何保证kafka全局消息有序?

代码

创建一个消息结构体

@Datapublic class Message { private Long id; private String msg; private Date sendTime;}

一个消息发送者

@Componentpublic class KafkaSender { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private Gson gson = new GsonBuilder().create(); public void send() { Message message = new Message(); message.setId(System.currentTimeMillis; message.setMsg(UUID.randomUUID().toString; message.setSendTime(new Date; kafkaTemplate.send("abc123", gson.toJson; }}

一个消息消费者

@Component@Slf4jpublic class KafkaReceiver { @KafkaListener(topics = {"abc123"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value; if (kafkaMessage.isPresent { Object message = kafkaMessage.get(); log.info("record =" + record); log.info("message =" + message); } }}

在主程序中调用发送方法,模拟生产者

@SpringBootApplicationpublic class SpringKafkaDemoApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(SpringKafkaDemoApplication.class, args); KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i < 3; i++) { sender.send(); try { Thread.sleep; } catch (InterruptedException e) { e.printStackTrace(); } } }}

运行输出如下

record =ConsumerRecord(topic = abc123, partition = 0, offset = 17, CreateTime = 1511678827095, checksum = 2229762760, serialized key size = -1, serialized value size = 102, key = null, value = {"id":1511678826816,"msg":"2ff150e4-d7f9-4b4d-9604-b8d13a1d4538","sendTime":"Nov 26, 2017 2:47:06 PM"})message ={"id":1511678826816,"msg":"2ff150e4-d7f9-4b4d-9604-b8d13a1d4538","sendTime":"Nov 26, 2017 2:47:06 PM"}record =ConsumerRecord(topic = abc123, partition = 0, offset = 18, CreateTime = 1511678830109, checksum = 1589760372, serialized key size = -1, serialized value size = 102, key = null, value = {"id":1511678830108,"msg":"e1b93a1c-d88e-4b9b-8e1d-98e05edeb7c6","sendTime":"Nov 26, 2017 2:47:10 PM"})message ={"id":1511678830108,"msg":"e1b93a1c-d88e-4b9b-8e1d-98e05edeb7c6","sendTime":"Nov 26, 2017 2:47:10 PM"}record =ConsumerRecord(topic = abc123, partition = 0, offset = 19, CreateTime = 1511678833110, checksum = 4176540846, serialized key size = -1, serialized value size = 102, key = null, value = {"id":1511678833109,"msg":"f77fbb85-0eb9-402c-8265-c37987011551","sendTime":"Nov 26, 2017 2:47:13 PM"})message ={"id":1511678833109,"msg":"f77fbb85-0eb9-402c-8265-c37987011551","sendTime":"Nov 26, 2017 2:47:13 PM"}

同时原先的命令行消费者也会受到程序发送的消息。

图片 12image

本人是刚刚入门的后端工程师,原先做过几年Java,说的比较简单,如有出错的地方,欢迎指正。

Kafka官网

kafka入门:简介、使用场景、设计原理、主要配置及集群搭建

mac kafka 环境搭建

spring boot与kafka集成

对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会”线性”的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)

  比如,有100条有序数据,生产者发送到kafka集群,kafka的分片有4个,可能的情况就是一个分片保存0-25,一个保存25-50……这样消息在kafka中存储是局部有序了。严格说,kafka是无法保证全局消息有序的,没有这个机制,只能局部有序。

kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.

  但是如果只有一个分片和一个消息的生产者,那么就相当于消息全局有序了。如果有多个消息生产者,就算只有一个分片,如果这些生产者的消息都发给这个分片,那kafka中的消息连局部有序都没有办法了。

partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions(备注:基于sharding),来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.(具体原理参见下文).

 

Distribution

4、消费者组

一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置每个partition需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性.[replicas特性在0.8V才支持]

  Consumer Group(CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。

基于replicated方案,那么就意味着需要对多个备份进行调度;一个partition可以在多个server上备份,那么其中一个server作为此partiton的leader;leader负责此partition所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个”leader”,kafka会将”leader”均衡的分散在每个实例上,来确保整体的性能稳定.[备注:kafka中将leader角色权限下放到partition这个层级]

  Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

图片 13

  Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the
first offset就是00000000000.kafka。

kafka-cluster

 

Producers

  每个group中可以有多个consumer,每个consumer属于一个consumer
group;通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高”故障容错”性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。

Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息发送到哪个partition;如果一个Topic有多个partitions时,你需要选择partition是算法,比如基于”round-robin”方式或者通过其他的一些算法等.无论如何选择partition路由算法,我们最直接的目的就是希望消息能够均匀的发送给每个partition,这样可以让consumer消费的消息量也能”均衡”.

  对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个”订阅”者。

Consumers

  在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。

本质上kafka只支持Topic.每个consumer属于一个consumer
group;反过来说,每个group中可以有多个consumer.对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费整个Topic.

  kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。

如果所有的consumer都具有相同的group,这种情况和JMS
queue模式很像;消息将会在consumers之间负载均衡.

 

如果所有的consumer都具有不同的group,那这就是”发布-订阅”;消息将会广播给所有的消费者.

Producer客户端负责消息的分发

图片 14

  kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含”集群中存活的servers列表”/”partitions
leader列表”等信息;

(摘自官网)

  当producer获取到metadata信息之后, producer将会和Topic下所有partition
leader保持socket连接

在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);每个group中consumer消息消费互相独立;我们可以认为一个group是一个”订阅”者,一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时是顺序的.事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的.

  消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”,事实上,消息被路由到哪个partition上由producer客户端决定;比如可以采用”random””key-hash””轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的。

通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高”故障容错”性,如果group中的某个consumer失效,那么其消费的partitions将会有其他consumer自动接管.

  在producer端的配置文件中,开发者可以指定partition路由的方式。

kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.

  Producer消息发送的应答机制设置发送数据是否需要服务端的反馈,有三个值0,1,-1

Guarantees

    0:producer不会等待broker发送ack

1)
发送到partitions中的消息将会按照它接收的顺序追加到日志中,无论一个partition由多少个log文件构成,那么它发送给consumer的顺序是一定的.

    1:当leader接收到消息之后发送ack

2) 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.

    -1:当所有的follower都同步消息成功后发送ack

3) 如果Topic的”replication
factor”为N,那么允许N-1个kafka实例失效.只要有一个replication存活,那么此partition的读写操作都不会中断.

  request.required.acks=0

1.2 Use cases

Messaging

和一些常规的消息系统相比,kafka仍然是个不错的选择;它具备partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的”事务性””消息传输担保(消息确认机制)””消息分组”等企业级特性;kafka只能使用作为”常规”的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

Websit activity tracking

kafka可以作为”网站活性跟踪”的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等.

相关文章

No Comments, Be The First!
近期评论
    功能
    网站地图xml地图