# Copyright 2013-2018 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.
"""Utilities for processing Debian package information."""
import os
import re
import shutil
import subprocess
import tarfile
import apt
from debian import deb822
from django.conf import settings
from django.urls import reverse
from django.utils.encoding import force_bytes
import apt_pkg
from distro_tracker.core.utils.email_messages import \
name_and_address_from_string as parse_address
from distro_tracker.core.utils.email_messages import \
names_and_addresses_from_string as parse_addresses
def package_hashdir(package_name):
    """
    Returns the name of the hash directory used to avoid having too
    many entries in a single directory. It is the first letter of the
    package name, except for ``lib*`` packages where the first 4 letters
    are used.

    :param package_name: The package name.
    :type package_name: str

    :returns: Name of the hash directory, or ``None`` when no package name
        is given.
    :rtype: str
    """
    if package_name is None:
        return None
    prefix_length = 4 if package_name.startswith('lib') else 1
    return package_name[:prefix_length]
def package_url(package_name):
    """
    Returns the URL of the page dedicated to this package name.

    :param package_name: The package name.
    :type package_name: str or PackageName model

    :returns: The URL of the package page, or ``None`` when no package name
        is given.
    :rtype: str
    """
    if package_name is not None:
        url_kwargs = {'package_name': str(package_name)}
        return reverse('dtracker-package-page', kwargs=url_kwargs)
    return None
class SourcePackageRetrieveError(Exception):
    """
    Raised when a source package, or one of its files, cannot be retrieved.
    """
class AptCache(object):
    """
    A class for handling cached package information.
    """
    #: Size limit (in bytes) of the source cache used when the
    #: ``DISTRO_TRACKER_APT_CACHE_MAX_SIZE`` setting is not provided.
    DEFAULT_MAX_SIZE = 1024 ** 3  # 1 GiB
    #: Value of the ``Format`` field which identifies quilt source packages.
    QUILT_FORMAT = '3.0 (quilt)'
class AcquireProgress(apt.progress.base.AcquireProgress):
    """
    Instances of this class can be passed to :meth:`apt.cache.Cache.update`
    calls.

    It provides a way to track which files were changed and which were not
    by an update operation: the names of the reported files are collected
    in the ``fetched`` and ``hit`` lists respectively.
    """
    def __init__(self, *args, **kwargs):
        super(AptCache.AcquireProgress, self).__init__(*args, **kwargs)
        #: Names of the files reported through :meth:`done`
        self.fetched = []
        #: Names of the files reported through :meth:`ims_hit`
        self.hit = []

    def done(self, item):
        """
        Remembers the name of the destination file of a fetched item.
        """
        file_name = os.path.basename(item.owner.destfile)
        self.fetched.append(file_name)

    def ims_hit(self, item):
        """
        Remembers the name of the destination file of an item which was
        reported as a hit, i.e. which did not need to be downloaded again.
        """
        file_name = os.path.basename(item.owner.destfile)
        self.hit.append(file_name)

    def pulse(self, owner):
        """
        Always returns ``True`` so that the acquire operation is never
        cancelled by this progress object.
        """
        return True
def __init__(self):
    """
    Computes the paths used by the cache, reads the cache size limit from
    the settings and configures the cache.
    """
    # The root cache directory is a subdirectory in the
    # DISTRO_TRACKER_CACHE_DIRECTORY
    root_dir = os.path.join(
        settings.DISTRO_TRACKER_CACHE_DIRECTORY, 'apt-cache')
    etc_dir = os.path.join(root_dir, 'etc')

    self.cache_root_dir = root_dir
    self.sources_list_path = os.path.join(etc_dir, 'sources.list')
    self.conf_file_path = os.path.join(etc_dir, 'apt.conf')
    # Exported through the environment so that apt reads this
    # configuration file
    os.environ['APT_CONFIG'] = self.conf_file_path

    self.sources = []
    self.packages = []
    self.cache_max_size = getattr(
        settings, 'DISTRO_TRACKER_APT_CACHE_MAX_SIZE', self.DEFAULT_MAX_SIZE)
    #: The directory where source package files are cached
    self.source_cache_directory = os.path.join(root_dir, 'packages')
    # Evaluate the cache size lazily
    self._cache_size = None

    self.configure_cache()
@property
def cache_size(self):
    """
    The size of the source cache directory in bytes.

    The value is computed on first access and remembered afterwards.
    """
    size = self._cache_size
    if size is None:
        size = self.get_directory_size(self.source_cache_directory)
        self._cache_size = size
    return size
def get_directory_size(self, directory_path):
    """
    Returns the total space taken by the given directory in bytes.

    The sizes of all files found below the directory are added up;
    symbolic links are not followed when sizes are read.

    :param directory_path: The path to the directory
    :type directory_path: string

    :rtype: int
    """
    # Work with a bytes path so that every os call deals with bytes, not
    # unicode objects: file names which are not valid utf-8 are then
    # handled without causing an error.
    directory_path = force_bytes(directory_path)
    return sum(
        os.lstat(os.path.join(dirpath, file_name)).st_size
        for dirpath, _, filenames in os.walk(directory_path)
        for file_name in filenames
    )
def clear_cache(self):
    """
    Removes all cache information. This causes the next update to retrieve
    fresh repository files.
    """
    root_dir = self.cache_root_dir
    self._remove_dir(root_dir)
    # The configuration files live below the removed directory, so the
    # cache has to be configured again.
    self.configure_cache()
def update_sources_list(self):
    """
    Updates the ``sources.list`` file used to list repositories for which
    package information should be cached.

    The file gets one line per known repository; its parent directory is
    created when missing.
    """
    from distro_tracker.core.models import Repository

    etc_directory = os.path.dirname(self.sources_list_path)
    if not os.path.exists(etc_directory):
        os.makedirs(etc_directory)

    with open(self.sources_list_path, 'w') as sources_list:
        sources_list.writelines(
            repository.sources_list_entry + '\n'
            for repository in Repository.objects.all())
def update_apt_conf(self):
    """
    Updates the ``apt.conf`` file which gives general settings for the
    :class:`apt.cache.Cache`.

    In particular, this updates the list of all architectures which should
    be considered in package updates, and points apt at the directories and
    files (sources list, trusted keys) located through this object and the
    Django settings.
    """
    from distro_tracker.core.models import Architecture

    with open(self.conf_file_path, 'w') as conf_file:
        # One quoted entry per known architecture
        lines = ['APT::Architectures { ']
        lines.extend(
            '"{arch}"; '.format(arch=architecture)
            for architecture in Architecture.objects.all())
        lines.append('};\n')

        lines += [
            'Acquire::CompressionTypes::Order:: "xz";\n',
            'Dir "{}/";\n'.format(self.cache_root_dir),
            'Dir::State "state/";\n',
            'Dir::State::status "dpkg-status";\n',
            'Dir::Etc "etc/";\n',
            'Dir::Etc::sourcelist "{src}";\n'.format(
                src=self.sources_list_path),
            'Dir::Etc::Trusted "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_MAIN_FILE),
            'Dir::Etc::TrustedParts "{src}";\n'.format(
                src=settings.DISTRO_TRACKER_TRUSTED_GPG_PARTS_DIR),
        ]
        conf_file.writelines(lines)
def _index_file_full_path(self, file_name):
    """
    Returns the absolute path for the given cached index file.

    The file is looked up in the directory which apt is configured to use
    for its lists (``Dir::State::lists``).

    :param file_name: The name of the cached index file.
    :type file_name: string

    :rtype: string
    """
    lists_directory = apt_pkg.config.find_dir('Dir::State::lists')
    return os.path.join(lists_directory, file_name)
def _match_index_file_to_repository(self, sources_file):
    """
    Returns a two-tuple ``(repository, component)``: the
    :class:`Repository <distro_tracker.core.models.Repository>` instance
    which matches the given cached index file (e.g. a ``Sources`` file)
    and the name of the ``component`` the file belongs to.

    When the file is not described by any entry of the sources list, or
    when no repository matches it, ``(None, None)`` is returned so that
    callers can always unpack or index the result.

    :param sources_file: The name or path of the cached index file; only
        its base name is used for the lookup.
    :type sources_file: string

    :rtype: (:class:`Repository <distro_tracker.core.models.Repository>`,
        string)
    """
    from distro_tracker.core.models import Repository

    sources_list = apt_pkg.SourceList()
    sources_list.read_main_list()

    file_name = os.path.basename(sources_file)
    # Take the description of the first index file which mentions the
    # cached file; there is no need to look at further entries after that.
    description = next(
        (
            index_file.describe
            for entry in sources_list.list
            for index_file in entry.index_files
            if file_name in index_file.describe
        ),
        None)
    if description is None:
        # E.g. a stale file which no sources list entry refers to anymore
        return None, None

    base_url, component, _ = description.split(None, 2)
    component_url = base_url.rstrip('/') + '/' + component

    # The second field presumably has the form "<suite>/<component>": when
    # it has at least two parts only the second one is kept.
    components = component.split('/')
    if len(components) >= 2:
        component = components[1].strip()

    for repository in Repository.objects.all():
        if component_url in repository.component_urls:
            return repository, component
    return None, None
def _get_all_cached_files(self):
    """
    Returns a list of the full paths of all cached files, i.e. of all
    regular files found in apt's lists directory.
    """
    lists_directory = apt_pkg.config.find_dir('Dir::State::lists')
    try:
        file_names = os.listdir(lists_directory)
    except OSError:
        # The directory structure does not exist => nothing is cached
        return []

    candidates = (
        os.path.join(lists_directory, file_name)
        for file_name in file_names)
    return [path for path in candidates if os.path.isfile(path)]
def get_cached_files(self, filter_function=None):
    """
    Returns cached files, optionally filtered by the given
    ``filter_function``

    :param filter_function: Takes a file name as the only parameter and
        returns a :class:`bool` indicating whether it should be included
        in the result. All files are included when it is not provided.
    :type filter_function: callable

    :returns: A list of cached file names
    :rtype: list
    """
    cached_files = self._get_all_cached_files()
    if filter_function is None:
        # Include all files if the filter function is not provided
        return list(cached_files)
    return [
        cached_file
        for cached_file in cached_files
        if filter_function(cached_file)
    ]
def get_sources_files_for_repository(self, repository):
    """
    Returns all ``Sources`` files which are cached for the given
    repository.

    For instance, ``Sources`` files for different suites are cached
    separately.

    :param repository: The repository for which to return all cached
        ``Sources`` files
    :type repository: :class:`Repository
        <distro_tracker.core.models.Repository>`

    :rtype: ``iterable`` of strings
    """
    def is_sources_file_of_repository(file_name):
        # Only files named like a Sources index are matched to a repository
        if not file_name.endswith('Sources'):
            return False
        match = self._match_index_file_to_repository(file_name)
        return match[0] == repository

    return self.get_cached_files(is_sources_file_of_repository)
def get_packages_files_for_repository(self, repository):
    """
    Returns all ``Packages`` files which are cached for the given
    repository.

    For instance, ``Packages`` files for different suites are cached
    separately.

    :param repository: The repository for which to return all cached
        ``Packages`` files
    :type repository: :class:`Repository
        <distro_tracker.core.models.Repository>`

    :rtype: ``iterable`` of strings
    """
    def is_packages_file_of_repository(file_name):
        # Only files named like a Packages index are matched to a
        # repository
        if not file_name.endswith('Packages'):
            return False
        match = self._match_index_file_to_repository(file_name)
        return match[0] == repository

    return self.get_cached_files(is_packages_file_of_repository)
def update_repositories(self, force_download=False):
    """
    Initiates a cache update.

    :param force_download: If set to ``True`` causes the cache to be
        cleared before starting the update, thus making sure all index
        files are downloaded again.

    :returns: A two-tuple ``(updated_sources, updated_packages)``. Each of
        the tuple's members is a list of
        (:class:`Repository <distro_tracker.core.models.Repository>`,
        ``component``, ``file_name``) tuple representing the repository
        which was updated, component, and the file which contains the fresh
        information. The file is either a ``Sources`` or a ``Packages``
        file respectively.
    """
    if force_download:
        self.clear_cache()
    self.configure_cache()

    progress = AptCache.AcquireProgress()
    apt.Cache(rootdir=self.cache_root_dir).update(progress)

    updated_sources, updated_packages = [], []
    destinations = (
        ('Sources', updated_sources),
        ('Packages', updated_packages),
    )
    for fetched_file in progress.fetched:
        # Pick the result list from the kind of the fetched index file;
        # files of any other kind are ignored.
        destination = next(
            (
                updated_files
                for suffix, updated_files in destinations
                if fetched_file.endswith(suffix)
            ),
            None)
        if destination is None:
            continue

        repository, component = self._match_index_file_to_repository(
            fetched_file)
        full_path = self._index_file_full_path(fetched_file)
        destination.append((repository, component, full_path))

    return updated_sources, updated_packages
def _get_format(self, record):
    """
    Returns the Format field value of the given source package record.

    The record is parsed as a deb822 paragraph.
    """
    return deb822.Deb822(record)['format']
def _extract_quilt_package_debian_tar(self, debian_tar_path, outdir):
    """
    Extracts the given tarball to the given output directory.

    The tarball has been downloaded from a repository, so its members are
    not trusted blindly: members which would end up outside of ``outdir``
    are refused instead of being written.

    :param debian_tar_path: The path to the tarball to extract.
    :type debian_tar_path: string
    :param outdir: The directory where the files should be extracted.
    :type outdir: string

    :raises tarfile.TarError: If a member of the archive would be extracted
        outside of ``outdir``.
    """
    with tarfile.open(debian_tar_path) as archive_file:
        if hasattr(tarfile, 'data_filter'):
            # The "data" extraction filter rejects path traversal,
            # absolute names, links pointing outside and special files.
            archive_file.extractall(outdir, filter='data')
            return

        # Python versions without extraction filters: at least make sure
        # that no member name resolves to a location outside of outdir.
        root = os.path.realpath(outdir)
        root_prefix = os.path.join(root, '')
        for member in archive_file.getmembers():
            target = os.path.realpath(os.path.join(root, member.name))
            if target != root and not target.startswith(root_prefix):
                raise tarfile.ExtractError(
                    'Refusing to extract {name} outside of {outdir}'.format(
                        name=member.name, outdir=outdir))
        archive_file.extractall(outdir)
def get_package_source_cache_directory(self, package_name):
    """
    Returns the path to the directory where a particular source package is
    cached.

    Packages are grouped in hash directories named after the first letter
    of the package name, or after its first 4 letters for ``lib*``
    packages.

    :param package_name: The name of the source package
    :type package_name: string

    :rtype: string
    """
    if package_name.startswith('lib'):
        package_hash = package_name[:4]
    else:
        package_hash = package_name[0]
    return os.path.join(
        self.source_cache_directory, package_hash, package_name)
def get_source_version_cache_directory(self, package_name, version):
    """
    Returns the path to the directory where a particular source package
    version files are extracted.

    The directory is named ``<package_name>-<version>`` and is located in
    the cache directory of the source package.

    :param package_name: The name of the source package
    :type package_name: string
    :param version: The version of the source package
    :type version: string

    :rtype: string
    """
    package_dir = self.get_package_source_cache_directory(package_name)
    version_dir_name = '-'.join((package_name, version))
    return os.path.join(package_dir, version_dir_name)
def _remove_dir(self, directory_path):
    """
    Removes the given directory, including any subdirectories and files.
    Nothing is done when the directory does not exist.

    The method makes sure to correctly handle the situation where the
    directory contains files with names which are invalid utf-8.
    """
    # Work with a bytes path so that every os call deals with bytes, not
    # unicode objects: file names which are not valid utf-8 are then
    # handled without causing an error.
    directory_path = force_bytes(directory_path)
    if not os.path.exists(directory_path):
        return
    shutil.rmtree(directory_path)
def clear_cached_sources(self):
    """
    Clears all cached package source files.

    The remembered cache size is refreshed afterwards.
    """
    self._remove_dir(self.source_cache_directory)
    # Measure the directory again rather than assuming what is left in it
    remaining_size = self.get_directory_size(self.source_cache_directory)
    self._cache_size = remaining_size
def _get_apt_source_records(self, source_name, version):
    """
    Returns a :class:`apt_pkg.SourceRecords` instance where the given
    source package is the current working record.

    :param source_name: The name of the source package
    :type source_name: string
    :param version: The version of the source package
    :type version: string

    :raises SourcePackageRetrieveError: If the cache holds no record for
        this version of the source package.
    """
    apt.Cache(rootdir=self.cache_root_dir)  # must be pre-created
    source_records = apt_pkg.SourceRecords()
    source_records.restart()

    # Step through the cached records of this source package until the one
    # with the wanted version is the current record
    while source_records.lookup(source_name):
        if source_records.version == version:
            return source_records

    # Package version does not exist in the cache
    raise SourcePackageRetrieveError(
        "Could not retrieve package {pkg} version {ver}:"
        " No such version found in the cache".format(
            pkg=source_name, ver=version))
def _extract_dpkg_source(self, retrieved_files, outdir):
    """
    Uses dpkg-source to extract the source package.

    :param retrieved_files: The paths of the retrieved source package
        files; one of them has to be the ``.dsc`` file.
    :type retrieved_files: list of strings
    :param outdir: The directory where the source package should be
        extracted.
    :type outdir: string

    :raises SourcePackageRetrieveError: If there is no ``.dsc`` file among
        the retrieved files.
    :raises subprocess.CalledProcessError: If ``dpkg-source`` fails.
    """
    dsc_file_path = next(
        (
            file_path
            for file_path in retrieved_files
            if file_path.endswith('.dsc')
        ),
        None)
    if dsc_file_path is None:
        # Report the problem explicitly instead of leaking StopIteration
        raise SourcePackageRetrieveError(
            'Could not extract the source package: '
            'no .dsc file among the retrieved files')

    dsc_file_path = os.path.abspath(dsc_file_path)
    outdir = os.path.abspath(outdir)
    subprocess.check_output(["dpkg-source", "-x", dsc_file_path, outdir],
                            stderr=subprocess.STDOUT)
def _apt_acquire_package(self,
                         source_records,
                         dest_dir_path,
                         debian_directory_only):
    """
    Using :class:`apt_pkg.Acquire`, retrieves the source files for the
    source package described by the current source_records record.

    :param source_records: The record describing the source package whose
        files should be retrieved.
    :type source_records: :class:`apt_pkg.SourceRecords`
    :param dest_dir_path: The path to the directory where the downloaded
        files should be saved.
    :type dest_dir_path: string
    :param debian_directory_only: A flag indicating whether only the debian
        directory should be downloaded.

    :returns: A list of absolute paths of all retrieved source files.
    :rtype: list of strings

    :raises SourcePackageRetrieveError: If one of the files could not be
        retrieved.
    """
    package_format = self._get_format(source_records.record)
    # Only retrieve the .debian.tar.* file for quilt packages when only
    # the debian directory is wanted
    only_debian_tar = (
        debian_directory_only and package_format == self.QUILT_FORMAT)

    # A reference to each AcquireFile instance must be kept
    files = []
    acquire = apt_pkg.Acquire(apt.progress.base.AcquireProgress())
    for md5, size, path, file_type in source_records.files:
        if only_debian_tar and file_type != 'diff':
            continue
        base = os.path.basename(path)
        files.append(apt_pkg.AcquireFile(
            acquire,
            source_records.index.archive_uri(path),
            md5,
            size,
            base,
            destfile=os.path.join(dest_dir_path, base)
        ))

    acquire.run()

    # Check if all items are correctly retrieved and build the list of file
    # paths.
    retrieved_paths = []
    for item in acquire.items:
        if item.status != item.STAT_DONE:
            error_text = item.error_text
            # The error text may already be a text string; decoding it
            # unconditionally would fail and hide the real error.
            if isinstance(error_text, bytes):
                error_text = error_text.decode('utf-8', 'replace')
            raise SourcePackageRetrieveError(
                'Could not retrieve file {file}: {error}'.format(
                    file=item.destfile,
                    error=error_text))
        retrieved_paths.append(item.destfile)

    return retrieved_paths
def retrieve_source(self, source_name, version,
                    debian_directory_only=False):
    """
    Retrieve the source package files for the given source package version.

    :param source_name: The name of the source package
    :type source_name: string
    :param version: The version of the source package
    :type version: string
    :param debian_directory_only: Flag indicating if the method should try
        to retrieve only the debian directory of the source package. This
        is usually only possible when the package format is 3.0 (quilt).
    :type debian_directory_only: Boolean

    :returns: The path to the directory containing the extracted source
        package files.
    :rtype: string
    """
    # If the maximum allowed cache size has been exceeded, clear the cache
    if self.cache_size > self.cache_max_size:
        self.clear_cached_sources()

    source_records = self._get_apt_source_records(source_name, version)

    package_dir = self.get_package_source_cache_directory(source_name)
    if not os.path.exists(package_dir):
        os.makedirs(package_dir)

    # Measured before and after, to learn how much this retrieval added
    size_before = self.get_directory_size(package_dir)

    # Download the source files
    retrieved_files = self._apt_acquire_package(
        source_records, package_dir, debian_directory_only)

    # Extract the retrieved source files
    outdir = self.get_source_version_cache_directory(source_name, version)
    # dpkg-source expects this directory not to exist
    self._remove_dir(outdir)

    package_format = self._get_format(source_records.record)
    extract_debian_tar_only = (
        debian_directory_only and package_format == self.QUILT_FORMAT)
    if extract_debian_tar_only:
        # dpkg-source cannot extract an incomplete package
        debian_tar_path = retrieved_files[0]
        self._extract_quilt_package_debian_tar(debian_tar_path, outdir)
    else:
        # Let dpkg-source handle the extraction in all other cases
        self._extract_dpkg_source(retrieved_files, outdir)

    # Update the current cache size based on the changes made by getting
    # this source package.
    size_after = self.get_directory_size(package_dir)
    self._cache_size += size_after - size_before

    return outdir
def html_package_list(packages):
    """
    Returns a comma separated list of HTML links to the pages of the given
    packages.

    Each item is either a source package name or a source package name
    followed by ``/`` and some more text: only the name is turned into a
    link, the remaining text is appended after the link as is.

    :param packages: The package names to link to.
    :type packages: iterable of strings

    :rtype: string
    """
    # NOTE(review): the values are put in the markup without HTML escaping;
    # confirm that callers only pass trusted package names.
    links = []
    for package in packages:
        source_package_name, separator, rest = package.partition("/")
        remain = separator + rest
        links.append('<a href="{}">{}</a>{}'.format(
            package_url(source_package_name), source_package_name, remain))
    return ', '.join(links)