sources: transform() is only used in the curl sources, remove from ABC

This commit is contained in:
Michael Vogt 2024-02-06 17:10:50 +01:00 committed by Achilleas Koutsou
parent 7431653882
commit 1fc7ead2f4
6 changed files with 7 additions and 20 deletions

View file

@@ -4,7 +4,7 @@ import hashlib
import json import json
import os import os
import tempfile import tempfile
from typing import ClassVar, Dict, Tuple from typing import ClassVar, Dict
from . import host from . import host
from .objectstore import ObjectStore from .objectstore import ObjectStore
@@ -105,11 +105,6 @@ class SourceService(host.Service):
"""Returns True if the item to download is in cache. """ """Returns True if the item to download is in cache. """
return os.path.isfile(f"{self.cache}/{checksum}") return os.path.isfile(f"{self.cache}/{checksum}")
# pylint: disable=[no-self-use]
def transform(self, checksum, desc) -> Tuple:
"""Modify the input data before downloading. By default only transforms an item object to a Tupple."""
return checksum, desc
@staticmethod @staticmethod
def load_items(fds): def load_items(fds):
with os.fdopen(fds.steal(0)) as f: with os.fdopen(fds.steal(0)) as f:

View file

@@ -13,7 +13,6 @@ by osbuild itself.
Buildhost commands used: `skopeo`. Buildhost commands used: `skopeo`.
""" """
import concurrent.futures
import hashlib import hashlib
import subprocess as sp import subprocess as sp
import sys import sys
@@ -66,11 +65,8 @@ class ContainersStorageSource(sources.SourceService):
self.exists(checksum, desc) self.exists(checksum, desc)
def fetch_all(self, items) -> None: def fetch_all(self, items) -> None:
# prepare each item as a (checksum, desc) tuple (where desc=None) for checksum in items:
transformed = map(lambda i: self.transform(i, None), items) self.fetch_one(checksum, None)
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for _ in executor.map(self.fetch_one, *zip(*transformed)):
pass
def exists(self, checksum, _) -> bool: def exists(self, checksum, _) -> bool:
image_id = checksum.split(":")[1] image_id = checksum.split(":")[1]

View file

@@ -59,9 +59,8 @@ class InlineSource(sources.SourceService):
def fetch_all(self, items: Dict) -> None: def fetch_all(self, items: Dict) -> None:
filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache
transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded
for args in transformed: for args in filtered:
self.fetch_one(*args) self.fetch_one(*args)
def fetch_one(self, checksum, desc): def fetch_one(self, checksum, desc):

View file

@@ -91,10 +91,9 @@ class OSTreeSource(sources.SourceService):
def fetch_all(self, items: Dict) -> None: def fetch_all(self, items: Dict) -> None:
filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache
transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for _ in executor.map(self.fetch_one, *zip(*transformed)): for _ in executor.map(self.fetch_one, *zip(*filtered)):
pass pass
def fetch_one(self, checksum, desc): def fetch_one(self, checksum, desc):

View file

@@ -104,10 +104,9 @@ class SkopeoSource(sources.SourceService):
def fetch_all(self, items: Dict) -> None: def fetch_all(self, items: Dict) -> None:
filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache
transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for _ in executor.map(self.fetch_one, *zip(*transformed)): for _ in executor.map(self.fetch_one, *zip(*filtered)):
pass pass
def fetch_one(self, checksum, desc): def fetch_one(self, checksum, desc):

View file

@@ -89,10 +89,9 @@ class SkopeoIndexSource(sources.SourceService):
def fetch_all(self, items: Dict) -> None: def fetch_all(self, items: Dict) -> None:
filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache filtered = filter(lambda i: not self.exists(i[0], i[1]), items.items()) # discards items already in cache
transformed = map(lambda i: self.transform(i[0], i[1]), filtered) # prepare each item to be downloaded
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for _ in executor.map(self.fetch_one, *zip(*transformed)): for _ in executor.map(self.fetch_one, *zip(*filtered)):
pass pass
def fetch_one(self, checksum, desc): def fetch_one(self, checksum, desc):