关于flinksql 的over操作的疑问

问题遇到的现象和发生背景

一个关于flinksql的开窗函数的问题

遇到的现象和发生背景,请写出第一个错误信息

我希望像hive一样,做一个开窗函数,排序是没问题的,但是sum就不支持

用代码块功能插入代码,请勿粘贴截图。 不用代码块回答率下降 50%

这是我的建表语句

CREATE TABLE `my_hive`.`dl_ods`.`oms_test001` (

 is_delete                          VARCHAR(2147483647)
,oms_order_id                       VARCHAR(2147483647)
,channel_order_id                   VARCHAR(2147483647)
,third_order_id                     VARCHAR(2147483647)
,user_id                            VARCHAR(2147483647)
,business_id                        VARCHAR(2147483647)
,store_id                           VARCHAR(2147483647)
,order_status                       VARCHAR(2147483647)
,order_status_cur                   VARCHAR(2147483647)
,order_status_platform              VARCHAR(2147483647)
,channel                            VARCHAR(2147483647)
,order_create_time                  VARCHAR(2147483647)
,order_update_time                  VARCHAR(2147483647)
,order_pay_time                     VARCHAR(2147483647)
,order_pre_start_delivery_time      VARCHAR(2147483647)
,order_pre_end_delivery_time        VARCHAR(2147483647)
,order_cancel_time                  VARCHAR(2147483647)
,buyer_full_name                    VARCHAR(2147483647)
,buyer_full_address                 VARCHAR(2147483647)
,buyer_telephone                    VARCHAR(2147483647)
,buyer_mobile                       VARCHAR(2147483647)
,delivery_carrier_no                VARCHAR(2147483647)
,delivery_carrier_name              VARCHAR(2147483647)
,delivery_bill_no                   VARCHAR(2147483647)
,complete_time                      VARCHAR(2147483647)
,pay_channel                        VARCHAR(2147483647)
,order_total_money                  VARCHAR(2147483647)
,order_actual_money                 VARCHAR(2147483647)
,order_coupon_amount                VARCHAR(2147483647)
,order_coupon_type                  VARCHAR(2147483647)
,buyer_lng                          VARCHAR(2147483647)
,buyer_lat                          VARCHAR(2147483647)
,coordinate_type                    VARCHAR(2147483647)
,buyer_province                     VARCHAR(2147483647)
,buyer_city                         VARCHAR(2147483647)
,buyer_area                         VARCHAR(2147483647)
,buyer_address                      VARCHAR(2147483647)
,order_buyer_remark                 VARCHAR(2147483647)
,invoice_type                       VARCHAR(2147483647)
,invoice_title                      VARCHAR(2147483647)
,invoice_duty_no                    VARCHAR(2147483647)
,invoice_mail                       VARCHAR(2147483647)
,invoice_title_type                 VARCHAR(2147483647)
,invoice_desc                       VARCHAR(2147483647)
,invoice_content                    VARCHAR(2147483647)
,attribue                           VARCHAR(2147483647)
,order_refund_amount                VARCHAR(2147483647)
,order_refund_reason                VARCHAR(2147483647)
,order_refund_time                  VARCHAR(2147483647)
,version                            VARCHAR(2147483647)
,status                             VARCHAR(2147483647)
,gmt_create                         VARCHAR(2147483647)
,gmt_update                         VARCHAR(2147483647)
,merchant_code                      VARCHAR(2147483647)
,express_fee                        VARCHAR(2147483647)
,package_fee                        VARCHAR(2147483647)
,express_coupon_amount              VARCHAR(2147483647)
,store_name                         VARCHAR(2147483647)
,is_self_pick                       VARCHAR(2147483647)
,invoice_money                      VARCHAR(2147483647)
,delivery_type                      VARCHAR(2147483647)
,is_collage                         VARCHAR(2147483647)
,collage_status                     VARCHAR(2147483647)
,collage_complete_time              VARCHAR(2147483647)
,store_phone                        VARCHAR(2147483647)
,store_address                      VARCHAR(2147483647)
,pos_order_id                       VARCHAR(2147483647)
,promotion_discounts                VARCHAR(2147483647)
,channel_third_order_id             VARCHAR(2147483647)
,is_split                           VARCHAR(2147483647)
,order_type                         VARCHAR(2147483647)
,buyer_town                         VARCHAR(2147483647)
,order_goods_type                   VARCHAR(2147483647)
,buyer_nick                         VARCHAR(2147483647)
,buyer_message                      VARCHAR(2147483647)
,seller_memo                        VARCHAR(2147483647)
,delivery_time                      VARCHAR(2147483647)
,pay_product_type                   VARCHAR(2147483647)
,is_self_delivery                   VARCHAR(2147483647)
,third_express_fee                  VARCHAR(2147483647)
,is_prescription                    VARCHAR(2147483647)
,pres_audit_status                  VARCHAR(2147483647)
,pos_trans_time                     VARCHAR(2147483647)
,pos_refund_time                    VARCHAR(2147483647)
,refund_type                        VARCHAR(2147483647)
,origin_oms_order_id                VARCHAR(2147483647)
,refund_sort                        VARCHAR(2147483647)
,refunded_amount                    VARCHAR(2147483647)
,zdt_user_id                        VARCHAR(2147483647)
,zdt_user_name                      VARCHAR(2147483647)
,cashier_no                         VARCHAR(2147483647)
,cashier_name                       VARCHAR(2147483647)
,is_print_ticket                    VARCHAR(2147483647)
,delivery_mode                      VARCHAR(2147483647)
,advance_booking_type               VARCHAR(2147483647)
,advance_booking_amount             VARCHAR(2147483647)
,advance_send_time                  VARCHAR(2147483647)
,is_frozen                          VARCHAR(2147483647)
,order_sign                         VARCHAR(2147483647)
,delivery_carrier_code              VARCHAR(2147483647)
,manual_confirmation_type           VARCHAR(2147483647)
,manual_confirmation_type_desc      VARCHAR(2147483647)
,exceptoin_type                     VARCHAR(2147483647)
,exceptoin_type_desc                VARCHAR(2147483647)
,real_express_bill_no               VARCHAR(2147483647)
,member_no                          VARCHAR(2147483647)
,channel_store_id                   VARCHAR(2147483647)
,express_type                       VARCHAR(2147483647)
,proctime AS PROCTIME()
,ts AS TO_TIMESTAMP(gmt_create)
)WITH (       

 'properties.auto.commit.interval.ms' = '1000',
'canal-json.ignore-parse-errors' = 'true',
 'format' = 'canal-json',
'properties.bootstrap.servers' = 'xxxx',
'connector' = 'kafka',
'topic' = 'xxxx',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset.strategy' = 'earliest',
'properties.group.id' = 'oms_test001',
'properties.enable.auto.commit' = 'true'
)
;
这是我的sql


select  *
from  
(
select 
store_id 
,oms_order_id
,ROW_NUMBER() OVER (PARTITION BY  oms_order_id order by gmt_create desc) as row_num
from 
my_hive.dl_ods.oms_test001
) t 
where row_num = 1
;

select  *
from  
(
select 
store_id 
,oms_order_id
,sum(cast(express_fee as decimal(32,2))) OVER (PARTITION BY  oms_order_id order by gmt_create desc) as express_fee
from 
my_hive.dl_ods.oms_test001
) t 
where express_fee > 1
;

 
select  *
from  
(
select 
store_id 
,oms_order_id
,sum(cast(express_fee as decimal(32,2))) OVER (PARTITION BY  oms_order_id order by gmt_create RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) as express_fee
from 
my_hive.dl_ods.oms_test001
) t 
where express_fee > 1
;
运行结果及详细报错内容

img

img

img

我的解答思路和尝试过的方法,不写自己思路的,回答率下降 60%

从报错上来看,是因为我的数据源支持删除操作导致的。

我想要达到的结果,如果你需要快速回答,请尝试 “付费悬赏”

img

我的疑问是,像我的数据源这种支持删除操作,又不断写入数据的表是不是属于流表?该怎么称呼?
我查了官网,flink是支持sum() over() 这种操作的,关于flink的窗口函数在什么条件下是支持的?什么条件下是不支持的?我这个表要想支持这个操作该怎么转换?

看起来比较像是因为你在开窗查询的时候记录发生了更新或删除

hive的sql和 flink的sql语法是不一样的,不能做对比。

在Flink SQL中,使用OVER操作时可能会出现以下几种报错情况:

1."OVER" is not supported:这个错误通常发生在Flink版本较旧的情况下,因为在较早的版本中可能不支持OVER操作。解决方法是升级到较新的Flink版本。

2.OVER window can only be used with a window alias:这个错误通常发生在没有为窗口定义别名时。在使用OVER操作之前,需要先为窗口定义别名,例如:SELECT ... FROM table_name WINDOW w AS (PARTITION BY ... ORDER BY ... ROWS BETWEEN ... AND ...)

3.OVER window with unbounded preceding/following is not supported:这个错误通常发生在使用未限定的preceeding或following时。在OVER操作中,必须指定preceeding和following的边界,例如:ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING。

4.OVER window with ORDER BY on a non-time attribute is not supported:这个错误通常发生在使用非时间属性进行ORDER BY时。在OVER操作中,ORDER BY必须基于时间属性,例如:ORDER BY event_time。

5.OVER window with GROUP BY is not supported:这个错误通常发生在同时使用GROUP BY和OVER操作时。在Flink SQL中,不支持在OVER窗口中使用GROUP BY。

6.OVER window with DISTINCT is not supported:这个错误通常发生在同时使用DISTINCT和OVER操作时。在Flink SQL中,不支持在OVER窗口中使用DISTINCT。

如果遇到以上报错情况,可以根据错误信息进行相应的调整和修改,以满足Flink SQL的语法要求。

RANGE子句只能用于数值或日期时间类型的列,不能用于字符串或其他类型的列。 你的ORDER BY列是gmt_create,是一个字符串类型的列,所以不能用RANGE子句来定义窗口范围
你可以将gmt_create列转换为TIMESTAMP类型的列,然后再使用RANGE子句或者使用ROWS(GROUP BY也可以)来定义OVER窗口

select  *
from  
(
select 
store_id 
,oms_order_id
,sum(cast(express_fee as decimal(32,2))) OVER (PARTITION BY  oms_order_id order by TO_TIMESTAMP(gmt_create) RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) as express_fee
from 
my_hive.dl_ods.oms_test001
) t 
where express_fee > 1;

该回答通过自己思路及引用到GPTᴼᴾᴱᴺᴬᴵ搜索并已验证,得到内容具体如下:

根据您提供的信息,你在Flink SQL中使用了窗口函数,并且对express_fee列使用了SUM()函数。然而,你遇到了一个报错,指出你的数据源不支持删除操作。

关于你的问题:

1、 关于数据源类型:根据你提供的信息,你的数据源是一个支持删除操作并且不断写入数据的表。这种类型的表可以被称为"流式表",因为它的数据是以流的形式不断产生和更新的。在Flink中,可以使用流式表来处理流数据。

2、 关于窗口函数的支持:Flink SQL中的窗口函数在流式表和批处理表上都是支持的。但是,某些窗口函数可能对数据源的排序和分区要求更严格。在你的情况下,报错可能是由于数据源不支持某些窗口函数所需的排序操作。

3、 关于转换操作:如果你的数据源不支持特定的窗口函数操作,你可以考虑通过以下方式进行转换:

  • 使用Flink的流处理功能:如果你的数据源是流式的,你可以使用Flink的流式处理功能来对数据进行窗口化和聚合操作。你可以通过将数据源连接到Flink的流处理作业中,并使用窗口算子(例如window())和聚合函数(例如sum())来实现你的需求。

  • 使用批处理模式:如果你的数据源不适合流处理,你可以考虑将数据以批处理的方式加载到Flink中,并使用Flink的批处理功能进行处理。你可以将数据加载到Flink的批处理作业中,并使用窗口函数和聚合函数来实现你的需求。


需要注意的是,具体的解决方法取决于你的数据源和业务需求。以下是个例子,你可以参考一下,如果你想使用Flink SQL来处理流数据,并对express_fee列使用SUM()函数进行窗口化和聚合操作,你可以按照以下步骤进行更改和编写代码。

  1. 创建流式表:首先,你需要创建一个流式表来表示你的输入数据流。你可以使用Flink的Table API或SQL DDL语句来定义表结构和模式。以下是一个示例DDL语句的示例:
CREATE TABLE my_table (
  id INT,
  express_fee DECIMAL(10, 2),
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'input_topic',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
);

上述示例使用Kafka作为数据源,并指定了输入主题、连接器属性和数据格式。

  1. 定义查询:接下来,你可以编写Flink SQL查询来对流式表进行窗口化和聚合操作。以下是一个示例查询的代码:
SELECT
  TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
  TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
  SUM(express_fee) AS total_fee
FROM my_table
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)

上述示例使用TUMBLE函数将数据流按照1小时的窗口进行划分,并计算每个窗口中express_fee列的总和。

  1. 执行查询:最后,你可以使用Flink的流处理作业来执行查询并处理数据流。以下是一个示例的Java代码片段,展示了如何执行查询:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 注册流式表
tEnv.executeSql("CREATE TABLE my_table (...)");
// 执行查询
Table resultTable = tEnv.sqlQuery("SELECT ... FROM my_table ...");
// 将查询结果转换为DataStream
DataStream<Row> resultStream = tEnv.toAppendStream(resultTable, Row.class);
resultStream.print();

env.execute("Streaming Job");

上述示例使用Flink的Java API创建了一个流式执行环境,并使用StreamTableEnvironment来执行SQL查询。最后,将查询结果转换为DataStream并打印出来。

请注意,以上示例只是一个基本的框架,你需要根据你的具体需求和数据源进行适当的修改和调整。另外,你可能需要根据实际情况选择合适的连接器和格式,以及配置其他必要的参数。

希望这些示例代码能帮助你更好地理解如何在Flink SQL中处理流数据并使用窗口函数进行聚合操作。


如果以上回答对您有所帮助,点击一下采纳该答案~谢谢

根据这个提示试一试吧,与canal-json格式有关

img

开窗查询的时候数据改变了吧

flinksql 的sum操作


import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object aggregationTest {

//defined the dataSource's type
  case class StockPrice(stockId:String, timeStamp:Long, price:Double)


  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //generate ds

    val stockList = List(StockPrice("stock_1", 66666, 1)
      , StockPrice("stock_1", 8888, 2)
      , StockPrice("stock_2", 77777, 1)
      , StockPrice("stock_2", 999, 3)
      , StockPrice("stock_3", 3333, 1)
    )

    val ds = env.fromCollection(stockList)

    //transformation

    val keyedStream = ds.keyBy("stockId")

    val sumedStream = keyedStream.sum(2)

    sumedStream.print()

    env.execute()

  }

}

类似 sum() over() 的操作用Flink 的流处理功能来定义适当的窗口规则和窗口函数

Flink SQL中的SUM() OVER()函数在计算窗口聚合时需要满足某个特定条件才会触发计算。这个条件通常是基于窗口规则或窗口的触发器来确定的。

参考gpt
Flink SQL 中的 OVER 操作是用于执行窗口函数(Window Function)的。窗口函数通常用于对窗口中的数据进行聚合、排序等操作。在 Flink SQL 中,支持的窗口函数包括 SUM、COUNT、AVG、MIN、MAX 等。

如果您遇到了无法使用 SUM 函数的问题,可能是由于以下原因之一:

  1. 数据类型不匹配:SUM 函数通常用于对数值型数据进行求和,如果窗口中的数据类型不是数值型,则无法使用 SUM 函数。请确保窗口中的数据类型与 SUM 函数的要求一致。

  2. 缺少 OVER 子句:窗口函数需要使用 OVER 子句来指定窗口的范围和排序方式。如果没有正确指定 OVER 子句,可能会导致窗口函数无法正常工作。请确保在使用窗口函数时,正确指定了 OVER 子句。

以下是一个使用 SUM 函数的示例:

SELECT
  user_id,
  event_time,
  amount,
  SUM(amount) OVER (PARTITION BY user_id ORDER BY event_time) AS total_amount
FROM
  my_table;

在上述示例中,使用了 SUM 函数对窗口中的 amount 列进行求和,并使用 OVER 子句指定了按照 user_id 分区,并按照 event_time 排序的窗口范围。

Flink 窗口函数是用于处理流数据的重要特性之一,窗口函数允许你按照特定的条件对流数据进行分组、聚合和计算

需要特定条件才能触发的