util: Consolidate parse functions into util

- Move functions to the 'util' to centralize common
functionality, reducing code duplication and improving
maintainability across the codebase.

Signed-off-by: Renata Ravanelli <rravanel@redhat.com>
This commit is contained in:
Renata Ravanelli 2024-02-27 13:14:35 -03:00 committed by Michael Vogt
parent 4f8ff6bc99
commit 6d4d1962eb
4 changed files with 111 additions and 102 deletions

View file

@ -1,7 +1,10 @@
"""Helpers related to parsing"""
import os
import re
from typing import Union
from typing import Union, Dict
from urllib.parse import ParseResult, urlparse
def parse_size(s: str) -> Union[int, str]:
@ -31,3 +34,71 @@ def parse_size(s: str) -> Union[int, str]:
return "unlimited"
raise TypeError(f"invalid size value: '{s}'")
def parse_mount(url: ParseResult, args: Dict):
"""
Parses the mount URL to extract the root path.
Parameters:
- url (ParseResult): The ParseResult object obtained from urlparse.
- args (Dict): A dictionary containing arguments including mounts information.
"""
name = url.netloc
if name:
root = args["mounts"].get(name, {}).get("path")
if not root:
raise ValueError(f"Unknown mount '{name}'")
else:
root = args["paths"]["mounts"]
return root
def parse_input(url: ParseResult, args: Dict):
"""
Parses the input URL to extract the root path.
Parameters:
- url (ParseResult): The ParseResult object obtained from urlparse.
- args (Dict): A dictionary containing arguments including mounts information.
"""
name = url.netloc
root = args["inputs"].get(name, {}).get("path")
if root is None:
raise ValueError(f"Unknown input '{name}'")
return root
def parse_location(location, args):
"""
Parses the location URL to derive the corresponding file path.
Parameters:
- location (str): The location URL to be parsed.
- args (Dict): A dictionary containing arguments including tree and mount information.
"""
url = urlparse(location)
scheme = url.scheme
if scheme == "tree":
root = args["tree"]
elif scheme == "mount":
root = parse_mount(url, args)
elif scheme == "input":
root = parse_input(url, args)
else:
raise ValueError(f"Unsupported scheme '{scheme}'")
assert url.path.startswith("/")
path = os.path.relpath(url.path, "/")
path = os.path.join(root, path)
path = os.path.normpath(path)
if url.path.endswith("/"):
path = os.path.join(path, ".")
return path

View file

@ -2,51 +2,46 @@
import os
import subprocess
import sys
from typing import Dict
from urllib.parse import ParseResult, urlparse
import osbuild.api
from osbuild.util import parsing
def parse_mount(url: ParseResult, args: Dict):
name = url.netloc
if name:
root = args["mounts"].get(name, {}).get("path")
if not root:
raise ValueError(f"Unknown mount '{name}'")
else:
root = args["paths"]["mounts"]
return root
def parse_location(location, args):
url = urlparse(location)
scheme = url.scheme
if scheme == "tree":
root = args["tree"]
elif scheme == "mount":
root = parse_mount(url, args)
else:
raise ValueError(f"Unsupported scheme '{scheme}'")
assert url.path.startswith("/")
path = os.path.relpath(url.path, "/")
path = os.path.join(root, path)
path = os.path.normpath(path)
if url.path.endswith("/"):
path = os.path.join(path, ".")
return path
SCHEMA_2 = r"""
"options": {
"additionalProperties": false,
"properties": {
"items": {
"type": "object",
"additionalProperties": false,
"patternProperties": {
"^mount:\/\/[^\/]+\/|^tree:\/\/\/": {
"type": "object",
"required": ["immutable"],
"properties": {
"immutable": {
"type": "boolean",
"description": "Make the file/directory immutable",
"default": true
}
}
}
}
}
}
},
"devices": {
"type": "object",
"additionalProperties": true
},
"mounts": {
"type": "array"
}
"""
def main(args, options):
for path, cmdargs in options["items"].items():
immutable = cmdargs["immutable"]
dst = parse_location(path, args)
dst = parsing.parse_location(path, args)
op = '+' if immutable else '-'
subprocess.run(["chattr", f"{op}i", dst], check=True)

View file

@ -2,64 +2,17 @@
import os
import subprocess
import sys
from typing import Dict
from urllib.parse import ParseResult, urlparse
import osbuild.api
def parse_mount(url: ParseResult, args: Dict):
name = url.netloc
if name:
root = args["mounts"].get(name, {}).get("path")
if not root:
raise ValueError(f"Unknown mount '{name}'")
else:
root = args["paths"]["mounts"]
return root
def parse_input(url: ParseResult, args: Dict):
name = url.netloc
root = args["inputs"].get(name, {}).get("path")
if root is None:
raise ValueError(f"Unknown input '{name}'")
return root
def parse_location(location, args):
url = urlparse(location)
scheme = url.scheme
if scheme == "tree":
root = args["tree"]
elif scheme == "mount":
root = parse_mount(url, args)
elif scheme == "input":
root = parse_input(url, args)
else:
raise ValueError(f"Unsupported scheme '{scheme}'")
assert url.path.startswith("/")
path = os.path.relpath(url.path, "/")
path = os.path.join(root, path)
path = os.path.normpath(path)
if url.path.endswith("/"):
path = os.path.join(path, ".")
return path
from osbuild.util import parsing
def main(args, options):
items = options["paths"]
for path in items:
src = parse_location(path["from"], args)
dst = parse_location(path["to"], args)
src = parsing.parse_location(path["from"], args)
dst = parsing.parse_location(path["to"], args)
remove_destination = path.get("remove_destination", False)
print(f"copying '{src}' -> '{dst}'")

View file

@ -6,25 +6,15 @@ import osbuild.api
from osbuild.util import bls
def main(paths, tree, options):
def main(args, options):
kopts = options.get("kernel_opts", [])
bootpath = options.get("bootpath", "tree:///boot")
url = urlparse(bootpath)
scheme = url.scheme
if scheme == "tree":
root = tree
elif scheme == "mount":
root = paths["mounts"]
else:
raise ValueError(f"Unsupported scheme '{scheme}'")
assert url.path.startswith("/")
bootroot = root + url.path
bootroot = parsing.parse_location(bootpath, args)
bls.options_append(bootroot, kopts)
if __name__ == '__main__':
args = osbuild.api.arguments()
r = main(args["paths"], args["tree"], args["options"])
r = main(args, args["options"])
sys.exit(r)