目录

Opentracing - jaeger 源码分析

文章简介:源码层面分析 Opentracing 一个实现 jaeger 的工作原理

/media/img/opentracing/jaeger.png

agent

Agent 处于 jaeger-client 和 collector 之间,属于代理的作用,主要是把 client 发送过来的数据从 thrift 转为 Batch,并通过 RPC 批量提交到 collector

jaegertracing/jaeger/cmd/agent/app/flags.go#L62

1
2
3
4
5
6
7
8
9
var defaultProcessors = []struct {
	model    Model
	protocol Protocol
	port     int
}{
	{model: "zipkin", protocol: "compact", port: 5775},
	{model: "jaeger", protocol: "compact", port: 6831},
	{model: "jaeger", protocol: "binary", port: 6832},
}

jaegertracing/jaeger/cmd/agent/app/servers/tbuffered_server.go#L82

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// Serve initiates the readers and starts serving traffic
func (s *TBufferedServer) Serve() {
	atomic.StoreUint32(&s.serving, 1)
	for s.IsServing() {
		readBuf := s.readBufPool.Get().(*ReadBuf)
		n, err := s.transport.Read(readBuf.bytes)
		if err == nil {
			readBuf.n = n
			s.metrics.PacketSize.Update(int64(n))
			select {
			case s.dataChan <- readBuf:
				s.metrics.PacketsProcessed.Inc(1)
				s.updateQueueSize(1)
			default:
				s.metrics.PacketsDropped.Inc(1)
			}
		} else {
			s.metrics.ReadError.Inc(1)
		}
	}
}

jaegertracing/jaeger/blob/master/cmd/agent/app/processors/thrift_processor.go#L114

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// processBuffer reads data off the channel and puts it into a custom transport for
// the processor to process
func (s *ThriftProcessor) processBuffer() {
	for readBuf := range s.server.DataChan() {
		protocol := s.protocolPool.Get().(thrift.TProtocol)
		payload := readBuf.GetBytes()
		protocol.Transport().Write(payload)
		s.logger.Debug("Span(s) received by the agent", zap.Int("bytes-received", len(payload)))

		if ok, err := s.handler.Process(protocol, protocol); !ok {
			s.logger.Error("Processor failed", zap.Error(err))
			s.metrics.HandlerProcessError.Inc(1)
		}
		s.protocolPool.Put(protocol)
		s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
	}
}

jaegertracing/jaeger/thrift-gen/agent/agent.go#L187

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	args := AgentEmitBatchArgs{}
	if err = args.Read(iprot); err != nil {
		iprot.ReadMessageEnd()
		return false, err
	}

	iprot.ReadMessageEnd()
	var err2 error
	if err2 = p.handler.EmitBatch(args.Batch); err2 != nil {
		return true, err2
	}
	return true, nil
}

jaegertracing/jaeger/thrift-gen/jaeger/tchan-jaeger.go#L39

Collector

接收 Agent 的数据

jaegertracing/jaeger/cmd/collector/app/handler/thrift_span_handler.go#L60

比较舒服的维护metrics的场景

references