Go Channel管道
本文最后更新于:1 天前
管道是golang语言级别上提供的goroutine之间的通信方式,可以使用channel在多个goroutine之间传递消息,如果说goroutine是Go程序并发的执行体,channel就是他们之间的连接,channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
golang的并发模型是CSP(Communicating Sequential Processes) 提倡通过通信共享内存,而不是通过共享内存而实现通信
Go语言中的管道 channel 是一种特殊的类型,管道想一个传达带和队列,总是遵循先入先出(FIFO)的规则,保证收发数据的顺序,每个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
1 channel 管道
channel是一种类型,而且是 引用类型
var ch1 chan int
var ch2 chan bool
2 创建管道make
声明的管道后需要使用make函数来 初始化之后才能使用
ch := make(chan 元素类型 ,容量)
3 channel操作
管道发送(send) ,接收(recevie) 和关闭(close) 三种类型
3.1 发送和接收:
发送和接收都是用 <- 符号
ch := make(chan int,3)
ch <-10 //把10 发送给 ch 管道中
x := <- ch //从ch管道中接收值 赋值给x
<- ch //也是从ch管道接收值 但是忽略结果
package main
import "fmt"
func main() {
ch1 := make(chan int, 3)
ch1 <- 10
ch1 <- 20
ch1 <- 30
x := <-ch1
<-ch1
y := <-ch1
fmt.Println(x, y)
ch1 <- 40 //再次赋值,这样管道中就存储一个值
fmt.Printf("值%v,容量%v,长度%v", ch1, cap(ch1), len(ch1))
}
/*
10 30
值0xc00013a000,容量3,长度1
*/
3.2 管道阻塞
就是管道容量已经满了,再插入值 或者 管道已经空了,再次取数据 就会阻塞,一边存一边取管道就会里面数据是流动的,就不会阻塞
3.3 关闭管道
close(ch)
package main
import "fmt"
func main() {
ch1 := make(chan int, 11)
ch2 := make(chan int, 11)
for i := 1; i < 10; i++ {
ch1 <- i
}
close(ch1) //注意一定要关闭管道 否则会发生死锁
for i := 1; i < 10; i++ {
ch2 <- i
}
close(ch2) //注意一定要关闭管道 否则会发生死锁
for y := 1; y < 10; y++ {
y := <-ch2
fmt.Println(y)
}
//可以通过range 循环遍历channel
for v := range ch2 {
v1 := <-ch2
fmt.Println(v, v1)
}
}
3 Goroutine 结合channel 管道
我们定义俩个方法 一个进行写数据 一个进行读数据 ,要求同步进行
如果写入比较慢,读取比较快,读取会进行等待,因为管道是安全也不会出现问题的
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func fn1(ch chan int) {
for i := 1; i < 10; i++ {
ch <- i
time.Sleep(time.Millisecond * 500)
fmt.Println("写入数据", i)
}
close(ch)
wg.Done()
}
func fn2(ch chan int) {
for x := range ch {
time.Sleep(time.Millisecond * 50)
fmt.Println("读取数据", x)
}
wg.Done()
}
func main() {
var ch = make(chan int, 10)
wg.Add(1)
go fn1(ch)
wg.Add(1)
go fn2(ch)
wg.Wait()
fmt.Println("exit...")
}
/*
读取数据 1
写入数据 1
读取数据 2
写入数据 2
读取数据 3
写入数据 3
读取数据 4
写入数据 4
读取数据 5
写入数据 5
读取数据 6
写入数据 6
读取数据 7
写入数据 7
读取数据 8
写入数据 8
读取数据 9
写入数据 9
exit...
*/
4 统计素数
结合goroutine 协程 和 channel 来实现
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
//向intChan中存放数据
func putNum(intChan chan int) {
for i := 1; i < 100000; i++ {
intChan <- i
}
close(intChan)
wg.Done()
}
//从initChan中取出数据,进行判断是否为素数,如果是,就把得到的素数放在primeChan中
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
for num := range intChan {
var flag = true
for i := 2; i < num; i++ {
if num%i == 0 {
flag = false
break
}
}
if flag {
primeChan <- num //num 是素数
}
}
//在统计素数的时候 我们用了16个协程,所以需要知道什么时候16个协程执行完成后,在关闭channel
exitChan <- true
wg.Done()
//close(primeChan) //如果一个channel关闭了就没有办法给这个channel发送数据了。
}
//打印素数的方法
func printPrime(primeChan chan int) {
//for v := range primeChan {
// fmt.Println("primechan", v)
//}
wg.Done()
}
func main() {
start := time.Now().Unix()
intChan := make(chan int, 1000)
primeChan := make(chan int, 100000)
exitChan := make(chan bool, 16)
//往整数管道中打印数字 协程
wg.Add(1)
go putNum(intChan)
//统计整数管道中的素数 ,然后写入素数管道,使用16个协程
for i := 0; i < 16; i++ {
wg.Add(1)
go primeNum(intChan, primeChan, exitChan)
}
//通过匿名函数来实现一个协程 判断exitChan是否存满16个值
wg.Add(1)
go func() {
for i := 0; i < 16; i++ {
<-exitChan
}
//满足16个值 相当于上面16个统计素数的协程已经执行完毕。
close(primeChan)
wg.Done()
}()
//打印素数协程,从素数管道中取值
wg.Add(1)
go printPrime(primeChan)
wg.Wait()
stop := time.Now().Unix()
fmt.Println("exec:", stop-start)
}
/*
exec: 2
*/
5 单向管道
有的时候 我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或者只能接收
默认情况下 管道是双向的
双向管道
var chan1 chan int
只写管道
var chan2 chan<- int
只读管道
var chan3 <-chan int
6 select
在某些场景下 我们需要同时从多个通道接收数据,这个时候,就可以用到golang中给我们提供的select多路复用
通常情况下 通道在接收数据时候, 如果没有数据可以接收将会发生阻塞
语法如下: 可以结合for 循环来实现
select {
case <-ch1:
...
case data := <-ch2:
...
case ch3 <- data:
...
default:
...
案例
package main
import (
"fmt"
)
func main() {
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
stringChan := make(chan string, 10)
for i := 0; i < 10; i++ {
stringChan <- "hello" + fmt.Sprintf("%d", i)
}
for {
//使用select获取管道数据的时候,不需要close通道
select {
case v := <-intChan:
fmt.Printf("from intChan data: %d\n", v)
case v := <-stringChan:
fmt.Printf("from intChan data: %s\n", v)
default:
fmt.Printf("读取数据完毕\n")
return //退出for循环
}
}
}
/*
from intChan data: 0
from intChan data: hello0
from intChan data: hello1
from intChan data: 1
from intChan data: hello2
from intChan data: hello3
from intChan data: 2
from intChan data: hello4
from intChan data: 3
from intChan data: 4
from intChan data: hello5
from intChan data: hello6
from intChan data: hello7
from intChan data: hello8
from intChan data: 5
from intChan data: 6
from intChan data: 7
from intChan data: 8
from intChan data: hello9
from intChan data: 9
读取数据完毕
*/
7 解决协程出现的panic
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func test() {
defer func() {
if err := recover(); err != nil {
fmt.Println("test() 发生异常错误", err)
wg.Done()
}
}()
var myMap map[int]string
myMap[0] = "golang" //这里故意写错 发生error
wg.Done()
}
func main() {
wg.Add(1)
go test()
wg.Wait()
fmt.Println("status2 ...")
}
/*
test() 发生异常错误 assignment to entry in nil map
status2 ...
*/
8 Go并发安全锁
8.1 互斥锁
互斥锁是传统并发编程中共享资源进行访问控制的主要手段,他由标准库sync中的Mutex 结构体类型表示,sync.Mutex类型中只有俩个公开的指针方法,Lock和Unlock ,Lock锁定当前的共享资源,Unlock进行解锁
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var mutex sync.Mutex //声明一把锁
var count = 0
func test() {
mutex.Lock() //添加一个互斥锁
count++
fmt.Println(count)
mutex.Unlock() //释放互斥锁
wg.Done()
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go test()
}
wg.Wait()
fmt.Println("exit...")
}
// 这个时候我们可以通过 "go run -race 程序代码.go" 执行来查看一个执行流程和最后是否显示报错信息
/*
1
2
3
4
5
6
7
8
9
10
exit...
*/
使用互斥锁能够保证同一时间去有且只有一个gouroutine 进入临界区,其他的goroutine则在等待。当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的
8.2 读写互斥锁
互斥锁的本质是当一个goroutine访问的时候,其他goroutinge都不能访问,这样在资源同步避免了竞争的同时也降低了程序的并发性能,程序由原来的的并行变成了串行执行
其实 当我们对一个不回变化的数据只做读
操作的话,是不存在资源的竞争的问题,因为数据是不变的,不管怎么读取 多个goroutine同时读取,都是可以的
所以问题不是出在读
上,主要是修改,也就是写
,修改的数据要同步,这样其他goroutine才可以感知到,所以真正的互斥影视读取和修改,修改和修改之间,读和读是没有互斥操作的必要的,因此 衍生出了另外一种锁 叫做读写锁
读写锁可以让你多个读操作进行并发 同时读取,但是对于写操作是完全互斥的,也就是说当一个goroutine进行写操作的时候, 其他goroutine 是既不能够写也不能够读取,也不能进行写操作。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
var mutex sync.RWMutex //读写互斥锁
func write() {
mutex.Lock()
fmt.Printf("<--写入数据:\n")
time.Sleep(time.Second * 1)
mutex.Unlock()
wg.Done()
}
func read() {
mutex.RLock() //可以同时读取
fmt.Printf("-->读取数据:\n")
time.Sleep(time.Second * 1)
mutex.RUnlock()
wg.Done()
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go read()
}
wg.Wait()
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!