前言
为什么要基于Mutex进行扩展?
在特定的场景中,基础的功能是不满足的,所以进行了一些扩展的例子 比如说,互斥锁被某个goroutine获取了,而且还没有释放,那么,其他请求这把锁的goroutine,就会阻塞等待,直到有机会获得这把锁。有时候阻塞并不是一个很好的主意,比如你请求锁更新一个计数器,如果获取不到锁的话没必要等待,大不了这次不更新,下次更新就行了,如果阻塞的话会导致业务处理能力的下降。
如果要监控锁的竞争情况,一个监控指标就是,等待这把锁的goroutine数量,可以把这个指标推送到时间序列数据库中,再通过一些监控系统(比如Grafanc)展示出来,锁是性能下降的“罪魁祸首”之一,所以有效地降低锁的竞争,就能够很好地提高性能,因此,监控关键互斥锁上等待的goroutine的数量,是我们分析锁竞争的激烈程序的一个重要指标。
总结:不管是不希望锁的goroutine继续等待,还是监控锁,都可以基于标准库中Mutex的实现,通过Hacker的方式,为Mutex增加一些额外的功能。
实现TryLock,获取等待着的数量等指标,实现一个线程安全的队列。
TryLock方法实现
为Mutex添加一个TryLock的方法,尝试获取排外锁。
TryLock方法逻辑思路:当一个goroutine调用这个TryLock方法请求锁的时候,如果这把锁没有被其他goroutine所持有,那么,这个goroutine就持有了这把锁,并返回true;如果这把锁已经被其他goroutine所持有,或者是正在准备交给某个被唤醒的goroutine,那么,这个请求锁的goroutine就直接返回false,不会阻塞在方法调用上。
如图所示,如果Mutex已经被一个goroutine持有,调用Lock的goroutine阻塞排队等待,调用TryLock的goroutine直接得到一个false返回。
如果要更新配置数据,我们通常需要加锁,这样可以避免同时有多个goroutine并发修改数据。这样就可以使用TryLock。这样的话,当某个goroutine想要更新配置数据时,如果发现已经有goroutine在更改了,其他的goroutine调用TryLock,返回了false,这个goroutine就会放弃更改。
很多语言都为锁提供了TryLock的方法(包括Java),但是Go官方在(issue 6123讨论中提到过),标准库的Mutex不会添加TryLock方法。在基于Mutex实现中,使用Channel也可以实现TryLock的功能。
基于Mutex实现TryLock方法
// 复制Mutex定义的常量
const (
mutexLocked = 1 << iota // 加锁标识位置
mutexWoken // 唤醒标识的位置
mutexStarving // 锁饥饿标识位置
mutexWaiterShift = iota // 标识waiter的起始bit位置
)
// Mutex 扩展一个Mutex结构
type Mutex struct {
sync.Mutex
}
// TryLock 尝试获取锁
func (m *Mutex) TryLock() bool {
// 如果能成功抢到锁
if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
return true
}
// 如果处于唤醒,加锁或者饥饿状态,这次请求就不参与竞争了,返回false
old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
return false
}
// 尝试在竞争的状态下请求锁
new := old | mutexLocked
return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}
接下来,测试一下TryLock的机制能否工作。
测试程序的工作机制是这样子的:程序运行时启动一个goroutine持有这把我们自己实现的锁,经过随机的时间才释放。主goroutine会尝试获取这把锁。如果前一个goroutine一秒内释放了这把锁,那么,主goroutine就有可能获取到这把锁了,输出“got the lock”,否则没有获取到也不会被阻塞,会直接输出“cat‘t get the lock”。
func try(){
var mu Mutex
go func() { // 启动一个goroutine持有一段时间的锁
mu.Lock()
time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
}()
time.Sleep(time.Second)
ok := mu.TryLock() // 尝试获取到锁
if ok{ // 获取成功
fmt.Println("got the lock")
// do something
mu.Unlock()
return
}
// 没有获取到
fmt.Println("can't get the lock")
}
获取等待者的数量等指标
Mutex的数据结构包含两个字段,state和sema。前四个字段(int32)就是state字段。Mutex结构中的state字段有很多含义,通过state字段,可以知道锁是否已经被某个goroutine持有,当前是否处于饥饿状态、是否有等待的goroutine被唤醒、等待着等信息。
type Mutex struct {
state int32
sema uint32
}
state这个字段并没有暴露出来,所以我们需要想办法获取这个字段,并且进行解析。但是,如何获取未暴露的字段?可以通过unsafe的方式实现。
// 复制Mutex定义的常量
const (
mutexLocked = 1 << iota // 加锁标识位置
mutexWoken // 唤醒标识的位置
mutexStarving // 锁饥饿标识位置
mutexWaiterShift = iota // 标识waiter的起始bit位置
)
type Mutex struct {
sync.Mutex
}
func (m *Mutex) Count() int {
// 获取state字段的值
v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
v = v >> mutexWaiterShift // 得到等待者的数值
v = v + (v & mutexLocked) // 再加上锁持有者的数量,0或者1
return int(v)
}
第14行通过unsafe操作,可以得到state字段的值。第15行右移三位(这里的常量mutexWaiterShift的值为3),就得到了当前等待者的数量。如果当前的锁已经被其它goroutine持有,那么,就稍微调整一下这个值, 加上一个1 (第16行),基本上可以把它看作是当前持有和等待这把锁的goroutine的总数。
state这个字段的第一位是用来标记锁是否被持有,第二位用来标记是否已经唤醒了一个等待者,第三位标记锁是否处于饥饿状态,通过分析这个state字段我们就可以得到这些状态信息。可以为这些状态提供查询的方法,这样就可以实时地知道锁的状态了。
// IsLocked 锁是否被持有
func (m *Mutex) IsLocked() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexLocked == mutexLocked
}
// IsWoken 是否被等待着唤醒
func (m *Mutex) IsWoken() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexWoken == mutexWoken
}
// IsStaving 锁是否处于饥饿状态
func (m *Mutex) IsStaving() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexStarving == mutexStarving
}
假如1000个goroutine并发访问的情况下,可以把锁的状态信息输出出来:
func count() {
var mu Mutex
// 启动1000个goroutine
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
time.Sleep(time.Second)
mu.Unlock()
}()
}
time.Sleep(time.Second)
// 输出锁的信息
fmt.Printf("waitings: %d, isLocked: %t, woken: %t, starving: %t\n", mu.Count())
}
注意:在获取state字段的时候,并没有通过Lock获取这把锁,所以获取这个state的值是一个瞬态的值,可能在解析出来这个字段之后,锁的状态已经发生了变换。但是没多大问题,查看的只是调用那一时刻的状态。
使用Mutex实现一个线程安全的队列
Mutex经常会和其他非线程安全(对于Go来说,就是指goroutine安全)的数据结构一起,组合成一个线程安全的数据结构。新的数据结构的业务逻辑由原来的数据结构提供,而Mutex提供了锁的机制,来保证线程安全。队列可以通过Slice来实现,但是通过Slice实现的队列不是线程安全的,出队(Dequeue)和入队(Enqueue)会有data race的问题。出现了data race问题后,可以通过Mutex在出队和入队的时候加上锁的保护就可以了。
type SliceQueue struct {
data []interface{}
mu sync.Mutex
}
func NewSliceQueue(n int) (q *SliceQueue) {
return &SliceQueue{data: make([]interface{}, 0, n)}
}
// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
q.mu.Lock()
q.data = append(q.data, v)
q.mu.Unlock()
}
// Dequeue 移去对头并返回
func (q *SliceQueue) Dequeue() interface{} {
q.mu.Lock()
if len(q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0]
q.data = q.data[1:]
q.mu.Unlock()
return v
}
Go语言标准库中没有线程安全的队列数据结构的实现,所以我们可以通过Mutex实现一个简单的队列。通过Mutex就可以为一个非线程的data interface{}实现线程安全的访问。