DataX: batch-generating JSON job scripts for a SQL Server database, but the generated config files contain no column names or column types


# coding=utf-8
import json
import getopt
import os
import sys
import pymssql

# SQL Server connection settings; adjust to match your environment
sqlserver_host = ""
sqlserver_port = ""
sqlserver_user = ""
sqlserver_passwd = ""

# HDFS NameNode settings; adjust to match your environment
hdfs_nn_host = ""
hdfs_nn_port = ""

# Target directory for the generated job files; adjust as needed
output_path = "/opt/module/datax/job/"

def get_connection(database):
    # pymssql's connect() takes "server" and "password" (the original "passwd"
    # keyword is not accepted); connect to the target database so that
    # INFORMATION_SCHEMA reflects its tables
    return pymssql.connect(server=sqlserver_host, port=int(sqlserver_port),
                           user=sqlserver_user, password=sqlserver_passwd,
                           database=database)

def get_sqlserver_meta(database, table):
    connection = get_connection(database)
    cursor = connection.cursor()
    # Unlike MySQL, TABLE_SCHEMA in SQL Server holds the schema name (usually
    # "dbo"), not the database name, so filtering TABLE_SCHEMA by the database
    # name matches no rows and leaves "column" empty; filter by TABLE_CATALOG
    # (the database) instead
    sql = ("SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
           "WHERE TABLE_CATALOG=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION")
    cursor.execute(sql, (database, table))
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall

def get_sqlserver_columns(database, table):
    return list(map(lambda x: x[0], get_sqlserver_meta(database, table)))

def get_hive_columns(database, table):
    def type_mapping(sqlserver_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "numeric":"string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "nvarchar": "string",
            "date": "string",
            "time": "string",
            "datetime": "string",
            "date": "string",
            "text": "string",
            "bit": "boolean"
        }
        # Fall back to "string" so an unmapped type does not raise KeyError
        return mappings.get(sqlserver_type, "string")
    meta = get_sqlserver_meta(database, table)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))

def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "username": sqlserver_user,
                        "password": sqlserver_passwd,
                        "column": get_sqlserver_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:sqlserver://" + sqlserver_host + ":" + sqlserver_port + "; DatabaseName=" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)

def main(args):
    source_database = ""
    source_table = ""
    options, arguments = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database, source_table)

if __name__ == '__main__':
    main(sys.argv[1:])
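
For reference, the script is driven by the -d/--sourcedb and -t/--sourcetbl options; assuming it is saved as gen_import_config.py (a hypothetical file name), an invocation looks like this:

# Shell: python gen_import_config.py -d source_db -t source_table
# Programmatic equivalent of the same call:
main(["-d", "source_db", "-t", "source_table"])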



Run output and error details

{"job": {"content": [{"writer": {"parameter": {"writeMode": "append", "fieldDelimiter": "\t", "column": [], "path": "${targetdir}", "fileType": "text", "defaultFS": "", "compress": "gzip", "fileName": ""}, "name": "hdfswriter"}, "reader": {"parameter": {"username": "", "column": [], "connection": [{"table": [""], "jdbcUrl": [""]}], "password": "", "splitPk": ""}, "name": "sqlserverreader"}}], "setting": {"speed": {"channel": 3}, "errorLimit": {"record": 0, "percentage": 0.02}}}}
The column arrays in the generated JSON are empty.

Expected result:
The column arrays should contain the field names and types.
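
The likely root cause: in SQL Server, the TABLE_SCHEMA column of INFORMATION_SCHEMA.COLUMNS holds the schema name (usually "dbo"), not the database name, so filtering TABLE_SCHEMA by the database name matches no rows and both column lists come back empty. A minimal standalone sketch of the corrected metadata query (all connection values are placeholders):

# coding=utf-8
# Standalone check of the corrected metadata query.
# All connection values are placeholders -- substitute your own.
import pymssql

conn = pymssql.connect(server="localhost", port=1433, user="sa",
                       password="your_password", database="your_database")
cursor = conn.cursor()
cursor.execute(
    "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE TABLE_CATALOG=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION",
    ("your_database", "your_table"))
for column_name, data_type in cursor.fetchall():
    print(column_name, data_type)
# An empty result here means the filter (or the connection) is still wrong.
conn.close()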

This is the same issue as https://ask.csdn.net/questions/7795646/53922934. The Java generator below (PostgreSQL to PostgreSQL) is included for reference:


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang3.StringUtils;
 
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
/**
 * Description: generic DataX JSON generation utility
 * Author libo2
 * Date 2019/7/31 19:33
 **/
public class Pg2PgJson {
    public static void main(String[] args) throws Exception {
        //Read the input file; each line has the format: ip|port|username|password|schema|table|target_table|splitPk|pk_column|columns|add_column
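        //A sample line in that format (all values hypothetical):
        //192.168.66.10|5432|etl_user|secret|mydb|src_orders|ods_orders|id|id|id,name,updated_at|updated_at;created_at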
        BufferedReader br = new BufferedReader(new FileReader("E:/libo2/Desktop/test/haiwai.txt"));
        String line = null;
        //Process the file line by line
        while ((line = br.readLine()) != null) {
            //Split the line into fields
            String[] info = line.split("\\|");
            String target_table = info[6];
            //Build the full-load JSON
            String str = toJson(line.trim(), "full");
            //Full-load extraction JSON file
            BufferedWriter bw = new BufferedWriter(new FileWriter("E:/libo2/Desktop/test/ods/" + target_table + ".json"));
            //Write the full-load file
            bw.write(str);
            bw.flush();
            bw.close();
            //Build the incremental JSON
            String str1 = toJson(line.trim(), "add");
            //Incremental extraction JSON file
            BufferedWriter bw1 = new BufferedWriter(new FileWriter("E:/libo2/Desktop/test/stg/" + target_table + ".json"));
            //Write the incremental file
            bw1.write(str1);
            bw1.flush();
            bw1.close();
        }
    }
 
    /**
     * @description Build the DataX job JSON for one input line
     * @author libo2
     * @date 2019/7/31 20:09
     */
    public static String toJson(String line, String flag) {
        //Split the line into fields
        String[] info = line.split("\\|");
        String ip = info[0];
        String port = info[1];
        String username = info[2];
        String password = info[3];
        String schema = info[4];
        String tableName = info[5];
        String target_table = info[6];
        String splitPk = info[7];
        if ("null".equals(splitPk.toLowerCase())) {
            splitPk = null;
        }
        String pkColumn = info[8];
        if ("null".equals(pkColumn.toLowerCase())) {
            pkColumn = null;
        }
        String allColumns = info[9];
        String addColumn = null;//incremental (delta) columns
        if (info.length > 10) {
            addColumn = info[10];
        }
        //Build the JSON from the innermost level outward
        //******** reader section ********
        StringBuilder sb = new StringBuilder();
        sb.append("jdbc:postgresql://" + ip + ":" + port + "/");
        sb.append(schema);
        sb.append("?autoReconnect=true");//开启自动重连,防止连接时间短超时
        Map<String, Object> m1 = new HashMap<>();
        List<String> rjdbcList = new ArrayList<>();
        rjdbcList.add(sb.toString());
        m1.put("jdbcUrl", rjdbcList);//源jdbc信息
        List<String> rtableList = new ArrayList<>();
        rtableList.add(tableName);
        m1.put("table", rtableList);//源表名
        List<Object> rconnList = new ArrayList<>();
        rconnList.add(m1);
        Map<String, Object> mm1 = new HashMap<>();
        mm1.put("column", allColumns.split(","));//源各个字段
        mm1.put("connection", rconnList);//源连接信息
        mm1.put("username", username);//源用户名
        mm1.put("password", password);//源密码
        mm1.put("where", "1=1");//源条件
        if (StringUtils.isNoneEmpty(splitPk)) {
            mm1.put("splitPk", splitPk);//源分割字段
        }
        if ("add".equals(flag)) {//增量条件
            if (StringUtils.isNoneEmpty(addColumn)) {
                String[] addCol = addColumn.split(";");
                String add1 = addCol[0];
                String add2 = addCol[1];
                mm1.put("where", String.format("%s>=now() - interval '2 day' or %s>=now() - interval '2 day'", add1, add2));
            }
        }
        Map<String, Object> mmm1 = new HashMap<>();
        mmm1.put("name", "postgresqlreader");//源数据源
        mmm1.put("parameter", mm1);//源参数
        Map<String, Object> mmmm1 = new HashMap<>();
        mmmm1.put("reader", mmm1);
 
        //******** writer section ********
        Map<String, Object> m2 = new HashMap<>();
        m2.put("jdbcUrl", "jdbc:postgresql://192.168.66.94:5434/dw");//目标jdbc信息
        List<String> wtableList = new ArrayList<>();
        String ods_table = target_table;//full-load (ODS) table
        List<Object> wconnList = new ArrayList<>();
        wconnList.add(m2);
        Map<String, Object> mm2 = new HashMap<>();
        mm2.put("column", allColumns.split(","));//目标各个字段
        mm2.put("connection", wconnList);//目标连接信息
        mm2.put("username", "aaaaa");//目标用户名
        mm2.put("password", "123456");//目标密码
        String stg_table = target_table + "_stg";//incremental staging table
        if ("add".equals(flag)) {
            wtableList.add(stg_table);
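            //Incremental strategy: DataX first loads into the staging table; the
            //postSql below then merges staging into the ODS table (delete rows
            //whose primary key re-arrived, then insert everything from staging)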
            mm2.put("preSql", new String[]{String.format("truncate table %s;", stg_table)});//执行语句之前操作
            if (StringUtils.isNoneEmpty(pkColumn) && StringUtils.isNoneEmpty(addColumn)) {//statements run after the load
                mm2.put("postSql", new String[]{String.format("delete from %s a where exists (select 1 from %s b where a.%s=b.%s);insert into %s select * from %s;", ods_table, stg_table, pkColumn, pkColumn, ods_table, stg_table)});
            } else {
                mm2.put("postSql", new String[]{String.format("delete from %s;insert into %s select * from %s;", ods_table, ods_table, stg_table)});
            }
        } else {
            wtableList.add(ods_table);
            mm2.put("preSql", new String[]{String.format("truncate table %s;", ods_table)});//执行语句之前操作
        }
        m2.put("table", wtableList);//目标表名
        Map<String, Object> mmm2 = new HashMap<>();
        mmm2.put("name", "postgresqlwriter");//目标数据源
        mmm2.put("parameter", mm2);//目标参数
        mmmm1.put("writer", mmm2);
 
        List<Object> contentList = new ArrayList<>();
        contentList.add(mmmm1);
        Map<String, Object> m3 = new HashMap<>();
        m3.put("content", contentList);
 
        Map<String, Object> m4 = new HashMap<>();
        m4.put("channel", "10");
        Map<String, Object> mm4 = new HashMap<>();
        mm4.put("speed", m4);
        m3.put("setting", mm4);
 
        Map<String, Object> m5 = new HashMap<>();
        m5.put("job", m3);
        String str = JSON.toJSONString(m5);
        //Pretty-print the JSON
        JSONObject object = JSONObject.parseObject(str);
        str = JSON.toJSONString(object, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteDateUseDateFormat);
        return str;
    }
 
}