upload/pulp: wait for tasks in UploadAndDistributeCommit

When uploading and distributing a commit, wait for any async tasks to
finish before returning.  There are two tasks that can block this
function:
- Creating a distribution: this only happens when a new repository is
  created.
- Import commit: this will always happen in this function.
This commit is contained in:
Achilleas Koutsou 2023-09-15 14:36:53 +02:00
parent 6a98136cc0
commit 15dbb90eda

View file

@ -7,6 +7,8 @@ import (
"io"
"net/http"
"os"
"sync"
"time"
"github.com/osbuild/pulp-client/pulpclient"
"github.com/sirupsen/logrus"
@ -247,6 +249,26 @@ func (cl *Client) TaskWaitingOrRunning(task string) bool {
return state == TASK_RUNNING || state == TASK_WAITING
}
// Block until all tasks are complete.
func (cl *Client) waitFor(tasks []string) {
wg := sync.WaitGroup{}
logrus.Info("setting up wait group")
for idx := range tasks {
task := tasks[idx]
logrus.Infof("task %q", task)
wg.Add(1)
go func() {
logrus.Infof("waiting for task %q to finish", task)
for cl.TaskWaitingOrRunning(task) {
time.Sleep(15 * time.Second)
}
logrus.Infof("task %q done", task)
wg.Done()
}()
}
wg.Wait()
}
// UploadAndDistributeCommit uploads a commit, creates a repository if
// necessary, imports the commit to the repository, and distributes the
// repository.
@ -273,6 +295,8 @@ func (cl *Client) UploadAndDistributeCommit(archivePath, repoName, basePath stri
return "", err
}
tasks := make([]string, 0)
if repoHref == "" {
// repository does not exist: create it and distribute
logrus.Infof("repository not found - creating repository %q", repoName)
@ -284,15 +308,22 @@ func (cl *Client) UploadAndDistributeCommit(archivePath, repoName, basePath stri
repoHref = href
logrus.Infof("created repository %q (%s)", repoName, repoHref)
logrus.Infof("creating distribution at %q", basePath)
if _, err := cl.DistributeOSTreeRepo(basePath, repoName, repoHref); err != nil {
distTask, err := cl.DistributeOSTreeRepo(basePath, repoName, repoHref)
if err != nil {
return "", err
}
tasks = append(tasks, distTask)
}
logrus.Infof("importing commit %q to repo %q", fileHref, repoHref)
if _, err := cl.ImportCommit(fileHref, repoHref); err != nil {
importTask, err := cl.ImportCommit(fileHref, repoHref)
if err != nil {
return "", err
}
tasks = append(tasks, importTask)
logrus.Infof("blocking on %d tasks", len(tasks))
cl.waitFor(tasks)
repoURL, err := cl.GetDistributionURLForOSTreeRepo(repoHref)
if err != nil {