make builds cancelable
This commit is contained in:
parent
f8c46796ae
commit
114a34de8f
@ -3,7 +3,7 @@ use crate::db::prelude::Builds;
|
||||
use crate::db::{builds, packages, versions};
|
||||
use rocket::response::status::NotFound;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::{delete, get, State};
|
||||
use rocket::{delete, get, post, State};
|
||||
|
||||
use crate::api::types::input::ListBuildsModel;
|
||||
use rocket_okapi::openapi;
|
||||
@ -12,6 +12,8 @@ use sea_orm::QueryFilter;
|
||||
use sea_orm::{
|
||||
DatabaseConnection, EntityTrait, ModelTrait, QueryOrder, QuerySelect, RelationTrait,
|
||||
};
|
||||
use tokio::sync::broadcast::Sender;
|
||||
use crate::builder::types::Action;
|
||||
|
||||
#[openapi(tag = "build")]
|
||||
#[get("/build/<buildid>/output?<startline>")]
|
||||
@ -142,3 +144,17 @@ pub async fn delete_build(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[openapi(tag = "build")]
|
||||
#[post("/build/<buildid>/cancel")]
|
||||
pub async fn cancle_build(
|
||||
db: &State<DatabaseConnection>,
|
||||
tx: &State<Sender<Action>>,
|
||||
buildid: i32,
|
||||
) -> Result<(), NotFound<String>> {
|
||||
let db = db as &DatabaseConnection;
|
||||
|
||||
let _ = tx.send(Action::Cancel(buildid)).map_err(|e| NotFound(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,3 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::iter::Map;
|
||||
use crate::builder::types::Action;
|
||||
use crate::db::builds::ActiveModel;
|
||||
use crate::db::prelude::{Builds, Packages};
|
||||
@ -5,15 +7,17 @@ use crate::db::{builds, packages, versions};
|
||||
use crate::repo::repo::add_pkg;
|
||||
use anyhow::anyhow;
|
||||
use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set};
|
||||
use std::ops::Add;
|
||||
use std::ops::{Add, Deref};
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::sync::{broadcast, Semaphore};
|
||||
use tokio::sync::{broadcast, Mutex, Semaphore};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub async fn init(db: DatabaseConnection, tx: Sender<Action>) {
|
||||
let semaphore = Arc::new(Semaphore::new(1));
|
||||
let job_handles: Arc<Mutex<HashMap<i32, JoinHandle<_>>>> = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
loop {
|
||||
if let Ok(_result) = tx.subscribe().recv().await {
|
||||
@ -28,9 +32,34 @@ pub async fn init(db: DatabaseConnection, tx: Sender<Action>) {
|
||||
build_model,
|
||||
db.clone(),
|
||||
semaphore.clone(),
|
||||
job_handles.clone()
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Action::Cancel(build_id) => {
|
||||
let build = Builds::find_by_id(build_id)
|
||||
.one(&db)
|
||||
.await
|
||||
.expect("TODO: panic message")
|
||||
.expect("TODO: panic message");
|
||||
|
||||
let mut build: builds::ActiveModel = build.into();
|
||||
build.status = Set(Some(4));
|
||||
build.end_time = Set(Some(
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as u32,
|
||||
));
|
||||
let _ = build.clone().update(&db).await;
|
||||
|
||||
job_handles
|
||||
.lock()
|
||||
.await
|
||||
.remove(&build.id.clone().unwrap())
|
||||
.expect("TODO: panic message")
|
||||
.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -44,12 +73,15 @@ async fn queue_package(
|
||||
mut build_model: builds::ActiveModel,
|
||||
db: DatabaseConnection,
|
||||
semaphore: Arc<Semaphore>,
|
||||
job_handles: Arc<Mutex<HashMap<i32, JoinHandle<()>>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let permits = Arc::clone(&semaphore);
|
||||
let mut job_handles = Arc::clone(&job_handles);
|
||||
let build_id = build_model.id.clone().unwrap();
|
||||
|
||||
// spawn new thread for each pkg build
|
||||
// todo add queue and build two packages in parallel
|
||||
tokio::spawn(async move {
|
||||
let handle = tokio::spawn(async move {
|
||||
let _permit = permits.acquire().await.unwrap();
|
||||
|
||||
// set build status to building
|
||||
@ -58,6 +90,7 @@ async fn queue_package(
|
||||
|
||||
let _ = build_package(build_model, db, version_model, version, name, url).await;
|
||||
});
|
||||
job_handles.lock().await.insert(build_id, handle);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -9,4 +9,5 @@ pub enum Action {
|
||||
versions::ActiveModel,
|
||||
builds::ActiveModel,
|
||||
),
|
||||
Cancel(i32),
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user