How to extract Spanner rows as JSON or Parquet in Go?

I'm new to Go and Spanner. I want to save a snapshot of our Spanner DB to Google Cloud Storage every 5 minutes. The format that I want to use is Parquet or JSON.

stmt := spanner.NewStatement("SELECT * FROM " + tableName + " WHERE UpdatedAt >= @startDateTime AND UpdatedAt <= @endDateTime")
iter := txn.Query(ctx, stmt)
defer iter.Stop()
for {
    row, err := iter.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Printf("Failed to read row: %v", err)
        break
    }
    _ = row // got the row, but now what?
}

I have got all the rows, but I don't have a clue how to extract all the column values and write them to a Parquet or JSON file, or upload them to GCS. Is it possible to extract all the column values without knowing the type of the value or the column name? Any help would be appreciated.

The column type is required to retrieve the value. See "Supported types and their corresponding Cloud Spanner column type(s)" in the Row documentation. You can get the column names from Row.ColumnNames. It might make sense to use Row.ToStruct with a struct corresponding to the table, and write that to JSON, for example using the "encoding/json" package's Marshal.

I would like to share my tedious solution and hope it will help someone in the future. In my case, I got a task to save a snapshot of our Spanner DB at a short time interval and save the data in Parquet format to GCS, so that we can later query it with BigQuery.

Firstly, I got the spanner rows that I wanted with a simple statement like this:

stmt := spanner.NewStatement(fmt.Sprintf("SELECT * FROM %s WHERE UpdatedAt >= @startDateTime AND UpdatedAt <= @endDateTime", tableName))
stmt.Params["startDateTime"] = time.Unix(1520330400, 0)
stmt.Params["endDateTime"] = time.Unix(1520376600, 0)
iter := txn.Query(ctx, stmt)
values := readRows(iter)

func readRows(iter *spanner.RowIterator) []spanner.Row {
    var rows []spanner.Row
    defer iter.Stop()
    for {
        row, err := iter.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            log.Printf("Failed to read row: %v", err)
            break // row is nil here, so stop instead of dereferencing it
        }
        rows = append(rows, *row)
    }
    return rows
}

Yea, that was easy. And it was encouraging, because this is my first time coding in Go. However, it took me a while to find out that it's not possible to decode a value without knowing the type of its column, and all I need is each column's string value so I can save it in Parquet format.

So I wrote another query to get the spanner type for each column like this:

typeStmt := spanner.NewStatement("SELECT t.column_name, t.spanner_type FROM information_schema.columns AS t WHERE t.table_name = @tableName")
typeStmt.Params["tableName"] = tableName

iterTypes := txn.Query(ctx, typeStmt)
types := readRows(iterTypes)

// use a map to keep all the types
dataTypes := make(map[string]string)
for i := 0; i < len(types); i++ {
    var columnName string
    var dataType string
    types[i].Column(0, &columnName)
    types[i].Column(1, &dataType)
    dataTypes[columnName] = dataType
}
formattedRows, md := extractDataByType(dataTypes, values)

and I converted each Spanner type to a Go type with a switch:

func decodeValueByType(index int, row spanner.Row, value interface{}) {
    if err := row.Column(index, value); err != nil {
        log.Printf("Failed to extract value: %v", err)
    }
}

func prepareParquetWriter(md *[]string, parquetType string, columnNames []string, index int) {
    if len(*md) < len(columnNames) {
        *md = append(*md, fmt.Sprintf("name=%s, type=%s", columnNames[index], parquetType))
    }
}

func extractDataByType(types map[string]string, rows []spanner.Row) ([][]string, []string) {
    var formattedRows [][]string
    var md []string
    for _, row := range rows {
        columnNames := row.ColumnNames()
        var vals []string
        for i := 0; i < row.Size(); i++ {
            switch types[columnNames[i]] {
            case "STRING(MAX)":
                var value spanner.NullString
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "UTF8", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            case "TIMESTAMP":
                var value spanner.NullTime
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "TIMESTAMP_MILLIS", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            case "INT64":
                var value spanner.NullInt64
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "INT64", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            case "BOOL":
                var value spanner.NullBool
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "BOOLEAN", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            }
        }
        formattedRows = append(formattedRows, vals)
    }
    log.Printf("parquet format: %v", md)
    return formattedRows, md
}

Finally, I got my data in a two-dimensional array of strings, and the Parquet schema configuration (the md slice) in another array.

I haven't finished the Parquet writer for GCS yet, but I used xitongsys/parquet-go to write the file locally like this:

fw, err := ParquetFile.NewLocalFileWriter(fmt.Sprintf("dataInParquet/%s_%s.parquet", name, time.Now().Format("20060102150405")))
if err != nil {
    log.Println("Can't open file", err)
    return
}
pw, err := ParquetWriter.NewCSVWriter(md, fw, 4)
if err != nil {
    log.Println("Can't create csv writer", err)
    return
}
for _, row := range formattedRows {
    rec := make([]*string, len(row))
    for i := 0; i < len(row); i++ {
        rec[i] = &row[i]
    }
    if err = pw.WriteString(rec); err != nil {
        log.Println("WriteString error", err)
    }
}
if err = pw.WriteStop(); err != nil {
    log.Println("WriteStop error", err)
}
log.Println("Write Finished")
fw.Close()

Please let me know if anyone knows a better way of doing this. Thanks. ;-)

Btw, this is just my experimental code. If you want to use any of it, please adjust accordingly. My production implementation needs to support more features, like querying multiple databases with goroutines, supporting both Spanner and MySQL, and saving data in either Parquet or JSON format. I'd like to hear more ideas if anyone is doing something similar.