# Source code for pylorax.api.projects

#
# Copyright (C) 2017  Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
import logging
log = logging.getLogger("lorax-composer")

from configparser import ConfigParser
import dnf
from glob import glob
import os
import time

from pylorax.api.bisect import insort_left
from pylorax.sysutils import joinpaths

TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


class ProjectsError(Exception):
    """Raised when there is a problem with a project, its dependencies, or its sources."""
def api_time(t):
    """Convert time since epoch to a string

    :param t: Seconds since epoch
    :type t: int
    :returns: Time string
    :rtype: str
    """
    # Times are reported in the server's local timezone
    local = time.localtime(t)
    return time.strftime(TIME_FORMAT, local)
def api_changelog(changelog):
    """Convert the changelog to a string

    :param changelog: A list of time, author, string tuples.
    :type changelog: tuple
    :returns: The most recent changelog text or ""
    :rtype: str

    This returns only the most recent changelog entry.
    """
    # An empty list, or an entry tuple that is too short, yields ""
    try:
        return changelog[0][2]
    except IndexError:
        return ""
def pkg_to_project(pkg):
    """Extract the details from a hawkey.Package object

    :param pkg: hawkey.Package object with package details
    :type pkg: hawkey.Package
    :returns: A dict with the name, summary, description, and url.
    :rtype: dict

    upstream_vcs is hard-coded to UPSTREAM_VCS
    """
    return dict(name=pkg.name,
                summary=pkg.summary,
                description=pkg.description,
                homepage=pkg.url,
                upstream_vcs="UPSTREAM_VCS")
def pkg_to_build(pkg):
    """Extract the build details from a hawkey.Package object

    :param pkg: hawkey.Package object with package details
    :type pkg: hawkey.Package
    :returns: A dict with the build details, epoch, release, arch, build_time, changelog, ...
    :rtype: dict

    metadata entries are hard-coded to {}

    Note that this only returns the build dict, it does not include the
    name, description, etc.
    """
    # Source-level details are nested under the "source" key
    source = {"license": pkg.license,
              "version": pkg.version,
              "source_ref": "SOURCE_REF",
              "metadata": {}}
    return {"epoch": pkg.epoch,
            "release": pkg.release,
            "arch": pkg.arch,
            "build_time": api_time(pkg.buildtime),
            "changelog": "CHANGELOG_NEEDED",            # XXX Not in hawkey.Package
            "build_config_ref": "BUILD_CONFIG_REF",
            "build_env_ref": "BUILD_ENV_REF",
            "metadata": {},
            "source": source}
def pkg_to_project_info(pkg):
    """Extract the details from a hawkey.Package object

    :param pkg: hawkey.Package object with package details
    :type pkg: hawkey.Package
    :returns: A dict with the project details, as well as epoch, release, arch, build_time, changelog, ...
    :rtype: dict

    metadata entries are hard-coded to {}
    """
    # Same fields as pkg_to_project, plus the list of builds
    info = pkg_to_project(pkg)
    info["builds"] = [pkg_to_build(pkg)]
    return info
def pkg_to_dep(pkg):
    """Extract the info from a hawkey.Package object

    :param pkg: A hawkey.Package object
    :type pkg: hawkey.Package
    :returns: A dict with name, epoch, version, release, arch
    :rtype: dict
    """
    return {field: getattr(pkg, field) for field in ("name", "epoch", "version", "release", "arch")}
def proj_to_module(proj):
    """Extract the name from a project_info dict

    :param proj: dict with package details
    :type proj: dict
    :returns: A dict with name, and group_type
    :rtype: dict

    group_type is hard-coded to "rpm"
    """
    module = {"name": proj["name"]}
    module["group_type"] = "rpm"
    return module
def dep_evra(dep):
    """Return the epoch:version-release.arch for the dep

    :param dep: dependency dict
    :type dep: dict
    :returns: epoch:version-release.arch
    :rtype: str
    """
    vra = "%s-%s.%s" % (dep["version"], dep["release"], dep["arch"])
    # A zero epoch is omitted from the string entirely
    if dep["epoch"] == 0:
        return vra
    return "%s:%s" % (dep["epoch"], vra)
def dep_nevra(dep):
    """Return the name-epoch:version-release.arch"""
    return "%s-%s" % (dep["name"], dep_evra(dep))
def projects_list(dbo):
    """Return a list of projects

    :param dbo: dnf base object
    :type dbo: dnf.Base
    :returns: List of project info dicts with name, summary, description, homepage, upstream_vcs
    :rtype: list of dicts
    """
    # Passing None makes projects_info return every available package
    return projects_info(dbo, None)
def projects_info(dbo, project_names):
    """Return details about specific projects

    :param dbo: dnf base object
    :type dbo: dnf.Base
    :param project_names: List of names of projects to get info about
    :type project_names: str
    :returns: List of project info dicts with pkg_to_project as well as epoch, version, release, etc.
    :rtype: list of dicts

    If project_names is None it will return the full list of available packages
    """
    if project_names:
        pkgs = dbo.sack.query().available().filter(name__glob=project_names)
    else:
        pkgs = dbo.sack.query().available()

    # iterate over pkgs
    # - if pkg.name isn't in the results yet, add pkg_to_project_info in sorted position
    # - if pkg.name is already in results, get its builds. If the build for pkg is different
    #   in any way (version, arch, etc.) add it to the entry's builds list. If it is the same,
    #   skip it.
    results = []
    # Map of lowercased package name -> the project dict stored in results.
    # NOTE: the previous implementation stored the index returned by insort_left, but a
    # later insertion earlier in the sorted list shifts the existing entries, making the
    # stored indexes stale and attaching builds to the wrong project. Holding a reference
    # to the dict itself is immune to reordering.
    results_names = {}
    for p in pkgs:
        key = p.name.lower()
        if key not in results_names:
            proj = pkg_to_project_info(p)
            insort_left(results, proj, key=lambda d: d["name"].lower())
            results_names[key] = proj
        else:
            build = pkg_to_build(p)
            if build not in results_names[key]["builds"]:
                results_names[key]["builds"].append(build)
    return results
def _depsolve(dbo, projects, groups): """Add projects to a new transaction :param dbo: dnf base object :type dbo: dnf.Base :param projects: The projects and version globs to find the dependencies for :type projects: List of tuples :param groups: The groups to include in dependency solving :type groups: List of str :returns: None :rtype: None :raises: ProjectsError if there was a problem installing something """ # This resets the transaction and updates the cache. # It is important that the cache always be synchronized because Anaconda will grab its own copy # and if that is different the NEVRAs will not match and the build will fail. dbo.reset(goal=True) install_errors = [] for name in groups: try: dbo.group_install(name, ["mandatory", "default"]) except dnf.exceptions.MarkingError as e: install_errors.append(("Group %s" % (name), str(e))) for name, version in projects: # Find the best package matching the name + version glob # dnf can return multiple packages if it is in more than 1 repository query = dbo.sack.query().filterm(provides__glob=name) if version: query.filterm(version__glob=version) query.filterm(latest=1) if not query: install_errors.append(("%s-%s" % (name, version), "No match")) continue sltr = dnf.selector.Selector(dbo.sack).set(pkg=query) # NOTE: dnf says in near future there will be a "goal" attribute of Base class # so yes, we're using a 'private' attribute here on purpose and with permission. dbo._goal.install(select=sltr, optional=False) if install_errors: raise ProjectsError("The following package(s) had problems: %s" % ",".join(["%s (%s)" % (pattern, err) for pattern, err in install_errors]))
def projects_depsolve(dbo, projects, groups):
    """Return the dependencies for a list of projects

    :param dbo: dnf base object
    :type dbo: dnf.Base
    :param projects: The projects to find the dependencies for
    :type projects: List of Strings
    :param groups: The groups to include in dependency solving
    :type groups: List of str
    :returns: NEVRA's of the project and its dependencies
    :rtype: list of dicts
    :raises: ProjectsError if there was a problem installing something
    """
    _depsolve(dbo, projects, groups)

    try:
        dbo.resolve()
    except dnf.exceptions.DepsolveError as e:
        raise ProjectsError("There was a problem depsolving %s: %s" % (projects, str(e)))

    # An empty transaction means nothing needs to be installed
    if len(dbo.transaction) == 0:
        return []

    deps = [pkg_to_dep(p) for p in dbo.transaction.install_set]
    deps.sort(key=lambda d: d["name"].lower())
    return deps
def estimate_size(packages, block_size=6144):
    """Estimate the installed size of a package list

    :param packages: The packages to be installed
    :type packages: list of hawkey.Package objects
    :param block_size: The block size to use for rounding up file sizes.
    :type block_size: int
    :returns: The estimated size of installed packages
    :rtype: int

    Estimating actual requirements is difficult without the actual file sizes, which
    dnf doesn't provide access to. So use the file count and block size to estimate
    a minimum size for each package.
    """
    # One block of overhead per file, plus the reported install size
    return sum(len(pkg.files) * block_size + pkg.installsize for pkg in packages)
def projects_depsolve_with_size(dbo, projects, groups, with_core=True):
    """Return the dependencies and installed size for a list of projects

    :param dbo: dnf base object
    :type dbo: dnf.Base
    :param projects: The projects to find the dependencies for
    :type projects: List of Strings
    :param groups: The groups to include in dependency solving
    :type groups: List of str
    :param with_core: Include the "core" group in the depsolve (default True)
    :type with_core: bool
    :returns: installed size and a list of NEVRA's of the project and its dependencies
    :rtype: tuple of (int, list of dicts)
    :raises: ProjectsError if there was a problem installing something
    """
    _depsolve(dbo, projects, groups)

    if with_core:
        dbo.group_install("core", ['mandatory', 'default', 'optional'])

    try:
        dbo.resolve()
    except dnf.exceptions.DepsolveError as e:
        raise ProjectsError("There was a problem depsolving %s: %s" % (projects, str(e)))

    # Nothing to install means zero size and no deps
    if len(dbo.transaction) == 0:
        return (0, [])

    to_install = dbo.transaction.install_set
    deps = [pkg_to_dep(p) for p in to_install]
    deps.sort(key=lambda d: d["name"].lower())
    return (estimate_size(to_install), deps)
def modules_list(dbo, module_names):
    """Return a list of modules

    :param dbo: dnf base object
    :type dbo: dnf.Base
    :param module_names: Names of the modules to list, or None for all
    :type module_names: list of str
    :returns: List of module information dicts with name and group_type
    :rtype: list of dicts

    Modules don't exist in RHEL7 so this only returns projects
    and sets the type to "rpm"
    """
    # TODO - Figure out what to do with this for Fedora 'modules'
    return [proj_to_module(proj) for proj in projects_info(dbo, module_names)]
def modules_info(dbo, module_names):
    """Return details about a module, including dependencies

    :param dbo: dnf base object
    :type dbo: dnf.Base
    :param module_names: Names of the modules to get info about
    :type module_names: str
    :returns: List of dicts with module details and dependencies.
    :rtype: list of dicts
    """
    modules = projects_info(dbo, module_names)

    # Attach the depsolved package list to each module entry
    for m in modules:
        m["dependencies"] = projects_depsolve(dbo, [(m["name"], "*.*")], [])
    return modules
def dnf_repo_to_file_repo(repo):
    """Return a string representation of a DNF Repo object suitable for writing to a .repo file

    :param repo: DNF Repository
    :type repo: dnf.RepoDict
    :returns: A string
    :rtype: str

    The DNF Repo.dump() function does not produce a string that can be used as a
    dnf .repo file, it outputs baseurl and gpgkey as python lists which DNF cannot
    read. So do this manually with only the attributes we care about.
    """
    lines = ["[%s]" % repo.id, "name = %s" % repo.name]

    # Exactly one URL source is written, in this priority order
    if repo.metalink:
        lines.append("metalink = %s" % repo.metalink)
    elif repo.mirrorlist:
        lines.append("mirrorlist = %s" % repo.mirrorlist)
    elif repo.baseurl:
        lines.append("baseurl = %s" % repo.baseurl[0])
    else:
        raise RuntimeError("Repo has no baseurl, metalink, or mirrorlist")

    # proxy is optional
    if repo.proxy:
        lines.append("proxy = %s" % repo.proxy)

    lines.append("sslverify = %s" % repo.sslverify)
    lines.append("gpgcheck = %s" % repo.gpgcheck)
    if repo.gpgkey:
        lines.append("gpgkey = %s" % ",".join(repo.gpgkey))

    if repo.skip_if_unavailable:
        lines.append("skip_if_unavailable=1")

    return "\n".join(lines) + "\n"
def repo_to_source(repo, system_source, api=1):
    """Return a Weldr Source dict created from the DNF Repository

    :param repo: DNF Repository
    :type repo: dnf.RepoDict
    :param system_source: True if this source is an immutable system source
    :type system_source: bool
    :param api: Select which api version of the dict to return (default 1)
    :type api: int
    :returns: A dict with Weldr Source fields filled in
    :rtype: dict

    Example::

        {
          "check_gpg": true,
          "check_ssl": true,
          "gpgkey_url": [
            "file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-28-x86_64"
          ],
          "id": "fedora",
          "name": "Fedora $releasever - $basearch",
          "proxy": "http://proxy.brianlane.com:8123",
          "system": true
          "type": "yum-metalink",
          "url": "https://mirrors.fedoraproject.org/metalink?repo=fedora-28&arch=x86_64"
        }

    The ``name`` field has changed in v1 of the API.
    In v0 of the API ``name`` is the repo.id, in v1 it is the repo.name
    and a new field, ``id`` has been added for the repo.id
    """
    if api == 0:
        source = {"name": repo.id, "system": system_source}
    else:
        source = {"id": repo.id, "name": repo.name, "system": system_source}

    # Exactly one URL source is reported, in this priority order
    if repo.baseurl:
        source["url"] = repo.baseurl[0]
        source["type"] = "yum-baseurl"
    elif repo.metalink:
        source["url"] = repo.metalink
        source["type"] = "yum-metalink"
    elif repo.mirrorlist:
        source["url"] = repo.mirrorlist
        source["type"] = "yum-mirrorlist"
    else:
        raise RuntimeError("Repo has no baseurl, metalink, or mirrorlist")

    # proxy is optional
    if repo.proxy:
        source["proxy"] = repo.proxy

    source["check_ssl"] = bool(repo.sslverify)
    source["check_gpg"] = bool(repo.gpgcheck)

    if repo.gpgkey:
        source["gpgkey_urls"] = list(repo.gpgkey)

    return source
def source_to_repodict(source):
    """Return a tuple suitable for use with dnf.add_new_repo

    :param source: A Weldr source dict
    :type source: dict
    :returns: A tuple of dnf.Repo attributes
    :rtype: (str, list, dict)
    :raises: RuntimeError if the source type is not recognized

    Return a tuple with (id, baseurl|(), kwargs) that can be used
    with dnf.repos.add_new_repo
    """
    kwargs = {}
    if "id" in source:
        # This is an API v1 source definition
        repoid = source["id"]
        if "name" in source:
            kwargs["name"] = source["name"]
    else:
        repoid = source["name"]

    # This will allow errors to be raised so we can catch them
    # without this they are logged, but the repo is silently disabled
    kwargs["skip_if_unavailable"] = False

    if source["type"] == "yum-baseurl":
        baseurl = [source["url"]]
    elif source["type"] == "yum-metalink":
        kwargs["metalink"] = source["url"]
        baseurl = ()
    elif source["type"] == "yum-mirrorlist":
        kwargs["mirrorlist"] = source["url"]
        baseurl = ()
    else:
        # Previously an unrecognized type fell through and raised a confusing
        # NameError on 'baseurl' below; fail with a clear message instead.
        raise RuntimeError("Unknown source type: %s" % source["type"])

    if "proxy" in source:
        kwargs["proxy"] = source["proxy"]

    if source["check_ssl"]:
        kwargs["sslverify"] = True
    else:
        kwargs["sslverify"] = False

    if source["check_gpg"]:
        kwargs["gpgcheck"] = True
    else:
        kwargs["gpgcheck"] = False

    if "gpgkey_urls" in source:
        kwargs["gpgkey"] = tuple(source["gpgkey_urls"])

    return (repoid, baseurl, kwargs)
def source_to_repo(source, dnf_conf):
    """Return a dnf Repo object created from a source dict

    :param source: A Weldr source dict
    :type source: dict
    :param dnf_conf: The dnf Config object
    :type dnf_conf: dnf.conf
    :returns: A dnf Repo object
    :rtype: dnf.Repo

    Example::

        {
          "check_gpg": True,
          "check_ssl": True,
          "gpgkey_urls": [
            "file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-28-x86_64"
          ],
          "id": "fedora",
          "name": "Fedora $releasever - $basearch",
          "proxy": "http://proxy.brianlane.com:8123",
          "system": True
          "type": "yum-metalink",
          "url": "https://mirrors.fedoraproject.org/metalink?repo=fedora-28&arch=x86_64"
        }

    If the ``id`` field is included it is used for the repo id, otherwise ``name`` is used.
    v0 of the API only used ``name``, v1 added the distinction between ``id`` and ``name``.
    """
    repoid, baseurl, kwargs = source_to_repodict(source)
    repo = dnf.repo.Repo(repoid, dnf_conf)
    if baseurl:
        repo.baseurl = baseurl

    # Apply the remaining settings as attributes on the Repo object
    for attr, value in kwargs.items():
        setattr(repo, attr, value)

    repo.enable()
    return repo
def get_source_ids(source_path):
    """Return a list of the source ids in a file

    :param source_path: Full path and filename of the source (yum repo) file
    :type source_path: str
    :returns: A list of source id strings
    :rtype: list of str
    """
    # A missing file simply has no sources
    if not os.path.exists(source_path):
        return []

    parser = ConfigParser()
    parser.read(source_path)
    return parser.sections()
def get_repo_sources(source_glob):
    """Return a list of sources from a directory of yum repositories

    :param source_glob: A glob to use to match the source files, including full path
    :type source_glob: str
    :returns: A list of the source ids in all of the matching files
    :rtype: list of str
    """
    # Flatten the per-file section lists into one list
    return [source_id for repo_file in glob(source_glob)
            for source_id in get_source_ids(repo_file)]
def delete_repo_source(source_glob, source_id):
    """Delete a source from a repo file

    :param source_glob: A glob of the repo sources to search
    :type source_glob: str
    :param source_id: The repo id to delete
    :type source_id: str
    :returns: None
    :raises: ProjectsError if there was a problem

    A repo file may have multiple sources in it, delete only the selected source.
    If it is the last one in the file, delete the file.

    WARNING: This will delete ANY source, the caller needs to ensure that a system
    source_id isn't passed to it.
    """
    found = False
    for repo_file in glob(source_glob):
        try:
            cfg = ConfigParser()
            cfg.read(repo_file)
            if source_id not in cfg.sections():
                continue
            found = True
            cfg.remove_section(source_id)
            if cfg.sections():
                # Other sources remain, rewrite the file without the deleted one
                with open(repo_file, "w") as cfg_file:
                    cfg.write(cfg_file)
            else:
                # No sections left, just delete the file
                os.unlink(repo_file)
        except Exception as e:
            raise ProjectsError("Problem deleting repo source %s: %s" % (source_id, str(e)))
    if not found:
        raise ProjectsError("source %s not found" % source_id)
def new_repo_source(dbo, repoid, source, repo_dir):
    """Add a new repo source from a Weldr source dict

    :param dbo: dnf base object
    :type dbo: dnf.Base
    :param repoid: The repo id (API v0 uses the name, v1 uses the id)
    :type repoid: str
    :param source: A Weldr source dict
    :type source: dict
    :param repo_dir: Directory where the un-substituted .repo file is written
    :type repo_dir: str
    :returns: None
    :raises: ...

    Make sure access to the dbo has been locked before calling this.
    The `repoid` parameter will be the 'name' field for API v0, and the 'id' field
    for API v1

    DNF variables will be substituted at load time, and on restart.
    """
    try:
        # Remove it from the RepoDict (NOTE that this isn't explicitly supported by the DNF API)
        # If this repo already exists, delete it and replace it with the new one
        repos = list(r.id for r in dbo.repos.iter_enabled())
        if repoid in repos:
            del dbo.repos[repoid]

        # Add the repo and substitute any dnf variables
        _, baseurl, kwargs = source_to_repodict(source)
        log.debug("repoid=%s, baseurl=%s, kwargs=%s", repoid, baseurl, kwargs)
        r = dbo.repos.add_new_repo(repoid, dbo.conf, baseurl, **kwargs)
        r.enable()

        log.info("Updating repository metadata after adding %s", repoid)
        dbo.fill_sack(load_system_repo=False)
        dbo.read_comps()

        # Remove any previous sources with this id, ignore it if it isn't found
        try:
            delete_repo_source(joinpaths(repo_dir, "*.repo"), repoid)
        except ProjectsError:
            pass

        # Make sure the source id can't contain a path traversal by taking the basename
        source_path = joinpaths(repo_dir, os.path.basename("%s.repo" % repoid))
        # Write the un-substituted version of the repo to disk
        with open(source_path, "w") as f:
            repo = source_to_repo(source, dbo.conf)
            f.write(dnf_repo_to_file_repo(repo))
    except Exception as e:
        log.error("(new_repo_source) adding %s failed: %s", repoid, str(e))

        # Cleanup the mess, if loading it failed we don't want to leave it in memory
        repos = list(r.id for r in dbo.repos.iter_enabled())
        if repoid in repos:
            del dbo.repos[repoid]

            # Only refresh the metadata if the bad repo was actually loaded
            log.info("Updating repository metadata after adding %s failed", repoid)
            dbo.fill_sack(load_system_repo=False)
            dbo.read_comps()

        raise