異步

異步入門

為什么需要異步?

異步操作是在非阻塞方案中執(zhí)行的操作,允許主程序流繼續(xù)處理。

假設需求場景為客戶端從多個服務器下載多個文件。

下載方式 缺點
依次按照順序 必須等待前一個完成
多線程 為每一個下載任務創(chuàng)建線程,導致內(nèi)存占滿
線程池 發(fā)起下載請求后,需要等待服務端的響應,當前線程會阻塞

注意:多線程和線程池都可以是異步的一種實現(xiàn)方式。異步是和同步相對的概念。

異步的優(yōu)缺點

因為異步操作無須額外的線程負擔,并且使用回調(diào)的方式進行處理,在設計良好的情況下,處理函數(shù)可以不必使用共享變量(即使無法完全不用,最起碼可以減少共享變量的數(shù)量),減少了死鎖的可能。當然異步操作也并非完美無暇。編寫異步操作的復雜程度較高,程序主要使用回調(diào)方式進行處理,與普通人的思維方式有些初入,而且難以調(diào)試。

簡單使用

use async_std::task;
// use std::thread::sleep;
use std::time::Duration;
use futures::{ executor};

async fn learn_song() {
    //sleep(Duration::from_secs(5));
    task::sleep(Duration::from_secs(1)).await;
    println!("learn song");
}

async fn sing_song() {
    println!("sing song");
}

async fn dance() {
    println!("dance");
}

async fn learn_and_sing_song() {
    learn_song().await;
    sing_song().await;
}

async fn async_main() {
    let f1 = learn_and_sing_song();
    let f2 = dance();
    futures::join!(f1, f2);
}

fn main() {
    executor::block_on(async_main());
}

執(zhí)行結(jié)果

dance
learn song
sing song

說明:

  1. 如果使用sleep(Duration::from_secs(5)),結(jié)果會是按照順序執(zhí)行。因為外部的阻塞不能主動喚醒異步內(nèi)部的線程,所以直接在外部進行阻塞。如果使用異步的sleep,learn song會讓出資源;
  2. 通過join,能等待多個Future完成,并發(fā)執(zhí)行;
  3. .await是在代碼塊中按順序執(zhí)行,會阻塞后面的代碼,但是此時會讓出線程;block_on會阻塞直到Future執(zhí)行完成。

Future

標準庫定義

use crate::marker::Unpin;
use crate::ops;
use crate::pin::Pin;
use crate::task::{Context, Poll};

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

impl<F: ?Sized + Future + Unpin> Future for &mut F { // F 的可變引用實現(xiàn) Future
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        F::poll(Pin::new(&mut **self), cx)
    }
}

impl<P> Future for Pin<P> // 為 Pin<P=Unpin + ops::DerefMut<Target: Future>> 實現(xiàn)Future
where
    P: Unpin + ops::DerefMut<Target: Future>,
{
    type Output = <<P as ops::Deref>::Target as Future>::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::get_mut(self).as_mut().poll(cx)
    }
}

poll函數(shù)傳遞一個 &mut Context<'_> 類型參數(shù), 返回一個 Poll 類型參數(shù):

  • Context 主要包含一個 Waker 對象,由執(zhí)行器提供,用于告訴執(zhí)行器,重新執(zhí)行當前 poll 函數(shù)

    #[stable(feature = "futures_api", since = "1.36.0")]
     // Context的生命周期不會比它包含的waker引用更久
    pub struct Context<'a> {
        waker: &'a Waker,
        // Ensure we future-proof against variance changes by forcing
        // the lifetime to be invariant (argument-position lifetimes
        // are contravariant while return-position lifetimes are
        // covariant).
        _marker: PhantomData<fn(&'a ()) -> &'a ()>,
    }
    
  • Poll 是一個枚舉類型包含兩個枚舉

    • Ready<Output> 當任務已經(jīng)就緒,返回該對象
    • Pending 任務沒有就緒時返回該對象,此Future將讓出CPU,直到在其他線程或者任務執(zhí)行調(diào)用Waker為止
  • 實現(xiàn)者需要保證 poll 是非阻塞,如果是阻塞的話會導致循環(huán)進行不下去

實現(xiàn)一個 Future 類型的方式

  • 方式1:使用 async fn,編譯器會自動生成實現(xiàn) Future Trait的類型
  • 方式2:自定義結(jié)構(gòu)體,并實現(xiàn) Future Trait

Pin

默認情況下,Rust中所有類型都是可以 move 的,Rust允許按值傳遞所有類型,并且像 Box<T>&mut T 之類的智能指針或者引用允許你通過 mem::swap 進行拷貝交換(移動),這樣,如果存在結(jié)構(gòu)體存在自引用,將導致引用失效。

而 async 編譯后的結(jié)構(gòu)可能就會出現(xiàn)一種自引用的結(jié)構(gòu),如下所示:

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);

}

// 編譯后的偽代碼如下
// 這是 最外層的 async {}
// struct AsyncFuture {
//    x: [u8; 128],
//    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
// }

// 這是 read_into_buf_fut 的Future
// struct ReadIntoBuf<'a> {
//    buf: &'a mut [u8], // points to `x` below
// }

這樣 AsyncFuture 構(gòu)造出來后,就存在自引用(AsyncFuture.read_into_buf_fut.buf 指向 AsyncFuture.x)。但是如果AsyncFuture發(fā)生移動,x肯定也會發(fā)生移動,如果read_into_buf_fut.buf還是指向原來的值的話,則會變成無效。而Pin就是為了解決此問題的。

Pin 類型包著指針類型,保證指針背后的值將不被移動。例如 Pin<&mut T>,Pin<&T>Pin<Box<T>> 都保證 T 不會移動(move)。

原理

pub struct Pin<P> {
    pointer: P,
}
  • 首先 Pin<T>Box<T> 類似都是一種智能指針。不同點在于Pin<&mut T>不能通過safe代碼拿到&mut T,因此保證mem::swap無法調(diào)用,也就是P所指向的T在內(nèi)存中固定住,不能移動。

    • Pin::as_mut 返回的仍是 Pin<T>

    • 只有 Pin<DerefMut<T: Unpin>> 或者 Pin<Deref<T: Unpin>> 或者 Pin<T: Unpin> 可以通過 get_mut 或者 get_ref 拿到 T 的引用

    • Pin::new 只能是針對實現(xiàn)了 Unpin 的類型(重要)

      impl<P: Deref<Target: Unpin>> Pin<P> {
          /// Construct a new `Pin<P>` around a pointer to some data of a type that
          /// implements [`Unpin`].
          ///
          /// Unlike `Pin::new_unchecked`, this method is safe because the pointer
          /// `P` dereferences to an [`Unpin`] type, which cancels the pinning guarantees.
          #[stable(feature = "pin", since = "1.33.0")]
          #[inline(always)]
          pub fn new(pointer: P) -> Pin<P> {
              // Safety: the value pointed to is `Unpin`, and so has no requirements
              // around pinning.
              unsafe { Pin::new_unchecked(pointer) }
          }
      
          /// Unwraps this `Pin<P>` returning the underlying pointer.
          ///
          /// This requires that the data inside this `Pin` is [`Unpin`] so that we
          /// can ignore the pinning invariants when unwrapping it.
          #[stable(feature = "pin_into_inner", since = "1.39.0")]
          #[inline(always)]
          pub fn into_inner(pin: Pin<P>) -> P {
              pin.pointer
          }
      }
      

      只有P<T>T: Unpin,才可以new出一個Pin<P<T>>。這里的T就是應該被pin的實例,可是由于T: Unpin實際上T的實例并不會被pin。

    • 如果要創(chuàng)建不實現(xiàn)UnpinPin<P<T>>,可以使用unsafe{ Pin::new_unchecked(&mut t) }

  • 本質(zhì)上實現(xiàn)不移動就是加了一層指針,并未違反任意值都是可以移動的規(guī)則。

    • 比如 Pin<T> 發(fā)生移動時,僅僅是 Pin 這個結(jié)構(gòu)發(fā)生了移動,但是 T 對象并沒有移動

Unpin

pub auto trait Unpin {}

定義在std::marker中,如果T: Unpin,那么Tpin后可以安全地移動,可以拿到&mut T。Unpin只對Pin<P<T>>T起作用,不對P本身起效,例如對Pin<Box<T>>Box<T>是無效的。

默認為以下類型實現(xiàn)了Unpin

impl<'a, T: ?Sized + 'a> Unpin for &'a T {}
impl<'a, T: ?Sized + 'a> Unpin for &'a mut T {}
impl<T: ?Sized> Unpin for *const T {}
impl<T: ?Sized> Unpin for *mut T {}

async生成的匿名結(jié)構(gòu)體(impl Future<Output=()>)沒有實現(xiàn)Unpin

!Unpin

對Unpin取反,!Unpin的雙重否定就是pin。如果一個類型中包含了PhantomPinned,那么這個類型就是!Unpin。

#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PhantomPinned;
impl !Unpin for PhantomPinned {}

一般在結(jié)構(gòu)體中使用_marker: PhantomPinned來實現(xiàn)!Unpin

完整示例

use std::pin::Pin;
use std::marker::PhantomPinned;
use std::mem;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned
}

impl Test {
    fn new(txt: &str) -> Self {
        //此指針指向棧對象, 必須慎重考慮其生命長短,避免出現(xiàn)`懸指針`。
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned
        }
    }
    fn init<'a>(self: Pin<&'a mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a<'a>(self: Pin<&'a Self>) -> &'a str {
        &self.get_ref().a
    }
    fn b<'a>(self: Pin<&'a Self>) -> &'a String {
        unsafe { &*(self.b) }
    }
}

fn main() {
// test1 is safe to move before we initialize it
    let mut test1 = Test::new("test1");
// Notice how we shadow `test1` to prevent it from being accessed again
//同名的新指針變量屏蔽了原來的test1, 以此確保只能通過Pin來訪問到Test.
//這樣確保不可能再訪問到舊test1指針!
    let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
    Test::init(test2.as_mut());

    println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));

//swap導致編譯錯誤, 因為Pin實質(zhì)上就是禁止獲得&mut T引用(指針) ,
//無法獲得&mut T指針,則無法Move , 比如:swap等。
//之所以用Pin 包裹原來的裸指針,目的就是禁止獲取到:&mut T.
//     std::mem::swap(test1.get_mut(), test2.get_mut());

    println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}
// 結(jié)果
// a: test1, b: test1
// a: test2, b: test2

fn main() {
    let mut test1 = Test::new("test1");
    let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
    Test::init(test1_pin.as_mut());
    drop(test1_pin); //Pin指針被提前drop , 因為test1未被遮蔽, 后面代碼仍然可以訪問到, 但是test1已被析構(gòu)

    let mut test2 = Test::new("test2");
    mem::swap(&mut test1, &mut test2);
    println!("Not self referential anymore: {:?}", test1.b); //test1.b == 0x00 , Pin析構(gòu)時析構(gòu)了test1 所指的Test Struct, 其內(nèi)部指針歸0,
//所以說不再是自引用。
}

如果使用Box::pin(t)創(chuàng)建Pin<Box<T>>,則會將數(shù)據(jù)固定到堆上。

impl Test {
    fn new(txt: &str) -> Pin<Box<Self>> {
     let t = Test {
         a: String::from(txt),
         b: std::ptr::null(),
         _marker: PhantomPinned,
     };
     //Constructs a new Pin<Box<T>>. If T does not implement Unpin, then x will be pinned in memory and unable to be moved.
     let mut boxed = Box::pin(t);
     let self_ptr: *const String = &boxed.as_ref().a;
      // boxed.as_mut() -> Pin<&mut Test>
      // boxed.as_mut().get_unchecked_mut() -> &mut Test
     unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
     boxed
 }
}
fn main() {
    let mut test1 = Test::new("test1");
    let mut test2 = Test::new("test2");

    println!("a: {}, b: {}", test1.as_ref().a(), test1.as_ref().b());
    std::mem::swap(&mut test1, &mut test2);
    println!("a: {}, b: {}", test2.as_ref().a(), test2.as_ref().b()); 
  // 結(jié)果
  // a: test1, b: test1
    // a: test1, b: test1
}

為什么堆上的pin對象可以進行swap

boxed只是一個棧變量,所指的對象在堆上。通過swap僅僅bitcopy and swap兩個棧變量test1test2,相當于兩者交換了所有權,交換了指向,而堆上的數(shù)據(jù)不受影響。T類型對象內(nèi)存位置固定,所有沒有違反Pin的語義要求。

async/await

async轉(zhuǎn)化的Future對象和其它Future一樣是具有惰性的,即在運行之前什么也不做。運行Future最常見的方式是.await。

async

有如下代碼:

async fn async_main() {
    let f1 = async_function1();
    let f2 = async_function2();

    let f = async move {
        f1.await;
        f2.await;
    };
    f.await;
}

那么實際上會生成一個匿名的Future trait object,包裹一個 Generator。也就是一個實現(xiàn)了 FutureGenerator。Generator實際上是一個狀態(tài)機,配合.await當每次async 代碼塊中任何返回 Poll::Pending則即調(diào)用generator yeild,讓出執(zhí)行權,一旦恢復執(zhí)行,generator resume 繼續(xù)執(zhí)行剩余流程。

pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
where
    T: Generator<ResumeTy, Yield = ()>,
{
    #[rustc_diagnostic_item = "gen_future"]
    struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);

    // We rely on the fact that async/await futures are immovable in order to create
    // self-referential borrows in the underlying generator.
    impl<T: Generator<ResumeTy, Yield = ()>> !Unpin for GenFuture<T> {}

    impl<T: Generator<ResumeTy, Yield = ()>> Future for GenFuture<T> {
        type Output = T::Return;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Safety: Safe because we're !Unpin + !Drop, and this is just a field projection.
            let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };

            // Resume the generator, turning the `&mut Context` into a `NonNull` raw pointer. The
            // `.await` lowering will safely cast that back to a `&mut Context`.
            match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                GeneratorState::Yielded(()) => Poll::Pending,
                GeneratorState::Complete(x) => Poll::Ready(x),
            }
        }
    }

    GenFuture(gen)
}

每一次gen.resume()會順序執(zhí)行async block中代碼直到遇到yieldasync block中的.await語句在無法立即完成時會調(diào)用yield交出控制權等待下一次resume。而當所有代碼執(zhí)行完,也就是狀態(tài)機入Complete,async block返回Poll::Ready,代表Future執(zhí)行完畢。

生成的匿名對象類似如下:

struct AsyncFuture {
    fut_one: FutFunction1,
    fut_two: FutFunction2,
    state: State,
}

//state的定義可能如下
enum State {
    AwaitingFutFunction1,
    AwaitingFutFunction2,
    Done,
}

poll進行輪詢每個Future的狀態(tài),如果Poll::Ready()則進入下一個State,直到Done。

生命周期

async fn foo(x: &u8) -> u8 { *x }
fn good() -> impl Future<Output = ()> {
    async {
        let x = 5;
        foo(&x).await;
    }
}

通過將x移動到async中,延長x的生命周期和foo返回的Future生命周期一致。

move

async 塊和閉包允許 move 關鍵字,就像普通的閉包一樣。一個 async move 塊將獲取它引用變量的所有權,允許它活得比目前的范圍長,但放棄了與其它代碼分享那些變量的能力。

線程間移動

在使用多線程Future的excutor時,F(xiàn)uture可能在線程之間移動,因此在async主體中使用的任何變量都必須能夠在線程之間傳輸,因為任何.await變量都可能導致切換到新線程。

async fn Future是否為Send的取決于是否在.await點上保留非Send類型。編譯器盡其所能地估計值在.await點上的保存時間。

async fn foo() {
    {
        let x = Rc<()>; // Rc<T>不可Send
    }
    bar().await;
}

如果x不在await前drop,那么該Future是非Send的。而Future的異步特性要求需要有Send約束。

Stream

Stream是由一系列的Future組成,我們可以從Stream讀取各個Future的結(jié)果,直到Stream結(jié)束。

定義

trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker)
        -> Poll<Option<Self::Item>>;
}

poll_next函數(shù)有三種可能的返回值,分別如下:

  • Poll::Pending 說明下一個值還沒有就緒,仍然需要等待。
  • Poll::Ready(Some(val)) 已經(jīng)就緒,成功返回一個值,程序可以通過調(diào)用poll_next再獲取下一個值。
  • Poll::Ready(None) 表示Stream已經(jīng)結(jié)束,不應該在調(diào)用poll_next

迭代

Stream不支持使用for,而while letnext/try_next則是允許的。

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

并發(fā)

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

Select

select宏也允許并發(fā)的執(zhí)行Future,但是和join、try_join不同的是,select宏只要有一個Future返回,就會返回。

use futures::{select, future::FutureExt, pin_mut};
use tokio::runtime::Runtime;
use std::io::Result;

async fn function1() -> Result<()> {
    tokio::time::delay_for(tokio::time::Duration::from_secs(10)).await;
    println!("function1 ++++ ");
    Ok(())
}

async fn function2() -> Result<()> {
    println!("function2 ++++ ");
    Ok(())
}

async fn async_main() {
  // Fuse:基礎迭代器一次返回None后,就一直返回None
    let f1 = function1().fuse();
    let f2 = function2().fuse();

  // 在棧上pin
    pin_mut!(f1, f2);

    select! {
        _ = f1 => println!("task one completed first"),
        _ = f2 => println!("task two completed first"),
    }
}

fn main() {
    let mut runtime = Runtime::new().unwrap();
    runtime.block_on(async_main());
    println!("Hello, world!");
}

BoxFuture

一個擁有的動態(tài)類型[' Future '],在不是靜態(tài)輸入或需要添加一些間接類型的情況下使用。

比如在遞歸使用Future時:

use futures::future::{BoxFuture, FutureExt};

fn re() -> BoxFuture<'static, ()> {
    async move{
        re().await;
        re().await;
    }.boxed()
}

fn main() {
    re();
}

fn boxed<'a>(self) -> BoxFuture<'a, Self::Output>
    where Self: Sized + Send + 'a,
    {
        Box::pin(self)
    }

參考

  1. Rust異步編程
  2. Rust 異步編程,Pin 介紹
  3. Pin Unpin學習筆記
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容