Source code for pylorax.treebuilder

# treebuilder.py - handle arch-specific tree building stuff using templates
#
# Copyright (C) 2011-2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author(s):  Will Woods <wwoods@redhat.com>

import logging
logger = logging.getLogger("pylorax.treebuilder")

import os, re
from os.path import basename
from shutil import copytree, copy2
from subprocess import CalledProcessError
from pathlib import Path
import itertools
import libdnf5 as dnf5
from libdnf5.common import QueryCmp_EQ as EQ

from pylorax.sysutils import joinpaths, remove
from pylorax.base import DataHolder
from pylorax.ltmpl import LoraxTemplateRunner
import pylorax.imgutils as imgutils
from pylorax.imgutils import DracutChroot
from pylorax.executils import runcmd, runcmd_output, execWithCapture

templatemap = {
    'x86_64':  'x86.tmpl',
    'ppc64le': 'ppc64le.tmpl',
    's390x':   's390.tmpl',
    'aarch64': 'aarch64.tmpl',
}

[docs] def generate_module_info(moddir, outfile=None): def module_desc(mod): output = runcmd_output(["modinfo", "-F", "description", mod]) return output.strip() def read_module_set(name): return set(l.strip() for l in open(joinpaths(moddir,name)) if ".ko" in l) modsets = {'scsi':read_module_set("modules.block"), 'eth':read_module_set("modules.networking")} modinfo = list() for root, _dirs, files in os.walk(moddir): for modtype, modset in modsets.items(): for mod in modset.intersection(files): # modules in this dir (name, _ext) = os.path.splitext(mod) # foo.ko -> (foo, .ko) desc = module_desc(joinpaths(root,mod)) or "%s driver" % name modinfo.append(dict(name=name, type=modtype, desc=desc)) out = open(outfile or joinpaths(moddir,"module-info"), "w") out.write("Version 0\n") for mod in sorted(modinfo, key=lambda m: m.get('name')): out.write('{name}\n\t{type}\n\t"{desc:.65}"\n'.format(**mod))
[docs] class RuntimeBuilder(object): '''Builds the anaconda runtime image. NOTE: dbo is optional, but if it is not included root must be set. ''' def __init__(self, product, arch, dbo=None, templatedir=None, installpkgs=None, excludepkgs=None, add_templates=None, add_template_vars=None, skip_branding=False, root=None): self.dbo = dbo if dbo: root = dbo.get_config().installroot if not root: raise RuntimeError("No root directory passed to RuntimeBuilder") self._runner = LoraxTemplateRunner(inroot=root, outroot=root, dbo=dbo, templatedir=templatedir, basearch=arch.basearch) self.add_templates = add_templates or [] self.add_template_vars = add_template_vars or {} self._installpkgs = installpkgs or [] self._excludepkgs = excludepkgs or [] # use a copy of product so we can modify it locally product = product.copy() product.name = product.name.lower() self._branding = self.get_branding(skip_branding, product) self.vars = DataHolder(arch=arch, product=product, dbo=dbo, root=root, basearch=arch.basearch, libdir=arch.libdir, branding=self._branding) self._runner.defaults = self.vars
[docs] def get_branding(self, skip, product): """Select the branding from the available 'system-release' packages The *best* way to control this is to have a single package in the repo provide 'system-release' When there are more than 1 package it will: - Make a list of the available packages - If variant is set look for a package ending with lower(variant) and use that - If there are one or more non-generic packages, use the first one after sorting Returns the package names of the system-release and release logos package """ if skip: return DataHolder(release=None, logos=None) release = None query = dnf5.rpm.PackageQuery(self.dbo) query.filter_provides(["system-release"], EQ) pkgs = sorted([p for p in list(query) if not p.get_name().startswith("generic")]) if not pkgs: logger.error("No system-release packages found, could not get the release") return DataHolder(release=None, logos=None) logger.debug("system-release packages: %s", ",".join(p.get_name() for p in pkgs)) if product.variant: variant = [p.get_name() for p in pkgs if p.get_name().endswith("-"+product.variant.lower())] if variant: release = variant[0] if not release: release = pkgs[0].get_name() # release logger.info('got release: %s', release) # logos uses the basename from release (fedora, redhat, centos, ...) logos, _suffix = release.split('-', 1) return DataHolder(release=release, logos=logos+"-logos")
[docs] def install(self): '''Install packages and do initial setup with runtime-install.tmpl''' if self._branding.release: self._runner.installpkg(self._branding.release) if self._branding.logos: self._runner.installpkg(self._branding.logos) if len(self._installpkgs) > 0: self._runner.installpkg(*self._installpkgs) if len(self._excludepkgs) > 0: self._runner.removepkg(*self._excludepkgs) self._runner.run("runtime-install.tmpl") for tmpl in self.add_templates: self._runner.run(tmpl, **self.add_template_vars)
[docs] def writepkglists(self, pkglistdir): '''debugging data: write out lists of package contents''' self._runner._writepkglists(pkglistdir)
[docs] def writepkgsizes(self, pkgsizefile): '''debugging data: write a big list of pkg sizes''' self._runner._writepkgsizes(pkgsizefile)
[docs] def postinstall(self): '''Do some post-install setup work with runtime-postinstall.tmpl''' # copy configdir into runtime root beforehand configdir = joinpaths(self._runner.templatedir,"config_files") configdir_path = "tmp/config_files" fullpath = joinpaths(self.vars.root, configdir_path) if os.path.exists(fullpath): remove(fullpath) copytree(configdir, fullpath) self._runner.run("runtime-postinstall.tmpl", configdir=configdir_path)
[docs] def cleanup(self): '''Remove unneeded packages and files with runtime-cleanup.tmpl''' self._runner.run("runtime-cleanup.tmpl")
[docs] def verify(self): '''Ensure that contents of the installroot can run''' status = True ELF_MAGIC = b'\x7fELF' # Iterate over all files in /usr/bin and /usr/sbin # For ELF files, gather them into a list and we'll check them all at # the end. For files with a #!, check them as we go elf_files = [] usr_bin = Path(self.vars.root + '/usr/bin') usr_sbin = Path(self.vars.root + '/usr/sbin') for path in (str(x) for x in itertools.chain(usr_bin.iterdir(), usr_sbin.iterdir()) \ if x.is_file()): with open(path, "rb") as f: magic = f.read(4) if magic == ELF_MAGIC: # Save the path, minus the chroot prefix elf_files.append(path[len(self.vars.root):]) elif magic[:2] == b'#!': # Reopen the file as text and read the first line. # Open as latin-1 so that stray 8-bit characters don't make # things blow up. We only really care about ASCII parts. with open(path, "rt", encoding="latin-1") as f_text: # Remove the #!, split on space, and take the first part shabang = f_text.readline()[2:].split()[0] # Does the path exist? if not os.path.exists(self.vars.root + shabang): logger.error('%s, needed by %s, does not exist', shabang, path) status = False # Now, run ldd on all the ELF files # Just run ldd once on everything so it isn't logged a million times. # At least one thing in the list isn't going to be a dynamic executable, # so use execWithCapture to ignore the exit code. filename = '' for line in execWithCapture('ldd', elf_files, root=self.vars.root, log_output=False, filter_stderr=True).split('\n'): if line and not line[0].isspace(): # New filename header, strip the : at the end and save filename = line[:-1] elif 'not found' in line: logger.error('%s, needed by %s, not found', line.split()[0], filename) status = False return status
[docs] def generate_module_data(self): root = self.vars.root moddir = joinpaths(root, "lib/modules/") for kernel in findkernels(root=root): ksyms = joinpaths(root, "boot/System.map-%s" % kernel.version) logger.info("doing depmod and module-info for %s", kernel.version) runcmd(["depmod", "-a", "-F", ksyms, "-b", root, kernel.version]) generate_module_info(moddir+kernel.version, outfile=moddir+"module-info")
[docs] def create_squashfs_runtime(self, outfile="/var/tmp/squashfs.img", compression="xz", compressargs=None, size=2): """Create a plain squashfs runtime""" compressargs = compressargs or [] os.makedirs(os.path.dirname(outfile)) # squash the rootfs return imgutils.mksquashfs(self.vars.root, outfile, compression, compressargs)
[docs] def create_ext4_runtime(self, outfile="/var/tmp/squashfs.img", compression="xz", compressargs=None, size=2): """Create a squashfs compressed ext4 runtime""" # make live rootfs image - must be named "LiveOS/rootfs.img" for dracut compressargs = compressargs or [] workdir = joinpaths(os.path.dirname(outfile), "runtime-workdir") os.makedirs(joinpaths(workdir, "LiveOS")) # Catch problems with the rootfs being too small and clearly log them try: imgutils.mkrootfsimg(self.vars.root, joinpaths(workdir, "LiveOS/rootfs.img"), "Anaconda", size=size) except CalledProcessError as e: if e.stdout and "No space left on device" in e.stdout: logger.error("The rootfs ran out of space with size=%d", size) raise # squash the live rootfs and clean up workdir rc = imgutils.mksquashfs(workdir, outfile, compression, compressargs) remove(workdir) return rc
[docs] def finished(self): pass
[docs] class TreeBuilder(object): '''Builds the arch-specific boot images. inroot should be the installtree root (the newly-built runtime dir)''' def __init__(self, product, arch, inroot, outroot, runtime, isolabel, domacboot=True, doupgrade=True, templatedir=None, add_templates=None, add_template_vars=None, workdir=None, extra_boot_args=""): # NOTE: if you pass an arg named "runtime" to a mako template it'll # clobber some mako internal variables - hence "runtime_img". self.vars = DataHolder(arch=arch, product=product, runtime_img=runtime, runtime_base=basename(runtime), inroot=inroot, outroot=outroot, basearch=arch.basearch, libdir=arch.libdir, isolabel=isolabel, udev=udev_escape, domacboot=domacboot, doupgrade=doupgrade, workdir=workdir, lower=string_lower, extra_boot_args=extra_boot_args) self._runner = LoraxTemplateRunner(inroot, outroot, templatedir=templatedir, basearch=arch.basearch) self._runner.defaults = self.vars self.add_templates = add_templates or [] self.add_template_vars = add_template_vars or {} self.templatedir = templatedir self.treeinfo_data = None @property def kernels(self): return findkernels(root=self.vars.inroot)
[docs] def rebuild_initrds(self, add_args=None, backup="", prefix=""): '''Rebuild all the initrds in the tree. If backup is specified, each initrd will be renamed with backup as a suffix before rebuilding. If backup is empty, the existing initrd files will be overwritten. If suffix is specified, the existing initrd is untouched and a new image is built with the filename "${prefix}-${kernel.version}.img" If the initrd doesn't exist its name will be created based on the name of the kernel. ''' add_args = add_args or [] args = ["--nomdadmconf", "--nolvmconf"] + add_args if not backup: args.append("--force") if not self.kernels: raise RuntimeError("No kernels found, cannot rebuild_initrds") with DracutChroot(self.vars.inroot) as dracut: for kernel in self.kernels: if prefix: idir = os.path.dirname(kernel.path) outfile = joinpaths(idir, prefix+'-'+kernel.version+'.img') elif hasattr(kernel, "initrd"): # If there is an existing initrd, use that outfile = kernel.initrd.path else: # Construct an initrd from the kernel name outfile = kernel.path.replace("vmlinuz-", "initrd-") + ".img" logger.info("rebuilding %s", outfile) if backup: initrd = joinpaths(self.vars.inroot, outfile) if os.path.exists(initrd): os.rename(initrd, initrd + backup) dracut.Run(args + [outfile, kernel.version])
[docs] def build(self): templatefile = templatemap[self.vars.arch.basearch] for tmpl in self.add_templates: self._runner.run(tmpl, **self.add_template_vars) self._runner.run(templatefile, kernels=self.kernels) self.treeinfo_data = self._runner.results.treeinfo self.implantisomd5()
[docs] def implantisomd5(self): for _section, data in self.treeinfo_data.items(): if 'boot.iso' in data: iso = joinpaths(self.vars.outroot, data['boot.iso']) runcmd(["implantisomd5", iso])
@property def dracut_hooks_path(self): """ Return the path to the lorax dracut hooks scripts Use the configured share dir if it is setup, otherwise default to /usr/share/lorax/dracut_hooks """ if self.templatedir: return joinpaths(self.templatedir, "dracut_hooks") else: return "/usr/share/lorax/dracut_hooks"
[docs] def copy_dracut_hooks(self, hooks): """ Copy the hook scripts in hooks into the installroot's /tmp/ and return a list of commands to pass to dracut when creating the initramfs hooks is a list of tuples with the name of the hook script and the target dracut hook directory (eg. [("99anaconda-copy-ks.sh", "/lib/dracut/hooks/pre-pivot")]) """ dracut_commands = [] for hook_script, dracut_path in hooks: src = joinpaths(self.dracut_hooks_path, hook_script) if not os.path.exists(src): logger.error("Missing lorax dracut hook script %s", (src)) continue dst = joinpaths(self.vars.inroot, "/tmp/", hook_script) copy2(src, dst) dracut_commands += ["--include", joinpaths("/tmp/", hook_script), dracut_path] return dracut_commands
#### TreeBuilder helper functions
[docs] def findkernels(root="/", kdir="boot"): # To find possible flavors, awk '/BuildKernel/ { print $4 }' kernel.spec flavors = ('debug', 'PAE', 'PAEdebug', 'smp', 'xen', 'lpae') kre = re.compile(r"vmlinuz-(?P<version>.+?\.(?P<arch>[a-z0-9_]+)" r"(.(?P<flavor>{0}))?)$".format("|".join(flavors))) kernels = [] bootfiles = os.listdir(joinpaths(root, kdir)) for f in bootfiles: match = kre.match(f) if match: kernel = DataHolder(path=joinpaths(kdir, f)) kernel.update(match.groupdict()) # sets version, arch, flavor kernels.append(kernel) # look for associated initrd/initramfs/etc. for kernel in kernels: for f in bootfiles: if f.endswith('-'+kernel.version+'.img'): imgtype, _rest = f.split('-',1) # special backwards-compat case if imgtype == 'initramfs': imgtype = 'initrd' kernel[imgtype] = DataHolder(path=joinpaths(kdir, f)) logger.debug("kernels=%s", kernels) return kernels
# udev whitelist: 'a-zA-Z0-9#+.:=@_-' (see is_whitelisted in libudev-util.c) udev_blacklist=' !"$%&\'()*,/;<>?[\\]^`{|}~' # ASCII printable, minus whitelist udev_blacklist += ''.join(chr(i) for i in range(32)) # ASCII non-printable
[docs] def udev_escape(label): out = '' for ch in label: out += ch if ch not in udev_blacklist else '\\x%02x' % ord(ch) return out
[docs] def string_lower(string): """ Return a lowercase string. :param string: String to lowercase This is used as a filter in the templates. """ return string.lower()