Golang中Channel的实现原理
CSP模型
要想理解 channel 要先知道 CSP 模型。CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,由 Tony Hoare 于 1977 年提出。简单来说,CSP 模型由并发执行的实体(线程或者进程)所组成,实体之间通过发送消息进行通信,这里发送消息时使用的就是通道,或者叫 channel。CSP 模型的关键是关注 channel,而不关注发送消息的实体。Go 语言实现了 CSP 部分理论,goroutine 对应 CSP 中并发执行的实体,channel 也就对应着 CSP 中的 channel。
channel分类
channel 分为无缓冲 channel 和有缓冲 channel。两者的区别如下:
无缓冲:发送和接收动作是同时发生的。如果没有 goroutine 读取 channel (<- channel),则发送者 (channel <-) 会一直阻塞。
有缓冲:缓冲 channel 类似一个有容量的队列。当队列满的时候发送者会阻塞;当队列空的时候接收者会阻塞。
channel的底层实现
Go语言channel是first-class的,意味着它可以被存储到变量中,可以作为参数传递给函数,也可以作为函数的返回值返回。作为Go语言的核心特征之一,虽然channel看上去很高端,但是其实channel仅仅就是一个数据结构而已,结构体定义如下:
type hchan struct {
qcount uint // 队列q中总的数据量(容量)
dataqsiz uint //环形队列q的数据大小
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // 因读(recv)而阻塞的等待队列
sendq waitq // 因写(send)而阻塞的等待队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
让我们来看一个hchan这个结构体。其中一个核心的部分是存放channel数据的环形队列,由qcount和elemsize分别指定了队列的容量和当前使用量。dataqsize是队列的大小。elemalg是元素操作的一个Alg结构体,记录下元素的操作,如copy函数,equal函数,hash函数等。
可能会有人疑惑,结构体中只看到了队列大小相关的域,并没有看到存放数据的域啊?如果是带缓冲区的chan,则缓冲区数据实际上是紧接着hchan结构体中分配的。
c = (hchan*)runtime.mal(n + hint*elem->size);
另一个重要部分就是recvq和sendq两个链表,一个是因读这个通道而导致阻塞的goroutine,另一个是因为写这个通道而阻塞的goroutine。如果一个goroutine阻塞于channel了,那么它就被挂在recvq或sendq中。
waitq是链表的定义,包含一个头结点和一个尾结点:
type waitq struct {
first *sudog
last *sudog
}
队列中的每个成员是一个sudog结构体变量。
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
该结构中主要的就是一个g和一个elem。elem用于存储goroutine的数据。读通道时,数据会从hchan的队列中拷贝到sudog的elem域。写通道时,数据则是由sudog的elem域拷贝到hchan的队列中。
hchan结构如下图所示:
读写channel操作
先看写channel的操作,基本的写channel操作,在底层运行时库中对应的是一个runtime.chansend函数。
c <- v
在运行时库中会执行:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
其中c就是channel,ep是取变量v的地址。这里的传值约定是调用者负责分配好ep的空间,仅需要简单的取变量地址就够了。
这个函数首先会区分是同步还是异步。同步是指chan是不带缓冲区的,因此可能写阻塞,而异步是指chan带缓冲区,只有缓冲区满才阻塞。
在同步的情况下,由于channel本身是不带数据缓存的,这时首先会查看hchan结构体中的recvq链表时否为空,即是否有因为读该管道而阻塞的goroutine。如果有则可以正常写channel,否则操作会阻塞。
recvq不为空的情况下,将一个sudog结构体出队列,将传给通道的数据(函数参数ep)拷贝到sudog结构体中的elem域,并将sudog中的g放到就绪队列中,状态置为ready,然后函数返回。
如果recvq为空,否则要将当前goroutine阻塞。此时将一个sudog结构体,挂到通道的sendq链表中,这个sudog中的elem域是参数eq,sudog中的g是当前的goroutine。当前goroutine会被设置为waiting状态并挂到等待队列中。
在异步的情况,如果缓冲区满了,也是要将当前goroutine和数据一起作为sudog结构体挂在sendq队列中,表示因写channel而阻塞。否则也是先看有没有recvq链表是否为空,有就唤醒。
跟同步不同的是在channel缓冲区不满的情况,这里不会阻塞写者,而是将数据放到channel的缓冲区中,调用者返回。
读channel的操作也是类似的,对应的函数是runtime. chanrecv。一个是收一个是发,基本的过程都是差不多的。
需要注意的是几种特殊情况下的通道操作--空通道和关闭的通道。
空通道是指将一个channel赋值为nil,或者定义后不调用make进行初始化。按照Go语言的语言规范,读写空通道是永远阻塞的。其实在函数runtime.chansend和runtime.chanrecv开头就有判断这类情况,如果发现参数c是空的,则直接将当前的goroutine放到等待队列,状态设置为waiting。
读一个关闭的通道,永远不会阻塞,会返回一个通道数据类型的零值。这个实现也很简单,将零值复制到调用函数的参数ep中。写一个关闭的通道,则会panic。关闭一个空通道,也会导致panic。
select的实现
select-case中的chan操作编译成了if-else。比如:
select {
case v = <-c:
...foo
default:
...bar
}
//会被编译为:
if selectnbrecv(&v, c) {
...foo
} else {
...bar
}
//类似地
select {
case v, ok = <-c:
... foo
default:
... bar
}
//会被编译为:
if c != nil && selectnbrecv2(&v, &ok, c) {
... foo
} else {
... bar
}
接下来就是看一下selectnbrecv相关的函数了。其实没有任何特殊的魔法,这些函数只是简单地调用runtime.chanrecv函数,只不过设置了一个参数,告诉当runtime.chanrecv函数,当不能完成操作时不要阻塞,而是返回失败。也就是说,所有的select操作其实都仅仅是被换成了if-else判断,底层调用的不阻塞的通道操作函数。
在Go的语言规范中,select中的case的执行顺序是随机的,而不像switch中的case那样一条一条的顺序执行。那么,如何实现随机呢?
select和case关键字使用了下面的结构体:
// Select case descriptor.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
type scase struct {
elem unsafe.Pointer // data element
c *hchan // chan
pc uintptr // return pc (for race detector / msan)
kind uint16
receivedp *bool // pointer to received bool, if any
releasetime int64
}
// Select statement header.
// Known to compiler.
// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
type hselect struct {
tcase uint16 // total count of scase[](总的scase[]数量)
ncase uint16 // currently filled scase[](当前填充了的scase[]数量)
pollorder *uint16 // case poll order(case的poll次序)
lockorder *uint16 // channel lock order (channel的锁住的次序)
scase [1]scase // one per case (in order of appearance) (每个case会在结构体里有一个scase,顺序是按出现的次序)
}
每个select都对应一个hselect 结构体。在hselect 数据结构中有个scase数组,记录下了每一个case,而中包含了hchan。然后pollorder数组将元素随机排列,这样就可以将scase乱序了。
本文主要摘自:《深入解析go内核实现》
http://legendtkl.com/2017/07/30/understanding-golang-channel/
网页地址:https://www.laojiyou.com/books/shenrujiexigo/03.3.html
pdf下载地址:https://download.csdn.net/download/skh2015java/10967224