Asynchronous data processing by multiple functions

I have data that I receive via HTTP, and it needs to be processed by two different functions. It is important that each function processes the data in sequence: if the file ends up containing 1, 2, 3, 4, 5, then the database should also record 1, 2, 3, 4, 5, like a FIFO queue.

Now I have a problem. The data arrives continuously, and sometimes the database takes quite a long time to fulfill an update request, so I cannot update the file in a timely manner. What matters to me is that the data is added to the file or to the database as soon as each of them is able to accept it. I could use buffered channels, but I cannot know how much data might be waiting in the queue, and I would not want to set the buffer size to some arbitrarily large number. I tried adding more goroutines to the NewData function, but then my data is no longer written sequentially.

This code demonstrates the problem:

package main

import (
    "fmt"
    "time"
)

type procHandler interface {
    Start()
    NewData(newdata []byte)
}

type fileWriter struct {
    Data chan []byte
}

func (proc *fileWriter) Start() {
    proc.Data = make(chan []byte)
    go func() {
        for {
            obj := <-proc.Data

            fmt.Printf("proc %T ", proc)
            fmt.Println(obj)
        }
    }()
}

func (proc *fileWriter) NewData(newdata []byte) {
    proc.Data <- newdata
}

type sqlWriter struct {
    Data chan []byte
}

func (proc *sqlWriter) Start() {
    proc.Data = make(chan []byte)
    go func() {
        for {
            obj := <-proc.Data
            // Simulate a slow database write.
            time.Sleep(5 * time.Second)
            fmt.Printf("proc %T ", proc)
            fmt.Println(obj)
        }
    }()
}

func (proc *sqlWriter) NewData(newdata []byte) {
    proc.Data <- newdata
}

var processors = []procHandler{}

func receiver() {
    newDataImitateByteRange := 30
    for i := 0; i < newDataImitateByteRange; i++ {
        pseudoData := []byte{byte(i)}

        for _, handler := range processors {
            handler.NewData(pseudoData)
        }
    }
}

func main() {
    // file writer
    fileUpdate := &fileWriter{}
    processors = append(processors, fileUpdate)

    // sql writer
    sqlUpdate := &sqlWriter{}
    processors = append(processors, sqlUpdate)

    sqlUpdate.Start()
    fileUpdate.Start()

    go receiver()

    fmt.Scanln()
}

Code works: https://play.golang.org/p/rSshsJYZ4h

output:

proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.sqlWriter [0] (sleep)
proc *main.fileWriter [2] (printed after 5 seconds, once the previous item has been processed)
proc *main.sqlWriter [1] (sleep)
proc *main.fileWriter [3] (printed after 5 seconds, once the previous item has been processed)
proc *main.sqlWriter [2]
proc *main.fileWriter [4]
proc *main.sqlWriter [3]
proc *main.fileWriter [5]
proc *main.sqlWriter [4]
proc *main.fileWriter [6]

I want:

proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.fileWriter [2]
proc *main.fileWriter [3]
proc *main.fileWriter [4]
proc *main.fileWriter [5]
proc *main.fileWriter [6]
proc *main.sqlWriter [0] (after 5 seconds have passed, the handler begins execution)
proc *main.sqlWriter [1] (sleep)
proc *main.sqlWriter [2] (sleep)
proc *main.sqlWriter [3] (sleep)
proc *main.sqlWriter [4] (sleep)
proc *main.sqlWriter [5] (sleep)
proc *main.sqlWriter [6] (sleep)

Any help is appreciated, thank you!

It sounds like what you are looking for is something that works like a channel whose buffer resizes (grows or shrinks) with the data enqueued on it. In your original code the Data channels are unbuffered, so handler.NewData blocks on the sqlWriter until its goroutine wakes from the 5-second sleep, which in turn delays the next send to the fileWriter. A resizable channel can be implemented by putting a queue between an input channel and an output channel, with a goroutine servicing both. Here is one such solution: https://github.com/gammazero/bigchan#bigchan
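
For illustration, here is a minimal sketch of that pattern (not the BigChan implementation itself, just the general idea, assuming items are []byte as in your code). An input channel and an output channel are connected by a goroutine that keeps a growable slice as the queue, so a sender never blocks on a slow receiver. The type unboundedChan and the constructor newUnboundedChan are names made up for this example:

package main

import (
    "fmt"
    "time"
)

// unboundedChan connects an input channel and an output channel through a
// goroutine that buffers items in a growable slice, so sends on In never
// block on a slow reader of Out.
type unboundedChan struct {
    In  chan []byte
    Out chan []byte
}

func newUnboundedChan() *unboundedChan {
    c := &unboundedChan{
        In:  make(chan []byte),
        Out: make(chan []byte),
    }
    go c.loop()
    return c
}

func (c *unboundedChan) loop() {
    var queue [][]byte
    for {
        // A nil channel blocks forever in a select, so the send case is
        // disabled while the queue is empty.
        var out chan []byte
        var next []byte
        if len(queue) > 0 {
            out = c.Out
            next = queue[0]
        }
        select {
        case v, ok := <-c.In:
            if !ok {
                // Input closed: flush the remaining items, then close Out.
                for _, item := range queue {
                    c.Out <- item
                }
                close(c.Out)
                return
            }
            queue = append(queue, v)
        case out <- next:
            queue = queue[1:]
        }
    }
}

func main() {
    c := newUnboundedChan()
    go func() {
        // The producer never blocks, even though the consumer below is slow.
        for i := 0; i < 5; i++ {
            c.In <- []byte{byte(i)}
        }
        close(c.In)
    }()
    for v := range c.Out {
        time.Sleep(time.Second) // simulate a slow consumer
        fmt.Println(v)
    }
}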

I have used a BigChan as the Data channel in your fileWriter and sqlWriter, and it appears to produce the results you are looking for. Following is your reworked code:

package main

import (
    "fmt"
    "time"

    "github.com/gammazero/bigchan"
)

// Maximum number of items to buffer. Set to -1 for unlimited.
const limit = 65536

type procHandler interface {
    Start()
    NewData(newdata []byte)
}

type fileWriter struct {
    Data *bigchan.BigChan
}

func (proc *fileWriter) Start() {
    proc.Data = bigchan.New(limit)
    go func() {
        for {
            _obj := <-proc.Data.Out()
            obj := _obj.([]byte)

            fmt.Printf("proc %T ", proc)
            fmt.Println(obj)
        }
    }()
}

func (proc *fileWriter) NewData(newdata []byte) {
    proc.Data.In() <- newdata
}

type sqlWriter struct {
    Data *bigchan.BigChan
}

func (proc *sqlWriter) Start() {
    proc.Data = bigchan.New(limit)

    go func() {
        for {
            _obj := <-proc.Data.Out()
            obj := _obj.([]byte)
            // Simulate a slow database write.
            time.Sleep(5 * time.Second)
            fmt.Printf("proc %T ", proc)
            fmt.Println(obj)
        }
    }()
}

func (proc *sqlWriter) NewData(newdata []byte) {
    proc.Data.In() <- newdata
}

var processors = []procHandler{}

func receiver() {
    newDataImitateByteRange := 30
    for i := 0; i < newDataImitateByteRange; i++ {
        pseudoData := []byte{byte(i)}

        for _, handler := range processors {
            handler.NewData(pseudoData)
        }
    }
}

func main() {
    // file writer
    fileUpdate := &fileWriter{}
    processors = append(processors, fileUpdate)

    // sql writer
    sqlUpdate := &sqlWriter{}
    processors = append(processors, sqlUpdate)

    sqlUpdate.Start()
    fileUpdate.Start()

    go receiver()

    fmt.Scanln()
}
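
With the unbounded buffer in place, each handler drains its own queue at its own pace: the fileWriter prints all items almost immediately, while the sqlWriter works through its backlog one item every 5 seconds, and both still see the data in FIFO order. The trade-off is memory: if the database stays slow for long enough, its queue keeps growing, which is why the code above caps the buffer at 65536 items instead of passing -1 for unlimited.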