Go Channel管道

本文最后更新于:1 天前

管道是golang语言级别上提供的goroutine之间的通信方式,可以使用channel在多个goroutine之间传递消息,如果说goroutine是Go程序并发的执行体,channel就是他们之间的连接,channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

​ golang的并发模型是CSP(Communicating Sequential Processes) 提倡通过通信共享内存,而不是通过共享内存而实现通信

​ Go语言中的管道 channel 是一种特殊的类型,管道想一个传达带和队列,总是遵循先入先出(FIFO)的规则,保证收发数据的顺序,每个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

1 channel 管道

channel是一种类型,而且是 引用类型

var ch1 chan int 
var ch2 chan bool

2 创建管道make

声明的管道后需要使用make函数来 初始化之后才能使用

ch := make(chan 元素类型 ,容量)

3 channel操作

管道发送(send) ,接收(recevie) 和关闭(close) 三种类型

3.1 发送和接收:

发送和接收都是用 <- 符号

ch := make(chan int,3)
ch <-10    //把10 发送给 ch 管道中
x := <- ch //从ch管道中接收值 赋值给x
<- ch      //也是从ch管道接收值 但是忽略结果
package main

import "fmt"

func main() {
	ch1 := make(chan int, 3)
	ch1 <- 10
	ch1 <- 20
	ch1 <- 30

	x := <-ch1
	<-ch1
	y := <-ch1
	fmt.Println(x, y)
	ch1 <- 40  //再次赋值,这样管道中就存储一个值 
	fmt.Printf("值%v,容量%v,长度%v", ch1, cap(ch1), len(ch1))
}
/*
10 30
值0xc00013a000,容量3,长度1
*/

3.2 管道阻塞

就是管道容量已经满了,再插入值 或者 管道已经空了,再次取数据 就会阻塞,一边存一边取管道就会里面数据是流动的,就不会阻塞

3.3 关闭管道

close(ch)
package main

import "fmt"

func main() {
	ch1 := make(chan int, 11)
	ch2 := make(chan int, 11)
	for i := 1; i < 10; i++ {
		ch1 <- i
	}
	close(ch1)   //注意一定要关闭管道 否则会发生死锁
	for i := 1; i < 10; i++ {
		ch2 <- i
	}
	close(ch2) //注意一定要关闭管道 否则会发生死锁
	for y := 1; y < 10; y++ {
		y := <-ch2
		fmt.Println(y)
	}

	//可以通过range 循环遍历channel
	for v := range ch2 {
		v1 := <-ch2
		fmt.Println(v, v1)

	}
}

3 Goroutine 结合channel 管道

我们定义俩个方法 一个进行写数据 一个进行读数据 ,要求同步进行

如果写入比较慢,读取比较快,读取会进行等待,因为管道是安全也不会出现问题的

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func fn1(ch chan int) {
	for i := 1; i < 10; i++ {
		ch <- i
		time.Sleep(time.Millisecond * 500)
		fmt.Println("写入数据", i)
	}
	close(ch)
	wg.Done()
}

func fn2(ch chan int) {
	for x := range ch {
		time.Sleep(time.Millisecond * 50)
		fmt.Println("读取数据", x)
	}
	wg.Done()

}

func main() {
	var ch = make(chan int, 10)
	wg.Add(1)
	go fn1(ch)
	wg.Add(1)
	go fn2(ch)

	wg.Wait()
	fmt.Println("exit...")
}
/*
读取数据 1
写入数据 1
读取数据 2
写入数据 2
读取数据 3
写入数据 3
读取数据 4
写入数据 4
读取数据 5
写入数据 5
读取数据 6
写入数据 6
读取数据 7
写入数据 7
读取数据 8
写入数据 8
读取数据 9
写入数据 9
exit...
*/

4 统计素数

结合goroutine 协程 和 channel 来实现

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

//向intChan中存放数据
func putNum(intChan chan int) {
	for i := 1; i < 100000; i++ {
		intChan <- i
	}
	close(intChan)
	wg.Done()
}

//从initChan中取出数据,进行判断是否为素数,如果是,就把得到的素数放在primeChan中
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
	for num := range intChan {
		var flag = true
		for i := 2; i < num; i++ {
			if num%i == 0 {
				flag = false
				break
			}
		}
		if flag {
			primeChan <- num //num 是素数
		}
	}
	//在统计素数的时候 我们用了16个协程,所以需要知道什么时候16个协程执行完成后,在关闭channel
	exitChan <- true
	wg.Done()
	//close(primeChan)  //如果一个channel关闭了就没有办法给这个channel发送数据了。

}

//打印素数的方法
func printPrime(primeChan chan int) {
	//for v := range primeChan {
	//	fmt.Println("primechan", v)
	//}
	wg.Done()
}

func main() {
	start := time.Now().Unix()
	intChan := make(chan int, 1000)
	primeChan := make(chan int, 100000)
	exitChan := make(chan bool, 16)
	//往整数管道中打印数字 协程
	wg.Add(1)
	go putNum(intChan)
	//统计整数管道中的素数 ,然后写入素数管道,使用16个协程
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go primeNum(intChan, primeChan, exitChan)
	}
	//通过匿名函数来实现一个协程 判断exitChan是否存满16个值
	wg.Add(1)
	go func() {
		for i := 0; i < 16; i++ {
			<-exitChan
		}
		//满足16个值 相当于上面16个统计素数的协程已经执行完毕。
		close(primeChan)
		wg.Done()
	}()
	//打印素数协程,从素数管道中取值
	wg.Add(1)
	go printPrime(primeChan)

	wg.Wait()
	stop := time.Now().Unix()
	fmt.Println("exec:", stop-start)
}
/*
exec: 2
*/

5 单向管道

有的时候 我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或者只能接收

默认情况下 管道是双向的

双向管道
var chan1 chan int 
只写管道
var chan2 chan<- int 
只读管道
var chan3 <-chan int

6 select

在某些场景下 我们需要同时从多个通道接收数据,这个时候,就可以用到golang中给我们提供的select多路复用

通常情况下 通道在接收数据时候, 如果没有数据可以接收将会发生阻塞

语法如下: 可以结合for 循环来实现

select { 
case <-ch1:
  ...
case  data := <-ch2:
  ...
case ch3 <- data: 
  ...
default:
  ...

案例

package main

import (
	"fmt"
)

func main() {
	intChan := make(chan int, 10)
	for i := 0; i < 10; i++ {
		intChan <- i
	}

	stringChan := make(chan string, 10)
	for i := 0; i < 10; i++ {
		stringChan <- "hello" + fmt.Sprintf("%d", i)
	}

	for {
		//使用select获取管道数据的时候,不需要close通道
		select {
		case v := <-intChan:
			fmt.Printf("from intChan data: %d\n", v)

		case v := <-stringChan:
			fmt.Printf("from intChan data: %s\n", v)

		default:
			fmt.Printf("读取数据完毕\n")
			return //退出for循环

		}

	}

}
/*
from intChan data: 0
from intChan data: hello0
from intChan data: hello1
from intChan data: 1
from intChan data: hello2
from intChan data: hello3
from intChan data: 2
from intChan data: hello4
from intChan data: 3
from intChan data: 4
from intChan data: hello5
from intChan data: hello6
from intChan data: hello7
from intChan data: hello8
from intChan data: 5
from intChan data: 6
from intChan data: 7
from intChan data: 8
from intChan data: hello9
from intChan data: 9
读取数据完毕
*/

7 解决协程出现的panic

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func test() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("test() 发生异常错误", err)
			wg.Done()
		}
	}()
	var myMap map[int]string
	myMap[0] = "golang" //这里故意写错 发生error 
	wg.Done()
}
func main() {
	wg.Add(1)
	go test()
	wg.Wait()
	fmt.Println("status2 ...")

}
/*
test() 发生异常错误 assignment to entry in nil map
status2 ...
*/

8 Go并发安全锁

8.1 互斥锁

互斥锁是传统并发编程中共享资源进行访问控制的主要手段,他由标准库sync中的Mutex 结构体类型表示,sync.Mutex类型中只有俩个公开的指针方法,Lock和Unlock ,Lock锁定当前的共享资源,Unlock进行解锁

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup   
var mutex sync.Mutex  //声明一把锁
var count = 0

func test() {

	mutex.Lock()				//添加一个互斥锁
	count++
	fmt.Println(count)
	mutex.Unlock()     //释放互斥锁
	wg.Done()
}

func main() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go test()
	}
	wg.Wait()
	fmt.Println("exit...")
}
// 这个时候我们可以通过 "go run -race 程序代码.go" 执行来查看一个执行流程和最后是否显示报错信息
/*
1
2
3
4
5
6
7
8
9
10
exit...
*/

使用互斥锁能够保证同一时间去有且只有一个gouroutine 进入临界区,其他的goroutine则在等待。当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的

8.2 读写互斥锁

互斥锁的本质是当一个goroutine访问的时候,其他goroutinge都不能访问,这样在资源同步避免了竞争的同时也降低了程序的并发性能,程序由原来的的并行变成了串行执行

其实 当我们对一个不回变化的数据只做操作的话,是不存在资源的竞争的问题,因为数据是不变的,不管怎么读取 多个goroutine同时读取,都是可以的

所以问题不是出在上,主要是修改,也就是,修改的数据要同步,这样其他goroutine才可以感知到,所以真正的互斥影视读取和修改,修改和修改之间,读和读是没有互斥操作的必要的,因此 衍生出了另外一种锁 叫做读写锁

读写锁可以让你多个读操作进行并发 同时读取,但是对于写操作是完全互斥的,也就是说当一个goroutine进行写操作的时候, 其他goroutine 是既不能够写也不能够读取,也不能进行写操作。

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup
var mutex sync.RWMutex   //读写互斥锁

func write() {
	mutex.Lock()
	fmt.Printf("<--写入数据:\n")
	time.Sleep(time.Second * 1)
	mutex.Unlock()
	wg.Done()
}
func read() {
	mutex.RLock()									//可以同时读取
	fmt.Printf("-->读取数据:\n")
	time.Sleep(time.Second * 1)
	mutex.RUnlock()
	wg.Done()

}

func main() {

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()

}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!