zipkin 數(shù)據(jù)上報的研究

前言:ZIPKIN作為當前“分布式服務鏈路跟蹤”問題的流行解決方案之一,正在被越來越多的公司和個人學習使用。其中很重要的一塊,就是上報鏈路數(shù)據(jù)。那么知道服務端如何接收數(shù)據(jù),以及我們該怎樣上報數(shù)據(jù)到服務端就顯得十分重要。雖然ZIPKIN官方也開源了一個客戶端Brave,但是本文卻并不想直接介紹Brave,而是想站在一個從零開發(fā)ZIPKIN客戶端的角度,一層層分析解決如何自己寫一個ZIPKIN客戶端,直到最后引出Brave。本文最終想達到的效果,就是希望通過本文,能讓大家對ZIPKIN的鏈路上報有一個詳盡的理解和認識。

零行代碼快速開始

ZIPKIN是Spring Boot服務,因此啟動起來十分方便,直接運行ZIPKIN JAR包就可以了。ZIPKIN JAR包我們可以自行編譯ZIPKIN源碼獲得,也可以從下面的倉庫獲取,這個倉庫專門存放ZIPKIN各種編譯好的JAR包,倉庫地址為:

https://dl.bintray.com/openzipkin/maven/io/zipkin/java/

進入zipkin-server目錄下載server jar包,比如下載2.11.5版本,運行jar包:

java -jar zipkin-server-2.11.5-exec.jar

這樣本地就起了一個ZIPKIN服務,瀏覽器中輸入http://localhost:9411/zipkin/,即可打開ZIPKIN首頁,效果如下:

image

接下來,我們開始上報數(shù)據(jù),現(xiàn)在我不想寫任何代碼,那么就用postman發(fā)起一次post請求上報數(shù)據(jù)吧:

image

上報后在本地ZIPKIN服務上即可看到剛上報的數(shù)據(jù)效果:

image

沒有任何一句代碼,一個完整的ZIPKIN數(shù)據(jù)上報存儲展示就完成了,那么我們來思考下:

1、如果自己寫一個上ZIPKIN客戶端,該如何寫?

分析:要上傳數(shù)據(jù)給服務端,那么必須要搞清楚,ZIPKIN服務端可以接受什么樣格式的數(shù)據(jù)?支持的編碼解碼協(xié)議是什么?還有支持那些通信傳輸協(xié)議?

Task: 了解了服務怎么接收數(shù)據(jù)后,客戶端的task也就明確了:
1)以ZIPKIN服務端支持的數(shù)據(jù)格式組織數(shù)據(jù);
2)以ZIPKIN服務端支持的編解碼協(xié)議編碼數(shù)據(jù);
3)以ZIPKIN服務端支持的傳輸協(xié)議上報數(shù)據(jù)。

2、基礎ZIPKIN客戶端完成后,如何適配各種組件?

描述:一個服務通常涉及多個組件,包括服務框架,消息中間件,各種數(shù)據(jù)庫等,那么怎么上報這些組件的數(shù)據(jù)給服務端了,值得我們思考?

分析:其實最簡單的方法就是采用一個裝飾者模式包裝一下組件接口,在其中加入上報的邏輯,但是這種方法局限很大,不靈活。除此之外,我們或許我們可以利用下攔截器技術,AOP技術,以及Agent,探針等技術做到無侵入上報數(shù)據(jù)。

服務端如何接收數(shù)據(jù)的?

要想自己寫一個ZIPKIN客戶端,必須搞清楚ZIPKIN服務端是怎么接收數(shù)據(jù)的,包括數(shù)據(jù)格式協(xié)議是怎樣的?編解碼協(xié)議是怎樣的?還有支持那么傳輸協(xié)議?

ZIPKIN支持的數(shù)據(jù)格式是怎樣的?

ZIPKIN是兼容OpenTracing標準的,OpenTracing規(guī)定,每個Span包含如下狀態(tài):

  • 操作名稱
  • 起始時間
  • 結束時間
  • 一組 KV 值,作為階段的標簽(Span Tags)
  • 階段日志(Span Logs)
  • 階段上下文(SpanContext),其中包含 Trace ID 和 Span ID
  • 引用關系(References)

OpenTracing規(guī)范與zipkin對應關系如下:

OpenTracing ZIPKIN
操作名稱 name
起始時間 timestamp
結束時間 timestamp+duration
標簽 tags
Span上下文 traceId; id
引用關系 parentId
Span日志

除此之外,ZIPKIN SAPN還增加了其他幾個字端:

字端 描述
Kind kind Spanl類型,比如是Server還是Client
Annotaion 表示某個時間點發(fā)生的Event,Event類型:cs:Client Send 請求;sr:Server Receive到請求;ss:Server 處理完成、并Send Response;cr:Client Receive 到響應
Endpoint localEndpoint 描述本地服務信息,比如服務名,ip等,方便根據(jù)服務名檢索鏈路
Endpoint remoteEndpoint RPC調用時,描述遠程服務的服務信息

最終,包含兼容OpenTracing標準的字端,以及其本身的一些字端后,zipkin的span數(shù)據(jù)字端如下:

image

了解了ZIPKIN的數(shù)據(jù)字端格式后,我們再看看ZIPKIN支持的編解碼協(xié)議。

ZIPKIN支持那些編解碼協(xié)議?

Zipkin主要支持三種編解碼協(xié)議,分別為JSON, PROTO3, THRIFT。

JSON編解碼如下:

JSON_V1 {
    public Encoding encoding() {
        return Encoding.JSON;
    }

    public boolean decode(byte[] bytes, Collection<Span> out) {
        Span result = this.decodeOne(bytes);
        if (result == null) {
            return false;
        } else {
            out.add(result);
            return true;
        }
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return (new V1JsonSpanReader()).readList(spans, out);
    }

    public Span decodeOne(byte[] span) {
        V1Span v1 = (V1Span)JsonCodec.readOne(new V1JsonSpanReader(), span);
        List<Span> out = new ArrayList(1);
        V1SpanConverter.create().convert(v1, out);
        return (Span)out.get(0);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

JSON_V2 {
    public Encoding encoding() {
        return Encoding.JSON;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return JsonCodec.read(new V2SpanReader(), span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return JsonCodec.readList(new V2SpanReader(), spans, out);
    }

    @Nullable
    public Span decodeOne(byte[] span) {
        return (Span)JsonCodec.readOne(new V2SpanReader(), span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

THRIFT編解碼如下:

THRIFT {
    public Encoding encoding() {
        return Encoding.THRIFT;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return ThriftCodec.read(span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return ThriftCodec.readList(spans, out);
    }

    public Span decodeOne(byte[] span) {
        return ThriftCodec.readOne(span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

PROTO3編解碼如下:

PROTO3 {
    public Encoding encoding() {
        return Encoding.PROTO3;
    }

    public boolean decode(byte[] span, Collection<Span> out) {
        return Proto3Codec.read(span, out);
    }

    public boolean decodeList(byte[] spans, Collection<Span> out) {
        return Proto3Codec.readList(spans, out);
    }

    @Nullable
    public Span decodeOne(byte[] span) {
        return Proto3Codec.readOne(span);
    }

    public List<Span> decodeList(byte[] spans) {
        return decodeList(this, spans);
    }
}

其中Json是默認支持的,也是使用起來最方便的,除此之外,還包括Thirft和Proto3可供開發(fā)者選擇。

ZIPKIN支持那些傳輸協(xié)議?

Zipkin默認支持Http協(xié)議,除此之外,它還支持kafka,rabbitmq以及scribe協(xié)議:

image

他們的初始化過程如下:

image

傳輸協(xié)議支持的編解碼協(xié)議如下:

image

其中Scribe限定了只支持Thirft協(xié)議,而HTTP、Kafka和RabbitMQ則是三種協(xié)議都支持。

如何做到支持所有的編碼解碼協(xié)議了?ZIPKIN中提供了一個自動探測編解碼的類SpanBytesDecoderDetector,其中核心方法如下:

static BytesDecoder<Span> detectDecoder(byte[] bytes) {
    if (bytes[0] <= 16) { // binary format
    if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
        return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
    } else if (bytes[0] != '[' && bytes[0] != '{') {
        throw new IllegalArgumentException("Could not detect the span format");
    }
    if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
    if (contains(bytes, TAGS_FIELD)) return SpanBytesDecoder.JSON_V2;
    return SpanBytesDecoder.JSON_V1;
}

現(xiàn)在我們已經知道了zipkin服務接收的數(shù)據(jù)格式以及編解碼協(xié)議和傳輸協(xié)議,那么接下來就可以寫一個客戶端了!

自己寫一個ZIPKIN客戶端

對于服務端如何接收數(shù)據(jù),有了一個全面的認識后,我們就可以著手開始寫一個ZIPKIN客戶端了。

那么,首先定義客戶端上報的數(shù)據(jù)格式,最簡單的方式就是定義一個跟ZIPKIN服務端一樣數(shù)據(jù)格式的Span就可以了:

@Setter
@Getter
public static class MySapn {
    private String traceId;
    private String parentId;
    private String id;
    private String name;
    private long timestamp;
    private long duration;
    private Map<String, String> tags;
    String kind;
    Endpoint localEndpoint, remoteEndpoint;

    public static enum Kind {
        CLIENT,
        SERVER,
        PRODUCER,
        CONSUMER
    }

    public static class Endpoint {
        String serviceName, ipv4, ipv6;
        byte[] ipv4Bytes, ipv6Bytes;
        int port; // zero means null

        public Endpoint(String serviceName) {
            this.serviceName = serviceName;
        }
    }
}

數(shù)據(jù)格式確定后,接著就編碼數(shù)據(jù),ZIPKIN支持三種編碼方式,JSON、THIFT和PROTO3,為了簡單方便,我們選擇JSON協(xié)議編碼Span數(shù)據(jù)。注意,ZIPKIN JSON字符串前后需要加括號。

數(shù)據(jù)編碼后,接著上報數(shù)據(jù),ZIPKIN默認支持HTTP協(xié)議方式,JAVA HTTP請求包很多,我們隨便選擇一種,比如選擇Apach的HttpClient jar包,代碼如下:

public class App {

    private static final String serverUrl = "http://localhost:9411/api/v2/spans";

    public static void main(String[] args) {
        MySapn span = new MySapn();
        span.traceId = "1ae1e4f435814744";
        span.parentId = "1ae1e4f435814744";
        span.id = "d1ab9cd2c50d13d1";
        span.kind = MySapn.Kind.SERVER.toString();
        span.name = "my client test";
        span.timestamp = 1565933251470428L;
        span.duration = 8286;
        span.localEndpoint = new MySapn.Endpoint("My client");
        Map<String, String> tags = new HashMap<>();
        tags.put("name", "pioneeryi");
        tags.put("lover", "dandan");
        span.tags = tags;

        doPost(serverUrl, span);
        System.out.println("Hello World!");
    }

    public static void doPost(String url, MySapn span) {
        try {
            HttpClient httpClient = new DefaultHttpClient();

            HttpPost post = new HttpPost(url);
            post.setHeader("Content-Type", "application/json");
            post.setHeader("charset", "UTF-8");

            String body = new Gson().toJson(span);
            body = "[" + body + "]";
            System.out.print(body);

            StringEntity entity = new StringEntity(body);
            post.setEntity(entity);

            HttpResponse httpResponse = httpClient.execute(post);
            System.out.print(httpResponse);
        } catch (Exception exception) {
            System.out.print("do post request fail");
        }
    }
}

maven pom如下:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>

運行上面main方法,即可上報Span數(shù)據(jù),打開zipkin首頁:http://localhost:9411/zipkin/,搜索剛上報的Span的TraceId,展示效果如下:

image

一個最簡單的,采用JSON編碼數(shù)據(jù),HTTP協(xié)議上傳數(shù)據(jù)的客戶端我們就完成了。為了用戶調用方便,我們可以將上面的代碼封裝為一個接口供用戶使用:

public void reportSpan(MySpan span)

但是這個太簡陋了,我們只支持了一種一個編解碼協(xié)議,一種傳輸協(xié)議,開始優(yōu)化:

優(yōu)化一:支持多種編解碼,支持多種傳輸協(xié)議。這個時候就需要定一個上報接口類了,根據(jù)不同傳輸協(xié)議提供不同實現(xiàn)。編解碼也是同樣的,定義編碼接口,根據(jù)不同編碼協(xié)議提供不同實現(xiàn)。

此外,當前這個客戶端是同步上報的,性能很差,因此必須改成異步上報,接著優(yōu)化:

優(yōu)化二:上報改成異步上報,隊列+線程。

有了這兩步優(yōu)化后,client大體框架就初步成型了,寫好了客戶端后,怎么適配各個組件,做到無侵入上報也很重要,繼續(xù)優(yōu)化:

優(yōu)化三:適配各個組件,比如Spring Boot,Kafak,MySql等等。

一個完整的Client,還是有很多工作要做的,這里咱就不繼續(xù)深入優(yōu)化開發(fā)了,直接看看官方的Brave怎么做的!

探究ZIPKIN客戶端Brave

為了說明Brave的使用和上報過程,我們先寫一個很簡單的上報Demo,進行演示。Demo將上報一個“一父Span,兩個子Span“的鏈路,demo如下:

public class TraceDemo {
    public static void main(String[] args) {
        Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
        AsyncReporter asyncReporter = AsyncReporter.create(sender);
        Tracing tracing = Tracing.newBuilder()
            .localServiceName("my-service")
            .spanReporter(asyncReporter)
            .build();
        Tracer tracer = tracing.tracer();

        Span parentSpan = tracer.newTrace().name("parent span").start();

        Span childSpan1 = tracer.newChild(parentSpan.context()).name("child span1").start();
        sleep(500);
        childSpan1.finish();

        Span childSpan2 = tracer.newChild(parentSpan.context()).name("child span2").start();
        sleep(500);
        childSpan2.finish();

        parentSpan.finish();
    }
}

其中 maven pom 引入如下包:

<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave</artifactId>
    <version>5.3.3</version>
</dependency>

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-okhttp3</artifactId>
    <version>2.6.0</version>
</dependency>

啟動zipkin 服務:

java -jar zipkin-server-2.11.5-exec.jar

然后在瀏覽器中輸入:http://localhost:9411 ,即可打開zipkin首頁,看到示例代碼上報的效果圖如下:

image

一個最簡單的但是卻是很完整的一個ZIPKIN鏈路上報演示,就如上面Demo所示,那么接下來分析一下整個鏈路的上報過程!

鏈路上報解析

自己開發(fā)一個客戶端時,我們首先會封裝一個Span類,Brave也不例外,它也定義了Span數(shù)據(jù)結構;那么定義好Span后,誰來負責構造Span了?

Brave中定義了一個類叫Tracer來完成構造Span的工作;Brave生成好了Span后,此時需要編碼發(fā)送了,那么誰又來發(fā)送了?

Brave定義了Reporter組件,它支持異步發(fā)送以及多傳輸協(xié)議以及多編碼協(xié)議發(fā)送Span數(shù)據(jù)。

看起來,各個組件已經完備了,組件有點多!此時需要那么一個人將這些組件組織起來,同時與服務端取得聯(lián)系,開始打通整個流程了,這就是Tracing類的功能。

接下來詳細講解一下各個組件的功能,并根據(jù)Demo代碼,將各個組件串起來,最終梳理清楚Brave上報的流程和原理。

Brave Span

Brave中Span相關類有:SpanCustomizer、NoopSpanCustomizer、CurrentSpanCustomizer、RealSpanCustomizer、Span、NoopSpan、RealSpan。

示例代碼中,關于span的操作如下:

Span span = tracer.newTrace().name("encode").start();
try {
  doSomethingExpensive();
} finally {
  span.finish();
}

首先通過tracer生成一個span,最后,調用span.finish()上報,接下來就來看看span,以及這個finish干了什么。

咱們用的Span的實現(xiàn)子類為RealSpan,RealSpan兩個核心方法start, finish:

@Override 
public Span start() {
    return start(clock.currentTimeMicroseconds());
}

@Override 
public Span start(long timestamp) {
    synchronized (state) {
        state.startTimestamp(timestamp);
    }
    return this;
}

start方法主要記錄開始時間,接下來看看finish 方法:

@Override 
public void finish(long timestamp) {
    if (!pendingSpans.remove(context)) return;
    synchronized (state) {
        state.finishTimestamp(timestamp);
    }
    finishedSpanHandler.handle(context, state);
}

這里交給FinishedSpanHandler來處理。FinishedSpanHandler是一個抽象類,他有如下子類實現(xiàn):

  • ZipkinFinishedSpanHandler
  • MetricsFinishedSpanHandler
  • NoopAwareFinishedSpan
  • CompositeFinishedSpanHandler

我們主要關注ZipkinFinishedSpanHandler實現(xiàn):

public final class ZipkinFinishedSpanHandler extends FinishedSpanHandler {
    final Reporter<zipkin2.Span> spanReporter;
    final MutableSpanConverter converter;

    public ZipkinFinishedSpanHandler(Reporter<zipkin2.Span> spanReporter,
    ErrorParser errorParser, String serviceName, String ip, int port) {
        this.spanReporter = spanReporter;
        this.converter = new MutableSpanConverter(errorParser, serviceName, ip, port);
    }

    @Override public boolean handle(TraceContext context, MutableSpan span) {
        if (!Boolean.TRUE.equals(context.sampled())) return true;

        Span.Builder builderWithContextData = Span.newBuilder()
            .traceId(context.traceIdString())
            .parentId(context.parentIdString())
            .id(context.spanIdString());
        if (context.debug()) builderWithContextData.debug(true);

        converter.convert(span, builderWithContextData);
        spanReporter.report(builderWithContextData.build());
        return true;
    }

    @Override public String toString() {
        return spanReporter.toString();
    }
}

可見,上面最終是通過Reporter組件來上報數(shù)據(jù)的。那么Report是如何上報的了?

Brave Reporter

因為上報組件要支持多種編碼協(xié)議以及多種傳輸協(xié)議,因此邏輯比較復雜,官方專門建了一個項目:zipkin-reporter-java

我們首先看看Reporter接口類定義:

public interface Reporter<S> {
    Reporter<Span> NOOP = new Reporter<Span>() {
        @Override public void report(Span span) {
        }

        @Override public String toString() {
        return "NoopReporter{}";
        }
    };
    Reporter<Span> CONSOLE = new Reporter<Span>() {
        @Override public void report(Span span) {
             System.out.println(span.toString());
        }

        @Override public String toString() {
            return "ConsoleReporter{}";
        }
    };

    /**
    * Schedules the span to be sent onto the transport.
    *
    * @param span Span, should not be <code>null</code>.
    */
    void report(S span);
}

Reporter有三個子類實現(xiàn),分別是CONSOLE,NOOP,和AsyncReporter。CONSOLE就是直接控制臺打印出Span數(shù)據(jù),一般用來調試差不多;NOOP是一個空實現(xiàn),啥也不干;AsyncReporter是我們平時上報用的Reporter,他提供異步上報數(shù)據(jù)能力。

AsyncReporter

AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second before flushes any pending spans out of process via a Sender.

根據(jù)不同協(xié)議,Ayncreporter執(zhí)行相應的build邏輯:

public AsyncReporter<Span> build() {
    switch(this.sender.encoding()) {
        case JSON:
            return this.build(SpanBytesEncoder.JSON_V2);
        case PROTO3:
            return this.build(SpanBytesEncoder.PROTO3);
        case THRIFT:
            return this.build(SpanBytesEncoder.THRIFT);
        default:
            throw new UnsupportedOperationException(this.sender.encoding().name());
    }
}

build方法詳細邏輯,如下:

public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
    if (encoder == null) throw new NullPointerException("encoder == null");

    if (encoder.encoding() != sender.encoding()) {
        throw new IllegalArgumentException(String.format(
    "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
    }

    final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

    if (messageTimeoutNanos > 0) { 
        // Start a thread that flushes the queue in a loop.
        final BufferNextMessage<S> consumer =
        BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
        Thread flushThread = threadFactory.newThread(new Flusher<>(result, consumer));
        flushThread.setName("AsyncReporter{" + sender + "}");
        flushThread.setDaemon(true);
        flushThread.start();
    }
    return result;
}

可以看到AsyncReporter的build方法中,啟動了一個守護線程flushThread,一直循環(huán)調用BoundedAsyncReporter的flush方法:

void flush(BufferNextMessage<S> bundler) {
    if (closed.get()) throw new IllegalStateException("closed");
    pending.drainTo(bundler, bundler.remainingNanos());

    // record after flushing reduces the amount of gauge events vs on doing this on report
    metrics.updateQueuedSpans(pending.count);
    metrics.updateQueuedBytes(pending.sizeInBytes);

    // loop around if we are running, and the bundle isn't full
    // if we are closed, try to send what's pending
    if (!bundler.isReady() && !closed.get()) return;

    // Signal that we are about to send a message of a known size in bytes
    metrics.incrementMessages();
    metrics.incrementMessageBytes(bundler.sizeInBytes());
    ArrayList<byte[]> nextMessage = new ArrayList<>(bundler.count());
    bundler.drain(new SpanWithSizeConsumer<S>() {
        @Override public boolean offer(S next, int nextSizeInBytes) {
            nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
            if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
                // if we overran the message size, remove the encoded message.
                nextMessage.remove(nextMessage.size() - 1);
                return false;
            }
            return true;
        }
    });

    try {
        sender.sendSpans(nextMessage).execute();
    } catch (IOException | RuntimeException | Error t) {
        ......
    }
}

Flush方法的主要邏輯如下:

  • 將隊列pending中的數(shù)據(jù),提取到nextMessage鏈表中;
  • 調用Sender的sendSpans方法,發(fā)送到nextMessage鏈表中的Span數(shù)據(jù)到Zipkin;

這樣,Reporter即做到了異步發(fā)送!

Sender

Sender組件完成發(fā)送Span到zipkin服務端的最后一步,即利用某個傳輸協(xié)議,將數(shù)據(jù)發(fā)送到zipkin服務端。

public abstract class Sender extends Component {
    public Sender() {
    }

    public abstract Encoding encoding();

    public abstract int messageMaxBytes();

    public abstract int messageSizeInBytes(List<byte[]> var1);

    public int messageSizeInBytes(int encodedSizeInBytes) {
        return this.messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes]));
    }

    public abstract Call<Void> sendSpans(List<byte[]> var1);
}

其中核心的方法sendSpans是一個抽象方法,不同傳輸協(xié)議的Sender會提供具體的實現(xiàn)邏輯,其子類有:
ActiveMQSender、FakeSender、KafkaSender、LibthriftSender、OkHttpSender、RabbitMQSender、URLConnectionSender。

不同協(xié)議均按照自身協(xié)議規(guī)范執(zhí)行發(fā)送邏輯,因為我們的Demo中用的是OkHttpSender,所以我們主要看看OkHttpSender是如何實現(xiàn)的。Demo中使用如下:

Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.create(sender);

這里,通過AsyncReporter.create方法,我們將OkHttpSender注入到了Reporter中,那么接下來看看OkHttpSender的sendSpans方法實現(xiàn):

@Override 
public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
    if (closeCalled) throw new IllegalStateException("closed");
    Request request;
    try {
        request = newRequest(encoder.encode(encodedSpans));
    } catch (IOException e) {
        throw zipkin2.internal.Platform.get().uncheckedIOException(e);
    }
    return new HttpCall(client.newCall(request));
}

執(zhí)行完這個方法后,會返回一個HttpCall,Reporter的flush方法中會調用HttpCall的execute方法,完成Http請求發(fā)送。

Brave Tracer

Span數(shù)據(jù)結構,包括發(fā)送Span的組件我們搞清楚了,那么誰來負責創(chuàng)建Span了?這就是Tracer的工作,他負責創(chuàng)建Span及提供Span的各種操作,主要方法如下表所示:

方法名 描述
Span newTrace() 創(chuàng)建一個Root Span
Span joinSpan(TraceContext context) 公用一個SpanId,主要存在于RPC場景中
Span newChild(TraceContext parent) 創(chuàng)建一個子Span
Span nextSpan(TraceContextOrSamplingFlags extracted) 基于請求的參數(shù)信息創(chuàng)建一個新的Span
Span toSpan(TraceContext context) 通過TraceContext創(chuàng)建一個Span
Span currentSpan() 獲取當前Span
Span nextSpan() 基于當前Span生成一個子Span

Brave Tracing

現(xiàn)在Span有了,創(chuàng)建Span的組件有了,發(fā)送Span的組件也有了,那就只需要一個把他們組合起來的類似工廠的角色了,那就是Tracing,他的主要工作就是連接服務器,然后利用Tracer創(chuàng)建出Span,接著發(fā)送Span到zipkin服務端。

Tracing源碼采用的Builder模式,再看看我們Demo中創(chuàng)建Tracing的代碼:

Tracing tracing = Tracing.newBuilder()
            .localServiceName("my-service")
            .spanReporter(asyncReporter)
            .build();

我們Tracing.newBuilder()創(chuàng)建了一個Tracing的Builder,然后指定了這個Tracing的服務名,使用什么Reporter,接著調用了Builder的build方法,我們看看build方法代碼:

public Tracing build() {
    if (clock == null) clock = Platform.get().clock();
    if (localIp == null) localIp = Platform.get().linkLocalIp();
    if (spanReporter == null) spanReporter = new LoggingReporter();
    return new Default(this);
}

它調用了Tracing的默認實現(xiàn),默認實現(xiàn)子類如下:

static final class Default extends Tracing {
    final Tracer tracer;
    final Propagation.Factory propagationFactory;
    final Propagation<String> stringPropagation;
    final CurrentTraceContext currentTraceContext;
    final Sampler sampler;
    final Clock clock;
    final ErrorParser errorParser;

    Default(Builder builder) {
      this.clock = builder.clock;
      this.errorParser = builder.errorParser;
      this.propagationFactory = builder.propagationFactory;
      this.stringPropagation = builder.propagationFactory.create(Propagation.KeyFactory.STRING);
      this.currentTraceContext = builder.currentTraceContext;
      this.sampler = builder.sampler;
      zipkin2.Endpoint localEndpoint = zipkin2.Endpoint.newBuilder()
          .serviceName(builder.localServiceName)
          .ip(builder.localIp)
          .port(builder.localPort)
          .build();
      SpanReporter reporter = new SpanReporter(localEndpoint, builder.reporter, noop);
      this.tracer = new Tracer(
          builder.clock,
          builder.propagationFactory,
          reporter,
          new PendingSpans(localEndpoint, clock, reporter, noop),
          builder.sampler,
          builder.errorParser,
          builder.currentTraceContext,
          builder.traceId128Bit || propagationFactory.requires128BitTraceId(),
          builder.supportsJoin && propagationFactory.supportsJoin(),
          noop
      );
      maybeSetCurrent();
    }

從上面可以看到,主要干的工作有:

  • 根據(jù)Spanreporter,生成FinishedSpanHandler,發(fā)送Span用;
  • 根據(jù)FinishedSpanHandler以及其他默認信息生成Tracer;

OK,現(xiàn)在對于DEMO中的Brave的上報數(shù)據(jù)流程和原理是不是清楚了不少!

鏈路上報總結

Zipkin鏈路上報看起來很復雜,其實剝離各種封裝,去除各種組件,其主線邏輯就是如下三步:

  • 構造span對象,包括traceId,parentId,以及其自身的spanId等參數(shù);
  • 選一種編解碼協(xié)議,比如JSON,或者THRIF,或者PROTO3對Span進行編碼;
  • 將編碼后的span,通過利用一種傳輸協(xié)議上報到服務端;

還有一塊未分析:Brave是如何Support各個組件的。因為本文內容較多,放到以后的文章分析!

后記

本文為我的調用鏈系列文章之一,已有文章如下:

轉自: http://www.itdecent.cn/p/17ce989e108e

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容