开始-等待优先级队列中的下一个项目(如果为空)

I am trying to implement a priority queue to send json objects through a network socket based on priority. I am using the container/heap package to implement the queue. I came up with something like this:

for {
    if pq.Len() > 0 {
        item := heap.Pop(&pq).(*Item)
        jsonEncoder.Encode(&item)
    } else {
        time.Sleep(10 * time.Millisecond)
    }
}

Are there better ways to wait for a new item than just polling the priority queue?

I'd probably use a couple a queuing goroutine. Starting with the data structures in the PriorityQueue example, I'd build a function like this:

http://play.golang.org/p/hcNFX8ehBW

func queue(in <-chan *Item, out chan<- *Item) {
    // Make us a queue!
    pq := make(PriorityQueue, 0)
    heap.Init(&pq)

    var currentItem *Item       // Our item "in hand"
    var currentIn = in          // Current input channel (may be nil sometimes)
    var currentOut chan<- *Item // Current output channel (starts nil until we have something)

    defer close(out)

    for {
        select {
        // Read from the input
        case item, ok := <-currentIn:
            if !ok {
                // The input has been closed. Don't keep trying to read it
                currentIn = nil
                // If there's nothing pending to write, we're done
                if currentItem == nil {
                    return
                }
                continue
            }

            // Were we holding something to write? Put it back.
            if currentItem != nil {
                heap.Push(&pq, currentItem)
            }

            // Put our new thing on the queue
            heap.Push(&pq, item)

            // Turn on the output queue if it's not turned on
            currentOut = out

            // Grab our best item. We know there's at least one. We just put it there.
            currentItem = heap.Pop(&pq).(*Item)

            // Write to the output
        case currentOut <- currentItem:
            // OK, we wrote. Is there anything else?
            if len(pq) > 0 {
                // Hold onto it for next time
                currentItem = heap.Pop(&pq).(*Item)
            } else {
                // Oh well, nothing to write. Is the input stream done?
                if currentIn == nil {
                    // Then we're done
                    return
                }

                // Otherwise, turn off the output stream for now.
                currentItem = nil
                currentOut = nil
            }
        }
    }
}

Here's an example of using it:

func main() {
    // Some items and their priorities.
    items := map[string]int{
        "banana": 3, "apple": 2, "pear": 4,
    }

    in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
    out := make(chan *Item)    // But the system will "work" for any particular values

    // Start the queuing engine!
    go queue(in, out)

    // Stick some stuff on in another goroutine
    go func() {
        i := 0
        for value, priority := range items {
            in <- &Item{
                value:    value,
                priority: priority,
                index:    i,
            }
            i++
        }
        close(in)
    }()

    // Read the results
    for item := range out {
        fmt.Printf("%.2d:%s ", item.priority, item.value)
    }
    fmt.Println()
}

Note that if you run this example, the order will be a little different every time. That's of course expected. It depends on exactly how fast the input and output channels run.

One way would be to use sync.Cond:

Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.

An example from the package could be amended as follows (for the consumer):

c.L.Lock()
for heap.Len() == 0 {
    c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item

And producer could simply do:

c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()

(Wrapping these in functions and using defers might be a good idea.)

Here is an example of thread-safe (naive) heap which pop method waits until item is available:

package main

import (
    "fmt"
    "sort"
    "sync"
    "time"
    "math/rand"
)

type Heap struct {
    b []int
    c *sync.Cond
}

func NewHeap() *Heap {
    return &Heap{c: sync.NewCond(new(sync.Mutex))}
}

// Pop (waits until anything available)
func (h *Heap) Pop() int {
    h.c.L.Lock()
    defer h.c.L.Unlock()
    for len(h.b) == 0 {
        h.c.Wait()
    }
    // There is definitely something in there
    x := h.b[len(h.b)-1]
    h.b = h.b[:len(h.b)-1]
    return x
}

func (h *Heap) Push(x int) {
    defer h.c.Signal() // will wake up a popper
    h.c.L.Lock()
    defer h.c.L.Unlock()
    // Add and sort to maintain priority (not really how the heap works)
    h.b = append(h.b, x)
    sort.Ints(h.b)
}

func main() {
    heap := NewHeap()

    go func() {
        for range time.Tick(time.Second) {
            for n := 0; n < 3; n++ {
                x := rand.Intn(100)
                fmt.Println("push:", x)
                heap.Push(x)
            }
        }
    }()

    for {
        item := heap.Pop()
        fmt.Println("pop: ", item)
    }
}

(Note this is not working in playground because of the for range time.Tick loop. Run it locally.)