测试语句

CREATE TEMPORARY TABLE user_source (
  customer_id VARCHAR,   
  longitude VARCHAR,
  latitude VARCHAR,                
  created VARCHAR ,
  ts AS TO_TIMESTAMP(created) ,
 rowtime AS PROCTIME() ,
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --为Rowtime定义Watermark。
)
WITH (
    'connector' = 'hologres',
    'dbname' = 'db_test',    -- 测试数据库
    'tablename' = 'user_source',
    'username' = '${secret_values.用户名参数}',
    'password' = '${secret_values.密码参数}',
    'endpoint' = '${secret_values.endpoint参数}',
    'binlog' = 'true',
    'cdcMode' = 'true',  -- cdc模式不支持开窗
    'binlogMaxRetryTimes' = '10',
    'binlogRetryIntervalMs' = '500',
    'binlogBatchReadSize' = '100'
  );

测试代码:

-- 统计埋点数据每个用户过去2小时的被获取经纬度次数,每30秒更新1次,即2小时的窗口,30秒滑动1次
SELECT
 HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '2' HOUR),
    HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '2' HOUR),
    COUNT (1)
FROM user_source
where longitude IS not NULL
GROUP BY
    HOP (ts, INTERVAL '30' SECOND, INTERVAL '2' HOUR),customer_id;   

上面运行结果会报错:

org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, default, user_source]], fields=[api_code, customer_id, longitude, latitude, created])

报错原因:这个错误是cdc模式不支持开窗的问题

所以需要把原表中 'cdcMode' = 'true'去掉就好了

CREATE TEMPORARY TABLE user_source (
  customer_id VARCHAR,   
  longitude VARCHAR,
  latitude VARCHAR,                
  created VARCHAR ,
  ts AS TO_TIMESTAMP(created) ,
 rowtime AS PROCTIME() ,
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --为Rowtime定义Watermark。
)
WITH (
    'connector' = 'hologres',
    'dbname' = 'db_test',    -- 测试数据库
    'tablename' = 'user_source',
    'username' = '${secret_values.用户名参数}',
    'password' = '${secret_values.密码参数}',
    'endpoint' = '${secret_values.endpoint参数}',
    'binlog' = 'true',
    'binlogMaxRetryTimes' = '10',
    'binlogRetryIntervalMs' = '500',
    'binlogBatchReadSize' = '100'
  );

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐