Skip to content

Commit

Permalink
wip: Add add some instrumentation
Browse files Browse the repository at this point in the history
Adding some spans

todo: clean up this commit

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Jan 13, 2025
1 parent e5670f8 commit 6574b60
Show file tree
Hide file tree
Showing 21 changed files with 185 additions and 56 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/mqtt_channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rumqttc = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "time"] }
tracing = { workspace = true }
zeroize = { workspace = true }

[dev-dependencies]
Expand Down
54 changes: 37 additions & 17 deletions crates/common/mqtt_channel/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::SinkExt;
use futures::StreamExt;
use log::error;
use log::info;
use rumqttc::AsyncClient;
use rumqttc::Event;
use rumqttc::EventLoop;
Expand All @@ -18,6 +16,12 @@ use rumqttc::Outgoing;
use rumqttc::Packet;
use std::time::Duration;
use tokio::time::sleep;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::trace;
use tracing::trace_span;
use tracing::Instrument;

/// A connection to some MQTT server
pub struct Connection {
Expand Down Expand Up @@ -88,20 +92,26 @@ impl Connection {

let (mqtt_client, event_loop) =
Connection::open(config, received_sender.clone(), error_sender.clone()).await?;
tokio::spawn(Connection::receiver_loop(
mqtt_client.clone(),
config.clone(),
event_loop,
received_sender,
error_sender.clone(),
));
tokio::spawn(Connection::sender_loop(
mqtt_client,
published_receiver,
error_sender,
config.last_will_message.clone(),
pub_done_sender,
));
tokio::spawn(
Connection::receiver_loop(
mqtt_client.clone(),
config.clone(),
event_loop,
received_sender,
error_sender.clone(),
)
.instrument(trace_span!("receiver_loop")),
);
tokio::spawn(
Connection::sender_loop(
mqtt_client,
published_receiver,
error_sender,
config.last_will_message.clone(),
pub_done_sender,
)
.instrument(trace_span!("sender_loop")),
);

Ok(Connection {
received: received_receiver,
Expand Down Expand Up @@ -194,15 +204,20 @@ impl Connection {
Ok((mqtt_client, event_loop))
}

#[instrument(skip_all, level = "trace")]
async fn receiver_loop(
mqtt_client: AsyncClient,
config: Config,
mut event_loop: EventLoop,
mut message_sender: mpsc::UnboundedSender<MqttMessage>,
mut error_sender: mpsc::UnboundedSender<MqttError>,
) -> Result<(), MqttError> {
trace!("starting receiver loop");
loop {
match event_loop.poll().await {
trace!("attempting recv");
let event = event_loop.poll().await;
trace!("recv");
match event {
Ok(Event::Incoming(Packet::Publish(msg))) => {
if msg.payload.len() > config.max_packet_size {
error!("Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
Expand Down Expand Up @@ -266,9 +281,11 @@ impl Connection {
// No more messages will be forwarded to the client
let _ = message_sender.close().await;
let _ = error_sender.close().await;
trace!("terminating receiver loop");
Ok(())
}

#[instrument(skip_all, level = "trace")]
async fn sender_loop(
mqtt_client: AsyncClient,
mut messages_receiver: mpsc::UnboundedReceiver<MqttMessage>,
Expand All @@ -277,20 +294,23 @@ impl Connection {
done: oneshot::Sender<()>,
) {
loop {
trace!("waiting for message");
match messages_receiver.next().await {
None => {
// The sender channel has been closed by the client
// No more messages will be published by the client
break;
}
Some(message) => {
trace!(msg = ?message, "received message");
let payload = Vec::from(message.payload_bytes());
if let Err(err) = mqtt_client
.publish(message.topic, message.qos, message.retain, payload)
.await
{
let _ = error_sender.send(err.into()).await;
}
trace!("passed to rumqttc");
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge_actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tokio = { workspace = true, default_features = false, features = [
"macros",
"time",
] }
tracing = { workspace = true }

[dev-dependencies]
env_logger = { workspace = true } # TODO: remove me
Expand Down
15 changes: 14 additions & 1 deletion crates/core/tedge_actors/src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use async_trait::async_trait;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::SinkExt;
use tracing::debug;
use tracing::instrument;

/// A sender of messages of type `M`
///
Expand Down Expand Up @@ -75,13 +77,24 @@ impl<M: Message, N: Message + Into<M>> CloneSender<N> for DynSender<M> {
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
let message = message.into();
if let Err(err) = self.try_send(message) {
if err.is_full() {
debug!("Sender is full");
}
let message = err.into_inner();
SinkExt::send(&mut self, message).await?;
debug!("Blocked send completed");
}

Ok(())
}
}

/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::UnboundedSender<M> {
#[instrument(name = "mpsc::UnboundedSender::send", skip_all)]
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
}
Expand Down
8 changes: 6 additions & 2 deletions crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ use futures::channel::mpsc;
use futures::StreamExt;
use log::debug;
use std::fmt::Debug;
use tracing::instrument;

#[async_trait]
pub trait MessageReceiver<Input> {
Expand Down Expand Up @@ -163,13 +164,15 @@ impl<Input: Debug> LoggingReceiver<Input> {
impl<Input: Send + Debug> MessageReceiver<Input> for LoggingReceiver<Input> {
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest> {
let message = self.receiver.try_recv().await;
debug!(target: &self.name, "recv {:?}", message);
debug!("recv {:?}", message);
message
}

#[instrument(name = "LoggingReceiver::recv", skip(self), fields(name = self.name))]
async fn recv(&mut self) -> Option<Input> {
debug!("attempting recv");
let message = self.receiver.recv().await;
debug!(target: &self.name, "recv {:?}", message);
debug!("recv");
message
}

Expand Down Expand Up @@ -202,6 +205,7 @@ impl<Output> LoggingSender<Output> {

#[async_trait]
impl<Output: Message> Sender<Output> for LoggingSender<Output> {
#[instrument(name = "LoggingSender::send", skip(self, message), fields(name = self.name))]
async fn send(&mut self, message: Output) -> Result<(), ChannelError> {
log_message_sent(&self.name, &message);
self.sender.send(message).await
Expand Down
9 changes: 8 additions & 1 deletion crates/core/tedge_actors/src/run_actor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use tracing::debug_span;
use tracing::Instrument;

use crate::Actor;
use crate::Builder;
use crate::DynSender;
Expand Down Expand Up @@ -39,7 +42,11 @@ impl RunActor {
}

pub async fn run(self) -> Result<(), RuntimeError> {
self.actor.run_boxed().await
let name = self.actor.name().to_string();
self.actor
.run_boxed()
.instrument(debug_span!("Actor::run", actor_name = name))
.await
}
}

Expand Down
12 changes: 7 additions & 5 deletions crates/core/tedge_actors/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ use crate::RuntimeRequestSink;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use log::debug;
use log::error;
use log::info;
use std::collections::HashMap;
use std::panic;
use std::time::Duration;
use tokio::task::JoinError;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::error;
use tracing::info;

// TODO: set back to 60
const ACTORS_EXIT_TIMEOUT: Duration = Duration::from_secs(5);

/// Actions sent by actors to the runtime
#[derive(Debug)]
Expand Down Expand Up @@ -64,8 +67,7 @@ impl Runtime {

fn with_events_sender(events_sender: Option<DynSender<RuntimeEvent>>) -> Runtime {
let (actions_sender, actions_receiver) = mpsc::channel(16);
let runtime_actor =
RuntimeActor::new(actions_receiver, events_sender, Duration::from_secs(60));
let runtime_actor = RuntimeActor::new(actions_receiver, events_sender, ACTORS_EXIT_TIMEOUT);

let runtime_task = tokio::spawn(runtime_actor.run());
Runtime {
Expand Down
2 changes: 2 additions & 0 deletions crates/core/tedge_actors/src/servers/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures::channel::oneshot;
use futures::StreamExt;
use std::fmt::Debug;
use std::ops::ControlFlow;
use tracing::instrument;

/// A message box for a request-response server
pub type ServerMessageBox<Request, Response> = LoggingReceiver<RequestEnvelope<Request, Response>>;
Expand Down Expand Up @@ -146,6 +147,7 @@ impl<Request, Response> Clone for RequestSender<Request, Response> {

#[async_trait]
impl<Request: Message, Response: Message> Sender<Request> for RequestSender<Request, Response> {
#[instrument(name = "RequestSender::send", skip_all)]
async fn send(&mut self, request: Request) -> Result<(), ChannelError> {
let reply_to = self.reply_to.sender();
self.sender
Expand Down
3 changes: 2 additions & 1 deletion crates/core/tedge_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ json-writer = { workspace = true }
log = { workspace = true }
mqtt_channel = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde = { workspace = true, features = ["derive", "rc"] }
serde_json = { workspace = true }
shell-words = { workspace = true }
tedge_utils = { workspace = true, features = ["timestamp"] }
Expand All @@ -30,6 +30,7 @@ time = { workspace = true, features = [
"serde-well-known",
] }
tokio = { workspace = true, features = ["fs", "process"] }
tracing = { workspace = true }

[dev-dependencies]
anyhow = { workspace = true }
Expand Down
Loading

0 comments on commit 6574b60

Please sign in to comment.