具有多个通道的串行数据的异步队列

I have a problem. I have three structs that implement one interface, and a function that processes their data. For some of the structs, processing a single item can take a few seconds. The data must be processed sequentially.

I did it this way:

type (
    // dater is a single unit of work passed to a waiter.
    dater struct {
        value int
    }

    // waiter owns the channel on which its *dater items are delivered.
    waiter struct {
        data chan *dater
    }
)

// conf starts the worker goroutine for this waiter. The goroutine receives
// items from dat.data one at a time and prints each of them, so items are
// handled in the order in which they were sent. When wait is true it sleeps
// for five seconds before printing each item, which simulates slow
// processing.
//
// The goroutine runs until dat.data is closed. Ranging over the channel,
// rather than doing a bare receive inside an endless loop, lets a close end
// the worker cleanly; a bare receive on a closed channel would yield a nil
// *dater and panic on the dereference below.
func (dat *waiter) conf(wait bool) {
    go func() {
        for obj := range dat.data {
            if wait {
                time.Sleep(5 * time.Second)
            }
            fmt.Println("WAIT", wait, *obj)
        }
    }()
}

// gowrite sends w on the waiter's data channel. The send blocks until the
// channel can accept the value; for an unbuffered channel that means until
// a receiver takes it.
func (dat *waiter) gowrite(w *dater) {
    dat.data <- w
}

// main creates two waiters, a fast one (wait=false) followed by a slow one
// (wait=true), and then hands the values 0 through 29 to each of them in
// turn.
func main() {
    var waiters []*waiter

    // Build the waiters in a fixed order: the fast one first, then the slow
    // one. Each gets its own unbuffered channel and its own worker.
    for _, slow := range []bool{false, true} {
        w := &waiter{data: make(chan *dater)}
        w.conf(slow)
        waiters = append(waiters, w)
    }

    // Every value goes to every waiter. The channels are unbuffered, so each
    // gowrite call blocks until that waiter's worker has received the item.
    for i := 0; i < 30; i++ {
        for _, w := range waiters {
            w.gowrite(&dater{value: i})
        }
    }
}

output:

WAIT false {0}
WAIT false {1}
WAIT true {0} (SLEEP 5 sec)
WAIT false {2} (will appear after 5 seconds)
WAIT true {1}
WAIT false {3}
WAIT true {2}
WAIT false {4}
WAIT true {3}
WAIT false {5}

I want:

WAIT false {0}
WAIT false {1}
WAIT false {2}
WAIT false {3}
WAIT false {4}
WAIT false {5}
WAIT true {0} (5 seconds have passed and we show the first message)
WAIT true {1} (This message will appear after 5 seconds)
WAIT true {2}
WAIT true {3}

The "gowrite" call for the "second" struct waits until the channel of the "first" struct has received its data. But it is important to me that they process the data asynchronously, without blocking each other. In this case, all channels do receive the correct data.

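It seems you need two separate buffered channels, so that a send to the slow worker does not block the sends to the fast one (as long as the buffer is not full), like so: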

package main

import (
    "fmt"
    "sync"
    "time"
)

// main starts the two workers, feeds the values 1 through 9 to both queues,
// then closes the queues and waits for both workers to finish.
func main() {
    // Register each worker with the WaitGroup before launching it.
    wg.Add(2)
    for _, worker := range []func(){first, second} {
        go worker()
    }

    // Queue order matters: every value goes to a (first) and then to b
    // (second).
    queues := []chan int{a, b}
    for n := 1; n <= 9; n++ {
        for _, q := range queues {
            q <- n
        }
    }

    // Closing a queue tells its worker that no more values are coming.
    for _, q := range queues {
        close(q)
    }
    wg.Wait()
}

func first() {
    for {
        d, ok := <-a
        time.Sleep(100 * time.Millisecond)
        if !ok {
            break
        }
        fmt.Println("first job:", d) // do first job here.
    }
    wg.Done()
}

func second() {
    for {
        d, ok := <-b
        time.Sleep(300 * time.Millisecond)
        if !ok {
            break
        }
        fmt.Println("second job:", d) // do second job here.
    }
    wg.Done()
}

// Package-level state shared by main and the two workers.
var (
    // a and b are the work queues. Each is buffered to hold up to 1000
    // pending values, so a send only blocks when that buffer is full.
    a = make(chan int, 1000)
    b = make(chan int, 1000)

    // wg is used to wait for the worker goroutines to finish.
    wg sync.WaitGroup
)

output:

first job: 1
first job: 2
second job: 1
first job: 3
first job: 4
first job: 5
second job: 2
first job: 6
first job: 7
first job: 8
second job: 3
first job: 9
second job: 4
second job: 5
second job: 6
second job: 7
second job: 8
second job: 9