fix: Add retry for retrieving schemas

This commit is contained in:
Gerald Pinder 2025-04-28 20:44:48 -04:00
parent b0f1269e1e
commit 6bae48bd88
2 changed files with 44 additions and 20 deletions

View file

@ -6,8 +6,9 @@ use std::{
use blue_build_process_management::ASYNC_RUNTIME;
use blue_build_recipe::ModuleTypeVersion;
use blue_build_utils::constants::{
CUSTOM_MODULE_SCHEMA, IMPORT_MODULE_SCHEMA, JSON_SCHEMA, STAGE_SCHEMA,
use blue_build_utils::{
constants::{CUSTOM_MODULE_SCHEMA, IMPORT_MODULE_SCHEMA, JSON_SCHEMA, STAGE_SCHEMA},
retry_async,
};
use bon::bon;
use cached::proc_macro::cached;
@ -336,22 +337,24 @@ async fn cache_retrieve(uri: &Uri<String>) -> miette::Result<Value> {
};
log::debug!("Retrieving schema from {}", uri.bold().italic());
tokio::spawn(async move {
let response = reqwest::get(&uri)
.await
.into_diagnostic()
.with_context(|| format!("Failed to retrieve schema from {uri}"))?;
let raw_output = response.bytes().await.into_diagnostic()?;
serde_json::from_slice(&raw_output)
.into_diagnostic()
.with_context(|| {
format!(
"Failed to parse json from {uri}, contents:\n{}",
String::from_utf8_lossy(&raw_output)
)
})
.inspect(|value| trace!("{}:\n{value}", uri.bold().italic()))
})
tokio::spawn(
retry_async(3, 2, async move || {
let response = reqwest::get(&*uri)
.await
.into_diagnostic()
.with_context(|| format!("Failed to retrieve schema from {uri}"))?;
let raw_output = response.bytes().await.into_diagnostic()?;
serde_json::from_slice(&raw_output)
.into_diagnostic()
.with_context(|| {
format!(
"Failed to parse json from {uri}, contents:\n{}",
String::from_utf8_lossy(&raw_output)
)
})
.inspect(|value| trace!("{}:\n{value}", uri.bold().italic()))
})
)
.await
.expect("Should join task")
}

View file

@ -9,7 +9,7 @@ pub mod test_utils;
pub mod traits;
use std::{
ops::Not,
ops::{AsyncFnMut, Not},
os::unix::ffi::OsStrExt,
path::{Path, PathBuf},
thread,
@ -78,7 +78,7 @@ pub fn serde_yaml_err(contents: &str) -> impl Fn(serde_yaml::Error) -> SerdeErro
/// Will error when retries have been expended.
pub fn retry<V, F>(mut retries: u8, delay_secs: u64, mut f: F) -> miette::Result<V>
where
F: FnMut() -> miette::Result<V>,
F: FnMut() -> miette::Result<V> + Send,
{
loop {
match f() {
@ -89,6 +89,27 @@ where
warn!("Failed operation, will retry {retries} more time(s). Error:\n{e:?}");
thread::sleep(Duration::from_secs(delay_secs));
}
}
}
}
/// Performs a retry on a given closure with a given nubmer of attempts and delay.
///
/// # Errors
/// Will error when retries have been expended.
pub async fn retry_async<V, F>(mut retries: u8, delay_secs: u64, mut f: F) -> miette::Result<V>
where
F: AsyncFnMut() -> miette::Result<V>,
{
loop {
match f().await {
Ok(v) => return Ok(v),
Err(e) if retries == 0 => return Err(e),
Err(e) => {
retries -= 1;
warn!("Failed operation, will retry {retries} more time(s). Error:\n{e:?}");
thread::sleep(Duration::from_secs(delay_secs));
}
};
}
}