
本文探讨在go语言中如何高效地实现独立工作协程的并行执行与同步。通过分析一个常见场景,即主协程需要等待多个独立工作协程完成对同一数据项的处理后才能继续,文章详细介绍了使用go通道(channel)进行输入分发和输出同步的正确模式,并提供了代码示例和最佳实践,确保在固定协程数量下实现真正的并发处理。
Go语言中并行独立工作协程的同步模式
在Go语言中,利用其强大的并发原语——Goroutine和Channel,可以优雅地构建复杂的并发系统。然而,正确地编排这些并发任务以实现真正的并行并确保数据同步,是Go并发编程中的一个核心挑战。本文将深入探讨一种常见的并发场景:一个主协程需要将数据分发给多个独立的子工作协程进行处理,并且必须等待所有子工作协程完成处理后才能继续其自身流程。
问题场景描述
假设我们有一个account协程,它从account_chan接收数据项。对于每个接收到的数据项,account协程需要委托给两个独立的子工作协程workerA和workerB进行处理。这两个worker协程的处理顺序不重要,但account协程必须确保workerA和workerB都已完成对当前数据项的处理,才能将该数据项发送到final_chan并继续处理下一个数据项。
此场景有以下关键要求:
workerA和workerB是单例协程,即在程序生命周期内只启动一次。系统中并发运行的协程数量应保持恒定,避免为每个数据项创建新的协程。workerA和workerB是完全独立的,它们可以并且应该并发执行。
初始“非并发”实现分析
考虑以下一种“直观”但错误的实现方式:
立即学习“go语言免费学习笔记(深入)”;
package mainimport "fmt"func workerA(work_in_chan <-chan int, work_out_chan chan<- int) { for d := range work_in_chan { fmt.Println("A ", d) work_out_chan <- d // 模拟工作完成并发送信号 }}func workerB(work_in_chan <-chan int, work_out_chan chan<- int) { for d := range work_in_chan { fmt.Println("B ", d) work_out_chan <- d // 模拟工作完成并发送信号 }}func account(account_chan <-chan int, final_chan chan<- int) { wa_in := make(chan int) wa_out := make(chan int) wb_in := make(chan int) wb_out := make(chan int) go workerA(wa_in, wa_out) go workerB(wb_in, wb_out) for d := range account_chan { // 错误的实现方式:顺序执行 wa_in <- d // 发送数据给workerA <-wa_out // 等待workerA完成 wb_in <- d // 发送数据给workerB <-wb_out // 等待workerB完成 final_chan <- d }}func main() { account_chan := make(chan int, 100) final_chan := make(chan int, 100) go account(account_chan, final_chan) account_chan <- 1 account_chan <- 2 account_chan <- 3 close(account_chan) // 关闭输入通道,以便account协程最终退出 for i := 0; i < 3; i++ { fmt.Println("Final:", <-final_chan) }}
上述代码中的account协程在处理每个数据项时,首先将数据发送给workerA并立即等待其完成,然后才将数据发送给workerB并等待其完成。这种模式导致workerA和workerB实际上是顺序执行的,完全失去了并行处理的优势。这与我们希望它们并发执行的初衷相悖。
正确的并发模式:并行分发与同步等待
要实现workerA和workerB的真正并发,关键在于改变数据发送和完成信号接收的顺序。正确的做法是:首先将数据并行地发送给所有需要处理的子工作协程,然后并行地等待所有子工作协程的完成信号。
修改后的account协程中的循环逻辑如下:
// ... (workerA, workerB, channel声明部分同上)func account(account_chan <-chan int, final_chan chan<- int) { wa_in := make(chan int) wa_out := make(chan int) wb_in := make(chan int) wb_out := make(chan int) go workerA(wa_in, wa_out) go workerB(wb_in, wb_out) for d := range account_chan { // 正确的实现方式:并行发送输入,并行等待输出 wa_in <- d // 发送数据给workerA wb_in <- d // 发送数据给workerB (此时workerA和workerB可同时开始处理) <-wa_out // 等待workerA完成 <-wb_out // 等待workerB完成 (这两个接收操作会阻塞,直到两个worker都发送了信号) final_chan <- d } // 当account_chan关闭且所有数据处理完毕后,关闭worker的输入通道 // 这样worker协程也能优雅退出 close(wa_in) close(wb_in) // 等待worker协程退出,或者确保它们处理完所有数据 // 实际应用中可能需要更复杂的协调机制,例如WaitGroup close(wa_out) // 如果worker协程已退出,这些通道可能需要关闭 close(wb_out)}
代码解释:
wa_in
这种模式确保了workerA和workerB能够真正地并发执行。当一个数据项被发送给它们时,它们会同时开始处理。account协程则会在两个worker都发出完成信号后,才继续处理下一个数据项。
关于完成顺序的思考
初次接触这种模式时,可能会担心“如果workerB比workerA先完成怎么办?”。实际上,这并不重要。
替代方案与注意事项
在上述示例中,worker协程的work_out_chan实际上只用于发送一个完成信号,其发送的具体值在account协程中并未被使用。在这种情况下,sync.WaitGroup是一个更简洁且推荐的替代方案,特别是当工作协程不需要返回任何处理结果,仅需通知完成时。
使用 sync.WaitGroup 的示例:
package mainimport ( "fmt" "sync" "time" // 引入time包用于模拟耗时操作)func workerA_wg(work_in_chan <-chan int, wg *sync.WaitGroup) { defer wg.Done() // 确保无论如何都调用Done for d := range work_in_chan { fmt.Println("A ", d) time.Sleep(100 * time.Millisecond) // 模拟耗时 // workerA完成一个任务后,并不立即调用Done,而是在协程退出时调用一次 // 如果是每个任务完成后都要通知,则需要每次循环内调用Done,并增加Add计数 } fmt.Println("WorkerA exited.")}func workerB_wg(work_in_chan <-chan int, wg *sync.WaitGroup) { defer wg.Done() // 确保无论如何都调用Done for d := range work_in_chan { fmt.Println("B ", d) time.Sleep(150 * time.Millisecond) // 模拟耗时 } fmt.Println("WorkerB exited.")}func account_wg(account_chan <-chan int, final_chan chan<- int) { wa_in := make(chan int) wb_in := make(chan int) // 注意:WaitGroup通常用于等待一组goroutine的完成。 // 在本例中,worker协程是常驻的,每个数据项的处理需要单独同步。 // 因此,WaitGroup的Add/Done操作需要针对每个数据项进行。 go workerA_wg(wa_in, nil) // 这里的wg传入nil,因为workerA_wg的wg参数用于其自身退出,而非每次任务完成 go workerB_wg(wb_in, nil) // 同上 for d := range account_chan { var wg sync.WaitGroup wg.Add(2) // 为workerA和workerB各增加一个计数 // 改进的worker函数,每次处理完一个数据项后调用wg.Done() go func(data int) { defer wg.Done() wa_in <- data // 在实际worker中处理,这里只是发送数据 // 假设workerA接收到数据后会自己处理并发送一个信号 // 但如果workerA是常驻的,它的Done应该由它自己控制 }(d) go func(data int) { defer wg.Done() wb_in <- data }(d) // 这种模式下,如果workerA/B是常驻的,且每次处理一个数据后需要通知, // 那么workerA/B内部需要接收一个wg指针并在处理完数据后调用Done。 // 这会使workerA/B的签名变得复杂,需要传递WaitGroup指针。 // 更直接的WaitGroup使用方式,如果worker是短暂的: // 如果worker是常驻的,且每个数据项处理完后需要通知, // 那么原始的out_chan模式更清晰。 // 如果要用WaitGroup,需要重构worker函数使其接收WaitGroup指针,并在处理完数据后调用Done。 // 例如: // go func(data int, wg *sync.WaitGroup) { // defer wg.Done() // // 模拟workerA处理 // fmt.Println("A processing", data) // time.Sleep(100 * time.Millisecond) // }(d, &wg) // go func(data int, wg *sync.WaitGroup) { // defer wg.Done() // // 模拟workerB处理 // fmt.Println("B processing", data) // time.Sleep(150 * time.Millisecond) // }(d, &wg) // 如果worker是常驻的,并且每次处理一个数据后需要通知, // 那么每个worker需要一个输入通道和一个输出通道(或直接使用WaitGroup)。 // 原始的channel方案在这里更直观。 // 如果坚持使用WaitGroup,则每个worker需要一个输入通道, // 并且在处理完一个数据后,主协程(或一个协调协程)负责调用wg.Done()。 // 这意味着worker的输出通道仍然是必要的,或者worker自己调用Done。 // 鉴于原问题中workerA和workerB是单例协程,且每次处理一个数据后需要通知主协程, // 原始的输入/输出通道对模式是更直接且符合其设计意图的。 // WaitGroup通常用于等待一组goroutine的启动和最终退出, // 而不是用于每次任务的同步。 // 如果要用于每次任务同步,那么每个任务需要一个WaitGroup,这会比Channel复杂。 // 因此,对于原问题描述,使用独立输出通道的模式是更合适的。 // WaitGroup更适合于等待一组一次性任务的完成,或者等待常驻goroutine的最终退出。 // 在这里,每个数据项的处理都是一个“任务”,需要等待两个worker完成, // 每次迭代都需要独立的同步。 // 重新考虑:如果worker的out channel仅仅是信号, // 那么可以在account协程内部为每个数据项创建一个临时的WaitGroup。 // workerA和workerB需要被改造,使其接收WaitGroup指针并在处理完成后调用Done。 // 鉴于原始问题中的约束和代码结构,使用独立输出通道是最直接和符合Go惯用法的方式。 // 让我们回到原始的Channel解决方案,因为它更贴合“固定数量Goroutine”和“每次任务同步”的需求。 } close(wa_in) close(wb_in)}// 总结:对于“固定数量常驻worker协程,每次处理一个数据项后需要同步”的场景,// 使用输入通道分发数据,输出通道接收完成信号,是最直接和符合Go语言习惯的模式。// WaitGroup更适用于等待一组Goroutine的整体完成,而非每次任务的细粒度同步。
总结
在Go语言中,实现多个独立工作协程的并行执行和同步,关键在于合理地利用通道进行数据传输和信号协调。当主协程需要等待所有子工作协程完成对同一数据项的处理时,正确的模式是:
并行发送输入: 将数据项同时发送给所有相关的子工作协程的输入通道。并行等待输出: 阻塞等待从所有子工作协程的输出通道接收完成信号。
这种模式不仅确保了真正的并发,而且利用Go通道的阻塞特性,自然地实现了“全部完成”的同步语义,而无需手动管理复杂的锁或条件变量。通过这种方式,我们可以构建出高效、健壮且易于理解的并发系统。
以上就是Go语言中并行独立工作协程的同步模式的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1417274.html
微信扫一扫
支付宝扫一扫