本篇文章主要是閱讀官網(wǎng)的scheduler文檔。原文鏈接http://reactivex.io/documentation/scheduler.html
彈珠圖
彈珠圖可以很好的描述rxjava的運(yùn)作流程,建議先看下這篇文章Understanding Marble Diagrams for Reactive Streams
簡單scheduler
輸出結(jié)果: RxNewThreadScheduler-1 hello world!
public class SchedulersWork {
private static final CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(SchedulersWork::sayHello);
latch.await();
worker.dispose();
}
public static void sayHello() {
System.out.println(Thread.currentThread().getName() + " hello world!");
latch.countDown();
}
}
遞歸scheduler
輸出結(jié)果: RxNewThreadScheduler-1 hello world! ...
循環(huán)輸出直到dispose worker
public class RecursiveScheduler {
public static void main(String[] args) throws InterruptedException {
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Runnable() {
@Override
public void run() {
sayHello();
//遞歸直到dispose
worker.schedule(this);
}
});
Thread.sleep(1000);
worker.dispose();
}
public static void sayHello() {
System.out.println(Thread.currentThread().getName() + " hello world!");
}
}
??: 遞歸調(diào)用需要限制遞歸次數(shù)或者主動(dòng)設(shè)置dispose狀態(tài),否則會(huì)出現(xiàn)死循環(huán)。
檢查或者設(shè)置dispose狀態(tài)
package com.zihao.schedulers;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Scanner;
/**
* 相比較普通的recursive 調(diào)度器,使用dispose狀態(tài)檢查
* 如果不訂閱了 就停止并釋放資源
*
* @author tangzihao
* @Date 2021/1/3 11:20 上午
*/
public class CheckDisposeRecursiveScheduler {
public static void main(String[] args) throws InterruptedException {
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Runnable() {
@Override
public void run() {
while (!worker.isDisposed()) {
sayHello();
}
System.out.println("worker被終止了。。。");
}
});
//主線程通過終端控制worker結(jié)束
Scanner scan = new Scanner(System.in);
System.out.println("終止worker工作: ");
if (scan.hasNext()) {
String str = scan.next();
if (str.equals("stop")) {
worker.dispose();
}
}
scan.close();
}
public static void sayHello() {
//do nothing 故意不輸出方便終端輸入
}
}
輸出結(jié)果
終止worker工作:
stop //在控制臺(tái)輸入 main線程
worker被終止了。。。 //worker線程
延遲或周期性調(diào)度器
delayed scheduler
schedule有三個(gè)參數(shù),調(diào)用的方法,延遲的時(shí)間,時(shí)間單位
public class DelayedAndPeriodicScheduler {
public static void main(String[] args) throws InterruptedException {
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(DelayedAndPeriodicScheduler::sayHello,1, TimeUnit.SECONDS);
Thread.sleep(2000);
}
public static void sayHello(){
System.out.println("hello,world!");
}
}
periodic scheduler
schedulePeriodically有四個(gè)參數(shù),調(diào)用的方法,延遲的時(shí)間,周期性時(shí)間,時(shí)間單位
public class DelayedAndPeriodicScheduler {
public static void main(String[] args) throws InterruptedException {
periodicScheduler();
}
public static void periodicScheduler() throws InterruptedException {
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedulePeriodically(DelayedAndPeriodicScheduler::sayHello, 500, 250, TimeUnit.MILLISECONDS);
Thread.sleep(3000);
}
public static void sayHello() {
System.out.println("hello,world!");
}
}