Why does my channel-based lock block?

Hi, I'm writing a lock using channels, which aims to Lock/Unlock an operation for a given 'app'.

The general idea is that one goroutine keeps listening on two channels: lockCh and unlockCh. Any Lock() operation sends a self-made channel to lockCh and then waits to read from that self-made channel; a successful read from this channel means Lock() succeeded.

A similar process applies to Unlock().

When the listener goroutine accepts a Lock(), it checks whether the 'app' is already locked; if so, it puts that self-made channel at the tail of the waiters list. When somebody calls Unlock(), it wakes up the next waiter (by sending a message to its channel), or deletes the waiters list if nobody else is waiting for the lock.

The code is below. I don't know what is wrong, but the test cases just don't pass (it blocks after several Lock() and Unlock() calls!).

Thanks for giving me some advice.

type receiver struct {
            app  string
            ch   chan struct{}
            next *receiver
}

type receiveList struct {
            head *receiver
            tail *receiver
}

type AppLock struct {
            lockCh   chan receiver
            unlockCh chan receiver

            // Consider lock x:
            // if map[x] doesn't exist, x is unlocked
            // if map[x] exists but head is nil, x is locked but has no waiters
            // if map[x] exists and head isn't nil, x is locked and there are waiters
            m map[string]receiveList
}

func NewAppLock() *AppLock {
            l := new(AppLock)
            l.lockCh = make(chan receiver)
            l.unlockCh = make(chan receiver)
            l.m = make(map[string]receiveList)

            go l.lockRoutine()
            return l
}

func (l *AppLock) Lock(app string) {
            ch := make(chan struct{})
            l.lockCh <- receiver{
                app: app,
                ch:  ch,
            }
            <-ch
}

func (l *AppLock) Unlock(app string) {
            ch := make(chan struct{})
            l.unlockCh <- receiver{
                app: app,
                ch:  ch,
            }
            <-ch
}

func (l *AppLock) lockRoutine() {
            for {
                select {
                case r := <-l.lockCh:
                    rlist, ok := l.m[r.app]
                    if ok { // already locked
                        if rlist.head == nil { // no waiter
                            rlist.head = &r
                            rlist.tail = &r
                        } else { // there're waiters, wait in queue
                            rlist.tail.next = &r
                            rlist.tail = &r
                        }
                    } else { // unlocked
                        rlist = receiveList{}
                        l.m[r.app] = rlist
                        r.ch <- struct{}{}
                    }
                case r := <-l.unlockCh:
                    rlist, ok := l.m[r.app]
                    if ok {
                        if rlist.head == nil { // no waiter
                            delete(l.m, r.app)
                            r.ch <- struct{}{}
                        } else { // there're waiters
                            candidate := rlist.head
                            if rlist.head == rlist.tail {
                                rlist.head = nil
                                rlist.tail = nil
                            } else {
                                rlist.head = rlist.head.next
                            }
                            candidate.ch <- struct{}{}
                            r.ch <- struct{}{}
                        }
                    } else {
                        panic("AppLock: inconsistent lock state")
                    }
                }
            }
}

Test:

func main() {
        app := "APP"
        appLock := NewAppLock()
        c := make(chan bool)

        for i := 0; i < 10; i++ {
            go func(l *AppLock, loops int, cdone chan bool) {
                for i := 0; i < loops; i++ {
                    l.Lock(app)
                    l.Unlock(app)
                }
                cdone <- true
            }(appLock, 1, c)
        }

        for i := 0; i < 10; i++ {
            <-c
        }

    fmt.Println("DONE")
}

I just found the mistake in my code.

type AppLock struct {
    lockCh   chan receiver
    unlockCh chan receiver

    // Consider lock x:
    // if map[x] doesn't exist, x is unlocked
    // if map[x] exists but head is nil, x is locked but has no waiters
    // if map[x] exists and head isn't nil, x is locked and there are waiters
    m map[string]receiveList
}

Originally, I thought that because head and tail inside receiveList are both pointers, we would always operate on the same waiters list even though receiveList itself isn't a pointer type. (Obviously that's wrong.)

Indeed, a non-pointer receiveList would be okay if we only read head and tail. However, lockRoutine writes to them, and because indexing the map yields a copy of the stored struct, those writes go to the copy and never reach the map. That's the problem.
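The copy semantics described above can be seen in isolation with a small sketch. The names node, list, appendCopy and appendPtr are made up for illustration and only mirror the shape of receiver and receiveList; storing pointers in the map is one possible fix, and writing the modified struct back with l.m[r.app] = rlist would be another.

```go
package main

import "fmt"

// node and list are hypothetical stand-ins for receiver and receiveList.
type node struct {
	name string
	next *node
}

type list struct {
	head *node
	tail *node
}

// appendCopy mimics the buggy pattern: it modifies a copy of the map value
// and never stores it back, so the map entry stays unchanged.
func appendCopy(m map[string]list, key string, n *node) {
	l := m[key] // l is a copy of the struct stored in the map
	l.head = n
	l.tail = n
}

// appendPtr works on a map of pointers, so the write reaches the single
// shared list value.
func appendPtr(m map[string]*list, key string, n *node) {
	l := m[key] // l is a copy of the pointer; it still points at the same list
	l.head = n
	l.tail = n
}

func main() {
	byValue := map[string]list{"APP": list{}}
	appendCopy(byValue, "APP", &node{name: "waiter"})
	fmt.Println(byValue["APP"].head == nil) // true: the waiter was lost

	byPointer := map[string]*list{"APP": &list{}}
	appendPtr(byPointer, "APP", &node{name: "waiter"})
	fmt.Println(byPointer["APP"].head == nil) // false: the waiter was recorded
}
```

In the original lockRoutine, a lost waiter is never woken up, which would explain why a Lock() call blocks forever.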

I think the problem is with your main for loop. You are not closing the channel c anywhere, and I believe that is causing the deadlock. Try changing the loop to this:

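var wg sync.WaitGroup // one WaitGroup shared by all workers
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(l *AppLock, loops int, cdone chan bool) {
        defer wg.Done()
        for i := 0; i < loops; i++ {
            l.Lock(app)
            l.Unlock(app)
        }
        cdone <- true
    }(appLock, 1, c)
}
// close c only once, after every worker has finished sending;
// closing it per worker would panic on a second close or a late send
go func() {
    wg.Wait()
    close(c)
}()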

Working code: https://play.golang.org/p/cbXj0CPTGO

Although this should solve the deadlock problem, that doesn't necessarily mean the behavior of the program is correct. You'll need better tests to work that out.

Do you need to implement the state machine containing the linked list? There may be a much simpler solution using Go's normal channel contention to provide the queue.

So, clients are queued up to access an unbuffered channel. The locking service simply waits for a client. When one arrives, the lock service goes into a locked state. Unlock is signalled using an unbuffered channel and the state returns to unlocked.

Whilst locked, the locking service ignores subsequent requests to gain the lock. Those clients have to wait (the Go runtime provides a queue for this). Each time it is unlocked, the service can then take the next client.

type AppLock struct {
    lockCh   chan struct{}
    unlockCh chan struct{}
}

func NewAppLock() *AppLock {
    l := new(AppLock)
    l.lockCh = make(chan struct{})
    l.unlockCh = make(chan struct{})
    go l.lockRoutine()
    return l
}

func (l *AppLock) lockRoutine() {
    for {
        // take the next client off the channel
        <-l.lockCh
        // we are now in a locked state; wait until the client has released,
        // which it signals via unlockCh
        <-l.unlockCh
    }
}

func (l *AppLock) Lock() {
    // join the queue for a lock
    l.lockCh <- struct{}{}
}

func (l *AppLock) Unlock() {
    l.unlockCh <- struct{}{}
}

And the calling loop contains

        for i := 0; i < loops; i++ {
            l.Lock()
            l.Unlock()
        }

It is important that the channels are unbuffered because the clients must wait for completed rendezvous before they know they have acquired (for l.Lock()) or released (for l.Unlock()) the lock.

https://play.golang.org/p/2j4lkeOR_s

PS

I can't help wondering whether you just need RWMutex (https://golang.org/pkg/sync/#RWMutex)

There's a much much simpler solution to this. A single-slot buffered channel already functions as a simple locker. No need for a manager routine or a linked list queue:

// AppLock maintains a single exclusive lock for each app name string provided
type AppLock struct {
    // Consider lock x:
    // if map[x] doesn't exist, x is implicitly unlocked
    // if map[x] exists and can be read from, x is unlocked
    // if map[x] exists and blocks on read, x is locked
    m map[string]chan struct{}

    // RWMutex to protect the map writes, but allow free read access
    mut sync.RWMutex
}

func NewAppLock() *AppLock {
    l := new(AppLock)
    l.m = make(map[string]chan struct{})
    return l
}

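func (l *AppLock) Lock(app string) {
    l.mut.RLock()
    ch, ok := l.m[app]
    l.mut.RUnlock()

    if !ok {
        l.mut.Lock()
        // re-check under the write lock: another goroutine may have
        // created the channel since we released the read lock
        ch, ok = l.m[app]
        if !ok {
            l.m[app] = make(chan struct{}, 1)
            // we don't want to put anything on it yet, because we still want the lock
            l.mut.Unlock()
            return
        }
        l.mut.Unlock()
    }
    <-ch // blocks until lock acquired
}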

func (l *AppLock) Unlock(app string) {
    l.mut.RLock()
    ch, ok := l.m[app]
    l.mut.RUnlock()

    if ok {
        select {
        case ch <- struct{}{}:
            // do nothing, expected behavior
        default:
            // something already on channel, which means it wasn't locked
            panic("Unlock called on app not previously locked")
        }
    } else {
        panic("Unlock called on app not previously locked")
    }
}

https://play.golang.org/p/o02BHfRto3

This maintains a single channel, buffered 1, for each app. To lock, you attempt to read from the channel. If you receive something (which is just a generic token representing the lock), you got the lock, and you put the token back on the channel to unlock it. If the receive blocks, something else has the lock, and the receive will unblock when that other routine unlocks (puts the token back on the channel).

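Incidentally, this still tends to behave as a first-in, first-out system: in the current Go runtime, goroutines blocked on a receive are queued, and a sent value goes to the one that has been waiting longest (amongst those still trying to receive). The language specification does not promise this ordering, though, so it is best not to depend on it.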
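The token idea described above can be shown for a single lock, without the per-app map. This is only a sketch under that simplification: tokenLock, newTokenLock and countWithLock are illustrative names, and unlike the AppLock above, the channel here starts with the token already in it.

```go
package main

import (
	"fmt"
	"sync"
)

// tokenLock is a hypothetical lock built from a channel with a buffer of one.
// A token in the buffer means "unlocked"; an empty buffer means "locked".
type tokenLock chan struct{}

func newTokenLock() tokenLock {
	l := make(tokenLock, 1)
	l <- struct{}{} // start in the unlocked state
	return l
}

// Lock takes the token, blocking while another goroutine holds it.
func (l tokenLock) Lock() {
	<-l
}

// Unlock puts the token back; a full buffer means the lock was not held.
func (l tokenLock) Unlock() {
	select {
	case l <- struct{}{}:
	default:
		panic("tokenLock: Unlock called on an unlocked lock")
	}
}

// countWithLock increments a plain int from several goroutines, using the
// lock to serialize access, and returns the final value.
func countWithLock(workers, loops int) int {
	l := newTokenLock()
	counter := 0
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < loops; i++ {
				l.Lock()
				counter++ // only the token holder touches counter
				l.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithLock(10, 1000)) // prints 10000
}
```

Running this with the race detector (go run -race) is a cheap extra check that the counter really is protected.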