Farming out work to goroutines, but limiting the number of workers

I'm trying to improve the performance of an app. One part of its code uploads a file to a server in chunks.

The original version simply does this in a sequential loop. However, it's slow and during the sequence it also needs to talk to another server before uploading each chunk.

The upload of chunks could simply be placed in a goroutine. It works, but is not a good solution because if the source file is extremely large it ends up using a large amount of memory.

So I tried to limit the number of active goroutines by using a buffered channel. Here is some code that shows my attempt. I've stripped it down to show the concept, and you can run it to test for yourself.

package main

import (
    "fmt"
    "io"
    "os"
)

const defaultChunkSize = 1 * 1024 * 1024

// Lets have 4 workers
var c = make(chan int, 4)

func UploadFile(f *os.File) error {
    fi, err := f.Stat()
    if err != nil {
        return fmt.Errorf("err: %s", err)
    }
    size := fi.Size()

    total := (int)(size/defaultChunkSize + 1)
    // Upload parts
    buf := make([]byte, defaultChunkSize)
    for partno := 1; partno <= total; partno++ {
        readChunk := func(offset int, buf []byte) (int, error) {
            fmt.Println("readChunk", partno, offset)
            n, err := f.ReadAt(buf, int64(offset))
            if err != nil {
                return n, err
            }

            return n, nil
        }

        // This will block if there are not enough worker slots available
        c <- partno

        // The actual worker.
        go func() {
            offset := (partno - 1) * defaultChunkSize
            n, err := readChunk(offset, buf)
            if err != nil && err != io.EOF {
                return
            }

            err = uploadPart(partno, buf[:n])
            if err != nil {
                fmt.Println("Uploadpart failed:", err)
            }
            <-c
        }()
    }

    return nil
}

func uploadPart(partno int, buf []byte) error {
    fmt.Printf("Uploading partno: %d, buflen=%d\n", partno, len(buf))
    // Actually upload the part.  Lets test it by instead writing each
    // buffer to another file.  We can then use diff to compare the
    // source and dest files.

    // Open file.  Seek to (partno - 1) * defaultChunkSize, write buffer
    f, err := os.OpenFile("/home/matthewh/Downloads/out.tar.gz", os.O_CREATE|os.O_WRONLY, 0755)
    if err != nil {
        fmt.Printf("err: %s\n", err)
        return err
    }
    // Close the file when done; deferred right after a successful open.
    defer f.Close()

    n, err := f.WriteAt(buf, int64((partno-1)*defaultChunkSize))
    if err != nil {
        fmt.Printf("err=%s\n", err)
        return err
    }
    fmt.Printf("%d bytes written\n", n)
    return nil
}

func main() {
    filename := "/home/matthewh/Downloads/largefile.tar.gz"
    fmt.Printf("Opening file: %s\n", filename)

    f, err := os.Open(filename)
    if err != nil {
        panic(err)
    }

    UploadFile(f)
}

It almost works, but there are several problems. First, the final partno 22 occurs 3 times. The correct length of that part is actually 612545, since the file length isn't a multiple of 1MB.

// Sample output
...
readChunk 21 20971520
readChunk 22 22020096
Uploading partno: 22, buflen=1048576
Uploading partno: 22, buflen=612545
Uploading partno: 22, buflen=1048576

Another problem: the upload could fail, and I am not familiar enough with Go to know how best to handle a failure inside a goroutine.

Finally, I would ordinarily want to return some data from uploadPart when it succeeds. Specifically, it'll be a string (an HTTP ETag header value). These ETag values need to be collected by the main function.

What is a better way to structure this code in this instance? I've not yet found a good golang design pattern that correctly fulfills my needs here.

Skipping for the moment the question of how better to structure this code, I see a bug in your code which may be causing the problem you're seeing. Since the function you're running in the goroutine uses the variable partno, which changes with each iteration of the loop, your goroutine isn't necessarily seeing the value of partno at the time you invoked the goroutine. A common way of fixing this is to create a local copy of that variable inside the loop:

for partno := 1; partno <= total; partno++ {
    partno := partno
    // ...
}

Data race #1

Multiple goroutines are using the same buffer concurrently. Note that one goroutine may be filling it with a new chunk while another is still reading an old chunk from it. Instead, each goroutine should have its own buffer.

Data race #2

As Andy Schweig has pointed out, the value in partno is updated by the loop before the goroutine created in that iteration has a chance to read it. This is why the final partno 22 occurs multiple times. To fix it, you can pass partno as an argument to the anonymous function. That will ensure each goroutine has its own part number.
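To make the argument-passing fix concrete, here is a minimal sketch that is separate from the upload code. The helper name collectParts is made up for illustration. Each goroutine receives the part number as a parameter, so it works on its own copy. (Since Go 1.22, modules that declare go 1.22 or later get a fresh loop variable per iteration, so the capture problem no longer appears there; passing the value explicitly works in every version.)

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectParts is a hypothetical helper for this sketch. It starts one
// goroutine per part number and passes the number as an argument, so each
// goroutine sees the value from its own iteration.
func collectParts(total int) []int {
	var (
		mu   sync.Mutex
		seen []int
		wg   sync.WaitGroup
	)
	for partno := 1; partno <= total; partno++ {
		wg.Add(1)
		go func(p int) { // p is a private copy of partno
			defer wg.Done()
			mu.Lock()
			seen = append(seen, p)
			mu.Unlock()
		}(partno)
	}
	wg.Wait()
	sort.Ints(seen) // goroutines finish in arbitrary order
	return seen
}

func main() {
	fmt.Println(collectParts(5)) // [1 2 3 4 5]
}
```

If every part number shows up exactly once, the goroutines are no longer sharing the loop variable.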

Also, you can use a channel to pass the results from the workers. Maybe a struct type with the part number and error. That way, you will be able to observe the progress and retry failed uploads.
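One way this could look, as a rough sketch rather than a drop-in replacement: the partResult type, the fakeUpload stand-in, and the uploadAll helper are all names invented here, and the chunks are passed in as slices to keep the example self-contained. Each worker sends one result on a channel, and the caller collects the ETags and errors after all workers finish.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// partResult is a hypothetical type: one result per uploaded chunk.
type partResult struct {
	PartNo int
	ETag   string
	Err    error
}

// fakeUpload stands in for the real uploadPart (an assumption for this
// sketch). It fails on empty chunks so the error path can be observed.
func fakeUpload(partno int, data []byte) (string, error) {
	if len(data) == 0 {
		return "", fmt.Errorf("part %d: empty chunk", partno)
	}
	return fmt.Sprintf("etag-%d-%d", partno, len(data)), nil
}

// uploadAll runs at most `workers` uploads at a time and returns all
// results sorted by part number.
func uploadAll(chunks [][]byte, workers int) []partResult {
	sem := make(chan struct{}, workers)
	// Buffered so that workers never block when sending a result.
	results := make(chan partResult, len(chunks))
	var wg sync.WaitGroup

	for i, chunk := range chunks {
		sem <- struct{}{} // blocks while all worker slots are busy
		wg.Add(1)
		go func(partno int, data []byte) {
			defer wg.Done()
			defer func() { <-sem }() // always free the slot
			etag, err := fakeUpload(partno, data)
			results <- partResult{PartNo: partno, ETag: etag, Err: err}
		}(i+1, chunk)
	}

	wg.Wait()
	close(results)

	out := make([]partResult, 0, len(chunks))
	for r := range results {
		out = append(out, r)
	}
	sort.Slice(out, func(a, b int) bool { return out[a].PartNo < out[b].PartNo })
	return out
}

func main() {
	chunks := [][]byte{[]byte("aaaa"), []byte("bb"), nil}
	for _, r := range uploadAll(chunks, 2) {
		if r.Err != nil {
			fmt.Println("failed:", r.Err)
			continue
		}
		fmt.Println(r.PartNo, r.ETag)
	}
}
```

Because every result carries its part number, the caller can decide which parts to retry and can assemble the ETags in order regardless of which upload finished first.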

For an example of a good pattern check out this example from the GOPL book.

Suggested changes

As noted by dev.bmax, buf is moved into the goroutine; as noted by Andy Schweig, partno is now a parameter of the anonymous function. I also added a WaitGroup, since UploadFile was exiting before the uploads were complete, and a defer f.Close() on the file, which is a good habit.

package main

import (
    "fmt"
    "io"
    "os"
    "sync"
    "time"
)

const defaultChunkSize = 1 * 1024 * 1024

// wg for uploads to complete
var wg sync.WaitGroup

// Lets have 4 workers
var c = make(chan int, 4)

func UploadFile(f *os.File) error {
    // wait for all the uploads to complete before function exit
    defer wg.Wait()

    fi, err := f.Stat()
    if err != nil {
        return fmt.Errorf("err: %s", err)
    }
    size := fi.Size()
    fmt.Printf("file size: %v\n", size)

    total := int(size/defaultChunkSize + 1)
    // Upload parts
    for partno := 1; partno <= total; partno++ {

        readChunk := func(offset int, buf []byte, partno int) (int, error) {
            fmt.Println("readChunk", partno, offset)
            n, err := f.ReadAt(buf, int64(offset))
            if err != nil {
                return n, err
            }

            return n, nil
        }

        // This will block if there are not enough worker slots available
        c <- partno

        // Register the worker before starting it, so that wg.Wait()
        // cannot return before the goroutine has been counted.
        wg.Add(1)

        // The actual worker.
        go func(partno int) {
            defer wg.Done()
            // Always free the worker slot, even on an early return.
            defer func() { <-c }()

            buf := make([]byte, defaultChunkSize)

            offset := (partno - 1) * defaultChunkSize
            n, err := readChunk(offset, buf, partno)
            if err != nil && err != io.EOF {
                fmt.Println("readChunk failed:", err)
                return
            }

            err = uploadPart(partno, buf[:n])
            if err != nil {
                fmt.Println("Uploadpart failed:", err)
            }
        }(partno)
    }

    return nil
}

func uploadPart(partno int, buf []byte) error {
    fmt.Printf("Uploading partno: %d, buflen=%d\n", partno, len(buf))

    // Actually do the upload.  Simulate long running task with a sleep
    time.Sleep(time.Second)
    return nil
}

func main() {
    filename := "/home/matthewh/Downloads/largefile.tar.gz"
    fmt.Printf("Opening file: %s\n", filename)

    f, err := os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer f.Close()

    UploadFile(f)
}

I'm sure you can deal a little smarter with the buf situation. I'm just letting Go deal with the garbage. Since you are limiting your workers to a specific number (4), you really need only 4 buffers of defaultChunkSize bytes each. Please do share if you come up with something simple and shareworthy.

Have fun!