diff --git a/Cargo.lock b/Cargo.lock index 0400103b8eacdf18ffe6ad92f1d5606b2f3d3176..17ad62b91d4a6babecf74582ecc16619317def2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -99,6 +99,38 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crossbeam-deque" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" +dependencies = [ + "cfg-if", +] + [[package]] name = "deranged" version = "0.3.10" @@ -115,6 +147,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -412,6 +450,15 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -445,7 +492,10 @@ dependencies = [ "dotenv", "enum-as-inner", "prometheus-http-query", + "rayon", "reqwest", + "serde", + "serde_json", "tokio", "url", ] @@ -599,6 +649,26 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index 08fef249d36b8d162e6b15d945510fea91028e02..bb06babb9776a2b03d730ad8981427a3864c7b58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,13 @@ enum-as-inner = "0.6.0" prometheus-http-query = "0.8.0" reqwest = "0.11.22" url = "2.5.0" +serde = { version = "1.0.193", optional = true } +serde_json = { version = "1.0.108", optional = true } +rayon = { version = "1.8.0", optional = true } + +[features] +default = [] +utils = ["dep:serde", "dep:serde_json", "dep:rayon"] [dev-dependencies] tokio = { version = "1.34.0", default-features = false, features = ["rt", "rt-multi-thread", "macros"]} diff --git a/src/lib.rs b/src/lib.rs index 44c6480f1a218d45aa484047f8c8f1f051e68363..887a231da636edc788f1fe113e1ed3a1b65dbc7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ pub mod query; pub mod result; +#[cfg(feature = "utils")] +pub mod utils; use prometheus_http_query::Client; use query::IntoQuery; diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000000000000000000000000000000000000..c47ccd3431222a460ff3fc47484fee401eb90af6 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,140 @@ +//! Utilities for handling the query result data +//! +//! Some of the functionality provided by this module may be rendered redundant if the dependency on `prometheus-http-query` is dropped +//! in the future in favor of a fully custom implementation of the query client and the response deserialization process. + +use prometheus_http_query::response::{InstantVector, RangeVector, Sample}; +use rayon::prelude::*; +use serde::de::DeserializeOwned; +use std::collections::HashMap; + +/// Helper trait for bulk type conversion of query result data +/// +/// Convert the query result data such as `Vec<InstantVector>` and `Vec<RangeVector>` +/// containing raw labels of the type `HashMap<String, String>` into special custom types that support +/// user defined structs that mimic the structure of the labels. +/// +/// The user defined struct should implement serde's `Deserialize` trait and must contain fields of +/// the same name as the keys in the labels/metric's `HashMap`. This is similar to deserializing from a __Json__ object. +/// For example, if the labels `HashMap` has keys named "foo", "bar", the struct must have the fields of the same name. +/// It would look something like `struct Custom { foo: String, bar: String }` +/// +/// The [``Convert::convert``] and [``Convert::convert_par``] are both fallible, and return a `serde_json::Error` type on failure. +/// The former function processes data in a single thread, while the latter uses `rayon` for multithreaded processing. +/// +/// For `InstantVector` or a `RangeVector`, use `try_into` or `try_from` using [`CustomInstantVec`] and [`CustomRangeVec`], or use the +/// free-standing function [`convert_labels`] to convert a single set of labels (HashMap). +/// +/// # Example +/// ```ignore +/// use mquery::{result::Error, utils::Convert, QueryManager}; +/// +/// #[derive(serde::Deserialize, Debug, Clone)] +/// struct Custom { // custom type that represents the metric labels +/// method: String, +/// endpoint: String, +/// } +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Error> { +/// let resp = QueryManager::default().query("http_requests").await?; +/// let data = resp.into_vector().unwrap(); +/// // returns `Ok(CustomInstantVec<Custom>)` if conversion succeeds +/// if let Ok(converted) = data.convert_par::<Custom>() { +/// println!("{converted:?}"); +/// } +/// Ok(()) +/// } +/// ``` +pub trait Convert { + type Target<M: DeserializeOwned>; + type Error; + /// Bulk convert the query result data into a 'Custom' type that can hold a user-defined struct of the same structure as the label data + /// + /// For a single set of labels, use `try_into` or `try_from` using [`CustomInstantVec`] and [`CustomRangeVec`]. + /// See the trait level docs for more information. + fn convert<M: DeserializeOwned>(self) -> Result<Self::Target<M>, Self::Error>; + + /// Parallelly (using `rayon`) convert the query result data into a 'Custom' type that can hold a user-defined struct of the same structure as the label data + /// + /// For a single set of labels, use `try_into` or `try_from` using [`CustomInstantVec`] and [`CustomRangeVec`]. + /// See the trait level docs for more information. + fn convert_par<M: DeserializeOwned + Send>(self) -> Result<Self::Target<M>, Self::Error>; +} + +impl Convert for Vec<InstantVector> { + type Target<M: DeserializeOwned> = Vec<CustomInstantVec<M>>; + type Error = serde_json::Error; + + fn convert<M: DeserializeOwned>(self) -> Result<Self::Target<M>, Self::Error> { + self.into_iter().map(|v| v.try_into()).collect() + } + + fn convert_par<M: DeserializeOwned + Send>(self) -> Result<Self::Target<M>, Self::Error> { + self.into_par_iter().map(|v| v.try_into()).collect() + } +} + +impl Convert for Vec<RangeVector> { + type Target<M: DeserializeOwned> = Vec<CustomRangeVec<M>>; + type Error = serde_json::Error; + + fn convert<M: DeserializeOwned>(self) -> Result<Self::Target<M>, Self::Error> { + self.into_iter().map(|v| v.try_into()).collect() + } + + fn convert_par<M: DeserializeOwned + Send>(self) -> Result<Self::Target<M>, Self::Error> { + self.into_par_iter().map(|v| v.try_into()).collect() + } +} + +pub fn convert_labels<T: DeserializeOwned>( + labels: HashMap<String, String>, +) -> Result<T, serde_json::Error> { + serde_json::from_value(serde_json::to_value(labels)?) +} + +/// Analogue of the [`InstantVector`] type which is generic over the type that represents labels +/// +/// This can be used directly to convert an [`InstantVector`] using `try_from` with type hinting, +/// or use the [`Convert`] trait for bulk conversion of type `Vec<InstantVector>`. +/// See the trait level docs for [`Convert`] for more information. +#[derive(Clone, Debug)] +pub struct CustomInstantVec<M> { + pub labels: M, + pub sample: Sample, +} + +impl<M: DeserializeOwned> TryFrom<InstantVector> for CustomInstantVec<M> { + type Error = serde_json::Error; + + fn try_from(value: InstantVector) -> Result<Self, Self::Error> { + let (labels, sample) = value.into_inner(); + Ok(Self { + labels: convert_labels(labels)?, + sample, + }) + } +} + +/// Analogue of the [`RangeVector`] type which is generic over the type that represents labels +/// +/// This can be used directly to convert a [`RangeVector`] using `try_from` with type hinting, +/// or use the [`Convert`] trait for bulk conversion of type `Vec<InstantVector>`. +/// See the trait level docs for [`Convert`] for more information. +pub struct CustomRangeVec<M> { + pub labels: M, + pub samples: Vec<Sample>, +} + +impl<M: DeserializeOwned> TryFrom<RangeVector> for CustomRangeVec<M> { + type Error = serde_json::Error; + + fn try_from(value: RangeVector) -> Result<Self, Self::Error> { + let (labels, samples) = value.into_inner(); + Ok(Self { + labels: convert_labels(labels)?, + samples, + }) + } +}