I'm trying to upload a directory into Amazon S3 bucket. However, the only way to upload a directory is to iterate through all the files inside the directory and upload them one by one.
I'm using Go to iterate over the files in directory. However, for each file I iterate through, I want to spin off a goroutine that uploads the file while the main thread iterates through the next element in the directory and spins off another goroutine to upload the same.
Any idea on how I can upload all the files in the directory in parallel using goroutines and channels?
Revised code snippet that implements a goroutine and a channel to upload files in parallel. But I'm not sure if this is the right implementation.
// uploadDirToS3 walks dir and uploads every regular file to S3 concurrently.
// It blocks until all uploads have signalled completion.
func uploadDirToS3(dir string, svc *s3.S3) {
	fileList := []string{}
	filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {
		if err != nil {
			// Propagate walk errors instead of silently descending further.
			return err
		}
		if f.IsDir() {
			// Only regular files are uploaded; skip directory entries
			// (the original relied on fileList[1:] to drop just the root).
			return nil
		}
		fmt.Println("PATH ==> " + path)
		fileList = append(fileList, path)
		return nil
	})
	// Start ALL goroutines first, then wait. The original created a channel
	// and received from it inside the same loop iteration, which blocked
	// until each upload finished and made the whole thing sequential.
	done := make([]chan bool, len(fileList))
	for i, pathOfFile := range fileList {
		done[i] = make(chan bool)
		go uploadFiletoS3(pathOfFile, svc, done[i])
	}
	for _, ch := range done {
		<-ch
	}
}
// uploadFiletoS3 uploads the file at path to the configured S3 bucket and
// signals completion by closing channel. Errors are logged; the channel is
// closed on every path so the caller can never block forever.
func uploadFiletoS3(path string, svc *s3.S3, channel chan bool) {
	// Signal completion even when an error forces an early return.
	defer close(channel)
	file, err := os.Open(path)
	if err != nil {
		fmt.Println(err)
		// Bug fix: the original fell through with a nil file handle and
		// panicked on fileInfo.Size().
		return
	}
	defer file.Close()
	fileInfo, err := file.Stat()
	if err != nil {
		fmt.Println(err)
		return
	}
	size := fileInfo.Size()
	buffer := make([]byte, size)
	if size > 0 {
		// Check the read result instead of discarding it.
		if _, err := file.Read(buffer); err != nil {
			fmt.Println(err)
			return
		}
	}
	fileBytes := bytes.NewReader(buffer)
	// Sniff the content type from the first bytes of the file.
	fileType := http.DetectContentType(buffer)
	s3Path := file.Name()
	params := &s3.PutObjectInput{
		Bucket:        aws.String("name-of-bucket"),
		Key:           aws.String(s3Path),
		Body:          fileBytes,
		ContentLength: aws.Int64(size),
		ContentType:   aws.String(fileType),
	}
	resp, err := svc.PutObject(params)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("response %s", awsutil.StringValue(resp))
}
Any ideas on how I could implement this better? I've looked into WaitGroups but for some reason, I found Channels much easier to understand and implement in this situation.
So, you are looking for concurrency, which is rooted in the go
statement. For synchronization between goroutines started in a loop, you can use channels
OR sync.WaitGroup
. The second option is a little bit easier. You also have to refactor your function and move the internal for-loop
logic into a separate function.
// uploadDirToS3 walks dir and uploads each entry (except the root directory
// itself) to S3 in parallel, waiting for every upload to finish.
func uploadDirToS3(dir string, svc *s3.S3) {
	fileList := []string{}
	filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {
		if err != nil {
			// Surface walk errors rather than ignoring them.
			return err
		}
		fileList = append(fileList, path)
		return nil
	})
	if len(fileList) == 0 {
		// Nothing was walked (e.g. dir does not exist); fileList[1:]
		// below would panic on an empty slice.
		return
	}
	var wg sync.WaitGroup
	// fileList[0] is dir itself, so the loop starts len(fileList)-1
	// goroutines. Add exactly one WaitGroup slot per goroutine started:
	// the original wg.Add(len(fileList)) counted one too many, so
	// wg.Wait() deadlocked forever.
	for _, pathOfFile := range fileList[1:] {
		wg.Add(1)
		go putInS3(pathOfFile, svc, &wg)
	}
	wg.Wait()
}
// putInS3 uploads a single file to S3 and marks the WaitGroup done when it
// returns, whether the upload succeeded or not. Errors are logged.
func putInS3(pathOfFile string, svc *s3.S3, wg *sync.WaitGroup) {
	// Done must run on every exit path or the caller's Wait never returns.
	defer wg.Done()
	file, err := os.Open(pathOfFile)
	if err != nil {
		// Bug fix: the original discarded this error and then dereferenced
		// a nil FileInfo via fileInfo.Size().
		fmt.Println(err)
		return
	}
	defer file.Close()
	fileInfo, err := file.Stat()
	if err != nil {
		fmt.Println(err)
		return
	}
	size := fileInfo.Size()
	buffer := make([]byte, size)
	if size > 0 {
		if _, err := file.Read(buffer); err != nil {
			fmt.Println(err)
			return
		}
	}
	fileBytes := bytes.NewReader(buffer)
	// Sniff the content type from the file's leading bytes.
	fileType := http.DetectContentType(buffer)
	path := file.Name()
	params := &s3.PutObjectInput{
		Bucket:        aws.String("bucket-name"),
		Key:           aws.String(path),
		Body:          fileBytes,
		ContentLength: aws.Int64(size),
		ContentType:   aws.String(fileType),
	}
	resp, err := svc.PutObject(params)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("response %s", awsutil.StringValue(resp))
}
The following does not, strictly speaking, answer the OP; however, it is an attempt to introduce parallel processing using the Go language.
Hope this helps.
package main
import (
"log"
"sync"
"time"
)
// main demonstrates the progression from sequential to parallel processing.
// Uncomment one variant at a time to compare them.
func main() {
	// processInSync()
	//   ~3 seconds; displays all output and handles errors.
	// processInParallel1()
	//   a few microseconds; displays only some output and handles no
	//   errors — super fast, but incorrect.
	// processInParallel2()
	//   ~1 second; correctly displays all output, but does not yet
	//   handle return values.
	processInParallel3()
	//   ~1 second; correctly displays all output and is able to return
	//   the first error encountered.
	//
	// This is merely an introduction to what you are able to do. More
	// examples are required to explain the subtleties of channels needed
	// to implement unbound work processing — left as an exercise to the
	// reader. For more information and explanations about channels, read
	// The Friendly Manual and the many examples on the internet:
	//   https://golang.org/doc/effective_go.html#concurrency
	//   https://gobyexample.com/channels
	//   https://gobyexample.com/closing-channels
}
// aSlowProcess logs its name, then takes one second to complete.
// It always reports success.
func aSlowProcess(name string) error {
	log.Println("aSlowProcess ", name)
	time.Sleep(time.Second)
	return nil
}
//processInSync a dummy function calling a slow function one after the other.
func processInSync() error {
now := time.Now()
// it calls the slow process three time,
// one after the other;
// If an error is returned, returns asap.
if err := aSlowProcess("#1"); err != nil {
return err
}
if err := aSlowProcess("#2"); err != nil {
return err
}
if err := aSlowProcess("#3"); err != nil {
return err
}
// This is a sync process because it does not involve
// extra synchronisation mechanism.
log.Printf("processInSync spent %v
", time.Now().Sub(now))
return nil
}
// processInParallel1 implements parallel processing example.
// it is not yet a fully working example, to keep it simple,
// it only implements the sending part of the processing.
func processInParallel1() error {
now := time.Now()
// We want to execute those function calls in parallel
// for that we use the go keyword which allows to run the function
// into a separate routine/process/thread.
// It is called async because the main thread and the
// the new routines requires to be synchronized.
// To synchronize two independant routine we must use
// atomic (race free) operators.
// A channel is an atomic operator because it is safe to
// read and write from it from multiple parallel
// and independant routines.
// before we implement such processing, we must ask ourselve
// what is the input i need to distribute among routines,
// and what are the values i want to get from those routines.
// lets create a channel of string to distribute the input to multiple
// independant workers.
distributor := make(chan string)
// The input channel MUST be read from the new routines.
// We create three workers of slow process, reading and processing.
go func() {
value := <-distributor
aSlowProcess(value)
}()
go func() {
value := <-distributor
aSlowProcess(value)
}()
go func() {
value := <-distributor
aSlowProcess(value)
}()
// we must now write the values into the distributor
// so that each worker can read and process data.
distributor <- "#1"
distributor <- "#2"
distributor <- "#3"
log.Printf("processInParallel1 spent %v
", time.Now().Sub(now))
return nil
}
// processInParallel2 implements parallel processing example.
// it is not yet a fully working example, to keep it simple,
// it implements the sending part of the processing,
// and the synchronization mechanism to wait for all workers
// to finish before returning.
func processInParallel2() error {
now := time.Now()
// We saw in the previous example how to send values and process
// them in parallel, however, that function was not able to wait for
// those async process to finish before returning.
// To implement such synchronization mechanism
// where the main thread waits for all workers to finish
// before returning we need to use the sync package.
// It provides the best pattern to handle that requirements.
// In addition to the previous example we now instantiate a
// WaitGroup https://golang.org/pkg/sync/#WaitGroup
// The purpose of the wait group is to record a number
// of async jobs to process and wait for them to finish.
var wg sync.WaitGroup
distributor := make(chan string)
// Because we have three workers, we add three to the group.
wg.Add(1)
go func() {
// Then we make sure that we signal to the waitgroup
// that the process is done.
defer wg.Done()
value := <-distributor
aSlowProcess(value)
}()
//-
wg.Add(1)
go func() {
defer wg.Done() // as an exercise, comment this line
// and inspect the output of your program.
value := <-distributor
aSlowProcess(value)
}()
//-
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
aSlowProcess(value)
}()
// we can now write the data for processing....
distributor <- "#1"
distributor <- "#2"
distributor <- "#3"
//....and wait for their completion
wg.Wait()
log.Printf("processInParallel2 spent %v
", time.Now().Sub(now))
return nil
}
// processInParallel3 implements parallel processing example.
// It is a fully working example that distribute jobs,
// wait for completion and catch for return values.
func processInParallel3() error {
now := time.Now()
var wg sync.WaitGroup
distributor := make(chan string)
// To catch for return values we must implement a
// way for output values to safely reach the main thread.
// We create a channel of errors for that purpose.
receiver := make(chan error)
// As previsouly we start the workers, and attach them to a waitgroup.
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
err := aSlowProcess(value)
// to return the value we write on the output channel.
receiver <- err
}()
//-
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
receiver <- aSlowProcess(value)
}()
//-
wg.Add(1)
go func() {
defer wg.Done()
value := <-distributor
receiver <- aSlowProcess(value)
}()
// we can now write the data for processing....
distributor <- "#1"
distributor <- "#2"
distributor <- "#3"
/// ... read the output values
err1 := <-receiver
err2 := <-receiver
err3 := <-receiver
//....and wait for routines completion....
wg.Wait()
log.Printf("processInParallel3 spent %v
", time.Now().Sub(now))
// finally check for errors
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
if err3 != nil {
return err3
}
return nil
}