1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
use crate::stats;
use crate::worker;
use tracing::error;
/// Worker that turns raw stat messages into [`stats::EventMessage`] jobs and
/// records every event they carry in a [`stats::MetricCollector`].
///
/// `E` is the sink that is notified when a message had to be decoded through
/// the legacy fallback or could not be decoded at all.
pub struct StatCollectorWorker<E> {
/// Notification sink used while decoding incoming messages.
events: E,
/// Collector that receives each event of every processed job.
collector: stats::MetricCollector,
}
impl<E> StatCollectorWorker<E>
where
    E: stats::SysEvents + 'static,
{
    /// Creates a worker from its two collaborators.
    ///
    /// * `events` - sink notified about legacy and undecodable messages.
    /// * `collector` - collector that receives the decoded events.
    pub fn new(events: E, collector: stats::MetricCollector) -> Self {
        Self { events, collector }
    }
}
impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker<E> {
    type J = stats::EventMessage;

    /// Decodes a raw message body into a [`stats::EventMessage`].
    ///
    /// The body is first parsed as a JSON-encoded `EventMessage`. If that
    /// fails, the body is wrapped in double quotes and parsed again as a
    /// single event; on success a `StatCollectorLegacyEvent` notification is
    /// sent and the event is returned in a message with an empty `sender`.
    ///
    /// The first two parameters are ignored by this worker.
    ///
    /// # Errors
    ///
    /// Returns `Err("Failed to decode message")` when neither parse succeeds.
    /// In that case a `StatCollectorBogusEvent` notification is sent and the
    /// body is logged at error level.
    async fn msg_to_job(
        &mut self,
        _: &str,
        _: &Option<String>,
        body: &[u8],
    ) -> Result<Self::J, String> {
        if let Ok(message) = serde_json::from_slice::<Self::J>(body) {
            return Ok(message);
        }

        // Fallback: surround the raw bytes with `"` so that a bare token is
        // parsed as a JSON string naming a single event. The buffer is sized
        // up front so the body is copied exactly once.
        let mut modified_body: Vec<u8> = Vec::with_capacity(body.len() + 2);
        modified_body.push(b'"');
        modified_body.extend_from_slice(body);
        modified_body.push(b'"');

        match serde_json::from_slice(&modified_body) {
            Ok(event) => {
                self.events
                    .notify(stats::Event::StatCollectorLegacyEvent(
                        stats::event_metric_name(&event),
                    ))
                    .await;
                Ok(stats::EventMessage {
                    sender: "".to_owned(),
                    events: vec![event],
                })
            }
            Err(err) => {
                self.events
                    .notify(stats::Event::StatCollectorBogusEvent)
                    .await;
                // `err` is the failure of the quoted (fallback) parse; the
                // error from the first parse attempt is not reported.
                error!(
                    "Failed to decode message: {:?}, Err: {err:?}",
                    std::str::from_utf8(body).unwrap_or("<message not utf8>")
                );
                Err("Failed to decode message".to_owned())
            }
        }
    }

    /// Records every event of `job` in the collector, attributed to the job's
    /// sender, and acknowledges the message.
    ///
    /// Always returns a single [`worker::Action::Ack`].
    async fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions {
        for event in &job.events {
            // `record` takes owned values, so sender and event are cloned
            // once per recorded event.
            self.collector.record(job.sender.clone(), event.clone());
        }
        vec![worker::Action::Ack]
    }
}
|