Skip to content

并发编程

1.GMP模型

GMP模型是 Go 的协程调度模型。

img

  • G是 goroutine 实现的核心结构,它包含了栈,指令指针,以及其他对调度 goroutine 很重要的信息,例如其阻塞的 channel。
  • P结构是 Processor,它的主要用途就是 M 执行 G 提供上下文,它维护了一个 goroutine 队列,即 runqueue。Processor 是让我们从N:1调度到M:N调度的重要部分。
  • M结构是 Machine,系统线程,它由操作系统管理的,goroutine 就是跑在M之上的;M是一个很大的结构,里面维护小对象内存 cache(mcache)、当前执行的 goroutine、随机数发生器等等非常多的信息。
  • 形象来说,G是任务,M是处理器,P是等待队列。(当然并不是这样,这只是抽象比喻)

P与M一般是一一对应的,P管理着一组G挂载在M上运行。

当一个G长久阻塞在一个M上是,runtime会新建一个M,阻塞G所在的 P 会把其他G挂载到新建的M上。当旧的M上的G不再阻塞或已经死掉,回收旧的M。

当一个P空闲时,就会从全局队列中获取G,若全局队列为空,则会和其他 P 分担G(一般分一半)

GM模型

在GMP模型之前,Go 语言使用的是GM模型。

img

GM模型同样也是从队列里取G,为什么要加P?

很简单的道理,因为多线程访问全局变量要加锁。GM模型每次到全局队列中取G都要加锁解锁,竞争大、效率低,影响并发性能。GMP模型添加本地队列 P ,M优先从自己的本地队列中取G,无需加锁解锁,能够大幅减少锁的竞争,提高效率。

2.goroutine

关键字 go

Go语言中使用 goroutine 非常简单,只需要在调用函数的时候在前面加上 go 关键字,就可以为一个函数创建一个 goroutine。

main goroutine

在 Go 语言中,main goroutine 是一个特殊的 goroutine,它的生命周期和程序的生命周期相同。当 main 函数结束时,main goroutine 会随之结束,同时其他子 goroutine 也会被结束,然后程序退出。

因此,当我们启动其他 goroutine 时,要保持 main 函数运行。

启动 goroutine

go
package main

import (
    "fmt"
    "time"
)

func hi() {
    fmt.Println("hi")
}

func main() {
    fmt.Println("START")
    go hi()
    fmt.Println("END")
    time.Sleep(1)    //这里只暂停main函数一纳秒
}
//若子进程运行得快,就能打印出 hi,因此结果有时有 hi,有时没有

使用time.sleep太粗糙生硬了,若有多个goroutine,不可能精确知道每个goroutine运行需要的时间,因此可以使用sync.WaitGroup来实现并发任务的同步。

go
package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func hi(i int) {
    defer wg.Done()    //计数器-1,用defer关键字,哪怕进程崩溃了也能保证计数器-1
    fmt.Println("hi", i)
}

func main() {
    fmt.Println("START")
    for i := 0; i < 10; i++ {
        wg.Add(1)    //计数器+1
        go hi(i)
    }
    fmt.Println("END")
    wg.Wait()    //阻塞直到计数器为0
}
//除了START,输出结果是完全乱序的,因为10个goroutine是并发执行的

3.channel

“不要通过共享内存来通信,而要通过通信来共享内存”

只是单纯将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

与主流语言通过共享内存来通信进行并发控制的方式不同,Go 语言的并发模型采用了 CSP 模式,goroutine 之间使用共享的 channel(管道)进行通信,通过通信传递数据。

channel类型

  • channel是一种类型,一种引用类型,空值是nil。
  • 声明后的channel需要使用make函数初始化

声明channel

语法如下:

例子:

go
var ch1 chan int   // 声明一个传递整型的通道
    var ch2 chan bool  // 声明一个传递布尔型的通道
    var ch3 chan []int // 声明一个传递int切片的通道

初始化channel

语法如下:

例子:

go
var ch1 chan int
    ch1 = make(chan int)    //无缓冲区
    ch2 := make(chan bool)    //无缓冲区
    ch3 := make(chan string,3)    //缓冲区大小为3

无缓冲通道和有缓冲通道

  • 无缓冲通道,必须要有人接收时才能发送数据;就好比送货上门。
  • 有缓冲通道,只要缓冲区没满都可以发送数据;就好比快递柜没满,快递员就能直接放里面。
go
package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup
var ch1 chan int
var ch2 chan string

func hi() {
    defer wg.Done()
    fmt.Println("hi", <-ch1)
}

func main() {
    fmt.Println("START")

    ch1 = make(chan int)
    ch2 = make(chan string, 2)

    wg.Add(1)
    go hi()

    ch1 <- 1    //简单粗暴的发送,如果没有接收,就会阻塞死锁,然后报错;可以使用select辅助
    ch2 <- "hello"    //先进先出
    ch2 <- "Go"

    wg.Wait()
    fmt.Println(<-ch2)    //hello

    fmt.Println("END")
}

channel操作

  • 有发送(send)、接收(receive)和关闭(close)三种操作
  • 发送和接收都使用 <- 符号

关闭

通过内置的 close 函数关闭通道。关闭通道不是必须的,但最好关闭,在长时间运行的程序中,不关闭通道是资源泄露。

需要注意的地方:

  1. 对一个关闭的通道再发送值就会导致 panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致 panic。

发送和接收

当通道关闭后,接收的值都是零值,我们怎么判断这个零值是 通道关闭 或 通道发送 产生的?即怎么判断通道是否关闭?

有两种方法判断通道是否关闭:多返回值模式 和 for range。

多返回值模式

  • value:从通道中取出的值,如果通道被关闭则返回对应类型的零值。
  • ok:通道ch关闭时返回 false,否则返回 true。

for range

go
for v := range ch {
    //...
}
  • 只有关闭通道才会退出循环(如果没有break)
  • 关闭通道,读取完通道的所有值后退出循环

小结

  1. 死锁问题。发送和接收阻塞都会产生死锁,需要谨慎处理。
  2. 通道关闭的处理。重复关闭,关闭后是否有再发送,这些都是需要注意的。

img

单向channel

channel类型作为参数时,很多时候我们需要限制通道在函数内只能发送或接收,单向channel就是为了解决这个问题的。

go
<- chan int // 只接收通道,只能接收不能发送
chan <- int // 只发送通道,只能发送不能接收

例子:

go
package main

import (
    "fmt"
)

func Producer(i int, in chan<- int) {
    in <- i
}

func Consumer(out <-chan int) {
    i := <-out
    fmt.Println(i)
}

func main() {
    fmt.Println("START")

    var ch1 chan int = make(chan int, 1)
    Producer(1, ch1)
    Consumer(ch1)

    fmt.Println("END")
}

channel实现机制

channel数据结构

go
源码文件:src/runtime/chan.go    line:33

type hchan struct {
    qcount   uint           //当前通道中元素个数
    dataqsiz uint           //最多可存放的元素个数
    buf      unsafe.Pointer //环形队列指针
    elemsize uint16  //每个元素的大小
    closed   uint32  //记录关闭状态
    elemtype *_type //元素类型
    sendx    uint   //写下标
    recvx    uint   //读下标
    recvq    waitq  //等待读的 goroutine 队列
    sendq    waitq  //等待写的 gouroutine 队列

    lock mutex  //互斥锁,不允许并发读写
}

img

channel写

向一个channel中写数据简单过程如下:

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;

简单流程图如下:

img

简单例子:

img

channel读

从一个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
  2. 如果等待发送队列sendq不为空,说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  4. 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;

简单流程图如下:

img

channel关闭

关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。

4.runtime包

了解一些runtime包中关于并发编程的方法:

①runtime.Gosched()

作用:让出CPU时间片,重新等待安排任务

go
package main

import (
    "fmt"
    "runtime"
)

func main() {
    fmt.Println("START")

    go func(s string) {
        fmt.Println(s)
    }("hi")

    runtime.Gosched()    //让出时间片
    fmt.Println("Go")

    fmt.Println("END")
}
//这个子协程非常快,main协程让出时间片等待过程中,子协程已经完成

②runtime.Goexit()

作用:退出当前协程

go
func main() {
    fmt.Println("START")

    go func(s string) {
        runtime.Goexit()
        fmt.Println(s)    //不执行
    }("hi")

    runtime.Gosched()
    fmt.Println("Go")

    fmt.Println("END")
}

③runtime.GOMAXPROCS()

作用:设置逻辑CPU数量,即设置P的数量

runtime.GOMAXPROCS(1) //设置P个数为1

5.sync包

了解一些sync包中关于并发编程的方法:

①sync.WaitGroup

作用:并发同步

是个结构体,值类型,只有三个方法:Add()、Done() 和 Wait()

②sync.Once

作用:确保只执行一次

只有一个 Do 方法,举个简单例子:

go
package main

import (
    "fmt"
    "sync"
)

var one sync.Once

func One() {
    fmt.Println("only one")
}

func anotherone() {
    one.Do(One)
}

func main() {
    fmt.Println("START")

    one.Do(One)
    one.Do(One)
    go anotherone()

    fmt.Println("END")
}
//START
//only one
//END

③sync.Map

作用:并发安全的map

如有转载或 CV 的请标注本站原文地址