
本教程探讨go语言中如何优化独立工作协程的并行执行。针对传统顺序执行导致并发效率低下的问题,文章提出了一种通过巧妙重排通道操作的解决方案。该模式允许多个独立工作协程同时启动并并行处理数据,并通过通道接收操作实现同步,确保所有工作完成后再进行下一步处理,从而在保持固定协程数量的同时,显著提升系统吞吐量。
在Go语言中,利用协程(goroutine)和通道(channel)实现并发是其核心优势之一。然而,不恰当的通道操作顺序可能导致即使是独立的任务也无法真正并行执行,从而限制了程序的并发能力。本教程将深入探讨如何通过优化通道操作顺序,使得多个独立的工作协程能够高效并行处理数据,同时满足保持固定协程数量的约束。
挑战:独立工作协程的顺序执行
考虑一个常见的场景:一个主协调协程(例如account)需要将接收到的数据分发给多个独立的子工作协程(例如workerA和workerB)进行处理。要求是:
workerA和workerB各自运行在一个独立的协程中,且这些协程数量固定,不随数据项的增加而动态创建。workerA和workerB对数据的处理是完全独立的,它们之间没有数据依赖,因此可以并行执行。只有当所有相关的子工作协程都完成对当前数据项的处理后,主协调协程才能将该数据项传递给下一个阶段。
初始的实现可能如下所示,其中主协调协程account在处理每个数据项时,会先将数据发送给workerA并等待其完成,然后再发送给workerB并等待其完成。这种串行等待的方式,即使workerA和workerB是独立的,也无法实现真正的并行。
package mainimport "fmt"func workerA(work_in_chan <-chan int, work_out_chan chan<- int) { for d := range work_in_chan { fmt.Println("A ", d) // 模拟工作 work_out_chan <- d }}func workerB(work_in_chan <-chan int, work_out_chan chan<- int) { for d := range work_in_chan { fmt.Println("B ", d) // 模拟工作 work_out_chan <- d }}func account(account_chan <-chan int, final_chan chan<- int) { wa_in := make(chan int) wa_out := make(chan int) wb_in := make(chan int) wb_out := make(chan int) go workerA(wa_in, wa_out) go workerB(wb_in, wb_out) for d := range account_chan { // 初始实现:串行处理,无法并行 wa_in <- d <-wa_out // 阻塞,等待workerA完成 wb_in <- d <-wb_out // 阻塞,等待workerB完成 final_chan <- d }}func main() { account_chan := make(chan int, 100) final_chan := make(chan int, 100) go account(account_chan, final_chan) account_chan <- 1 account_chan <- 2 account_chan <- 3 close(account_chan) // 关闭输入通道,以便account协程最终退出 // 从final_chan接收结果 for i := 0; i < 3; i++ { fmt.Println("Final:", <-final_chan) } close(final_chan) // 关闭输出通道}
在上述代码中,account协程在处理每个数据项d时,首先向wa_in发送数据,然后立即阻塞等待wa_out的返回。只有workerA处理完毕并发送到wa_out后,account协程才能继续向wb_in发送数据,并再次阻塞等待wb_out的返回。这种模式导致workerA和workerB无法同时运行,极大地限制了并发性。
立即学习“go语言免费学习笔记(深入)”;
解决方案:重排通道操作实现并行
要解决上述问题,关键在于改变主协调协程中通道的发送和接收顺序。既然workerA和workerB是独立的,我们可以先将数据同时发送给它们,让它们并行开始工作,然后统一等待它们全部完成。
优化的实现如下:
package mainimport "fmt"func workerA(work_in_chan <-chan int, work_out_chan chan<- int) { for d := range work_in_chan { fmt.Println("A processing:", d) // 模拟工作,可能耗时 work_out_chan <- d // 完成后发送信号 } close(work_out_chan) // 当输入通道关闭时,关闭输出通道}func workerB(work_in_chan <-chan int, work_out_chan chan<- int) { for d := range work_in_chan { fmt.Println("B processing:", d) // 模拟工作,可能耗时 work_out_chan <- d // 完成后发送信号 } close(work_out_chan) // 当输入通道关闭时,关闭输出通道}func account(account_chan <-chan int, final_chan chan<- int) { // 创建用于workerA和workerB的输入输出通道 // 注意:这里使用无缓冲通道,确保worker在准备好接收前不会阻塞发送 wa_in := make(chan int) wa_out := make(chan int) wb_in := make(chan int) wb_out := make(chan int) // 启动worker协程 go workerA(wa_in, wa_out) go workerB(wb_in, wb_out) // 遍历输入数据 for d := range account_chan { // 1. 同时将数据发送给所有工作协程 // 假设worker协程已准备好接收,此操作是非阻塞的(对于无缓冲通道,worker必须已在接收端等待) // 或如果通道有缓冲,则只要缓冲未满,发送就是非阻塞的 wa_in <- d wb_in <- d // 2. 阻塞等待所有工作协程完成 // 接收操作会阻塞,直到对应的worker完成其工作并发送信号 <-wa_out <-wb_out // 3. 所有工作完成后,将数据发送到最终通道 final_chan <- d } // 当account_chan关闭且所有数据处理完毕后,关闭worker的输入通道 // 这样worker协程才能从for range循环中退出 close(wa_in) close(wb_in) // 等待worker协程完成所有剩余工作并关闭其输出通道 // 确保在关闭final_chan之前所有数据都已处理 for range wa_out {} // 消费完所有wa_out中可能剩余的信号 for range wb_out {} // 消费完所有wb_out中可能剩余的信号 close(final_chan) // 所有工作完成后关闭最终输出通道}func main() { account_chan := make(chan int, 100) // 带缓冲的输入通道 final_chan := make(chan int, 100) // 带缓冲的输出通道 go account(account_chan, final_chan) // 发送数据 account_chan <- 1 account_chan <- 2 account_chan <- 3 close(account_chan) // 发送完毕,关闭输入通道 // 从final_chan接收结果 for res := range final_chan { fmt.Println("Final result:", res) }}
代码分析:
并行启动工作: wa_in 同步等待完成: 顺序不重要: 即使workerA比workerB先完成,或者反之,这种模式都能正确工作。因为account协程会同时等待两个接收操作,无论哪个先完成,它都会继续等待另一个,直到两者都完成为止。
通过这种简单的通道操作重排,我们成功地让两个独立的worker协程实现了真正的并行处理,同时满足了所有数据项必须经过所有worker处理的同步要求,并且保持了固定数量的协程。
关键概念与注意事项
并发与并行:
并发(Concurrency) 是指程序设计结构能够处理多个任务。Go语言通过协程(goroutines)提供了优秀的并发原语。并行(Parallelism) 是指多个任务在同一时间点上物理地同时执行。本教程的优化正是为了在多核处理器上实现workerA和workerB的并行执行。
通道缓冲:
在上述示例中,wa_in、wa_out、wb_in、wb_out通道默认是无缓冲的。这意味着发送操作会阻塞,直到有接收者准备好接收;接收操作会阻塞,直到有发送者发送数据。这种行为保证了严格的同步。如果将这些通道设置为带缓冲的(例如make(chan int, 1)),则发送操作在缓冲区未满时是非阻塞的。这可以减少协调协程与工作协程之间的紧密耦合,提高吞吐量,但需要注意缓冲区大小的选择,以避免死锁或资源耗尽。
sync.WaitGroup的替代方案:
在当前场景中,workerA和workerB的输出通道(wa_out, wb_out)仅用于发送完成信号,其传输的具体值并不重要。
如果工作协程的输出值确实不需要被主协调协程使用,那么使用sync.WaitGroup可能是一个更简洁、更高效的同步机制。sync.WaitGroup专门用于等待一组协程完成。
使用sync.WaitGroup的伪代码示例:
// ... (workerA和workerB不再需要work_out_chan,而是接收一个*sync.WaitGroup)func workerA(work_in_chan <-chan int, wg *sync.WaitGroup) { defer wg.Done() // 在函数退出时通知WaitGroup for d := range work_in_chan { // ... 处理数据 }}func account(account_chan <-chan int, final_chan chan<- int) { // ... var wg sync.WaitGroup // ... for d := range account_chan { wg.Add(2) // 增加计数,表示有两个worker需要完成 wa_in <- d wb_in <- d wg.Wait() // 阻塞等待所有worker完成 final_chan <- d } // ...}
sync.WaitGroup的优势在于它更明确地表达了“等待一组任务完成”的意图,并且避免了创建不必要的通道。
优雅关闭:
在main函数中,通过close(account_chan)来通知account协程不再有新的数据。account协程在for range account_chan循环结束后,需要close(wa_in)和close(wb_in)来通知workerA和workerB不再有新的输入。workerA和workerB在接收通道关闭后,也会退出其for range循环,并close其输出通道。account协程在关闭其输入通道后,需要确保所有worker协程都已完成并关闭其输出通道后,才能安全地关闭final_chan。通过for range wa_out {}和for range wb_out {}来消费完所有可能的剩余信号,确保worker协程完全退出。这确保了整个数据流的完整性和程序的优雅终止。
总结
通过对Go语言中通道操作顺序的细致调整,我们能够有效地将独立的任务从串行执行转变为并行执行,从而充分利用多核处理器的能力,提升程序的整体吞吐量。这种模式的核心思想是:先同时启动所有独立的工作任务(通过非阻塞发送),然后统一等待所有任务完成(通过阻塞接收)。在实际开发中,根据具体需求(是否需要传递结果、同步机制的简洁性等),可以选择使用通道进行同步,或者考虑使用sync.WaitGroup等更专业的同步原语。理解并熟练运用这些并发模式,是编写高性能Go语言应用的关键。
以上就是Go语言并发模式:优化独立工作协程的并行执行的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1417442.html
微信扫一扫
支付宝扫一扫