一个关于flinksql的开窗函数的问题
我希望像hive一样,做一个开窗函数,排序是没问题的,但是sum就不支持
这是我的建表语句
CREATE TABLE `my_hive`.`dl_ods`.`oms_test001` (
is_delete VARCHAR(2147483647)
,oms_order_id VARCHAR(2147483647)
,channel_order_id VARCHAR(2147483647)
,third_order_id VARCHAR(2147483647)
,user_id VARCHAR(2147483647)
,business_id VARCHAR(2147483647)
,store_id VARCHAR(2147483647)
,order_status VARCHAR(2147483647)
,order_status_cur VARCHAR(2147483647)
,order_status_platform VARCHAR(2147483647)
,channel VARCHAR(2147483647)
,order_create_time VARCHAR(2147483647)
,order_update_time VARCHAR(2147483647)
,order_pay_time VARCHAR(2147483647)
,order_pre_start_delivery_time VARCHAR(2147483647)
,order_pre_end_delivery_time VARCHAR(2147483647)
,order_cancel_time VARCHAR(2147483647)
,buyer_full_name VARCHAR(2147483647)
,buyer_full_address VARCHAR(2147483647)
,buyer_telephone VARCHAR(2147483647)
,buyer_mobile VARCHAR(2147483647)
,delivery_carrier_no VARCHAR(2147483647)
,delivery_carrier_name VARCHAR(2147483647)
,delivery_bill_no VARCHAR(2147483647)
,complete_time VARCHAR(2147483647)
,pay_channel VARCHAR(2147483647)
,order_total_money VARCHAR(2147483647)
,order_actual_money VARCHAR(2147483647)
,order_coupon_amount VARCHAR(2147483647)
,order_coupon_type VARCHAR(2147483647)
,buyer_lng VARCHAR(2147483647)
,buyer_lat VARCHAR(2147483647)
,coordinate_type VARCHAR(2147483647)
,buyer_province VARCHAR(2147483647)
,buyer_city VARCHAR(2147483647)
,buyer_area VARCHAR(2147483647)
,buyer_address VARCHAR(2147483647)
,order_buyer_remark VARCHAR(2147483647)
,invoice_type VARCHAR(2147483647)
,invoice_title VARCHAR(2147483647)
,invoice_duty_no VARCHAR(2147483647)
,invoice_mail VARCHAR(2147483647)
,invoice_title_type VARCHAR(2147483647)
,invoice_desc VARCHAR(2147483647)
,invoice_content VARCHAR(2147483647)
,attribue VARCHAR(2147483647)
,order_refund_amount VARCHAR(2147483647)
,order_refund_reason VARCHAR(2147483647)
,order_refund_time VARCHAR(2147483647)
,version VARCHAR(2147483647)
,status VARCHAR(2147483647)
,gmt_create VARCHAR(2147483647)
,gmt_update VARCHAR(2147483647)
,merchant_code VARCHAR(2147483647)
,express_fee VARCHAR(2147483647)
,package_fee VARCHAR(2147483647)
,express_coupon_amount VARCHAR(2147483647)
,store_name VARCHAR(2147483647)
,is_self_pick VARCHAR(2147483647)
,invoice_money VARCHAR(2147483647)
,delivery_type VARCHAR(2147483647)
,is_collage VARCHAR(2147483647)
,collage_status VARCHAR(2147483647)
,collage_complete_time VARCHAR(2147483647)
,store_phone VARCHAR(2147483647)
,store_address VARCHAR(2147483647)
,pos_order_id VARCHAR(2147483647)
,promotion_discounts VARCHAR(2147483647)
,channel_third_order_id VARCHAR(2147483647)
,is_split VARCHAR(2147483647)
,order_type VARCHAR(2147483647)
,buyer_town VARCHAR(2147483647)
,order_goods_type VARCHAR(2147483647)
,buyer_nick VARCHAR(2147483647)
,buyer_message VARCHAR(2147483647)
,seller_memo VARCHAR(2147483647)
,delivery_time VARCHAR(2147483647)
,pay_product_type VARCHAR(2147483647)
,is_self_delivery VARCHAR(2147483647)
,third_express_fee VARCHAR(2147483647)
,is_prescription VARCHAR(2147483647)
,pres_audit_status VARCHAR(2147483647)
,pos_trans_time VARCHAR(2147483647)
,pos_refund_time VARCHAR(2147483647)
,refund_type VARCHAR(2147483647)
,origin_oms_order_id VARCHAR(2147483647)
,refund_sort VARCHAR(2147483647)
,refunded_amount VARCHAR(2147483647)
,zdt_user_id VARCHAR(2147483647)
,zdt_user_name VARCHAR(2147483647)
,cashier_no VARCHAR(2147483647)
,cashier_name VARCHAR(2147483647)
,is_print_ticket VARCHAR(2147483647)
,delivery_mode VARCHAR(2147483647)
,advance_booking_type VARCHAR(2147483647)
,advance_booking_amount VARCHAR(2147483647)
,advance_send_time VARCHAR(2147483647)
,is_frozen VARCHAR(2147483647)
,order_sign VARCHAR(2147483647)
,delivery_carrier_code VARCHAR(2147483647)
,manual_confirmation_type VARCHAR(2147483647)
,manual_confirmation_type_desc VARCHAR(2147483647)
,exceptoin_type VARCHAR(2147483647)
,exceptoin_type_desc VARCHAR(2147483647)
,real_express_bill_no VARCHAR(2147483647)
,member_no VARCHAR(2147483647)
,channel_store_id VARCHAR(2147483647)
,express_type VARCHAR(2147483647)
,proctime AS PROCTIME()
,ts AS TO_TIMESTAMP(gmt_create)
)WITH (
'properties.auto.commit.interval.ms' = '1000',
'canal-json.ignore-parse-errors' = 'true',
'format' = 'canal-json',
'properties.bootstrap.servers' = 'xxxx',
'connector' = 'kafka',
'topic' = 'xxxx',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset.strategy' = 'earliest',
'properties.group.id' = 'oms_test001',
'properties.enable.auto.commit' = 'true'
)
;
这是我的sql
select *
from
(
select
store_id
,oms_order_id
,ROW_NUMBER() OVER (PARTITION BY oms_order_id order by gmt_create desc) as row_num
from
my_hive.dl_ods.oms_test001
) t
where row_num = 1
;
select *
from
(
select
store_id
,oms_order_id
,sum(cast(express_fee as decimal(32,2))) OVER (PARTITION BY oms_order_id order by gmt_create desc) as express_fee
from
my_hive.dl_ods.oms_test001
) t
where express_fee > 1
;
select *
from
(
select
store_id
,oms_order_id
,sum(cast(express_fee as decimal(32,2))) OVER (PARTITION BY oms_order_id order by gmt_create RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) as express_fee
from
my_hive.dl_ods.oms_test001
) t
where express_fee > 1
;
从报错上来看,是因为我的数据源支持删除操作导致的。
我的疑问是,像我的数据源这种支持删除操作,又不断写入数据的表是不是属于流表?该怎么称呼?
我查了官网,flink是支持sum() over() 这种操作的,关于flink的窗口函数在什么条件下是支持的?什么条件下是不支持的?我这个表要想支持这个操作该怎么转换?
看起来比较像是因为你在开窗查询的时候记录发生了更新或删除
hive的sql和 flink的sql语法是不一样的,不能做对比。
在Flink SQL中,使用OVER操作时可能会出现以下几种报错情况:
1."OVER" is not supported:这个错误通常发生在Flink版本较旧的情况下,因为在较早的版本中可能不支持OVER操作。解决方法是升级到较新的Flink版本。
2.OVER window can only be used with a window alias:这个错误通常发生在没有为窗口定义别名时。在使用OVER操作之前,需要先为窗口定义别名,例如:SELECT ... FROM table_name WINDOW w AS (PARTITION BY ... ORDER BY ... ROWS BETWEEN ... AND ...)
3.OVER window with unbounded preceding/following is not supported:这个错误通常发生在使用未限定的preceeding或following时。在OVER操作中,必须指定preceeding和following的边界,例如:ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING。
4.OVER window with ORDER BY on a non-time attribute is not supported:这个错误通常发生在使用非时间属性进行ORDER BY时。在OVER操作中,ORDER BY必须基于时间属性,例如:ORDER BY event_time。
5.OVER window with GROUP BY is not supported:这个错误通常发生在同时使用GROUP BY和OVER操作时。在Flink SQL中,不支持在OVER窗口中使用GROUP BY。
6.OVER window with DISTINCT is not supported:这个错误通常发生在同时使用DISTINCT和OVER操作时。在Flink SQL中,不支持在OVER窗口中使用DISTINCT。
如果遇到以上报错情况,可以根据错误信息进行相应的调整和修改,以满足Flink SQL的语法要求。
RANGE子句只能用于数值或日期时间类型的列,不能用于字符串或其他类型的列。 你的ORDER BY列是gmt_create,是一个字符串类型的列,所以不能用RANGE子句来定义窗口范围
你可以将gmt_create列转换为TIMESTAMP类型的列,然后再使用RANGE子句或者使用ROWS(GROUP BY也可以)来定义OVER窗口
select *
from
(
select
store_id
,oms_order_id
,sum(cast(express_fee as decimal(32,2))) OVER (PARTITION BY oms_order_id order by TO_TIMESTAMP(gmt_create) RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) as express_fee
from
my_hive.dl_ods.oms_test001
) t
where express_fee > 1;
该回答通过自己思路及引用到GPTᴼᴾᴱᴺᴬᴵ搜索并已验证,得到内容具体如下:
根据您提供的信息,你在Flink SQL中使用了窗口函数,并且对express_fee
列使用了SUM()
函数。然而,你遇到了一个报错,指出你的数据源不支持删除操作。
关于你的问题:
1、 关于数据源类型:根据你提供的信息,你的数据源是一个支持删除操作并且不断写入数据的表。这种类型的表可以被称为"流式表",因为它的数据是以流的形式不断产生和更新的。在Flink中,可以使用流式表来处理流数据。
2、 关于窗口函数的支持:Flink SQL中的窗口函数在流式表和批处理表上都是支持的。但是,某些窗口函数可能对数据源的排序和分区要求更严格。在你的情况下,报错可能是由于数据源不支持某些窗口函数所需的排序操作。
3、 关于转换操作:如果你的数据源不支持特定的窗口函数操作,你可以考虑通过以下方式进行转换:
使用Flink的流处理功能:如果你的数据源是流式的,你可以使用Flink的流式处理功能来对数据进行窗口化和聚合操作。你可以通过将数据源连接到Flink的流处理作业中,并使用窗口算子(例如window()
)和聚合函数(例如sum()
)来实现你的需求。
使用批处理模式:如果你的数据源不适合流处理,你可以考虑将数据以批处理的方式加载到Flink中,并使用Flink的批处理功能进行处理。你可以将数据加载到Flink的批处理作业中,并使用窗口函数和聚合函数来实现你的需求。
需要注意的是,具体的解决方法取决于你的数据源和业务需求。以下是个例子,你可以参考一下,如果你想使用Flink SQL来处理流数据,并对express_fee
列使用SUM()
函数进行窗口化和聚合操作,你可以按照以下步骤进行更改和编写代码。
CREATE TABLE my_table (
id INT,
express_fee DECIMAL(10, 2),
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'input_topic',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'json'
);
上述示例使用Kafka作为数据源,并指定了输入主题、连接器属性和数据格式。
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
SUM(express_fee) AS total_fee
FROM my_table
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
上述示例使用TUMBLE
函数将数据流按照1小时的窗口进行划分,并计算每个窗口中express_fee
列的总和。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 注册流式表
tEnv.executeSql("CREATE TABLE my_table (...)");
// 执行查询
Table resultTable = tEnv.sqlQuery("SELECT ... FROM my_table ...");
// 将查询结果转换为DataStream
DataStream<Row> resultStream = tEnv.toAppendStream(resultTable, Row.class);
resultStream.print();
env.execute("Streaming Job");
上述示例使用Flink的Java API创建了一个流式执行环境,并使用StreamTableEnvironment
来执行SQL查询。最后,将查询结果转换为DataStream
并打印出来。
请注意,以上示例只是一个基本的框架,你需要根据你的具体需求和数据源进行适当的修改和调整。另外,你可能需要根据实际情况选择合适的连接器和格式,以及配置其他必要的参数。
希望这些示例代码能帮助你更好地理解如何在Flink SQL中处理流数据并使用窗口函数进行聚合操作。
如果以上回答对您有所帮助,点击一下采纳该答案~谢谢
根据这个提示试一试吧,与canal-json格式有关
开窗查询的时候数据改变了吧
flinksql 的sum操作
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object aggregationTest {
//defined the dataSource's type
case class StockPrice(stockId:String, timeStamp:Long, price:Double)
def main(args: Array[String]): Unit = {
//create env
val env = StreamExecutionEnvironment.getExecutionEnvironment
//generate ds
val stockList = List(StockPrice("stock_1", 66666, 1)
, StockPrice("stock_1", 8888, 2)
, StockPrice("stock_2", 77777, 1)
, StockPrice("stock_2", 999, 3)
, StockPrice("stock_3", 3333, 1)
)
val ds = env.fromCollection(stockList)
//transformation
val keyedStream = ds.keyBy("stockId")
val sumedStream = keyedStream.sum(2)
sumedStream.print()
env.execute()
}
}
类似 sum() over() 的操作用Flink 的流处理功能来定义适当的窗口规则和窗口函数
Flink SQL中的SUM() OVER()函数在计算窗口聚合时需要满足某个特定条件才会触发计算。这个条件通常是基于窗口规则或窗口的触发器来确定的。
参考gpt
Flink SQL 中的 OVER 操作是用于执行窗口函数(Window Function)的。窗口函数通常用于对窗口中的数据进行聚合、排序等操作。在 Flink SQL 中,支持的窗口函数包括 SUM、COUNT、AVG、MIN、MAX 等。
如果您遇到了无法使用 SUM 函数的问题,可能是由于以下原因之一:
数据类型不匹配:SUM 函数通常用于对数值型数据进行求和,如果窗口中的数据类型不是数值型,则无法使用 SUM 函数。请确保窗口中的数据类型与 SUM 函数的要求一致。
缺少 OVER 子句:窗口函数需要使用 OVER 子句来指定窗口的范围和排序方式。如果没有正确指定 OVER 子句,可能会导致窗口函数无法正常工作。请确保在使用窗口函数时,正确指定了 OVER 子句。
以下是一个使用 SUM 函数的示例:
SELECT
user_id,
event_time,
amount,
SUM(amount) OVER (PARTITION BY user_id ORDER BY event_time) AS total_amount
FROM
my_table;
在上述示例中,使用了 SUM 函数对窗口中的 amount 列进行求和,并使用 OVER 子句指定了按照 user_id 分区,并按照 event_time 排序的窗口范围。
Flink 窗口函数是用于处理流数据的重要特性之一,窗口函数允许你按照特定的条件对流数据进行分组、聚合和计算
需要特定条件才能触发的