Implement Songbird driver configuration (#1074)

This commit is contained in:
Kyle Simpson
2020-11-11 22:40:09 +00:00
committed by GitHub
parent 26c9c9117c
commit 8b7f388f7b
14 changed files with 604 additions and 113 deletions

View File

@@ -2,13 +2,16 @@ use super::{
error::{Error, Result},
message::*,
};
use crate::{constants::*, driver::CryptoMode, events::CoreContext};
use crate::{
constants::*,
driver::{Config, DecodeMode},
events::CoreContext,
};
use audiopus::{coder::Decoder as OpusDecoder, Channels};
use discortp::{
demux::{self, DemuxedMut},
rtp::{RtpExtensionPacket, RtpPacket},
FromPacket,
MutablePacket,
Packet,
PacketSize,
};
@@ -16,7 +19,7 @@ use flume::Receiver;
use std::collections::HashMap;
use tokio::net::udp::RecvHalf;
use tracing::{error, info, instrument, warn};
use xsalsa20poly1305::{aead::AeadInPlace, Nonce, Tag, XSalsa20Poly1305 as Cipher, TAG_SIZE};
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;
#[derive(Debug)]
struct SsrcState {
@@ -46,19 +49,38 @@ impl SsrcState {
&mut self,
pkt: RtpPacket<'_>,
data_offset: usize,
) -> Result<(SpeakingDelta, Vec<i16>)> {
data_trailer: usize,
decode_mode: DecodeMode,
decrypted: bool,
) -> Result<(SpeakingDelta, Option<Vec<i16>>)> {
let new_seq: u16 = pkt.get_sequence().into();
let payload_len = pkt.payload().len();
let extensions = pkt.get_extension() != 0;
let seq_delta = new_seq.wrapping_sub(self.last_seq);
Ok(if seq_delta >= (1 << 15) {
// Overflow, reordered (previously missing) packet.
(SpeakingDelta::Same, vec![])
(SpeakingDelta::Same, Some(vec![]))
} else {
self.last_seq = new_seq;
let missed_packets = seq_delta.saturating_sub(1);
let (audio, pkt_size) =
self.scan_and_decode(&pkt.payload()[data_offset..], extensions, missed_packets)?;
// Note: we still need to handle this for non-decoded.
// This is mainly because packet events and speaking events can be handed to the
// user.
let (audio, pkt_size) = if decode_mode.should_decrypt() && decrypted {
self.scan_and_decode(
&pkt.payload()[data_offset..payload_len - data_trailer],
extensions,
missed_packets,
decode_mode == DecodeMode::Decode,
)?
} else {
// The latter part is an upper bound, as we cannot determine
// how long packet extensions are.
// WIthout decryption, speaking detection is thus broken.
(None, payload_len - data_offset - data_trailer)
};
let delta = if pkt_size == SILENT_FRAME.len() {
// Frame is silent.
@@ -91,8 +113,8 @@ impl SsrcState {
data: &[u8],
extension: bool,
missed_packets: u16,
) -> Result<(Vec<i16>, usize)> {
let mut out = vec![0; STEREO_FRAME_SIZE];
decode: bool,
) -> Result<(Option<Vec<i16>>, usize)> {
let start = if extension {
RtpExtensionPacket::new(data)
.map(|pkt| pkt.packet_size())
@@ -104,26 +126,34 @@ impl SsrcState {
Ok(0)
}?;
for _ in 0..missed_packets {
let missing_frame: Option<&[u8]> = None;
if let Err(e) = self.decoder.decode(missing_frame, &mut out[..], false) {
warn!("Issue while decoding for missed packet: {:?}.", e);
let pkt = if decode {
let mut out = vec![0; STEREO_FRAME_SIZE];
for _ in 0..missed_packets {
let missing_frame: Option<&[u8]> = None;
if let Err(e) = self.decoder.decode(missing_frame, &mut out[..], false) {
warn!("Issue while decoding for missed packet: {:?}.", e);
}
}
}
let audio_len = self
.decoder
.decode(Some(&data[start..]), &mut out[..], false)
.map_err(|e| {
error!("Failed to decode received packet: {:?}.", e);
e
})?;
let audio_len = self
.decoder
.decode(Some(&data[start..]), &mut out[..], false)
.map_err(|e| {
error!("Failed to decode received packet: {:?}.", e);
e
})?;
// Decoding to stereo: audio_len refers to sample count irrespective of channel count.
// => multiply by number of channels.
out.truncate(2 * audio_len);
// Decoding to stereo: audio_len refers to sample count irrespective of channel count.
// => multiply by number of channels.
out.truncate(2 * audio_len);
Ok((out, data.len() - start))
Some(out)
} else {
None
};
Ok((pkt, data.len() - start))
}
}
@@ -131,7 +161,7 @@ struct UdpRx {
cipher: Cipher,
decoder_map: HashMap<u32, SsrcState>,
#[allow(dead_code)]
mode: CryptoMode, // In future, this will allow crypto mode selection.
config: Config,
packet_buffer: [u8; VOICE_PACKET_MAX],
rx: Receiver<UdpRxMessage>,
udp_socket: RecvHalf,
@@ -150,7 +180,10 @@ impl UdpRx {
match msg {
Ok(ReplaceInterconnect(i)) => {
*interconnect = i;
}
},
Ok(SetConfig(c)) => {
self.config = c;
},
Ok(Poison) | Err(_) => break,
}
}
@@ -166,6 +199,7 @@ impl UdpRx {
// For simplicity, we nominate the mixing context to rebuild the event
// context if it fails (hence, the `let _ =` statements.), as it will try to
// make contact every 20ms.
let crypto_mode = self.config.crypto_mode;
let packet = &mut self.packet_buffer[..len];
match demux::demux_mut(packet) {
@@ -175,15 +209,40 @@ impl UdpRx {
return;
}
let rtp_body_start =
decrypt_in_place(&mut rtp, &self.cipher).expect("RTP decryption failed.");
let packet_data = if self.config.decode_mode.should_decrypt() {
let out = crypto_mode
.decrypt_in_place(&mut rtp, &self.cipher)
.map(|(s, t)| (s, t, true));
if let Err(e) = out {
warn!("RTP decryption failed: {:?}", e);
}
out.ok()
} else {
None
};
let (rtp_body_start, rtp_body_tail, decrypted) = packet_data.unwrap_or_else(|| {
(
crypto_mode.payload_prefix_len(),
crypto_mode.payload_suffix_len(),
false,
)
});
let entry = self
.decoder_map
.entry(rtp.get_ssrc())
.or_insert_with(|| SsrcState::new(rtp.to_immutable()));
if let Ok((delta, audio)) = entry.process(rtp.to_immutable(), rtp_body_start) {
if let Ok((delta, audio)) = entry.process(
rtp.to_immutable(),
rtp_body_start,
rtp_body_tail,
self.config.decode_mode,
decrypted,
) {
match delta {
SpeakingDelta::Start => {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
@@ -209,25 +268,40 @@ impl UdpRx {
audio,
packet: rtp.from_packet(),
payload_offset: rtp_body_start,
payload_end_pad: rtp_body_tail,
},
));
} else {
warn!("RTP decoding/decrytion failed.");
warn!("RTP decoding/processing failed.");
}
},
DemuxedMut::Rtcp(mut rtcp) => {
let rtcp_body_start = decrypt_in_place(&mut rtcp, &self.cipher);
let packet_data = if self.config.decode_mode.should_decrypt() {
let out = crypto_mode.decrypt_in_place(&mut rtcp, &self.cipher);
if let Ok(start) = rtcp_body_start {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::RtcpPacket {
packet: rtcp.from_packet(),
payload_offset: start,
},
));
if let Err(e) = out {
warn!("RTCP decryption failed: {:?}", e);
}
out.ok()
} else {
warn!("RTCP decryption failed.");
}
None
};
let (start, tail) = packet_data.unwrap_or_else(|| {
(
crypto_mode.payload_prefix_len(),
crypto_mode.payload_suffix_len(),
)
});
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::RtcpPacket {
packet: rtcp.from_packet(),
payload_offset: start,
payload_end_pad: tail,
},
));
},
DemuxedMut::FailedParse(t) => {
warn!("Failed to parse message of type {:?}.", t);
@@ -244,7 +318,7 @@ pub(crate) async fn runner(
mut interconnect: Interconnect,
rx: Receiver<UdpRxMessage>,
cipher: Cipher,
mode: CryptoMode,
config: Config,
udp_socket: RecvHalf,
) {
info!("UDP receive handle started.");
@@ -252,7 +326,7 @@ pub(crate) async fn runner(
let mut state = UdpRx {
cipher,
decoder_map: Default::default(),
mode,
config,
packet_buffer: [0u8; VOICE_PACKET_MAX],
rx,
udp_socket,
@@ -263,23 +337,6 @@ pub(crate) async fn runner(
info!("UDP receive handle stopped.");
}
#[inline]
fn decrypt_in_place(packet: &mut impl MutablePacket, cipher: &Cipher) -> Result<usize> {
// Applies discord's cheapest.
// In future, might want to make a choice...
let header_len = packet.packet().len() - packet.payload().len();
let mut nonce = Nonce::default();
nonce[..header_len].copy_from_slice(&packet.packet()[..header_len]);
let data = packet.payload_mut();
let (tag_bytes, data_bytes) = data.split_at_mut(TAG_SIZE);
let tag = Tag::from_slice(tag_bytes);
Ok(cipher
.decrypt_in_place_detached(&nonce, b"", data_bytes, tag)
.map(|_| TAG_SIZE)?)
}
#[inline]
fn rtp_valid(packet: RtpPacket<'_>) -> bool {
packet.get_version() == RTP_VERSION && packet.get_payload_type() == RTP_PROFILE_TYPE