use crate::stats; use crate::worker; use tracing::error; pub struct StatCollectorWorker { events: E, collector: stats::MetricCollector, } impl StatCollectorWorker { pub fn new(events: E, collector: stats::MetricCollector) -> StatCollectorWorker { StatCollectorWorker { events, collector } } } impl worker::SimpleWorker for StatCollectorWorker { type J = stats::EventMessage; async fn msg_to_job( &mut self, _: &str, _: &Option, body: &[u8], ) -> Result { match serde_json::from_slice(body) { Ok(e) => Ok(e), Err(_) => { let mut modified_body: Vec = vec![b"\""[0]]; modified_body.append(&mut body.to_vec()); modified_body.push(b"\""[0]); match serde_json::from_slice(&modified_body) { Ok(event) => { self.events .notify(stats::Event::StatCollectorLegacyEvent( stats::event_metric_name(&event), )) .await; Ok(stats::EventMessage { sender: "".to_owned(), events: vec![event], }) } Err(err) => { self.events .notify(stats::Event::StatCollectorBogusEvent) .await; error!( "Failed to decode message: {:?}, Err: {err:?}", std::str::from_utf8(body).unwrap_or("") ); Err("Failed to decode message".to_owned()) } } } } } async fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions { let sender = job.sender.clone(); for event in job.events.iter() { self.collector.record(sender.clone(), event.clone()); } vec![worker::Action::Ack] } }