Apache Flink和 Redis 是两个强大的工具,可以一起使用来构建可以处理大量数据的实时数据处理管道。Flink 为处理数据流提供了一个高度可扩展和容错的平台,而 Redis 提供了一个高性能的内存数据库,可用于存储和查询数据。在本文中,将探讨如何使用 Flink 来使用异步函数调用 Redis,并展示如何使用它以非阻塞方式将数据推送到 Redis。
成都创新互联长期为上千余家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为吴川企业提供专业的成都网站建设、做网站,吴川网站改版等技术服务。拥有10余年丰富建站经验和众多成功案例,为您定制开发。
“Redis:不仅仅是一个缓存
Redis 是一种功能强大的 NoSQL 内存数据结构存储,已成为开发人员的首选工具。虽然它通常被认为只是一个缓存,但 Redis 远不止于此。它可以作为数据库、消息代理和缓存三者合一。
Redis 的优势之一是它的多功能性。它支持各种数据类型,包括字符串、列表、集合、有序集合、哈希、流、HyperLogLogs 和位图。Redis 还提供地理空间索引和半径查询,使其成为基于位置的应用程序的宝贵工具。
Redis 的功能超出了它的数据模型。它具有内置的复制、Lua 脚本和事务,并且可以使用 Redis Cluster 自动分区数据。此外,Redis 通过 Redis Sentinel 提供高可用性。
注意:在本文中,将更多地关注Redis集群模式
Redis 集群使用带哈希槽的算法分片来确定哪个分片拥有给定的键并简化添加新实例的过程。同时,它使用 Gossiping 来确定集群的健康状况,如果主节点没有响应,可以提升辅助节点以保持集群健康。必须有奇数个主节点和两个副本才能进行稳健设置,以避免脑裂现象(集群无法决定提升谁并最终做出分裂决定)
为了与 Redis 集群对话,将使用lettuce和 Redis Async Java 客户端。
Apache Flink 是一个开源、统一的流处理和批处理框架,旨在处理实时、高吞吐量和容错数据处理。它建立在 Apache Gelly 框架之上,旨在支持有界和无界流上的复杂事件处理和有状态计算,它的快速之处在于其利用内存中性能和异步检查本地状态。
与数据库的异步交互是流处理应用程序的游戏规则改变者。通过这种方法,单个函数实例可以同时处理多个请求,从而允许并发响应并显着提高吞吐量。通过将等待时间与其他请求和响应重叠,处理管道变得更加高效。
我们将以电商数据为例,计算24小时滑动窗口中每个品类的销售额,滑动时间为30秒,下沉到Redis,以便更快地查找下游服务。
充足的数据集
Category, TimeStamp
Electronics,1679832334
Furniture,1679832336
Fashion,1679832378
Food,16798323536
Flink Kafka 消费者类
package Aysnc_kafka_redis;
import AsyncIO.RedisSink;
import akka.japi.tuple.Tuple3;
import deserializer.Ecommdeserialize;
import model.Ecomm;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
public class FlinkAsyncRedis {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Ecommdeserialize jsonde = new Ecommdeserialize();
KafkaSourcesource = KafkaSource. builder()
.setTopics("{dummytopic}")
.setBootstrapServers("{dummybootstrap}")
.setGroupId("test_flink")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(jsonde)
.build();
DataStreamorderData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
orderData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {
@Override
public long extractTimestamp(Ecomm element) {
return element.getEventTimestamp(); // extract watermark column from stream
}
});
SingleOutputStreamOperator> aggregatedData = orderData.keyBy(Ecomm::getCategory)
.window(SlidingEventTimeWindows.of(Time.hours(24),Time.seconds(30)))
.apply((WindowFunction, String, TimeWindow>) (key, window, input, out) -> {
long count = 0;
for (Ecomm event : input) {
count++; // increment the count for each event in the window
}
out.collect(new Tuple3<>(key, window.getEnd(), count)); // output the category, window end time, and count
});
// calling async I/0 operator to sink data to redis in UnOrdered way
SingleOutputStreamOperatorsinkResults = AsyncDataStream.unorderedWait(aggregatedData,new RedisSink(
"{redisClusterUrl}"),
1000, // the timeout defines how long an asynchronous operation take before it is finally considered failed
TimeUnit.MILLISECONDS,
100); //capacity This parameter defines how many asynchronous requests may be in progress at the same time.
sinkResults.print(); // print out the redis set response stored in the future for every key
env.execute("RedisAsyncSink"); // you will be able to see your job running on cluster by this name
}
}
Redis 设置键异步 I/0 运算符
package AsyncIO;
import akka.japi.tuple.Tuple3;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import lombok.AllArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import scala.collection.immutable.List;
import java.util.ArrayList;
import java.util.Collections;
@AllArgsConstructor
public class RedisSink extends RichAsyncFunction, String> {
String redisUrl;
public RedisSink(String redisUrl){
this.redisUrl=redisUrl;
}
private transient RedisClusterClient client = null;
private transient StatefulRedisClusterConnectionclusterConnection = null;
private transient RedisAdvancedClusterAsyncCommandsasyncCall = null;
// method executes any operator-specific initialization
@Override
public void open(Configuration parameters) {
if (client == null ) {
client = RedisClusterClient.create(redisUrl);
}
if (clusterConnection == null) {
clusterConnection = client.connect();
}
if (asyncCall == null) {
asyncCall = clusterConnection.async();
}
}
// core logic to set key in redis using async connection and return result of the call via ResultFuture
@Override
public void asyncInvoke(Tuple3stream, ResultFuture resultFuture) {
String productKey = stream.t1();
System.out.println("RedisKey:" + productKey); //for logging
String count = stream.t3().toString();
System.out.println("Redisvalue:" + count); //for logging
RedisFuturesetResult = asyncCall.set(productKey,count);
setResult.whenComplete((result, throwable) -> {if(throwable!=null){
System.out.println("Callback from redis failed:" + throwable);
resultFuture.complete(new ArrayList<>());
}
else{
resultFuture.complete(new ArrayList(Collections.singleton(result)));
}});
}
// method closes what was opened during initialization to free any resources
// held by the operator (e.g. open network connections, io streams)
@Override
public void close() throws Exception {
client.close();
}
}
当前题目:像Flink一样使用Redis
地址分享:http://www.mswzjz.cn/qtweb/news29/253029.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能