Go的concurrency之context

context

context.Contextcontext包中的核心类型,它是一个上下文对象,用于跟踪和管理请求的生命周期。通过context.Context,你可以传递请求相关的值、设置取消信号以及控制截止时间。

使用context的主要目的是在并发操作中传递上下文信息,以便在整个操作链中共享和访问这些信息。

下面是一些常见的用法和特性:

  1. 创建上下文:可以使用context.Background()函数创建一个根上下文,或使用context.WithCancel()context.WithTimeout()context.WithDeadline()等函数创建派生的上下文。

  2. 传递上下文:将上下文对象作为参数传递给需要访问上下文信息的函数或方法。

  3. 获取上下文信息:可以使用context.Value()方法从上下文中获取与键关联的值。

  4. 取消操作:通过调用上下文的cancel()函数,可以取消与上下文相关的操作。取消后,所有使用该上下文的goroutine都会收到取消信号。

  5. 控制截止时间:可以使用上下文的WithTimeout()WithDeadline()函数来设置操作的截止时间。一旦超过截止时间,上下文会自动发送取消信号。

通过使用context包,你可以在并发操作中有效地管理资源、控制超时和取消操作。这对于构建健壮的并发应用程序非常有帮助。

以下是一个简单的示例,演示如何使用context进行取消操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"context"
"fmt"
"time"
)

func main() {
// 创建一个带有取消功能的上下文
ctx, cancel := context.WithCancel(context.Background())

// 启动一个goroutine执行耗时操作
go func() {
// 模拟耗时操作
time.Sleep(2 * time.Second)

// 检查上下文是否已被取消
if ctx.Err() == nil {
fmt.Println("Operation completed successfully.")
} else {
fmt.Println("Operation canceled.")
}
}()

// 模拟等待用户输入取消操作
fmt.Println("Press Enter to cancel the operation.")
fmt.Scanln()

// 调用取消函数取消操作
cancel()

// 等待一段时间以观察取消效果
time.Sleep(1 * time.Second)
}

在上面的示例中,我们创建了一个带有取消功能的上下文,并在一个goroutine中执行一个模拟的耗时操作。然后,我们等待用户输入来触发取消操作,并调用cancel()函数取消操作。最后,我们等待一段时间以观察取消的效果。

通过使用context包,我们可以更好地管理并发操作,避免资源泄漏和无限等待的问题,以及提供更好的控制和可读性。

请求范围的上下文

在Go服务中,每个传入的请求都在其自己的 goroutine 中处理。请求处理程序通常启动额外的 goroutine 来访问其他后端,如数据库和 RPC 服务。处理请求的 goroutine 通常需要访问特定于用户(request-specific context)的值,例如最终哟过户的身份、授权令牌和请求的截止日期(deadline)。

当一个请求被取消或超时时,处理该请求的所有 goroutine 都应该快速退出(fail fast),这样系统就可以回收它们正在使用的任何资源。

Go 1.7 引入了一个 context 包,它使得跨 API 边界的请求范围元数据、取消信号和截止日期很容易传递给处理请求所涉及的所有 goroutine (显示传递,一般作为第一个参数)

其他语言:Thread Local Storage(TLS), XXXContext

例如一个跨进程请求

image-20230920160659838

如何将 context 集成到 API 中?

在将 context 集成到 API 中时,它的作用域是请求级别的。例如,沿单个数据库查询存在是有意义的,但沿数据库对象存在则没有意义。

有意义:

1
2
3
func Get(ctx context.Context,id uint) User {

}

没有意义:

1
2
3
4
type DB struct {
db *gorm.DB
ctx context.Context
}

目前有两种方法可以将 context 对象集成到 API 中:

  • Ths first parameter of a function call

    首参数传递 context 对象,比如,参考 net 包,Dialer.DialContext。此函数执行正常的 Dial 操作,但可以通过 context 对象取消函数调用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
    if ctx == nil {
    panic("nil context")
    }
    deadline := d.deadline(ctx, time.Now())
    if !deadline.IsZero() {
    if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
    subCtx, cancel := context.WithDeadline(ctx, deadline)
    defer cancel()
    ctx = subCtx
    }
    }
    if oldCancel := d.Cancel; oldCancel != nil {
    subCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    go func() {
    select {
    case <-oldCancel:
    cancel()
    case <-subCtx.Done():
    }
    }()
    ctx = subCtx
    }
    ...
    }
  • Optional config on a request structure

    在第一个 request 对象中携带一个可选的 context 对象。例如 net/http 库的 Request.WithContext,通过携带给定的 context 对象,返回一个新的 Request 对象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func (r *Request) WithContext(ctx context.Context) *Request {
    if ctx == nil {
    panic("nil context")
    }
    r2 := new(Request)
    *r2 = *r
    r2.ctx = ctx
    return r2
    }

不要将context存储在结构体中

Do not store Contexts inside a struct type;instead, pass a Context explicitly to each function that needs it.The Context should be the first parameter, typically named ctx:

1
2
3
func DoSomething(ctx context.Context,arg Arg) error {
// ... use ctx ...
}

Incoming requests to a server should create a Context.

使用 context 的一个很好的心智模型是它应该在程序中流动,应该贯穿你的代码。者通常意味着您不希望将其存储在结构体之中。它从一个函数传递到另一个函数,并根据需要进行扩展。理想情况下,每个请求都会创建一个 context 对象,并在请求结束时过期。

不存储上下文的一个例外是,当您需要将它放入一个结构体中时,该结构纯粹用作通过通道传递的消息。如下例所示。

1
2
3
4
5
type message struct {
responseChan chan <- int
parameter string
ctx context.Context
}

源码

context.WithValue

context.WithValue 内部基于 valueCtx 实现:

go/go1.19.6/src/context/context.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type valueCtx struct {
Context
key, val any
}

func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

为了实现不断的 WithValue ,构建新的 context,内部在查找 key 的时候,使用递归方式不断从当前,从父节点寻找匹配的key,直到 root context(Background 和 TODO context value 函数会返回 nil)

image-20230920163754133

go/go1.19.6/src/context/context.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}

func value(c Context, key any) any {
for {
switch ctx := c.(type) {
case *valueCtx:
if key == ctx.key {
return ctx.val
}
c = ctx.Context
case *cancelCtx:
if key == &cancelCtxKey {
return c
}
c = ctx.Context
case *timerCtx:
if key == &cancelCtxKey {
return &ctx.cancelCtx
}
c = ctx.Context
case *emptyCtx:
return nil
default:
return c.Value(key)
}
}
}

由于 ctx 会在不同的 goroutine 中传输,所以这些数据必须是安全的,这也就是 valueCtx 中只有一个 keyvalue 的原因,而且内容是只读的。

context.WithValue 方法允许上下文携带请求范围的数据。这些数据必须是安全的,以便多个 goroutine 同时使用。

这里的数据,更多是面向请求的元数据,不应该作为函数的可选参数来使用(比如 context 里面挂了一个 sql.Tx 对象,传递到 Dao 层使用),因为元数据相对函数参数更加是隐含的,面向请求的。而参数是更加显示的。

  • 同一个 context 对象可以传递给在不同 goroutine 中运行的函数;上下文对于多个 goroutine 同时使用时安全的。对于值类型最容易犯错的地方,在于 context value 应该是 immutable 的,每次重新赋值,应该是新的 context,即:

    1
    context.WithValue(ctx,oldvalue)

    使用的思想可以借鉴gRPC metadata

    Context.Value should inform, not control.

User Context values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

比如:染色,API 重要性,Trace

https://github.com/go-kratos/kratos/blob/v1.0.x/pkg/net/metadata/key.go

基于 gRPC 的 MD 思想存储数据到ctx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
func DecodeKeyValue(k, v string) (string, string, error)DEPRECATED
func NewIncomingContext(ctx context.Context, md MD) context.Context
func NewOutgoingContext(ctx context.Context, md MD) context.Context
func ValueFromIncomingContext(ctx context.Context, key string) []string
type MD
func FromIncomingContext(ctx context.Context) (MD, bool)
func FromOutgoingContext(ctx context.Context) (MD, bool)
func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool)
func Join(mds ...MD) MD
func New(m map[string]string) MD
func Pairs(kv ...string) MD
func (md MD) Append(k string, vals ...string)
func (md MD) Copy() MD
func (md MD) Delete(k string)
func (md MD) Get(k string) []string
func (md MD) Len() int
func (md MD) Set(k string, vals ...string)

由于 context.WithValue 取值是需要一层一层往上查询的,而且每一个 context 都只能存储一个值,如果需要存储的值非常多,查询效率就会很慢。这个时候可以借鉴 gRPC 中的元数据MD

go/pkg/mod/google.golang.org/[email protected]/metadata/metadata.go

1
type MD map[string][]string

而且要保证多线程安全。

比如新建了一个基于 context.Background()ctx1,携带了一个 map 的数据,map 中包含了 k1:v1 的一个键值对,ctx1 被两个 goroutine 同时使用作为函数签名传入。

image-20230920170228430

如果我们修改了这个 map ,会导致另外进行读 context.Value 的 goroutine 和修改 map 的 goroutine ,在 map 对象上产生 data race。因此我们要使用 copy-on-write的思路,解决跨多个 goroutine 使用数据、修改数据的场景。

Replace a Context using WithCancel, WithDeadline, WithTimeout, or WithValue.

比如 gincontext,会在使用 Next() 的时候传递给下层,这样的设计不如 gRPC

1
2
3
4
5
6
7
func (c *Context) Next() {
c.index++
for c.index < int8(len(c.handlers)) {
c.handlers[c.index](c)
c.index++
}
}

image-20230921094908985

COW:从 ctx1 中获取 map1 (可以理解为 v1 版本的 map 数据)。构建一个新的 map 对象 map2,复制所有 map1 数据,同时追加新的数据 k2:v2键值对,使用 context.WithValue 创建新的 ctx2ctx2 会传递到其他的 goroutine 中。这样各自读取的副本都是自己的数据,写行为追加的数据,在 ctx2 中也能完整读取到,同时也不会污染 ctx1 中的数据。

The chain of function calls between them must propagate the Context.

context.WithCancel

When a Context is canceled, all Contexts derived from it are also canceled.

当一个 context 被取消时,从它派生的所有 context 也将被取消。WithCancel(ctx) 参数 ctx 认为是 parent ctx,在内部会进行一个传递关系链的关联。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context

mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}

func (c *cancelCtx) Done() <-chan struct{} {
d := c.done.Load()
if d != nil {
return d.(chan struct{})
}
c.mu.Lock()
defer c.mu.Unlock()
d = c.done.Load()
if d == nil {
d = make(chan struct{})
c.done.Store(d)
}
return d.(chan struct{})
}

Done() 返回一个 chan,当取消某个 parent context,实际上会递归层层 cancel 掉自己的 child contextdone chan,从而让整儿调用链中所有监听 cancel 的 goroutine 退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// This example demonstrates the use of a cancelable context to prevent a
// goroutine leak. By the end of the example function, the goroutine started
// by gen will return without leaking.
func ExampleWithCancel() {
// gen generates integers in a separate goroutine and
// sends them to the returned channel.
// The callers of gen need to cancel the context once
// they are done consuming generated integers not to leak
// the internal goroutine started by gen.
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
return // returning not to leak the goroutine
case dst <- n:
n++
}
}
}()
return dst
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel when we are finished consuming integers

for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
break
}
}
// Output:
// 1
// 2
// 3
// 4
// 5
}

注意 defer cancel() // cancel when we are finished consuming integers 很重要,不 cancel() 可能会出现内存泄漏。

image-20230921102049931

所有会阻塞的、长链路调用的操作都应该可以被取消。

All blocking/long operations should be canceled.

如果要实现一个超时控制,通过上面的 contextparent/child 机制,其实只需要启动一个定时器,然后在潮湿的时候,直接将当前的 contextcancel 掉,就可以实现监听在当前和下层的 context.Done() 的 goroutine 的退出。

image-20230921102313677

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// This example passes a context with an arbitrary deadline to tell a blocking
// function that it should abandon its work as soon as it gets to it.
func ExampleWithDeadline() {
d := time.Now().Add(shortDuration)
ctx, cancel := context.WithDeadline(context.Background(), d)

// Even though ctx will be expired, it is good practice to call its
// cancellation function in any case. Failure to do so may keep the
// context and its parent alive longer than necessary.
defer cancel()

select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err())
}

// Output:
// context deadline exceeded
}

// This example passes a context with a timeout to tell a blocking function that
// it should abandon its work after the timeout elapses.
func ExampleWithTimeout() {
// Pass a context with a timeout to tell a blocking function that it
// should abandon its work after the timeout elapses.
ctx, cancel := context.WithTimeout(context.Background(), shortDuration)
defer cancel()

select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) // prints "context deadline exceeded"
}

// Output:
// context deadline exceeded
}

Final Notes

  • Incoming requests to a server should create a Context.

    对其他服务器的传入请求应该创建一个 ctx

  • Outgoing calls to servers should accept a Context.

    对其他服务器的调用应传入一个 ctx

  • Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it.

    不要将 ctx 存储在结构类型内,而应该将 ctx 显式传递给每个需要它的函数。

  • The chain of function calls between them must propagate the Context.

    函数之间的调用链必须传递 ctx

  • Replace a Context using WithCancel, WithDeadline, WithTimeout, or WithValue.

    使用WithCancelWithDeadlineWithTimeoutWithValue创建新的ctx

  • When a Context is canceled, all Contexts derived from it are also canceled.

    取消ctx时,从中派生的所有ctxchild context)也将被取消。

  • The same Context may be passed to functions running in different goroutines; Context are safe for simultaneous use by multiple goroutines

    相同的Context可以传递给在不同goroutine中运行的函数;ctx对于多个goroutine是线程安全的

  • Do not pass a nil Context, even if a function permits it,Pass a TODO context if you are unsure about which Context to use.

    即使函数允许,也不要传递nil context。如果不确定要使用哪个ctx,请传递TODO ctx

  • Use context values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

    仅对跨进程和API的请求范围的数据使用context value,而不对函数间调用传递的可选参数使用context value

  • All blocking/long operations should be cancelable.

    所有阻塞/耗时长的操作都应可取消。

  • Context.Value obscures your program’s flow.

    使用context.Value()方法获取上下文值可能会模糊程序的执行流程。

  • Context.Value should inform, not contrl.

    context.Value应该是路由逻辑,染色逻辑这种通知类的信息,而不是业务控制逻辑。

  • Try not to use context.Value.

    尽量不要使用context.Value

  • 返回的 cancel 要在 defer 中执行掉,不然会出现内存泄漏。

Cancellation, Context, and Plumbing

踩坑

这篇文章介绍的比较全面:Go Context的踩坑经历

基本上:注意闭包,不要将请求进来的 ctxcancel 作为参数发送到异步的 goroutine 中。

参考

Go: Context and Cancellation by Propagation

Go Concurrency Patterns: Context

Context Package Semantics In Go

Go Context的踩坑经历

Cancellation, Context, and Plumbing

How to correctly use context.Context in Go 1.7