Golang_10_并发编程
十、并发编程
并发与并行
并发:同一 时间段内
执行多个任务(用微信和两个人聊天)
并行:同一 时刻
执行多个任务(你和你朋友都在用微信和别人聊天)
Go并发通过goroutine
实现
goroutine
类似线程,用户态线程,可以根据需要创建成千上万个goroutine
是Go语言运行时(runtime)调度完成,而线程是由操作系统调度完成- Go提供
channel
在多个goroutine
间通信
goroutine
定义多个任务,让系统帮助这些任务分配到CPU上实现并发执行
goroutine
类似线程,Go语言运行时(runtime)调度完成。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。语言层面内置了调度和上下文切换机制
简单粗暴:执行并发任务时,只需要把任务包装成一个函数,开启一个goroutine
去执行函数即可,不需要自己去写进程、线程、协程。
使用goroutine
调用函数钱加上go
关键字,即是为这个函数创建了一个goroutine
-
一个
goroutine
必定对应一个函数 -
可以创建多个
goroutine
去执行相同的函数
启动单个goroutine
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello()
fmt.Println("Main Goroutine done!")
}
只打印 Main Goroutine done!
,为什么呢?
程序启动时,Go程序会为main()
函数创建一个默认的goroutine
main()
函数返回的时候,goroutine
就结束了,所有在main()
函数中启动的goroutine
就一同结束
main()
函数所在的goroutine
就是 夜王,其余goroutine
就是 异鬼,夜王一死,其余转化的异鬼一同GG
所以需要让main函数等待一下hello函数,简单粗暴的方法是time.sleep
func main() {
go hello() // 启动另一个goroutine去执行hello函数
fmt.Println("Main Goroutine done!")
time.Sleep(time.Second)
}
先打印Main Goroutine done!
,再打印Hello Goroutine!
整个过程如下:
-
Go为
main()
创建goroutine
,且创建完 -
Go开始创建
hello()
的goroutine
,但是在创建过程中:- 没有等待:创建过程,执行
fmt.Println("Main Goroutine done!")
,main()结束,所有都关闭 - 有等待:
- 先执行
fmt.Println("Main Goroutine done!")
,但不退出,先等待 - 等待的过程,
hello()
的goroutine
创建完毕并执行,fmt.Println("Hello Goroutine!")
- 先执行
- 没有等待:创建过程,执行
-
所以要么只打印main函数内(没有sleep),要么先main后hello(有sleep)
启动多个goroutine
启动多个goroutine,这里使用sync.WaitGroup
实现同步
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}
多次执行,会发现每次打印的数字顺序都不一样。因为这10个goroutine
是并发执行的,且调度是随机的
goroutine与线程
可增长的栈
操作系统线程一般有固定的栈内存(通常为2MB)
一个goroutine
在其生命周期开始时只有很小的栈(通常为2KB)
goroutine
的栈是不固定的,可以按需增大减小,可以达到1GB
所以在Go语言中一次创建十万左右的goroutine
也是可以的
goroutine调度
GMP
是Go运行时(runtime)层面的实现,是Go语言自己实现的一套调度系统,区别于操作系统调度OS线程
G
就是goroutine
,里面除了存放本goroutine
信息外,还有与所在P
的绑定等信息- ==就是go中的一段代码(以一个函数的形式展现),最小的并行单元==
P
管理着一组goroutine
队列- 里面会存储当前
goroutine
运行的上下文环境(函数指针、运行后续的goroutine
等) - 会对自己管理的
goroutine
队列进行一些调度(比如把CPU时间较长的goroutine
暂停、运行后续的goroutine
等) - 当自己队列消费完了就去全局队列里面取,全局也消费完就去其他P等队列里面抢任务
- ==调度goroutine的上下文,G依赖于P进行调度,真正的并行单元==
- 里面会存储当前
M(machine)
是Go运行时(runtime)对操作系统内核线程的虚拟,M
与内核线程一般是一一映射的关系,一个goroutine
最终是要放到M上执行的。- ==go中的工作者线程,真正的代码执行单元==
P和M也是一一对应的,他们的关系:P管理着一组G挂载在M上运行。
- 当G长久阻塞在一个M上时,runtime会创建一个新的M,阻塞G所在的P会把其他的G挂载M上
- 当旧的G阻塞完成或任务其已经死掉时,会回收旧的M
P的个数时通过runtime.GOMAXPROCS
设定(最大256),GO1.5后默认为物理线程数。并发量大时会增加一些P和M,但不会太多,切换太频繁会得不偿失。
单从线程调度讲,Go语言相比起其他语言的优势在于:
OS线程是由OS内核来调度的
goroutine
则是由Go运行时(runtime)自己的调度器调度的
- 这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。
- 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。
- 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上
- 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。
GOMAXPROCS
一个调度器的参数,用来确定需要多少个OS线程来同时执行Go代码(默认值是机器上的CPU核心数)
比如在八核的机器上,调度会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)
Go语言中可以通过runtime.GOMAXPROCS()
函数设置当前程序并发时占用的CPU逻辑核心数。
Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。
我们可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果,这里举个例子:
两个任务只有一个逻辑核心时:(做完一个任务再做另一个)
func a() {
for i := 0; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 0; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(timeSecond)
}
// 输出:
// A:0 ~ A:9
// B:0 ~ B:9
// 先做任务A再做任务B
设置逻辑核心数为2,两个任务并行执行:
func main() {
runtime.GOMAXPROCS(2)
go a()
go b()
time.Sleep(timeSecond)
}
// 输出:
// B:0 ~ B:4
// A:0 ~ A:8
// B:5 ~ B:9
// A:9
// A、B 交替进行
Go语言中 操作系统线程 和 goroutine
的关系:
- 一个操作系统线程对应用户态多个
goroutine
- go程序可以同时使用多个操作系统线程
goroutine
和OS线程是多对多的关系,m:n
channel
函数之间并发执行,且进行数据交换才能体现并发的意义
可以使用共享内存进行数据交换,但是不同的goroutine
使用共享内存会发生竞态问题。所以为了保证数据交换的正确性,需要使用互斥量对内存进行加锁,这样做会造成性能问题。
Go语言的并发模型是CSP(Communication Sequential Processes)
,提倡通过通信->共享内存** 而不是 ***通过共享内存*->*实现通信***
如果说goroutine
是Go程序并发的执行体,channel
就是它们之间的连接。
channel
是可以让一个goroutine
发送特定值到另一个goroutine
的通信机制。
通道(channel
)是一种特殊的类型
通道如同一个传送带或队列,先入先出FIFO,保证收发数据的顺序
每个通道都是一个具体类型的导管,也就是声明channel
的时候需要为其制定元素类型
channel类型
channel
是一种类型,一种引用类型。声明通道类型的格式如下:
var 变量 chan 元素类型
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔值的通道
var ch3 chan []int // 声明一个传递int切片的通道
创建channel
通道是引用类型,空值为nil
var ch chan int
fmt.Println(ch) // <nil>
声明的通道需要进行初始化(make
)才能使用,创建channel
的格式如下:
make(chan 元素类型, [缓冲大小])
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int, 8)
channel的缓冲大小是可选的。
channel操作
通道有:发送(send)、接收(receive)和关闭(close)三种操作
发送和接收都使用<-
符号
ch := make(chan int) // 1. 定义一个通道
ch <- 10 // 2. 把10发送到ch中
x := <-ch // 3. 从通道接受值,赋值给x
<-ch // 3'. 从通道接受值,忽略结果(丢弃)
fmt.Println(x)
close(ch) // 4. 关闭通道
// 以上代码运行时会出错,下面“缓冲通道”会讲到
关于关闭通道(close()
操作:
-
只有在接收方
goroutine
所有数据都发送完毕时才需要关闭通道 -
通道是可以被垃圾回收机制回收的。
- 关闭文件:结束操作后必须关闭文件
- 关闭通道:非必须操作
-
关闭后的通道有如下特点:
- 对一个关闭的通道
- 再发送值就会导致panic。
- 进行接收会一直获取值直到通道为空。
- 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
- 关闭一个已经关闭的通道会导致panic。
- 对一个关闭的通道
无缓冲的通道(没有缓冲区)
又称阻塞通道,先看下面代码:
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
编译通过,但是执行的时候出错:
$ go build
$ ./basementStudy
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/XXXX/basementStudy/main.go:7 +0x54
Why deadlock
错误?
因为使用ch := make(chan int)
创建的是无缓冲的通道,只有在有人接收值的时候才能发送值。(就像你住的小区没有快递柜和代收点,快递员直接打电话送到手,同步的)简单来说就是无缓冲的通道必须有接收才能发送。
代码会阻塞在 ch <- 10
内形成死锁
解决办法是启用一个goroutine
去接收值
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启动一个goroutine,从通道接收值,先阻塞,直到下面有goroutine发送
ch <- 10
fmt.Println("发送成功")
// 输出:
// 接收成功 10
// 发送成功
}
-
无缓冲通道上的发送操作会阻塞,直到另一个
goroutine
在该通道上执行接收操作,这时值才能发送成功,两个goroutine
将继续执行。 -
相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个
goroutine
在该通道上发送一个值。
无缓冲通道的进行通信会导致发送和接收的goroutine
同步化,因此无缓冲通道也被称为==同步通道==
有缓冲的通道
解决上述“通道内必须同步”的方法是设置缓冲区。可以在make时为其指定通道容量
func main() {
ch := make(chan int, 1)
ch <- 10
fmt.Println("发送成功")
}
只要容量大于0,就是有缓冲的通道,容量即为通道中能存放元素的数量。
如果满了,就需要等待有人取走才能继续往里面放。
可以使用一些函数获取容量:(虽然很少会这样做)
- 使用
len()
函数获取通道内函数的数量 - 使用
cap()
函数获取通道总容量
for range 从通道循环取值
当向通道中发送完数据时,我们可以通过close
函数来关闭通道。
当通道被关闭时,再往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?
例子:
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 开启一个goroutine将1~100发到ch1中
go func() {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}()
// 开启一个goroutine从ch1中接受值,将值平方放到ch2
go func() {
// 循环,中间会有brerak(当ch1被close的时候)
for {
// 通道取值到i,通道关闭后再取值: ok=false
i, ok := <-ch1
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
for i := range ch2 {
fmt.Println(i)
}
}
从上面的例子中我们看到,有两种方式在接收值的时候判断该通道是否被关闭,不过我们通常使用的是for range
的方式。
使用for range
遍历通道,当通道被关闭的时候就会退出for range
。
单向通道
很多时候我们会在不同的任务函数中加上不同的限制,比如限制通道在函数内只能发送或接收
Go语言提供单向通道来处理这种情况,对上面的例子进行改造:
// chan<- 只能发送的单向通道
// 所以是 数字 发送到 out
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
// <-chan 只能接收的单向通道
// 所以是 遍历"只能出的" 放入到 "只能进的"
func squarer(out <-chan int, in chan<- int) {
for i := range out {
in <- i * i
}
close(in)
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch1, ch2)
for i := range ch2 {
fmt.Println(i)
}
}
chan<- int
是一个只能发送的通道<-chan int
是一个只能接收的通道- 记法:箭头的指向表示数据方向
在函数传参和任何赋值操作中,可以 将双向通道转换为单向通道,反之不行
通道总结
channel
常见的异常总结,如下:
channel | nil | 非空 | 空 | 满 | 没满 |
---|---|---|---|---|---|
接收 | 阻塞 | 接受值 | 阻塞 | 接受值 | 接收值 |
发送 | 阻塞 | 发送值 | 发送值 | 阻塞 | 发送值 |
关闭 | panic | 关闭成功,读完数据后,返回零值 | 关闭成功,返回零值 | 关闭成功,读完数据后,返回零值 | 关闭成功,读完数据后,返回零值 |
关闭已经关闭的channel
也会引发panic
。
worker pool(goroutine池)
通常使用的模式为worker pool
模式:可以指定启动的goroutine
数量,来控制其数量防止goroutine
泄漏和暴涨
一个简单的work pool
实例代码:
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("worker:%d start job:%d", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d", id, j)
results <- j*2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 开启三个goroutine
for w = 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 执行5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 输出结果
for a := 1; a <= 5; a++ {
<- results
}
}
输出:
worker:3 start job:1
worker:1 start job:2
worker:2 start job:3
worker:2 end job:3
worker:2 start job:4
worker:1 end job:2
worker:3 end job:1
worker:1 start job:5
worker:2 end job:4
worker:1 end job:5
select多路复用
场景:同时从多个通道接收数据
通道在接收数据时,如果没有可接收的数据会发生阻塞
for {
// 尝试从ch1接收值
data, ok := <-ch1
// 尝试从ch2接收值
data, ok := <-ch2
...
}
以上写法可以实现从多个通道接收值,但是运行性能会差很多。
应对这种场景的方法为:使用Go内置的select
关键字,可以同时响应多个通道的操作。
select
的使用类似switch
,每个case对应一个通道(接收或发送)的通信过程。select
会一直等待,直到某个case
的通信操作完成时,就会执行case
分支对应的语句。格式如下:
select {
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default:
默认操作
}
例子:
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
// 通道空,往里面放值
// 通道不空,取值、打印
select {
case: x := <-ch:
fmt.Println(x)
case: ch<-i:
}
}
使用select
语句可以提高代码的可读性:
- 可处理一个或多个channel的发送/接收操作。
- 如果多个
case
同时满足,select
会随机选择一个。 - 对于没有
case
的select{}
会一直等待,可用于阻塞main函数。
并发安全和锁
有时候可能会存在多个goroutine
同时操作一个资源(临界区),这种情况会发生竞态问题
(数据竞态)。
类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
上述代码,开启两个goroutine去累加x,这两个goroutine在访问和修改x的时候就会存在竞争,导致最后的结果和期待的不符。每次结果不一样 5000~10000
互斥锁
互斥锁时一种常用的控制共享资源访问的方法,它能保证同时只有一个goroutine
可以访问资源
Go语言时使用sync
包的Mutex
类型来实现互斥锁
使用互斥锁修复上面代码的问题:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
// 稳定输出10000
使用互斥锁可以保证:
- 同一时间有且只有一个
goroutine
进入临界区 - 其他的
goroutine
则会等待
互斥锁释放后,等待的goroutine
才能获取锁进入临界区
多个goroutine
同时等待一个锁时,唤醒策略是随机的
读写互斥锁
互斥锁是完全互斥的,但是很多场景是读多写少的
当并发读取一个资源(这个资源没有涉及被修改)时,这种场景使用读写锁是一种更好的选择。
读写锁是使用在sync
包中的RwMutex
类型
读写锁分为两种:读锁和写锁
- 当一个
goroutine
获取读锁后:(其余人不能修改)- 其他
goroutine
获取读锁后,会继续获得锁 - 其他
goroutine
获取写锁后,会等待
- 其他
- 当一个
goroutine
获取写锁后:(其余人不能修改或访问数据)- 其他
goroutine
获取读锁或写锁后,都会等待
- 其他
var (
x int64
wg sync.WaitGroup
lock sync.Mutex // 普通互斥锁
rwlock sync.RWMutex // 读写互斥锁
)
func write() {
lock.Lock() // 加普通互斥锁
//rwlock.RLock() // 加读写互斥锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设写操作耗时10毫秒
//rwlock.RUnlock() // 解读写互斥锁
lock.Unlock() // 解普通互斥锁
wg.Done() // 计数器-1
}
func read() {
lock.Lock() // 加普通互斥锁
//rwlock.RLock() // 加读写互斥锁
time.Sleep(10 * time.Millisecond) // 假设读操作耗时1毫秒
//rwlock.RUnlock() // 解读写互斥锁
lock.Unlock() // 解普通互斥锁
wg.Done() // 计数器-1
}
func main() {
// 用来和后面的 end 时间来计算耗时
start := time.Now()
// 开启10个写goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
// 开启1000个读goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
// 计算耗时
end := time.Now()
fmt.Println(end.Sub(start))
}
运行效果:(读多写少的情况)
- 读操作使用读写锁:(主要,读的时候没必要锁起来)
- 写操作读写锁:14.765198ms
- 写操作普通锁:107.509604ms
- 读操作使用普通锁:(次要,不太影响性能)
- 写操作读写锁:11.427787848s
- 写操作普通锁:11.283028743s
sync.WaitGroup
在代码中生硬的使用time.Sleep
肯定是不合适的,Go语言中可以使用sync.WaitGroup
来实现并发任务的同步。 sync.WaitGroup
有以下几个方法:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) |
计数器+delta |
(wg *WaitGroup) Done() |
计数器-1 |
(wg *WaitGroup) Wait() |
阻塞直到计数器变为0 |
sync.WaitGroup
内部维护着一个计数器,计数器的值可以增加和减少。
- 当启动了N 个并发任务时,就将计数器值增加N。
- 每个任务完成时通过调用
Done()
方法将计数器减1。
通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成
开始的一个例子使用sync.WaitGroup
优化
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}
优化后:
var wg sync.WaitGroup
func hello() {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}
// main goroutine done!
// Hello Goroutine!
// 或者下面写法,不另外创建函数
var wg sync.WaitGroup // step1
func main(){
wg.Add(1) // step2
go func(){
defer wg.Done() // step3 每个任务内defer
fmt.Println("Hello goroutine!")
}()
wg.Wait() // step4 后面的语句阻塞,直到所有goroutine返回
fmt.Println("The last statement to be executed")
}
需要注意sync.WaitGroup
是一个结构体,传递的时候要传递指针。
wg用法三步走-总结
- Step1-
var wg sync.WaitGroup
- Step2-
wg.Add(n)
:启动n个goroutine
,计数器+n
- Step3-
wg.Done()
:计数器-1
,一般:go func(){defer wg.Done()}()
,在每个任务内defer
,最后执行 - Step4-
wg.Wait()
:任何在wg.Wait()
后的语句先阻塞,直到所有goroutine
返回
sync.Once
进阶知识点
场景:确保某些操作在高并发场景下只执行一次(加载一次配置文件、只关闭一次通道等)。
Go语言中的sync
包中的sync.Once
提供解决方案,里面只有一个Do
方法:
func (o *Once) Do(f func()){}
备注:如果需要执行的函数f需要传参,则需要搭配闭包来使用
加载配置文件实例
当有一个开销很大的初始化操作时,先延迟它,直到真正要用到它再执行
因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序启动耗时,但可能实际执行过程中这个变量没有用上,那么这个初始化操作是非必须的。例子:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
多个goroutine
并发调用Icon函数不是并发安全的,现代编译器和CPU可能会在保证每个goroutine
都满足串行一致的基础上自由地重排访问内存顺序。
上方的loadIcons可能会被重排为以下结果:
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
这种情况,即使判断了 icons
不是nil也不意味着初始化变量完成了。
考虑到这种情况,使用互斥锁可以解决,保证icons
初始化的时候不会被其他的goroutine
操作,这样又带来了性能问题。
这时使用sync.Once
来进行改造:
package main
import (
"image"
"sync"
)
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcon(path string) (im image.Image) {
return im
}
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
sync.Once
其实内部包含一个互斥锁和一个布尔值,
- 互斥锁保证布尔值和数据的安全
- 布尔值用来记录初始化是否完成
- 这样设计就能保证初始化操作的时候:是并发安全的并且初始化操作也不会被执行多次。
sync.Map
Go中内置的map不是并发安全的,示例:
package main
import (
"fmt"
"strconv"
"sync"
)
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
上述代码开启少量几个goroutine
时没问题,但是并发多了之后代码会报错:fatal error: concurrent map writes
这种情况需要为map加锁来保证并发的安全性,Go语言的sync
包中提供了一个开箱即用的安全版map — sync.Map
- 不需要使用make函数初始化即可直接使用
- 内置了诸如
Store
、Load
、LoadOrStore
、Delete
、Range
等操作方法
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n) // map添加一组键值
value, _ := m.Load(key) // map根据key取value
fmt.Printf("key=%v, value=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
原子操作
代码中的加锁操作,因为涉及内核态的上下文切换会比较耗时、代价比较高。
针对基本数据类型,我们还可以使用原子操作来保证并发安全
原子操作是Go语言提供的方法,它在用户态就可以完成,因此性能比加锁操作更好
Go语言中原子操作由内置的标准库sync/atomic
提供
atomic包
方法 | 解释 |
---|---|
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) |
读取操作 |
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) |
写入操作 |
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) |
修改操作 |
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) |
交换操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) |
比较并交换操 |
示例
var x int64
var l sync.Mutex
var wg sync.WaitGroup
// 普通版加函数
func add() {
// x = x + 1
x++ // 等价于上面的操作
wg.Done()
}
// 互斥锁版加函数
func mutexAdd() {
l.Lock()
x++
l.Unlock()
wg.Done()
}
// 原子操作版加函数
func atomicAdd() {
atomic.AddInt64(&x, 1)
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
// go add() // 普通版add函数 不是并发安全的
// go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大
go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
}
wg.Wait()
end := time.Now()
fmt.Println(x)
fmt.Println(end.Sub(start))
}
atomic
包提供了底层的原子级内存操作,对于同步算法的实现很有用。
这些函数必须谨慎地保证正确使用。
除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。