Source code for AutoArchive._services.archiver._external_tar_archiver_provider

# _external_tar_archiver_provider.py
#
# Project: AutoArchive
# License: GNU GPLv3
#
# Copyright (C) 2003 - 2021 Róbert Čerňanský



""":class:`_ExternalTarArchiverProvider` class."""



__all__ = ["_ExternalTarArchiverProvider"]



# {{{ INCLUDES

import os
import glob
import shutil
import select
import itertools
import subprocess
import re
import errno
import tempfile

from AutoArchive._infrastructure.utils import Utils
from AutoArchive._infrastructure.py_additions import staticproperty
from . import BackupTypes, BackupSubOperations, ArchiverFeatures, BackupOperationErrors, MIN_COMPRESSION_STRENGTH, \
    MAX_COMPRESSION_STRENGTH
from ._tar_archiver_provider_base import _TarArchiverProviderBase, _BACKUP_TYPES_TO_EXTENSIONS
from AutoArchive._infrastructure.utils.interval import IntervalElement, Interval

# }}} INCLUDES



# {{{ CLASSES

class _ExternalTarArchiverProvider(_TarArchiverProviderBase):
    """External archiver service provider.

    Creates backups by executing the external ``tar`` binary and translating its output to the events of the
    base class.  Snapshot files for incremental backups are stored in a subdirectory of the working directory.

    See also: :class:`._TarArchiverProviderBase`.

    :raise OSError: If creation of the snapshot directory failed or the ``tar`` binary could not be located."""

    # name of the archiver binary
    __ARCHIVER_BINARY = "tar"

    # directories where tar is being looked up if it was not found in PATH
    __ARCHIVER_LOCATIONS_FALLBACKS = ("/bin", "/usr/bin", "/usr/local/bin")

    # backup type to GNU tar compress option map
    __BACKUP_TYPE_TO_COMPRESS_OPTION = {BackupTypes.Tar: "",
                                        BackupTypes.TarGz: "--gzip",
                                        BackupTypes.TarBz2: "--bzip2",
                                        BackupTypes.TarXz: "--xz",
                                        BackupTypes.TarZst: "--zstd"}



    # {{{ _TarArchiverProviderBase overrides

    def __init__(self, workDir):
        super().__init__(workDir)

        # stores the error state during the backup operation
        self.__errorOccurred = None

        # path to the tar binary
        self.__archiver = self.__locateTar()

        self.__ExternalTarIncrementalUtility.makeSnapshotsDir(workDir)



    @staticproperty
    def supportedBackupTypes():
        "See :attr:`._TarArchiverProviderBase.supportedBackupTypes`"

        return frozenset({BackupTypes.Tar, BackupTypes.TarGz, BackupTypes.TarBz2, BackupTypes.TarXz,
                          BackupTypes.TarZst})



    def backupFiles(self, backupDefinition, compressionStrength = None, overwriteAtStart = False):
        """See: :meth:`._TarArchiverProviderBase.backupFiles()`.

        :return: Path to the created backup file.
        :raise ValueError: If ``compressionStrength`` is out of the defined interval."""

        super().backupFiles(backupDefinition, compressionStrength, overwriteAtStart)
        self.__raiseIfBadCompressionStrength(compressionStrength)

        backupFilePath = self.getBackupFilePath_(backupDefinition.backupId, backupDefinition.backupType,
                                                 backupDefinition.destination)

        # unless the final file shall be overwritten right away, tar writes to a working path first
        workingBackupFilePath = backupFilePath if overwriteAtStart else self.getWorkingPath_(backupFilePath)

        sysEnvironment = os.environ.copy()
        archiverOptions, compressionEnvironment = self.__arguments(
            backupDefinition.backupType, workingBackupFilePath, backupDefinition.root,
            backupDefinition.includeFiles, backupDefinition.excludeFiles, compressionStrength, sysEnvironment)
        sysEnvironment.update(compressionEnvironment)

        tarProcess = self.__executeTar(archiverOptions, sysEnvironment)
        self.__processTarOutput(tarProcess)

        if not overwriteAtStart:
            shutil.move(workingBackupFilePath, backupFilePath)

        return backupFilePath



    def backupFilesIncrementally(self, backupDefinition, compressionStrength = None, level = None,
                                 overwriteAtStart = False):
        """See: :meth:`._TarArchiverProviderBase.backupFilesIncrementally()`.

        :return: Path to the created backup file.
        :raise ValueError: If ``compressionStrength`` is out of the defined interval or ``level`` is not within
           0 and the maximal backup level."""

        super().backupFilesIncrementally(backupDefinition, compressionStrength, level, overwriteAtStart)
        self.__raiseIfBadCompressionStrength(compressionStrength)

        externalTarIncrementalUtility = self.__ExternalTarIncrementalUtility(backupDefinition.backupId,
                                                                             self.workDir_)
        maxBackupLevel = externalTarIncrementalUtility.getMaxBackupLevel()

        if level is None:
            level = maxBackupLevel

        if level < 0 or level > maxBackupLevel:
            raise ValueError(str.format(
                "'level' must be from interval 0 <= level <= maxBackupLevel ({}). The passed value was {}.",
                maxBackupLevel, level))

        workingSnapshotFilePath = externalTarIncrementalUtility.createWorkingSnapshotFile(level)
        backupFilePath = self.getBackupFilePath_(backupDefinition.backupId, backupDefinition.backupType,
                                                 backupDefinition.destination, level)
        workingBackupFilePath = backupFilePath if overwriteAtStart else self.getWorkingPath_(backupFilePath)

        sysEnvironment = os.environ.copy()
        archiverOptions, compressionEnvironment = self.__arguments(
            backupDefinition.backupType, workingBackupFilePath, backupDefinition.root,
            backupDefinition.includeFiles, backupDefinition.excludeFiles, compressionStrength, sysEnvironment,
            workingSnapshotFilePath)
        sysEnvironment.update(compressionEnvironment)

        try:
            tarProcess = self.__executeTar(archiverOptions, sysEnvironment)
            self.__processTarOutput(tarProcess)

            if not overwriteAtStart:
                shutil.move(workingBackupFilePath, backupFilePath)

            externalTarIncrementalUtility.manageSnapshotFiles(level, workingSnapshotFilePath)
        finally:
            # on success the working snapshot was already moved away; this only cleans up after a failure
            if os.path.exists(workingSnapshotFilePath):
                os.remove(workingSnapshotFilePath)

        return backupFilePath



    def removeBackupIncrements(self, backupDefinition, level = None, keepingId = None):
        """See: :meth:`._TarArchiverProviderBase.removeBackupIncrements()`.

        :raise ValueError: If ``level`` is negative."""

        externalTarIncrementalUtility = self.__ExternalTarIncrementalUtility(backupDefinition.backupId,
                                                                             self.workDir_)
        self.raiseIfUnsupportedBackupType_(backupDefinition.backupType)

        if level is not None:
            if level < 0:
                # level 0 is accepted by the check above, so the message has to say ">= 0"
                raise ValueError(str.format("'level' must be >= 0. The passed value was {}.", level))
        else:
            level = self.getMaxBackupLevel(backupDefinition.backupId)

        # remove backups level by level until the first level without a backup file is reached
        removeLevel = level
        backupExists = True
        while backupExists:
            backupFilePath = self.getBackupFilePath_(
                backupDefinition.backupId, backupDefinition.backupType, backupDefinition.destination, removeLevel,
                keepingId)
            backupExists = os.path.exists(backupFilePath)
            if backupExists:
                os.remove(backupFilePath)
            externalTarIncrementalUtility.tryRemoveSnapshotFile(removeLevel, keepingId)
            removeLevel += 1

        if self.getMaxBackupLevel(backupDefinition.backupId) > level and keepingId is None:

            # ouch! some rogue snapshots still exists; deal with them slowly and painfully
            externalTarIncrementalUtility.removeSnapshotFiles(level)



    @classmethod
    def getSupportedFeatures(cls, backupType = None):
        """See: :meth:`._TarArchiverProviderBase.getSupportedFeatures()`.

        For :attr:`BackupTypes.Tar` only the incremental feature is reported; for every other type (and when
        ``backupType`` is ``None``) the compression strength feature is reported as well."""

        if backupType is not None:
            cls.raiseIfUnsupportedBackupType_(backupType)

        if backupType == BackupTypes.Tar:
            return frozenset((ArchiverFeatures.Incremental,))
        return frozenset({ArchiverFeatures.Incremental, ArchiverFeatures.CompressionStrength})



    def getMaxBackupLevel(self, backupId):
        "See: :meth:`._TarArchiverProviderBase.getMaxBackupLevel()`."

        return self.__ExternalTarIncrementalUtility(backupId, self.workDir_).getMaxBackupLevel()



    @Utils.uniq
    def getStoredBackupIds(self):
        """See: :meth:`._TarArchiverProviderBase.getStoredBackupIds()`.

        The IDs are derived from names of the stored snapshot files."""

        utility = self.__ExternalTarIncrementalUtility
        snapshots = utility.getSnapshotsForBackup(utility.getSnapshotsDir(self.workDir_))

        # only a numeric level token is stripped so that IDs containing a dot are not truncated
        return (utility.getBackupIdFromSnapshotName(snapshot) for snapshot in snapshots)



    def purgeStoredBackupData(self, backupId):
        "See: :meth:`._TarArchiverProviderBase.purgeStoredBackupData()`."

        snapshotsDir = self.__ExternalTarIncrementalUtility.getSnapshotsDir(self.workDir_)
        snapshots = self.__ExternalTarIncrementalUtility.getSnapshotsForBackup(snapshotsDir, backupId)
        for snapshot in snapshots:
            os.remove(os.path.join(snapshotsDir, snapshot))



    def doesAnyBackupLevelExist(self, backupDefinition, fromLevel = 0, keepingId = None):
        """See: :meth:`._TarArchiverProviderBase.doesAnyBackupLevelExist()`.

        :return: ``True`` if a backup file of level ``fromLevel`` or higher exists in the destination."""

        keepToken = "." + keepingId if keepingId else ""
        extension = "." + _BACKUP_TYPES_TO_EXTENSIONS[backupDefinition.backupType]

        # user supplied parts are escaped so that glob metacharacters in them are matched literally
        destination = glob.escape(backupDefinition.destination)
        backupId = glob.escape(backupDefinition.backupId)
        keepPattern = glob.escape(keepToken)

        # SMELL: Backup path is similarly assembled in super().getBackupFilePath_.
        level0Glob = os.path.join(destination, backupId + keepPattern + extension)
        levelGreaterThan0Glob = os.path.join(destination, backupId + ".*" + keepPattern + extension)

        backups = itertools.chain(glob.iglob(level0Glob), glob.iglob(levelGreaterThan0Glob))
        return any(
            self.__ExternalTarIncrementalUtility.getLevelFromFileName(
                os.path.basename(backup), keepingId is not None) >= fromLevel
            for backup in backups)

    # }}} _TarArchiverProviderBase overrides



    def __executeTar(self, arguments, environment):
        """Starts the tar binary with ``arguments`` and returns the process object.

        Messages of tar are parsed later, therefore they are forced to the "C" locale; ``environment`` is
        modified in place for that purpose.

        :raise OSError: If the archiver program could not be executed."""

        environment["LC_MESSAGES"] = "C"
        if "LC_ALL" in environment:
            del environment["LC_ALL"]

        try:
            tarProcess = subprocess.Popen([self.__archiver] + arguments, stdout = subprocess.PIPE,
                                          stderr = subprocess.PIPE, env = environment,
                                          universal_newlines = True)
        except OSError as ex:
            raise OSError(str.format("Error while executing external archiving program: {}.", self.__archiver),
                          self.__archiver, ex) from ex

        return tarProcess



    def __processTarOutput(self, archiverProcess):
        """Reads output of the archiver process until it ends and propagates it as events.

        :raise RuntimeError: If a fatal archiver message was read or the archiver ended with an unexpected exit
           code."""

        self.__errorOccurred = False
        self.backupOperationError += self.__onBackupOperationError
        try:

            # capture program's standard output and standard error and use CmdlineUi-like interface to print
            # captured messages; note that the order of messages written to stdout vs. messages written to stderr
            # might not be preserved
            while True:
                readyStreams = select.select((archiverProcess.stdout, archiverProcess.stderr), (), ())[0]
                streamActive = False
                for readyStream in readyStreams:
                    line = readyStream.readline()
                    if line:
                        streamActive = True

                        # strip the line terminator only; the very last line may come without one
                        message = line[:-1] if line.endswith("\n") else line
                        self.__propagateArchiverMessage(message, readyStream is not archiverProcess.stdout)

                if archiverProcess.poll() is not None and not streamActive:
                    break
        finally:
            self.backupOperationError -= self.__onBackupOperationError

            # the loop is left with a running process only because of an exception; do not leave it behind
            if archiverProcess.poll() is None:
                archiverProcess.terminate()
                archiverProcess.wait()
            archiverProcess.stdout.close()
            archiverProcess.stderr.close()

        if archiverProcess.returncode:
            self.__handleArchiverExitCode(archiverProcess.returncode)



    def __arguments(self, backupType, backupFilePath, root, includeFiles, excludeFiles, compressionStrength,
                    sysEnvironment, snapshotPath = None):
        """Assembles and returns arguments to the tar binary.

        :return: Tuple of the list of command line options and the dictionary with environment variables
           required for the compression strength."""

        compressOption = self.__BACKUP_TYPE_TO_COMPRESS_OPTION[backupType]

        # operation has to be first one
        archiverOptions = ["--create", "--format=posix", "--verbose"]

        # insert options required for this archiver
        if compressOption:
            archiverOptions.append(compressOption)
        if snapshotPath is not None:
            archiverOptions.append("--listed-incremental=" + snapshotPath)

        # add options required for this archiver type
        archiverOptions += ["--file=" + backupFilePath, "--directory=" + root]

        # add converted include and exclude files
        archiverOptions += self.__convertIncludesAndExcludes(includeFiles, excludeFiles)

        # create environment
        environment = self.__setupCompressionStrength(backupType, compressionStrength, sysEnvironment)

        return archiverOptions, environment



    def __propagateArchiverMessage(self, message, sentToStderr = False):
        """Propagates archiver message as event.

        Parses the passed ``message``, evaluates it and fires
        :meth:`._TarArchiverProviderBase.backupOperationError` event if it is an (non-fatal) error message or
        :meth:`._TarArchiverProviderBase.fileAdd` otherwise.

        :raise RuntimeError: If ``message`` is a fatal error message of the archiver."""

        if not sentToStderr:
            self.fileAdd(message)
            return

        # messages that will be ignored
        if not message or re.search("(: Exiting with failure status due to previous errors)|" +
                                    "(: (.*): Directory is new)", message):
            return

        # operating system errors; the second group carries the error description printed by tar
        osErrorPatterns = ((": (.*): cannot stat: (.*)", BackupSubOperations.Stat),
                           (": (.*): cannot open: (.*)", BackupSubOperations.Open))
        for pattern, subOperation in osErrorPatterns:
            match = re.search(pattern, message, re.IGNORECASE)
            if match:
                fileName, reason = match.groups()
                if os.strerror(errno.EACCES) in reason:
                    self.backupOperationError(subOperation, BackupOperationErrors.PermissionDenied, fileName)
                else:
                    self.backupOperationError(subOperation, BackupOperationErrors.UnknownOsError, fileName,
                                              reason)
                return

        # non-fatal messages that map directly to a known error
        knownErrorPatterns = (
            (": (.*): socket ignored", BackupSubOperations.Open, BackupOperationErrors.SocketIgnored),
            (": (.*): file changed as we read it", BackupSubOperations.Read, BackupOperationErrors.FileChanged),
            (": (.*): directory has been renamed", BackupSubOperations.Read,
             BackupOperationErrors.DirectoryRenamed))
        for pattern, subOperation, error in knownErrorPatterns:
            match = re.search(pattern, message, re.IGNORECASE)
            if match:
                self.backupOperationError(subOperation, error, match.groups()[0])
                return

        # fatal messages
        if re.search(": No space left on device", message, re.IGNORECASE):
            raise RuntimeError("No space left on device.")

        match = re.search("(unrecognized option.*)|(Try.+--help.+for more information.*)", message,
                          re.IGNORECASE)
        if match:
            raise RuntimeError(str.format("Incompatible external archiver binary: {} ({}).", self.__archiver,
                                          match.group(0)))

        if re.search(": Error is not recoverable: exiting now", message, re.IGNORECASE):
            raise RuntimeError("External archiver aborted.")

        # anything else that at least looks like a message about a file
        match = re.search(": (.*): (.+)", message, re.IGNORECASE)
        if match:
            self.backupOperationError(BackupSubOperations.UnknownFileOperation,
                                      BackupOperationErrors.UnknownError, match.groups()[0], match.groups()[1])
            return

        self.backupOperationError(BackupSubOperations.Unknown, BackupOperationErrors.UnknownError,
                                  unknownErrorString = message)



    def __handleArchiverExitCode(self, exitCode):
        """Evaluates a non-zero exit code of the archiver.

        Exit code 1 is reported as the "some files changed" event.  Any other code raises unless an error was
        already reported during the backup operation.

        :raise RuntimeError: If the archiver failed and no error was reported before."""

        if exitCode == 1:
            self.backupOperationError(BackupSubOperations.Finish, BackupOperationErrors.SomeFilesChanged)
        elif not self.__errorOccurred:
            raise RuntimeError(str.format("Unexpected failure of the archiver program; exit code: {}", exitCode))



    @staticmethod
    def __convertIncludesAndExcludes(includeFiles, excludeFiles):
        """Converts list of files and list of excluded files to the form suitable for the archiver program.

        :return: List of arguments for the archiver."""

        archiverOptions = []
        if excludeFiles:
            archiverOptions.append("--anchored")
            archiverOptions.extend("--exclude=" + exclude for exclude in excludeFiles)

        archiverOptions += includeFiles

        return archiverOptions



    @staticmethod
    def __setupCompressionStrength(backupType, compressionStrength, sysEnvironment):
        """Converts compression strength to an environment variable.

        For all types except :attr:`BackupTypes.TarZst` the strength is appended as an option to the value that
        the variable already has in ``sysEnvironment``.

        :return: Dictionary representing environment with the required environment variable.
        :raise RuntimeError: If a strength was passed for a backup type that is not handled here."""

        compressionStrength = \
            IntervalElement(compressionStrength, Interval(MIN_COMPRESSION_STRENGTH, MAX_COMPRESSION_STRENGTH))
        environment = {}

        # SMELL: Each backup type should have its own archiver provider class
        if compressionStrength.value is not None:
            if backupType == BackupTypes.TarGz:
                envName = "GZIP"
                compressionStrength = compressionStrength.remapTo(Interval(1, 9))
            elif backupType == BackupTypes.TarBz2:
                envName = "BZIP2"
                compressionStrength = compressionStrength.remapTo(Interval(1, 9))
            elif backupType == BackupTypes.TarXz:
                envName = "XZ_OPT"
            elif backupType == BackupTypes.TarZst:
                envName = "ZSTD_CLEVEL"
                compressionStrength = compressionStrength.remapTo(Interval(1, 19))
            else:
                raise RuntimeError(str.format("Unexpected backup type: {}", backupType))

            if backupType == BackupTypes.TarZst:

                # the variable holds the bare level number
                environment[envName] = str(compressionStrength.value)
            else:

                # the variable holds command line options; preserve those already set by the user
                sysEnvValue = sysEnvironment[envName] + " " if envName in sysEnvironment else ""
                environment[envName] = sysEnvValue + "-" + str(compressionStrength.value)

        return environment



    def __onBackupOperationError(self, operation, error, filesystemObjectName = None, unknownErrorString = None):
        "Remembers that an error which explains a failure exit code of the archiver was reported."

        self.__errorOccurred = self.__errorOccurred or \
            (operation != BackupSubOperations.Finish and
             (error == BackupOperationErrors.PermissionDenied or
              error == BackupOperationErrors.UnknownOsError or
              error == BackupOperationErrors.UnknownError))



    @staticmethod
    def __raiseIfBadCompressionStrength(compressionStrength):
        """Validates the compression strength; ``None`` is always accepted.

        :raise ValueError: If ``compressionStrength`` is out of the defined interval."""

        if compressionStrength is not None and \
           (compressionStrength < MIN_COMPRESSION_STRENGTH or compressionStrength > MAX_COMPRESSION_STRENGTH):
            raise ValueError(str.format("Compression strength value {} is out of defined interval",
                                        compressionStrength))



    @classmethod
    def __locateTar(cls):
        """Returns path to the tar binary.

        :raise OSError: If the binary was found neither in PATH nor in the fallback locations."""

        def getTarPath(directories):
            for directory in directories:
                testTarPath = os.path.join(directory, cls.__ARCHIVER_BINARY)

                # a directory named like the binary must not be taken for the binary
                if os.path.isfile(testTarPath) and os.access(testTarPath, os.R_OK | os.X_OK):
                    return testTarPath
            return None

        # first try to find tar in PATH
        tarPath = getTarPath(os.get_exec_path())

        # then try fallback locations
        if tarPath is None:
            tarPath = getTarPath(cls.__ARCHIVER_LOCATIONS_FALLBACKS)

        if tarPath is None:
            raise OSError(str.format("Unable to locate the archiver binary: {}.", cls.__ARCHIVER_BINARY))

        return tarPath



    class __ExternalTarIncrementalUtility:
        """Utility class for GNU tar incremental backup operations."""

        # subdirectory for snapshots
        __SNAPSHOTS_SUBDIR = "snapshots"

        # suffix for snapshot files used in incremental backups
        __SNAPSHOT_SUFFIX = ".snar"



        def __init__(self, backupId, workDir):
            self.__backupId = backupId
            self.__workDir = workDir
            self.__snapshotsDir = self.getSnapshotsDir(self.__workDir)



        def getMaxBackupLevel(self):
            """Returns maximal backup level that is possible to create.

            It is the highest level found among the stored snapshots plus one, or 0 if there is no snapshot.

            :raise OSError: If a system error occurred."""

            currentBackupLevel = self.__getBackupLevel()
            return currentBackupLevel + 1 if currentBackupLevel is not None else 0



        def getSnapshotFileName(self, level, keepingId = None):
            "Returns full path to snapshot file for a certain backup level."

            keepingToken = "." + keepingId if keepingId else ""
            return os.path.join(self.__snapshotsDir, self.__backupId + self.__getLevelSuffix(level) +
                                keepingToken + self.__SNAPSHOT_SUFFIX)



        def getSnapshots(self):
            "Returns sequence of snapshot file names for current backup."

            return self.getSnapshotsForBackup(self.__snapshotsDir, self.__backupId)



        @classmethod
        def getSnapshotsForBackup(cls, snapshotsDir, backupId = ""):
            """Returns sequence of snapshot file names for the archive named ``backupId`` or all of them.

            :param snapshotsDir: Directory where the snapshot files are stored.  Can be obtained with
               :meth:`getSnapshotsDir` method.
            :type snapshotsDir: ``str``
            :param backupId: Name of the archive for which the snapshot file names shall be returned.  If not
               specified all snapshot files will be returned.
            :type backupId: ``str``

            :return: Sequence of snapshot file names.
            :rtype: ``Sequence<str>``

            :raise OSError: If ``snapshotsDir`` does not exists or is not accessible.  The exception contains two
               parameters: the error message and the name of the directory."""

            if not os.path.isdir(snapshotsDir):
                raise OSError("Snapshots directory does not exists.", snapshotsDir)
            if not os.access(snapshotsDir, os.R_OK | os.X_OK):
                raise OSError("Snapshots directory is not accessible for reading or listing", snapshotsDir)

            # the ID and the directory are matched literally; only an unspecified ID becomes a wildcard
            namePattern = "*" if backupId == "" else glob.escape(backupId)
            dirPattern = glob.escape(snapshotsDir)

            # NOTE(review): the second pattern also matches snapshots of other backups whose ID starts with
            # "<backupId>." - confirm whether such IDs can occur
            snapshots = itertools.chain(
                glob.iglob(os.path.join(dirPattern, namePattern + cls.__SNAPSHOT_SUFFIX)),
                glob.iglob(os.path.join(dirPattern, namePattern + ".*" + cls.__SNAPSHOT_SUFFIX)))
            return tuple(os.path.basename(snapshot) for snapshot in snapshots)



        @classmethod
        def getSnapshotsDir(cls, workDir):
            "Returns path to the snapshots directory located in ``workDir``."

            return os.path.join(workDir, cls.__SNAPSHOTS_SUBDIR)



        def manageSnapshotFiles(self, level, latestLevelSnapshotFilePath):
            """Moves snapshot file to its proper location and name in order to preserve it and removes redundant
            ones.

            :raise OSError: If a system error occurred."""

            self.removeSnapshotFiles(level + 1)
            snapshotFileName = self.getSnapshotFileName(level)
            shutil.move(latestLevelSnapshotFilePath, snapshotFileName)

            # change the file permissions according to umask; the umask can be read only by setting it
            umask = os.umask(0)
            os.umask(umask)
            os.chmod(snapshotFileName, 0o666 & ~umask)



        def removeSnapshotFiles(self, level):
            """Remove snapshot files for levels higher or equal to ``level``."""

            for snapshot in self.getSnapshots():
                if self.getLevelFromFileName(snapshot) >= level:
                    os.remove(os.path.join(self.__snapshotsDir, snapshot))



        # SMELL: Currently a snapshot file with a keeping ID other than None will never exists.  Snapshots are
        # not kept.
        def tryRemoveSnapshotFile(self, level, keepingId = None):
            """Removes snapshot file for given backup level if it exists.

            :param level: Backup level for which the snapshot file shall be removed.
            :type level: ``int``
            :param keepingId: If not ``None`` a kept snapshot with this ID will be removed.
            :type keepingId: ``str``

            :return: ``True`` if the snapshot file was removed; ``False`` if the file does not exists.

            :raise OSError: If a system error occurred."""

            snapshotFileName = self.getSnapshotFileName(level, keepingId)
            if not os.path.exists(snapshotFileName):
                return False

            try:
                os.remove(snapshotFileName)
            except OSError as ex:

                # the file vanished between the check and the removal
                if ex.errno != errno.ENOENT:
                    raise
                return False
            return True



        def createWorkingSnapshotFile(self, level):
            """Copies snapshot file for ``level`` to a temporary file.

            For ``level`` 0 no file is left on the disk, only the unique path is returned.

            :return: Path to the working snapshot file.
            :raise OSError: If the snapshot file of the previous level could not be copied."""

            tempFileDescriptor, tempFilePath = tempfile.mkstemp(".snar", "autoarchive")

            # copy the snapshot file for previous level to a temporary one which will be used to create the new
            # increment; after that it will be moved to the proper location and name according to processed
            # archive and level; this way backup files for each created level will be preserved and thus it will
            # be possible to create also lower level backups (otherwise it would be only possible to create level
            # N+1 backup (where N is the latest/current level) or level 0 (go from the beginning))
            try:

                # the descriptor is wrapped in a file object for every level so that it is always closed
                with open(tempFileDescriptor, "wb") as tempFile:
                    if level > 0:
                        with open(self.getSnapshotFileName(level - 1), "rb") as srcSnapshotFile:
                            shutil.copyfileobj(srcSnapshotFile, tempFile)
            except BaseException:

                # do not leave the temporary file behind if the copying failed
                os.remove(tempFilePath)
                raise

            if level <= 0:

                # we do not need a previous snapshot file if the backup level is 0
                os.remove(tempFilePath)

            return tempFilePath



        @classmethod
        def makeSnapshotsDir(cls, workDir):
            """Creates the snapshots directory.

            :raise OSError: If creation of the directory was not successful."""

            snapshotsDir = cls.getSnapshotsDir(workDir)
            if not os.path.exists(snapshotsDir):
                try:
                    os.mkdir(snapshotsDir)
                except OSError as ex:

                    # somebody else created it in the meantime
                    if ex.errno != errno.EEXIST:
                        raise



        @staticmethod
        def getLevelFromFileName(fileName, keptBackup = False):
            """Extracts backup level number from the file name.

            :param fileName: Name of the file used to get the backup level from.  It should be in the form:
               '<archive_name>[.<level>].<suffix>'.
            :type fileName: ``str``
            :param keptBackup: If ``True`` one more name token is skipped before the level is looked up.
            :type keptBackup: ``bool``

            :return: The :term:`backup level` retrieved from the file name; 0 if no level token was found.
            :rtype: ``int``"""

            root, levelToken = os.path.splitext(os.path.splitext(fileName)[0])
            if keptBackup:
                root, levelToken = os.path.splitext(root)

            try:
                return int(levelToken[1:])
            except ValueError:
                pass

            # the token was not a level (e.g. part of a two-token suffix); try the one before it
            levelToken = os.path.splitext(root)[1]
            try:
                return int(levelToken[1:])
            except ValueError:
                return 0



        @staticmethod
        def getBackupIdFromSnapshotName(snapshotName):
            """Extracts the backup ID from a snapshot file name.

            :param snapshotName: Snapshot file name in the form '<backup_id>[.<level>].<suffix>'.
            :type snapshotName: ``str``

            :return: The name without the suffix and without the level token, if the token is a number.
            :rtype: ``str``"""

            stem = os.path.splitext(os.path.basename(snapshotName))[0]
            root, levelToken = os.path.splitext(stem)
            try:
                int(levelToken[1:])
            except ValueError:

                # not a level; the token belongs to the backup ID
                return stem
            return root



        def __getBackupLevel(self):
            "Returns the highest level found among snapshots of the current backup or ``None`` if there are none."

            snapshotFiles = self.getSnapshots()
            if not snapshotFiles:
                return None
            return max(self.getLevelFromFileName(snapshotFile) for snapshotFile in snapshotFiles)



        @staticmethod
        def __getLevelSuffix(level):
            """Returns file name suffix according to the backup level.

            :param level: Backup level for which the suffix shall be returned.
            :type level: ``int``

            :return: File name suffix for backup level in the form '.<level>'; empty string for level 0.
            :rtype: ``str``"""

            return "." + str(level) if level > 0 else ""
# }}} CLASSES