How to synchronize multiple goroutines on the termination of a chosen goroutine (i.e. Thread.join())

I asked this in a previous question, but some people felt it was not detailed enough ("Why would you ever want a timed condition wait??"), so here is a more specific version.

I have a goroutine running; call it "server". It has already started, will run for some amount of time doing its thing, and then exit because it is done.

During its execution, a large number of other goroutines start. Call them "client" goroutines if you like. They run step A and step B. Then they must wait up to a specified amount of time for the "server" goroutine to finish: if the server has not finished by then, they exit with a status; if it does finish, they run, say, step C.

(Please do not tell me how to restructure this workflow. It is hypothetical and a given. It cannot be changed.)

A normal, sane way to do this is to have the server thread signal a condition variable with a signalAll or Broadcast function, and have the other threads wait on that condition variable with a timeout.

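// Pseudocode: sync.Cond has no timed wait, so WaitTimeout is imaginary.
func (s *Server) Join(timeMillis int) error {
    deadline := time.Now().Add(time.Duration(timeMillis) * time.Millisecond)
    s.mux.Lock()
    defer s.mux.Unlock()
    for !s.isFinished {
        // Would release s.mux while blocked, like cond.Wait(), but give up at the deadline.
        if err := s.cond.WaitTimeout(time.Until(deadline)); err != nil {
            return err // timed out: the server is still running
        }
    }
    stepC() // the server finished in time
    return nil
}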

Here the server would set isFinished to true and broadcast on the condition variable, both while holding the mutex. Except this is impossible, since Go's sync.Cond does not support timed waits (though it does have Broadcast()).
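For contrast, here is a minimal sketch of the part Go does support: an untimed join built on sync.Cond, where the server sets a flag and calls Broadcast under the mutex. The Server type and the Finish/Join methods are hypothetical names for illustration. Note that Join takes no timeout, because sync.Cond.Wait has none; that is exactly the gap described above.

```go
package main

import (
    "fmt"
    "sync"
)

// Server is a hypothetical stand-in for the long-running "server" goroutine.
type Server struct {
    mu         sync.Mutex
    cond       *sync.Cond
    isFinished bool
}

func NewServer() *Server {
    s := &Server{}
    s.cond = sync.NewCond(&s.mu)
    return s
}

// Finish marks the server as done and wakes every goroutine blocked in Join.
func (s *Server) Finish() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.isFinished = true
    s.cond.Broadcast()
}

// Join blocks until Finish has been called. There is no way to pass a
// timeout here: sync.Cond.Wait has no timed variant.
func (s *Server) Join() {
    s.mu.Lock()
    defer s.mu.Unlock()
    for !s.isFinished {
        s.cond.Wait() // unlocks s.mu while blocked, relocks before returning
    }
}

func main() {
    s := NewServer()
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            s.Join()
            fmt.Println("client", id, "saw server finish")
        }(i)
    }
    s.Finish()
    wg.Wait()
}
```

A client that calls Join after Finish sees isFinished already true and returns without waiting, so late arrivals are handled; only the timeout is missing.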

So, what is the "Go-centric" way to do this? I've read all of the Go blogs and documentation, and this pattern (or any equivalent "reframing" of the basic problem) never comes up, despite how obvious it is. The basic problem is that IPC-style channels connect one goroutine to one other goroutine. Yes, there is fan-in/fan-out, but remember that these goroutines are constantly appearing and vanishing. This should be simple, and, crucially, it must not leave thousands of goroutines hanging around in a wait state, waiting for the server to die, after the other branch of the select (the timer) has fired.

Note that some of the "clients" above might be started before the server goroutine has started (which is when the channel would usually be created), some might appear while it runs, and some might appear after it has exited. In all cases they should run stepC if and only if the server has run and exited within timeMillis milliseconds of entering the Join() function.

In general, the channels facility seems sorely lacking when there is more than one consumer. "First build a registry of channels to which listeners are mapped" and "there's this really nifty recursive data structure which sends itself over a channel it holds as a field" are just not OK as replacements for the nice, reliable, friendly, obvious wait(forSomeTime).

I think what you want can be done by selecting on a single shared channel, and then having the server close it when it's done.

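Say we create a global "exit channel" that is shared across all goroutines. It can be created before the "server" goroutine is created. The important part is that the server goroutine never sends anything on the channel; it simply closes it. Closing a channel makes every pending and future receive on it return immediately with the zero value, so a single close(ch) acts as a broadcast to any number of waiters, including ones that arrive after the close.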
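A small sketch of that closed-channel behaviour, showing that receives before the close block while receives after it succeed any number of times. The isClosed helper is a hypothetical name, and it assumes (as above) that nothing is ever sent on the channel, only closed.

```go
package main

import "fmt"

// isClosed reports, without blocking, whether ch has been closed.
// It assumes nothing is ever sent on ch; the channel is only closed.
func isClosed(ch <-chan struct{}) bool {
    select {
    case _, ok := <-ch:
        return !ok // ok is false only for a receive from a closed channel
    default:
        return false // the receive would block: still open
    }
}

func main() {
    ch := make(chan struct{})
    fmt.Println(isClosed(ch)) // false: nobody has closed it yet

    close(ch)

    // Every receive after the close succeeds at once. Nothing is "consumed",
    // so any number of late waiters all observe the same signal.
    for i := 0; i < 3; i++ {
        _, ok := <-ch
        fmt.Println("receive", i, "ok =", ok)
    }
    fmt.Println(isClosed(ch)) // true
}
```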

Now the client goroutines simply do this:

select {
case <-ch:
    fmt.Println("Channel closed, server is done!")
case <-time.After(time.Second):
    fmt.Println("Timed out. do recovery stuff")
}

and the server goroutine just does:

close(ch)

More complete example:

package main

import (
    "fmt"
    "time"
)

// waiter plays a "client": it does its work, then waits at most
// one second for the server to close ch.
func waiter(ch chan struct{}) {
    fmt.Println("Doing stuff")

    fmt.Println("Waiting...")

    select {
    case <-ch:
        fmt.Println("Channel closed")
    case <-time.After(time.Second):
        fmt.Println("Timed out. do recovery stuff")
    }
}

func main() {
    ch := make(chan struct{})

    go waiter(ch)
    go waiter(ch)
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Closing channel")
    close(ch) // one close wakes both waiters

    // Crude: give the waiters time to print before main returns.
    time.Sleep(time.Second)
}

This can be abstracted as the following utility API:

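import (
    "errors"
    "sync"
    "time"
)

// TimedCondition is a one-shot condition: after Broadcast, every current
// and future Wait returns nil immediately.
type TimedCondition struct {
    ch   chan struct{}
    once sync.Once
}

func NewTimedCondition() *TimedCondition {
    return &TimedCondition{
        ch: make(chan struct{}),
    }
}

// Broadcast wakes all waiters. sync.Once makes repeated calls safe;
// closing an already-closed channel would panic.
func (c *TimedCondition) Broadcast() {
    c.once.Do(func() { close(c.ch) })
}

// Wait blocks until Broadcast is called or t elapses.
func (c *TimedCondition) Wait(t time.Duration) error {
    select {
    // channel closed, meaning Broadcast was called
    case <-c.ch:
        return nil
    case <-time.After(t):
        return errors.New("timed out")
    }
}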