# -*- coding: utf-8 -*-
"""The image export CLI tool."""

import argparse
import codecs
import collections
import io
import json
import os
import textwrap

from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.resolver import context
from dfvfs.resolver import resolver as path_spec_resolver

from plaso.analyzers.hashers import manager as hashers_manager
from plaso.cli import logger
from plaso.cli import storage_media_tool
from plaso.cli.helpers import manager as helpers_manager
from plaso.engine import engine
from plaso.engine import extractors
from plaso.engine import path_helper
from plaso.filters import file_entry as file_entry_filters
from plaso.lib import errors
from plaso.lib import loggers
from plaso.lib import specification
from plaso.storage.fake import writer as fake_writer



class ImageExportTool(storage_media_tool.StorageMediaTool):
  """Class that implements the image export CLI tool.

  Attributes:
    has_filters (bool): True if filters have been specified via the options.
    list_signature_identifiers (bool): True if information about the signature
        identifiers should be shown.
  """

  NAME = 'image_export'
  DESCRIPTION = (
      'This is a simple collector designed to export files inside an '
      'image, both within a regular RAW image as well as inside a VSS. '
      'The tool uses a collection filter that uses the same syntax as a '
      'targeted plaso filter.')

  EPILOG = 'And that is how you export files, plaso style.'

  _COPY_BUFFER_SIZE = 32768

  _DIRTY_CHARACTERS = frozenset([
      '\x00', '\x01', '\x02', '\x03', '\x04', '\x05', '\x06', '\x07',
      '\x08', '\x09', '\x0a', '\x0b', '\x0c', '\x0d', '\x0e', '\x0f',
      '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17',
      '\x18', '\x19', '\x1a', '\x1b', '\x1c', '\x1d', '\x1e', '\x1f',
      os.path.sep, '!', '$', '%', '&', '*', '+', ':', ';', '<', '>', '?',
      '@', '|', '~', '\x7f'])

  _HASHES_FILENAME = 'hashes.json'

  _READ_BUFFER_SIZE = 4096

  # TODO: remove this redirect.
  _SOURCE_OPTION = 'image'

  _SOURCE_TYPES_TO_PREPROCESS = frozenset([
      dfvfs_definitions.SOURCE_TYPE_DIRECTORY,
      dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
      dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE])

  _SPECIFICATION_FILE_ENCODING = 'utf-8'
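
  # Illustrative note (not in the original source): given the character set
  # above, a path segment such as 'NTUSER.DAT:stream?' would be rewritten by
  # _CreateSanitizedDestination() below as 'NTUSER.DAT_stream_', since ':'
  # and '?' are members of _DIRTY_CHARACTERS and are replaced with '_'.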

  def __init__(self, input_reader=None, output_writer=None):
    """Initializes the CLI tool object.

    Args:
      input_reader (Optional[InputReader]): input reader, where None indicates
          that the stdin input reader should be used.
      output_writer (Optional[OutputWriter]): output writer, where None
          indicates that the stdout output writer should be used.
    """
    super(ImageExportTool, self).__init__(
        input_reader=input_reader, output_writer=output_writer)
    self._abort = False
    self._artifact_definitions_path = None
    self._artifact_filters = None
    self._artifacts_registry = None
    self._custom_artifacts_path = None
    self._destination_path = None
    self._digests = {}
    self._filter_collection = file_entry_filters.FileEntryFilterCollection()
    self._filter_file = None
    self._no_hashes = False
    self._path_spec_extractor = extractors.PathSpecExtractor()
    self._process_memory_limit = None
    self._paths_by_hash = collections.defaultdict(list)
    self._resolver_context = context.Context()
    self._skip_duplicates = True
    self.has_filters = False
    self.list_signature_identifiers = False

  def _CalculateDigestHash(self, file_entry, data_stream_name):
    """Calculates a SHA-256 digest of the contents of the file entry.

    Args:
      file_entry (dfvfs.FileEntry): file entry whose content will be hashed.
      data_stream_name (str): name of the data stream whose content is to be
          hashed.

    Returns:
      str: hexadecimal representation of the SHA-256 hash or None if the
          digest cannot be determined.
    """
    file_object = file_entry.GetFileObject(data_stream_name=data_stream_name)
    if not file_object:
      return None

    file_object.seek(0, os.SEEK_SET)

    hasher_object = hashers_manager.HashersManager.GetHasher('sha256')

    data = file_object.read(self._READ_BUFFER_SIZE)
    while data:
      hasher_object.Update(data)
      data = file_object.read(self._READ_BUFFER_SIZE)

    return hasher_object.GetStringDigest()

  def _CreateSanitizedDestination(
      self, source_file_entry, file_system_path_spec, source_data_stream_name,
      destination_path):
    """Creates a sanitized path of both destination directory and filename.

    This function replaces non-printable and other characters defined in
    _DIRTY_CHARACTERS with an underscore "_".

    Args:
      source_file_entry (dfvfs.FileEntry): file entry of the source file.
      file_system_path_spec (dfvfs.PathSpec): path specification of the source
          file system to process.
      source_data_stream_name (str): name of the data stream of the source
          file entry.
      destination_path (str): path of the destination directory.

    Returns:
      tuple[str, str]: sanitized paths of both destination directory and
          filename.
    """
    file_system = source_file_entry.GetFileSystem()
    path = getattr(file_system_path_spec, 'location', None)
    path_segments = file_system.SplitPath(path)

    # Sanitize each path segment.
    for index, path_segment in enumerate(path_segments):
      path_segments[index] = ''.join([
          character if character not in self._DIRTY_CHARACTERS else '_'
          for character in path_segment])

    target_filename = path_segments.pop()

    parent_path_spec = getattr(source_file_entry.path_spec, 'parent', None)
    while parent_path_spec:
      if parent_path_spec.type_indicator in (
          dfvfs_definitions.FILE_SYSTEM_TYPE_INDICATORS):
        path_segments.insert(0, parent_path_spec.location[1:])
        break

      if parent_path_spec.type_indicator == (
          dfvfs_definitions.TYPE_INDICATOR_VSHADOW):
        path_segments.insert(0, parent_path_spec.location[1:])

      parent_path_spec = getattr(parent_path_spec, 'parent', None)

    target_directory = os.path.join(destination_path, *path_segments)

    if source_data_stream_name:
      target_filename = '_'.join([target_filename, source_data_stream_name])

    return target_directory, target_filename

  def _ExtractDataStream(
      self, file_entry, data_stream_name, destination_path,
      skip_duplicates=True):
    """Extracts a data stream.

    Args:
      file_entry (dfvfs.FileEntry): file entry containing the data stream.
      data_stream_name (str): name of the data stream.
      destination_path (str): path where the extracted files should be stored.
      skip_duplicates (Optional[bool]): True if files with duplicate content
          should be skipped.
""" if not data_stream_name and not file_entry.IsFile(): return display_name = path_helper.PathHelper.GetDisplayNameForPathSpec( file_entry.path_spec) try: digest = self._CalculateDigestHash(file_entry, data_stream_name) except (IOError, dfvfs_errors.BackEndError) as exception: logger.error(( f'[skipping] unable to read content of file entry: {display_name:s} ' f'with error: {exception!s}')) return if not digest: logger.error( f'[skipping] unable to read content of file entry: {display_name:s}') return target_directory, target_filename = self._CreateSanitizedDestination( file_entry, file_entry.path_spec, data_stream_name, destination_path) # If does not exist, append path separator to have consistent behaviour. if not destination_path.endswith(os.path.sep): destination_path = destination_path + os.path.sep target_path = os.path.join(target_directory, target_filename) if target_path.startswith(destination_path): path = target_path[len(destination_path):] self._paths_by_hash[digest].append(path) if skip_duplicates: duplicate_display_name = self._digests.get(digest, None) if duplicate_display_name: logger.warning(( f'[skipping] file entry: {display_name:s} is a duplicate of: ' f'{duplicate_display_name:s} with digest: {digest:s}')) return self._digests[digest] = display_name if not os.path.isdir(target_directory): os.makedirs(target_directory) if os.path.exists(target_path): logger.warning(( f'[skipping] unable to export contents of file entry: ' f'{display_name:s} because exported file: {target_path:s} already ' f'exists.')) return try: self._WriteFileEntry(file_entry, data_stream_name, target_path) except (IOError, dfvfs_errors.BackEndError) as exception: logger.error(( f'[skipping] unable to export contents of file entry: ' f'{display_name:s} with error: {exception!s}')) try: os.remove(target_path) except (IOError, OSError): pass def _ExtractFileEntry( self, file_entry, destination_path, skip_duplicates=True): """Extracts a file entry. Args: file_entry (dfvfs.FileEntry): file entry whose content is to be written. destination_path (str): path where the extracted files should be stored. skip_duplicates (Optional[bool]): True if files with duplicate content should be skipped. """ if not self._filter_collection.Matches(file_entry): return file_entry_processed = False for data_stream in file_entry.data_streams: if self._abort: break self._ExtractDataStream( file_entry, data_stream.name, destination_path, skip_duplicates=skip_duplicates) file_entry_processed = True if not file_entry_processed: self._ExtractDataStream( file_entry, '', destination_path, skip_duplicates=skip_duplicates) # TODO: merge with collector and/or engine. def _Extract( self, file_system_path_specs, destination_path, output_writer, artifact_filters, filter_file, artifact_definitions_path, custom_artifacts_path, skip_duplicates=True): """Extracts files. This method runs the file extraction process on the image and potentially on every VSS if that is wanted. Args: file_system_path_specs (list[dfvfs.PathSpec]): path specifications of the source file systems to process. destination_path (str): path where the extracted files should be stored. output_writer (CLIOutputWriter): output writer. artifact_definitions_path (str): path to artifact definitions file. custom_artifacts_path (str): path to custom artifact definitions file. artifact_filters (list[str]): names of artifact definitions that are used for filtering file system and Windows Registry key paths. 
      filter_file (str): path of the file that contains the filter file path
          filters.
      artifact_definitions_path (str): path to artifact definitions file.
      custom_artifacts_path (str): path to custom artifact definitions file.
      skip_duplicates (Optional[bool]): True if files with duplicate content
          should be skipped.

    Raises:
      BadConfigOption: if an invalid collection filter was specified.
    """
    extraction_engine = engine.BaseEngine()

    extraction_engine.BuildArtifactsRegistry(
        artifact_definitions_path, custom_artifacts_path)

    storage_writer = fake_writer.FakeStorageWriter()
    storage_writer.Open()

    # If the source is a directory or a storage media image run pre-processing.
    system_configurations = []
    if self._source_type in self._SOURCE_TYPES_TO_PREPROCESS:
      try:
        logger.debug('Starting preprocessing.')

        # A fake (in-memory) storage writer is used since the preprocessing
        # information does not need to be persisted.
        system_configurations = extraction_engine.PreprocessSource(
            self._file_system_path_specs, storage_writer,
            resolver_context=self._resolver_context)

        logger.debug('Preprocessing done.')

      except IOError as exception:
        logger.error(f'Unable to preprocess with error: {exception!s}')

    # TODO: use system_configurations instead of knowledge base
    _ = system_configurations

    environment_variables = (
        extraction_engine.knowledge_base.GetEnvironmentVariables())
    user_accounts = list(storage_writer.GetAttributeContainers('user_account'))

    try:
      extraction_engine.BuildCollectionFilters(
          environment_variables, user_accounts,
          artifact_filter_names=artifact_filters, filter_file_path=filter_file)
    except errors.InvalidFilter as exception:
      raise errors.BadConfigOption(
          f'Unable to build collection filters with error: {exception!s}')

    excluded_find_specs = extraction_engine.GetCollectionExcludedFindSpecs()
    included_find_specs = extraction_engine.GetCollectionIncludedFindSpecs()

    output_writer.Write('Extracting file entries.\n')

    for file_system_path_spec in file_system_path_specs:
      path_spec_generator = self._path_spec_extractor.ExtractPathSpecs(
          file_system_path_spec, find_specs=included_find_specs,
          resolver_context=self._resolver_context)

      for path_spec in path_spec_generator:
        file_entry = path_spec_resolver.Resolver.OpenFileEntry(
            path_spec, resolver_context=self._resolver_context)

        if not file_entry:
          path_spec_string = self._GetPathSpecificationString(path_spec)
          logger.warning((
              f'Unable to open file entry for path specification: '
              f'{path_spec_string:s}'))
          continue

        skip_file_entry = False
        for find_spec in excluded_find_specs or []:
          skip_file_entry = find_spec.CompareLocation(file_entry)
          if skip_file_entry:
            break

        if skip_file_entry:
          logger.info((
              f'Skipped: {file_entry.path_spec.location:s} because of '
              f'exclusion filter.'))
          continue

        self._ExtractFileEntry(
            file_entry, destination_path, skip_duplicates=skip_duplicates)

  def _ParseExtensionsString(self, extensions_string):
    """Parses the extensions string.

    Args:
      extensions_string (str): comma separated extensions to filter.
    """
    if not extensions_string:
      return

    extensions_string = extensions_string.lower()
    extensions = [
        extension.strip() for extension in extensions_string.split(',')]
    file_entry_filter = file_entry_filters.ExtensionsFileEntryFilter(extensions)
    self._filter_collection.AddFilter(file_entry_filter)

  def _ParseNamesString(self, names_string):
    """Parses the name string.

    Args:
      names_string (str): comma separated filenames to filter.
""" if not names_string: return names_string = names_string.lower() names = [name.strip() for name in names_string.split(',')] file_entry_filter = file_entry_filters.NamesFileEntryFilter(names) self._filter_collection.AddFilter(file_entry_filter) def _ParseFilterOptions(self, options): """Parses the filter options. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ names = ['artifact_filters', 'date_filters', 'filter_file'] helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=names) extensions_string = self.ParseStringOption(options, 'extensions_string') self._ParseExtensionsString(extensions_string) names_string = getattr(options, 'names_string', None) self._ParseNamesString(names_string) signature_identifiers = getattr(options, 'signature_identifiers', None) try: self._ParseSignatureIdentifiers( self._data_location, signature_identifiers) except (IOError, ValueError) as exception: raise errors.BadConfigOption(exception) if self._artifact_filters or self._filter_file: self.has_filters = True else: self.has_filters = self._filter_collection.HasFilters() def _ParseSignatureIdentifiers(self, data_location, signature_identifiers): """Parses the signature identifiers. Args: data_location (str): location of the format specification file, for example, "signatures.conf". signature_identifiers (str): comma separated signature identifiers. Raises: IOError: if the format specification file could not be read from the specified data location. OSError: if the format specification file could not be read from the specified data location. ValueError: if no data location was specified. """ if not signature_identifiers: return if not data_location: raise ValueError('Missing data location.') path = os.path.join(data_location, 'signatures.conf') if not os.path.exists(path): raise IOError(f'No such format specification file: {path:s}') try: specification_store = self._ReadSpecificationFile(path) except IOError as exception: raise IOError(( f'Unable to read format specification file: {path:s} with error: ' f'{exception!s}')) signature_identifiers = signature_identifiers.lower() signature_identifiers = [ identifier.strip() for identifier in signature_identifiers.split(',')] file_entry_filter = file_entry_filters.SignaturesFileEntryFilter( specification_store, signature_identifiers) self._filter_collection.AddFilter(file_entry_filter) def _ReadSpecificationFile(self, path): """Reads the format specification file. Args: path (str): path of the format specification file. Returns: FormatSpecificationStore: format specification store. """ specification_store = specification.FormatSpecificationStore() with io.open( path, 'rt', encoding=self._SPECIFICATION_FILE_ENCODING) as file_object: for line in file_object.readlines(): line = line.strip() if not line or line.startswith('#'): continue try: identifier, offset, pattern = line.split() except ValueError: logger.error(f'[skipping] invalid line: {line:s}') continue try: offset = int(offset, 10) except ValueError: logger.error(f'[skipping] invalid offset in line: {line:s}') continue try: # TODO: find another way to do this that doesn't use an undocumented # API. pattern = codecs.escape_decode(pattern)[0] # ValueError is raised when the patterns contains invalid escaped # characters, such as "\xg1". 
        except ValueError:
          logger.error(f'[skipping] invalid pattern in line: {line:s}')
          continue

        format_specification = specification.FormatSpecification(identifier)
        format_specification.AddNewSignature(pattern, offset=offset)
        specification_store.AddSpecification(format_specification)

    return specification_store

  def _WriteFileEntry(self, file_entry, data_stream_name, destination_file):
    """Writes the contents of the source file entry to a destination file.

    Note that this function will overwrite an existing file.

    Args:
      file_entry (dfvfs.FileEntry): file entry whose content is to be written.
      data_stream_name (str): name of the data stream whose content is to be
          written.
      destination_file (str): path of the destination file.
    """
    source_file_object = file_entry.GetFileObject(
        data_stream_name=data_stream_name)
    if not source_file_object:
      return

    with open(destination_file, 'wb') as destination_file_object:
      source_file_object.seek(0, os.SEEK_SET)

      data = source_file_object.read(self._COPY_BUFFER_SIZE)
      while data:
        destination_file_object.write(data)
        data = source_file_object.read(self._COPY_BUFFER_SIZE)
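
  # Illustrative note (not in the original source): _ReadSpecificationFile()
  # above expects whitespace separated lines of the form
  # "identifier offset pattern", with a decimal offset and a pattern that may
  # contain escaped bytes, for example a hypothetical entry:
  #
  #   lnk 0 \x4c\x00\x00\x00\x01\x14\x02\x00
  #
  # Empty lines and lines starting with '#' are ignored.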

  def AddFilterOptions(self, argument_group):
    """Adds the filter options to the argument group.

    Args:
      argument_group (argparse._ArgumentGroup): argparse argument group.
    """
    names = ['artifact_filters', 'date_filters', 'filter_file']
    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        argument_group, names=names)

    argument_group.add_argument(
        '-x', '--extensions', dest='extensions_string', action='store',
        type=str, metavar='EXTENSIONS', help=(
            'Filter on file name extensions. This option accepts multiple '
            'comma separated values e.g. "csv,docx,pst".'))

    argument_group.add_argument(
        '--names', dest='names_string', action='store', type=str,
        metavar='NAMES', help=(
            'Filter on file names. This option accepts a comma separated '
            'string denoting all file names, e.g. --names '
            '"NTUSER.DAT,UsrClass.dat".'))

    argument_group.add_argument(
        '--signatures', dest='signature_identifiers', action='store',
        type=str, metavar='IDENTIFIERS', help=(
            'Filter on file format signature identifiers. This option '
            'accepts multiple comma separated values e.g. "esedb,lnk". '
            'Use "list" to show an overview of the supported file format '
            'signatures.'))

  def ListSignatureIdentifiers(self):
    """Lists the signature identifiers.

    Raises:
      BadConfigOption: if the data location is invalid.
    """
    if not self._data_location:
      raise errors.BadConfigOption('Missing data location.')

    path = os.path.join(self._data_location, 'signatures.conf')
    if not os.path.exists(path):
      raise errors.BadConfigOption(
          f'No such format specification file: {path:s}')

    try:
      specification_store = self._ReadSpecificationFile(path)
    except IOError as exception:
      raise errors.BadConfigOption((
          f'Unable to read format specification file: {path:s} with error: '
          f'{exception!s}'))

    identifiers = []
    for format_specification in specification_store.specifications:
      identifiers.append(format_specification.identifier)

    self._output_writer.Write('Available signature identifiers:\n')
    self._output_writer.Write(
        '\n'.join(textwrap.wrap(', '.join(sorted(identifiers)), 79)))

    self._output_writer.Write('\n\n')

  def ParseArguments(self, arguments):
    """Parses the command line arguments.

    Args:
      arguments (list[str]): command line arguments.

    Returns:
      bool: True if the arguments were successfully parsed.
    """
    loggers.ConfigureLogging()

    argument_parser = argparse.ArgumentParser(
        description=self.DESCRIPTION, epilog=self.EPILOG, add_help=False,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    self.AddBasicOptions(argument_parser)
    self.AddInformationalOptions(argument_parser)

    argument_helper_names = [
        'artifact_definitions', 'data_location', 'vfs_backend']
    if self._CanEnforceProcessMemoryLimit():
      argument_helper_names.append('process_resources')
    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        argument_parser, names=argument_helper_names)

    self.AddLogFileOptions(argument_parser)

    self.AddStorageMediaImageOptions(argument_parser)
    self.AddVSSProcessingOptions(argument_parser)

    self.AddCredentialOptions(argument_parser)
    self.AddFilterOptions(argument_parser)

    argument_parser.add_argument(
        '-w', '--write', action='store', dest='path', type=str,
        metavar='PATH', default='export', help=(
            'The directory in which extracted files should be stored.'))

    argument_parser.add_argument(
        '--include_duplicates', '--include-duplicates',
        dest='include_duplicates', action='store_true', default=False, help=(
            'By default a digest hash (SHA-256) is calculated for each file '
            '(data stream). These hashes are compared to the previously '
            'exported files and duplicates are skipped. Use this option to '
            'include duplicate files in the export.'))

    argument_parser.add_argument(
        '--no_hashes', '--no-hashes', dest='no_hashes', action='store_true',
        default=False, help=(
            f'Do not generate the {self._HASHES_FILENAME:s} file'))

    argument_parser.add_argument(
        self._SOURCE_OPTION, nargs='?', action='store', metavar='IMAGE',
        default=None, type=str, help=(
            'The full path to the image file that we are about to extract '
            'files from; it should be a raw image or another image that '
            'Plaso supports.'))

    try:
      options = argument_parser.parse_args(arguments)
    except UnicodeEncodeError:
      # If we get here we are attempting to print help in a non-Unicode
      # terminal.
      self._output_writer.Write('')
      self._output_writer.Write(argument_parser.format_help())
      return False

    try:
      self.ParseOptions(options)
    except errors.BadConfigOption as exception:
      self._output_writer.Write(f'ERROR: {exception!s}\n')
      self._output_writer.Write('\n')
      self._output_writer.Write(argument_parser.format_usage())
      return False

    self._WaitUserWarning()

    loggers.ConfigureLogging(
        debug_output=self._debug_mode, filename=self._log_file,
        quiet_mode=self._quiet_mode)

    return True
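
  # Illustrative note (not in the original source): based on the arguments
  # registered in ParseArguments() above, an invocation could look like the
  # following hypothetical command line, which exports LNK and ESE database
  # files from a raw image into /tmp/export:
  #
  #   image_export.py --signatures lnk,esedb -w /tmp/export image.raw
  #
  # Passing --signatures list prints the supported signature identifiers
  # instead of exporting files.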

  def ParseOptions(self, options):
    """Parses the options and initializes the front-end.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    # The data location is required to list signatures.
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=['data_location'])

    self.show_troubleshooting = getattr(options, 'show_troubleshooting', False)

    # Check the list options first otherwise required options will raise.
    signature_identifiers = self.ParseStringOption(
        options, 'signature_identifiers')
    if signature_identifiers == 'list':
      self.list_signature_identifiers = True

    if self.list_signature_identifiers or self.show_troubleshooting:
      return

    self._ParseInformationalOptions(options)
    self._ParseLogFileOptions(options)

    self._ParseStorageMediaOptions(options)

    self._destination_path = self.ParseStringOption(
        options, 'path', default_value='export')

    if not self._data_location:
      logger.warning('Unable to automatically determine data location.')

    argument_helper_names = [
        'artifact_definitions', 'process_resources', 'vfs_backend']
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=argument_helper_names)

    if self._vfs_back_end == 'fsext':
      dfvfs_definitions.PREFERRED_EXT_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_EXT)

    elif self._vfs_back_end == 'fsfat':
      dfvfs_definitions.PREFERRED_FAT_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_FAT)

    elif self._vfs_back_end == 'fshfs':
      dfvfs_definitions.PREFERRED_HFS_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_HFS)

    elif self._vfs_back_end == 'fsntfs':
      dfvfs_definitions.PREFERRED_NTFS_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_NTFS)

    elif self._vfs_back_end == 'tsk':
      dfvfs_definitions.PREFERRED_EXT_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_TSK)
      dfvfs_definitions.PREFERRED_FAT_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_TSK)
      dfvfs_definitions.PREFERRED_GPT_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION)
      dfvfs_definitions.PREFERRED_HFS_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_TSK)
      dfvfs_definitions.PREFERRED_NTFS_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_TSK)

    elif self._vfs_back_end == 'vsgpt':
      dfvfs_definitions.PREFERRED_GPT_BACK_END = (
          dfvfs_definitions.TYPE_INDICATOR_GPT)

    self._ParseFilterOptions(options)

    include_duplicates = getattr(options, 'include_duplicates', False)
    self._skip_duplicates = not include_duplicates

    self._no_hashes = getattr(options, 'no_hashes', False)

    self._EnforceProcessMemoryLimit(self._process_memory_limit)

  def PrintFilterCollection(self):
    """Prints the filter collection."""
    self._filter_collection.Print(self._output_writer)

  def ProcessSource(self):
    """Processes the source.

    Raises:
      SourceScannerError: if the source scanner could not find a supported
          file system.
      UserAbort: if the user initiated an abort.
    """
    try:
      self.ScanSource(self._source_path)
    except dfvfs_errors.UserAbort as exception:
      raise errors.UserAbort(exception)

    self._output_writer.Write('Export started.\n')

    if not os.path.isdir(self._destination_path):
      os.makedirs(self._destination_path)

    self._Extract(
        self._file_system_path_specs, self._destination_path,
        self._output_writer, self._artifact_filters, self._filter_file,
        self._artifact_definitions_path, self._custom_artifacts_path,
        skip_duplicates=self._skip_duplicates)

    json_data = []

    if not self._no_hashes:
      hashes_file_path = os.path.join(
          self._destination_path, self._HASHES_FILENAME)
      with open(hashes_file_path, 'w', encoding='utf-8') as file_object:
        for sha256, paths in self._paths_by_hash.items():
          json_data.append({'sha256': sha256, 'paths': paths})

        json.dump(json_data, file_object)

    self._output_writer.Write('Export completed.\n')
    self._output_writer.Write('\n')
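

# The following is an illustrative usage sketch, not part of the original
# module: the actual plaso entry point script is maintained separately. It
# only uses methods defined on ImageExportTool above and assumes a standard
# command line invocation.
if __name__ == '__main__':
  import sys

  tool = ImageExportTool()

  if not tool.ParseArguments(sys.argv[1:]):
    sys.exit(1)

  if tool.list_signature_identifiers:
    tool.ListSignatureIdentifiers()
    sys.exit(0)

  if tool.has_filters:
    tool.PrintFilterCollection()

  try:
    tool.ProcessSource()
  except (KeyboardInterrupt, errors.UserAbort):
    sys.exit(1)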