文章简介:从源码层面分析 OpenTracing 的一个实现——Jaeger 的工作原理
agent
Agent 位于 jaeger-client 和 collector 之间,起代理作用:它把 client 发送过来的 thrift 数据转换为 Batch,再通过 RPC 批量提交给 collector
jaegertracing/jaeger/cmd/agent/app/flags.go#L62
1
2
3
4
5
6
7
8
9
|
// defaultProcessors is a table of processor configurations. Each entry pairs a
// data model and a protocol with the port number assigned to that combination.
// NOTE(review): presumably these are the defaults applied when no processor is
// configured explicitly — confirm against the code that reads this table.
var defaultProcessors = []struct {
model Model
protocol Protocol
port int
}{
{model: "zipkin", protocol: "compact", port: 5775},
{model: "jaeger", protocol: "compact", port: 6831},
{model: "jaeger", protocol: "binary", port: 6832},
}
|
jaegertracing/jaeger/cmd/agent/app/servers/tbuffered_server.go#L82
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// Serve marks the server as serving and then reads packets from the transport
// in a loop until IsServing reports false.
//
// Each successfully read packet is offered to dataChan without blocking. If
// the channel cannot accept it immediately, the packet is dropped and counted
// in PacketsDropped. Failed reads are counted in ReadError. In both of those
// cases the buffer is handed back to readBufPool, because it never reaches a
// consumer that could release it.
func (s *TBufferedServer) Serve() {
	atomic.StoreUint32(&s.serving, 1)
	for s.IsServing() {
		readBuf := s.readBufPool.Get().(*ReadBuf)
		n, err := s.transport.Read(readBuf.bytes)
		if err != nil {
			// Nothing usable was read: recycle the buffer instead of
			// abandoning it, then try the next read.
			s.readBufPool.Put(readBuf)
			s.metrics.ReadError.Inc(1)
			continue
		}

		readBuf.n = n
		s.metrics.PacketSize.Update(int64(n))
		select {
		case s.dataChan <- readBuf:
			s.metrics.PacketsProcessed.Inc(1)
			s.updateQueueSize(1)
		default:
			// dataChan cannot take the packet right now: drop it rather than
			// block the reader, and recycle the buffer since no consumer
			// will ever see it.
			s.readBufPool.Put(readBuf)
			s.metrics.PacketsDropped.Inc(1)
		}
	}
}
|
jaegertracing/jaeger/blob/master/cmd/agent/app/processors/thrift_processor.go#L114
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// processBuffer drains the server's data channel and feeds every received
// payload to the Thrift handler through a pooled protocol.
//
// For each buffer it writes the payload into the protocol's transport, runs
// the handler, logs and counts a handler failure, then returns the protocol to
// the pool and hands the buffer back to the server. The loop ends once the
// data channel is closed.
func (s *ThriftProcessor) processBuffer() {
	for readBuf := range s.server.DataChan() {
		protocol := s.protocolPool.Get().(thrift.TProtocol)
		payload := readBuf.GetBytes()
		protocol.Transport().Write(payload)
		s.logger.Debug("Span(s) received by the agent", zap.Int("bytes-received", len(payload)))

		// A Thrift processor may report ok == true together with a non-nil
		// error (the generated EmitBatch processor does so when its handler
		// fails), so both values are checked; testing only !ok would leave
		// those failures unlogged and uncounted.
		if ok, err := s.handler.Process(protocol, protocol); !ok || err != nil {
			s.logger.Error("Processor failed", zap.Error(err))
			s.metrics.HandlerProcessError.Inc(1)
		}

		s.protocolPool.Put(protocol)
		s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
	}
}
|
jaegertracing/jaeger/thrift-gen/agent/agent.go#L187
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// Process decodes the EmitBatch arguments from iprot and passes the decoded
// batch to the handler.
//
// It returns (false, err) when the arguments cannot be read. Once they have
// been read it always returns success == true, together with the handler's
// error if EmitBatch failed, or nil otherwise. seqId and oprot are not used.
func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	var args AgentEmitBatchArgs

	// The message end is consumed whether or not decoding succeeded.
	err = args.Read(iprot)
	iprot.ReadMessageEnd()
	if err != nil {
		return false, err
	}

	if emitErr := p.handler.EmitBatch(args.Batch); emitErr != nil {
		return true, emitErr
	}
	return true, nil
}
|
jaegertracing/jaeger/thrift-gen/jaeger/tchan-jaeger.go#L39
Collector
接收 Agent 的数据
jaegertracing/jaeger/cmd/collector/app/handler/thrift_span_handler.go#L60
比较舒服的维护metrics的场景
references