Upload data to a table without waiting for the streaming buffer to flush

I have a Go program which downloads data from a table (T1), formats it, and uploads it to a new temporary table (T2). Once the data has been uploaded (30s or so), the data should be copied to a third table (T3).

After uploading the formatted data to T2, querying the table returns results OK. However, when copying the table, the job completes almost instantly and the destination table (T3) is empty.

I'm copying the table as suggested here - but the result is the same when performing the action in the UI.
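
For reference, the copy step looks roughly like this - a minimal sketch using the Go client's Copier, where dataset, ctx and the table names stand in for my real values:

copier := dataset.Table(t3Name).CopierFrom(dataset.Table(t2Name))
copier.WriteDisposition = bigquery.WriteTruncate

job, err := copier.Run(ctx)
if err != nil {
    return fmt.Errorf("failed to start copy job: %v", err)
}
status, err := job.Wait(ctx)
if err != nil {
    return fmt.Errorf("copy job failed: %v", err)
}
// The job reports success even though T3 ends up empty.
if status.Err() != nil {
    return fmt.Errorf("copy job completed with error: %v", status.Err())
}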

In the table metadata section it shows as 0 B and 0 rows, but there are about 100k rows and 18 MB of data in there - or at least that's what comes back from a query.

Edit: I did not spot that this data was still stuck in the streaming buffer - see my answer.

The comments on my question led me to see that the issue was the streaming buffer. This was taking a long time to flush, and it's not possible to flush it manually.
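
For anyone hitting the same thing: the Go client exposes the streaming buffer estimates on the table metadata, so a check along these lines (a sketch, reusing the same dataset, ctx and t2Name as the code below) would have shown the rows were still buffered rather than committed:

md, err := dataset.Table(t2Name).Metadata(ctx)
if err != nil {
    return fmt.Errorf("failed to fetch table metadata: %v", err)
}
if md.StreamingBuffer != nil {
    // Rows still in the streaming buffer are not yet visible to copy or extract jobs.
    log.Printf("streaming buffer: ~%d rows, ~%d bytes, oldest entry %v",
        md.StreamingBuffer.EstimatedRows,
        md.StreamingBuffer.EstimatedBytes,
        md.StreamingBuffer.OldestEntryTime)
}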

I ended up reading this issue and comment on GitHub here. This suggested using a Load job instead.

After some research, I realised that the Loader can read from an io.Reader - via a ReaderSource - as well as from a Google Cloud Storage reference.
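
I went with the io.Reader route below, but for larger payloads the same Loader can instead be pointed at a file in Google Cloud Storage - a rough sketch of that variant, with a made-up gs:// URI:

gcsRef := bigquery.NewGCSReference("gs://my-bucket/rows.json")
gcsRef.SourceFormat = bigquery.JSON // newline-delimited JSON
gcsRef.Schema = schema

loader := dataset.Table(t2Name).LoaderFrom(gcsRef)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteTruncate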

My original implementation that used the streaming buffer looked like this:

var vss []*bigquery.ValuesSaver
for i := range rows {
    vss = append(vss, &bigquery.ValuesSaver{
        Schema: schema,
        // InsertID gives BigQuery a key for best-effort de-duplication.
        InsertID: fmt.Sprintf("%d", i),
        Row: []bigquery.Value{
            "data", // one bigquery.Value per column, in schema order
        },
    })
}

err := uploader.Put(ctx, vss)
if err != nil {
    // A PutMultiError holds one RowInsertionError per failed row.
    if pmErr, ok := err.(bigquery.PutMultiError); ok {
        for _, rowInsertionError := range pmErr {
            log.Println(rowInsertionError.Errors)
        }
    }

    return fmt.Errorf("failed to insert data: %v", err)
}

I was able to change this to a load job with code that looked like this:

var lines []string
for _, v := range rows {
    b, err := json.Marshal(v)
    if err != nil {
        return fmt.Errorf("failed to generate json %v, %+v", err, v)
    }
    lines = append(lines, string(b))
}

// Load jobs expect newline-delimited JSON, so join the rows with "\n".
dataString := strings.Join(lines, "\n")

rs := bigquery.NewReaderSource(strings.NewReader(dataString))
rs.FileConfig.SourceFormat = bigquery.JSON
rs.FileConfig.Schema = schema
loader := dataset.Table(t2Name).LoaderFrom(rs)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
    return fmt.Errorf("failed to start load job: %v", err)
}
status, err := job.Wait(ctx)
if err != nil {
    return fmt.Errorf("load job failed: %v", err)
}
// Wait only errors if polling fails; the job's own errors are on the status.
if status.Err() != nil {
    return fmt.Errorf("load job completed with error: %v", status.Err())
}

Now the data is available in the table 'immediately' - I no longer need to wait for the streaming buffer.