阿里云Flink中cdc模式不支持开窗函数
阿里云Flink中cdc模式不支持开窗函数
·
测试语句
CREATE TEMPORARY TABLE user_source (
customer_id VARCHAR,
longitude VARCHAR,
latitude VARCHAR,
created VARCHAR ,
ts AS TO_TIMESTAMP(created) ,
rowtime AS PROCTIME() ,
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --为Rowtime定义Watermark。
)
WITH (
'connector' = 'hologres',
'dbname' = 'db_test', -- 测试数据库
'tablename' = 'user_source',
'username' = '${secret_values.用户名参数}',
'password' = '${secret_values.密码参数}',
'endpoint' = '${secret_values.endpoint参数}',
'binlog' = 'true',
'cdcMode' = 'true', -- cdc模式不支持开窗
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
测试代码:
-- 统计埋点数据每个用户过去2小时的被获取经纬度次数,每30秒更新1次,即2小时的窗口,30秒滑动1次
SELECT
HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '2' HOUR),
HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '2' HOUR),
COUNT (1)
FROM user_source
where longitude IS not NULL
GROUP BY
HOP (ts, INTERVAL '30' SECOND, INTERVAL '2' HOUR),customer_id;
上面运行结果会报错:
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, default, user_source]], fields=[api_code, customer_id, longitude, latitude, created])
报错原因:这个错误是cdc模式不支持开窗的问题
所以需要把原表中 'cdcMode' = 'true'去掉就好了
CREATE TEMPORARY TABLE user_source (
customer_id VARCHAR,
longitude VARCHAR,
latitude VARCHAR,
created VARCHAR ,
ts AS TO_TIMESTAMP(created) ,
rowtime AS PROCTIME() ,
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --为Rowtime定义Watermark。
)
WITH (
'connector' = 'hologres',
'dbname' = 'db_test', -- 测试数据库
'tablename' = 'user_source',
'username' = '${secret_values.用户名参数}',
'password' = '${secret_values.密码参数}',
'endpoint' = '${secret_values.endpoint参数}',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
更多推荐
所有评论(0)