现有一张表admic
字段为uuid,arch_no,data_source_cd,start_effective_date,end_effective_date,permit_department,data_update_dt,status
业务要求为
数仓设计为stg层仅存储本次增量数据,ods层数据为历史全量数据,同步时ods层数据与stg层数据进行比对合并形成新的全量数据,存储在dt=增量操作当天分区内,每天的分区内存的是截止到当天的全量数据
同一数据源(当data_suurce_cd<=2时根据arch_no,data_source_cd判断,当data_source_cd>2时根据arch_no,data_source_cd,permit_department,permit_file_id判断),同一数据源则ods层相同数据源数据全删,stg层数据全部插入。不同数据源,stg层直接插入
数据样本如图
目前写的是这样的,但是数据处理有问题
有没有可以写个伪代码或者指导一下怎么修改呀?
删除ODS中当日分区相同数据源数据
DELETE FROM ods
WHERE dt = '今日分区'
AND (
(data_source_cd <= 2 AND arch_no, data_source_cd) IN (SELECT arch_no, data_source_cd FROM stg)
OR
(data_source_cd > 2 AND arch_no, data_source_cd, permit_department, permit_file_id) IN (SELECT arch_no, data_source_cd, permit_department, permit_file_id FROM stg)
)
直接插入STG全量数据
INSERT INTO ods
SELECT *
FROM stg
WHERE dt = '今日分区';
需要编写一个Hive SQL的脚本来实现从stg层到ods层的增量同步,并按照一定的规则进行数据合并。供参考:
-- 创建一个临时表用于存储本次增量数据
CREATE TABLE stg_temp AS
SELECT *
FROM stg
WHERE data_update_dt = '增量操作当天';
-- 删除ods层中与本次增量数据相同数据源的数据
DELETE FROM ods
WHERE data_source_cd <= 2
AND EXISTS (
SELECT 1
FROM stg_temp
WHERE ods.arch_no = stg_temp.arch_no
AND ods.data_source_cd = stg_temp.data_source_cd
);
-- 插入本次增量数据到ods层
INSERT INTO TABLE ods
SELECT *
FROM stg_temp;
-- 创建一个临时表用于存储合并后的全量数据
CREATE TABLE ods_temp AS
SELECT *
FROM ods
WHERE dt = '增量操作当天';
-- 根据数据源进行数据合并
INSERT INTO TABLE ods_temp
SELECT *
FROM stg
WHERE data_update_dt = '增量操作当天'
AND (
(data_source_cd <= 2
AND NOT EXISTS (
SELECT 1
FROM ods_temp
WHERE ods_temp.arch_no = stg.arch_no
AND ods_temp.data_source_cd = stg.data_source_cd
))
OR (data_source_cd > 2
AND NOT EXISTS (
SELECT 1
FROM ods_temp
WHERE ods_temp.arch_no = stg.arch_no
AND ods_temp.data_source_cd = stg.data_source_cd
AND ods_temp.permit_department = stg.permit_department
AND ods_temp.permit_file_id = stg.permit_file_id
))
);
-- 删除ods层中旧的全量数据
DELETE FROM ods
WHERE dt = '增量操作当天';
-- 插入合并后的全量数据到ods层
INSERT INTO TABLE ods
SELECT *
FROM ods_temp;
-- 删除临时表
DROP TABLE IF EXISTS stg_temp;
DROP TABLE IF EXISTS ods_temp;
,为了保证脚本的效率,可能还需要考虑使用索引、分区等技术进行优化
【以下回答由 GPT 生成】
根据问题描述,可以将问题分解为以下几个步骤: 1. 创建当天的ods层分区。 2. 对于相同的数据源,比较ods层和stg层的数据,删除ods层的重复数据,然后将stg层的数据插入。 3. 对于不同的数据源,直接将stg层的数据插入。
1. 创建当天的ods层分区
根据问题描述,每天的分区里存储的是截止到当天的全量数据。假设ods表名为admic_ods,首先需要创建当天的分区。可以使用Hive的语法来创建分区,例如:
```sql
ALTER TABLE admic_ods ADD PARTITION (data_update_dt = '2022-01-01');
其中,'2022-01-01' 为当天的日期,需要根据具体情况进行替换。
为了实现对相同数据源的处理,可以使用以下步骤进行操作:
-- 创建临时表
CREATE TABLE tmp_admic(uuid, arch_no, data_source_cd, start_effective_date, end_effective_date, permit_department, data_update_dt, status)
STORED AS Parquet;
-- 将stg层数据插入临时表
INSERT INTO TABLE tmp_admic SELECT * FROM stg_admic;
-- 删除重复数据
DELETE admic_ods
WHERE data_source_cd <= 2
AND (arch_no, data_source_cd) IN (
SELECT arch_no, data_source_cd
FROM admic_ods
WHERE data_update_dt = '2022-01-01'
INTERSECT
SELECT arch_no, data_source_cd
FROM tmp_admic
);
-- 插入stg层数据
INSERT INTO TABLE admic_ods
SELECT * FROM tmp_admic;
对于不同的数据源,可以直接将stg层的数据插入到admic_ods表中。
-- 直接将stg层数据插入
INSERT INTO TABLE admic_ods
SELECT * FROM stg_admic
WHERE data_source_cd > 2;
以上是基于问题描述提供的表结构和要求的解决方案。请根据具体情况进行调整。
### 伪代码实现
```python
import datetime
from pyhive import hive
# 创建hive连接
conn = hive.Connection(host="your_host", port=10000, username="your_username", password="your_password")
# 获取当前日期
current_date = datetime.date.today().strftime("%Y-%m-%d")
# 创建当天的ods层分区
create_partition_sql = f"ALTER TABLE admic_ods ADD PARTITION (data_update_dt = '{current_date}')"
conn.cursor().execute(create_partition_sql)
# 插入相同数据源的数据
insert_same_datasource_sql = """
CREATE TABLE tmp_admic (
uuid string,
arch_no string,
data_source_cd int,
start_effective_date string,
end_effective_date string,
permit_department string,
data_update_dt string,
status string
)
STORED AS Parquet;
INSERT INTO TABLE tmp_admic SELECT * FROM stg_admic;
DELETE FROM admic_ods
WHERE data_source_cd <= 2
AND (arch_no, data_source_cd) IN (
SELECT arch_no, data_source_cd
FROM admic_ods
WHERE data_update_dt = '{current_date}'
INTERSECT
SELECT arch_no, data_source_cd
FROM tmp_admic
);
INSERT INTO TABLE admic_ods
SELECT * FROM tmp_admic;
"""
conn.cursor().execute(insert_same_datasource_sql)
# 插入不同数据源的数据
insert_different_datasource_sql = f"""
INSERT INTO TABLE admic_ods
SELECT * FROM stg_admic
WHERE data_source_cd > 2;
"""
conn.cursor().execute(insert_different_datasource_sql)
# 关闭hive连接
conn.close()
以上为基于Python的伪代码实现,使用了pyhive库连接Hive,并执行相应的SQL语句。请根据实际情况进行调整。