十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
专注于为中小企业提供成都网站设计、网站制作、外贸营销网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业准格尔免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上1000家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
使用上一节中的springboot-flink-train项目
第一步:创建流处理上下文环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
第二步:读取数据,使用socket流方式读取数据
DataStreamSourcetext = env.socketTextStream("192.168.152.45", 9999);
第三步:transform
text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2 (token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。
第四步:执行
env.execute("StreamingWCJavaApp");
整体代码如下:
/** * 使用Java API来开发Flink的实时处理应用程序 * wc统计的数据源自socket */ public class StreamingWCJava02App { public static void main(String[] args) throws Exception { // 获取参数 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未设置, 使用默认端口9999"); port = 9999; } // step1: 获取流处理上下文环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // step2: 读取数据 DataStreamSourcetext = env.socketTextStream("192.168.152.45", port); // step3: transform text.flatMap(new FlatMapFunction >() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2 (token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print(); env.execute("StreamingWCJavaApp"); } }
首先在192.168.152.45上运行命令
nc -l 9999
然后在运行main方法。在192.168.152.45的nc上输入
abc,def,abc,ddd
在idea控制台输出如下:
4> (abc,2) 1> (def,1) 4> (ddd,1)
这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:
text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2 (token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
这样控制台的打印结果如下:
(abc,2) (ddd,1) (def,1)
这样一个简单的demo就成功了!
上面的代码中localhost与port需要用参数传递进来。
代码如下:
// 获取参数 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未设置, 使用默认端口9999"); port = 9999; }
使用Flink提供的ParameterTool来接收参数。
我们在运行时就可以指定参数列表了,其中的key必须以“-”或者“--”开头。
在运行时,配置参数:
这样运行就可以从外界传递参数了
接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:
/** * 使用Scala开发Flink的实时处理应用程序 */ object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隐式转换 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map((_,1)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); } }
这种方式比java实现更加简洁。
上述内容就是Flink开发怎样进行实时处理应用程序,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。