//PublishSubject -> 會(huì)發(fā)送訂閱者從訂閱之后的事件序列
PublishSubjectlet?
sequenceOfInts = PublishSubject()
//在新的訂閱對(duì)象訂閱的時(shí)候會(huì)補(bǔ)發(fā)所有已經(jīng)發(fā)送過的數(shù)據(jù)隊(duì)列,bufferSize 是緩沖區(qū)的大小,決定了補(bǔ)發(fā)隊(duì)列的最大值。如果 bufferSize 是1,那么新的訂閱者出現(xiàn)的時(shí)候就會(huì)補(bǔ)發(fā)上一個(gè)事件,如果是2,則補(bǔ)兩個(gè).
ReplaySubject
let replaySubject = ReplaySubject.create(bufferSize: 2)
//在新的訂閱對(duì)象訂閱的時(shí)候會(huì)發(fā)送最近發(fā)送的事件,如果沒有則發(fā)送一個(gè)默認(rèn)值。
BehaviorSubject
let behaviorSubject = BehaviorSubject.init(value: "z")
Variable
Variable 是基于 BehaviorSubject 的一層封裝,它的優(yōu)勢是:不會(huì)被顯式終結(jié)。即:不會(huì)收到 .Completed 和 .Error 這類的終結(jié)事件,它會(huì)主動(dòng)在析構(gòu)的時(shí)候發(fā)送 .Complete 。
subscribe
//訂閱各種不同類型的事件
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??")
.subscribe(onNext: { (str) in
print(str,#line,#function)
}, onError: { (error) in
print(error,#line,#function)
}, onCompleted: {
},onDisposed:{
print("釋放")
}).disposed(by: disposeBag)
Never
create an Observable that emits no items and does not terminate

Throw
create an Observable that emits no items and terminates with an error

Empty
create an Observable that emits no items but terminates normally

just
just 是只包含一個(gè)元素的序列,它會(huì)先發(fā)送 .Next(value) ,然后發(fā)送 .Completed 。
of
創(chuàng)建一個(gè)可觀測序列與固定數(shù)量的元素。
let disposeBag = DisposeBag()
Observable.of("??", "??", "??", "??")
.subscribe(onNext: { element in
print(element)
})
.disposed(by: disposeBag)
from
//from 創(chuàng)建一個(gè)可觀測序列的序列,如一個(gè)數(shù)組,字典,或一組。
Observable.from(["??", "??", "??", "??"])
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
create
//Creates a custom Observable sequence.
let disposeBag = DisposeBag()? ? ? ?
?let myJust = { (element: String) -> Observablein
return Observable.create { observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}
myJust("??")
.subscribe { print($0) }
.disposed(by: disposeBag)
repeatElement
//創(chuàng)建一個(gè)可觀測序列無限期發(fā)出給定的元素。
Observable.repeatElement("??")
.take(3)//制定數(shù)量
.subscribe(onNext: { print($0,#line) })
.disposed(by: disposeBag)
range
//創(chuàng)建一個(gè)可觀測序列發(fā)出一系列連續(xù)的整數(shù),然后終止
let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10)
.subscribe { print($0) }
.disposed(by: disposeBag)
generate
//創(chuàng)建一個(gè)可觀測序列生成的值,只要所提供的條件的求值結(jié)果為true。
Observable.generate(
initialState: 0,
condition: { $0 < 3 },
scheduler: CurrentThreadScheduler.instance,
iterate: { $0 + 1 }
)
.subscribe(onNext: { print($0,#line) })
.disposed(by: disposeBag)
deferred
會(huì)等到有訂閱者的時(shí)候再通過工廠方法創(chuàng)建 Observable 對(duì)象,每個(gè)訂閱者訂閱的對(duì)象都是內(nèi)容相同而完全獨(dú)立的序列。
let disposeBag = DisposeBag()? ? var count = 1? ? ? ??
let deferredSequence = Observable.deferred {
print("Creating \(count)")
count += 1
return Observable.create { observer in
print("Emitting...")
observer.onNext("??")
observer.onNext("??")
observer.onNext("??")
return Disposables.create()
}
}
deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
map?
就是對(duì)每個(gè)元素都用函數(shù)做一次轉(zhuǎn)換,挨個(gè)映射一遍。
let aa = sequenceOfInts.map{ i -> Int in
print("MAP---\(i)")
return i * 2
}
flatMap
struct Player {? ? ? ? ? ??
var score: Variable
}? ? ? ? ? ? ? ??
let ???? = Player(score: Variable(80))? ? ? ?
?let ???? = Player(score: Variable(90))? ? ? ? ? ? ? ??
let player = Variable(????)? ? ? ? ? ? ? ??
player.asObservable()? ? ? ? ? ??
.flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
.subscribe(onNext: { print($0,#line) })? ? ? ? ? ??
.disposed(by: disposeBag)? ? ? ? ? ? ? ??
????.score.value = 85? ? ? ? ? ? ? ?
?player.value = ????? ? ? ??
// Will be printed when using flatMap, but will not be printed when using flatMapLatest? ? ? ??
????.score.value = 95? ? ? ??
????.score.value = 100? ? ? ? ? ? ? ??
Observable.from(["??", "??", "??", "??"])? ? ? ? ? ??
.flatMap { (event) -> Observablein
return Observable.of("??", "??", "??", "??")
}.subscribe{
print($0,#line)
}.disposed(by: disposeBag)
scan
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
print(aggregateValue,newValue,"->scan")
return aggregateValue + newValue
}
.subscribe(onNext: { print($0,"scan->",#line) })
.disposed(by: disposeBag)
打印結(jié)果
2 10 ->scan
12 scan-> 324
12 100 ->scan
112 scan-> 324
112 1000 ->scan
1112 scan-> 324
distinctUntilChanged
//去掉連續(xù)的重復(fù)元素
Observable.of("??", "??", "??", "??", "??", "??", "??")
.distinctUntilChanged()
.subscribe(onNext: { print($0,"->distinctUntilChanged") })
.disposed(by: disposeBag)
elementAt
Observable.of("??", "??", "??", "??", "??", "??")
.elementAt(3)
.subscribe(onNext: { print($0,"->elementAt") })
.disposed(by: disposeBag)
takeUntil
//對(duì)另一個(gè)觀測序列(referenceSequence)的依賴停止(completed) let sourceSequence = PublishSubject() let referenceSequence = PublishSubject()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
referenceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
sourceSequence.onNext("??")
skip
//過濾掉最初始的2個(gè)序列形成可觀察序列(skipWhile,skipWhileWithIndex)
Observable.of("??", "??", "??", "??", "??", "??")
.skip(2)
.subscribe(onNext: { print($0,"->skip") })
.disposed(by: disposeBag)
skipUntil
//對(duì)另一個(gè)觀測序列的依賴,過濾let sourceSequence1 = PublishSubject()? ? ? ? let referenceSequence1 = PublishSubject()
sourceSequence1
.skipUntil(referenceSequence1)
.subscribe(onNext: { print($0,"->skipUntil") })
.disposed(by: disposeBag)
sourceSequence1.onNext("??")
sourceSequence1.onNext("??")
sourceSequence1.onNext("??")
referenceSequence1.onNext("??")
sourceSequence1.onNext("??")
sourceSequence1.onNext("??")
sourceSequence1.onNext("??")
single
//將元素轉(zhuǎn)化成觀察序列,如果重復(fù)或者沒有這個(gè)元素就拋出錯(cuò)誤,默認(rèn)是第一個(gè)元素
Observable.of("??", "??", "??", "??", "??", "??")
.single()
.subscribe(onNext: { print($0,"->single") })
.disposed(by: disposeBag)
concat
等待每個(gè)序列終止之前,成功發(fā)射第二序列的元素。
let subject7 = BehaviorSubject(value: "??")
let subject8 = BehaviorSubject(value: "??")
let variable1 = Variable(subject7)
variable1.asObservable()
.concat()
.subscribe { print($0,"->concat") }
.disposed(by: disposeBag)
subject7.onNext("??")
subject7.onNext("??")
variable1.value = subject8
subject8.onNext("I would be ignored")
subject8.onNext("??")
subject7.onCompleted()//此時(shí)第二個(gè)序列可以成功發(fā)送數(shù)據(jù)
subject8.onNext("??")
toArray
Observable.range(start: 1, count: 10)
.toArray()
.subscribe { print($0) }
.disposed(by: disposeBag)
reduce
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
bindTo
將觀察到的對(duì)象綁定到某對(duì)象
//這是一個(gè)加法計(jì)算,將相加的結(jié)果賦值給Label
Observable.combineLatest(txf1.rx.text.orEmpty,txf2.rx.text.orEmpty,txf3.rx.text.orEmpty) {
(Int($0) ?? 0)+(Int($1) ?? 0)+(Int($2) ?? 0)
}.map {
$0.description
}.bindTo(result.rx.text).disposed(by: disPoseBag)
融合信號(hào)
merge
let subject1 = PublishSubject()let subject2 = PublishSubject()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("???")
subject1.onNext("???")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("??")
subject2.onNext("③")
combineLatest
//釋放元素從最近期內(nèi)可觀察到的序列。
當(dāng)需要同時(shí)監(jiān)聽時(shí),那么每當(dāng)有新的事件發(fā)生的時(shí)候,combineLatest 會(huì)將每個(gè)隊(duì)列的最新的一個(gè)元素進(jìn)行合并。
把3個(gè)輸入框的值相加再綁定到Label
Observable.combineLatest(txf1.rx.text.orEmpty,txf2.rx.text.orEmpty,txf3.rx.text.orEmpty) {
(Int($0) ?? 0)+(Int($1) ?? 0)+(Int($2) ?? 0)
}.map {
$0.description
}.bindTo(result.rx.text).disposed(by: disPoseBag)
switchLatest
let subject3 = BehaviorSubject(value: "??")
let subject4 = BehaviorSubject(value: "??")
let variable = Variable(subject3)
variable.asObservable()
.switchLatest()
.subscribe(onNext: { print($0,"->switchLatest") })
.disposed(by: disposeBag)
subject3.onNext("??")
subject3.onNext("??")
variable.value = subject4
subject3.onNext("??”)//發(fā)送失敗,最近的是subject4
subject4.onNext("??")
shareReplay
//shareReplay它是以重播(保存通知記錄)的方式通知自己的訂閱者,防止map重復(fù)調(diào)用,即使訂閱之前的序列也可以收到(收到可重播次數(shù)的事件序列)
let aa = sequenceOfInts.map{ i -> Int in
print("MAP---\(i)")
return i * 2
}.shareReplay(3)
Driver
Driver的drive方法與Observable的方法bindTo用法非常相似
它的特點(diǎn)
* 它不會(huì)發(fā)射出錯(cuò)誤(Error)事件
* 對(duì)它的觀察訂閱是發(fā)生在主線程(UI線程)的
* 自帶shareReplayLatestWhileConnected,防止重復(fù)執(zhí)行事件
textField.rx_text
.asDriver()
.drive(label.rx_sayHelloObserver)
.addDisposableTo(disposeBag)
throttle
在Observable中假如你要進(jìn)行限流,你要用到方法throttle(dueTime: RxSwift.RxTimeInterval, scheduler: SchedulerType),方法的第一個(gè)參數(shù)是兩個(gè)事件之間的間隔時(shí)間,第二個(gè)參數(shù)是一個(gè)線程的有關(guān)類,如我要在主線程中,我可以傳入MainScheduler.instance。而在Driver中我們要限流,調(diào)用的是throttle(dueTime: RxSwift.RxTimeInterval),只配置事件的間隔時(shí)間,而它默認(rèn)會(huì)在主線程中進(jìn)行。
driveField.rx.text
.asDriver().throttle(2)
.drive(driveLabel.rx.text)
.addDisposableTo(disPoseBag)
DisposeBag
//事件的釋放,會(huì)在該對(duì)象釋放的時(shí)候釋放綁定監(jiān)聽事件
let disPoseBag = DisposeBag()
aa.subscribe {
print("--1--\($0)")
}.disposed(by: disPoseBag)——>不能在這里寫DisposeBag(),會(huì)直接結(jié)束掉