From 15dbb90eda51b312d501b73b95b8849924c9f615 Mon Sep 17 00:00:00 2001 From: Achilleas Koutsou Date: Fri, 15 Sep 2023 14:36:53 +0200 Subject: [PATCH] upload/pulp: wait for tasks in UploadAndDistributeCommit When uploading and distributing a commit, wait for any async tasks to finish before returning. There are two tasks that can block this function: - Creating a distribution: this only happens when a new repository is created. - Import commit: this will always happen in this function. --- internal/upload/pulp/pulp.go | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/internal/upload/pulp/pulp.go b/internal/upload/pulp/pulp.go index b2e7653f6..da21f5d95 100644 --- a/internal/upload/pulp/pulp.go +++ b/internal/upload/pulp/pulp.go @@ -7,6 +7,8 @@ import ( "io" "net/http" "os" + "sync" + "time" "github.com/osbuild/pulp-client/pulpclient" "github.com/sirupsen/logrus" @@ -247,6 +249,26 @@ func (cl *Client) TaskWaitingOrRunning(task string) bool { return state == TASK_RUNNING || state == TASK_WAITING } +// Block until all tasks are complete. +func (cl *Client) waitFor(tasks []string) { + wg := sync.WaitGroup{} + logrus.Info("setting up wait group") + for idx := range tasks { + task := tasks[idx] + logrus.Infof("task %q", task) + wg.Add(1) + go func() { + logrus.Infof("waiting for task %q to finish", task) + for cl.TaskWaitingOrRunning(task) { + time.Sleep(15 * time.Second) + } + logrus.Infof("task %q done", task) + wg.Done() + }() + } + wg.Wait() +} + // UploadAndDistributeCommit uploads a commit, creates a repository if // necessary, imports the commit to the repository, and distributes the // repository. @@ -273,6 +295,8 @@ func (cl *Client) UploadAndDistributeCommit(archivePath, repoName, basePath stri return "", err } + tasks := make([]string, 0) + if repoHref == "" { // repository does not exist: create it and distribute logrus.Infof("repository not found - creating repository %q", repoName) @@ -284,15 +308,22 @@ func (cl *Client) UploadAndDistributeCommit(archivePath, repoName, basePath stri repoHref = href logrus.Infof("created repository %q (%s)", repoName, repoHref) logrus.Infof("creating distribution at %q", basePath) - if _, err := cl.DistributeOSTreeRepo(basePath, repoName, repoHref); err != nil { + distTask, err := cl.DistributeOSTreeRepo(basePath, repoName, repoHref) + if err != nil { return "", err } + tasks = append(tasks, distTask) } logrus.Infof("importing commit %q to repo %q", fileHref, repoHref) - if _, err := cl.ImportCommit(fileHref, repoHref); err != nil { + importTask, err := cl.ImportCommit(fileHref, repoHref) + if err != nil { return "", err } + tasks = append(tasks, importTask) + + logrus.Infof("blocking on %d tasks", len(tasks)) + cl.waitFor(tasks) repoURL, err := cl.GetDistributionURLForOSTreeRepo(repoHref) if err != nil {