From 63eea3c8227a90097c619dba529a8b9fa478c555 Mon Sep 17 00:00:00 2001 From: lukas-heiligenbrunner Date: Tue, 2 Jan 2024 23:05:03 +0100 Subject: [PATCH] add queue mechanism for builds and only allow one package built at a time for now. --- backend/src/api/add.rs | 4 +- backend/src/api/list.rs | 8 +- backend/src/builder/builder.rs | 244 ++++++++++-------- .../components/dashboard/your_packages.dart | 4 + frontend/lib/screens/build_screen.dart | 16 +- frontend/lib/screens/package_screen.dart | 3 - 6 files changed, 158 insertions(+), 121 deletions(-) diff --git a/backend/src/api/add.rs b/backend/src/api/add.rs index 61f1502..3134a87 100644 --- a/backend/src/api/add.rs +++ b/backend/src/api/add.rs @@ -41,7 +41,7 @@ pub async fn package_add( None => { let new_package = packages::ActiveModel { name: Set(input.name.clone()), - status: Set(0), + status: Set(3), latest_aur_version: Set(pkg.version.clone()), ..Default::default() }; @@ -77,7 +77,7 @@ pub async fn package_add( } }; - pkg_model.status = Set(0); + pkg_model.status = Set(3); pkg_model.latest_version_id = Set(Some(version_model.id.clone().unwrap())); pkg_model.save(db).await.expect("todo error message"); diff --git a/backend/src/api/list.rs b/backend/src/api/list.rs index c12e8bc..b95373e 100644 --- a/backend/src/api/list.rs +++ b/backend/src/api/list.rs @@ -84,14 +84,16 @@ pub async fn get_package( let db = db as &DatabaseConnection; let all: ListPackageModel = Packages::find() - .join_rev(JoinType::InnerJoin, versions::Relation::Packages.def()) + .join_rev(JoinType::LeftJoin, versions::Relation::LatestPackage.def()) .filter(packages::Column::Id.eq(id)) .select_only() - .column_as(versions::Column::Id.count(), "count") .column(packages::Column::Name) .column(packages::Column::Id) .column(packages::Column::Status) - .group_by(packages::Column::Name) + .column_as(packages::Column::OutOfDate, "outofdate") + .column_as(packages::Column::LatestAurVersion, "latest_aur_version") + .column_as(versions::Column::Version, "latest_version") + .column_as(packages::Column::LatestVersionId, "latest_version_id") .into_model::() .one(db) .await diff --git a/backend/src/builder/builder.rs b/backend/src/builder/builder.rs index 3f077f1..1039947 100644 --- a/backend/src/builder/builder.rs +++ b/backend/src/builder/builder.rs @@ -1,144 +1,166 @@ use crate::builder::types::Action; +use crate::db::builds::ActiveModel; use crate::db::prelude::{Builds, Packages}; -use crate::db::{builds, packages}; +use crate::db::{builds, packages, versions}; use crate::repo::repo::add_pkg; use anyhow::anyhow; use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set}; use std::ops::Add; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; -use tokio::sync::broadcast::Sender; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::{broadcast, Semaphore}; pub async fn init(db: DatabaseConnection, tx: Sender) { + let semaphore = Arc::new(Semaphore::new(1)); + loop { if let Ok(_result) = tx.subscribe().recv().await { match _result { // add a package to parallel build Action::Build(name, version, url, mut version_model) => { - let db = db.clone(); - let build = builds::ActiveModel { - pkg_id: version_model.package_id.clone(), - version_id: version_model.id.clone(), - ouput: Set(None), - status: Set(Some(0)), - start_time: Set(Some( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() as u32, - )), - ..Default::default() - }; - let mut new_build = build.save(&db).await.unwrap(); - - // spawn new thread for each pkg build - // todo add queue and build two packages in parallel - tokio::spawn(async move { - let (tx, mut rx) = broadcast::channel::(3); - - let db2 = db.clone(); - let new_build2 = new_build.clone(); - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(output_line) => { - println!("{output_line}"); - - let _ = append_db_log_output( - &db2, - output_line, - new_build2.id.clone().unwrap(), - ) - .await; - } - Err(e) => match e { - RecvError::Closed => { - break; - } - RecvError::Lagged(_) => {} - }, - } - } - }); - - match add_pkg(url, version, name, tx).await { - Ok(pkg_file_name) => { - println!("successfully built package"); - let _ = set_pkg_status( - &db, - version_model.package_id.clone().unwrap(), - version_model.id.clone().unwrap(), - Some(false), - 1, - ) - .await; - - version_model.file_name = Set(Some(pkg_file_name)); - let _ = version_model.update(&db).await; - - new_build.status = Set(Some(1)); - new_build.end_time = Set(Some( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() as u32, - )); - let _ = new_build.update(&db).await; - } - Err(e) => { - let _ = set_pkg_status( - &db, - version_model.package_id.clone().unwrap(), - version_model.id.clone().unwrap(), - None, - 2, - ) - .await; - let _ = version_model.update(&db).await; - - new_build.status = Set(Some(2)); - new_build.end_time = Set(Some( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() as u32, - )); - let _ = new_build.update(&db).await; - - println!("Error: {e}") - } - } - }); + let _ = queue_package( + name, + version, + url, + version_model, + db.clone(), + semaphore.clone(), + ) + .await; } } } } } -// todo maybe move to helper file -async fn set_pkg_status( - db: &DatabaseConnection, - package_id: i32, - version_id: i32, - outofdate: Option, - status: i32, +async fn queue_package( + name: String, + version: String, + url: String, + mut version_model: versions::ActiveModel, + db: DatabaseConnection, + semaphore: Arc, ) -> anyhow::Result<()> { + // set build status to pending + let build = builds::ActiveModel { + pkg_id: version_model.package_id.clone(), + version_id: version_model.id.clone(), + ouput: Set(None), + status: Set(Some(3)), + start_time: Set(Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as u32, + )), + ..Default::default() + }; + let mut new_build = build.save(&db).await.unwrap(); + + let permits = Arc::clone(&semaphore); + + // spawn new thread for each pkg build + // todo add queue and build two packages in parallel + tokio::spawn(async move { + let _permit = permits.acquire().await.unwrap(); + + // set build status to building + new_build.status = Set(Some(0)); + new_build = new_build.save(&db).await.unwrap(); + + build_package(new_build, db, version_model, version, name, url).await; + }); + Ok(()) +} + +async fn build_package( + mut new_build: builds::ActiveModel, + db: DatabaseConnection, + mut version_model: versions::ActiveModel, + version: String, + name: String, + url: String, +) -> anyhow::Result<()> { + let (tx, rx) = broadcast::channel::(3); + spawn_log_appender(db.clone(), new_build.clone(), rx); + + let package_id = version_model.package_id.clone().unwrap(); let mut pkg: packages::ActiveModel = Packages::find_by_id(package_id) - .one(db) + .one(&db) .await? .ok_or(anyhow!("no package with id {package_id} found"))? .into(); - pkg.status = Set(status); - pkg.latest_version_id = Set(Some(version_id)); - if outofdate.is_some() { - pkg.out_of_date = Set(outofdate.unwrap() as i32) - } - pkg.update(db).await?; + // update status to building + pkg.status = Set(0); + pkg = pkg.update(&db).await?.into(); + + match add_pkg(url, version, name, tx).await { + Ok(pkg_file_name) => { + println!("successfully built package"); + // update package success status + pkg.status = Set(1); + pkg.latest_version_id = Set(Some(version_model.id.clone().unwrap())); + pkg.out_of_date = Set(false as i32); + pkg.update(&db).await?; + + version_model.file_name = Set(Some(pkg_file_name)); + let _ = version_model.update(&db).await; + + new_build.status = Set(Some(1)); + new_build.end_time = Set(Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as u32, + )); + let _ = new_build.update(&db).await; + } + Err(e) => { + pkg.status = Set(2); + pkg.latest_version_id = Set(Some(version_model.id.clone().unwrap())); + pkg.update(&db).await?; + + let _ = version_model.update(&db).await; + + new_build.status = Set(Some(2)); + new_build.end_time = Set(Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as u32, + )); + let _ = new_build.update(&db).await; + + println!("Error: {e}") + } + }; Ok(()) } +fn spawn_log_appender(db2: DatabaseConnection, new_build2: ActiveModel, mut rx: Receiver) { + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(output_line) => { + println!("{output_line}"); + + let _ = append_db_log_output(&db2, output_line, new_build2.id.clone().unwrap()) + .await; + } + Err(e) => match e { + RecvError::Closed => { + break; + } + RecvError::Lagged(_) => {} + }, + } + } + }); +} + async fn append_db_log_output( db: &DatabaseConnection, text: String, diff --git a/frontend/lib/components/dashboard/your_packages.dart b/frontend/lib/components/dashboard/your_packages.dart index d3eb6b6..3c610c6 100644 --- a/frontend/lib/components/dashboard/your_packages.dart +++ b/frontend/lib/components/dashboard/your_packages.dart @@ -156,6 +156,8 @@ IconData switchSuccessIcon(int status) { return Icons.check_circle_outline; case 2: return Icons.cancel_outlined; + case 3: + return Icons.pause_circle_outline; default: return Icons.question_mark_outlined; } @@ -169,6 +171,8 @@ Color switchSuccessColor(int status) { return const Color(0xFF0A6900); case 2: return const Color(0xff760707); + case 3: + return const Color(0xFF0044AA); default: return const Color(0xFF9D8D00); } diff --git a/frontend/lib/screens/build_screen.dart b/frontend/lib/screens/build_screen.dart index 0d92879..b76e75b 100644 --- a/frontend/lib/screens/build_screen.dart +++ b/frontend/lib/screens/build_screen.dart @@ -24,7 +24,7 @@ class _BuildScreenState extends State { body: APIBuilder( dto: BuildDTO(buildID: widget.buildID), interval: const Duration(seconds: 10), - onLoad: () => const Text("no data"), + onLoad: () => const Text("loading"), onData: (buildData) { final start_time = DateTime.fromMillisecondsSinceEpoch( (buildData.start_time ?? 0) * 1000); @@ -64,11 +64,23 @@ class _BuildScreenState extends State { const SizedBox( height: 15, ), - BuildOutput(build: buildData) + _buildPage(buildData) ], ); }), appBar: AppBar(), ); } + + Widget _buildPage(Build build) { + switch (build.status) { + case 3: + return const Text("in Queue"); + case 0: + case 1: + case 2: + default: + return BuildOutput(build: build); + } + } } diff --git a/frontend/lib/screens/package_screen.dart b/frontend/lib/screens/package_screen.dart index a15762e..f4b186d 100644 --- a/frontend/lib/screens/package_screen.dart +++ b/frontend/lib/screens/package_screen.dart @@ -1,6 +1,3 @@ -import 'dart:async'; - -import 'package:aurcache/api/builds.dart'; import 'package:aurcache/api/packages.dart'; import 'package:aurcache/components/api/APIBuilder.dart'; import 'package:aurcache/providers/builds_provider.dart';