summaryrefslogtreecommitdiff
path: root/ofborg/tickborg/src/tasks/statscollector.rs
blob: fef23ad3c6001f867ac79429d19ebdb5ec683630 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use crate::stats;
use crate::worker;

use tracing::error;

pub struct StatCollectorWorker<E> {
    events: E,
    collector: stats::MetricCollector,
}

impl<E: stats::SysEvents + 'static> StatCollectorWorker<E> {
    pub fn new(events: E, collector: stats::MetricCollector) -> StatCollectorWorker<E> {
        StatCollectorWorker { events, collector }
    }
}

impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker<E> {
    type J = stats::EventMessage;

    async fn msg_to_job(
        &mut self,
        _: &str,
        _: &Option<String>,
        body: &[u8],
    ) -> Result<Self::J, String> {
        match serde_json::from_slice(body) {
            Ok(e) => Ok(e),
            Err(_) => {
                let mut modified_body: Vec<u8> = vec![b"\""[0]];
                modified_body.append(&mut body.to_vec());
                modified_body.push(b"\""[0]);

                match serde_json::from_slice(&modified_body) {
                    Ok(event) => {
                        self.events
                            .notify(stats::Event::StatCollectorLegacyEvent(
                                stats::event_metric_name(&event),
                            ))
                            .await;
                        Ok(stats::EventMessage {
                            sender: "".to_owned(),
                            events: vec![event],
                        })
                    }
                    Err(err) => {
                        self.events
                            .notify(stats::Event::StatCollectorBogusEvent)
                            .await;
                        error!(
                            "Failed to decode message: {:?}, Err: {err:?}",
                            std::str::from_utf8(body).unwrap_or("<message not utf8>")
                        );
                        Err("Failed to decode message".to_owned())
                    }
                }
            }
        }
    }

    async fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions {
        let sender = job.sender.clone();
        for event in job.events.iter() {
            self.collector.record(sender.clone(), event.clone());
        }

        vec![worker::Action::Ack]
    }
}