十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
这篇文章主要讲解了“flinksql表的查询转换方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flinksql表的查询转换方法”吧!
网站建设公司,为您提供网站建设,网站制作,网页设计及定制网站建设服务,专注于企业网站制作,高端网页制作,对成都服务器托管等多个行业拥有丰富的网站建设经验的网站建设公司。专业网站设计,网站优化推广哪家好,专业成都网站推广优化,H5建站,响应式网站。
package com.jd.data; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.OldCsv; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; import scala.Tuple3; import static org.apache.flink.table.api.Expressions.$; public class TableAipDemo03 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1、创建表执行环节 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String path = "/Users/liuhaijing/Desktop/flinktestword/aaa.txt"; tableEnv.connect(new FileSystem().path(path)) .withFormat(new OldCsv()) // 定义格式化方法 .withSchema(new Schema().field("a", DataTypes.STRING()) // 定义表的结构 .field("b", DataTypes.STRING()) .field("c", DataTypes.STRING()) ) .createTemporaryTable("xxx"); // 表的查询与转换 Table xxx = tableEnv.from("xxx"); // 简单查询 Table select = xxx.select("a, b").filter($("a").isEqual("a")); Table select2 = select.groupBy($("a")) .select($("a"), $("a").count().as("count")); select2.printSchema(); // 可撤回的方式 输出结果前面会有一列 boolean true 表示以这次为准, false为作废 tableEnv.toRetractStream(select2, Row.class ).print(); env.execute("job"); } }
感谢各位的阅读,以上就是“flinksql表的查询转换方法”的内容了,经过本文的学习后,相信大家对flinksql表的查询转换方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!