增量同步hivesql从stg层到ods层merge阶段sql问题

现有一张表admic
字段为uuid,arch_no,data_source_cd,start_effective_date,end_effective_date,permit_department,data_update_dt,status
业务要求为
数仓设计为stg层仅存储本次增量数据,ods层数据为历史全量数据,同步时ods层数据与stg层数据进行比对合并形成新的全量数据,存储在dt=增量操作当天分区内,每天的分区内存的是截止到当天的全量数据
同一数据源(当data_suurce_cd<=2时根据arch_no,data_source_cd判断,当data_source_cd>2时根据arch_no,data_source_cd,permit_department,permit_file_id判断),同一数据源则ods层相同数据源数据全删,stg层数据全部插入。不同数据源,stg层直接插入

img

数据样本如图
目前写的是这样的,但是数据处理有问题

img

有没有可以写个伪代码或者指导一下怎么修改呀?

 删除ODS中当日分区相同数据源数据 
DELETE FROM ods 
WHERE dt = '今日分区'
  AND (
    (data_source_cd <= 2 AND arch_no, data_source_cd) IN (SELECT arch_no, data_source_cd FROM stg) 
    OR
    (data_source_cd > 2 AND arch_no, data_source_cd, permit_department, permit_file_id) IN (SELECT arch_no, data_source_cd, permit_department, permit_file_id FROM stg)
  )

 直接插入STG全量数据
INSERT INTO ods 
SELECT * 
FROM stg
WHERE dt = '今日分区';

需要编写一个Hive SQL的脚本来实现从stg层到ods层的增量同步,并按照一定的规则进行数据合并。供参考:

-- 创建一个临时表用于存储本次增量数据
CREATE TABLE stg_temp AS
  SELECT *
  FROM stg
  WHERE data_update_dt = '增量操作当天';

-- 删除ods层中与本次增量数据相同数据源的数据
DELETE FROM ods
WHERE data_source_cd <= 2
  AND EXISTS (
    SELECT 1
    FROM stg_temp
    WHERE ods.arch_no = stg_temp.arch_no
      AND ods.data_source_cd = stg_temp.data_source_cd
  );

-- 插入本次增量数据到ods层
INSERT INTO TABLE ods
SELECT *
FROM stg_temp;

-- 创建一个临时表用于存储合并后的全量数据
CREATE TABLE ods_temp AS
SELECT *
FROM ods
WHERE dt = '增量操作当天';

-- 根据数据源进行数据合并
INSERT INTO TABLE ods_temp
SELECT *
FROM stg
WHERE data_update_dt = '增量操作当天'
  AND (
    (data_source_cd <= 2
    AND NOT EXISTS (
      SELECT 1
      FROM ods_temp
      WHERE ods_temp.arch_no = stg.arch_no
        AND ods_temp.data_source_cd = stg.data_source_cd
    ))
    OR (data_source_cd > 2
    AND NOT EXISTS (
      SELECT 1
      FROM ods_temp
      WHERE ods_temp.arch_no = stg.arch_no
        AND ods_temp.data_source_cd = stg.data_source_cd
        AND ods_temp.permit_department = stg.permit_department
        AND ods_temp.permit_file_id = stg.permit_file_id
    ))
  );

-- 删除ods层中旧的全量数据
DELETE FROM ods
WHERE dt = '增量操作当天';

-- 插入合并后的全量数据到ods层
INSERT INTO TABLE ods
SELECT *
FROM ods_temp;

-- 删除临时表
DROP TABLE IF EXISTS stg_temp;
DROP TABLE IF EXISTS ods_temp;

,为了保证脚本的效率,可能还需要考虑使用索引、分区等技术进行优化

【以下回答由 GPT 生成】

问题分析

根据问题描述,可以将问题分解为以下几个步骤: 1. 创建当天的ods层分区。 2. 对于相同的数据源,比较ods层和stg层的数据,删除ods层的重复数据,然后将stg层的数据插入。 3. 对于不同的数据源,直接将stg层的数据插入。

解决方案

1. 创建当天的ods层分区

根据问题描述,每天的分区里存储的是截止到当天的全量数据。假设ods表名为admic_ods,首先需要创建当天的分区。可以使用Hive的语法来创建分区,例如:

```sql
ALTER TABLE admic_ods ADD PARTITION (data_update_dt = '2022-01-01');

其中,'2022-01-01' 为当天的日期,需要根据具体情况进行替换。

  1. 对于相同的数据源,比较ods层和stg层的数据,删除ods层的重复数据,然后将stg层的数据插入。

为了实现对相同数据源的处理,可以使用以下步骤进行操作:

  • 创建一个临时表tmp_admic来存储stg层的数据,然后将tmp_admic表的数据插入到admic_ods表中。
-- 创建临时表
CREATE TABLE tmp_admic(uuid, arch_no, data_source_cd, start_effective_date, end_effective_date, permit_department, data_update_dt, status)
STORED AS Parquet;

-- 将stg层数据插入临时表
INSERT INTO TABLE tmp_admic SELECT * FROM stg_admic;

-- 删除重复数据
DELETE admic_ods
WHERE data_source_cd <= 2
AND (arch_no, data_source_cd) IN (
   SELECT arch_no, data_source_cd
   FROM admic_ods
   WHERE data_update_dt = '2022-01-01'
   INTERSECT
   SELECT arch_no, data_source_cd
   FROM tmp_admic
);

-- 插入stg层数据
INSERT INTO TABLE admic_ods
SELECT * FROM tmp_admic;
  1. 对于不同的数据源,直接将stg层的数据插入。

对于不同的数据源,可以直接将stg层的数据插入到admic_ods表中。

-- 直接将stg层数据插入
INSERT INTO TABLE admic_ods
SELECT * FROM stg_admic
WHERE data_source_cd > 2;

以上是基于问题描述提供的表结构和要求的解决方案。请根据具体情况进行调整。

### 伪代码实现

```python
import datetime
from pyhive import hive

# 创建hive连接
conn = hive.Connection(host="your_host", port=10000, username="your_username", password="your_password")

# 获取当前日期
current_date = datetime.date.today().strftime("%Y-%m-%d")

# 创建当天的ods层分区
create_partition_sql = f"ALTER TABLE admic_ods ADD PARTITION (data_update_dt = '{current_date}')"
conn.cursor().execute(create_partition_sql)

# 插入相同数据源的数据
insert_same_datasource_sql = """
CREATE TABLE tmp_admic (
    uuid string,
    arch_no string,
    data_source_cd int,
    start_effective_date string,
    end_effective_date string,
    permit_department string,
    data_update_dt string,
    status string
)
STORED AS Parquet;

INSERT INTO TABLE tmp_admic SELECT * FROM stg_admic;

DELETE FROM admic_ods
WHERE data_source_cd <= 2
AND (arch_no, data_source_cd) IN (
   SELECT arch_no, data_source_cd
   FROM admic_ods
   WHERE data_update_dt = '{current_date}'
   INTERSECT
   SELECT arch_no, data_source_cd
   FROM tmp_admic
);

INSERT INTO TABLE admic_ods
SELECT * FROM tmp_admic;
"""
conn.cursor().execute(insert_same_datasource_sql)

# 插入不同数据源的数据
insert_different_datasource_sql = f"""
INSERT INTO TABLE admic_ods
SELECT * FROM stg_admic
WHERE data_source_cd > 2;
"""
conn.cursor().execute(insert_different_datasource_sql)

# 关闭hive连接
conn.close()

以上为基于Python的伪代码实现,使用了pyhive库连接Hive,并执行相应的SQL语句。请根据实际情况进行调整。


如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^