
How I Increased Kafka Consumption Throughput 100x


Background

  1. Alarm ingestion/archiving throughput was poor, only around 200.
  2. Alarm consumption used a lot of resources; with around 5 million alarms stored, a burst of alarms would crush the platform, and its main business endpoints returned 504 timeouts.
  3. With frequent alarm inserts and archiving, the bloat rate of the alarm-related PostgreSQL tables exceeded 90%, and that bloat slowed queries down by several orders of magnitude.

Root causes

  1. A single table held too much data: over 5 million rows.
  2. Inserts, updates, and deletes were frequent, and PostgreSQL's MVCC plus autovacuum could not keep up, so the bloat rate kept climbing (a quick way to measure it is sketched after this list).
  3. The alarm pipeline was long: logic with loose latency requirements (automated response, statistics) was chained into the consumption path and dragged down throughput.
  4. Consumption issued many small, frequent queries with no caching, burning a large share of database QPS.
  5. The database ran with its default configuration; memory and CPU were underutilized.
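
A minimal way to measure the bloat mentioned in item 2, as a sketch assuming the pgstattuple extension is available on the instance:

-- dead_tuple_percent and free_percent together approximate the bloat rate
-- described above.
CREATE EXTENSION IF NOT EXISTS pgstattuple;
SELECT dead_tuple_percent, free_percent FROM pgstattuple('alarm');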

What was done

  1. Tuned the database configuration.
  2. Partitioned alarms by time, one partition per day; archiving handles one partition at a time, and partition indexes plus parallel query raise query throughput and stability (a DDL sketch follows this list).
  3. Reworked the consumption pipeline into three stages (ingestion, statistics, automated response), so slow automated response no longer drags down overall throughput.
  4. Batched the inserts into the several tables an alarm touches, raising throughput and cutting resource usage.
  5. After archived alarms are synced to HBase, the corresponding child partition is truncated.
  6. Added caching.
  7. Optimized slow queries: rewrote SQL and added indexes; dropped indexes that had never been used.
  8. Moved the consumer into its own process for resource and fault isolation.
  9. Added rate limiting to consumption for stability.
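
A DDL sketch of the daily partitioning scheme from item 2. The column types, the primary-key column order, and the partition bounds are assumptions for illustration; the table and column names follow the queries shown later in this post:

-- Parent table partitioned by created_at. The partition key has to be part of
-- the primary key, which is why the plans below show alarm_YYYYMMDD_pkey
-- indexes that cover created_at.
CREATE TABLE alarm (
    id             text        NOT NULL,
    created_at     timestamptz NOT NULL,
    updated_at     timestamptz,
    judged_state   int,
    alarm_level    int,
    disposed_state int,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- One child partition per day; archiving copies a whole partition to HBase and
-- then truncates just that child (item 5).
CREATE TABLE alarm_20230530 PARTITION OF alarm
    FOR VALUES FROM ('2023-05-30 00:00:00+00') TO ('2023-05-31 00:00:00+00');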

Results

Under a load of 5 million alarms on 32 cores / 64 GB:

  1. Alarm ingestion throughput reaches 40,000, two orders of magnitude higher than before.
  2. The SQL that fetches the first 20 rows of the alarm list is 2,500x faster; count is 27x faster; overall alarm-list load time dropped from 15 s to under 3 s, and the alarm statistics page went from a 504 timeout to loading within 15 s.
  3. Database throughput tripled after its configuration was tuned.
  4. The service remains usable with 30 million alarms stored, and is expected to stay usable after accumulating half a year or more of data.

Alarm list optimization

Before

explain (ANALYZE, VERBOSE, TIMING,COSTS, BUFFERS, SUMMARY) SELECT * FROM "alarm"  WHERE (judged_state = 1 OR judged_state = 0) ORDER BY alarm.updated_at desc nulls last LIMIT 20;

/how-to-increase-alarm-throughput-100-times/README.assets/image-20230813170415514.png

After

create index if not exists alarm_multiline_index on alarm (created_at desc, updated_at desc nulls last, judged_state, alarm_level, disposed_state);

/how-to-increase-alarm-throughput-100-times/README.assets/image-20230813170359600.png

Optimizing the count queries heavily used by the dashboards

17134.851 ms / 623.675 ms ≈ 27x
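
Judging from the plans below, two things changed: the query counts with COUNT(*) instead of COUNT("id"), and it scans a narrow index on created_at instead of the wider primary-key index. A sketch of that index, assuming it is declared on the partitioned parent so PostgreSQL creates a matching index on every daily partition:

-- Each daily partition then gets its own alarm_YYYYMMDD_created_at_idx,
-- which is what the optimized plan scans.
CREATE INDEX IF NOT EXISTS alarm_created_at_idx ON alarm (created_at);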

Before:

explain (ANALYZE, VERBOSE, TIMING,COSTS, BUFFERS, SUMMARY)  SELECT COUNT("id") FROM "alarm" WHERE created_at >= '2023-05-30 07:34:46.002537+00' AND created_at < '2023-05-31 07:34:46.002537+00';
                                                                                                   QUERY PLAN
 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
---------------
 Finalize Aggregate  (cost=840896.65..840896.66 rows=1 width=8) (actual time=17098.462..17134.785 rows=1 loops=1)
   Output: count(alarm.id)
   Buffers: shared hit=5615853 read=105541 written=1
   ->  Gather  (cost=840896.23..840896.64 rows=4 width=8) (actual time=17098.338..17134.774 rows=5 loops=1)
         Output: (PARTIAL count(alarm.id))
         Workers Planned: 4
         Workers Launched: 4
         Buffers: shared hit=5615853 read=105541 written=1
         ->  Partial Aggregate  (cost=839896.23..839896.24 rows=1 width=8) (actual time=17094.127..17094.129 rows=1 loops=5)
               Output: PARTIAL count(alarm.id)
               Buffers: shared hit=5615853 read=105541 written=1
               Worker 0:  actual time=17093.145..17093.148 rows=1 loops=1
                 Buffers: shared hit=1478094 read=28657
               Worker 1:  actual time=17093.203..17093.205 rows=1 loops=1
                 Buffers: shared hit=1470692 read=28510
               Worker 2:  actual time=17093.147..17093.150 rows=1 loops=1
                 Buffers: shared hit=886771 read=16085
               Worker 3:  actual time=17093.167..17093.170 rows=1 loops=1
                 Buffers: shared hit=882627 read=16017 written=1
               ->  Parallel Append  (cost=0.55..836170.65 rows=1490232 width=37) (actual time=0.946..17023.812 rows=1188508 loops=5)
                     Buffers: shared hit=5615853 read=105541 written=1
                     Worker 0:  actual time=1.212..17006.371 rows=1491562 loops=1
                       Buffers: shared hit=1478094 read=28657
                     Worker 1:  actual time=0.939..17005.878 rows=1484942 loops=1
                       Buffers: shared hit=1470692 read=28510
                     Worker 2:  actual time=0.814..17034.576 rows=986228 loops=1
                       Buffers: shared hit=886771 read=16085
                     Worker 3:  actual time=0.471..17035.496 rows=982083 loops=1
                       Buffers: shared hit=882627 read=16017 written=1
                     ->  Parallel Index Only Scan using alarm_20230530_pkey on public.alarm_20230530 alarm_1  (cost=0.56..713942.31 rows=1157253 width=37) (actual time=0.796..14931.447 rows=919
379 loops=5)
                           Output: alarm_1.id
                           Index Cond: ((alarm_1.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_1.created_at < '2023-05-31 07:34:46.002537+00'::timestamp wi
th time zone))
                           Heap Fetches: 296346
                           Buffers: shared hit=4552140 read=88971 written=1
                           Worker 0:  actual time=1.211..16900.689 rows=1491562 loops=1
                             Buffers: shared hit=1478094 read=28657
                           Worker 1:  actual time=0.938..16900.152 rows=1484942 loops=1
                             Buffers: shared hit=1470692 read=28510
                           Worker 2:  actual time=0.501..13618.651 rows=537258 loops=1
                             Buffers: shared hit=532080 read=10556
                           Worker 3:  actual time=1.098..13619.421 rows=536621 loops=1
                             Buffers: shared hit=530559 read=10531 written=1
                     ->  Parallel Index Only Scan using alarm_20230531_pkey on public.alarm_20230531 alarm_2  (cost=0.55..114777.18 rows=332979 width=37) (actual time=0.860..3347.784 rows=44854
8 loops=3)
                           Output: alarm_2.id
                           Index Cond: ((alarm_2.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_2.created_at < '2023-05-31 07:34:46.002537+00'::timestamp wi
th time zone))
                           Heap Fetches: 68366
                           Buffers: shared hit=1063713 read=16570
                           Worker 2:  actual time=0.813..3346.199 rows=448970 loops=1
                             Buffers: shared hit=354691 read=5529
                           Worker 3:  actual time=0.470..3347.495 rows=445462 loops=1
                             Buffers: shared hit=352068 read=5486
 Query Identifier: 4802739785007802293
 Planning:
   Buffers: shared hit=14
 Planning Time: 0.299 ms
 Execution Time: 17134.851 ms
(56 rows)

After:

pedestal=# explain (ANALYZE, VERBOSE, TIMING,COSTS, BUFFERS, SUMMARY)  SELECT COUNT(*) FROM "alarm" WHERE created_at >= '2023-05-30 07:34:46.002537+00' AND created_at < '2023-05-31 07:34:46.002537+00';
                                                                                                     QUERY PLAN
 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------
 Finalize Aggregate  (cost=314048.18..314048.19 rows=1 width=8) (actual time=606.301..623.625 rows=1 loops=1)
   Output: count(*)
   Buffers: shared hit=2199668 read=19636
   ->  Gather  (cost=314047.76..314048.17 rows=4 width=8) (actual time=605.925..623.616 rows=5 loops=1)
         Output: (PARTIAL count(*))
         Workers Planned: 4
         Workers Launched: 4
         Buffers: shared hit=2199668 read=19636
         ->  Partial Aggregate  (cost=313047.76..313047.77 rows=1 width=8) (actual time=601.804..601.805 rows=1 loops=5)
               Output: PARTIAL count(*)
               Buffers: shared hit=2199668 read=19636
               Worker 0:  actual time=600.796..600.798 rows=1 loops=1
                 Buffers: shared hit=587645 read=3242
               Worker 1:  actual time=600.942..600.943 rows=1 loops=1
                 Buffers: shared hit=599411 read=3326
               Worker 2:  actual time=600.829..600.830 rows=1 loops=1
                 Buffers: shared hit=324544 read=4305
               Worker 3:  actual time=600.910..600.912 rows=1 loops=1
                 Buffers: shared hit=326520 read=4134
               ->  Parallel Append  (cost=0.43..309322.18 rows=1490232 width=0) (actual time=0.614..545.485 rows=1188508 loops=5)
                     Buffers: shared hit=2199668 read=19636
                     Worker 0:  actual time=1.625..540.640 rows=1226383 loops=1
                       Buffers: shared hit=587645 read=3242
                     Worker 1:  actual time=1.312..541.209 rows=1250688 loops=1
                       Buffers: shared hit=599411 read=3326
                     Worker 2:  actual time=0.065..548.210 rows=1123038 loops=1
                       Buffers: shared hit=324544 read=4305
                     Worker 3:  actual time=0.051..548.363 rows=1117758 loops=1
                       Buffers: shared hit=326520 read=4134
                     ->  Parallel Index Only Scan using alarm_20230530_created_at_idx on public.alarm_20230530 alarm_1  (cost=0.43..253250.08 rows=1157253 width=0) (actual time=0.614..324.555 r
ows=919379 loops=5)
                           Index Cond: ((alarm_1.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_1.created_at < '2023-05-31 07:34:46.002537+00'::timestamp wi
th time zone))
                           Heap Fetches: 529504
                           Buffers: shared hit=2118759 read=12477
                           Worker 0:  actual time=1.623..461.733 rows=1226383 loops=1
                             Buffers: shared hit=587645 read=3242
                           Worker 1:  actual time=1.311..463.024 rows=1250688 loops=1
                             Buffers: shared hit=599411 read=3326
                           Worker 2:  actual time=0.074..233.037 rows=692071 loops=1
                             Buffers: shared hit=298845 read=1935
                           Worker 3:  actual time=0.049..233.393 rows=692036 loops=1
                             Buffers: shared hit=301847 read=1916
                     ->  Parallel Index Only Scan using alarm_20230531_created_at_idx on public.alarm_20230531 alarm_2  (cost=0.43..48620.94 rows=332979 width=0) (actual time=0.044..242.691 row
s=448548 loops=3)
                           Index Cond: ((alarm_2.created_at >= '2023-05-30 07:34:46.002537+00'::timestamp with time zone) AND (alarm_2.created_at < '2023-05-31 07:34:46.002537+00'::timestamp wi
th time zone))
                           Heap Fetches: 102772
                           Buffers: shared hit=80909 read=7159
                           Worker 2:  actual time=0.064..243.537 rows=430967 loops=1
                             Buffers: shared hit=25699 read=2370
                           Worker 3:  actual time=0.050..243.304 rows=425722 loops=1
                             Buffers: shared hit=24673 read=2218
 Query Identifier: 8664734888486988169
 Planning:
   Buffers: shared hit=14
 Planning Time: 0.321 ms
 Execution Time: 623.675 ms
(54 rows)

How alarm consumption throughput was optimized

The approach

Tune the database configuration to raise the database's own processing capacity

deadlock_timeout = 2s
max_connections = 500

# 16 cores, 8 GB RAM
shared_buffers = 2GB
effective_cache_size = 6GB
maintenance_work_mem = 1GB
effective_io_concurrency = 2
bgwriter_lru_maxpages = 500
wal_buffers = 32MB
max_wal_size = 8GB
min_wal_size = 4GB
work_mem = 8MB
max_worker_processes = 16
max_parallel_workers_per_gather = 4
max_parallel_workers = 16
max_parallel_maintenance_workers = 4

  1. Raise PostgreSQL's memory usage.
  2. Raise PostgreSQL's CPU usage by letting SQL run as parallel queries.

Batch inserts to raise throughput

const (
 AlarmSize  = 50000
 AlarmBatch = 100
)

// go test -timeout 30s -run ^TestSingleInsert$ github.com/exfly/alarmopt
func TestSingleInsert(t *testing.T) {
 err := DB.AutoMigrate(&Alarm{})
 require.NoError(t, err)
 err = DB.Exec("truncate alarm;").Error
 require.NoError(t, err)

 alarms := testGenAlarm(t, AlarmSize)
 err = DB.Debug().CreateInBatches(alarms, 1).Error
 require.NoError(t, err)
}

// go test -timeout 30s -run ^TestBatchInsert$ github.com/exfly/alarmopt
func TestBatchInsert(t *testing.T) {
 err := DB.AutoMigrate(&Alarm{})
 require.NoError(t, err)
 err = DB.Exec("truncate alarm;").Error
 require.NoError(t, err)

 alarms := testGenAlarm(t, AlarmSize)
 err = DB.Debug().CreateInBatches(alarms, AlarmBatch).Error
 require.NoError(t, err)
}
go test -timeout 30s -run ^TestSingleInsert$ github.com/exfly/alarmopt
ok      github.com/exfly/alarmopt       20.156s

go test -timeout 30s -run ^TestBatchInsert$ github.com/exfly/alarmopt
ok      github.com/exfly/alarmopt       0.551s

20.156 s / 0.551 s = 36.58x

How batch insertion was wired into alarm consumption

/how-to-increase-alarm-throughput-100-times/business_graph.svg

Alarm consumption reads from Kafka, as shown in the code below. Kafka delivery works by periodically pulling messages from the broker, and a single pull returns multiple records; however, the sarama.ConsumerGroupClaim interface exposed by sarama's ConsumerGroup gives no way to receive an entire batch at once, so that aggregation logic has to be implemented ourselves.

// handler implements sarama.ConsumerGroupHandler; every message is handed off
// to a goroutine pool (ants) instead of being processed inline.
type handler struct {
 actPool *ants.PoolWithFunc
}

func (consumer *handler) Setup(session sarama.ConsumerGroupSession) error {
 return nil
}

func (consumer *handler) Cleanup(session sarama.ConsumerGroupSession) error {
 return nil
}

func (consumer *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 for message := range claim.Messages() {
  // Hand the message to the goroutine pool; processing happens asynchronously.
  if err := consumer.actPool.Invoke(message); err != nil {
   if err == ants.ErrPoolClosed {
    return err
   }
  }
  // Mark the offset; the message may still be in flight inside the pool.
  session.MarkMessage(message, "")
 }
 return nil
}

The aggregation logic needed is:

  1. Take one Message at a time from the channel.
  2. Aggregate messages into one batch and flush it when either:
    1. the batch size reaches N, or
    2. more than T has passed since the last flush.

A hand-rolled batch-insert implementation

package alarmopt

import (
 "context"
 "log/slog"
 "os"
 "sync"
 "testing"
 "time"
)

var (
 logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{AddSource: true, Level: slog.LevelDebug}))
)

type Alarm struct {
 ID int
}

// batchInsert buffers alarms and flushes them via do, either when the buffer
// reaches maxBatchSize or when the background ticker fires every interval.
type batchInsert struct {
 do           func(context.Context, []Alarm) error
 interval     time.Duration
 maxBatchSize int64

 lock sync.RWMutex
 buf  []Alarm

 once sync.Once
}

func (b *batchInsert) Do(ctx context.Context, in Alarm) error {
 // Start the background flusher exactly once; it drains whatever has
 // accumulated in buf so that a slow trickle of alarms still gets written.
 b.once.Do(func() {
  go func() {
   ticker := time.NewTicker(b.interval)

   for range ticker.C {
    if b.lock.TryLock() {
     if len(b.buf) > 0 {
      logger.Info("timer do")
      if err := b.do(ctx, b.buf); err != nil {
       logger.Error("in ticker do failed", "err", err)
      } else if err == nil {
       b.buf = nil
      }
     }
     b.lock.Unlock()
    }
   }
  }()
 })

 b.lock.Lock()
 defer b.lock.Unlock()

 if b.buf == nil {
  b.buf = make([]Alarm, 0, b.maxBatchSize)
 }
 b.buf = append(b.buf, in)
 if len(b.buf) >= int(b.maxBatchSize) {
  logger.Info("realtime do")
  if err := b.do(ctx, b.buf); err != nil {
   logger.Error("do failed", "err", err)
  } else if err == nil {
   b.buf = nil
  }

 }
 return nil
}
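
A hypothetical usage of batchInsert, written in the same test style as the rest of this post; the batch size, the interval, and the fake do callback are illustrative only (real code would issue the batched INSERT there):

func TestBatchInsertDo(t *testing.T) {
 b := &batchInsert{
  interval:     200 * time.Millisecond,
  maxBatchSize: 5,
  do: func(_ context.Context, alarms []Alarm) error {
   logger.Info("flush", "size", len(alarms)) // real code: batched INSERT here
   return nil
  },
 }

 // Simulate alarms arriving from the Kafka worker pool: the buffer flushes
 // every 5 alarms, and the ticker flushes whatever is left over.
 for i := 0; i < 12; i++ {
  if err := b.Do(context.Background(), Alarm{ID: i}); err != nil {
   t.Fatal(err)
  }
 }
 time.Sleep(300 * time.Millisecond) // give the ticker a chance to flush the tail
}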

/how-to-increase-alarm-throughput-100-times/README.assets/image-20230813202917059.png

The community dataloader implementation

dataloader originated in Facebook's GraphQL ecosystem, created to solve the N+1 query problem.

The Dataloader implementation in Go: https://github.com/graph-gophers/dataloader/tree/master

package alarmopt

import (
 "context"
 "testing"
 "time"

 dataloader "github.com/graph-gophers/dataloader/v7"
)

func TestDataloaderNoCache(t *testing.T) {
 batchFunc := func(_ context.Context, keys []int) []*dataloader.Result[*Alarm] {
  var results []*dataloader.Result[*Alarm]
  // do some pretend work to resolve keys
  for _, k := range keys {
   results = append(results, &dataloader.Result[*Alarm]{Data: &Alarm{ID: k}})
  }
  logger.Debug("real do", "ids", keys)
  return results
 }

 cache := &dataloader.NoCache[int, *Alarm]{}
 loader := dataloader.NewBatchedLoader(
  batchFunc,
  dataloader.WithCache[int, *Alarm](cache),
  dataloader.WithWait[int, *Alarm](time.Second*3),
  dataloader.WithBatchCapacity[int, *Alarm](5),
  dataloader.WithInputCapacity[int, *Alarm](1),
 )

 for i := 0; i < 50; i++ {
  logger.Debug("submit", "id", i)
  loader.Load(context.Background(), i)
 }
}

/how-to-increase-alarm-throughput-100-times/README.assets/image-20230813205823344.png

func TestAlarmDataloaderNoCacheRetResult(t *testing.T) {
 batchFunc := func(_ context.Context, keys []Alarm) []*dataloader.Result[*Alarm] {
  var results []*dataloader.Result[*Alarm]
  // do some pretend work to resolve keys
  for _, k := range keys {
   results = append(results, &dataloader.Result[*Alarm]{Data: &Alarm{ID: k.ID}})
  }
  logger.Debug("real do", "ids", keys)
  return results
 }

 cache := &dataloader.NoCache[Alarm, *Alarm]{}
 loader := dataloader.NewBatchedLoader(
  batchFunc,
  dataloader.WithCache[Alarm, *Alarm](cache),
  dataloader.WithWait[Alarm, *Alarm](time.Second*3),
  dataloader.WithBatchCapacity[Alarm, *Alarm](5),
  dataloader.WithInputCapacity[Alarm, *Alarm](1),
 )

 for i := 0; i < 50; i++ {
  logger.Debug("submit", "id", i)
  result, err := loader.Load(context.Background(), Alarm{ID: i})()
  if err != nil {
   t.Error(err)
  }
  _ = result
 }
}
