Implementing a semaphore with multiple producers (using goroutines)

This has been the bane of my existence.

type ec2Params struct {
    sess *session.Session
    region string
}

type cloudwatchParams struct {
    cl cloudwatch.CloudWatch
    id string
    metric string
    region string
}

type request struct {
    ec2Params
    cloudwatchParams
}

// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request

func main() {
    sem := make(chan bool, maxRoutines)
    for i := 0; i < maxRoutines; i++ {
        sem <- true
    }
    req := make(chan request)
    go func() { // This is the producer
        for _, arn := range arns {
            arnCreds := startSession(arn)
            for _, region := range regions {
                sess, err := session.NewSession(
                    &aws.Config{****})
                if err != nil {
                    failOnError(err, "Can't assume role")
                }
                req <- request{ec2Params: ec2Params{ **** }}
            }
        }
    }() 
    for f := range(req) {
        <- sem
        if (ec2Params{}) != f.ec2Params {
            go getEC2Metrics(****)
        } else {
            // I should be exercising this line of code too,
            // but I'm not :(
            go getMetricFromCloudwatch(****) 
        }   
        sem <- true
    }
}

getEC2Metrics and getMetricFromCloudwatch are the functions that run as goroutines:

func getMetricFromCloudwatch(cl cloudwatch.CloudWatch, id, metric, region string) {
    // Magic
}

func getEC2Metrics(sess *session.Session, region string) {
    ec := ec2.New(sess)
    var ids []string
    l, err := ec.DescribeInstances(&ec2.DescribeInstancesInput{})
    if err != nil {
        fmt.Println(err.Error())
    } else {
        for _, rsv := range l.Reservations {
            for _, inst := range rsv.Instances {
                ids = append(ids, *inst.InstanceId)
            }
        }
        metrics := cfg.AWSMetric.Metric
        if len(ids) > 0 {
            cl := cloudwatch.New(sess)
            for _, id := range ids{
                for _, metric := range metrics {
                    // From what I can tell, execution gets stuck here
                    req <- request{ cloudwatchParams: ***** }}
                }
            }
        }
    }
}

Both the anonymous producer in main and getEC2Metrics should publish data to req asynchronously, but so far it seems that whatever getEC2Metrics publishes to the channel is never processed. It looks like something is stopping me from publishing from within a goroutine, but I haven't found what. I would love to know how to go about this and produce the intended behavior (that is, an actually working semaphore).

The base of the implementation can be found here: https://burke.libbey.me/conserving-file-descriptors-in-go/

I'm frantic, JimB's comment got the wheels turning and now I've solved this!

// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request // The global req: sends to it from getEC2Metrics blocked forever

func getEC2Metrics(sess *session.Session, region string, req chan<- request) {
    ....
    ....
            for _, id := range ids{
                for _, metric := range metrics {
                    req <- request{ **** }} // When using the global req, 
                                            // this would block
                }
            }
    ....
    ....
}

func main() {
    sem := make(chan bool, maxRoutines)
    for i := 0; i < maxRoutines; i++ {
        sem <- true    
    }
    req := make(chan request)
    go func() {
        // Producing tasks
    }()
    for f := range req {
        <-sem // take a token here, in the loop, so main blocks while
              // maxRoutines workers are already running
        go func(f request) { // pass f in so each goroutine gets its own copy
            defer func() { sem <- true }() // return the token once the work is done
            if (ec2Params{}) != f.ec2Params {
                getEC2Metrics(****, req) // publish to main's channel, not the global one
            } else {
                getMetricFromCloudwatch(****)
            }
        }(f)
    }
}
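
For anyone who wants to try the pattern without the AWS calls, here is a minimal self-contained sketch. Everything in it is hypothetical and only for illustration: task stands in for request, runBounded for the loop in main, and the sleep for the real work. It differs from the code above in one deliberate way: the slot is acquired inside the goroutine and released before the follow-up tasks are published. With an unbuffered req, taking the token in the receive loop can stall if every token holder is itself blocked sending to req. A WaitGroup is also assumed so that req can be closed and the range loop can end.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// task is a hypothetical stand-in for request: a task with children > 0
// fans out into that many follow-up tasks, like getEC2Metrics does.
type task struct {
	id       int
	children int
}

// runBounded processes the seeds and everything they fan out into, with at
// most limit tasks doing work at once. It returns how many tasks were
// processed and the highest number seen working at the same time.
func runBounded(limit int, seeds []task) (processed, peak int) {
	sem := make(chan struct{}, limit) // counting semaphore: one slot per working task
	req := make(chan task)            // shared by the seed producer and the workers
	var pending sync.WaitGroup        // tasks published but not yet finished
	var mu sync.Mutex                 // guards running, processed and peak
	running := 0

	pending.Add(len(seeds))
	go func() { // first producer: the seed tasks
		for _, t := range seeds {
			req <- t
		}
	}()
	go func() { // close req once nothing is pending, so the loop below ends
		pending.Wait()
		close(req)
	}()

	for t := range req {
		go func(t task) {
			defer pending.Done()

			sem <- struct{}{} // acquire a slot
			mu.Lock()
			running++
			processed++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // placeholder for the real work

			mu.Lock()
			running--
			mu.Unlock()
			<-sem // release before publishing, so blocked sends never hold a slot

			// Second producer: the worker itself publishes follow-up tasks.
			pending.Add(t.children)
			for i := 0; i < t.children; i++ {
				req <- task{id: t.id*10 + i}
			}
		}(t)
	}
	return processed, peak
}

func main() {
	seeds := []task{{id: 1, children: 4}, {id: 2, children: 4}, {id: 3, children: 4}}
	processed, peak := runBounded(2, seeds)
	fmt.Println("processed:", processed, "peak within limit:", peak <= 2)
}
```

The trade-off is that this bounds the amount of concurrent work, not the number of goroutines that exist; goroutines waiting for a slot are cheap, but the real calls they make (and the file descriptors those use) stay capped at limit.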

There were two issues:

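  1. The semaphore was not limiting anything. In the original loop the token was taken with <-sem and put back with sem <- true right after the go statement, so it was returned as soon as the goroutine was launched, not when it finished. Returning it from a defer inside the goroutine fixes that.
  2. The global channel req was never initialized. The line req := make(chan request) in main declares a new local variable that shadows the package-level req, so the global stays nil, and a send on a nil channel blocks forever. That left all the goroutines running getEC2Metrics stuck; passing main's channel in as a parameter makes them publish to the channel that is actually being read. (The same := shadows the global sem as well.)

I honestly just got lucky with the second item. It turns out to be ordinary variable shadowing rather than a quirk, but in the end I'm glad it's working.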
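
The behavior behind the second item can be reproduced in isolation. The sketch below is only an illustration with made-up names (withShadow, withAssign, sendGlobal are not from my program): it shows that declaring a channel with := inside a function leaves a package-level variable of the same name nil, and that a send on a nil channel never becomes ready, whereas a plain = assignment initializes the global.

```go
package main

import (
	"fmt"
	"time"
)

// Package-level channel. Its zero value is nil until something assigns to it.
var req chan int

func resetGlobal() { req = nil }

func globalIsNil() bool { return req == nil }

// sendGlobal tries to send on the package-level req and reports whether the
// send completed before the timeout. A send on a nil channel is never ready.
func sendGlobal(timeout time.Duration) bool {
	select {
	case req <- 1:
		return true
	case <-time.After(timeout):
		return false
	}
}

// withShadow mirrors the bug: := declares a new local req, so the global one
// is left untouched.
func withShadow() (isNil, delivered bool) {
	resetGlobal()
	req := make(chan int, 1) // local variable that shadows the global
	_ = req                  // only the local one was initialized
	return globalIsNil(), sendGlobal(50 * time.Millisecond)
}

// withAssign uses = and therefore initializes the package-level req.
func withAssign() (isNil, delivered bool) {
	resetGlobal()
	req = make(chan int, 1) // assignment to the global
	return globalIsNil(), sendGlobal(50 * time.Millisecond)
}

func main() {
	fmt.Println(withShadow()) // prints "true false"
	fmt.Println(withAssign()) // prints "false true"
}
```

Running go vet with a shadow analyzer, or simply dropping the package-level variables and passing the channels as parameters, avoids this class of mistake.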