前言:ZIPKIN作為當前“分布式服務鏈路跟蹤”問題的流行解決方案之一,正在被越來越多的公司和個人學習使用。其中很重要的一塊,就是上報鏈路數(shù)據(jù)。那么知道服務端如何接收數(shù)據(jù),以及我們該怎樣上報數(shù)據(jù)到服務端就顯得十分重要。雖然ZIPKIN官方也開源了一個客戶端Brave,但是本文卻并不想直接介紹Brave,而是想站在一個從零開發(fā)ZIPKIN客戶端的角度,一層層分析解決如何自己寫一個ZIPKIN客戶端,直到最后引出Brave。本文最終想達到的效果,就是希望通過本文,能讓大家對ZIPKIN的鏈路上報有一個詳盡的理解和認識。
零行代碼快速開始
ZIPKIN是Spring Boot服務,因此啟動起來十分方便,直接運行ZIPKIN JAR包就可以了。ZIPKIN JAR包我們可以自行編譯ZIPKIN源碼獲得,也可以從下面的倉庫獲取,這個倉庫專門存放ZIPKIN各種編譯好的JAR包,倉庫地址為:
https://dl.bintray.com/openzipkin/maven/io/zipkin/java/
進入zipkin-server目錄下載server jar包,比如下載2.11.5版本,運行jar包:
java -jar zipkin-server-2.11.5-exec.jar
這樣本地就起了一個ZIPKIN服務,瀏覽器中輸入http://localhost:9411/zipkin/,即可打開ZIPKIN首頁,效果如下:

接下來,我們開始上報數(shù)據(jù),現(xiàn)在我不想寫任何代碼,那么就用postman發(fā)起一次post請求上報數(shù)據(jù)吧:

上報后在本地ZIPKIN服務上即可看到剛上報的數(shù)據(jù)效果:

沒有任何一句代碼,一個完整的ZIPKIN數(shù)據(jù)上報存儲展示就完成了,那么我們來思考下:
1、如果自己寫一個上ZIPKIN客戶端,該如何寫?
分析:要上傳數(shù)據(jù)給服務端,那么必須要搞清楚,ZIPKIN服務端可以接受什么樣格式的數(shù)據(jù)?支持的編碼解碼協(xié)議是什么?還有支持那些通信傳輸協(xié)議?
Task: 了解了服務怎么接收數(shù)據(jù)后,客戶端的task也就明確了:
1)以ZIPKIN服務端支持的數(shù)據(jù)格式組織數(shù)據(jù);
2)以ZIPKIN服務端支持的編解碼協(xié)議編碼數(shù)據(jù);
3)以ZIPKIN服務端支持的傳輸協(xié)議上報數(shù)據(jù)。
2、基礎ZIPKIN客戶端完成后,如何適配各種組件?
描述:一個服務通常涉及多個組件,包括服務框架,消息中間件,各種數(shù)據(jù)庫等,那么怎么上報這些組件的數(shù)據(jù)給服務端了,值得我們思考?
分析:其實最簡單的方法就是采用一個裝飾者模式包裝一下組件接口,在其中加入上報的邏輯,但是這種方法局限很大,不靈活。除此之外,我們或許我們可以利用下攔截器技術,AOP技術,以及Agent,探針等技術做到無侵入上報數(shù)據(jù)。
服務端如何接收數(shù)據(jù)的?
要想自己寫一個ZIPKIN客戶端,必須搞清楚ZIPKIN服務端是怎么接收數(shù)據(jù)的,包括數(shù)據(jù)格式協(xié)議是怎樣的?編解碼協(xié)議是怎樣的?還有支持那么傳輸協(xié)議?
ZIPKIN支持的數(shù)據(jù)格式是怎樣的?
ZIPKIN是兼容OpenTracing標準的,OpenTracing規(guī)定,每個Span包含如下狀態(tài):
- 操作名稱
- 起始時間
- 結束時間
- 一組 KV 值,作為階段的標簽(Span Tags)
- 階段日志(Span Logs)
- 階段上下文(SpanContext),其中包含 Trace ID 和 Span ID
- 引用關系(References)
OpenTracing規(guī)范與zipkin對應關系如下:
| OpenTracing | ZIPKIN |
|---|---|
| 操作名稱 | name |
| 起始時間 | timestamp |
| 結束時間 | timestamp+duration |
| 標簽 | tags |
| Span上下文 | traceId; id |
| 引用關系 | parentId |
| Span日志 | 無 |
除此之外,ZIPKIN SAPN還增加了其他幾個字端:
| 字端 | 描述 |
|---|---|
| Kind kind | Spanl類型,比如是Server還是Client |
| Annotaion | 表示某個時間點發(fā)生的Event,Event類型:cs:Client Send 請求;sr:Server Receive到請求;ss:Server 處理完成、并Send Response;cr:Client Receive 到響應 |
| Endpoint localEndpoint | 描述本地服務信息,比如服務名,ip等,方便根據(jù)服務名檢索鏈路 |
| Endpoint remoteEndpoint | RPC調用時,描述遠程服務的服務信息 |
最終,包含兼容OpenTracing標準的字端,以及其本身的一些字端后,zipkin的span數(shù)據(jù)字端如下:

了解了ZIPKIN的數(shù)據(jù)字端格式后,我們再看看ZIPKIN支持的編解碼協(xié)議。
ZIPKIN支持那些編解碼協(xié)議?
Zipkin主要支持三種編解碼協(xié)議,分別為JSON, PROTO3, THRIFT。
JSON編解碼如下:
JSON_V1 {
public Encoding encoding() {
return Encoding.JSON;
}
public boolean decode(byte[] bytes, Collection<Span> out) {
Span result = this.decodeOne(bytes);
if (result == null) {
return false;
} else {
out.add(result);
return true;
}
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return (new V1JsonSpanReader()).readList(spans, out);
}
public Span decodeOne(byte[] span) {
V1Span v1 = (V1Span)JsonCodec.readOne(new V1JsonSpanReader(), span);
List<Span> out = new ArrayList(1);
V1SpanConverter.create().convert(v1, out);
return (Span)out.get(0);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
JSON_V2 {
public Encoding encoding() {
return Encoding.JSON;
}
public boolean decode(byte[] span, Collection<Span> out) {
return JsonCodec.read(new V2SpanReader(), span, out);
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return JsonCodec.readList(new V2SpanReader(), spans, out);
}
@Nullable
public Span decodeOne(byte[] span) {
return (Span)JsonCodec.readOne(new V2SpanReader(), span);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
THRIFT編解碼如下:
THRIFT {
public Encoding encoding() {
return Encoding.THRIFT;
}
public boolean decode(byte[] span, Collection<Span> out) {
return ThriftCodec.read(span, out);
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return ThriftCodec.readList(spans, out);
}
public Span decodeOne(byte[] span) {
return ThriftCodec.readOne(span);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
PROTO3編解碼如下:
PROTO3 {
public Encoding encoding() {
return Encoding.PROTO3;
}
public boolean decode(byte[] span, Collection<Span> out) {
return Proto3Codec.read(span, out);
}
public boolean decodeList(byte[] spans, Collection<Span> out) {
return Proto3Codec.readList(spans, out);
}
@Nullable
public Span decodeOne(byte[] span) {
return Proto3Codec.readOne(span);
}
public List<Span> decodeList(byte[] spans) {
return decodeList(this, spans);
}
}
其中Json是默認支持的,也是使用起來最方便的,除此之外,還包括Thirft和Proto3可供開發(fā)者選擇。
ZIPKIN支持那些傳輸協(xié)議?
Zipkin默認支持Http協(xié)議,除此之外,它還支持kafka,rabbitmq以及scribe協(xié)議:

他們的初始化過程如下:

傳輸協(xié)議支持的編解碼協(xié)議如下:

其中Scribe限定了只支持Thirft協(xié)議,而HTTP、Kafka和RabbitMQ則是三種協(xié)議都支持。
如何做到支持所有的編碼解碼協(xié)議了?ZIPKIN中提供了一個自動探測編解碼的類SpanBytesDecoderDetector,其中核心方法如下:
static BytesDecoder<Span> detectDecoder(byte[] bytes) {
if (bytes[0] <= 16) { // binary format
if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
} else if (bytes[0] != '[' && bytes[0] != '{') {
throw new IllegalArgumentException("Could not detect the span format");
}
if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
if (contains(bytes, TAGS_FIELD)) return SpanBytesDecoder.JSON_V2;
return SpanBytesDecoder.JSON_V1;
}
現(xiàn)在我們已經知道了zipkin服務接收的數(shù)據(jù)格式以及編解碼協(xié)議和傳輸協(xié)議,那么接下來就可以寫一個客戶端了!
自己寫一個ZIPKIN客戶端
對于服務端如何接收數(shù)據(jù),有了一個全面的認識后,我們就可以著手開始寫一個ZIPKIN客戶端了。
那么,首先定義客戶端上報的數(shù)據(jù)格式,最簡單的方式就是定義一個跟ZIPKIN服務端一樣數(shù)據(jù)格式的Span就可以了:
@Setter
@Getter
public static class MySapn {
private String traceId;
private String parentId;
private String id;
private String name;
private long timestamp;
private long duration;
private Map<String, String> tags;
String kind;
Endpoint localEndpoint, remoteEndpoint;
public static enum Kind {
CLIENT,
SERVER,
PRODUCER,
CONSUMER
}
public static class Endpoint {
String serviceName, ipv4, ipv6;
byte[] ipv4Bytes, ipv6Bytes;
int port; // zero means null
public Endpoint(String serviceName) {
this.serviceName = serviceName;
}
}
}
數(shù)據(jù)格式確定后,接著就編碼數(shù)據(jù),ZIPKIN支持三種編碼方式,JSON、THIFT和PROTO3,為了簡單方便,我們選擇JSON協(xié)議編碼Span數(shù)據(jù)。注意,ZIPKIN JSON字符串前后需要加括號。
數(shù)據(jù)編碼后,接著上報數(shù)據(jù),ZIPKIN默認支持HTTP協(xié)議方式,JAVA HTTP請求包很多,我們隨便選擇一種,比如選擇Apach的HttpClient jar包,代碼如下:
public class App {
private static final String serverUrl = "http://localhost:9411/api/v2/spans";
public static void main(String[] args) {
MySapn span = new MySapn();
span.traceId = "1ae1e4f435814744";
span.parentId = "1ae1e4f435814744";
span.id = "d1ab9cd2c50d13d1";
span.kind = MySapn.Kind.SERVER.toString();
span.name = "my client test";
span.timestamp = 1565933251470428L;
span.duration = 8286;
span.localEndpoint = new MySapn.Endpoint("My client");
Map<String, String> tags = new HashMap<>();
tags.put("name", "pioneeryi");
tags.put("lover", "dandan");
span.tags = tags;
doPost(serverUrl, span);
System.out.println("Hello World!");
}
public static void doPost(String url, MySapn span) {
try {
HttpClient httpClient = new DefaultHttpClient();
HttpPost post = new HttpPost(url);
post.setHeader("Content-Type", "application/json");
post.setHeader("charset", "UTF-8");
String body = new Gson().toJson(span);
body = "[" + body + "]";
System.out.print(body);
StringEntity entity = new StringEntity(body);
post.setEntity(entity);
HttpResponse httpResponse = httpClient.execute(post);
System.out.print(httpResponse);
} catch (Exception exception) {
System.out.print("do post request fail");
}
}
}
maven pom如下:
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
運行上面main方法,即可上報Span數(shù)據(jù),打開zipkin首頁:http://localhost:9411/zipkin/,搜索剛上報的Span的TraceId,展示效果如下:

一個最簡單的,采用JSON編碼數(shù)據(jù),HTTP協(xié)議上傳數(shù)據(jù)的客戶端我們就完成了。為了用戶調用方便,我們可以將上面的代碼封裝為一個接口供用戶使用:
public void reportSpan(MySpan span)
但是這個太簡陋了,我們只支持了一種一個編解碼協(xié)議,一種傳輸協(xié)議,開始優(yōu)化:
優(yōu)化一:支持多種編解碼,支持多種傳輸協(xié)議。這個時候就需要定一個上報接口類了,根據(jù)不同傳輸協(xié)議提供不同實現(xiàn)。編解碼也是同樣的,定義編碼接口,根據(jù)不同編碼協(xié)議提供不同實現(xiàn)。
此外,當前這個客戶端是同步上報的,性能很差,因此必須改成異步上報,接著優(yōu)化:
優(yōu)化二:上報改成異步上報,隊列+線程。
有了這兩步優(yōu)化后,client大體框架就初步成型了,寫好了客戶端后,怎么適配各個組件,做到無侵入上報也很重要,繼續(xù)優(yōu)化:
優(yōu)化三:適配各個組件,比如Spring Boot,Kafak,MySql等等。
一個完整的Client,還是有很多工作要做的,這里咱就不繼續(xù)深入優(yōu)化開發(fā)了,直接看看官方的Brave怎么做的!
探究ZIPKIN客戶端Brave
為了說明Brave的使用和上報過程,我們先寫一個很簡單的上報Demo,進行演示。Demo將上報一個“一父Span,兩個子Span“的鏈路,demo如下:
public class TraceDemo {
public static void main(String[] args) {
Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.create(sender);
Tracing tracing = Tracing.newBuilder()
.localServiceName("my-service")
.spanReporter(asyncReporter)
.build();
Tracer tracer = tracing.tracer();
Span parentSpan = tracer.newTrace().name("parent span").start();
Span childSpan1 = tracer.newChild(parentSpan.context()).name("child span1").start();
sleep(500);
childSpan1.finish();
Span childSpan2 = tracer.newChild(parentSpan.context()).name("child span2").start();
sleep(500);
childSpan2.finish();
parentSpan.finish();
}
}
其中 maven pom 引入如下包:
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave</artifactId>
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-okhttp3</artifactId>
<version>2.6.0</version>
</dependency>
啟動zipkin 服務:
java -jar zipkin-server-2.11.5-exec.jar
然后在瀏覽器中輸入:http://localhost:9411 ,即可打開zipkin首頁,看到示例代碼上報的效果圖如下:

一個最簡單的但是卻是很完整的一個ZIPKIN鏈路上報演示,就如上面Demo所示,那么接下來分析一下整個鏈路的上報過程!
鏈路上報解析
自己開發(fā)一個客戶端時,我們首先會封裝一個Span類,Brave也不例外,它也定義了Span數(shù)據(jù)結構;那么定義好Span后,誰來負責構造Span了?
Brave中定義了一個類叫Tracer來完成構造Span的工作;Brave生成好了Span后,此時需要編碼發(fā)送了,那么誰又來發(fā)送了?
Brave定義了Reporter組件,它支持異步發(fā)送以及多傳輸協(xié)議以及多編碼協(xié)議發(fā)送Span數(shù)據(jù)。
看起來,各個組件已經完備了,組件有點多!此時需要那么一個人將這些組件組織起來,同時與服務端取得聯(lián)系,開始打通整個流程了,這就是Tracing類的功能。
接下來詳細講解一下各個組件的功能,并根據(jù)Demo代碼,將各個組件串起來,最終梳理清楚Brave上報的流程和原理。
Brave Span
Brave中Span相關類有:SpanCustomizer、NoopSpanCustomizer、CurrentSpanCustomizer、RealSpanCustomizer、Span、NoopSpan、RealSpan。
示例代碼中,關于span的操作如下:
Span span = tracer.newTrace().name("encode").start();
try {
doSomethingExpensive();
} finally {
span.finish();
}
首先通過tracer生成一個span,最后,調用span.finish()上報,接下來就來看看span,以及這個finish干了什么。
咱們用的Span的實現(xiàn)子類為RealSpan,RealSpan兩個核心方法start, finish:
@Override
public Span start() {
return start(clock.currentTimeMicroseconds());
}
@Override
public Span start(long timestamp) {
synchronized (state) {
state.startTimestamp(timestamp);
}
return this;
}
start方法主要記錄開始時間,接下來看看finish 方法:
@Override
public void finish(long timestamp) {
if (!pendingSpans.remove(context)) return;
synchronized (state) {
state.finishTimestamp(timestamp);
}
finishedSpanHandler.handle(context, state);
}
這里交給FinishedSpanHandler來處理。FinishedSpanHandler是一個抽象類,他有如下子類實現(xiàn):
- ZipkinFinishedSpanHandler
- MetricsFinishedSpanHandler
- NoopAwareFinishedSpan
- CompositeFinishedSpanHandler
我們主要關注ZipkinFinishedSpanHandler實現(xiàn):
public final class ZipkinFinishedSpanHandler extends FinishedSpanHandler {
final Reporter<zipkin2.Span> spanReporter;
final MutableSpanConverter converter;
public ZipkinFinishedSpanHandler(Reporter<zipkin2.Span> spanReporter,
ErrorParser errorParser, String serviceName, String ip, int port) {
this.spanReporter = spanReporter;
this.converter = new MutableSpanConverter(errorParser, serviceName, ip, port);
}
@Override public boolean handle(TraceContext context, MutableSpan span) {
if (!Boolean.TRUE.equals(context.sampled())) return true;
Span.Builder builderWithContextData = Span.newBuilder()
.traceId(context.traceIdString())
.parentId(context.parentIdString())
.id(context.spanIdString());
if (context.debug()) builderWithContextData.debug(true);
converter.convert(span, builderWithContextData);
spanReporter.report(builderWithContextData.build());
return true;
}
@Override public String toString() {
return spanReporter.toString();
}
}
可見,上面最終是通過Reporter組件來上報數(shù)據(jù)的。那么Report是如何上報的了?
Brave Reporter
因為上報組件要支持多種編碼協(xié)議以及多種傳輸協(xié)議,因此邏輯比較復雜,官方專門建了一個項目:zipkin-reporter-java
我們首先看看Reporter接口類定義:
public interface Reporter<S> {
Reporter<Span> NOOP = new Reporter<Span>() {
@Override public void report(Span span) {
}
@Override public String toString() {
return "NoopReporter{}";
}
};
Reporter<Span> CONSOLE = new Reporter<Span>() {
@Override public void report(Span span) {
System.out.println(span.toString());
}
@Override public String toString() {
return "ConsoleReporter{}";
}
};
/**
* Schedules the span to be sent onto the transport.
*
* @param span Span, should not be <code>null</code>.
*/
void report(S span);
}
Reporter有三個子類實現(xiàn),分別是CONSOLE,NOOP,和AsyncReporter。CONSOLE就是直接控制臺打印出Span數(shù)據(jù),一般用來調試差不多;NOOP是一個空實現(xiàn),啥也不干;AsyncReporter是我們平時上報用的Reporter,他提供異步上報數(shù)據(jù)能力。
AsyncReporter
AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second before flushes any pending spans out of process via a Sender.
根據(jù)不同協(xié)議,Ayncreporter執(zhí)行相應的build邏輯:
public AsyncReporter<Span> build() {
switch(this.sender.encoding()) {
case JSON:
return this.build(SpanBytesEncoder.JSON_V2);
case PROTO3:
return this.build(SpanBytesEncoder.PROTO3);
case THRIFT:
return this.build(SpanBytesEncoder.THRIFT);
default:
throw new UnsupportedOperationException(this.sender.encoding().name());
}
}
build方法詳細邏輯,如下:
public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
if (encoder == null) throw new NullPointerException("encoder == null");
if (encoder.encoding() != sender.encoding()) {
throw new IllegalArgumentException(String.format(
"Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
}
final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);
if (messageTimeoutNanos > 0) {
// Start a thread that flushes the queue in a loop.
final BufferNextMessage<S> consumer =
BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
Thread flushThread = threadFactory.newThread(new Flusher<>(result, consumer));
flushThread.setName("AsyncReporter{" + sender + "}");
flushThread.setDaemon(true);
flushThread.start();
}
return result;
}
可以看到AsyncReporter的build方法中,啟動了一個守護線程flushThread,一直循環(huán)調用BoundedAsyncReporter的flush方法:
void flush(BufferNextMessage<S> bundler) {
if (closed.get()) throw new IllegalStateException("closed");
pending.drainTo(bundler, bundler.remainingNanos());
// record after flushing reduces the amount of gauge events vs on doing this on report
metrics.updateQueuedSpans(pending.count);
metrics.updateQueuedBytes(pending.sizeInBytes);
// loop around if we are running, and the bundle isn't full
// if we are closed, try to send what's pending
if (!bundler.isReady() && !closed.get()) return;
// Signal that we are about to send a message of a known size in bytes
metrics.incrementMessages();
metrics.incrementMessageBytes(bundler.sizeInBytes());
ArrayList<byte[]> nextMessage = new ArrayList<>(bundler.count());
bundler.drain(new SpanWithSizeConsumer<S>() {
@Override public boolean offer(S next, int nextSizeInBytes) {
nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
// if we overran the message size, remove the encoded message.
nextMessage.remove(nextMessage.size() - 1);
return false;
}
return true;
}
});
try {
sender.sendSpans(nextMessage).execute();
} catch (IOException | RuntimeException | Error t) {
......
}
}
Flush方法的主要邏輯如下:
- 將隊列pending中的數(shù)據(jù),提取到nextMessage鏈表中;
- 調用Sender的sendSpans方法,發(fā)送到nextMessage鏈表中的Span數(shù)據(jù)到Zipkin;
這樣,Reporter即做到了異步發(fā)送!
Sender
Sender組件完成發(fā)送Span到zipkin服務端的最后一步,即利用某個傳輸協(xié)議,將數(shù)據(jù)發(fā)送到zipkin服務端。
public abstract class Sender extends Component {
public Sender() {
}
public abstract Encoding encoding();
public abstract int messageMaxBytes();
public abstract int messageSizeInBytes(List<byte[]> var1);
public int messageSizeInBytes(int encodedSizeInBytes) {
return this.messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes]));
}
public abstract Call<Void> sendSpans(List<byte[]> var1);
}
其中核心的方法sendSpans是一個抽象方法,不同傳輸協(xié)議的Sender會提供具體的實現(xiàn)邏輯,其子類有:
ActiveMQSender、FakeSender、KafkaSender、LibthriftSender、OkHttpSender、RabbitMQSender、URLConnectionSender。
不同協(xié)議均按照自身協(xié)議規(guī)范執(zhí)行發(fā)送邏輯,因為我們的Demo中用的是OkHttpSender,所以我們主要看看OkHttpSender是如何實現(xiàn)的。Demo中使用如下:
Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.create(sender);
這里,通過AsyncReporter.create方法,我們將OkHttpSender注入到了Reporter中,那么接下來看看OkHttpSender的sendSpans方法實現(xiàn):
@Override
public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new IllegalStateException("closed");
Request request;
try {
request = newRequest(encoder.encode(encodedSpans));
} catch (IOException e) {
throw zipkin2.internal.Platform.get().uncheckedIOException(e);
}
return new HttpCall(client.newCall(request));
}
執(zhí)行完這個方法后,會返回一個HttpCall,Reporter的flush方法中會調用HttpCall的execute方法,完成Http請求發(fā)送。
Brave Tracer
Span數(shù)據(jù)結構,包括發(fā)送Span的組件我們搞清楚了,那么誰來負責創(chuàng)建Span了?這就是Tracer的工作,他負責創(chuàng)建Span及提供Span的各種操作,主要方法如下表所示:
| 方法名 | 描述 |
|---|---|
| Span newTrace() | 創(chuàng)建一個Root Span |
| Span joinSpan(TraceContext context) | 公用一個SpanId,主要存在于RPC場景中 |
| Span newChild(TraceContext parent) | 創(chuàng)建一個子Span |
| Span nextSpan(TraceContextOrSamplingFlags extracted) | 基于請求的參數(shù)信息創(chuàng)建一個新的Span |
| Span toSpan(TraceContext context) | 通過TraceContext創(chuàng)建一個Span |
| Span currentSpan() | 獲取當前Span |
| Span nextSpan() | 基于當前Span生成一個子Span |
Brave Tracing
現(xiàn)在Span有了,創(chuàng)建Span的組件有了,發(fā)送Span的組件也有了,那就只需要一個把他們組合起來的類似工廠的角色了,那就是Tracing,他的主要工作就是連接服務器,然后利用Tracer創(chuàng)建出Span,接著發(fā)送Span到zipkin服務端。
Tracing源碼采用的Builder模式,再看看我們Demo中創(chuàng)建Tracing的代碼:
Tracing tracing = Tracing.newBuilder()
.localServiceName("my-service")
.spanReporter(asyncReporter)
.build();
我們Tracing.newBuilder()創(chuàng)建了一個Tracing的Builder,然后指定了這個Tracing的服務名,使用什么Reporter,接著調用了Builder的build方法,我們看看build方法代碼:
public Tracing build() {
if (clock == null) clock = Platform.get().clock();
if (localIp == null) localIp = Platform.get().linkLocalIp();
if (spanReporter == null) spanReporter = new LoggingReporter();
return new Default(this);
}
它調用了Tracing的默認實現(xiàn),默認實現(xiàn)子類如下:
static final class Default extends Tracing {
final Tracer tracer;
final Propagation.Factory propagationFactory;
final Propagation<String> stringPropagation;
final CurrentTraceContext currentTraceContext;
final Sampler sampler;
final Clock clock;
final ErrorParser errorParser;
Default(Builder builder) {
this.clock = builder.clock;
this.errorParser = builder.errorParser;
this.propagationFactory = builder.propagationFactory;
this.stringPropagation = builder.propagationFactory.create(Propagation.KeyFactory.STRING);
this.currentTraceContext = builder.currentTraceContext;
this.sampler = builder.sampler;
zipkin2.Endpoint localEndpoint = zipkin2.Endpoint.newBuilder()
.serviceName(builder.localServiceName)
.ip(builder.localIp)
.port(builder.localPort)
.build();
SpanReporter reporter = new SpanReporter(localEndpoint, builder.reporter, noop);
this.tracer = new Tracer(
builder.clock,
builder.propagationFactory,
reporter,
new PendingSpans(localEndpoint, clock, reporter, noop),
builder.sampler,
builder.errorParser,
builder.currentTraceContext,
builder.traceId128Bit || propagationFactory.requires128BitTraceId(),
builder.supportsJoin && propagationFactory.supportsJoin(),
noop
);
maybeSetCurrent();
}
從上面可以看到,主要干的工作有:
- 根據(jù)Spanreporter,生成FinishedSpanHandler,發(fā)送Span用;
- 根據(jù)FinishedSpanHandler以及其他默認信息生成Tracer;
OK,現(xiàn)在對于DEMO中的Brave的上報數(shù)據(jù)流程和原理是不是清楚了不少!
鏈路上報總結
Zipkin鏈路上報看起來很復雜,其實剝離各種封裝,去除各種組件,其主線邏輯就是如下三步:
- 構造span對象,包括traceId,parentId,以及其自身的spanId等參數(shù);
- 選一種編解碼協(xié)議,比如JSON,或者THRIF,或者PROTO3對Span進行編碼;
- 將編碼后的span,通過利用一種傳輸協(xié)議上報到服務端;
還有一塊未分析:Brave是如何Support各個組件的。因為本文內容較多,放到以后的文章分析!
后記
本文為我的調用鏈系列文章之一,已有文章如下: