目录

connect kafka failed with cannot assign requested address

目录

文章简介:一次业务逻辑问题导致的 kafka 连接泄露问题排查

生产出了问题,再一次被拉来救场。

问题现象:golang 程序所有使用 kafka 连接的地方报错 cannot assign requested address,业务不可用。

错误的含意是客户端无法申请新的端口。

先看 pedestal 底层连接数:

netstat -nt | grep 9092 结果非常多,命令执行了很久,疯狂刷各种 kafka 连接信息。对于 kafka 来说是不太合理的,看起来是一次 kafka 连接泄露问题,即业务疯狂创建 kafka 连接,但未关闭连接。

先看看调用栈:

1
2
grep 'sarama@v1.38.1/client.go:209' 'goroutine' | wc -l
23311

看起来有很多 sarama kafka 后台 goroutine 在跑,也印证了有连接泄露。

然后看 profile /connect-kafka-failed-with-cannot-assign-requested-address/images/image-2.png

也印证了同样的结论。

但是为什么会这样呢?kafka 函数调用会创建 goroutine 后退出,调用栈不携带调用方调用栈,所以无法在 profile 或者 goroutine 中看到实际有问题的代码逻辑。

到此问题卡住了。

尝试看看内存分配是否支持,确认哪里的内存分配对象个数有问题呢。

heap:

/connect-kafka-failed-with-cannot-assign-requested-address/images/image-1.png

可以看到堆调用了 InitClient,这个函数中调用了很多的 sarama.NewClient,暂时猜测是这个代码的问题。

详细看过代码后发现类似这种逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (s *service) InitClient() error {
	s.producer, err := NewProducer()
	if err != nil {
		return err
	}

	err = doSomeInit()
	if err != nil {
		return err
	}

	return nil
}

func do() {
	s := newService()
	err := s.InitClient()
	if err != nil {
		return
	}

	defer s.producer.Close()
}

正常情况下 InitClient 不会报错,defer 逻辑会被执行,producer 会被正确的关闭。当 InitClient 的 doSomeInit 失败后,do 函数会没有执行 s.producer.Close() 直接退出了,导致连接泄露了。

修复方法也比较朴素

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (s *service) InitClient() error {
	s.producer, err := NewProducer()
	if err != nil {
		return err
	}

	err = doSomeInit()
	if err != nil {
+       _ = s.producer.Close()
		return err
	}

	return nil
}

func do() {
	s := newService()
	err := s.InitClient()
	if err != nil {
		return
	}

	defer s.producer.Close()
}

总结

问题现象非常难对应到代码上,需要不断探索,解决的过程中一度想要放弃了。还好,再一次发现了问题的原因,并解决了。