Apache Beam Go SDK - Dataflow does not autoscale properly (parallelizing steps)

I have a Beam batch pipeline written in Go that reads a .csv file of 20 million rows (around 600 MB of data), applies basic transformation steps such as SumPerKey, and writes the output back to GCS.

When I run the pipeline on Dataflow, it starts a pool of only 1 worker!

I was expecting Dataflow to parallelize the job across multiple workers for this amount of data. Am I missing something?

Here's my code:

func main() {
    flag.Parse()

    beam.Init()

    p, s := beam.NewPipelineWithRoot()

    ctx := context.Background()

    log.Infof(ctx, "Started pipeline on scope: %s", s)

    /* [TEST PIPELINE START ]*/

    sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))

    pwo := beam.ParDo(s.Scope("Pair Key With One"),
        func(x Rating, emit func(int, int)) {
            emit(x.UserId, 1)
        }, sr)

    spk := stats.SumPerKey(s, pwo)

    mp := beam.ParDo(s.Scope("Map KV To Struct"),
        func(k int, v int, emit func(UserRatings)) {
            emit(UserRatings{
                UserId:  k,
                Ratings: v,
            })
        }, spk)

    t := top.Largest(s, mp, 1000, func(x, y UserRatings) bool { return x.Ratings < y.Ratings })

    o := beam.ParDo(s, func(x []UserRatings) string {
        if data, err := json.MarshalIndent(x, "", ""); err != nil {
            return fmt.Sprintf("[Err]: %v", err)
        } else {
            return fmt.Sprintf("Output: %s", data)
        }
    }, t)

    textio.Write(s, *output, o)

    /* [TEST PIPELINE END ]*/

    if err := beamx.Run(ctx, p); err != nil {
        fmt.Println(err)
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}
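For checking the output on a small sample, what the pipeline computes can be sketched in plain Go without Beam. This is only a local reference under assumptions: the Rating struct in the real code has more columns than the single UserId field shown here, and countPerUser and topN are hypothetical helper names that stand in for the Beam steps. The tie-break on UserId is also an assumption, added only to make the result deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// Rating mirrors one CSV row. Only UserId appears in the question;
// the real struct presumably has more columns (omitted here).
type Rating struct {
	UserId int
}

// UserRatings holds the number of ratings per user, as in the pipeline.
type UserRatings struct {
	UserId  int
	Ratings int
}

// countPerUser (hypothetical name) plays the role of the
// "Pair Key With One" ParDo followed by stats.SumPerKey.
func countPerUser(rows []Rating) map[int]int {
	counts := make(map[int]int)
	for _, r := range rows {
		counts[r.UserId]++
	}
	return counts
}

// topN (hypothetical name) plays the role of "Map KV To Struct"
// followed by top.Largest: the n users with the most ratings.
func topN(counts map[int]int, n int) []UserRatings {
	out := make([]UserRatings, 0, len(counts))
	for id, c := range counts {
		out = append(out, UserRatings{UserId: id, Ratings: c})
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Ratings != out[j].Ratings {
			return out[i].Ratings > out[j].Ratings
		}
		return out[i].UserId < out[j].UserId // assumed tie-break for determinism
	})
	if len(out) > n {
		out = out[:n]
	}
	return out
}

func main() {
	rows := []Rating{{UserId: 1}, {UserId: 2}, {UserId: 1}, {UserId: 3}, {UserId: 1}, {UserId: 2}}
	fmt.Println(topN(countPerUser(rows), 2)) // prints [{1 3} {2 2}]
}
```

Running this on the first few thousand rows of the CSV gives numbers to compare against the Dataflow output for the same sample.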

Full Code Here

I deploy the pipeline via this command line:

go run main.go \
  --runner dataflow \
  --max_num_workers 10 \
  --file gs://${BUCKET?}/ratings.csv \
  --output gs://${BUCKET?}/reporting.txt \
  --project ${PROJECT?} \
  --temp_location gs://${BUCKET?}/tmp/ \
  --staging_location gs://${BUCKET?}/binaries/ \
  --worker_harness_container_image=gcr.io/drawndom-app/beam/go:latest

Note: When I set --num_workers to 5, it starts 5 workers, but I want Dataflow to scale up automatically.

Update:

I've added a Reshuffle step right after reading the .csv input, using this lib, and Dataflow was then able to autoscale by adding 1 more worker.

I still need to understand how to optimise parallelism on my pipelines.

Worker history

Code used:

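// Reshuffle redistributes the elements of col by routing them through a
// GroupByKey on a random key and then emitting the grouped values again.
func Reshuffle(s beam.Scope, col beam.PCollection) beam.PCollection {
    s = s.Scope("Reshuffle")

    // Pair every element with a random key.
    col = beam.ParDo(s, func(x beam.X) (int, beam.X) {
        return rand.Int(), x
    }, col)
    // Group by the random key.
    col = beam.GroupByKey(s, col)
    // Drop the keys and emit each value unchanged.
    return beam.ParDo(s, func(key int, values func(*beam.X) bool, emit func(beam.X)) {
        var x beam.X
        for values(&x) {
            emit(x)
        }
    }, col)
}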