问题描述
基于 kafka 框架的两个处理服务的实例,订阅一个 topic,并且有 20 个 partition,在 traffic 正常发送的情况下,手动杀死一个实例,测试是否对 traffic 有影响。
事实上,在 100CPS 的稳定性测试下,该服务最大出现 7-8s 的 STW 的情况。
分析
在该测试案例下,杀一个实例,并启动一个新的实例的过程,会触发 consumer group 的 rebalancing 操作,在杀死一个实例的过程中,该实例会 leave 这个 consumer group,当新的实例被创建起来时,将会加入这个 consumer group。
根据 kafka 的实现,旧版本是在这个 rebalancing 的过程中是完全的 STW 的,而在 2.4 的新版本中,为了尽可能的避免 STW,引入了 KIP-429 Incremental Rebalance Protocol,但该协议是需要 client 实现。
而我们使用的是 kafka-go 库中并不支持最新的协议,所以目前的实现在 rebalancing 的过程中,仍是完全的 STW 的。
但是考虑到在测试案例的过程中,虽然会有两次的 rebalancing 的过程,但是导致 7-8s 的 STW 仍是奇怪的行为,因为即使网络的原因也不会导致如此长的延迟。于是通过阅读 kafka-go 的源代码发现,该等待时间是由于使用了库中默认设置的 MaxWait(默认 10s) 配置,该参数将会应用到 Fetch 请求中,其作用是在从 kafka 中 fetch 消息到来的最大等待时间,也就是说如果该 partition 上没有新的数据到来,将会等待的最大时间,而根据 Kafka 官网定义 fetch.max.wait.ms,该值默认为 500ms,
而为什么该值的设置导致我们 7-8s 的 STW 的情况,则是,该开源库的实现是基于旧的 rebalancing 的协议实现,所以在 rebalancing 的过程中,它将会 停止所有的处理 goroutine,然后等待这些 goroutine 的结束,再进行下一轮的 generation 的计算。
所以在这个过程中,当一个 consumer group 中的一个 consumer 通过 hearbeat 请求 接收到 rebalancing 的请求时,将会通过关闭 channel 的方式 告知所有的处理 goroutine 进行关闭,等待所有 goroutine 全部结束,再进行重新 joingroup 的请求,进行 rebalancing 处理,但是由于我们使用默认的 MaxWait,导致当有些 partition 上没有消息的情况下,该处理 goroutine 将会等待 MaxWait 时间,才会从 Fetch 请求中退出,再处理 channel 关闭的信号,进行关闭 routine。 所以由于某些 Fetch 的超长等待,导致这一次的处理 routine 不能完全关闭,从而阻塞了下一次 rebalancing 的过程,导致了长时间 STW。
下面从源码角度分析:
当我们调用 NewReader 时,将会调用 NewConsumerGroup 创建 consumergroup 实例,并传入 Reader 实例的 run 函数,
//in reader.go
func NewReader(config ReaderConfig) *Reader {
r := &Reader{
}
if r.useConsumerGroup() {
r.done = make(chan struct{})
cg, err := NewConsumerGroup(ConsumerGroupConfig{
})
if err != nil {
panic(err)
}
go r.run(cg)
}
return r
}
//in consumergroup.go
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
cg := &ConsumerGroup{
next: make(chan *Generation),
done: make(chan struct{}),
}
cg.wg.Add(1)
go func() {
cg.run()
cg.wg.Done()
}()
return cg, nil
}
我们可以看到 Reader 的 run 函数中,无限循环的从 cg.Next 中取出 gen 进行处理,并只有当 stctx error 的时候才会退出,
//in reader.go
func (r *Reader) run(cg *ConsumerGroup) {
for {
gen, err := cg.Next(r.stctx)
if err != nil {
if err == r.stctx.Err() {
return
}
}
r.subscribe(gen.Assignments[r.config.Topic])
gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
})
gen.Start(func(ctx context.Context) {
// wait for the generation to end and then unsubscribe.
select {
case <-ctx.Done():
// continue to next generation
case <-r.stctx.Done():
// this will be the last loop because the reader is closed.
}
r.unsubscribe()
})
}
}
首先看看小小的 gen.Start 函数, 这里将会传入一个函数,并通过 waitgroup 进行同步操作,也就是说一个 gen 的实例调用的 Start 函数,将会是同步操作;此外当传入函数执行完毕后,会主动关闭 gen.done channel。
//in consumergroup.go
func (g *Generation) Start(fn func(ctx context.Context)) {
g.wg.Add(1)
go func() {
fn(genCtx{g})
// shut down the generation as soon as one function exits. this is
// different from close() in that it doesn't wait on the wg.
g.once.Do(func() {
close(g.done)
})
g.wg.Done()
}()
}
在run函数中,通过调用 r.subscribe() -> Reader.start() 中对于每一个 partition 都开启一个 reader.run 进行处理。
//in reader.go
func (r *Reader) start(offsetsByPartition map[int]int64) {
r.join.Add(len(offsetsByPartition))
for partition, offset := range offsetsByPartition {
go func(ctx context.Context, partition int, offset int64, join *sync.WaitGroup) {
defer join.Done()
(&reader{
maxWait: r.config.MaxWait,
}).run(ctx, offset)
}(ctx, partition, offset, &r.join)
}
}
//in reader.go
func (r *reader) run(ctx context.Context, offset int64) {
for attempt := 0; true; attempt++ {
if attempt != 0 {
if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
return
}
}
conn, start, err := r.initialize(ctx, offset)
readLoop:
for {
if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
conn.Close()
return
}
switch offset, err = r.read(ctx, offset, conn); err {
case nil:
errcount = 0
case io.EOF:
case context.Canceled:
conn.Close()
return
case errUnknownCodec:
break readLoop
default:
}
}
}
}
我们在 reader.run 中通过 initialize 初始化连接,并通过 read 去读取消息,直到一些主要的错误,或者 cancel 或者 done 的信号才会退出。
下面我们看下 read 函数中,设置了 maxWait 作为 fetch 的 deadline,所以如分析所述,如果分区上没有消息的情况下,将会在该函数中等待 maxWait 才会退出。
//in reader.go
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))
batch := conn.ReadBatchWith(ReadBatchConfig{
MinBytes: r.minBytes,
MaxBytes: r.maxBytes,
IsolationLevel: r.isolationLevel,
})
highWaterMark := batch.HighWaterMark()
const safetyTimeout = 10 * time.Second
deadline := time.Now().Add(safetyTimeout)
conn.SetReadDeadline(deadline)
for {
if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
deadline = now.Add(safetyTimeout)
conn.SetReadDeadline(deadline)
}
if msg, err = batch.ReadMessage(); err != nil {
batch.Close()
break
}
if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
batch.Close()
break
}
}
conn.SetReadDeadline(time.Time{})
return offset, err
}
我们再回过头来看看 Reader 的 run 中的 cg.Next 函数的逻辑,则是通过阻塞式的读取 cg.next,所以当没有 next 的时候,上面的 run 函数将会一直阻塞在该函数上。
//in consumergroup.go
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-cg.done:
return nil, ErrGroupClosed
case err := <-cg.errs:
return nil, err
case next := <-cg.next:
return next, nil
}
}
下面来看看是如何向该 next 中输入数据的,我们从 consumergroup 中的 run 函数看起,我们可以看到仍是一个死循环的处理消息,首先从 cg.nextGeneration 中获取 memberID,
//in consumergroup.go
func (cg *ConsumerGroup) run() {
for {
memberID, err = cg.nextGeneration(memberID)
// backoff will be set if this go routine should sleep before continuing
// to the next generation. it will be non-nil in the case of an error
// joining or syncing the group.
var backoff <-chan time.Time
switch err {
case nil:
// no error...the previous generation finished normally.
continue
case ErrGroupClosed:
// the CG has been closed...leave the group and exit loop.
_ = cg.leaveGroup(memberID)
return
case RebalanceInProgress:
default:
// leave the group and report the error if we had gotten far
// enough so as to have a member ID. also clear the member id
// so we don't attempt to use it again. in order to avoid
// a tight error loop, backoff before the next attempt to join
// the group.
_ = cg.leaveGroup(memberID)
memberID = ""
backoff = time.After(cg.config.JoinGroupBackoff)
}
}
}
下面看 nextGeneration 函数中,主要则是做 rebalancing 的处理,首先找到 coordinator,然后发送 joinGroup 请求,再 syncGroup,并 fetchOffsets 及开启 heartbeat 循环,并生成 generation 一个实例,发送到 cg.next channel 中。
//in consumergroup.go
func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
conn, err := cg.coordinator()
// join group. this will join the group and prepare assignments if our
// consumer is elected leader. it may also change or assign the member ID.
memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
// sync group
assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
// fetch initial offsets.
offsets, err = cg.fetchOffsets(conn, assignments)
// create the generation.
gen := Generation{
ID: generationID,
GroupID: cg.config.ID,
MemberID: memberID,
Assignments: cg.makeAssignments(assignments, offsets),
conn: conn,
done: make(chan struct{}),
}
// spawn all of the go routines required to facilitate this generation. if
// any of these functions exit, then the generation is determined to be
// complete.
gen.heartbeatLoop(cg.config.HeartbeatInterval)
if cg.config.WatchPartitionChanges {
for _, topic := range cg.config.Topics {
gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
}
}
// make this generation available for retrieval. if the CG is closed before
// we can send it on the channel, exit. that case is required b/c the next
// channel is unbuffered. if the caller to Next has already bailed because
// it's own teardown logic has been invoked, this would deadlock otherwise.
select {
case <-cg.done:
gen.close()
return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
case cg.next <- &gen:
}
// wait for generation to complete. if the CG is closed before the
// generation is finished, exit and leave the group.
select {
case <-cg.done:
gen.close()
return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
case <-gen.done:
// before continuing onward.
gen.close()
return memberID, nil
}
}
所以当一个 consumer 正常的加入 consumer group 后,则首先会阻塞在 Reader.run 中的 cg.Next(r.stctx) 中,等待下一次的 generation 的生成。
同样另一 routine 将会阻塞在 ConsumerGroup 的 run 函数中的 cg.nextGeneration(memberID) 中,等待 cg.done 或者 gen.done 的信号,然后执行 gen.close 关闭旧的 generation。
我们看下 gen.close 函数,这里会调用 waitgroup 的 Wait 等待所有的通过调用上面描述的 gen.Start 函数的 routine 结束。
//in consumergroup.go
func (g *Generation) close() {
g.once.Do(func() {
close(g.done)
})
g.wg.Wait()
}
再看看 nextGeneration 函数中 gen.heartbeatLoop(cg.config.HeartbeatInterval) 函数,则是当 heartbeat 请求返回失败的情况下,将会退出该程序。
//in consumergroup.go
func (g *Generation) heartbeatLoop(interval time.Duration) {
g.Start(func(ctx context.Context) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_, err := g.conn.heartbeat(heartbeatRequestV0{
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
})
if err != nil {
return
}
}
}
})
}
结合本问题分析,及当 rebalancing in progress 时,heartbeat 请求将会返回错误时,则此时该 gen.Start 函数中将会主动 close gen.done。
根据上面分析,首先 阻塞的 nextGeneration 函数将会收到该信号,从而调用 gen.close(),然后调用 wg.Wait 等待相应的 routine 退出,当所有的 routine 退出后,则会返回该函数,进入 consumergroup 的 run 函数进行处理。
而这里由于对于每一个 generation,我们调动了三次 Start 函数,分别为 r.commitLoop, r.unsubscribe 和 g.conn.heartbeat,而对于我们处理消息的 routine 则是通过 r.unsubscribe 进行取消通知,并等待所有 routine 结束才退出。
//in reader.go
func (r *Reader) unsubscribe() {
r.cancel()
r.join.Wait()
}
因此 从源码角度上,我们可以看出当出现上述情况下,由于某一个 fetch 请求的阻塞,都会影响到整个 rebalancing 的处理。
所以对于这个问题的根本原因在于使用了默认的 MaxWait 参数导致。
总结
配置 MaxWait 参数为一个合适的值,参考 librdkafka 可设置为默认值 500ms 或 sarama 为 250ms。
参考
Kafka protocol : KIP-429 Incremental Rebalance Protocol
golang 客户端实现 : kafka-go , sarama
c/C++ 客户端实现 : librdkafka