如何使用etcd选举api

The new major version of etcd v3 has introduces new concurrency primitives. One of those is election.

The api lacks support to start a campaign and return the (other) winner, this means we need to query the leader. This makes it complex because now we have two concurrent paths, one running an campaign and the other monitoring leader changes. What is the best practise to synchronise those two if we want all the nodes to run for leader, and fallback to follower.

    go func() {
        if err := election.Campaign(this.ctx, this.Address); err != nil {
            this.log.Error("election campaign failed", zap.Error(err))
        }
    }()

    for {
        select {
        case response, ok := <-observations:
            if !ok {
                this.log.Warn("election observation channel closed")
                return
            }

            value := string(response.Kvs[0].Value)
            change := LeaderChanged{strings.EqualFold(value, this.Address), value}

            select {
            case changes <- change:
                // should we figure out if we are a leader at this point?
            case <-session.Done():
                // it might be that we lost leadership
            }
        }
    }

Currently the election API does not allow you todo this simply. However you can run the election code in a goroutine and query the result of the election by using the election.Leader() call.

go func() {
    if err := election.Campaign(this.ctx, this.Address); err != nil {
        this.log.Error("election campaign failed", zap.Error(err))
    }

    fmt.Printf("we are leader")

    for {
        select {
        case response, ok := <-observations:
            if !ok {
                this.log.Warn("election observation channel closed")
                return
            }

            value := string(response.Kvs[0].Value)
            change := LeaderChanged{strings.EqualFold(value, this.Address), value}

            select {
            case changes <- change:
                // should we figure out if we are a leader at this point?
            case <-session.Done():
                // it might be that we lost leadership
            }
        }
    }
}()

// Wait until we have a leader before continuing
for {
    resp, err := election.Leader(this.ctx)
    if err != nil {
        if err != concurrency.ErrElectionNoLeader {
            this.log.Warn("Leader returned error", err)
            return
        }
        time.Sleep(time.Millisecond * 300)
        continue
    }
    // Report if we are not leader
    if string(resp.Kvs[0].Value) != this.Address {
        fmt.Println("we are NOT leader")
    }
    break
}

A complete example of leader election with connection interruption and initial leadership reporting can be found here.