This change fixes tasks hanging due to rare cases of messages being lost between full Discord reconnections by placing a configurable timeout on the `ConnectionInfo` responses. This is a companion fix to [serenity#1255](https://github.com/serenity-rs/serenity/pull/1255). To make this doable, `Config`s are now used by all versions of `Songbird`/`Call`, and relevant functions are added to simplify setup with configuration. `Config`s are now non-exhaustive, correcting an earlier oversight. For future extensibility, this PR moves the return type of `join`/`join_gateway` into a custom future (no longer leaking flume's `RecvFut` type). Additionally, this fixes the Makefile's feature sets for driver/gateway-only compilation. This is a breaking change in: * the return types of `join`/`join_gateway`, * moving `crate::driver::Config` -> `crate::Config`, * `Config` and `JoinError` becoming `#[non_exhaustive]`. This was tested via `cargo make ready`, and by testing `examples/serenity/voice_receive` with various timeout settings.
175 lines
4.6 KiB
Rust
175 lines
4.6 KiB
Rust
//! Future types for gateway interactions.
|
|
|
|
#[cfg(feature = "driver-core")]
|
|
use crate::error::ConnectionResult;
|
|
use crate::{
|
|
error::{JoinError, JoinResult},
|
|
ConnectionInfo,
|
|
};
|
|
use core::{
|
|
convert,
|
|
future::Future,
|
|
marker::Unpin,
|
|
pin::Pin,
|
|
task::{Context, Poll},
|
|
time::Duration,
|
|
};
|
|
use flume::r#async::RecvFut;
|
|
use pin_project::pin_project;
|
|
#[cfg(not(feature = "tokio-02-marker"))]
|
|
use tokio::time::{self, Timeout};
|
|
#[cfg(feature = "tokio-02-marker")]
|
|
use tokio_compat::time::{self, Timeout};
|
|
|
|
/// Future returned by [`Call::join`].
///
/// Resolving this future waits on two things in sequence: Discord's
/// gateway response, and then the outcome of the connection made via
/// the [`Driver`]. Each phase has its own timeout and its own failure
/// conditions.
///
/// This future ***must not*** be `await`ed while
/// holding the lock around a [`Call`].
///
/// [`Call::join`]: crate::Call::join
/// [`Call`]: crate::Call
/// [`Driver`]: crate::driver::Driver
#[cfg(feature = "driver-core")]
#[pin_project]
pub struct Join {
    // Phase one: the gateway acknowledgement.
    #[pin]
    gw: JoinClass<()>,
    // Phase two: the driver's connection result.
    #[pin]
    driver: JoinClass<ConnectionResult<()>>,
    // Which phase the next `poll` should resume from.
    state: JoinState,
}
|
|
|
|
#[cfg(feature = "driver-core")]
impl Join {
    /// Builds a two-phase join future from its receive halves.
    ///
    /// `timeout` is applied to the gateway receiver (`gw_recv`) only; the
    /// `driver` receiver is wrapped without a timeout at this layer.
    pub(crate) fn new(
        driver: RecvFut<'static, ConnectionResult<()>>,
        gw_recv: RecvFut<'static, ()>,
        timeout: Option<Duration>,
    ) -> Self {
        let gw = JoinClass::new(gw_recv, timeout);
        let driver = JoinClass::new(driver, None);

        Self {
            gw,
            driver,
            state: JoinState::BeforeGw,
        }
    }
}
|
|
|
|
#[cfg(feature = "driver-core")]
impl Future for Join {
    type Output = JoinResult<()>;

    /// Drives the gateway phase first, then the driver phase.
    ///
    /// A gateway failure resolves the future immediately with that error.
    /// A gateway success falls straight through to polling the driver within
    /// the same call. Once a result has been handed out, any further poll
    /// returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();

        if matches!(*me.state, JoinState::BeforeGw) {
            match me.gw.poll(cx) {
                Poll::Pending => return Poll::Pending,
                // Gateway answered: advance and immediately try the driver below.
                Poll::Ready(Ok(())) => *me.state = JoinState::AfterGw,
                Poll::Ready(failure) => {
                    *me.state = JoinState::Finalised;
                    return Poll::Ready(failure);
                },
            }
        }

        if matches!(*me.state, JoinState::AfterGw) {
            return match me.driver.poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(outcome) => {
                    *me.state = JoinState::Finalised;
                    // Collapse "channel result wrapping connection result" into
                    // a single `JoinResult`, tagging driver-side failures.
                    Poll::Ready(outcome.and_then(|conn| conn.map_err(JoinError::Driver)))
                },
            };
        }

        // Already finalised: there is nothing left to wait on.
        Poll::Pending
    }
}
|
|
|
|
/// Progress marker for the two-phase [`Join`] future.
#[cfg(feature = "driver-core")]
#[derive(Clone, Copy, PartialEq, Eq)]
enum JoinState {
    /// The gateway response has not been received yet.
    BeforeGw,
    /// The gateway responded successfully; the driver result is outstanding.
    AfterGw,
    /// A final result (success or error) has already been returned.
    Finalised,
}
|
|
|
|
/// Future for a call to [`Call::join_gateway`].
|
|
///
|
|
/// This future `await`s Discord's gateway response, subject
|
|
/// to any timeouts.
|
|
///
|
|
/// This future ***must not*** be `await`ed while
|
|
/// holding the lock around a [`Call`].
|
|
///
|
|
/// [`Call::join_gateway`]: crate::Call::join_gateway
|
|
/// [`Call`]: crate::Call
|
|
/// [`Driver`]: crate::driver::Driver
|
|
#[pin_project]
|
|
pub struct JoinGateway {
|
|
#[pin]
|
|
inner: JoinClass<ConnectionInfo>,
|
|
}
|
|
|
|
impl JoinGateway {
|
|
pub(crate) fn new(recv: RecvFut<'static, ConnectionInfo>, timeout: Option<Duration>) -> Self {
|
|
Self {
|
|
inner: JoinClass::new(recv, timeout),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Future for JoinGateway {
|
|
type Output = JoinResult<ConnectionInfo>;
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
self.project().inner.poll(cx)
|
|
}
|
|
}
|
|
|
|
#[pin_project(project = JoinClassProj)]
|
|
enum JoinClass<T: 'static> {
|
|
WithTimeout(#[pin] Timeout<RecvFut<'static, T>>),
|
|
Vanilla(RecvFut<'static, T>),
|
|
}
|
|
|
|
impl<T: 'static> JoinClass<T> {
|
|
pub(crate) fn new(recv: RecvFut<'static, T>, timeout: Option<Duration>) -> Self {
|
|
match timeout {
|
|
Some(t) => JoinClass::WithTimeout(time::timeout(t, recv)),
|
|
None => JoinClass::Vanilla(recv),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Future for JoinClass<T>
|
|
where
|
|
T: Unpin,
|
|
{
|
|
type Output = JoinResult<T>;
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
match self.project() {
|
|
JoinClassProj::WithTimeout(t) => t
|
|
.poll(cx)
|
|
.map_err(|_| JoinError::TimedOut)
|
|
.map_ok(|res| res.map_err(|_| JoinError::Dropped))
|
|
.map(|m| m.and_then(convert::identity)),
|
|
JoinClassProj::Vanilla(t) => Pin::new(t).poll(cx).map_err(|_| JoinError::Dropped),
|
|
}
|
|
}
|
|
}
|