異步入門
為什么需要異步?
異步操作是在非阻塞方案中執(zhí)行的操作,允許主程序流繼續(xù)處理。
假設需求場景為客戶端從多個服務器下載多個文件。
| 下載方式 | 缺點 |
|---|---|
| 依次按照順序 | 必須等待前一個完成 |
| 多線程 | 為每一個下載任務創(chuàng)建線程,導致內(nèi)存占滿 |
| 線程池 | 發(fā)起下載請求后,需要等待服務端的響應,當前線程會阻塞 |
注意:多線程和線程池都可以是異步的一種實現(xiàn)方式。異步是和同步相對的概念。
異步的優(yōu)缺點
因為異步操作無須額外的線程負擔,并且使用回調(diào)的方式進行處理,在設計良好的情況下,處理函數(shù)可以不必使用共享變量(即使無法完全不用,最起碼可以減少共享變量的數(shù)量),減少了死鎖的可能。當然異步操作也并非完美無暇。編寫異步操作的復雜程度較高,程序主要使用回調(diào)方式進行處理,與普通人的思維方式有些初入,而且難以調(diào)試。
簡單使用
use async_std::task;
// use std::thread::sleep;
use std::time::Duration;
use futures::{ executor};
async fn learn_song() {
//sleep(Duration::from_secs(5));
task::sleep(Duration::from_secs(1)).await;
println!("learn song");
}
async fn sing_song() {
println!("sing song");
}
async fn dance() {
println!("dance");
}
async fn learn_and_sing_song() {
learn_song().await;
sing_song().await;
}
async fn async_main() {
let f1 = learn_and_sing_song();
let f2 = dance();
futures::join!(f1, f2);
}
fn main() {
executor::block_on(async_main());
}
執(zhí)行結(jié)果
dance
learn song
sing song
說明:
- 如果使用
sleep(Duration::from_secs(5)),結(jié)果會是按照順序執(zhí)行。因為外部的阻塞不能主動喚醒異步內(nèi)部的線程,所以直接在外部進行阻塞。如果使用異步的sleep,learn song會讓出資源; - 通過join,能等待多個Future完成,并發(fā)執(zhí)行;
-
.await是在代碼塊中按順序執(zhí)行,會阻塞后面的代碼,但是此時會讓出線程;block_on會阻塞直到Future執(zhí)行完成。
Future
標準庫定義
use crate::marker::Unpin;
use crate::ops;
use crate::pin::Pin;
use crate::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
impl<F: ?Sized + Future + Unpin> Future for &mut F { // F 的可變引用實現(xiàn) Future
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
F::poll(Pin::new(&mut **self), cx)
}
}
impl<P> Future for Pin<P> // 為 Pin<P=Unpin + ops::DerefMut<Target: Future>> 實現(xiàn)Future
where
P: Unpin + ops::DerefMut<Target: Future>,
{
type Output = <<P as ops::Deref>::Target as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(cx)
}
}
poll函數(shù)傳遞一個 &mut Context<'_> 類型參數(shù), 返回一個 Poll 類型參數(shù):
-
Context 主要包含一個
Waker對象,由執(zhí)行器提供,用于告訴執(zhí)行器,重新執(zhí)行當前poll函數(shù)#[stable(feature = "futures_api", since = "1.36.0")] // Context的生命周期不會比它包含的waker引用更久 pub struct Context<'a> { waker: &'a Waker, // Ensure we future-proof against variance changes by forcing // the lifetime to be invariant (argument-position lifetimes // are contravariant while return-position lifetimes are // covariant). _marker: PhantomData<fn(&'a ()) -> &'a ()>, } -
Poll 是一個枚舉類型包含兩個枚舉
-
Ready<Output>當任務已經(jīng)就緒,返回該對象 -
Pending任務沒有就緒時返回該對象,此Future將讓出CPU,直到在其他線程或者任務執(zhí)行調(diào)用Waker為止
-
實現(xiàn)者需要保證 poll 是非阻塞,如果是阻塞的話會導致循環(huán)進行不下去
實現(xiàn)一個 Future 類型的方式
- 方式1:使用
async fn,編譯器會自動生成實現(xiàn) Future Trait的類型 - 方式2:自定義結(jié)構(gòu)體,并實現(xiàn) Future Trait
Pin
默認情況下,Rust中所有類型都是可以 move 的,Rust允許按值傳遞所有類型,并且像 Box<T> 、&mut T 之類的智能指針或者引用允許你通過 mem::swap 進行拷貝交換(移動),這樣,如果存在結(jié)構(gòu)體存在自引用,將導致引用失效。
而 async 編譯后的結(jié)構(gòu)可能就會出現(xiàn)一種自引用的結(jié)構(gòu),如下所示:
async {
let mut x = [0; 128];
let read_into_buf_fut = read_into_buf(&mut x);
read_into_buf_fut.await;
println!("{:?}", x);
}
// 編譯后的偽代碼如下
// 這是 最外層的 async {}
// struct AsyncFuture {
// x: [u8; 128],
// read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
// }
// 這是 read_into_buf_fut 的Future
// struct ReadIntoBuf<'a> {
// buf: &'a mut [u8], // points to `x` below
// }
這樣 AsyncFuture 構(gòu)造出來后,就存在自引用(AsyncFuture.read_into_buf_fut.buf 指向 AsyncFuture.x)。但是如果AsyncFuture發(fā)生移動,x肯定也會發(fā)生移動,如果read_into_buf_fut.buf還是指向原來的值的話,則會變成無效。而Pin就是為了解決此問題的。
Pin 類型包著指針類型,保證指針背后的值將不被移動。例如 Pin<&mut T>,Pin<&T>, Pin<Box<T>> 都保證 T 不會移動(move)。
原理
pub struct Pin<P> {
pointer: P,
}
-
首先
Pin<T>和Box<T>類似都是一種智能指針。不同點在于Pin<&mut T>不能通過safe代碼拿到&mut T,因此保證mem::swap無法調(diào)用,也就是P所指向的T在內(nèi)存中固定住,不能移動。Pin::as_mut返回的仍是Pin<T>只有
Pin<DerefMut<T: Unpin>>或者Pin<Deref<T: Unpin>>或者Pin<T: Unpin>可以通過get_mut或者get_ref拿到T的引用-
Pin::new只能是針對實現(xiàn)了Unpin的類型(重要)impl<P: Deref<Target: Unpin>> Pin<P> { /// Construct a new `Pin<P>` around a pointer to some data of a type that /// implements [`Unpin`]. /// /// Unlike `Pin::new_unchecked`, this method is safe because the pointer /// `P` dereferences to an [`Unpin`] type, which cancels the pinning guarantees. #[stable(feature = "pin", since = "1.33.0")] #[inline(always)] pub fn new(pointer: P) -> Pin<P> { // Safety: the value pointed to is `Unpin`, and so has no requirements // around pinning. unsafe { Pin::new_unchecked(pointer) } } /// Unwraps this `Pin<P>` returning the underlying pointer. /// /// This requires that the data inside this `Pin` is [`Unpin`] so that we /// can ignore the pinning invariants when unwrapping it. #[stable(feature = "pin_into_inner", since = "1.39.0")] #[inline(always)] pub fn into_inner(pin: Pin<P>) -> P { pin.pointer } }只有
P<T>的T: Unpin,才可以new出一個Pin<P<T>>。這里的T就是應該被pin的實例,可是由于T: Unpin實際上T的實例并不會被pin。 如果要創(chuàng)建不實現(xiàn)
Unpin的Pin<P<T>>,可以使用unsafe{ Pin::new_unchecked(&mut t) }。
-
本質(zhì)上實現(xiàn)不移動就是加了一層指針,并未違反任意值都是可以移動的規(guī)則。
- 比如
Pin<T>發(fā)生移動時,僅僅是Pin這個結(jié)構(gòu)發(fā)生了移動,但是T對象并沒有移動
- 比如
Unpin
pub auto trait Unpin {}
定義在std::marker中,如果T: Unpin,那么T在pin后可以安全地移動,可以拿到&mut T。Unpin只對Pin<P<T>>的T起作用,不對P本身起效,例如對Pin<Box<T>>的Box<T>是無效的。
默認為以下類型實現(xiàn)了Unpin:
impl<'a, T: ?Sized + 'a> Unpin for &'a T {}
impl<'a, T: ?Sized + 'a> Unpin for &'a mut T {}
impl<T: ?Sized> Unpin for *const T {}
impl<T: ?Sized> Unpin for *mut T {}
async生成的匿名結(jié)構(gòu)體(impl Future<Output=()>)沒有實現(xiàn)Unpin。
!Unpin
對Unpin取反,!Unpin的雙重否定就是pin。如果一個類型中包含了PhantomPinned,那么這個類型就是!Unpin。
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PhantomPinned;
impl !Unpin for PhantomPinned {}
一般在結(jié)構(gòu)體中使用_marker: PhantomPinned來實現(xiàn)!Unpin。
完整示例
use std::pin::Pin;
use std::marker::PhantomPinned;
use std::mem;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned
}
impl Test {
fn new(txt: &str) -> Self {
//此指針指向棧對象, 必須慎重考慮其生命長短,避免出現(xiàn)`懸指針`。
Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned
}
}
fn init<'a>(self: Pin<&'a mut Self>) {
let self_ptr: *const String = &self.a;
let this = unsafe { self.get_unchecked_mut() };
this.b = self_ptr;
}
fn a<'a>(self: Pin<&'a Self>) -> &'a str {
&self.get_ref().a
}
fn b<'a>(self: Pin<&'a Self>) -> &'a String {
unsafe { &*(self.b) }
}
}
fn main() {
// test1 is safe to move before we initialize it
let mut test1 = Test::new("test1");
// Notice how we shadow `test1` to prevent it from being accessed again
//同名的新指針變量屏蔽了原來的test1, 以此確保只能通過Pin來訪問到Test.
//這樣確保不可能再訪問到舊test1指針!
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
Test::init(test2.as_mut());
println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
//swap導致編譯錯誤, 因為Pin實質(zhì)上就是禁止獲得&mut T引用(指針) ,
//無法獲得&mut T指針,則無法Move , 比如:swap等。
//之所以用Pin 包裹原來的裸指針,目的就是禁止獲取到:&mut T.
// std::mem::swap(test1.get_mut(), test2.get_mut());
println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}
// 結(jié)果
// a: test1, b: test1
// a: test2, b: test2
fn main() {
let mut test1 = Test::new("test1");
let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
Test::init(test1_pin.as_mut());
drop(test1_pin); //Pin指針被提前drop , 因為test1未被遮蔽, 后面代碼仍然可以訪問到, 但是test1已被析構(gòu)
let mut test2 = Test::new("test2");
mem::swap(&mut test1, &mut test2);
println!("Not self referential anymore: {:?}", test1.b); //test1.b == 0x00 , Pin析構(gòu)時析構(gòu)了test1 所指的Test Struct, 其內(nèi)部指針歸0,
//所以說不再是自引用。
}
如果使用Box::pin(t)創(chuàng)建Pin<Box<T>>,則會將數(shù)據(jù)固定到堆上。
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let t = Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
};
//Constructs a new Pin<Box<T>>. If T does not implement Unpin, then x will be pinned in memory and unable to be moved.
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.as_ref().a;
// boxed.as_mut() -> Pin<&mut Test>
// boxed.as_mut().get_unchecked_mut() -> &mut Test
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
}
fn main() {
let mut test1 = Test::new("test1");
let mut test2 = Test::new("test2");
println!("a: {}, b: {}", test1.as_ref().a(), test1.as_ref().b());
std::mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}", test2.as_ref().a(), test2.as_ref().b());
// 結(jié)果
// a: test1, b: test1
// a: test1, b: test1
}
為什么堆上的pin對象可以進行swap?
boxed只是一個棧變量,所指的對象在堆上。通過swap僅僅bitcopy and swap兩個棧變量test1和test2,相當于兩者交換了所有權,交換了指向,而堆上的數(shù)據(jù)不受影響。T類型對象內(nèi)存位置固定,所有沒有違反Pin的語義要求。
async/await
async轉(zhuǎn)化的Future對象和其它Future一樣是具有惰性的,即在運行之前什么也不做。運行Future最常見的方式是.await。
async
有如下代碼:
async fn async_main() {
let f1 = async_function1();
let f2 = async_function2();
let f = async move {
f1.await;
f2.await;
};
f.await;
}
那么實際上會生成一個匿名的Future trait object,包裹一個 Generator。也就是一個實現(xiàn)了 Future 的 Generator。Generator實際上是一個狀態(tài)機,配合.await當每次async 代碼塊中任何返回 Poll::Pending則即調(diào)用generator yeild,讓出執(zhí)行權,一旦恢復執(zhí)行,generator resume 繼續(xù)執(zhí)行剩余流程。
pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
where
T: Generator<ResumeTy, Yield = ()>,
{
#[rustc_diagnostic_item = "gen_future"]
struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);
// We rely on the fact that async/await futures are immovable in order to create
// self-referential borrows in the underlying generator.
impl<T: Generator<ResumeTy, Yield = ()>> !Unpin for GenFuture<T> {}
impl<T: Generator<ResumeTy, Yield = ()>> Future for GenFuture<T> {
type Output = T::Return;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Safety: Safe because we're !Unpin + !Drop, and this is just a field projection.
let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };
// Resume the generator, turning the `&mut Context` into a `NonNull` raw pointer. The
// `.await` lowering will safely cast that back to a `&mut Context`.
match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
GeneratorState::Yielded(()) => Poll::Pending,
GeneratorState::Complete(x) => Poll::Ready(x),
}
}
}
GenFuture(gen)
}
每一次gen.resume()會順序執(zhí)行async block中代碼直到遇到yield。async block中的.await語句在無法立即完成時會調(diào)用yield交出控制權等待下一次resume。而當所有代碼執(zhí)行完,也就是狀態(tài)機入Complete,async block返回Poll::Ready,代表Future執(zhí)行完畢。
生成的匿名對象類似如下:
struct AsyncFuture {
fut_one: FutFunction1,
fut_two: FutFunction2,
state: State,
}
//state的定義可能如下
enum State {
AwaitingFutFunction1,
AwaitingFutFunction2,
Done,
}
poll進行輪詢每個Future的狀態(tài),如果Poll::Ready()則進入下一個State,直到Done。
生命周期
async fn foo(x: &u8) -> u8 { *x }
fn good() -> impl Future<Output = ()> {
async {
let x = 5;
foo(&x).await;
}
}
通過將x移動到async中,延長x的生命周期和foo返回的Future生命周期一致。
move
async 塊和閉包允許 move 關鍵字,就像普通的閉包一樣。一個 async move 塊將獲取它引用變量的所有權,允許它活得比目前的范圍長,但放棄了與其它代碼分享那些變量的能力。
線程間移動
在使用多線程Future的excutor時,F(xiàn)uture可能在線程之間移動,因此在async主體中使用的任何變量都必須能夠在線程之間傳輸,因為任何.await變量都可能導致切換到新線程。
async fn Future是否為Send的取決于是否在.await點上保留非Send類型。編譯器盡其所能地估計值在.await點上的保存時間。
async fn foo() {
{
let x = Rc<()>; // Rc<T>不可Send
}
bar().await;
}
如果x不在await前drop,那么該Future是非Send的。而Future的異步特性要求需要有Send約束。
Stream
Stream是由一系列的Future組成,我們可以從Stream讀取各個Future的結(jié)果,直到Stream結(jié)束。
定義
trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker)
-> Poll<Option<Self::Item>>;
}
poll_next函數(shù)有三種可能的返回值,分別如下:
-
Poll::Pending說明下一個值還沒有就緒,仍然需要等待。 -
Poll::Ready(Some(val))已經(jīng)就緒,成功返回一個值,程序可以通過調(diào)用poll_next再獲取下一個值。 -
Poll::Ready(None)表示Stream已經(jīng)結(jié)束,不應該在調(diào)用poll_next。
迭代
Stream不支持使用for,而while let和 next/try_next則是允許的。
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
并發(fā)
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
Select
select宏也允許并發(fā)的執(zhí)行Future,但是和join、try_join不同的是,select宏只要有一個Future返回,就會返回。
use futures::{select, future::FutureExt, pin_mut};
use tokio::runtime::Runtime;
use std::io::Result;
async fn function1() -> Result<()> {
tokio::time::delay_for(tokio::time::Duration::from_secs(10)).await;
println!("function1 ++++ ");
Ok(())
}
async fn function2() -> Result<()> {
println!("function2 ++++ ");
Ok(())
}
async fn async_main() {
// Fuse:基礎迭代器一次返回None后,就一直返回None
let f1 = function1().fuse();
let f2 = function2().fuse();
// 在棧上pin
pin_mut!(f1, f2);
select! {
_ = f1 => println!("task one completed first"),
_ = f2 => println!("task two completed first"),
}
}
fn main() {
let mut runtime = Runtime::new().unwrap();
runtime.block_on(async_main());
println!("Hello, world!");
}
BoxFuture
一個擁有的動態(tài)類型[' Future '],在不是靜態(tài)輸入或需要添加一些間接類型的情況下使用。
比如在遞歸使用Future時:
use futures::future::{BoxFuture, FutureExt};
fn re() -> BoxFuture<'static, ()> {
async move{
re().await;
re().await;
}.boxed()
}
fn main() {
re();
}
fn boxed<'a>(self) -> BoxFuture<'a, Self::Output>
where Self: Sized + Send + 'a,
{
Box::pin(self)
}