Apache Kafka是一种分布式流处理平台,可以处理来自多个来源的数据流,使其能够以实时和快速的方式处理和存储大量的数据。与其他流式处理平台不同的是,Kafka采用发布和订阅的架构,使其更加具有可扩展性和可靠性,能够长时间运行而不会导致系统崩溃或数据丢失。本文将介绍如何使用Kafka发送ON数据到数据库中。
1. 安装Kafka
要运行Kafka,您需要在计算机上安装Kafka的服务器和客户端。可以通过使用以下命令来安装Kafka:
“`
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz
“`
下载完成之后,解压文件并进入Kafka目录。
2. 创建主题
Kafka中的数据是按照主题划分的,因此我们需要创建一个新的主题来存储ON数据。可以使用以下命令来创建一个名为“json-topic”的主题:
“`
./bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic json-topic
“`
3. 发送ON数据
现在我们可以使用Kafka的生产者API来发送ON数据。以下是一个示例生产者代码,用于将ON数据发送到“json-topic”主题:
“`java
Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“acks”, “all”);
props.put(“retries”, 0);
props.put(“batch.size”, 16384);
props.put(“linger.ms”, 1);
props.put(“buffer.memory”, 33554432);
props.put(“key.serializer”, “org.apache.kafkmon.serialization.StringSerializer”);
props.put(“value.serializer”, “org.apache.kafkmon.serialization.StringSerializer”);
Producer producer = new KafkaProducer(props);
String jsonString = “{\”name\”:\”John Smith\”, \”age\”:30, \”city\”:\”New York\”}”;
producer.send(new ProducerRecord(“json-topic”, jsonString));
producer.close();
“`
以上代码通过Kafka生产者API,将一个ON数据字符串发送到名为“json-topic”的主题中。在实际应用中,可以根据业务需求,开发相应的生产者代码。
4. 接收ON数据
一旦我们成功发送ON数据之后,就需要从Kafka中拉取数据,并将其存储到数据库中。以下是一个示例消费者代码,用于从“json-topic”主题中拉取数据,然后将其存储到SQL数据库中:
“`java
Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“acks”, “all”);
props.put(“retries”, 0);
props.put(“batch.size”, 16384);
props.put(“linger.ms”, 1);
props.put(“buffer.memory”, 33554432);
props.put(“key.deserializer”, “org.apache.kafkmon.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafkmon.serialization.StringDeserializer”);
props.put(“group.id”, “test”);
Consumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(“json-topic”));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord record : records) {
String jsonString = record.value();
// 解析ON数据并存储到SQL数据库中
// …
}
}
consumer.close();
“`
在实际应用中,我们可以根据业务需求,开发相应的消费者代码,将数据存储到数据库中。
5. 结论
成都网站建设公司-创新互联为您提供网站建设、网站制作、网页设计及定制高端网站建设服务!
需求就是将流量数据(json格式)中某个接口数据抽取一下。如:有个identityUri=”yiyang/user/getById/13782″ , 这里的13782,是个userId,我们需要将其处理成 identityUri=”yiyang/user/getById/{}”
实际上我们生产中是将二者接口使用的。先使用2,如果没有匹配到,在使用1
这里是演示flink kafka的用法,我们简单使用正则处理
注意:kafka消费的方式是: kafkaConsumer.setStartFromGroupOffsets();
看下上面的启动日志,有这样的信息:Resetting offset for partition yiyang-0 to offset 22.
我们另外启动一个程序,发送消息,并消费两个topic中的数据
看下 ConsumeKafkaTest 中的日志
在看下另外一个服务(消费两个topic数据)的日志:
说明已经成功的把处理好的消息发送到另外一个topic中了
关于数据处理,如果只是简单的增加字段,减少字段,正则替换,也可以使用logstash工具
kafka 发送json数据库的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于kafka 发送json数据库,使用Kafka发送ON到数据库,flink处理数据从kafka到另外一个kafka的信息别忘了在本站进行查找喔。
香港云服务器机房,创新互联(www.cdcxhl.com)专业云服务器厂商,回大陆优化带宽,安全/稳定/低延迟.创新互联助力企业出海业务,提供一站式解决方案。香港服务器-免备案低延迟-双向CN2+BGP极速互访!
网页题目:使用Kafka发送ON到数据库(kafka发送json数据库)
文章地址:http://www.mswzjz.cn/qtweb/news36/381086.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能