
FlinkSQL建表语句与插入语句
FlinkSQL来构建实时数仓,其思路大概如下:Flink的Table API提供了对kafka/jdbc/hbase等实时开发涉及到的组件的支持,以kafka为例,将kafka topic抽象成Flink Table,如下:
FlinkSQL读数据建表语句
CREATE TABLE flink_rtdw.demo.kafka_source_table ( topic STRING, bidWord STRING, planID STRING, eventTime INTEGER, procTime AS PROCTIME(), ets AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime)), WATERMARK FOR ets AS ets - INTERVAL '1' MINUTE ) WITH ( 'connector' = 'kafka', 'topic' = 'ba.join.shbt2.search-ocpc-click', 'properties.bootstrap.servers' = ‘Kafka-broker', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' );FlinkSQL写数据建表语句
CREATE TABLE flink_rtdw.demo.kafka_sink_table ( window_time BIGINT, topic STRING, bid_word_count BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'ultron.demo.shbt2.into.shbt2.tumlewindow.dev', 'properties.bootstrap.servers' = ‘kafka-broker', 'format' = 'json' );读取kafka_source_table中数据根据指标统计写如kafka_source_table 统计一秒滚动窗口出现次数
INSERT INTO flink_rtdw.demo.kafka_sink_table SELECT UNIX_TIMESTAMP( DATE_FORMAT( TUMBLE_START(procTime, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm:ss' ) ) * 1000 as window_time, topic, COUNT(bidWord) FROM flink_rtdw.demo.kafka_source_table GROUP BY TUMBLE(procTime, INTERVAL '1' MINUTE), topic;👁️ 阅读量:0
© 版权声明:本文《FlinkSQL建表语句与插入语句》内容均为本站精心整理或网友自愿分享,如需转载请注明原文出处:https://www.zastudy.cn/wen/1687027752a418984.html。