Overview: how I raised Kafka consumption throughput 100x
Background
- Alarm ingestion/archiving throughput was poor: around 200
- Alarm consumption was resource-hungry; with 5 million alarms stored, an alarm flood would crush the platform, and its core business pages returned 504 timeouts
- Under frequent ingestion and archiving, the bloat ratio of the alarm tables in Postgres exceeded 90%; all that dead space slowed queries down by several orders of magnitude
Root causes
- A single table held too much data: more than 5 million rows
- Inserts, updates, and deletes were frequent, and Postgres's MVCC plus autovacuum did not kick in promptly, so the bloat ratio kept climbing (it can be quantified with pgstattuple; see the sketch after this list)
- The alarm pipeline was long: logic with loose latency requirements (automated disposal, statistics) ran inline in the consumption path and dragged throughput down
- Consumption issued many small, frequent queries with no caching, burning a large share of database QPS
- The database ran on default configuration; memory and CPU were underused
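Bloat of this kind can be quantified with the standard pgstattuple contrib module. A minimal sketch, assuming the table is named alarm:

```sql
CREATE EXTENSION IF NOT EXISTS pgstattuple;
-- dead_tuple_percent plus free_percent indicate how hollow the table is.
SELECT dead_tuple_percent, free_percent FROM pgstattuple('alarm');
```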
What was done
- Tuned the database configuration
- Partitioned alarms by time, one partition per day; archiving handles one partition at a time; partition-level indexes and parallel query improve query throughput and stability (see the SQL sketch after this list)
- Restructured the consumption pipeline into three stages: ingestion, statistics, and automated disposal; slow automated disposal no longer drags down overall throughput
- Batched the inserts across the several alarm tables, raising throughput and cutting resource usage
- After a day's alarms are archived to HBase, truncate that child partition (also shown in the sketch below)
- Added caching
- Optimized slow queries: rewrote SQL and added indexes; dropped indexes that had never been used (the sketch below includes a pg_stat_user_indexes query for finding them)
- Ran the consumer in its own process for resource and fault isolation
- Added rate limiting to consumption for stability
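A sketch of the partitioning and housekeeping described above. The column list is trimmed and the DDL is illustrative, but the partition and index names match the ones visible in the query plans later in this post:

```sql
-- Parent table, range-partitioned by day on created_at.
-- The partition key must be part of the primary key.
CREATE TABLE IF NOT EXISTS alarm (
    id           bigserial,
    created_at   timestamptz NOT NULL,
    updated_at   timestamptz,
    judged_state int,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- One child partition per day, created ahead of time.
CREATE TABLE IF NOT EXISTS alarm_20230530 PARTITION OF alarm
    FOR VALUES FROM ('2023-05-30') TO ('2023-05-31');
CREATE INDEX IF NOT EXISTS alarm_20230530_created_at_idx
    ON alarm_20230530 (created_at);

-- Once a day's partition has been archived to HBase, empty it in one
-- cheap, bloat-free step instead of DELETEing row by row.
TRUNCATE alarm_20230530;

-- Find indexes that have never been scanned (candidates for removal).
SELECT schemaname, relname, indexrelname
FROM pg_stat_user_indexes
WHERE idx_scan = 0;
```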
Results
With 32 cores / 64 GB RAM and 5 million alarms stored:
- Alarm ingestion throughput reaches 40,000, two orders of magnitude above the pre-optimization baseline
- The SQL fetching the first 20 rows of the alarm list is 2,500x faster; COUNT is 27x faster; total alarm-list load time dropped from 15 s to under 3 s; the alarm statistics page went from 504 timeout to under 15 s
- Database parameter tuning alone tripled throughput
- The service remains usable with 30 million alarms stored, and is expected to stay usable while holding half a year or more of data
Alarm list optimization
Before:
```sql
explain (ANALYZE, VERBOSE, TIMING, COSTS, BUFFERS, SUMMARY) SELECT * FROM "alarm" WHERE (judged_state = 1 OR judged_state = 0) ORDER BY alarm.updated_at desc nulls last LIMIT 20;
```
After: add a multicolumn index covering the commonly filtered and sorted columns:
```sql
create index if not exists alarm_multiline_index on alarm (created_at desc, updated_at desc nulls last, judged_state, alarm_level, disposed_state);
```
Optimizing the COUNT queries used heavily by the dashboard
17134.851 ms / 623.675 ms ≈ 27x
Before:
```
explain (ANALYZE, VERBOSE, TIMING,COSTS, BUFFERS, SUMMARY) SELECT COUNT("id") FROM "alarm" WHERE created_at >= '2023-05-30 07:34:46.002537+00' AND created_at < '2023-05-31 07:34:46.002537+00';
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=840896.65..840896.66 rows=1 width=8) (actual time=17098.462..17134.785 rows=1 loops=1)
Output: count(alarm.id)
Buffers: shared hit=5615853 read=105541 written=1
-> Gather (cost=840896.23..840896.64 rows=4 width=8) (actual time=17098.338..17134.774 rows=5 loops=1)
Output: (PARTIAL count(alarm.id))
Workers Planned: 4
Workers Launched: 4
Buffers: shared hit=5615853 read=105541 written=1
-> Partial Aggregate (cost=839896.23..839896.24 rows=1 width=8) (actual time=17094.127..17094.129 rows=1 loops=5)
Output: PARTIAL count(alarm.id)
Buffers: shared hit=5615853 read=105541 written=1
Worker 0: actual time=17093.145..17093.148 rows=1 loops=1
Buffers: shared hit=1478094 read=28657
Worker 1: actual time=17093.203..17093.205 rows=1 loops=1
Buffers: shared hit=1470692 read=28510
Worker 2: actual time=17093.147..17093.150 rows=1 loops=1
Buffers: shared hit=886771 read=16085
Worker 3: actual time=17093.167..17093.170 rows=1 loops=1
Buffers: shared hit=882627 read=16017 written=1
-> Parallel Append (cost=0.55..836170.65 rows=1490232 width=37) (actual time=0.946..17023.812 rows=1188508 loops=5)
Buffers: shared hit=5615853 read=105541 written=1
Worker 0: actual time=1.212..17006.371 rows=1491562 loops=1
Buffers: shared hit=1478094 read=28657
Worker 1: actual time=0.939..17005.878 rows=1484942 loops=1
Buffers: shared hit=1470692 read=28510
Worker 2: actual time=0.814..17034.576 rows=986228 loops=1
Buffers: shared hit=886771 read=16085
Worker 3: actual time=0.471..17035.496 rows=982083 loops=1
Buffers: shared hit=882627 read=16017 written=1
-> Parallel Index Only Scan using alarm_20230530_pkey on public.alarm_20230530 alarm_1 (cost=0.56..713942.31 rows=1157253 width=37) (actual time=0.796..14931.447 rows=919379 loops=5)
Output: alarm_1.id
Index Cond: ((alarm_1.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_1.created_at < '2023-05-31 07:34:46.002537+00'::timestamp with time zone))
Heap Fetches: 296346
Buffers: shared hit=4552140 read=88971 written=1
Worker 0: actual time=1.211..16900.689 rows=1491562 loops=1
Buffers: shared hit=1478094 read=28657
Worker 1: actual time=0.938..16900.152 rows=1484942 loops=1
Buffers: shared hit=1470692 read=28510
Worker 2: actual time=0.501..13618.651 rows=537258 loops=1
Buffers: shared hit=532080 read=10556
Worker 3: actual time=1.098..13619.421 rows=536621 loops=1
Buffers: shared hit=530559 read=10531 written=1
-> Parallel Index Only Scan using alarm_20230531_pkey on public.alarm_20230531 alarm_2 (cost=0.55..114777.18 rows=332979 width=37) (actual time=0.860..3347.784 rows=448548 loops=3)
Output: alarm_2.id
Index Cond: ((alarm_2.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_2.created_at < '2023-05-31 07:34:46.002537+00'::timestamp with time zone))
Heap Fetches: 68366
Buffers: shared hit=1063713 read=16570
Worker 2: actual time=0.813..3346.199 rows=448970 loops=1
Buffers: shared hit=354691 read=5529
Worker 3: actual time=0.470..3347.495 rows=445462 loops=1
Buffers: shared hit=352068 read=5486
Query Identifier: 4802739785007802293
Planning:
Buffers: shared hit=14
Planning Time: 0.299 ms
Execution Time: 17134.851 ms
(56 rows)
```
After: count with COUNT(*) instead of COUNT("id"), backed by a dedicated index on created_at. The planner then runs a much narrower index-only scan (width=0 instead of width=37) and touches roughly 2.2M shared buffers instead of 5.6M:
```
pedestal=# explain (ANALYZE, VERBOSE, TIMING,COSTS, BUFFERS, SUMMARY) SELECT COUNT(*) FROM "alarm" WHERE created_at >= '2023-05-30 07:34:46.002537+00' AND created_at < '2023-05-31 07:34:46.002537+00';
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=314048.18..314048.19 rows=1 width=8) (actual time=606.301..623.625 rows=1 loops=1)
Output: count(*)
Buffers: shared hit=2199668 read=19636
-> Gather (cost=314047.76..314048.17 rows=4 width=8) (actual time=605.925..623.616 rows=5 loops=1)
Output: (PARTIAL count(*))
Workers Planned: 4
Workers Launched: 4
Buffers: shared hit=2199668 read=19636
-> Partial Aggregate (cost=313047.76..313047.77 rows=1 width=8) (actual time=601.804..601.805 rows=1 loops=5)
Output: PARTIAL count(*)
Buffers: shared hit=2199668 read=19636
Worker 0: actual time=600.796..600.798 rows=1 loops=1
Buffers: shared hit=587645 read=3242
Worker 1: actual time=600.942..600.943 rows=1 loops=1
Buffers: shared hit=599411 read=3326
Worker 2: actual time=600.829..600.830 rows=1 loops=1
Buffers: shared hit=324544 read=4305
Worker 3: actual time=600.910..600.912 rows=1 loops=1
Buffers: shared hit=326520 read=4134
-> Parallel Append (cost=0.43..309322.18 rows=1490232 width=0) (actual time=0.614..545.485 rows=1188508 loops=5)
Buffers: shared hit=2199668 read=19636
Worker 0: actual time=1.625..540.640 rows=1226383 loops=1
Buffers: shared hit=587645 read=3242
Worker 1: actual time=1.312..541.209 rows=1250688 loops=1
Buffers: shared hit=599411 read=3326
Worker 2: actual time=0.065..548.210 rows=1123038 loops=1
Buffers: shared hit=324544 read=4305
Worker 3: actual time=0.051..548.363 rows=1117758 loops=1
Buffers: shared hit=326520 read=4134
-> Parallel Index Only Scan using alarm_20230530_created_at_idx on public.alarm_20230530 alarm_1 (cost=0.43..253250.08 rows=1157253 width=0) (actual time=0.614..324.555 rows=919379 loops=5)
Index Cond: ((alarm_1.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_1.created_at < '2023-05-31 07:34:46.002537+00'::timestamp with time zone))
Heap Fetches: 529504
Buffers: shared hit=2118759 read=12477
Worker 0: actual time=1.623..461.733 rows=1226383 loops=1
Buffers: shared hit=587645 read=3242
Worker 1: actual time=1.311..463.024 rows=1250688 loops=1
Buffers: shared hit=599411 read=3326
Worker 2: actual time=0.074..233.037 rows=692071 loops=1
Buffers: shared hit=298845 read=1935
Worker 3: actual time=0.049..233.393 rows=692036 loops=1
Buffers: shared hit=301847 read=1916
-> Parallel Index Only Scan using alarm_20230531_created_at_idx on public.alarm_20230531 alarm_2 (cost=0.43..48620.94 rows=332979 width=0) (actual time=0.044..242.691 rows=448548 loops=3)
Index Cond: ((alarm_2.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_2.created_at < '2023-05-31 07:34:46.002537+00'::timestamp with time zone))
Heap Fetches: 102772
Buffers: shared hit=80909 read=7159
Worker 2: actual time=0.064..243.537 rows=430967 loops=1
Buffers: shared hit=25699 read=2370
Worker 3: actual time=0.050..243.304 rows=425722 loops=1
Buffers: shared hit=24673 read=2218
Query Identifier: 8664734888486988169
Planning:
Buffers: shared hit=14
Planning Time: 0.321 ms
Execution Time: 623.675 ms
(54 rows)
```
Optimizing alarm consumption throughput
How to approach it
Tune the database configuration to raise the database's own processing capacity
```
deadlock_timeout = 2s
max_connections = 500
# sized for 16 cores / 8 GB RAM
shared_buffers = 2GB
effective_cache_size = 6GB
maintenance_work_mem = 1GB
effective_io_concurrency = 2
bgwriter_lru_maxpages = 500
wal_buffers = 32MB
max_wal_size = 8GB
min_wal_size = 4GB
work_mem = 8MB
max_worker_processes = 16
max_parallel_workers_per_gather = 4
max_parallel_workers = 16
max_parallel_maintenance_workers = 4
```
- Let Postgres use more of the available memory
- Let Postgres use more CPU via parallel query execution
Batch inserts to raise throughput
```go
package alarmopt

import (
	"testing"

	"github.com/stretchr/testify/require"
)

const (
	AlarmSize  = 50000
	AlarmBatch = 100
)

// DB (a *gorm.DB), the Alarm model, and testGenAlarm are package-level
// helpers; a sketch of them follows this block.

// go test -timeout 30s -run ^TestSingleInsert$ github.com/exfly/alarmopt
func TestSingleInsert(t *testing.T) {
	err := DB.AutoMigrate(&Alarm{})
	require.NoError(t, err)
	err = DB.Exec("truncate alarm;").Error
	require.NoError(t, err)

	alarms := testGenAlarm(t, AlarmSize)
	// Batch size 1: one INSERT round trip per alarm.
	err = DB.Debug().CreateInBatches(alarms, 1).Error
	require.NoError(t, err)
}

// go test -timeout 30s -run ^TestBatchInsert$ github.com/exfly/alarmopt
func TestBatchInsert(t *testing.T) {
	err := DB.AutoMigrate(&Alarm{})
	require.NoError(t, err)
	err = DB.Exec("truncate alarm;").Error
	require.NoError(t, err)

	alarms := testGenAlarm(t, AlarmSize)
	// Batch size 100: two orders of magnitude fewer round trips.
	err = DB.Debug().CreateInBatches(alarms, AlarmBatch).Error
	require.NoError(t, err)
}
```
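The tests above assume a package-level gorm handle and a test-data generator. A minimal sketch of those helpers; the DSN and the Alarm fields are placeholders, not the real model:

```go
package alarmopt

import (
	"fmt"
	"testing"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// Alarm is a trimmed stand-in for the real alarm model.
type Alarm struct {
	ID      int
	Content string
}

// DB is shared by the whole test package; error handling elided.
var DB, _ = gorm.Open(postgres.Open("host=127.0.0.1 user=postgres dbname=pedestal sslmode=disable"), &gorm.Config{})

// testGenAlarm fabricates n alarms to insert.
func testGenAlarm(t *testing.T, n int) []Alarm {
	t.Helper()
	alarms := make([]Alarm, 0, n)
	for i := 0; i < n; i++ {
		alarms = append(alarms, Alarm{Content: fmt.Sprintf("alarm-%d", i)})
	}
	return alarms
}
```

Running both tests back to back: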
```
go test -timeout 30s -run ^TestSingleInsert$ github.com/exfly/alarmopt
ok github.com/exfly/alarmopt 20.156s
go test -timeout 30s -run ^TestBatchInsert$ github.com/exfly/alarmopt
ok github.com/exfly/alarmopt 0.551s
```
20.156 s / 0.551 s = 36.58x, from batching alone.
How batch insertion was wired into alarm consumption
Alarm consumption reads from Kafka, as shown in the code below. Kafka delivers messages by periodically pulling from the broker, and a single pull returns multiple records; however, the interface sarama's ConsumerGroup exposes, sarama.ConsumerGroupClaim, gives no way to take the whole batch in one call, so the aggregation logic has to be implemented by hand.
```go
package alarmopt

import (
	"github.com/Shopify/sarama"
	"github.com/panjf2000/ants/v2"
)

type handler struct {
	// actPool fans incoming messages out to a worker pool.
	actPool *ants.PoolWithFunc
}

func (consumer *handler) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

func (consumer *handler) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim takes messages off the claim's channel one by one and hands
// each to the pool; the batching itself happens downstream of the pool.
func (consumer *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		if err := consumer.actPool.Invoke(message); err != nil {
			if err == ants.ErrPoolClosed {
				return err
			}
		}
		session.MarkMessage(message, "")
	}
	return nil
}
```
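For orientation, a minimal sketch of how this handler might be wired up. The pool size, broker address, group and topic names are all illustrative, and batchInsert is the aggregator implemented below:

```go
package alarmopt

import (
	"context"

	"github.com/Shopify/sarama"
	"github.com/panjf2000/ants/v2"
)

func runConsumer(ctx context.Context, insert *batchInsert) error {
	// Each pool worker feeds one Kafka message into the batch aggregator;
	// decoding msg.Value into an Alarm is elided here.
	pool, err := ants.NewPoolWithFunc(32, func(i interface{}) {
		msg := i.(*sarama.ConsumerMessage)
		_ = msg
		_ = insert.Do(ctx, Alarm{})
	})
	if err != nil {
		return err
	}
	defer pool.Release()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // consumer groups need Kafka >= 0.10.2
	group, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "alarm-consumer", cfg)
	if err != nil {
		return err
	}
	defer group.Close()

	h := &handler{actPool: pool}
	for ctx.Err() == nil {
		// Consume returns on rebalance or error; loop to rejoin the group.
		if err := group.Consume(ctx, []string{"alarm"}, h); err != nil {
			return err
		}
	}
	return ctx.Err()
}
```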
The aggregation logic needed is:
- Take one Message at a time off the chan
- Merge messages into a batch, flushing when either:
- the batch size reaches N, or
- more than T has elapsed since the last flush
A hand-rolled batch-insert implementation
```go
package alarmopt

import (
	"context"
	"log/slog"
	"os"
	"sync"
	"time"
)

var logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{AddSource: true, Level: slog.LevelDebug}))

type Alarm struct {
	ID int
}

// batchInsert buffers alarms and flushes them via do when either the buffer
// reaches maxBatchSize or interval elapses with data pending.
type batchInsert struct {
	do           func(context.Context, []Alarm) error
	interval     time.Duration
	maxBatchSize int64

	lock sync.RWMutex
	buf  []Alarm
	once sync.Once
}

func (b *batchInsert) Do(ctx context.Context, in Alarm) error {
	// Lazily start the flush timer on the first call. Note that the
	// goroutine captures the ctx of that first call.
	b.once.Do(func() {
		go func() {
			ticker := time.NewTicker(b.interval)
			defer ticker.Stop()
			for range ticker.C {
				// TryLock: skip a tick rather than queue behind writers.
				if b.lock.TryLock() {
					if len(b.buf) > 0 {
						logger.Info("timer do")
						if err := b.do(ctx, b.buf); err != nil {
							logger.Error("in ticker do failed", "err", err)
						} else {
							b.buf = nil // flushed; start a fresh batch
						}
					}
					b.lock.Unlock()
				}
			}
		}()
	})

	b.lock.Lock()
	defer b.lock.Unlock()
	if b.buf == nil {
		b.buf = make([]Alarm, 0, b.maxBatchSize)
	}
	b.buf = append(b.buf, in)
	// Size-triggered flush: hand off the batch as soon as it fills up.
	if len(b.buf) >= int(b.maxBatchSize) {
		logger.Info("realtime do")
		if err := b.do(ctx, b.buf); err != nil {
			logger.Error("do failed", "err", err)
		} else {
			b.buf = nil
		}
	}
	return nil
}
```
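Hooking this into the consumer is then just a matter of constructing one batchInsert whose do callback performs the bulk write. A sketch, reusing the DB handle from the insert benchmarks; the interval and batch size are illustrative, not tuned production values:

```go
package alarmopt

import (
	"context"
	"time"
)

// newAlarmBatchInsert builds the aggregator handed to the pool workers.
func newAlarmBatchInsert() *batchInsert {
	return &batchInsert{
		interval:     200 * time.Millisecond, // time-based flush bound T
		maxBatchSize: 100,                    // size-based flush bound N
		do: func(ctx context.Context, alarms []Alarm) error {
			// One multi-row INSERT per flush instead of one INSERT per alarm.
			return DB.WithContext(ctx).CreateInBatches(alarms, len(alarms)).Error
		},
	}
}
```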
The community dataloader implementation
dataloader originated in Facebook's GraphQL ecosystem, built to solve the N+1 query problem.
A Dataloader implementation in Go: https://github.com/graph-gophers/dataloader/tree/master
```go
package alarmopt

import (
	"context"
	"testing"
	"time"

	dataloader "github.com/graph-gophers/dataloader/v7"
)

func TestDataloaderNoCache(t *testing.T) {
	batchFunc := func(_ context.Context, keys []int) []*dataloader.Result[*Alarm] {
		var results []*dataloader.Result[*Alarm]
		// do some pretend work to resolve keys
		for _, k := range keys {
			results = append(results, &dataloader.Result[*Alarm]{Data: &Alarm{ID: k}})
		}
		logger.Debug("real do", "ids", keys)
		return results
	}

	cache := &dataloader.NoCache[int, *Alarm]{}
	loader := dataloader.NewBatchedLoader(
		batchFunc,
		dataloader.WithCache[int, *Alarm](cache),
		dataloader.WithWait[int, *Alarm](time.Second*3),
		dataloader.WithBatchCapacity[int, *Alarm](5),
		dataloader.WithInputCapacity[int, *Alarm](1),
	)

	// Fire 50 loads without awaiting the thunks; the loader groups them
	// into batches of at most 5 (or after the 3 s wait) behind the scenes.
	for i := 0; i < 50; i++ {
		logger.Debug("submit", "id", i)
		loader.Load(context.Background(), i)
	}
}
```
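The same loader works with struct keys as well; this variant also awaits each returned thunk, so errors from the batch function surface to the caller: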
```go
func TestAlarmDataloaderNoCacheRetResult(t *testing.T) {
	batchFunc := func(_ context.Context, keys []Alarm) []*dataloader.Result[*Alarm] {
		var results []*dataloader.Result[*Alarm]
		// do some pretend work to resolve keys
		for _, k := range keys {
			results = append(results, &dataloader.Result[*Alarm]{Data: &Alarm{ID: k.ID}})
		}
		logger.Debug("real do", "ids", keys)
		return results
	}

	cache := &dataloader.NoCache[Alarm, *Alarm]{}
	loader := dataloader.NewBatchedLoader(
		batchFunc,
		dataloader.WithCache[Alarm, *Alarm](cache),
		dataloader.WithWait[Alarm, *Alarm](time.Second*3),
		dataloader.WithBatchCapacity[Alarm, *Alarm](5),
		dataloader.WithInputCapacity[Alarm, *Alarm](1),
	)

	for i := 0; i < 50; i++ {
		logger.Debug("submit", "id", i)
		// Awaiting the thunk blocks until this key's batch has run.
		result, err := loader.Load(context.Background(), Alarm{ID: i})()
		if err != nil {
			t.Error(err)
		}
		_ = result
	}
}
```
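In principle dataloader can replace the hand-rolled aggregator: let its wait/capacity batching collect alarms and have the batch function perform the bulk write. A sketch under the same assumptions as earlier (the DB handle and Alarm model from the insert benchmarks); insertViaLoader and its parameter values are illustrative:

```go
package alarmopt

import (
	"context"
	"time"

	dataloader "github.com/graph-gophers/dataloader/v7"
)

// insertViaLoader funnels per-alarm writes through a dataloader so they
// reach the database as bulk inserts.
func insertViaLoader(ctx context.Context, alarms []Alarm) error {
	batchFunc := func(ctx context.Context, keys []Alarm) []*dataloader.Result[bool] {
		// One bulk INSERT per accumulated batch; every key shares its outcome.
		err := DB.WithContext(ctx).CreateInBatches(keys, len(keys)).Error
		results := make([]*dataloader.Result[bool], len(keys))
		for i := range results {
			results[i] = &dataloader.Result[bool]{Data: err == nil, Error: err}
		}
		return results
	}

	loader := dataloader.NewBatchedLoader(
		batchFunc,
		dataloader.WithCache[Alarm, bool](&dataloader.NoCache[Alarm, bool]{}),
		dataloader.WithWait[Alarm, bool](200*time.Millisecond),
		dataloader.WithBatchCapacity[Alarm, bool](100),
	)

	// Submit everything first so the loader can batch, then await the thunks.
	thunks := make([]dataloader.Thunk[bool], 0, len(alarms))
	for _, a := range alarms {
		thunks = append(thunks, loader.Load(ctx, a))
	}
	for _, thunk := range thunks {
		if _, err := thunk(); err != nil {
			return err
		}
	}
	return nil
}
```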