I'm trying to upload a directory into Amazon S3 bucket. However, the only way to upload a directory is to iterate through all the files inside the directory and upload them one by one.
I'm using Go to iterate over the files in directory. However, for each file I iterate through, I want to spin off a goroutine that uploads the file while the main thread iterates through the next element in the directory and spins off another goroutine to upload the same.
Any idea on how I can upload all the files in the directory in parallel using goroutines and channels?
Revised code snippet that implements a goroutine and a channel to upload files in parallel. But I'm not sure if this is the right implementation.
// uploadDirToS3 walks dir and uploads every regular file to S3 concurrently.
// It blocks until all uploads have signalled completion.
func uploadDirToS3(dir string, svc *s3.S3) {
	fileList := []string{}
	filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {
		if err != nil {
			// Propagate walk errors instead of silently descending further.
			return err
		}
		if f.IsDir() {
			// Only regular files are uploaded; skip directory entries
			// (the original relied on fileList[1:] to drop just the root).
			return nil
		}
		fmt.Println("PATH ==> " + path)
		fileList = append(fileList, path)
		return nil
	})
	// Start ALL goroutines first, then wait. The original created a channel
	// and received from it inside the same loop iteration, which blocked
	// until each upload finished and made the whole thing sequential.
	done := make([]chan bool, len(fileList))
	for i, pathOfFile := range fileList {
		done[i] = make(chan bool)
		go uploadFiletoS3(pathOfFile, svc, done[i])
	}
	for _, ch := range done {
		<-ch
	}
}
// uploadFiletoS3 uploads the file at path to the configured S3 bucket and
// signals completion by closing channel. Errors are logged; the channel is
// closed on every path so the caller can never block forever.
func uploadFiletoS3(path string, svc *s3.S3, channel chan bool) {
	// Signal completion even when an error forces an early return.
	defer close(channel)
	file, err := os.Open(path)
	if err != nil {
		fmt.Println(err)
		// Bug fix: the original fell through with a nil file handle and
		// panicked on fileInfo.Size().
		return
	}
	defer file.Close()
	fileInfo, err := file.Stat()
	if err != nil {
		fmt.Println(err)
		return
	}
	size := fileInfo.Size()
	buffer := make([]byte, size)
	if size > 0 {
		// Check the read result instead of discarding it.
		if _, err := file.Read(buffer); err != nil {
			fmt.Println(err)
			return
		}
	}
	fileBytes := bytes.NewReader(buffer)
	// Sniff the content type from the first bytes of the file.
	fileType := http.DetectContentType(buffer)
	s3Path := file.Name()
	params := &s3.PutObjectInput{
		Bucket:        aws.String("name-of-bucket"),
		Key:           aws.String(s3Path),
		Body:          fileBytes,
		ContentLength: aws.Int64(size),
		ContentType:   aws.String(fileType),
	}
	resp, err := svc.PutObject(params)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("response %s", awsutil.StringValue(resp))
}
Any ideas on how I could implement this better? I've looked into WaitGroups but for some reason, I found Channels much easier to understand and implement in this situation.
So, you are looking for concurrency, which is rooted in the go
statement. For synchronization between goroutines started in a loop, you can use channels
OR sync.WaitGroup
. The second option is a little bit easier. You also have to refactor your function and move the internal for-loop
logic into a separate function.
// uploadDirToS3 walks dir and uploads each entry (except the root directory
// itself) to S3 in parallel, waiting for every upload to finish.
func uploadDirToS3(dir string, svc *s3.S3) {
	fileList := []string{}
	filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {
		if err != nil {
			// Surface walk errors rather than ignoring them.
			return err
		}
		fileList = append(fileList, path)
		return nil
	})
	if len(fileList) == 0 {
		// Nothing was walked (e.g. dir does not exist); fileList[1:]
		// below would panic on an empty slice.
		return
	}
	var wg sync.WaitGroup
	// fileList[0] is dir itself, so the loop starts len(fileList)-1
	// goroutines. Add exactly one WaitGroup slot per goroutine started:
	// the original wg.Add(len(fileList)) counted one too many, so
	// wg.Wait() deadlocked forever.
	for _, pathOfFile := range fileList[1:] {
		wg.Add(1)
		go putInS3(pathOfFile, svc, &wg)
	}
	wg.Wait()
}
// putInS3 uploads a single file to S3 and marks the WaitGroup done when it
// returns, whether the upload succeeded or not. Errors are logged.
func putInS3(pathOfFile string, svc *s3.S3, wg *sync.WaitGroup) {
	// Done must run on every exit path or the caller's Wait never returns.
	defer wg.Done()
	file, err := os.Open(pathOfFile)
	if err != nil {
		// Bug fix: the original discarded this error and then dereferenced
		// a nil FileInfo via fileInfo.Size().
		fmt.Println(err)
		return
	}
	defer file.Close()
	fileInfo, err := file.Stat()
	if err != nil {
		fmt.Println(err)
		return
	}
	size := fileInfo.Size()
	buffer := make([]byte, size)
	if size > 0 {
		if _, err := file.Read(buffer); err != nil {
			fmt.Println(err)
			return
		}
	}
	fileBytes := bytes.NewReader(buffer)
	// Sniff the content type from the file's leading bytes.
	fileType := http.DetectContentType(buffer)
	path := file.Name()
	params := &s3.PutObjectInput{
		Bucket:        aws.String("bucket-name"),
		Key:           aws.String(path),
		Body:          fileBytes,
		ContentLength: aws.Int64(size),
		ContentType:   aws.String(fileType),
	}
	resp, err := svc.PutObject(params)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("response %s", awsutil.StringValue(resp))
}
The following does not, strictly speaking, answer the OP; however, it is an attempt to introduce parallel processing using the Go language.
Hope this helps.
package main
import (
"log"
"sync"
"time"
)
// main demonstrates the progression from sequential to parallel processing.
// Uncomment one variant at a time to compare them.
func main() {
	// processInSync()
	//   ~3 seconds; displays all output and handles errors.
	// processInParallel1()
	//   a few microseconds; displays only some output and handles no
	//   errors — super fast, but incorrect.
	// processInParallel2()
	//   ~1 second; correctly displays all output, but does not yet
	//   handle return values.
	processInParallel3()
	//   ~1 second; correctly displays all output and is able to return
	//   the first error encountered.
	//
	// This is merely an introduction to what you are able to do. More
	// examples are required to explain the subtleties of channels needed
	// to implement unbound work processing — left as an exercise to the
	// reader. For more information and explanations about channels, read
	// The Friendly Manual and the many examples on the internet:
	//   https://golang.org/doc/effective_go.html#concurrency
	//   https://gobyexample.com/channels
	//   https://gobyexample.com/closing-channels
}
// aSlowProcess logs its name, then takes one second to complete.
// It always reports success.
func aSlowProcess(name string) error {
	log.Println("aSlowProcess ", name)
	time.Sleep(time.Second)
	return nil
}
//processInSync a dummy function calling a slow function one after the other.
func processInSync() error {
now := time.Now()
// it calls the slow process three time,
// one after the other;
// If an error is returned, returns asap.
if err := aSlowProcess("#1"); err != nil {
return err
}
if err := aSlowProcess("#2"); err != nil {
return err
}
if err := aSlowProcess("#3"); err != nil {
return err
}
// This is a sync process because it does not involve
// extra synchronisation mechanism.
log.Printf("processInSync spent %v
", time.Now().Sub(now))
return nil
}
// processInParallel1 implements parallel processing example.
// it is not yet a fully working example, to keep it simple,
// it only implements the sending part of the processing.
func processInParallel1() error {
now := time.Now()
// We want to execute those function calls in parallel
// for that we use the go keyword which allows to run the function
// into a separate routine/process/thread.
// It is called async because the main thread and the
// the new routines requires to be synchronized.
// To synchronize two independant routine we must use
// atomic (race free) operators.
// A channel is an atomic operator because it is safe to
// read and write from it from multiple parallel
// and independant routines.
// before we implement such processing, we must ask ourselve
// what is the input i need to distribute among routines,
// and what are the values i want to get from those routines.
// lets create a channel of string to distribute the input to multiple
// independant workers.
distributor := make(chan string)
// The input channel MUST be read from the new routines.
// We create three workers of slow process, reading and processing.
go func() {
value := <-distributor
aSlowProcess(value)
}()
go func() {
value := <-distributor
aSlowProcess(value)
}()
go func() {
value := <-distributor
aSlowProcess(value)
}()
// we must now write the values into the distributor
// so that each worker can read and process data.
distributor <- "#1"
distributor <- "#2"
distributor <- "#3"
log.Printf("processInParallel1 spent %v
", time.Now().Sub(now))
return nil
}
// processInParallel2 implements parallel processing example.
// it is not yet a fully working example, to keep it simple,
// it implements the sending part of the processing,
// and the synchronization mechanism to wait for all workers
// to finish before returning.
func processInParallel2() error {
now := time.Now()
// We saw in the previous example how to send values and process
// them in parallel, however, that function was not able to wait for
// those async process to finish before returning.
// To implement such synchronization mechanism
// where the main thread waits for all workers to finish
// before returning we need to use the sync package.
// It provides the best pattern to handle that requirements.
// In addition to the previous example we now instantiate a
// WaitGroup https://golang.org/pkg/sync/#WaitGroup
// The purpose of the wait group is to record a number
// of async jobs to process and wait for them to finish.
var wg sync.WaitGroup
distributor := make(chan string)
// Because we have three workers, we add three to the group.
wg.Add(1)
go func() {
// Then we make sure that we signal to the waitgroup
// that the process is done.
defer wg.Done()
value := <-distributor
aSlowProcess(value)
}()
//-
wg.Add(1)
go func() {
defer wg.Done() // as an exercise, comment this line
// and inspect the output of your program.
value := <-distributor
aSlowProcess(value)
}()
//-
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
aSlowProcess(value)
}()
// we can now write the data for processing....
distributor <- "#1"
distributor <- "#2"
distributor <- "#3"
//....and wait for their completion
wg.Wait()
log.Printf("processInParallel2 spent %v
", time.Now().Sub(now))
return nil
}
// processInParallel3 implements parallel processing example.
// It is a fully working example that distribute jobs,
// wait for completion and catch for return values.
func processInParallel3() error {
now := time.Now()
var wg sync.WaitGroup
distributor := make(chan string)
// To catch for return values we must implement a
// way for output values to safely reach the main thread.
// We create a channel of errors for that purpose.
receiver := make(chan error)
// As previsouly we start the workers, and attach them to a waitgroup.
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
err := aSlowProcess(value)
// to return the value we write on the output channel.
receiver <- err
}()
//-
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
receiver <- aSlowProcess(value)
}()
//-
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
receiver <- aSlowProcess(value)
}()
// we can now write the data for processing....
distributor <- "#1"
distributor <- "#2"
distributor <- "#3"
/// ... read the output values
err1 := <-receiver
err2 := <-receiver
err3 := <-receiver
//....and wait for routines completion....
wg.Wait()
log.Printf("processInParallel3 spent %v
", time.Now().Sub(now))
// finally check for errors
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
if err3 != nil {
return err3
}
return nil
}