如何使用Kafka Connect创建用于处理实时数据的开源数据管道?

如何使用Kafka Connect创建用于处理实时数据的开源数据管道?

译文
作者:布加迪 2021-07-29 08:00:00

开发

前端

Kafka 本文介绍了如何使用完全开源的技术创建实时数据管道,这类开源技术包括 Kafka Connect、Apache Kafka和Kibana 等。

创新互联公司是一家专注于成都做网站、网站设计、外贸营销网站建设与策划设计,班玛网站建设哪家好?创新互联公司做网站,专注于网站建设十载,网设计领域的专业建站公司;建站业务涵盖:班玛等地区。班玛做网站价格咨询:18982081108

【51CTO.com快译】Kafka Connect是一种特别强大的开源数据流工具;有了它,将Kafka与其他数据技术结合使用非常轻松。作为一种分布式技术,Kafka Connect提供了特别高的可用性和独立于Kafka集群的弹性扩展。Kafka Connect使用源或sink连接件发送进出Kafka主题的数据,无需代码即可与多种非Kafka技术实现整合。

图1

可靠的开源Kafka连接件可供许多流行的数据技术使用,您还有机会编写自己的连接件。本文介绍了一个真实的实际数据用例,即如何使用Kafka Connect将来自Kafka的实时流数据与Elasticsearch(以启用索引Kafka记录的可扩展搜索)和Kibana(以便可视化那些结果)整合起来。

图2

针对表明Kafka和Kafka Connect优点的一个用例,我受到CDC新冠疫情数据跟踪器的启发。基于Kafka的跟踪器从多个位置、以多种格式并使用多种协议收集实时新冠病毒检测数据,并将这些事件处理成易于使用的可视化结果。跟踪器还有必要的数据治理机制,以确保结果快速到达,并值得信任。

我开始寻找一个同样复杂且引人注目的用例——但理想情况下,不像新冠疫情那样令人担忧。最终,我发现了一个有趣的领域:月潮,包括公开可用的流REST API和采用简单JSON格式的丰富数据。

月潮数据

潮汐遵循太阴日,这是一个24小时50分钟的周期;在此期间,地球完全自转到轨道卫星下方的同一点。每个太阴日有月球引力引起的两个高潮和两个低潮:

图3. 来自美国国家海洋和大气管理局

美国国家海洋和大气管理局(NOAA)提供了一个REST API,可以从全球潮汐站轻松获取详细的传感器数据。

图4

比如说,下列REST调用指定了潮汐站ID、数据类型(我选择了海平面)和数据(平均海平面),并请求一个采用公制单位的最近结果:

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json

该调用返回JSON结果,含有潮汐站的经纬度、时间和水位值。请注意,您必须记住您调用的是什么,以便了解所返回结果的数据类型、数据和单位!

  
 
 
 
  1. {"metadata": { 
  2.    "id":"8724580", 
  3.    "name":"Key West", 
  4.    "lat":"24.5508”, 
  5.    "lon":"-81.8081"}, 
  6.  "data":[{ 
  7.    "t":"2020-09-24 04:18", 
  8.    "v":"0.597", 
  9.       "s":"0.005", "f":"1,0,0,0", "q":"p"}]} 

启动数据管道(使用REST源连接件)

要开始创建Kafka Connect流数据管道,我们必须先准备Kafka集群和Kafka Connect集群。

图5

接下来,我们引入一个REST连接件,比如这个可用的开源连接件。我们会将其部署到AWS S3存储桶(如果需要,参照这些说明)。 然后我们将要求Kafka Connect集群使用S3存储桶,对它同步以便在集群中可见,配置连接件,最后让它运行起来。这种“BYOC”(自带连接件)方法确保您有无数的方法来寻找满足特定要求的连接件。

图6

下列示例演示使用“curl”命令将完全开源的Kafka Connect部署环境配置成可使用REST API。请注意,您需要更改URL、名称和密码以匹配您自己的部署:

  
 
 
 
  1. curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d ' 
  2.     "name": "source_rest_tide_1", 
  3.     "config": { 
  4.       "key.converter":"org.apache.kafka.connect.storage.StringConverter", 
  5.       "value.converter":"org.apache.kafka.connect.storage.StringConverter", 
  6.       "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector", 
  7.       "tasks.max": "1", 
  8.       "rest.source.poll.interval.ms": "600000", 
  9.       "rest.source.method": "GET", 
  10.       "rest.source.url": "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json", 
  11.       "rest.source.headers": "Content-Type:application/json,Accept:application/json", 
  12.       "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector", 
  13.       "rest.source.destination.topics": "tides-topic" 
  14.     } 

该代码创建的连接件任务以10分钟为间隔轮询REST API,并将结果写入到“tides-topic”Kafka主题。通过随机选择五个潮汐传感器以这种方式收集数据,潮汐数据现在通过五个配置和五个连接件填充了潮汐主题。

图7

结束管道(使用Elasticsearch sink连接件)

为了将该潮汐数据放在某个地方,我们将在管道末端引入Elasticsearch集群和Kibana。 我们将配置一个开源Elasticsearch sink连接件,以便向Elasticsearch发送数据。

图8

以下示例配置使用sink名称、类、Elasticsearch索引和我们的Kafka主题。如果索引尚未存在,会创建一个有默认映射的索引。

  
 
 
 
  1. curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d ' 
  2.   "name" : "elastic-sink-tides", 
  3.   "config" : 
  4.   { 
  5.     "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector", 
  6.     "tasks.max" : 3, 
  7.     "topics" : "tides", 
  8.     "connect.elastic.hosts" : ”ip", 
  9.     "connect.elastic.port" : 9201, 
  10.     "connect.elastic.kcql" : "INSERT INTO tides-index SELECT * FROM tides-topic", 
  11.     "connect.elastic.use.http.username" : ”elasticName", 
  12.     "connect.elastic.use.http.password" : ”elasticPassword" 
  13.   } 
  14. }' 

该管道现在可运作起来。然而,由于默认索引映射,进入到Tides索引的所有潮汐数据是字符串。

图9

需要自定义映射以准确地绘制我们的时间序列数据。我们将为下面的潮汐索引创建这个自定义映射,使用JSON“t”字段用于自定义日期,“v”作为两倍数,“name”作为代表聚合的关键字。

  
 
 
 
  1. curl -u elasticName:elasticPassword ”elasticURL:9201/tides-index"  -X PUT -H 'Content-Type: application/json' -d' 
  2. "mappings" : { 
  3.   "properties" : { 
  4.      "data" : { 
  5.         "properties" : { 
  6.              "t" : { "type" : "date", 
  7.                      "format" : "yyyy-MM-dd HH:mm" 
  8.              }, 
  9.              "v" : { "type" : "double" }, 
  10.              "f" : { "type" : "text" }, 
  11.              "q" : { "type" : "text" }, 
  12.              "s" : { "type" : "text" } 
  13.              } 
  14.        }, 
  15.        "metadata" : { 
  16.           "properties" : { 
  17.              "id" : { "type" : "text" }, 
  18.              "lat" : { "type" : "text" }, 
  19.              "long" : { "type" : "text" }, 
  20.              "name" : { "type" : ”keyword" } }}}}         }' 

每次更改Elasticsearch索引映射时,通常都需要Elasticsearch“重新索引”(删除索引并重新索引所有数据)。数据既可以从现有的Kafka sink连接件重放,就像我们在这个用例中所做的那样,也可以使用Elasticsearch重新索引操作来获取。

使用Kibana可视化数据

为了可视化潮汐数据,我们先用Kibana创建一个索引模式,将“t”配置为时间过滤器字段。然后,我们将创建一个可视化,选择线图类型。最后,我们将配置图设置,以便y轴显示30分钟内的平均潮位,x 轴显示随时间变化的该数据。

结果是下图显示了五个样本潮汐站的潮汐变化,管道从这些潮汐站收集数据:

图10

结果

我们可以从可视化中清楚地看到潮汐的周期性,每个太阴日出现两次高潮。

图11

更令人惊讶的是,每个全球潮汐站的高潮和低潮之间的间隔不一样。这不仅受月球的影响,还受太阳、当地地理、天气和气候变化的影响。这个示例Kafka Connect管道利用Kafka、Elasticsearch和Kibana帮助演示可视化的优点:它们通常可以揭示原始数据无法揭示的信息!

网站栏目:如何使用Kafka Connect创建用于处理实时数据的开源数据管道?
文章URL:http://www.mswzjz.cn/qtweb/news48/24748.html

攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能