我为什么放弃Kafka,选择Pulsar?

我为什么放弃Kafka,选择Pulsar?

作者:闻数起舞 2021-02-01 07:20:51

开发

架构

开发工具

Kafka 最近,我一直在研究 Pulsar 及其与 Kafka 的比较。通过快速搜索,你会看到这两个最著名的开源消息传递系统之间正在进行的"战争"。

 最近,我一直在研究 Pulsar 及其与 Kafka 的比较。通过快速搜索,你会看到这两个最著名的开源消息传递系统之间正在进行的"战争"。

图片来自 Pexels

作为 Kafka 的用户,我着实对 Kafka 的某些问题感到困惑,但 Pulsar 却让人眼前一亮、令我非常兴奋。所以最后,我设法花了一些时间了解背景资料,并且做了很多研究。

在本文中,我将重点介绍 Pulsar 的优势,并说明 Pulsar 胜于 Kafka 的理由。让我们开始!

Kafka 基础知识

Kafka 是消息传递系统之王。它由 LinkedIn 于 2011 年创建,并在 Confluent 的支持下得到了广泛的传播。

Confluent 已向开源社区发布了许多新功能和附加组件,例如用于模式演化的 Schema Registry,用于从其他数据源轻松流式传输的 Kafka Connect 等。

数据库到 Kafka,Kafka Streams 进行分布式流处理,最近使用 KSQL 对 Kafka topic 执行类似 SQL 的查询等等。

Kafka 快速,易于安装,非常受欢迎,可用于广泛的范围或用例。从开发人员的角度来看,尽管 Apache Kafka 一直很友好,但在操作运维方面却是一团糟。

因此,让我们回顾一下 Kafka 的一些痛点:

Kafka 演示[2]

Kakfa的诸多痛点如下:

  • 扩展 Kafka 十分棘手,这是由于 broker 与存储数据的耦合架构结构所致。剥离一个 broker 意味着它必须复制 topic 分区和副本,这非常耗时。
  • 没有与租户完全隔离的本地多租户。
  • 存储会变得非常昂贵,尽管可以长时间存储数据,但是由于成本问题却很少用到它。
  • 万一副本不同步,有可能丢失消息。
  • 必须提前计划和计算 broker、topic、分区和副本的数量(确保计划的未来使用量增长),以避免扩展问题,这非常困难。
  • 如果仅需要消息传递系统,则使用偏移量可能会很复杂。
  • 集群重新平衡会影响相连的生产者和消费者的性能。
  • MirrorMaker[3] Geo 复制机制存在问题。像 Uber 这样的公司已经创建了自己的解决方案来克服这些问题。

如您所见,大多数问题与操作运维方面有关。尽管安装起来相对容易,但 Kafka 难以管理和调优。而且,它也缺乏应有的灵活和弹性。

Pulsar 基础知识

Pulsar 由 Yahoo!在 2013 年创建,并于 2016 年捐赠给 Apache 基金会。Pulsar 现在是 Apache 软件基金会的顶级项目。

Yahoo!、Verizon、Twitter 等公司已在生产中使用它来处理成千上万消息。它具有运行成本低、灵活等特性。Pulsar 旨在解决 Kafka 的大部分难题,使其更易于扩展。

Pulsar 非常灵活:它既可以应用于像 Kafka 这样的分布式日志应用场景,也可以应用于像 RabbitMQ 这样的纯消息传递系统场景。

它支持多种类型的订阅、多种交付保证、保留策略以及处理模式演变的方法,以及其他诸多特性。

Pulsar 架构图[4]

Pulsar 的特性如下:

  • 内置多租户,不同的团队可以使用相同的集群并将其隔离,解决了许多管理难题。它支持隔离、身份验证、授权和配额。
  • 多层体系结构:Pulsar 将所有 topic 数据存储在由 Apache BookKeeper 支持的专业数据层中。

存储和消息传递的分离解决了扩展、重新平衡和维护集群的许多问题。它还提高了可靠性,几乎不可能丢失数据。

  • 另外,在读取数据时可以直连 BookKeeper,且不影响实时摄取。例如,可以使用 Presto 对 topic 执行 SQL 查询,类似于 KSQL,但不会影响实时数据处理。
  • 虚拟 topic:由于采用 n 层体系结构,因此对 topic 的数量没有限制,topic 及其存储是分离的。用户还可以创建非持久性 topic。
  • N 层存储:Kafka 的一个问题是,存储费用可能变高。因此,它很少用于存储"冷"数据,并且消息经常被删除,Apache Pulsar 可以借助分层存储自动将旧数据卸载到 Amazon S3 或其他数据存储系统,并且仍然向客户端展示透明视图;Pulsar 客户端可以从时间开始节点读取,就像所有消息都存在于日志中一样。
  • Pulsar Function:易于部署、轻量级计算过程、对开发人员友好的 API,无需运行自己的流处理引擎(如 Kafka)。
  • 安全性:它具有内置的代理、多租户安全性、可插拔的身份验证等特性。
  • 快速重新平衡:分区被分为易于重新平衡的分片。
  • 服务器端重复数据删除和无效字段:无需在客户端中执行此操作,也可以在压缩期间删除重复数据。
  • 内置 Schema registry(架构注册表):支持多种策略,易于操作。
  • 地理复制和内置 Discovery:易于将集群复制到多个区域。
  • 集成的负载均衡器和 Prometheus 指标。
  • 多重集成:Kafka、RabbitMQ 等。
  • 支持多种编程语言,例如 GoLang、Java、Scala、Node、Python…...
  • 分片和数据分区在服务器端透明进行,客户端不需要了解分片与分区数据。

Pulsar 特性列表[5]

Pulsar 入门

Pulsar 入门非常容易,使用前提是安装 JDK。

①下载 Pulsar 并解压缩(备注:目前 Apache Pulsar 最新版本为 2.7.0):

  
 
 
 
  1. $ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/apache-pulsar-2.6.1-bin.tar.gz 

②下载连接器(可选):

  
 
 
 
  1. $ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/connectors/{connector}-2.6.1.nar 

③下载 nar 文件后,将文件复制到 Pulsar 目录中的 Connectors 目录。

④启动 Pulsar!

  
 
 
 
  1. $ bin/pulsar standalone 

Pulsar 提供了一个称为 Pulsar-Client 的 CLI 工具,我们可以使用它与集群进行交互。

生产消息:

  
 
 
 
  1. $ bin/pulsar-client produce my-topic --messages "hello-pulsar" 

消费消息:

  
 
 
 
  1. $ bin/pulsar-client consume my-topic -s "first-subscription" 

Akka 流示例

举一个客户端示例,我们在 Akka 上使用 Pulsar4s。

首先,我们需要创建一个 Source 来消费数据流,所需要的只是一个函数,该函数将按需创建消费者并查找消息 ID:

  
 
 
 
  1. val topic = Topic("persistent://standalone/mytopic") 
  2. val consumerFn = () => client.consumer(ConsumerConfig(topic, subscription)) 

然后,我们传递 ConsumerFn 函数来创建源:

  
 
 
 
  1. import com.sksamuel.pulsar4s.akka.streams._ 
  2. val pulsarSource = source(consumerFn, Some(MessageId.earliest)) 

Akka 源的物化值是 Control 的一个实例,该对象提供了一种"关闭"方法,可用于停止消费消息。现在,我们可以像往常一样使用 Akka Streams 处理数据。

要创建一个接收器:

  
 
 
 
  1. val topic = Topic("persistent://standalone/mytopic") 
  2. val producerFn = () => client.producer(ProducerConfig(topic)) 
  3. import com.sksamuel.pulsar4s.akka.streams._ 
  4. val pulsarSink = sink(producerFn) 

完整示例摘自 Pulsar4s[6]:

  
 
 
 
  1. object Example { 
  2.   import com.sksamuel.pulsar4s.{ConsumerConfig, MessageId, ProducerConfig, PulsarClient, Subscription, Topic} 
  3.   import org.apache.pulsar.client.api.Schema 
  4.   implicit val system: ActorSystem = ActorSystem() 
  5.   implicit val materializer: ActorMaterializer = ActorMaterializer() 
  6.   implicit val schema: Schema[Array[Byte]]= Schema.BYTES 
  7.   val client = PulsarClient("pulsar://localhost:6650") 
  8.   val intopic = Topic("persistent://sample/standalone/ns1/in") 
  9.   val outtopic = Topic("persistent://sample/standalone/ns1/out") 
  10.   val consumerFn = () => client.consumer(ConsumerConfig(topics = Seq(intopic), subscriptionName = Subscription("mysub"))) 
  11.   val producerFn = () => client.producer(ProducerConfig(outtopic)) 
  12.   val control = source(consumerFn, Some(MessageId.earliest)) 
  13.     .map { consumerMessage => ProducerMessage(consumerMessage.data) } 
  14.     .to(sink(producerFn)).run() 
  15.   Thread.sleep(10000) 
  16.   control.stop() 

Pulsar Function 示例

Pulsar Function 处理来自一个或多个 topic 的消息,对其进行转换并将结果输出到另一个 topic:

Pulsar Function[7]

可以在两个接口之间进行选择以编写函数:

  • 语言原生接口:不需要特定的 Pulsar 库或特殊的依赖项;无法访问上下文,仅支持 Java 和 Python。
  • Pulsar Function SDK:可用于 Java/Python/ Go,并提供更多功能,比如访问上下文对象。

只需编写一个简单的函数即可使用语言原生接口转换消息:

  
 
 
 
  1. def process(input): 
  2. return "{}!".format(input) 

用 Python 编写的这个简单函数只是向所有传入的字符串添加一个感叹号,并将结果字符串发布到 topic。

使用 SDK 需要导入依赖项,例如在 Go 中,我们可以编写:

  
 
 
 
  1. package main 
  2. import ( 
  3. "context" 
  4. "fmt" 
  5. "github.com/apache/pulsar/pulsar-function-go/pf" 
  6. func HandleRequest(ctx context.Context, in []byte) error { 
  7. fmt.Println(string(in) + "!") 
  8. return nil 
  9. func main() { 
  10. pf.Start(HandleRequest) 

如果要发布无服务器功能并将其部署到集群,可以使用 Pulsar-Admin CL;如果使用 Python,我们可以编写:

  
 
 
 
  1. $ bin/pulsar-admin functions create \ 
  2. --py ~/router.py \ 
  3. --classname router.RoutingFunction \ 
  4. --tenant public \ 
  5. --namespace default \ 
  6. --name route-fruit-veg \ 
  7. --inputs persistent://public/default/basket-items 
  8. Pulsar Function 的一个重要功能是用户可以在发布该函数时设置交付保证: 
  9. $ bin/pulsar-admin functions create \ 
  10. --name my-effectively-once-function \ 
  11. --processing-guarantees EFFECTIVELY_ONCE 

有以下选择:

Pulsar 的优势

与 Kafka 相比,让我们回顾下 Pulsar 的主要优势:

  • 更多功能:Pulsar Function、多租户、Schema registry、n 层存储、多种消费模式和持久性模式等。
  • 更大的灵活性:3 种订阅类型(独占,共享和故障转移),用户可以在一个订阅上管理多个 topic。
  • 持久性选项:非持久(快速)、持久、压缩(每个消息仅最后一个键),用户可以选择交付保证。Pulsar 具有服务器端重复数据删除和无效字样多保留政策和 TTL 的特性。
  • 无需提前定义扩展需求。
  • 支持队列与流两种消息消费模型,所以 Pulsar 既可以代替 RabbitMQ 也可以代替 Kafka。
  • 存储与 broker 分离,因此扩展性更好,重新平衡更快、更可靠。
  • 易于操作运维:架构解耦和 n 层存储。
  • 与 Presto 的 SQL 集成,可直接查询存储而不会影响 broker。
  • 借助 n 层自动存储选项,可以更低成本地存储。
  • 更快:基准测试[8]在各种情况下都表现出更好的性能。Pulsar 具有较低的延迟和更好的扩展功能。
  • Pulsar Function 支持无服务器计算,无需部署管理。
  • 集成 Schema registry。
  • 集成的负载平衡器和 Prometheus 指标。
  • 地理复制效果更好,更易于设置。Pulsar 内置 Discover-ability。
  • 创建 topic 数量没有限制。
  • 与 Kafka 兼容,易于集成。

Pulsar 的劣势

Pulsar 并不完美,Pulsar 也存在一些问题:

  • 相对缺乏支持、文档和案例。
  • n 层体系结构导致需要更多组件:BookKeeper。
  • 插件和客户端相对 Kafka 较少。
  • 云中的支持较少,Confluent 具有托管云产品。

不过,上面的情况都在得到快速改善,目前 Pulsar 也逐渐被越来越多的公司和组织使用。

Apache Pulsar 商业支持公司 StreamNative 也推出了 StreamNative Cloud,Apache Pulsar 正在快速成长,我们都可以看到令人欣喜的变化。

Confluent 曾发布博客对比 Pulsar 和 Kafka ,但请注意,这些问题可能有偏见。

Pulsar 使用场景

Pulsar 可用于广泛的场景:

  • 发布/订阅队列消息传递。
  • 分布式日志。
  • 事件溯源,用于永久性事件存储。
  • 微服务。
  • SQL 分析。
  • Serverless 功能。

什么时候应该考虑 Pulsar?

  • 同时需要像 RabbitMQ 这样的队列和 Kafka 这样的流处理程序。
  • 需要易用的地理复制。
  • 实现多租户,并确保每个团队的访问权限。
  • 需要长时间保留消息,并且不想将其卸载到另一个存储中。
  • 需要高性能,基准测试表明 Pulsar 提供了更低的延迟和更高的吞吐量。

如果在云端,请注意考虑基于云的解决方案。云提供商拥有涵盖某些场景的不同服务。

例如,对于队列消息,云提供商提供了许多服务,比如 Google pub / sub;对于分布式日志,有 Confluent 云或 AWS Kinesis;StreamNative 也提供了基于 Pulsar 的云端服务。

云提供商还提供了非常好的安全性。Pulsar 的优势在于可以在一个平台上提供许多功能。

一些团队可能将其用作微服务的消息传递系统,而另一些团队则将其用作数据处理的分布式日志。

结论

我是 Kafka 的忠实粉丝,我对 Pulsar 如此感兴趣的原因是:竞争驱动创新。

Kafka 是一种成熟,富有弹性且经过考验的产品,在世界范围内获得了巨大成功,无法想象大多数公司没有它会怎样。

但是我确实看到 Kafka 成为其自身成功的受害者,由于需要支持许多大型公司导致巨大的增长减慢了功能开发的速度、移除 ZooKeeper 依赖项等重要功能花费的时间太长,这为诸如 Pulsar 等工具蓬勃发展创造了空间。

Pulsar 虽然年轻却势头很猛,在将 Pulsar 纳入组织之前,需进行分析、基准测试、研究并进行 POC。

从小处着手,在将 Kafka 迁移到 Pulsar 之前进行概念验证,并在决定进行完全迁移之前评估影响。

引用链接:

  • [1] 《Pulsar Advantages Over Kafka》:

https://itnext.io/pulsar-advantages-over-kafka-7e0c2affe2d6

  • [2] Kafka 演示:

https://talks.rmoff.net/pZC6Za/slides

  • [3] MirrorMaker:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

  • [4] Pulsar 架构图:

https://pulsar.apache.org/docs/en/concepts-architecture-overview/

  • [5] Pulsar 特性列表:

https://pulsar.apache.org/

  • [6] Pulsar4s:

https://github.com/sksamuel/pulsar4s/blob/master/pulsar4s-akka-streams/src/test/scala/com/sksamuel/pulsar4s/akka/streams/Example.scala

  • [7] Pulsar Function:

https://pulsar.apache.org/docs/en/functions-overview/

  • [8] 基准测试:

https://medium.com/swlh/performance-comparison-between-apache-pulsar-and-kafka-latency-79fb0367f407

作者:闻数起舞

编辑:陶家龙

出处:转载自 Java 高级架构,原中文版本由闻数起舞翻译自 Lewis Fairweather 的文章《Pulsar Advantages Over Kafka》[1],文章转载时有改动。

网页名称:我为什么放弃Kafka,选择Pulsar?
本文链接:http://www.mswzjz.cn/qtweb/news27/314027.html

攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能