/**
* 通过channel实现并发信息的累计
* 这里单独开一个endTaskNumChan是考虑到可能会有多种信息的累计,比如成功信息,失败信息
*/
package main
import (
"fmt"
"time"
)
var (
msgs string //信息
msgChan chan string //单条信息通道
endTaskNumChan chan bool //完成任务通知
endTaskNum int //已完成任务数
)
func main() {
//因为考虑到其他地方如果循环调用,这里需要重置一下
msgs = ""
msgChan = make(chan string)
endTaskNumChan = make(chan bool)
endTaskNum = 0
for i := 0; i < 5; i++ {
go addMsg(i)
}
L:
for {
select {
case msg, ok := <-msgChan: //获取单条信息
if ok {
msgs = fmt.Sprint(msgs, msg)
}
case <-endTaskNumChan: //获取处理完通知
endTaskNum++
if endTaskNum == 5 { //如果已完成任务等于总任务(退出这里是否有更好的办法??)
close(msgChan)
break L
}
}
}
fmt.Println("msgs", msgs)
}
func addMsg(i int) {
defer func() {
endTaskNumChan <- true
}()
time.Sleep(1 * time.Second)
msgChan <- fmt.Sprint("当前是:", i, "\n")
}
直接用WaitGroup等待各个子任务都完成。进行结束处理
源程序输出:
msgs 当前是:2
当前是:1
当前是:4
当前是:0
当前是:3
代码:使用工作池做缓冲,锁保证msg存入到msgs中。
package main
import (
"fmt"
"sync"
"time"
)
var (
msgs string //信息
msgChan chan bool //单条信息通道
endTaskNum int //已完成任务数
mt sync.Mutex //锁
)
func main() {
msgs = ""
endTaskNum = 5
msgChan = make(chan bool, 1)
var wg sync.WaitGroup
for i := 0; i < endTaskNum; i++ {
wg.Add(1)
go addMsg(i, &wg)
}
wg.Wait()
fmt.Println("msgs", msgs)
}
func addMsg(i int, wg *sync.WaitGroup) {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
msg := fmt.Sprint("当前是:", i, "\n")
mt.Lock()
msgs = fmt.Sprint(msgs, msg)
mt.Unlock()
}