# Source code for plaso.cli.storage_media_tool
"""The storage media CLI tool."""
import codecs
import os
from dfvfs.analyzer import analyzer as dfvfs_analyzer
from dfvfs.analyzer import cs_analyzer_helper
from dfvfs.helpers import command_line as dfvfs_command_line
from dfvfs.helpers import volume_scanner as dfvfs_volume_scanner
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.volume import apfs_volume_system
from dfvfs.volume import lvm_volume_system
from dfvfs.volume import vshadow_volume_system
from plaso.cli import tools
from plaso.engine import configurations
from plaso.lib import errors
try:
    # Disable experimental Core Storage support by removing its analyzer helper
    # from the dfVFS analyzer at import time.
    dfvfs_analyzer.Analyzer.DeregisterHelper(cs_analyzer_helper.CSAnalyzerHelper())
except KeyError:
    # Presumably raised when the helper is not (or no longer) registered, in
    # which case there is nothing to disable -- TODO confirm against dfVFS.
    pass
[docs]
class StorageMediaToolVolumeScannerOptions(
    dfvfs_volume_scanner.VolumeScannerOptions
):
    """Options of the volume scanner used by the storage media tool.

    Extends the dfVFS volume scanner options with a flag that controls whether
    the current volume is skipped in favor of its snapshots.

    Attributes:
      snapshots_only (bool): True if the current volume of a volume with
          snapshots should be ignored.
    """

    def __init__(self):
        """Initializes the volume scanner options.

        The snapshots_only attribute starts out as False.
        """
        super().__init__()
        self.snapshots_only = False
[docs]
class StorageMediaToolMediator(dfvfs_command_line.CLIVolumeScannerMediator):
    """Mediates between the storage media tool and the user.

    The overview printers defined here call the corresponding printer of the
    parent class and then write one additional newline to the output writer.
    """

    def _PrintAPFSVolumeIdentifiersOverview(self, volume_system, volume_identifiers):
        """Prints the APFS volume identifiers overview followed by a newline.

        Args:
          volume_system (dfvfs.APFSVolumeSystem): volume system.
          volume_identifiers (list[str]): allowed volume identifiers.

        Raises:
          dfvfs.ScannerError: if a volume cannot be resolved from the volume
              identifier.
        """
        super()._PrintAPFSVolumeIdentifiersOverview(volume_system, volume_identifiers)
        self._output_writer.Write("\n")

    def _PrintLVMVolumeIdentifiersOverview(self, volume_system, volume_identifiers):
        """Prints the LVM volume identifiers overview followed by a newline.

        Args:
          volume_system (dfvfs.LVMVolumeSystem): volume system.
          volume_identifiers (list[str]): allowed volume identifiers.

        Raises:
          dfvfs.ScannerError: if a volume cannot be resolved from the volume
              identifier.
        """
        super()._PrintLVMVolumeIdentifiersOverview(volume_system, volume_identifiers)
        self._output_writer.Write("\n")

    def _PrintTSKPartitionIdentifiersOverview(self, volume_system, volume_identifiers):
        """Prints the TSK partition identifiers overview followed by a newline.

        Args:
          volume_system (dfvfs.TSKVolumeSystem): volume system.
          volume_identifiers (list[str]): allowed volume identifiers.

        Raises:
          dfvfs.ScannerError: if a volume cannot be resolved from the volume
              identifier.
        """
        # NOTE(review): this delegates to _PrintPartitionIdentifiersOverview of
        # the parent class, whose name differs from the name of this method --
        # confirm that this override is still the one being invoked.
        super()._PrintPartitionIdentifiersOverview(volume_system, volume_identifiers)
        self._output_writer.Write("\n")

    def _PrintVSSStoreIdentifiersOverview(self, volume_system, volume_identifiers):
        """Prints the VSS store identifiers overview followed by a newline.

        Args:
          volume_system (dfvfs.VShadowVolumeSystem): volume system.
          volume_identifiers (list[str]): allowed volume identifiers.

        Raises:
          dfvfs.ScannerError: if a volume cannot be resolved from the volume
              identifier.
        """
        super()._PrintVSSStoreIdentifiersOverview(volume_system, volume_identifiers)
        self._output_writer.Write("\n")

    # pylint: disable=arguments-differ
    # TODO: replace this method by the one defined by CLIVolumeScannerMediator
    def ParseVolumeIdentifiersString(self, volume_identifiers_string, prefix="v"):
        """Parses a volume identifiers string specified by the user.

        This is a public wrapper around _ParseVolumeIdentifiersString.

        Args:
          volume_identifiers_string (str): user specified volume identifiers. A
              range of volumes can be defined as: "3..5". Multiple volumes can
              be defined as: "1,3,5" (a list of comma separated values). Ranges
              and lists can also be combined as: "1,3..5". The first volume is
              1. All volumes can be defined as: "all".
          prefix (Optional[str]): volume identifier prefix.

        Returns:
          list[str]: volume identifiers with prefix or the string "all".

        Raises:
          ValueError: if the volume identifiers string is invalid.
        """
        parsed_identifiers = self._ParseVolumeIdentifiersString(
            volume_identifiers_string, prefix=prefix
        )
        return parsed_identifiers

    def PromptUserForVSSCurrentVolume(self):
        """Asks the user whether the current volume with VSS should be processed.

        The question is repeated until the user answers "yes", "no" or gives
        an empty answer. Answers are compared case-insensitively and without
        surrounding whitespace.

        Returns:
          bool: True if the current volume with VSS should be processed, which
              is the case for "yes" and for an empty answer.
        """
        question = (
            "Volume Shadow Snapshots (VSS) were selected also process current\n"
            "volume? [yes, no]\n"
        )

        self._output_writer.Write(question)
        answer = self._input_reader.Read().strip().lower()

        # An empty answer is accepted and treated the same as "yes".
        while answer and answer not in ("no", "yes"):
            self._output_writer.Write(
                "\nUnsupported option, please try again or abort with Ctrl^C.\n\n"
            )
            self._output_writer.Write(question)
            answer = self._input_reader.Read().strip().lower()

        self._output_writer.Write("\n")
        return not answer or answer == "yes"
[docs]
class StorageMediaToolVolumeScanner(dfvfs_volume_scanner.VolumeScanner):
    """Volume scanner of the storage media tool.

    In addition to the dfVFS volume scanner this scanner records a credential
    configuration for every encrypted volume that was unlocked and can skip
    the current volume of a volume that has Volume Shadow Snapshots (VSS).
    """

    def __init__(self, mediator=None):
        """Initializes the volume scanner.

        Args:
          mediator (Optional[VolumeScannerMediator]): a volume scanner mediator.
        """
        super().__init__(mediator=mediator)
        # Credential configurations of the volumes unlocked during scanning.
        self._credential_configurations = []
        # Mirrors options.snapshots_only for use in _ScanFileSystem.
        self._snapshots_only = False

    @property
    def source_type(self):
        """str: type of source."""
        return self._source_type

    def _GetBasePathSpecs(self, scan_context, options):
        """Determines the base path specifications of a scanned source.

        Args:
          scan_context (SourceScannerContext): source scanner context.
          options (VolumeScannerOptions): volume scanner options.

        Returns:
          list[PathSpec]: path specifications.

        Raises:
          dfvfs.ScannerError: if the format of or within the source is not
              supported or no partitions were found.
        """
        # TODO: difference with dfVFS.
        self._snapshots_only = options.snapshots_only

        current_node = scan_context.GetRootScanNode()

        is_storage_media = scan_context.source_type in (
            scan_context.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
            scan_context.SOURCE_TYPE_STORAGE_MEDIA_IMAGE,
        )
        if not is_storage_media:
            # Other source types are represented by their root path
            # specification only.
            return [current_node.path_spec]

        # Descend to the first node where a decision about what to process
        # has to be made.
        while len(current_node.sub_nodes) == 1:
            current_node = current_node.sub_nodes[0]

        path_specs = []

        has_partitions = current_node.type_indicator in (
            dfvfs_definitions.TYPE_INDICATOR_GPT,
            dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION,
        )
        if has_partitions:
            # Determine which partitions need to be processed.
            identifiers = self._GetPartitionIdentifiers(current_node, options)

            # TODO: difference with dfVFS.
            if not identifiers:
                raise dfvfs_errors.ScannerError("No partitions found.")

            for identifier in identifiers:
                partition_node = current_node.GetSubNodeByLocation(
                    f"/{identifier:s}"
                )
                self._ScanVolume(scan_context, partition_node, options, path_specs)
        else:
            self._ScanVolume(scan_context, current_node, options, path_specs)

        return path_specs

    def _ScanEncryptedVolume(self, scan_context, scan_node, options):
        """Scans an encrypted volume scan node for volume and file systems.

        After the scan of the parent class, the credential of a scan node that
        is not locked is stored as a credential configuration.

        Args:
          scan_context (SourceScannerContext): source scanner context.
          scan_node (SourceScanNode): volume scan node.
          options (VolumeScannerOptions): volume scanner options.

        Raises:
          dfvfs.ScannerError: if the format of or within the source is not
              supported, the scan node is invalid or there are no credentials
              defined for the format.
        """
        super()._ScanEncryptedVolume(scan_context, scan_node, options)

        if scan_context.IsLockedScanNode(scan_node.path_spec):
            return

        credential_type, credential_data = scan_node.credential
        self._credential_configurations.append(
            configurations.CredentialConfiguration(
                credential_data=credential_data,
                credential_type=credential_type,
                path_spec=scan_node.path_spec,
            )
        )

    def _ScanFileSystem(self, scan_node, base_path_specs):
        """Scans a file system scan node for file systems.

        Args:
          scan_node (SourceScanNode): file system scan node.
          base_path_specs (list[PathSpec]): file system base path
              specifications, which is extended in place.

        Raises:
          dfvfs.ScannerError: if the scan node is invalid.
        """
        if not (scan_node and scan_node.path_spec):
            raise dfvfs_errors.ScannerError("Invalid or missing file system scan node.")

        # TODO: difference with dfVFS for current VSS volume support.
        if self._snapshots_only:
            # The file system is skipped when the first sub node of its parent
            # is a VSS volume system.
            first_sibling = scan_node.parent_node.sub_nodes[0]
            if first_sibling.type_indicator == dfvfs_definitions.TYPE_INDICATOR_VSHADOW:
                return

        base_path_specs.append(scan_node.path_spec)

    def _ScanVolumeSystemRoot(self, scan_context, scan_node, options, base_path_specs):
        """Scans a volume system root scan node for volume and file systems.

        Args:
          scan_context (SourceScannerContext): source scanner context.
          scan_node (SourceScanNode): volume system root scan node.
          options (VolumeScannerOptions): volume scanner options.
          base_path_specs (list[PathSpec]): file system base path specifications.

        Raises:
          dfvfs.ScannerError: if the scan node is invalid, the scan node type
              is not supported or if a sub scan node cannot be retrieved.
        """
        if not (scan_node and scan_node.path_spec):
            raise dfvfs_errors.ScannerError("Invalid scan node.")

        node_type = scan_node.type_indicator

        if node_type == dfvfs_definitions.TYPE_INDICATOR_APFS_CONTAINER:
            apfs_system = apfs_volume_system.APFSVolumeSystem()
            apfs_system.Open(scan_node.path_spec)
            identifiers = self._GetVolumeIdentifiers(apfs_system, options)

        elif node_type in (
            dfvfs_definitions.TYPE_INDICATOR_GPT,
            dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION,
        ):
            identifiers = self._GetPartitionIdentifiers(scan_node, options)

        elif node_type == dfvfs_definitions.TYPE_INDICATOR_LVM:
            lvm_system = lvm_volume_system.LVMVolumeSystem()
            lvm_system.Open(scan_node.path_spec)
            identifiers = self._GetVolumeIdentifiers(lvm_system, options)

        elif node_type == dfvfs_definitions.TYPE_INDICATOR_VSHADOW:
            vss_system = vshadow_volume_system.VShadowVolumeSystem()
            vss_system.Open(scan_node.path_spec)
            identifiers = self._GetVolumeSnapshotIdentifiers(vss_system, options)

            # Process VSS stores (snapshots) starting with the most recent one.
            identifiers.reverse()

            # TODO: difference with dfVFS for current VSS volume support.
            # The user is only asked when snapshots were selected, a mediator
            # is available and snapshots only mode was not already requested.
            if not options.snapshots_only and self._mediator and identifiers:
                process_current = self._mediator.PromptUserForVSSCurrentVolume()
                options.snapshots_only = not process_current
                self._snapshots_only = not process_current

        else:
            raise dfvfs_errors.ScannerError(
                f"Unsupported volume system type: {node_type:s}."
            )

        for identifier in identifiers:
            volume_node = scan_node.GetSubNodeByLocation(f"/{identifier:s}")
            if not volume_node:
                raise dfvfs_errors.ScannerError(
                    f"Scan node missing for volume identifier: {identifier:s}."
                )

            self._ScanVolume(scan_context, volume_node, options, base_path_specs)

    def ScanSource(self, source_path, options, base_path_specs):
        """Scans the source path for volume and file systems.

        This function sets the internal source path specification and source
        type values.

        Args:
          source_path (str): path to the source.
          options (VolumeScannerOptions): volume scanner options.
          base_path_specs (list[PathSpec]): file system base path
              specifications, which is extended in place with the path
              specifications that were found.

        Returns:
          dfvfs.SourceScannerContext: source scanner context.

        Raises:
          dfvfs.ScannerError: if the format of or within the source is
              not supported.
        """
        context = self._ScanSource(source_path)

        self._source_path = source_path
        self._source_type = context.source_type

        base_path_specs.extend(self._GetBasePathSpecs(context, options))
        return context
[docs]
class StorageMediaTool(tools.CLITool):
    """CLI tool that supports a storage media device or image as input."""

    # TODO: remove this redirect.
    _SOURCE_OPTION = "source"

    # Credential types whose data is passed as a hexadecimal string and is
    # decoded to binary data when the credential options are parsed.
    _BINARY_DATA_CREDENTIAL_TYPES = ["key_data"]

    # Credential types accepted by the --credential option.
    _SUPPORTED_CREDENTIAL_TYPES = [
        "key_data",
        "password",
        "recovery_password",
        "startup_key",
    ]

    def __init__(self, input_reader=None, output_writer=None):
        """Initializes a CLI tool that supports storage media as input.

        Args:
          input_reader (Optional[InputReader]): input reader, where None
              indicates that the stdin input reader should be used.
          output_writer (Optional[OutputWriter]): output writer, where None
              indicates that the stdout output writer should be used.
        """
        super().__init__(input_reader=input_reader, output_writer=output_writer)
        self._custom_artifacts_path = None
        self._artifact_definitions_path = None
        self._artifact_filters = None
        self._credentials = []
        self._credential_configurations = []
        self._file_system_path_specs = []
        self._filter_file = None
        self._mediator = StorageMediaToolMediator(
            input_reader=input_reader, output_writer=output_writer
        )
        self._partitions = None
        self._source_path = None
        self._source_type = None
        self._volumes = None
        self._vss_only = False
        self._vss_stores = None

    def _ParseCredentialOptions(self, options):
        """Parses the credential options.

        Every credential is expected as "TYPE:DATA". Parsed credentials are
        appended to the internal list of credentials as (type, data) tuples,
        where the data of binary credential types is decoded from hexadecimal.

        Args:
          options (argparse.Namespace): command line arguments.

        Raises:
          BadConfigOption: if the options are invalid.
        """
        credentials = getattr(options, "credentials", [])
        if not isinstance(credentials, list):
            raise errors.BadConfigOption("Unsupported credentials value.")

        for credential_string in credentials:
            # Only the first ":" separates type from data, so that the data
            # itself can contain ":" characters.
            credential_type, _, credential_data = credential_string.partition(":")
            if not credential_type or not credential_data:
                raise errors.BadConfigOption(
                    f"Badly formatted credential: {credential_string:s}."
                )

            if credential_type not in self._SUPPORTED_CREDENTIAL_TYPES:
                raise errors.BadConfigOption(
                    f"Unsupported credential type for: {credential_string:s}."
                )

            if credential_type in self._BINARY_DATA_CREDENTIAL_TYPES:
                try:
                    credential_data = codecs.decode(credential_data, "hex")
                except (TypeError, ValueError) as exception:
                    # Malformed hexadecimal data (odd length, non-hexadecimal
                    # digits) raises binascii.Error, which is a ValueError and
                    # was previously not caught here.
                    raise errors.BadConfigOption(
                        f"Unsupported credential data for: {credential_string:s}."
                    ) from exception

            self._credentials.append((credential_type, credential_data))

    def _ParseSourcePathOption(self, options):
        """Parses the source path option.

        The source path is stored as an absolute path.

        Args:
          options (argparse.Namespace): command line arguments.

        Raises:
          BadConfigOption: if the options are invalid.
        """
        self._source_path = self.ParseStringOption(options, self._SOURCE_OPTION)
        if not self._source_path:
            raise errors.BadConfigOption("Missing source path.")

        self._source_path = os.path.abspath(self._source_path)

    def _ParseStorageMediaOptions(self, options):
        """Parses the storage media options.

        This parses the storage media image, VSS processing, credential and
        source path options, in that order.

        Args:
          options (argparse.Namespace): command line arguments.

        Raises:
          BadConfigOption: if the options are invalid.
        """
        self._ParseStorageMediaImageOptions(options)
        self._ParseVSSProcessingOptions(options)
        self._ParseCredentialOptions(options)
        self._ParseSourcePathOption(options)

    def _ParseStorageMediaImageOptions(self, options):
        """Parses the storage media image options.

        The partitions and volumes strings are only validated here, they are
        stored as specified by the user.

        Args:
          options (argparse.Namespace): command line arguments.

        Raises:
          BadConfigOption: if the options are invalid.
        """
        self._partitions = getattr(options, "partitions", None)
        if self._partitions:
            try:
                self._mediator.ParseVolumeIdentifiersString(
                    self._partitions, prefix="p"
                )
            except ValueError as exception:
                raise errors.BadConfigOption("Unsupported partitions") from exception

        self._volumes = getattr(options, "volumes", None)
        if self._volumes:
            try:
                self._mediator.ParseVolumeIdentifiersString(
                    self._volumes, prefix="apfs"
                )
            except ValueError as exception:
                raise errors.BadConfigOption("Unsupported volumes") from exception

    def _ParseVSSProcessingOptions(self, options):
        """Parses the VSS processing options.

        The deprecated --no_vss option is mapped onto the VSS stores value
        "none" and triggers a user warning.

        Args:
          options (argparse.Namespace): command line arguments.

        Raises:
          BadConfigOption: if the options are invalid.
        """
        process_vss = not getattr(options, "no_vss", False)
        vss_only = getattr(options, "vss_only", False)
        vss_stores = getattr(options, "vss_stores", None)

        if not process_vss:
            vss_stores = "none"
            self._PrintUserWarning(
                "The --no_vss option is deprecated use --vss_stores=none instead."
            )

        # The value "none" is not a volume identifiers string and is therefore
        # excluded from validation.
        if vss_stores and vss_stores != "none":
            try:
                self._mediator.ParseVolumeIdentifiersString(vss_stores, prefix="vss")
            except ValueError as exception:
                raise errors.BadConfigOption("Unsupported VSS stores") from exception

        self._vss_only = vss_only
        self._vss_stores = vss_stores

    def AddCredentialOptions(self, argument_group):
        """Adds the credential options to the argument group.

        The credential options are used to unlock encrypted volumes.

        Args:
          argument_group (argparse._ArgumentGroup): argparse argument group.
        """
        credential_types = ", ".join(self._SUPPORTED_CREDENTIAL_TYPES)
        argument_group.add_argument(
            "--credential",
            action="append",
            default=[],
            type=str,
            dest="credentials",
            metavar="TYPE:DATA",
            help=(
                f"Define a credentials that can be used to unlock encrypted "
                f"volumes e.g. BitLocker. The credential is defined as type:data "
                f'e.g. "password:BDE-test". Supported credential types are: '
                f"{credential_types:s}. Binary key data is expected to be passed "
                f"in BASE-16 encoding (hexadecimal). WARNING credentials passed "
                f"via command line arguments can end up in logs, so use this "
                f"option with care."
            ),
        )

    def AddStorageMediaImageOptions(self, argument_group):
        """Adds the storage media image options to the argument group.

        Args:
          argument_group (argparse._ArgumentGroup): argparse argument group.
        """
        argument_group.add_argument(
            "--partitions",
            "--partition",
            dest="partitions",
            action="store",
            type=str,
            default=None,
            help=(
                "Define partitions to be processed. A range of "
                'partitions can be defined as: "3..5". Multiple partitions can '
                'be defined as: "1,3,5" (a list of comma separated values). '
                'Ranges and lists can also be combined as: "1,3..5". The first '
                'partition is 1. All partitions can be specified with: "all".'
            ),
        )
        argument_group.add_argument(
            "--volumes",
            "--volume",
            dest="volumes",
            action="store",
            type=str,
            default=None,
            help=(
                "Define volumes to be processed. A range of volumes can be defined "
                'as: "3..5". Multiple volumes can be defined as: "1,3,5" (a list '
                "of comma separated values). Ranges and lists can also be combined "
                'as: "1,3..5". The first volume is 1. All volumes can be specified '
                'with: "all".'
            ),
        )

    def AddVSSProcessingOptions(self, argument_group):
        """Adds the VSS processing options to the argument group.

        Args:
          argument_group (argparse._ArgumentGroup): argparse argument group.
        """
        argument_group.add_argument(
            "--no_vss",
            "--no-vss",
            dest="no_vss",
            action="store_true",
            default=False,
            help=(
                "Do not scan for Volume Shadow Snapshots (VSS). This means that "
                "Volume Shadow Snapshots (VSS) are not processed. WARNING: this "
                "option is deprecated use --vss_stores=none instead."
            ),
        )
        argument_group.add_argument(
            "--vss_only",
            "--vss-only",
            dest="vss_only",
            action="store_true",
            default=False,
            help=(
                "Do not process the current volume if Volume Shadow Snapshots "
                "(VSS) have been selected."
            ),
        )
        argument_group.add_argument(
            "--vss_stores",
            "--vss-stores",
            dest="vss_stores",
            action="store",
            type=str,
            default=None,
            help=(
                "Define Volume Shadow Snapshots (VSS) (or stores) that need to be "
                'processed. A range of snapshots can be defined as: "3..5". '
                'Multiple snapshots can be defined as: "1,3,5" (a list of comma '
                "separated values). Ranges and lists can also be combined as: "
                '"1,3..5". The first snapshot is 1. All snapshots can be defined '
                'as: "all" and no snapshots as: "none".'
            ),
        )

    def ScanSource(self, source_path):
        """Scans the source path for volume and file systems.

        This function sets the internal credential configurations, file system
        path specifications and source type values.

        Args:
          source_path (str): path to the source.

        Raises:
          SourceScannerError: if the format of or within the source is
              not supported or no supported file system was found.
        """
        # Symbolic links are resolved here and not earlier to preserve the user
        # specified source path in storage and reporting.
        if os.path.islink(source_path):
            source_path = os.path.realpath(source_path)

        options = StorageMediaToolVolumeScannerOptions()
        options.credentials = self._credentials

        if self._vss_only:
            options.scan_mode = options.SCAN_MODE_SNAPSHOTS_ONLY
        else:
            options.scan_mode = options.SCAN_MODE_ALL
        options.snapshots_only = self._vss_only

        # NOTE(review): apart from "all" and "none", which are wrapped in a
        # list, the partitions, snapshots and volumes values are passed on as
        # the unparsed strings specified by the user -- confirm that the volume
        # scanner accepts identifiers in that form.
        if self._partitions == "all":
            options.partitions = ["all"]
        else:
            options.partitions = self._partitions

        if not self._vss_stores and self._unattended_mode:
            # Without a mediator the user cannot be asked which snapshots to
            # process, hence none are selected.
            options.snapshots = ["none"]
        elif self._vss_stores == "all":
            options.snapshots = ["all"]
        elif self._vss_stores == "none":
            options.snapshots = ["none"]
        else:
            options.snapshots = self._vss_stores

        if self._volumes == "all":
            options.volumes = ["all"]
        else:
            options.volumes = self._volumes

        # In unattended mode no mediator is passed to the volume scanner.
        if self._unattended_mode:
            mediator = None
        else:
            mediator = self._mediator

        volume_scanner = StorageMediaToolVolumeScanner(mediator=mediator)

        try:
            base_path_specs = volume_scanner.GetBasePathSpecs(
                source_path, options=options
            )
        except dfvfs_errors.ScannerError as exception:
            raise errors.SourceScannerError(exception) from exception

        if not base_path_specs:
            raise errors.SourceScannerError("No supported file system found in source.")

        # pylint: disable=protected-access
        self._credential_configurations = volume_scanner._credential_configurations
        self._file_system_path_specs = base_path_specs
        self._source_type = volume_scanner.source_type