当前位置: 首页 > news >正文

Go语言管道Channel通信教程

Go语言管道Channel通信教程

目录

  1. Channel基础概念
  2. Channel类型与创建
  3. Channel操作详解
  4. Select语句
  5. Channel通信模式
  6. 高级Channel技巧
  7. 实战案例

Channel基础概念

什么是Channel?

Channel是Go语言中用于goroutine之间通信的管道。它体现了Go的并发哲学:“不要通过共享内存来通信,而要通过通信来共享内存”

Channel的特性

  • 类型安全:每个channel只能传输特定类型的数据
  • 同步机制:提供goroutine之间的同步
  • 方向性:可以限制channel的读写方向
  • 缓冲控制:支持无缓冲和有缓冲两种模式

Channel类型与创建

无缓冲Channel

package mainimport ("fmt""time"
)func main() {// 创建无缓冲channelch := make(chan string)// 启动发送者goroutinego func() {time.Sleep(1 * time.Second)ch <- "Hello, Channel!"fmt.Println("Message sent")}()// 主goroutine接收消息fmt.Println("Waiting for message...")message := <-chfmt.Println("Received:", message)
}

有缓冲Channel

package mainimport ("fmt""time"
)func main() {// 创建缓冲大小为3的channelch := make(chan int, 3)// 发送数据(不会阻塞,因为有缓冲)ch <- 1ch <- 2ch <- 3fmt.Printf("Channel length: %d, capacity: %d\n", len(ch), cap(ch))// 接收数据for i := 0; i < 3; i++ {value := <-chfmt.Printf("Received: %d\n", value)}
}

方向性Channel

package mainimport "fmt"// 只能发送的channel
func sender(ch chan<- string) {ch <- "Hello from sender"close(ch)
}// 只能接收的channel
func receiver(ch <-chan string) {for message := range ch {fmt.Println("Received:", message)}
}func main() {ch := make(chan string)go sender(ch)receiver(ch)
}

Channel操作详解

发送和接收

package mainimport ("fmt""time"
)func main() {ch := make(chan int, 2)// 发送操作ch <- 42ch <- 100// 接收操作value1 := <-chvalue2 := <-chfmt.Printf("Received: %d, %d\n", value1, value2)// 带ok的接收操作ch <- 200close(ch)value3, ok := <-chfmt.Printf("Received: %d, ok: %t\n", value3, ok)value4, ok := <-chfmt.Printf("Received: %d, ok: %t\n", value4, ok) // ok为false,channel已关闭
}

关闭Channel

package mainimport "fmt"func producer(ch chan<- int) {for i := 1; i <= 5; i++ {ch <- ifmt.Printf("Sent: %d\n", i)}close(ch) // 关闭channel表示不再发送数据
}func consumer(ch <-chan int) {// 使用range遍历channel,直到channel关闭for value := range ch {fmt.Printf("Received: %d\n", value)}fmt.Println("Channel closed, consumer finished")
}func main() {ch := make(chan int, 2)go producer(ch)consumer(ch)
}

Select语句

基本Select用法

package mainimport ("fmt""time"
)func main() {ch1 := make(chan string)ch2 := make(chan string)go func() {time.Sleep(1 * time.Second)ch1 <- "Message from ch1"}()go func() {time.Sleep(2 * time.Second)ch2 <- "Message from ch2"}()// select语句等待多个channel操作for i := 0; i < 2; i++ {select {case msg1 := <-ch1:fmt.Println("Received from ch1:", msg1)case msg2 := <-ch2:fmt.Println("Received from ch2:", msg2)}}
}

带超时的Select

package mainimport ("fmt""time"
)func main() {ch := make(chan string)go func() {time.Sleep(3 * time.Second)ch <- "Delayed message"}()select {case msg := <-ch:fmt.Println("Received:", msg)case <-time.After(2 * time.Second):fmt.Println("Timeout: no message received within 2 seconds")}
}

非阻塞Select

package mainimport "fmt"func main() {ch := make(chan int, 1)// 非阻塞发送select {case ch <- 42:fmt.Println("Sent 42")default:fmt.Println("Channel is full, cannot send")}// 非阻塞接收select {case value := <-ch:fmt.Printf("Received: %d\n", value)default:fmt.Println("No value available")}// 再次尝试非阻塞接收select {case value := <-ch:fmt.Printf("Received: %d\n", value)default:fmt.Println("No value available")}
}

Channel通信模式

生产者-消费者模式

package mainimport ("fmt""sync""time"
)type Task struct {ID   intData string
}func producer(tasks chan<- Task, wg *sync.WaitGroup) {defer wg.Done()defer close(tasks)for i := 1; i <= 10; i++ {task := Task{ID:   i,Data: fmt.Sprintf("Task-%d", i),}tasks <- taskfmt.Printf("Produced: %s\n", task.Data)time.Sleep(100 * time.Millisecond)}
}func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {defer wg.Done()for task := range tasks {fmt.Printf("Consumer %d processing: %s\n", id, task.Data)time.Sleep(200 * time.Millisecond) // 模拟处理时间fmt.Printf("Consumer %d finished: %s\n", id, task.Data)}
}func main() {tasks := make(chan Task, 5) // 缓冲channelvar wg sync.WaitGroup// 启动生产者wg.Add(1)go producer(tasks, &wg)// 启动多个消费者for i := 1; i <= 3; i++ {wg.Add(1)go consumer(i, tasks, &wg)}wg.Wait()fmt.Println("All tasks completed")
}

管道模式

package mainimport "fmt"// 第一阶段:生成数字
func generate(nums chan<- int) {for i := 1; i <= 10; i++ {nums <- i}close(nums)
}// 第二阶段:计算平方
func square(nums <-chan int, squares chan<- int) {for num := range nums {squares <- num * num}close(squares)
}// 第三阶段:过滤偶数
func filter(squares <-chan int, evens chan<- int) {for square := range squares {if square%2 == 0 {evens <- square}}close(evens)
}func main() {nums := make(chan int)squares := make(chan int)evens := make(chan int)// 启动管道的各个阶段go generate(nums)go square(nums, squares)go filter(squares, evens)// 输出最终结果fmt.Println("Even squares:")for even := range evens {fmt.Println(even)}
}

扇入模式

package mainimport ("fmt""sync""time"
)func worker(id int, output chan<- string) {for i := 1; i <= 3; i++ {message := fmt.Sprintf("Worker %d - Message %d", id, i)output <- messagetime.Sleep(time.Second)}close(output)
}func fanIn(inputs ...<-chan string) <-chan string {output := make(chan string)var wg sync.WaitGroup// 为每个输入channel启动一个goroutinefor _, input := range inputs {wg.Add(1)go func(ch <-chan string) {defer wg.Done()for message := range ch {output <- message}}(input)}// 等待所有输入完成后关闭输出channelgo func() {wg.Wait()close(output)}()return output
}func main() {// 创建多个worker的输出channelch1 := make(chan string)ch2 := make(chan string)ch3 := make(chan string)// 启动workersgo worker(1, ch1)go worker(2, ch2)go worker(3, ch3)// 扇入所有worker的输出merged := fanIn(ch1, ch2, ch3)// 接收合并后的消息for message := range merged {fmt.Println("Received:", message)}
}

高级Channel技巧

Channel的Channel

package mainimport ("fmt""time"
)func worker(id int, jobs <-chan chan string) {for job := range jobs {result := fmt.Sprintf("Worker %d processed job", id)job <- resultclose(job)}
}func main() {jobs := make(chan chan string, 3)// 启动workersfor i := 1; i <= 2; i++ {go worker(i, jobs)}// 发送任务for i := 1; i <= 5; i++ {resultCh := make(chan string, 1)jobs <- resultCh// 等待结果result := <-resultChfmt.Printf("Job %d result: %s\n", i, result)}close(jobs)
}

信号量模式

package mainimport ("fmt""sync""time"
)type Semaphore chan struct{}func NewSemaphore(capacity int) Semaphore {return make(Semaphore, capacity)
}func (s Semaphore) Acquire() {s <- struct{}{}
}func (s Semaphore) Release() {<-s
}func worker(id int, sem Semaphore, wg *sync.WaitGroup) {defer wg.Done()sem.Acquire() // 获取信号量defer sem.Release() // 释放信号量fmt.Printf("Worker %d started\n", id)time.Sleep(2 * time.Second) // 模拟工作fmt.Printf("Worker %d finished\n", id)
}func main() {const maxConcurrent = 3sem := NewSemaphore(maxConcurrent)var wg sync.WaitGroup// 启动10个worker,但最多只有3个同时运行for i := 1; i <= 10; i++ {wg.Add(1)go worker(i, sem, &wg)}wg.Wait()fmt.Println("All workers completed")
}

实战案例

并发Web爬虫

package mainimport ("fmt""net/http""sync""time"
)type CrawlResult struct {URL        stringStatusCode intError      errorDuration   time.Duration
}type Crawler struct {maxConcurrent intsemaphore     chan struct{}
}func NewCrawler(maxConcurrent int) *Crawler {return &Crawler{maxConcurrent: maxConcurrent,semaphore:     make(chan struct{}, maxConcurrent),}
}func (c *Crawler) crawlURL(url string, results chan<- CrawlResult, wg *sync.WaitGroup) {defer wg.Done()// 获取信号量c.semaphore <- struct{}{}defer func() { <-c.semaphore }()start := time.Now()resp, err := http.Get(url)duration := time.Since(start)result := CrawlResult{URL:      url,Duration: duration,Error:    err,}if err == nil {result.StatusCode = resp.StatusCoderesp.Body.Close()}results <- result
}func (c *Crawler) Crawl(urls []string) <-chan CrawlResult {results := make(chan CrawlResult, len(urls))var wg sync.WaitGroupfor _, url := range urls {wg.Add(1)go c.crawlURL(url, results, &wg)}go func() {wg.Wait()close(results)}()return results
}func main() {urls := []string{"https://www.google.com","https://www.github.com","https://www.stackoverflow.com","https://www.golang.org","https://www.reddit.com",}crawler := NewCrawler(3) // 最多3个并发请求results := crawler.Crawl(urls)fmt.Println("Crawling results:")for result := range results {if result.Error != nil {fmt.Printf("❌ %s: %v\n", result.URL, result.Error)} else {fmt.Printf("✅ %s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)}}
}

实时数据处理管道

package mainimport ("fmt""math/rand""time"
)type DataPoint struct {ID        intValue     float64Timestamp time.Time
}type ProcessedData struct {DataPointProcessed boolResult    float64
}// 数据生成器
func dataGenerator(output chan<- DataPoint) {defer close(output)for i := 1; i <= 20; i++ {data := DataPoint{ID:        i,Value:     rand.Float64() * 100,Timestamp: time.Now(),}output <- datatime.Sleep(100 * time.Millisecond)}
}// 数据处理器
func dataProcessor(input <-chan DataPoint, output chan<- ProcessedData) {defer close(output)for data := range input {// 模拟数据处理time.Sleep(50 * time.Millisecond)processed := ProcessedData{DataPoint: data,Processed: true,Result:    data.Value * 2, // 简单的处理逻辑}output <- processed}
}// 数据过滤器
func dataFilter(input <-chan ProcessedData, output chan<- ProcessedData) {defer close(output)for data := range input {// 只传递结果大于100的数据if data.Result > 100 {output <- data}}
}func main() {// 创建管道rawData := make(chan DataPoint, 5)processedData := make(chan ProcessedData, 5)filteredData := make(chan ProcessedData, 5)// 启动管道各阶段go dataGenerator(rawData)go dataProcessor(rawData, processedData)go dataFilter(processedData, filteredData)// 输出最终结果fmt.Println("Filtered results (Result > 100):")for data := range filteredData {fmt.Printf("ID: %d, Original: %.2f, Result: %.2f, Time: %s\n",data.ID, data.Value, data.Result, data.Timestamp.Format("15:04:05"))}
}

总结

Channel是Go语言并发编程的核心工具,提供了优雅的goroutine间通信方式:

关键概念

  • 无缓冲vs有缓冲:控制同步行为
  • 方向性:限制channel的使用方式
  • Select语句:处理多个channel操作
  • 关闭channel:信号传递机制

常用模式

  • 生产者-消费者:解耦数据生产和消费
  • 管道:数据流式处理
  • 扇入扇出:并发处理和结果聚合
  • 信号量:控制并发数量

最佳实践

  1. 发送者负责关闭channel
  2. 使用range遍历channel
  3. 利用select实现超时和非阻塞操作
  4. 合理设置缓冲大小
  5. 避免channel泄漏

掌握Channel的使用是成为Go并发编程专家的必经之路。记住:通过通信来共享内存,而不是通过共享内存来通信

参考资源

  • Go官方文档 - Channel
  • Go并发模式:管道和取消
  • Go并发模式:上下文
http://www.dtcms.com/a/295771.html

相关文章:

  • Lua(table)
  • 数据库集群环境漏洞修复
  • Vue-23-通过flask接口提供的数据使用plotly.js绘图(二)
  • Python爬虫实战:与dominoup.com平台结合的域名数据分析系统
  • 【数据可视化-72】苏超第七轮战罢:黑金大屏下的足球数据洞察(含完整代码、数据和大屏)
  • Windows 如何更改 ModelScope 的模型下载缓存位置?
  • 低功耗设计双目协同画面实现光学变焦内带AI模型
  • 几个常用的Oxygen编辑器插件
  • Python进阶第三方库之Matplotlib
  • Jenkins最新版本的安装以及集成Allure生成测试报告
  • “适应度”简介
  • 【牛客刷题】和零在一起
  • SQL基础⑫ | 视图篇
  • 学习设计模式《十九》——享元模式
  • ​Excel——SUMPRODUCT 函数
  • LeetCode 2322:从树中删除边的最小分数
  • J2EE模式---业务代表模式
  • 后信创时代,融合数据库成为国产数据库的新锚点
  • MongoDB的操作
  • skywalking应用性能监控
  • 内网穿透的应用-分布式系统观测革命:SigNoz与cpolar的技术协同之道
  • Java从入门到精通!第十三天(IO 流)
  • Web前端:JavaScript every()迭代方法
  • 闲庭信步使用图像验证平台加速FPGA的开发:第三十四课——车牌识别的FPGA实现(6)叠加车牌识别的信息
  • 认识单片机
  • Centos新增磁盘,进行根目录扩容
  • SkyWalking异步采集spring gateway日志
  • 基于Qt和OpenCV的图片与视频编辑器
  • 完整指南:使用Apache htpasswd为Chronograf配置基础认证及功能详解
  • 响应式前端设计:CSS 自适应布局与字体大小的最佳实践