文章转载并翻译自:https://heikoseeberger.de/2022/10/20/2022-10-21-tower-1/

第一讲:将服务视为函数

作者:Heiko Seeberger 2022年10月21日 标签:Rust, Tokio, Tower

当我第一次接触Tower库(Tokio技术栈的一部分)时,注意到它的标题行写着:

async fn(Request) -> Result<Response, Error>

文档中对此解释如下:

Tower提供了一个简单的核心抽象——Service特性,它表示一个异步函数,接收请求并返回响应或错误。这个抽象可用于建模客户端和服务器。

这立即让我这个经验丰富的Scala开发者想起了Finagle论文《Your Server as a Function》,其中定义"系统边界由称为服务的异步函数表示"。看到新项目利用已有知识总是件好事。

虽然Tower和Finagle服务共享相同的核心抽象——异步函数,但有两点不同。

首先,Tower不一定将服务解释为系统边界。相反,服务也可以"本地"组合,这更符合Finagle的"过滤器":

通用组件(如超时、速率限制和负载平衡)可以建模为包装某些内部服务并在调用内部服务前后应用附加行为的服务。这允许以协议无关、可组合的方式实现这些组件。通常,此类服务被称为中间件。

其次,Tower通过就绪概念丰富了异步函数的核心抽象,正如我们从Service特性的定义中看到的:

pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

其核心在于这些服务方法的调用契约如下:

  1. 必须先调用poll_ready(可能需要多次)直到返回Poll::Ready
  2. 然后才能调用call
  3. 如果不遵守上述规则,call可能会panic

在下一讲深入探讨调用服务的细节之前,我们先看一个简单的服务示例:

pub struct EchoRequest(String);
pub struct EchoResponse(String);
pub struct EchoService;

/// 为[EchoService]实现Tower的`Service`特性:
/// 总是就绪,永不失败,立即返回回显响应(即内容与请求相同的响应)
impl Service<EchoRequest> for EchoService {
    type Response = EchoResponse;
    type Error = Infallible; // 此服务永不失败
    type Future = Ready<Result<Self::Response, Self::Error>>; // 立即响应

    /// 总是返回`Poll::Ready`:此服务总是就绪
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// 总是返回回显,即内容与请求相同的响应
    fn call(&mut self, req: EchoRequest) -> Self::Future {
        ready(Ok(EchoResponse(req.0)))
    }
}

由于此服务总是就绪的,因此如果之前没有调用poll_readycall也不需要panic。

完整代码可在GitHub的tower-experiments仓库找到。下一讲我们将探讨服务客户端。

第二讲:调用Tower服务

作者:Heiko Seeberger 2022年10月23日 标签:Rust, Tokio, Tower

在上一讲中我们了解到,Tower服务由poll_readycall两个方法组成,它们遵循这样的契约:调用者必须先持续调用poll_ready直到返回Poll::Ready,然后才能调用call,否则call可能会panic。

poll_ready的签名如下:

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

要调用poll_ready,我们需要一个std::task::Context。如果你熟悉Future,会发现它的poll方法也有这样的参数。那么我们是否必须实现一个Future才能调用服务呢?

幸运的是不必,因为Tower已经提供了ServiceExt特性,其中包含一些辅助方法,可以按照契约要求调用poll_ready,甚至链式调用poll_readycall

首先我们来看ServiceExt::oneshot。根据文档说明,它会"消费此服务,在就绪时使用提供的请求调用一次":

let mut service = EchoService;
let response = service.oneshot("Hello, Tower!".into()).await?;
println!("Echo service responded with: {response}");

oneshot返回的Future在被轮询时首先调用poll_ready,当服务返回Poll::Ready时再调用call。当然,由于oneshot通过self消费了服务,所以不可能再次调用该服务。但这实际上是Tower提供的唯一能强制执行poll_readycall之间契约的方式。

接下来看看ServiceExt::ready,根据文档说明,它"在服务准备好接受请求时返回一个可变引用":

let mut service = EchoService;

let service = service.ready().await?;
let response = service.call("Hello, Tower!".into()).await?;
println!("Echo service responded with: {response}");

// 再次调用call前应该先调用ready!
// service.ready().await?;
let response = service.call("Hello again, Tower!".into()).await?;
println!("Echo service responded with: {response}");

ready返回的Future会返回服务的可变引用,然后可以用来调用它。如你所见,我们甚至可以不检查就绪状态就多次调用服务。当然这违反了契约,我们应该在每次调用call之前再次调用ready,即在上面的代码片段中应该取消第8行的注释。

最后还有ServiceExt::ready_oneshot。它像ready一样工作,但会消费服务,然后又将其返回:

let service = EchoService;

let mut service = service.ready_oneshot().await?;
let response = service.call("Hello, Tower!".into()).await?;
println!("Echo service responded with: {response}");

// 再次调用call前应该先调用ready_oneshot!
// let mut service = service.ready_oneshot().await?;
let response = service.call("Hello again, Tower!".into()).await?;
println!("Echo service responded with: {response}");

ready类似,我们可以不检查就绪状态就多次调用返回的服务。因此我认为这个方法命名不当,因为"oneshot"对我来说意味着最多只能调用一次call

现在你可能会问,这种总是先调用poll_ready再调用call的服务契约是否真的那么重要。对于无害的EchoService来说确实不重要,因为它总是就绪的,所以它的call方法永远不会panic。我们将在下一讲中看看其他表现不那么好的服务。完整代码照例可在GitHub的tower-experiments仓库找到。

第三讲:就绪状态

作者:Heiko Seeberger 2022年11月8日 标签:Rust, Tokio, Tower

在上一讲中,我们学习了如何调用Tower服务。特别讨论了poll_readycall之间的契约关系:调用者必须首先持续调用poll_ready直到返回Poll::Ready,才能调用call方法,否则call可能会触发panic。

当然,某些服务(例如第一讲中介绍的EchoService)始终处于就绪状态,因此无需调用poll_ready直接调用call也能正常工作。

但通常情况下,服务的内部状态或外部依赖(例如数据库连接)会影响其就绪状态。此外,将领域服务与中间件(如限流器或熔断器)组合时,自然也会影响就绪状态。

为了演示这一点,我们来看一个在"就绪"与"未就绪"状态间交替切换的服务:

/// 交替就绪服务,接收[AlternatingReadyRequest]并返回[AlternatingReadyResponse]
#[derive(Debug, Clone)]
pub struct AlternatingReadyService {
    ready: bool,
}

/// 为[AlternatingReadyService]实现Tower的`Service`特性:
/// 就绪状态在Pending和Ready间交替切换,调用永不失败且响应立即就绪
impl Service<AlternatingReadyRequest> for AlternatingReadyService {
    type Response = AlternatingReadyResponse;
    type Error = Infallible; // 该服务永不失败
    type Future = Ready<Result<Self::Response, Self::Error>>; // 响应立即就绪

    /// 交替返回`Poll::Pending`和`Poll::Ready`
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.ready {
            // 唤醒任务确保执行器再次调用poll_ready
            cx.waker().wake_by_ref();
            self.ready = false;
            Poll::Pending
        } else {
            self.ready = true;
            Poll::Ready(Ok(()))
        }
    }

    /// 始终返回[AlternatingReadyResponse]
    /// 
    /// # Panics
    /// 若未先调用poll_ready就直接调用本方法会panic
    fn call(&mut self, _req: AlternatingReadyRequest) -> Self::Future {
        if !self.ready {
            panic!("服务未就绪,必须先调用poll_ready");
        }
        self.ready = false;
        ready(Ok(AlternatingReadyResponse))
    }
}

AlternatingReadyService通过ready字段跟踪就绪状态。poll_ready的实现逻辑是:若服务之前处于就绪状态,则将其设为未就绪并返回Poll::Pending;反之则设为就绪并返回Poll::Ready。同时,call方法会检查ready字段,若服务未就绪时调用将触发panic。

虽然这个实现是刻意设计的,但它很好地演示了poll_readycall之间契约的重要性:

let mut service = AlternatingReadyService::new();

let service = service.ready().await?;
let _response = service.call(AlternatingReadyRequest).await?;

// 再次调用call前应该先调用ready!
// service.ready().await?;
let _response = service.call(AlternatingReadyRequest).await?;

在这个例子中,第二次call调用会因为服务处于未就绪状态而panic(由于第一次ready调用后服务状态变为未就绪)。如果你想亲自验证,可以在GitHub的tower-experiments仓库查看完整代码。