Source code for plaso.cli.storage_media_tool

# -*- coding: utf-8 -*-
"""The storage media CLI tool."""

import codecs
import os

from dfvfs.analyzer import analyzer as dfvfs_analyzer
from dfvfs.analyzer import cs_analyzer_helper
from dfvfs.helpers import command_line as dfvfs_command_line
from dfvfs.helpers import volume_scanner as dfvfs_volume_scanner
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.volume import apfs_volume_system
from dfvfs.volume import lvm_volume_system
from dfvfs.volume import vshadow_volume_system

from plaso.cli import tools
from plaso.engine import configurations
from plaso.lib import errors


try:
  # Disable experimental Core Storage support.
  dfvfs_analyzer.Analyzer.DeregisterHelper(
      cs_analyzer_helper.CSAnalyzerHelper())
except KeyError:
  pass


[docs] class StorageMediaToolVolumeScannerOptions( dfvfs_volume_scanner.VolumeScannerOptions): """Volume scanner options used by the storage media tool. Attributes: snapshots_only (bool): True if the current volume of a volume with snapshots should be ignored. """
[docs] def __init__(self): """Initializes volume scanner options.""" super(StorageMediaToolVolumeScannerOptions, self).__init__() self.snapshots_only = False
[docs] class StorageMediaToolMediator(dfvfs_command_line.CLIVolumeScannerMediator): """Mediator between the storage media tool and user input.""" def _PrintAPFSVolumeIdentifiersOverview( self, volume_system, volume_identifiers): """Prints an overview of APFS volume identifiers. Args: volume_system (dfvfs.APFSVolumeSystem): volume system. volume_identifiers (list[str]): allowed volume identifiers. Raises: dfvfs.ScannerError: if a volume cannot be resolved from the volume identifier. """ super(StorageMediaToolMediator, self)._PrintAPFSVolumeIdentifiersOverview( volume_system, volume_identifiers) self._output_writer.Write('\n') def _PrintLVMVolumeIdentifiersOverview( self, volume_system, volume_identifiers): """Prints an overview of LVM volume identifiers. Args: volume_system (dfvfs.LVMVolumeSystem): volume system. volume_identifiers (list[str]): allowed volume identifiers. Raises: dfvfs.ScannerError: if a volume cannot be resolved from the volume identifier. """ super(StorageMediaToolMediator, self)._PrintLVMVolumeIdentifiersOverview( volume_system, volume_identifiers) self._output_writer.Write('\n') def _PrintTSKPartitionIdentifiersOverview( self, volume_system, volume_identifiers): """Prints an overview of TSK partition identifiers. Args: volume_system (dfvfs.TSKVolumeSystem): volume system. volume_identifiers (list[str]): allowed volume identifiers. Raises: dfvfs.ScannerError: if a volume cannot be resolved from the volume identifier. """ super(StorageMediaToolMediator, self)._PrintPartitionIdentifiersOverview( volume_system, volume_identifiers) self._output_writer.Write('\n') def _PrintVSSStoreIdentifiersOverview( self, volume_system, volume_identifiers): """Prints an overview of VSS store identifiers. Args: volume_system (dfvfs.VShadowVolumeSystem): volume system. volume_identifiers (list[str]): allowed volume identifiers. Raises: dfvfs.ScannerError: if a volume cannot be resolved from the volume identifier. """ super(StorageMediaToolMediator, self)._PrintVSSStoreIdentifiersOverview( volume_system, volume_identifiers) self._output_writer.Write('\n') # pylint: disable=arguments-differ # TODO: replace this method by the one defined by CLIVolumeScannerMediator
[docs] def ParseVolumeIdentifiersString( self, volume_identifiers_string, prefix='v'): """Parses a user specified volume identifiers string. Args: volume_identifiers_string (str): user specified volume identifiers. A range of volumes can be defined as: "3..5". Multiple volumes can be defined as: "1,3,5" (a list of comma separated values). Ranges and lists can also be combined as: "1,3..5". The first volume is 1. All volumes can be defined as: "all". prefix (Optional[str]): volume identifier prefix. Returns: list[str]: volume identifiers with prefix or the string "all". Raises: ValueError: if the volume identifiers string is invalid. """ return self._ParseVolumeIdentifiersString( volume_identifiers_string, prefix=prefix)
[docs] def PromptUserForVSSCurrentVolume(self): """Prompts the user if the current volume with VSS should be processed. Returns: bool: True if the current volume with VSS should be processed. """ while True: self._output_writer.Write( 'Volume Shadow Snapshots (VSS) were selected also process current\n' 'volume? [yes, no]\n') process_current_volume = self._input_reader.Read() process_current_volume = process_current_volume.strip() process_current_volume = process_current_volume.lower() if (not process_current_volume or process_current_volume in ('no', 'yes')): break self._output_writer.Write( '\n' 'Unsupported option, please try again or abort with Ctrl^C.\n' '\n') self._output_writer.Write('\n') return not process_current_volume or process_current_volume == 'yes'
[docs] class StorageMediaToolVolumeScanner(dfvfs_volume_scanner.VolumeScanner): """Volume scanner used by the storage media tool."""
[docs] def __init__(self, mediator=None): """Initializes a volume scanner. Args: mediator (Optional[VolumeScannerMediator]): a volume scanner mediator. """ super(StorageMediaToolVolumeScanner, self).__init__(mediator=mediator) self._credential_configurations = [] self._snapshots_only = False
@property def source_type(self): """str: type of source.""" return self._source_type def _GetBasePathSpecs(self, scan_context, options): """Determines the base path specifications. Args: scan_context (SourceScannerContext): source scanner context. options (VolumeScannerOptions): volume scanner options. Returns: list[PathSpec]: path specifications. Raises: dfvfs.ScannerError: if the format of or within the source is not supported or no partitions were found. """ # TODO: difference with dfVFS. self._snapshots_only = options.snapshots_only scan_node = scan_context.GetRootScanNode() if scan_context.source_type not in ( scan_context.SOURCE_TYPE_STORAGE_MEDIA_DEVICE, scan_context.SOURCE_TYPE_STORAGE_MEDIA_IMAGE): return [scan_node.path_spec] # Get the first node where where we need to decide what to process. while len(scan_node.sub_nodes) == 1: scan_node = scan_node.sub_nodes[0] base_path_specs = [] if scan_node.type_indicator not in ( dfvfs_definitions.TYPE_INDICATOR_GPT, dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION): self._ScanVolume(scan_context, scan_node, options, base_path_specs) else: # Determine which partition needs to be processed. partition_identifiers = self._GetPartitionIdentifiers(scan_node, options) # TODO: difference with dfVFS. if not partition_identifiers: raise dfvfs_errors.ScannerError('No partitions found.') for partition_identifier in partition_identifiers: sub_scan_node = scan_node.GetSubNodeByLocation( f'/{partition_identifier:s}') self._ScanVolume(scan_context, sub_scan_node, options, base_path_specs) return base_path_specs def _ScanEncryptedVolume(self, scan_context, scan_node, options): """Scans an encrypted volume scan node for volume and file systems. Args: scan_context (SourceScannerContext): source scanner context. scan_node (SourceScanNode): volume scan node. options (VolumeScannerOptions): volume scanner options. Raises: dfvfs.ScannerError: if the format of or within the source is not supported, the scan node is invalid or there are no credentials defined for the format. """ super(StorageMediaToolVolumeScanner, self)._ScanEncryptedVolume( scan_context, scan_node, options) if not scan_context.IsLockedScanNode(scan_node.path_spec): credential_type, credential_data = scan_node.credential credential_configuration = configurations.CredentialConfiguration( credential_data=credential_data, credential_type=credential_type, path_spec=scan_node.path_spec) self._credential_configurations.append(credential_configuration) def _ScanFileSystem(self, scan_node, base_path_specs): """Scans a file system scan node for file systems. Args: scan_node (SourceScanNode): file system scan node. base_path_specs (list[PathSpec]): file system base path specifications. Raises: dfvfs.ScannerError: if the scan node is invalid. """ if not scan_node or not scan_node.path_spec: raise dfvfs_errors.ScannerError( 'Invalid or missing file system scan node.') # TODO: difference with dfVFS for current VSS volume support. if self._snapshots_only: if scan_node.parent_node.sub_nodes[0].type_indicator == ( dfvfs_definitions.TYPE_INDICATOR_VSHADOW): return base_path_specs.append(scan_node.path_spec) def _ScanVolumeSystemRoot( self, scan_context, scan_node, options, base_path_specs): """Scans a volume system root scan node for volume and file systems. Args: scan_context (SourceScannerContext): source scanner context. scan_node (SourceScanNode): volume system root scan node. options (VolumeScannerOptions): volume scanner options. base_path_specs (list[PathSpec]): file system base path specifications. Raises: dfvfs.ScannerError: if the scan node is invalid, the scan node type is not supported or if a sub scan node cannot be retrieved. """ if not scan_node or not scan_node.path_spec: raise dfvfs_errors.ScannerError('Invalid scan node.') if scan_node.type_indicator == ( dfvfs_definitions.TYPE_INDICATOR_APFS_CONTAINER): volume_system = apfs_volume_system.APFSVolumeSystem() volume_system.Open(scan_node.path_spec) volume_identifiers = self._GetVolumeIdentifiers(volume_system, options) elif scan_node.type_indicator in ( dfvfs_definitions.TYPE_INDICATOR_GPT, dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION): volume_identifiers = self._GetPartitionIdentifiers(scan_node, options) elif scan_node.type_indicator == dfvfs_definitions.TYPE_INDICATOR_LVM: volume_system = lvm_volume_system.LVMVolumeSystem() volume_system.Open(scan_node.path_spec) volume_identifiers = self._GetVolumeIdentifiers(volume_system, options) elif scan_node.type_indicator == dfvfs_definitions.TYPE_INDICATOR_VSHADOW: volume_system = vshadow_volume_system.VShadowVolumeSystem() volume_system.Open(scan_node.path_spec) volume_identifiers = self._GetVolumeSnapshotIdentifiers( volume_system, options) # Process VSS stores (snapshots) starting with the most recent one. volume_identifiers.reverse() # TODO: difference with dfVFS for current VSS volume support. if not options.snapshots_only and self._mediator and volume_identifiers: snapshots_only = not self._mediator.PromptUserForVSSCurrentVolume() options.snapshots_only = snapshots_only self._snapshots_only = snapshots_only else: raise dfvfs_errors.ScannerError( f'Unsupported volume system type: {scan_node.type_indicator:s}.') for volume_identifier in volume_identifiers: sub_scan_node = scan_node.GetSubNodeByLocation(f'/{volume_identifier:s}') if not sub_scan_node: raise dfvfs_errors.ScannerError( f'Scan node missing for volume identifier: {volume_identifier:s}.') self._ScanVolume(scan_context, sub_scan_node, options, base_path_specs)
[docs] def ScanSource(self, source_path, options, base_path_specs): """Scans the source path for volume and file systems. This function sets the internal source path specification and source type values. Args: source_path (str): path to the source. options (VolumeScannerOptions): volume scanner options. base_path_specs (list[PathSpec]): file system base path specifications. Returns: dfvfs.SourceScannerContext: source scanner context. Raises: dfvfs.ScannerError: if the format of or within the source is not supported. """ scan_context = self._ScanSource(source_path) self._source_path = source_path self._source_type = scan_context.source_type scanner_base_path_specs = self._GetBasePathSpecs(scan_context, options) base_path_specs.extend(scanner_base_path_specs) return scan_context
[docs] class StorageMediaTool(tools.CLITool): """CLI tool that supports a storage media device or image as input.""" # TODO: remove this redirect. _SOURCE_OPTION = 'source' _BINARY_DATA_CREDENTIAL_TYPES = ['key_data'] _SUPPORTED_CREDENTIAL_TYPES = [ 'key_data', 'password', 'recovery_password', 'startup_key']
[docs] def __init__(self, input_reader=None, output_writer=None): """Initializes a CLI tool that supports storage media as input. Args: input_reader (Optional[InputReader]): input reader, where None indicates that the stdin input reader should be used. output_writer (Optional[OutputWriter]): output writer, where None indicates that the stdout output writer should be used. """ super(StorageMediaTool, self).__init__( input_reader=input_reader, output_writer=output_writer) self._custom_artifacts_path = None self._artifact_definitions_path = None self._artifact_filters = None self._credentials = [] self._credential_configurations = [] self._file_system_path_specs = [] self._filter_file = None self._mediator = StorageMediaToolMediator( input_reader=input_reader, output_writer=output_writer) self._partitions = None self._source_path = None self._source_type = None self._volumes = None self._vss_only = False self._vss_stores = None
def _ParseCredentialOptions(self, options): """Parses the credential options. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ credentials = getattr(options, 'credentials', []) if not isinstance(credentials, list): raise errors.BadConfigOption('Unsupported credentials value.') for credential_string in credentials: credential_type, _, credential_data = credential_string.partition(':') if not credential_type or not credential_data: raise errors.BadConfigOption( f'Badly formatted credential: {credential_string:s}.') if credential_type not in self._SUPPORTED_CREDENTIAL_TYPES: raise errors.BadConfigOption( f'Unsupported credential type for: {credential_string:s}.') if credential_type in self._BINARY_DATA_CREDENTIAL_TYPES: try: credential_data = codecs.decode(credential_data, 'hex') except TypeError: raise errors.BadConfigOption( f'Unsupported credential data for: {credential_string:s}.') self._credentials.append((credential_type, credential_data)) def _ParseSourcePathOption(self, options): """Parses the source path option. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ self._source_path = self.ParseStringOption(options, self._SOURCE_OPTION) if not self._source_path: raise errors.BadConfigOption('Missing source path.') self._source_path = os.path.abspath(self._source_path) def _ParseStorageMediaOptions(self, options): """Parses the storage media options. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ self._ParseStorageMediaImageOptions(options) self._ParseVSSProcessingOptions(options) self._ParseCredentialOptions(options) self._ParseSourcePathOption(options) def _ParseStorageMediaImageOptions(self, options): """Parses the storage media image options. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ self._partitions = getattr(options, 'partitions', None) if self._partitions: try: self._mediator.ParseVolumeIdentifiersString( self._partitions, prefix='p') except ValueError: raise errors.BadConfigOption('Unsupported partitions') self._volumes = getattr(options, 'volumes', None) if self._volumes: try: self._mediator.ParseVolumeIdentifiersString( self._volumes, prefix='apfs') except ValueError: raise errors.BadConfigOption('Unsupported volumes') def _ParseVSSProcessingOptions(self, options): """Parses the VSS processing options. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ process_vss = not getattr(options, 'no_vss', False) vss_only = getattr(options, 'vss_only', False) vss_stores = getattr(options, 'vss_stores', None) if not process_vss: vss_stores = 'none' self._PrintUserWarning( 'The --no_vss option is deprecated use --vss_stores=none instead.') if vss_stores and vss_stores != 'none': try: self._mediator.ParseVolumeIdentifiersString(vss_stores, prefix='vss') except ValueError: raise errors.BadConfigOption('Unsupported VSS stores') self._vss_only = vss_only self._vss_stores = vss_stores
[docs] def AddCredentialOptions(self, argument_group): """Adds the credential options to the argument group. The credential options are use to unlock encrypted volumes. Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ credential_types = ', '.join(self._SUPPORTED_CREDENTIAL_TYPES) argument_group.add_argument( '--credential', action='append', default=[], type=str, dest='credentials', metavar='TYPE:DATA', help=( f'Define a credentials that can be used to unlock encrypted ' f'volumes e.g. BitLocker. The credential is defined as type:data ' f'e.g. "password:BDE-test". Supported credential types are: ' f'{credential_types:s}. Binary key data is expected to be passed ' f'in BASE-16 encoding (hexadecimal). WARNING credentials passed ' f'via command line arguments can end up in logs, so use this ' f'option with care.'))
[docs] def AddStorageMediaImageOptions(self, argument_group): """Adds the storage media image options to the argument group. Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ argument_group.add_argument( '--partitions', '--partition', dest='partitions', action='store', type=str, default=None, help=( 'Define partitions to be processed. A range of ' 'partitions can be defined as: "3..5". Multiple partitions can ' 'be defined as: "1,3,5" (a list of comma separated values). ' 'Ranges and lists can also be combined as: "1,3..5". The first ' 'partition is 1. All partitions can be specified with: "all".')) argument_group.add_argument( '--volumes', '--volume', dest='volumes', action='store', type=str, default=None, help=( 'Define volumes to be processed. A range of volumes can be defined ' 'as: "3..5". Multiple volumes can be defined as: "1,3,5" (a list ' 'of comma separated values). Ranges and lists can also be combined ' 'as: "1,3..5". The first volume is 1. All volumes can be specified ' 'with: "all".'))
[docs] def AddVSSProcessingOptions(self, argument_group): """Adds the VSS processing options to the argument group. Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ argument_group.add_argument( '--no_vss', '--no-vss', dest='no_vss', action='store_true', default=False, help=( 'Do not scan for Volume Shadow Snapshots (VSS). This means that ' 'Volume Shadow Snapshots (VSS) are not processed. WARNING: this ' 'option is deprecated use --vss_stores=none instead.')) argument_group.add_argument( '--vss_only', '--vss-only', dest='vss_only', action='store_true', default=False, help=( 'Do not process the current volume if Volume Shadow Snapshots ' '(VSS) have been selected.')) argument_group.add_argument( '--vss_stores', '--vss-stores', dest='vss_stores', action='store', type=str, default=None, help=( 'Define Volume Shadow Snapshots (VSS) (or stores) that need to be ' 'processed. A range of snapshots can be defined as: "3..5". ' 'Multiple snapshots can be defined as: "1,3,5" (a list of comma ' 'separated values). Ranges and lists can also be combined as: ' '"1,3..5". The first snapshot is 1. All snapshots can be defined ' 'as: "all" and no snapshots as: "none".'))
[docs] def ScanSource(self, source_path): """Scans the source path for volume and file systems. This function sets the internal source path specification and source type values. Args: source_path (str): path to the source. Raises: SourceScannerError: if the format of or within the source is not supported. """ # Symbolic links are resolved here and not earlier to preserve the user # specified source path in storage and reporting. if os.path.islink(source_path): source_path = os.path.realpath(source_path) options = StorageMediaToolVolumeScannerOptions() options.credentials = self._credentials if self._vss_only: options.scan_mode = options.SCAN_MODE_SNAPSHOTS_ONLY else: options.scan_mode = options.SCAN_MODE_ALL options.snapshots_only = self._vss_only if self._partitions == 'all': options.partitions = ['all'] else: options.partitions = self._partitions if not self._vss_stores and self._unattended_mode: options.snapshots = ['none'] elif self._vss_stores == 'all': options.snapshots = ['all'] elif self._vss_stores == 'none': options.snapshots = ['none'] else: options.snapshots = self._vss_stores if self._volumes == 'all': options.volumes = ['all'] else: options.volumes = self._volumes if self._unattended_mode: mediator = None else: mediator = self._mediator volume_scanner = StorageMediaToolVolumeScanner(mediator=mediator) try: base_path_specs = volume_scanner.GetBasePathSpecs( source_path, options=options) except dfvfs_errors.ScannerError as exception: raise errors.SourceScannerError(exception) if not base_path_specs: raise errors.SourceScannerError( 'No supported file system found in source.') # pylint: disable=protected-access self._credential_configurations = volume_scanner._credential_configurations self._file_system_path_specs = base_path_specs self._source_type = volume_scanner.source_type