Go语言管道Channel通信教程
Go语言管道Channel通信教程
目录
- Channel基础概念
- Channel类型与创建
- Channel操作详解
- Select语句
- Channel通信模式
- 高级Channel技巧
- 实战案例
Channel基础概念
什么是Channel?
Channel是Go语言中用于goroutine之间通信的管道。它体现了Go的并发哲学:“不要通过共享内存来通信,而要通过通信来共享内存”。
Channel的特性
- 类型安全:每个channel只能传输特定类型的数据
- 同步机制:提供goroutine之间的同步
- 方向性:可以限制channel的读写方向
- 缓冲控制:支持无缓冲和有缓冲两种模式
Channel类型与创建
无缓冲Channel
package mainimport ("fmt""time"
)func main() {// 创建无缓冲channelch := make(chan string)// 启动发送者goroutinego func() {time.Sleep(1 * time.Second)ch <- "Hello, Channel!"fmt.Println("Message sent")}()// 主goroutine接收消息fmt.Println("Waiting for message...")message := <-chfmt.Println("Received:", message)
}
有缓冲Channel
package mainimport ("fmt""time"
)func main() {// 创建缓冲大小为3的channelch := make(chan int, 3)// 发送数据(不会阻塞,因为有缓冲)ch <- 1ch <- 2ch <- 3fmt.Printf("Channel length: %d, capacity: %d\n", len(ch), cap(ch))// 接收数据for i := 0; i < 3; i++ {value := <-chfmt.Printf("Received: %d\n", value)}
}
方向性Channel
package mainimport "fmt"// 只能发送的channel
func sender(ch chan<- string) {ch <- "Hello from sender"close(ch)
}// 只能接收的channel
func receiver(ch <-chan string) {for message := range ch {fmt.Println("Received:", message)}
}func main() {ch := make(chan string)go sender(ch)receiver(ch)
}
Channel操作详解
发送和接收
package mainimport ("fmt""time"
)func main() {ch := make(chan int, 2)// 发送操作ch <- 42ch <- 100// 接收操作value1 := <-chvalue2 := <-chfmt.Printf("Received: %d, %d\n", value1, value2)// 带ok的接收操作ch <- 200close(ch)value3, ok := <-chfmt.Printf("Received: %d, ok: %t\n", value3, ok)value4, ok := <-chfmt.Printf("Received: %d, ok: %t\n", value4, ok) // ok为false,channel已关闭
}
关闭Channel
package mainimport "fmt"func producer(ch chan<- int) {for i := 1; i <= 5; i++ {ch <- ifmt.Printf("Sent: %d\n", i)}close(ch) // 关闭channel表示不再发送数据
}func consumer(ch <-chan int) {// 使用range遍历channel,直到channel关闭for value := range ch {fmt.Printf("Received: %d\n", value)}fmt.Println("Channel closed, consumer finished")
}func main() {ch := make(chan int, 2)go producer(ch)consumer(ch)
}
Select语句
基本Select用法
package mainimport ("fmt""time"
)func main() {ch1 := make(chan string)ch2 := make(chan string)go func() {time.Sleep(1 * time.Second)ch1 <- "Message from ch1"}()go func() {time.Sleep(2 * time.Second)ch2 <- "Message from ch2"}()// select语句等待多个channel操作for i := 0; i < 2; i++ {select {case msg1 := <-ch1:fmt.Println("Received from ch1:", msg1)case msg2 := <-ch2:fmt.Println("Received from ch2:", msg2)}}
}
带超时的Select
package mainimport ("fmt""time"
)func main() {ch := make(chan string)go func() {time.Sleep(3 * time.Second)ch <- "Delayed message"}()select {case msg := <-ch:fmt.Println("Received:", msg)case <-time.After(2 * time.Second):fmt.Println("Timeout: no message received within 2 seconds")}
}
非阻塞Select
package mainimport "fmt"func main() {ch := make(chan int, 1)// 非阻塞发送select {case ch <- 42:fmt.Println("Sent 42")default:fmt.Println("Channel is full, cannot send")}// 非阻塞接收select {case value := <-ch:fmt.Printf("Received: %d\n", value)default:fmt.Println("No value available")}// 再次尝试非阻塞接收select {case value := <-ch:fmt.Printf("Received: %d\n", value)default:fmt.Println("No value available")}
}
Channel通信模式
生产者-消费者模式
package mainimport ("fmt""sync""time"
)type Task struct {ID intData string
}func producer(tasks chan<- Task, wg *sync.WaitGroup) {defer wg.Done()defer close(tasks)for i := 1; i <= 10; i++ {task := Task{ID: i,Data: fmt.Sprintf("Task-%d", i),}tasks <- taskfmt.Printf("Produced: %s\n", task.Data)time.Sleep(100 * time.Millisecond)}
}func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {defer wg.Done()for task := range tasks {fmt.Printf("Consumer %d processing: %s\n", id, task.Data)time.Sleep(200 * time.Millisecond) // 模拟处理时间fmt.Printf("Consumer %d finished: %s\n", id, task.Data)}
}func main() {tasks := make(chan Task, 5) // 缓冲channelvar wg sync.WaitGroup// 启动生产者wg.Add(1)go producer(tasks, &wg)// 启动多个消费者for i := 1; i <= 3; i++ {wg.Add(1)go consumer(i, tasks, &wg)}wg.Wait()fmt.Println("All tasks completed")
}
管道模式
package mainimport "fmt"// 第一阶段:生成数字
func generate(nums chan<- int) {for i := 1; i <= 10; i++ {nums <- i}close(nums)
}// 第二阶段:计算平方
func square(nums <-chan int, squares chan<- int) {for num := range nums {squares <- num * num}close(squares)
}// 第三阶段:过滤偶数
func filter(squares <-chan int, evens chan<- int) {for square := range squares {if square%2 == 0 {evens <- square}}close(evens)
}func main() {nums := make(chan int)squares := make(chan int)evens := make(chan int)// 启动管道的各个阶段go generate(nums)go square(nums, squares)go filter(squares, evens)// 输出最终结果fmt.Println("Even squares:")for even := range evens {fmt.Println(even)}
}
扇入模式
package mainimport ("fmt""sync""time"
)func worker(id int, output chan<- string) {for i := 1; i <= 3; i++ {message := fmt.Sprintf("Worker %d - Message %d", id, i)output <- messagetime.Sleep(time.Second)}close(output)
}func fanIn(inputs ...<-chan string) <-chan string {output := make(chan string)var wg sync.WaitGroup// 为每个输入channel启动一个goroutinefor _, input := range inputs {wg.Add(1)go func(ch <-chan string) {defer wg.Done()for message := range ch {output <- message}}(input)}// 等待所有输入完成后关闭输出channelgo func() {wg.Wait()close(output)}()return output
}func main() {// 创建多个worker的输出channelch1 := make(chan string)ch2 := make(chan string)ch3 := make(chan string)// 启动workersgo worker(1, ch1)go worker(2, ch2)go worker(3, ch3)// 扇入所有worker的输出merged := fanIn(ch1, ch2, ch3)// 接收合并后的消息for message := range merged {fmt.Println("Received:", message)}
}
高级Channel技巧
Channel的Channel
package mainimport ("fmt""time"
)func worker(id int, jobs <-chan chan string) {for job := range jobs {result := fmt.Sprintf("Worker %d processed job", id)job <- resultclose(job)}
}func main() {jobs := make(chan chan string, 3)// 启动workersfor i := 1; i <= 2; i++ {go worker(i, jobs)}// 发送任务for i := 1; i <= 5; i++ {resultCh := make(chan string, 1)jobs <- resultCh// 等待结果result := <-resultChfmt.Printf("Job %d result: %s\n", i, result)}close(jobs)
}
信号量模式
package mainimport ("fmt""sync""time"
)type Semaphore chan struct{}func NewSemaphore(capacity int) Semaphore {return make(Semaphore, capacity)
}func (s Semaphore) Acquire() {s <- struct{}{}
}func (s Semaphore) Release() {<-s
}func worker(id int, sem Semaphore, wg *sync.WaitGroup) {defer wg.Done()sem.Acquire() // 获取信号量defer sem.Release() // 释放信号量fmt.Printf("Worker %d started\n", id)time.Sleep(2 * time.Second) // 模拟工作fmt.Printf("Worker %d finished\n", id)
}func main() {const maxConcurrent = 3sem := NewSemaphore(maxConcurrent)var wg sync.WaitGroup// 启动10个worker,但最多只有3个同时运行for i := 1; i <= 10; i++ {wg.Add(1)go worker(i, sem, &wg)}wg.Wait()fmt.Println("All workers completed")
}
实战案例
并发Web爬虫
package mainimport ("fmt""net/http""sync""time"
)type CrawlResult struct {URL stringStatusCode intError errorDuration time.Duration
}type Crawler struct {maxConcurrent intsemaphore chan struct{}
}func NewCrawler(maxConcurrent int) *Crawler {return &Crawler{maxConcurrent: maxConcurrent,semaphore: make(chan struct{}, maxConcurrent),}
}func (c *Crawler) crawlURL(url string, results chan<- CrawlResult, wg *sync.WaitGroup) {defer wg.Done()// 获取信号量c.semaphore <- struct{}{}defer func() { <-c.semaphore }()start := time.Now()resp, err := http.Get(url)duration := time.Since(start)result := CrawlResult{URL: url,Duration: duration,Error: err,}if err == nil {result.StatusCode = resp.StatusCoderesp.Body.Close()}results <- result
}func (c *Crawler) Crawl(urls []string) <-chan CrawlResult {results := make(chan CrawlResult, len(urls))var wg sync.WaitGroupfor _, url := range urls {wg.Add(1)go c.crawlURL(url, results, &wg)}go func() {wg.Wait()close(results)}()return results
}func main() {urls := []string{"https://www.google.com","https://www.github.com","https://www.stackoverflow.com","https://www.golang.org","https://www.reddit.com",}crawler := NewCrawler(3) // 最多3个并发请求results := crawler.Crawl(urls)fmt.Println("Crawling results:")for result := range results {if result.Error != nil {fmt.Printf("❌ %s: %v\n", result.URL, result.Error)} else {fmt.Printf("✅ %s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)}}
}
实时数据处理管道
package mainimport ("fmt""math/rand""time"
)type DataPoint struct {ID intValue float64Timestamp time.Time
}type ProcessedData struct {DataPointProcessed boolResult float64
}// 数据生成器
func dataGenerator(output chan<- DataPoint) {defer close(output)for i := 1; i <= 20; i++ {data := DataPoint{ID: i,Value: rand.Float64() * 100,Timestamp: time.Now(),}output <- datatime.Sleep(100 * time.Millisecond)}
}// 数据处理器
func dataProcessor(input <-chan DataPoint, output chan<- ProcessedData) {defer close(output)for data := range input {// 模拟数据处理time.Sleep(50 * time.Millisecond)processed := ProcessedData{DataPoint: data,Processed: true,Result: data.Value * 2, // 简单的处理逻辑}output <- processed}
}// 数据过滤器
func dataFilter(input <-chan ProcessedData, output chan<- ProcessedData) {defer close(output)for data := range input {// 只传递结果大于100的数据if data.Result > 100 {output <- data}}
}func main() {// 创建管道rawData := make(chan DataPoint, 5)processedData := make(chan ProcessedData, 5)filteredData := make(chan ProcessedData, 5)// 启动管道各阶段go dataGenerator(rawData)go dataProcessor(rawData, processedData)go dataFilter(processedData, filteredData)// 输出最终结果fmt.Println("Filtered results (Result > 100):")for data := range filteredData {fmt.Printf("ID: %d, Original: %.2f, Result: %.2f, Time: %s\n",data.ID, data.Value, data.Result, data.Timestamp.Format("15:04:05"))}
}
总结
Channel是Go语言并发编程的核心工具,提供了优雅的goroutine间通信方式:
关键概念
- 无缓冲vs有缓冲:控制同步行为
- 方向性:限制channel的使用方式
- Select语句:处理多个channel操作
- 关闭channel:信号传递机制
常用模式
- 生产者-消费者:解耦数据生产和消费
- 管道:数据流式处理
- 扇入扇出:并发处理和结果聚合
- 信号量:控制并发数量
最佳实践
- 发送者负责关闭channel
- 使用range遍历channel
- 利用select实现超时和非阻塞操作
- 合理设置缓冲大小
- 避免channel泄漏
掌握Channel的使用是成为Go并发编程专家的必经之路。记住:通过通信来共享内存,而不是通过共享内存来通信。
参考资源
- Go官方文档 - Channel
- Go并发模式:管道和取消
- Go并发模式:上下文