Golang_10_并发编程

十、并发编程

并发与并行

并发:同一 时间段内 执行多个任务(用微信和两个人聊天)

并行:同一 时刻 执行多个任务(你和你朋友都在用微信和别人聊天)

Go并发通过goroutine实现

  • goroutine类似线程,用户态线程,可以根据需要创建成千上万个
  • goroutine是Go语言运行时(runtime)调度完成,而线程是由操作系统调度完成
  • Go提供channel在多个goroutine间通信

goroutine

定义多个任务,让系统帮助这些任务分配到CPU上实现并发执行

goroutine类似线程,Go语言运行时(runtime)调度完成。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。语言层面内置了调度和上下文切换机制

简单粗暴:执行并发任务时,只需要把任务包装成一个函数,开启一个goroutine去执行函数即可,不需要自己去写进程、线程、协程。

使用goroutine

调用函数钱加上go关键字,即是为这个函数创建了一个goroutine

  • 一个goroutine必定对应一个函数

  • 可以创建多个goroutine去执行相同的函数

启动单个goroutine
func hello() {
	fmt.Println("Hello Goroutine!")
}

func main() {
	go hello()
	fmt.Println("Main Goroutine done!")
}

只打印 Main Goroutine done!,为什么呢?

程序启动时,Go程序会为main()函数创建一个默认的goroutine

main()函数返回的时候,goroutine就结束了,所有在main()函数中启动的goroutine就一同结束

main()函数所在的goroutine就是 夜王,其余goroutine就是 异鬼,夜王一死,其余转化的异鬼一同GG

所以需要让main函数等待一下hello函数,简单粗暴的方法是time.sleep

func main() {
	go hello() // 启动另一个goroutine去执行hello函数
	fmt.Println("Main Goroutine done!")
	time.Sleep(time.Second)
}

先打印Main Goroutine done!,再打印Hello Goroutine!

整个过程如下:

  1. Go为main()创建goroutine,且创建完

  2. Go开始创建hello()goroutine,但是在创建过程中:

    • 没有等待:创建过程,执行fmt.Println("Main Goroutine done!"),main()结束,所有都关闭
    • 有等待:
      • 先执行fmt.Println("Main Goroutine done!"),但不退出,先等待
      • 等待的过程,hello()goroutine创建完毕并执行,fmt.Println("Hello Goroutine!")
  3. 所以要么只打印main函数内(没有sleep),要么先main后hello(有sleep)

启动多个goroutine

启动多个goroutine,这里使用sync.WaitGroup实现同步

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // goroutine结束就登记-1
	fmt.Println("Hello Goroutine!", i)
}

func main() {
	for i := 0; i < 10; i++ {
		wg.Add(1) // 启动一个goroutine就登记+1
		go hello(i)
	}
	wg.Wait() // 等待所有登记的goroutine都结束
}

多次执行,会发现每次打印的数字顺序都不一样。因为这10个goroutine是并发执行的,且调度是随机的

goroutine与线程

可增长的栈

操作系统线程一般有固定的栈内存(通常为2MB)

一个goroutine在其生命周期开始时只有很小的栈(通常为2KB)

goroutine的栈是不固定的,可以按需增大减小,可以达到1GB

所以在Go语言中一次创建十万左右的goroutine也是可以的

goroutine调度

GMP是Go运行时(runtime)层面的实现,是Go语言自己实现的一套调度系统,区别于操作系统调度OS线程

  • G就是goroutine,里面除了存放本goroutine信息外,还有与所在P的绑定等信息
    • ==就是go中的一段代码(以一个函数的形式展现),最小的并行单元==
  • P管理着一组goroutine队列
    • 里面会存储当前goroutine运行的上下文环境(函数指针、运行后续的goroutine等)
    • 会对自己管理的goroutine队列进行一些调度(比如把CPU时间较长的goroutine暂停、运行后续的goroutine等)
    • 当自己队列消费完了就去全局队列里面取,全局也消费完就去其他P等队列里面抢任务
    • ==调度goroutine的上下文,G依赖于P进行调度,真正的并行单元==
  • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟,M与内核线程一般是一一映射的关系,一个goroutine最终是要放到M上执行的。
    • ==go中的工作者线程,真正的代码执行单元==

P和M也是一一对应的,他们的关系:P管理着一组G挂载在M上运行

  • 当G长久阻塞在一个M上时,runtime会创建一个新的M,阻塞G所在的P会把其他的G挂载M上
  • 当旧的G阻塞完成或任务其已经死掉时,会回收旧的M

P的个数时通过runtime.GOMAXPROCS设定(最大256),GO1.5后默认为物理线程数。并发量大时会增加一些P和M,但不会太多,切换太频繁会得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于:

OS线程是由OS内核来调度的

goroutine则是由Go运行时(runtime)自己的调度器调度的

  • 这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。
  • 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。
  • 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上
  • 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

深入Golang调度器之GMP模型

GOMAXPROCS

一个调度器的参数,用来确定需要多少个OS线程来同时执行Go代码(默认值是机器上的CPU核心数)

比如在八核的机器上,调度会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数

我们可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果,这里举个例子:

两个任务只有一个逻辑核心时:(做完一个任务再做另一个)

func a() {
    for i := 0; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 0; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(timeSecond)
}
// 输出:
// A:0 ~ A:9
// B:0 ~ B:9
// 先做任务A再做任务B

设置逻辑核心数为2,两个任务并行执行:

func main() {
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(timeSecond)
}
// 输出:
// B:0 ~ B:4
// A:0 ~ A:8
// B:5 ~ B:9
// A:9
// A、B 交替进行

Go语言中 操作系统线程 和 goroutine 的关系:

  1. 一个操作系统线程对应用户态多个goroutine
  2. go程序可以同时使用多个操作系统线程
  3. goroutine和OS线程是多对多的关系,m:n

channel

函数之间并发执行,且进行数据交换才能体现并发的意义

可以使用共享内存进行数据交换,但是不同的goroutine使用共享内存会发生竞态问题。所以为了保证数据交换的正确性,需要使用互斥量对内存进行加锁,这样做会造成性能问题。

Go语言的并发模型是CSP(Communication Sequential Processes),提倡通过通信->共享内存** 而不是 ***通过共享内存*->*实现通信***

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。

channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

通道(channel)是一种特殊的类型

通道如同一个传送带或队列,先入先出FIFO,保证收发数据的顺序

每个通道都是一个具体类型的导管,也就是声明channel的时候需要为其制定元素类型

channel类型

channel是一种类型,一种引用类型。声明通道类型的格式如下:

var 变量 chan 元素类型
var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔值的通道
var ch3 chan []int // 声明一个传递int切片的通道
创建channel

通道是引用类型,空值为nil

var ch chan int
fmt.Println(ch) // <nil>

声明的通道需要进行初始化(make)才能使用,创建channel的格式如下:

make(chan 元素类型, [缓冲大小])
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int, 8)

channel的缓冲大小是可选的。

channel操作

通道有:发送(send)、接收(receive)和关闭(close)三种操作

发送和接收都使用<-符号

ch := make(chan int) // 1. 定义一个通道
ch <- 10             // 2. 把10发送到ch中
x := <-ch            // 3. 从通道接受值,赋值给x
<-ch                 // 3'. 从通道接受值,忽略结果(丢弃)
fmt.Println(x)
close(ch) 			  // 4. 关闭通道
// 以上代码运行时会出错,下面“缓冲通道”会讲到

关于关闭通道(close()操作:

  • 只有在接收方goroutine所有数据都发送完毕时才需要关闭通道

  • 通道是可以被垃圾回收机制回收的。

    • 关闭文件:结束操作后必须关闭文件
    • 关闭通道:非必须操作
  • 关闭后的通道有如下特点:

    1. 对一个关闭的通道
      • 再发送值就会导致panic。
      • 进行接收会一直获取值直到通道为空。
      • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
    2. 关闭一个已经关闭的通道会导致panic。
无缓冲的通道(没有缓冲区)

又称阻塞通道,先看下面代码:

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("发送成功")
}

编译通过,但是执行的时候出错:

$ go build

$ ./basementStudy 
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /XXXX/basementStudy/main.go:7 +0x54

Why deadlock错误?

因为使用ch := make(chan int)创建的是无缓冲的通道,只有在有人接收值的时候才能发送值。(就像你住的小区没有快递柜和代收点,快递员直接打电话送到手,同步的)简单来说就是无缓冲的通道必须有接收才能发送。

代码会阻塞在 ch <- 10内形成死锁

解决办法是启用一个goroutine去接收值

func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}

func main() {
	ch := make(chan int)
	go recv(ch) // 启动一个goroutine,从通道接收值,先阻塞,直到下面有goroutine发送
	ch <- 10
	fmt.Println("发送成功")
    // 输出:
    // 接收成功 10
	// 发送成功
}
  • 无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。

  • 相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

无缓冲通道的进行通信会导致发送和接收的goroutine同步化,因此无缓冲通道也被称为==同步通道==

有缓冲的通道

解决上述“通道内必须同步”的方法是设置缓冲区。可以在make时为其指定通道容量

func main() {
	ch := make(chan int, 1)
	ch <- 10
	fmt.Println("发送成功")
}

只要容量大于0,就是有缓冲的通道,容量即为通道中能存放元素的数量。

如果满了,就需要等待有人取走才能继续往里面放。

可以使用一些函数获取容量:(虽然很少会这样做)

  • 使用len()函数获取通道内函数的数量
  • 使用cap()函数获取通道总容量
for range 从通道循环取值

当向通道中发送完数据时,我们可以通过close函数来关闭通道。

当通道被关闭时,再往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?

例子:

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	// 开启一个goroutine将1~100发到ch1中
	go func() {
		for i := 0; i < 100; i++ {
			ch1 <- i
		}
		close(ch1)
	}()

	// 开启一个goroutine从ch1中接受值,将值平方放到ch2
	go func() {
		// 循环,中间会有brerak(当ch1被close的时候)
		for {
			// 通道取值到i,通道关闭后再取值: ok=false
			i, ok := <-ch1
			if !ok {
				break
			}
			ch2 <- i * i
		}
		close(ch2)
	}()

	for i := range ch2 {
		fmt.Println(i)
	}
}

从上面的例子中我们看到,有两种方式在接收值的时候判断该通道是否被关闭,不过我们通常使用的是for range的方式。

使用for range遍历通道,当通道被关闭的时候就会退出for range

单向通道

很多时候我们会在不同的任务函数中加上不同的限制,比如限制通道在函数内只能发送或接收

Go语言提供单向通道来处理这种情况,对上面的例子进行改造:

// chan<- 只能发送的单向通道
// 所以是 数字 发送到 out
func counter(out chan<- int) {
	for i := 0; i < 100; i++ {
		out <- i
	}
	close(out)
}

// <-chan 只能接收的单向通道
// 所以是 遍历"只能出的" 放入到 "只能进的"
func squarer(out <-chan int, in chan<- int) {
	for i := range out {
		in <- i * i
	}
	close(in)
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go counter(ch1)
	go squarer(ch1, ch2)
	for i := range ch2 {
		fmt.Println(i)
	}
}
  • chan<- int是一个只能发送的通道
  • <-chan int是一个只能接收的通道
  • 记法:箭头的指向表示数据方向

在函数传参和任何赋值操作中,可以 将双向通道转换为单向通道,反之不行

通道总结

channel常见的异常总结,如下:

channel nil 非空 没满
接收 阻塞 接受值 阻塞 接受值 接收值
发送 阻塞 发送值 发送值 阻塞 发送值
关闭 panic 关闭成功,读完数据后,返回零值 关闭成功,返回零值 关闭成功,读完数据后,返回零值 关闭成功,读完数据后,返回零值

关闭已经关闭的channel也会引发panic

worker pool(goroutine池)

通常使用的模式为worker pool模式:可以指定启动的goroutine数量,来控制其数量防止goroutine泄漏和暴涨

一个简单的work pool实例代码:

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("worker:%d start job:%d", id, j)
        time.Sleep(time.Second)
        fmt.Printf("worker:%d end   job:%d", id, j)
        results <- j*2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // 开启三个goroutine
    for w = 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    
    // 执行5个任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 输出结果
    for a := 1; a <= 5; a++ {
        <- results
    }
}

输出:

worker:3 start job:1
worker:1 start job:2
worker:2 start job:3
worker:2 end   job:3
worker:2 start job:4
worker:1 end   job:2
worker:3 end   job:1
worker:1 start job:5
worker:2 end   job:4
worker:1 end   job:5

select多路复用

场景:同时从多个通道接收数据

通道在接收数据时,如果没有可接收的数据会发生阻塞

for {
    // 尝试从ch1接收值
    data, ok := <-ch1
    // 尝试从ch2接收值
    data, ok := <-ch2
    ...
}

以上写法可以实现从多个通道接收值,但是运行性能会差很多。

应对这种场景的方法为:使用Go内置的select关键字,可以同时响应多个通道的操作。

select的使用类似switch,每个case对应一个通道(接收或发送)的通信过程select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。格式如下:

select {
	case <-ch1:
	...
	case data := <-ch2:
	...
	case ch3<-data:
	...
	default:
	默认操作
}

例子:

ch := make(chan int, 1)

for i := 0; i < 10; i++ {
    // 通道空,往里面放值
    // 通道不空,取值、打印
    select {
    case: x := <-ch:
        fmt.Println(x)
    case: ch<-i:
    }
}

使用select语句可以提高代码的可读性:

  • 可处理一个或多个channel的发送/接收操作。
  • 如果多个case同时满足,select会随机选择一个
  • 对于没有caseselect{}会一直等待,可用于阻塞main函数。

并发安全和锁

有时候可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。

类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。

var x int64
var wg sync.WaitGroup

func add() {
    for i := 0; i < 5000; i++ {
        x = x + 1
    }
    wg.Done()
}

func main() {
    wg.Add(2)
    
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

上述代码,开启两个goroutine去累加x,这两个goroutine在访问和修改x的时候就会存在竞争,导致最后的结果和期待的不符。每次结果不一样 5000~10000

互斥锁

互斥锁时一种常用的控制共享资源访问的方法,它能保证同时只有一个goroutine可以访问资源

Go语言时使用sync包的Mutex类型来实现互斥锁

使用互斥锁修复上面代码的问题:

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 5000; i++ {
        lock.Lock() // 加锁
        x = x + 1
        lock.Unlock() // 解锁
    }
    wg.Done()
}

func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}
// 稳定输出10000

使用互斥锁可以保证:

  • 同一时间有且只有一个goroutine进入临界区
  • 其他的goroutine则会等待

互斥锁释放后,等待的goroutine才能获取锁进入临界区

多个goroutine同时等待一个锁时,唤醒策略是随机的

读写互斥锁

互斥锁是完全互斥的,但是很多场景是读多写少

当并发读取一个资源(这个资源没有涉及被修改)时,这种场景使用读写锁是一种更好的选择。

读写锁是使用在sync包中的RwMutex类型

读写锁分为两种:读锁和写锁

  • 当一个goroutine获取读锁后:(其余人不能修改)
    • 其他goroutine获取读锁后,会继续获得锁
    • 其他goroutine获取写锁后,会等待
  • 当一个goroutine获取写锁后:(其余人不能修改或访问数据)
    • 其他goroutine获取读锁写锁后,都会等待
var (
	x 		int64
    wg 		sync.WaitGroup
    lock	sync.Mutex		// 普通互斥锁
    rwlock	sync.RWMutex	// 读写互斥锁
)

func write() {
    lock.Lock() // 加普通互斥锁
    //rwlock.RLock() // 加读写互斥锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设写操作耗时10毫秒
    //rwlock.RUnlock() // 解读写互斥锁
    lock.Unlock() // 解普通互斥锁
    wg.Done() // 计数器-1
}

func read() {
    lock.Lock() // 加普通互斥锁
    //rwlock.RLock() // 加读写互斥锁
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时1毫秒
    //rwlock.RUnlock() // 解读写互斥锁
    lock.Unlock() // 解普通互斥锁
    wg.Done() // 计数器-1
}

func main() {
    // 用来和后面的 end 时间来计算耗时
    start := time.Now()
    
    // 开启10个写goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }
    
    // 开启1000个读goroutine
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }
    wg.Wait()
    
    // 计算耗时
    end := time.Now()
    fmt.Println(end.Sub(start))
}

运行效果:(读多写少的情况)

  • 读操作使用读写锁:(主要,读的时候没必要锁起来)
    • 写操作读写锁:14.765198ms
    • 写操作普通锁:107.509604ms
  • 读操作使用普通锁:(次要,不太影响性能)
    • 写操作读写锁:11.427787848s
    • 写操作普通锁:11.283028743s
sync.WaitGroup

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步sync.WaitGroup有以下几个方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。

  • 当启动了N 个并发任务时,就将计数器值增加N。
  • 每个任务完成时通过调用Done()方法将计数器减1。

通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成

开始的一个例子使用sync.WaitGroup优化

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // goroutine结束就登记-1
	fmt.Println("Hello Goroutine!", i)
}

func main() {
	for i := 0; i < 10; i++ {
		wg.Add(1) // 启动一个goroutine就登记+1
		go hello(i)
	}
	wg.Wait() // 等待所有登记的goroutine都结束
}

优化后:

var wg sync.WaitGroup

func hello() {
    defer wg.Done() // goroutine结束就登记-1
    fmt.Println("Hello Goroutine!")
}

func main() {
    wg.Add(1)
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    wg.Wait()
}
// main goroutine done!
// Hello Goroutine!

// 或者下面写法,不另外创建函数
var wg sync.WaitGroup       // step1
func main(){
    wg.Add(1)				// step2
    go func(){
        defer wg.Done() // step3 每个任务内defer 
        fmt.Println("Hello goroutine!")
    }()
    wg.Wait() // step4 后面的语句阻塞,直到所有goroutine返回
    fmt.Println("The last statement to be executed")
}

需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。

wg用法三步走-总结
  • Step1-var wg sync.WaitGroup
  • Step2- wg.Add(n):启动n个goroutine,计数器+n
  • Step3-wg.Done():计数器-1,一般:go func(){defer wg.Done()}(),在每个任务内defer,最后执行
  • Step4-wg.Wait():任何在wg.Wait()后的语句先阻塞,直到所有goroutine返回
sync.Once

进阶知识点

场景:确保某些操作在高并发场景下只执行一次(加载一次配置文件、只关闭一次通道等)。

Go语言中的sync包中的sync.Once提供解决方案,里面只有一个Do方法:

func (o *Once) Do(f func()){}

备注:如果需要执行的函数f需要传参,则需要搭配闭包来使用

加载配置文件实例

当有一个开销很大的初始化操作时,先延迟它,直到真正要用到它再执行

因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序启动耗时,但可能实际执行过程中这个变量没有用上,那么这个初始化操作是非必须的。例子:

var icons map[string]image.Image

func loadIcons() {
    icons = map[string]image.Image{
        "left":		loadIcon("left.png"),
        "up":		loadIcon("up.png"),
        "right":	loadIcon("right.png"),
        "down":		loadIcon("down.png"),
    }
}

// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}

多个goroutine并发调用Icon函数不是并发安全的,现代编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上自由地重排访问内存顺序。

上方的loadIcons可能会被重排为以下结果:

func loadIcons() {
    icons = make(map[string]image.Image)
    icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}

这种情况,即使判断了 icons不是nil也不意味着初始化变量完成了。

考虑到这种情况,使用互斥锁可以解决,保证icons初始化的时候不会被其他的goroutine操作,这样又带来了性能问题。

这时使用sync.Once来进行改造:

package main

import (
	"image"
	"sync"
)

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcon(path string) (im image.Image) {
	return im
}

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是并发安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

sync.Once其实内部包含一个互斥锁和一个布尔值,

  • 互斥锁保证布尔值和数据的安全
  • 布尔值用来记录初始化是否完成
  • 这样设计就能保证初始化操作的时候:是并发安全的并且初始化操作不会被执行多次
sync.Map

Go中内置的map不是并发安全的,示例:

package main

import (
	"fmt"
	"strconv"
	"sync"
)

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

上述代码开启少量几个goroutine时没问题,但是并发多了之后代码会报错:fatal error: concurrent map writes

这种情况需要为map加锁来保证并发的安全性,Go语言的sync包中提供了一个开箱即用的安全版map — sync.Map

  • 不需要使用make函数初始化即可直接使用
  • 内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法
var m = sync.Map{}

func main() {
	wg := sync.WaitGroup{}

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)         // map添加一组键值
			value, _ := m.Load(key) // map根据key取value
			fmt.Printf("key=%v, value=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

原子操作

代码中的加锁操作,因为涉及内核态的上下文切换会比较耗时、代价比较高。

针对基本数据类型,我们还可以使用原子操作来保证并发安全

原子操作是Go语言提供的方法,它在用户态就可以完成,因此性能比加锁操作更好

Go语言中原子操作由内置的标准库sync/atomic提供

atomic包
方法 解释
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
写入操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比较并交换操
示例
var x int64
var l sync.Mutex
var wg sync.WaitGroup

// 普通版加函数
func add() {
	// x = x + 1
	x++ // 等价于上面的操作
	wg.Done()
}

// 互斥锁版加函数
func mutexAdd() {
	l.Lock()
	x++
	l.Unlock()
	wg.Done()
}

// 原子操作版加函数
func atomicAdd() {
	atomic.AddInt64(&x, 1)
	wg.Done()
}

func main() {
	start := time.Now()
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		// go add()       // 普通版add函数 不是并发安全的
		// go mutexAdd()  // 加锁版add函数 是并发安全的,但是加锁性能开销大
		go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(x)
	fmt.Println(end.Sub(start))
}

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。

这些函数必须谨慎地保证正确使用。

除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。


分享: