# Source code for plaso.cli.extraction_tool
"""Shared functionality for an extraction CLI tool."""
import datetime
import os
import time
import pytz
from dfvfs.analyzer import analyzer as dfvfs_analyzer
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import context as dfvfs_context
# The following import makes sure the analyzers are registered.
from plaso import analyzers # pylint: disable=unused-import
# The following import makes sure the parsers are registered.
from plaso import parsers # pylint: disable=unused-import
from plaso.cli import logger
from plaso.cli import status_view
from plaso.cli import storage_media_tool
from plaso.cli import tool_options
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.containers import artifacts
from plaso.engine import configurations
from plaso.engine import engine
from plaso.filters import parser_filter
from plaso.helpers import language_tags
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import extraction_engine as multi_extraction_engine
from plaso.parsers import manager as parsers_manager
from plaso.parsers import presets as parsers_presets
from plaso.single_process import extraction_engine as single_extraction_engine
from plaso.storage import factory as storage_factory
[docs]
class ExtractionTool(
    storage_media_tool.StorageMediaTool,
    tool_options.HashersOptions,
    tool_options.ProfilingOptions,
    tool_options.StorageFileOptions,
):
    """Extraction CLI tool.

    Shared base for command line tools that extract events from a source and
    write them to a storage file.

    Attributes:
      list_language_tags (bool): True if the language tags should be listed.
      list_time_zones (bool): True if the time zones should be listed.
    """

    # Multiplier applied to a buffer size that carries an "m" suffix.
    _BYTES_IN_A_MIB = 1024 * 1024

    # Approximately 250 MB of queued items per worker.
    _DEFAULT_QUEUE_SIZE = 125000

    # Name of the parser presets file, looked up inside the data location.
    _PRESETS_FILE_NAME = "presets.yaml"

    # Source types for which preprocessing is run before extraction.
    _SOURCE_TYPES_TO_PREPROCESS = frozenset(
        [
            dfvfs_definitions.SOURCE_TYPE_DIRECTORY,
            dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
            dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE,
        ]
    )

    # Archive type names and the descriptions shown by ListArchiveTypes.
    _SUPPORTED_ARCHIVE_TYPES = {
        "iso9660": "ISO-9660 disk image (.iso) file",
        "modi": "MacOS disk image (.dmg) file",
        "tar": "tape archive (.tar) file",
        "vhdi": "Virtual Hard Disk image (.vhd, .vhdx) file",
        "zip": "ZIP archive (.zip) file",
    }
[docs]
def __init__(self, input_reader=None, output_writer=None):
    """Initializes a CLI tool.

    Sets every extraction related option to its default value; the option
    parsing methods of this class overwrite them later.

    Args:
      input_reader (Optional[InputReader]): input reader, where None indicates
          that the stdin input reader should be used.
      output_writer (Optional[OutputWriter]): output writer, where None
          indicates that the stdout output writer should be used.
    """
    super().__init__(input_reader=input_reader, output_writer=output_writer)
    self._archive_types_string = "none"
    self._artifacts_registry = None
    self._buffer_size = 0
    self._command_line_arguments = None
    self._enable_sigsegv_handler = False
    # Set by _ProcessSource once the parser presets have been expanded.
    self._expanded_parser_filter_expression = None
    self._extract_winevt_resources = True
    self._extract_winreg_binary = True
    self._number_of_extraction_workers = 0
    self._parser_filter_expression = None
    self._preferred_codepage = None
    self._preferred_language = None
    self._preferred_time_zone = None
    self._preferred_year = None
    # Set by _ReadParserPresetsFromFile.
    self._presets_file = None
    self._presets_manager = parsers_presets.ParserPresetsManager()
    self._process_compressed_streams = True
    self._process_memory_limit = None
    # Overwritten by _ParsePerformanceOptions.
    self._queue_size = self._DEFAULT_QUEUE_SIZE
    self._resolver_context = dfvfs_context.Context()
    self._single_process_mode = False
    self._status_view = status_view.StatusView(self._output_writer, self.NAME)
    self._status_view_file = "status.info"
    self._status_view_interval = 0.5
    self._status_view_mode = status_view.StatusView.MODE_WINDOW
    self._storage_file_path = None
    self._storage_format = definitions.STORAGE_FORMAT_SQLITE
    self._task_storage_format = definitions.STORAGE_FORMAT_SQLITE
    self._temporary_directory = None
    self._worker_memory_limit = None
    self._worker_timeout = None
    self._yara_rules_string = None
    self.list_language_tags = False
    self.list_time_zones = False
def _CheckStorageFile(self, storage_file_path, warn_about_existing=False):
    """Validates that the storage file path can be used for writing.

    Args:
      storage_file_path (str): path of the storage file.
      warn_about_existing (bool): True if the user should be warned about
          the storage file already existing.

    Raises:
      BadConfigOption: if the path exists but is not a file, or if the
          directory that holds the storage file is not writable.
    """
    already_exists = os.path.exists(storage_file_path)

    if already_exists and not os.path.isfile(storage_file_path):
        raise errors.BadConfigOption(
            f"Storage file: {storage_file_path:s} already exists and is not "
            f"a file."
        )

    if already_exists and warn_about_existing:
        logger.warning("Appending to an already existing storage file.")

    # A bare file name has no directory part, fall back to the current one.
    parent_directory = os.path.dirname(storage_file_path) or "."

    # TODO: add a more thorough check to see if the storage file really is
    # a plaso storage file.
    if os.access(parent_directory, os.W_OK):
        return

    raise errors.BadConfigOption(
        f"Unable to write to storage file: {storage_file_path:s}"
    )
def _CreateExtractionEngine(self, single_process_mode):
    """Builds the extraction engine matching the requested process mode.

    Args:
      single_process_mode (bool): True if the engine should use single process
          mode.

    Returns:
      BaseEngine: extraction engine.
    """
    callback = self._status_view.GetExtractionStatusUpdateCallback()

    if not single_process_mode:
        new_engine = multi_extraction_engine.ExtractionMultiProcessEngine(
            number_of_worker_processes=self._number_of_extraction_workers,
            status_update_callback=callback,
            worker_memory_limit=self._worker_memory_limit,
            worker_timeout=self._worker_timeout,
        )
    else:
        new_engine = single_extraction_engine.SingleProcessEngine(
            status_update_callback=callback
        )

    # Both engine flavours report status at the interval of the status view.
    new_engine.SetStatusUpdateInterval(self._status_view_interval)
    return new_engine
def _CreateExtractionProcessingConfiguration(self):
    """Copies the tool options into a new processing configuration.

    Returns:
      ProcessingConfiguration: extraction processing configuration.
    """
    configuration = configurations.ProcessingConfiguration()

    # General processing settings.
    configuration.artifact_definitions_path = self._artifact_definitions_path
    configuration.artifact_filters = self._artifact_filters
    configuration.credentials = self._credential_configurations
    configuration.custom_artifacts_path = self._custom_artifacts_path
    configuration.data_location = self._data_location
    configuration.debug_output = self._debug_mode
    configuration.filter_file = self._filter_file
    configuration.log_filename = self._log_file
    configuration.parser_filter_expression = self._expanded_parser_filter_expression
    configuration.preferred_codepage = self._preferred_codepage
    configuration.preferred_language = self._preferred_language
    configuration.preferred_time_zone = self._preferred_time_zone
    configuration.preferred_year = self._preferred_year
    configuration.task_storage_format = self._task_storage_format
    configuration.temporary_directory = self._temporary_directory

    # Extraction specific settings.
    extraction_options = configuration.extraction
    extraction_options.archive_types_string = self._archive_types_string
    extraction_options.extract_winevt_resources = self._extract_winevt_resources
    extraction_options.extract_winreg_binary = self._extract_winreg_binary
    extraction_options.hasher_file_size_limit = self._hasher_file_size_limit
    extraction_options.hasher_names_string = self._hasher_names_string
    extraction_options.process_compressed_streams = self._process_compressed_streams
    extraction_options.yara_rules_string = self._yara_rules_string

    # Profiling specific settings.
    profiling_options = configuration.profiling
    profiling_options.directory = self._profiling_directory
    profiling_options.profilers = self._profilers
    profiling_options.sample_rate = self._profiling_sample_rate

    return configuration
def _GenerateStorageFileName(self):
    """Derives a storage file name from the current time and the source.

    The result uses a timestamp and the basename of the source path.

    Returns:
      str: a filename for the storage file in the form <time>-<source>.plaso

    Raises:
      BadConfigOption: raised if the source path is not set.
    """
    if not self._source_path:
        raise errors.BadConfigOption("Please define a source (--source).")

    time_prefix = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")

    absolute_path = os.path.abspath(self._source_path)
    if absolute_path.endswith(os.path.sep):
        absolute_path = os.path.dirname(absolute_path)

    base_name = os.path.basename(absolute_path)
    if base_name in ("", "/", "\\"):
        # The user passed the filesystem's root as source
        base_name = "ROOT"

    return f"{time_prefix:s}-{base_name:s}.plaso"
def _GetExpandedParserFilterExpression(self, system_configuration):
    """Determines the expanded parser filter expression.

    When no parser filter expression was specified and a system configuration
    is available, the presets that match its operating system are stored in
    self._parser_filter_expression before the presets are expanded.

    Args:
      system_configuration (SystemConfigurationArtifact): system configuration.

    Returns:
      str: expanded parser filter expression.

    Raises:
      BadConfigOption: if presets in the parser filter expression could not
          be expanded or if an invalid parser or plugin name is specified.
    """
    parser_filter_expression = self._parser_filter_expression
    if not parser_filter_expression and system_configuration:
        operating_system_artifact = artifacts.OperatingSystemArtifact(
            family=system_configuration.operating_system,
            product=system_configuration.operating_system_product,
            version=system_configuration.operating_system_version,
        )
        preset_definitions = self._presets_manager.GetPresetsByOperatingSystem(
            operating_system_artifact
        )
        if preset_definitions:
            self._parser_filter_expression = ",".join(
                [preset_definition.name for preset_definition in preset_definitions]
            )
            logger.debug(
                (
                    f"Parser filter expression set to preset: "
                    f"{self._parser_filter_expression:s}"
                )
            )

    parser_filter_helper = parser_filter.ParserFilterExpressionHelper()
    try:
        parser_filter_expression = parser_filter_helper.ExpandPresets(
            self._presets_manager, self._parser_filter_expression
        )
        parser_filter_string = parser_filter_expression or "N/A"
        logger.debug(f"Parser filter expression set to: {parser_filter_string:s}")
    except RuntimeError as exception:
        # Chain the original error so the root cause stays in the traceback.
        raise errors.BadConfigOption(
            (
                f"Unable to expand presets in parser filter expression with "
                f"error: {exception!s}"
            )
        ) from exception

    parser_elements, invalid_parser_elements = (
        parsers_manager.ParsersManager.CheckFilterExpression(
            parser_filter_expression
        )
    )
    if invalid_parser_elements:
        invalid_parser_names_string = ",".join(invalid_parser_elements)
        raise errors.BadConfigOption(
            (
                f"Unknown parser or plugin names in element(s): "
                f'"{invalid_parser_names_string:s}" of parser filter expression: '
                f"{parser_filter_expression:s}"
            )
        )

    return ",".join(sorted(parser_elements))
def _ParseExtractionOptions(self, options):
    """Parses the extraction options.

    The special value "list" for the language or the time zone only sets the
    corresponding list_* attribute; it is not stored as a preference.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=["codepage", "language"]
    )

    # TODO: add preferred encoding

    self.list_language_tags = self._preferred_language == "list"

    self._extract_winevt_resources = getattr(
        options, "extract_winevt_resources", True
    )

    time_zone_string = self.ParseStringOption(options, "timezone")
    if isinstance(time_zone_string, str):
        if time_zone_string.lower() == "list":
            self.list_time_zones = True

        elif time_zone_string:
            # The lookup is only used to validate the name; the string itself
            # is what gets stored.
            try:
                pytz.timezone(time_zone_string)
            except pytz.UnknownTimeZoneError as exception:
                raise errors.BadConfigOption(
                    f"Unknown time zone: {time_zone_string:s}"
                ) from exception

            self._preferred_time_zone = time_zone_string
def _ParsePerformanceOptions(self, options):
    """Parses the performance options.

    A buffer size is parsed as a base 10 integer; a trailing "m" or "M"
    multiplies the value by the number of bytes in a MiB.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    self._buffer_size = getattr(options, "buffer_size", 0)
    if self._buffer_size:
        # TODO: turn this into a generic function that supports more size
        # suffixes both MB and MiB and also that does not allow m as a valid
        # indicator for MiB since m represents milli not Mega.
        try:
            if self._buffer_size[-1].lower() == "m":
                self._buffer_size = int(self._buffer_size[:-1], 10)
                self._buffer_size *= self._BYTES_IN_A_MIB
            else:
                self._buffer_size = int(self._buffer_size, 10)
        except ValueError as exception:
            # int() failed before the attribute was reassigned, so it still
            # holds the value as supplied by the user.
            raise errors.BadConfigOption(
                f"Invalid buffer size: {self._buffer_size!s}."
            ) from exception

    self._queue_size = self.ParseNumericOption(options, "queue_size")
def _ParseProcessingOptions(self, options):
    """Parses the processing options and applies the VFS back-end choice.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    self._single_process_mode = getattr(options, "single_process", False)

    helpers_manager.ArgumentHelperManager.ParseOptions(
        options,
        self,
        names=[
            "process_resources",
            "temporary_directory",
            "vfs_backend",
            "workers",
            "zeromq",
        ],
    )

    # Per back-end: which dfVFS preference is set to which type indicator.
    # Names are resolved lazily so only the selected back-end is looked up.
    overrides_per_back_end = {
        "fsext": (("PREFERRED_EXT_BACK_END", "TYPE_INDICATOR_EXT"),),
        "fsfat": (("PREFERRED_FAT_BACK_END", "TYPE_INDICATOR_FAT"),),
        "fshfs": (("PREFERRED_HFS_BACK_END", "TYPE_INDICATOR_HFS"),),
        "fsntfs": (("PREFERRED_NTFS_BACK_END", "TYPE_INDICATOR_NTFS"),),
        "tsk": (
            ("PREFERRED_EXT_BACK_END", "TYPE_INDICATOR_TSK"),
            ("PREFERRED_FAT_BACK_END", "TYPE_INDICATOR_TSK"),
            ("PREFERRED_GPT_BACK_END", "TYPE_INDICATOR_TSK_PARTITION"),
            ("PREFERRED_HFS_BACK_END", "TYPE_INDICATOR_TSK"),
            ("PREFERRED_NTFS_BACK_END", "TYPE_INDICATOR_TSK"),
        ),
        "vsgpt": (("PREFERRED_GPT_BACK_END", "TYPE_INDICATOR_GPT"),),
    }

    selected_overrides = overrides_per_back_end.get(self._vfs_back_end, ())
    for preference_name, indicator_name in selected_overrides:
        setattr(
            dfvfs_definitions,
            preference_name,
            getattr(dfvfs_definitions, indicator_name),
        )
def _ProcessSource(self, session, storage_writer):
    """Processes the source and extracts events.

    Runs preprocessing where applicable, determines the enabled parsers,
    fills in the session and then runs the extraction engine. The session is
    always updated in the storage writer afterwards, also when extraction
    raises.

    Args:
      session (Session): session in which the source is processed.
      storage_writer (StorageWriter): storage writer to store extracted events.

    Returns:
      ProcessingStatus: processing status.

    Raises:
      BadConfigOption: if an invalid collection filter was specified.
    """
    # A single file is always processed in single process mode.
    single_process_mode = self._single_process_mode
    if self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE:
        single_process_mode = True

    extraction_engine = self._CreateExtractionEngine(single_process_mode)

    extraction_engine.BuildArtifactsRegistry(
        self._artifact_definitions_path, self._custom_artifacts_path
    )

    source_configuration = artifacts.SourceConfigurationArtifact(
        path=self._source_path, source_type=self._source_type
    )

    # TODO: check if the source was processed previously.
    # TODO: add check for modification time of source.

    # If the source is a directory or a storage media image run pre-processing.
    system_configurations = []
    if self._source_type in self._SOURCE_TYPES_TO_PREPROCESS:
        try:
            logger.debug("Starting preprocessing.")

            system_configurations = extraction_engine.PreprocessSource(
                self._file_system_path_specs,
                storage_writer,
                resolver_context=self._resolver_context,
            )

            logger.debug("Preprocessing done.")

        except OSError as exception:
            # Preprocessing is best effort, extraction continues without it.
            system_configurations = []

            logger.error(f"Unable to preprocess with error: {exception!s}")

    # TODO: check if the source was processed previously and if system
    # configuration differs.

    system_configuration = None
    if system_configurations:
        system_configuration = system_configurations[0]
        # TODO: add support for more than 1 system configuration.

    self._expanded_parser_filter_expression = (
        self._GetExpandedParserFilterExpression(system_configuration)
    )

    enabled_parser_names = self._expanded_parser_filter_expression.split(",")

    number_of_enabled_parsers = len(enabled_parser_names)

    force_parser = False
    if (
        self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE
        and number_of_enabled_parsers == 1
    ):
        # A single file with exactly one enabled parser forces that parser.
        force_parser = True

        self._extract_winevt_resources = False

    elif (
        "winevt" not in enabled_parser_names
        and "winevtx" not in enabled_parser_names
    ):
        self._extract_winevt_resources = False

    elif self._extract_winevt_resources and "pe" not in enabled_parser_names:
        logger.warning(
            "A Windows EventLog parser is enabled in combination with "
            "extraction of Windows EventLog resources, but the Portable "
            "Executable (PE) parser is disabled. Therefore Windows EventLog "
            "resources cannot be extracted."
        )
        self._extract_winevt_resources = False

    # Created after the winevt decision above so the configuration reflects it.
    processing_configuration = self._CreateExtractionProcessingConfiguration()
    processing_configuration.force_parser = force_parser

    environment_variables = (
        extraction_engine.knowledge_base.GetEnvironmentVariables()
    )

    user_accounts = list(storage_writer.GetAttributeContainers("user_account"))

    try:
        extraction_engine.BuildCollectionFilters(
            environment_variables,
            user_accounts,
            artifact_filter_names=self._artifact_filters,
            filter_file_path=self._filter_file,
        )
    except errors.InvalidFilter as exception:
        # Chain the original error so the root cause stays in the traceback.
        raise errors.BadConfigOption(
            f"Unable to build collection filters with error: {exception!s}"
        ) from exception

    session.artifact_filters = self._artifact_filters
    session.command_line_arguments = self._command_line_arguments
    session.debug_mode = self._debug_mode
    session.enabled_parser_names = enabled_parser_names
    session.extract_winevt_resources = self._extract_winevt_resources
    session.filter_file = self._filter_file
    session.parser_filter_expression = self._parser_filter_expression
    session.preferred_codepage = self._preferred_codepage
    session.preferred_encoding = self.preferred_encoding
    session.preferred_language = self._preferred_language or "en-US"
    session.preferred_time_zone = self._preferred_time_zone
    session.preferred_year = self._preferred_year

    storage_writer.AddAttributeContainer(session)

    processing_status = None

    try:
        storage_writer.AddAttributeContainer(source_configuration)

        for system_configuration in system_configurations:
            storage_writer.AddAttributeContainer(system_configuration)

        if single_process_mode:
            logger.debug("Starting extraction in single process mode.")

            processing_status = extraction_engine.ProcessSource(
                storage_writer,
                self._resolver_context,
                processing_configuration,
                system_configurations,
                self._file_system_path_specs,
            )

        else:
            logger.debug("Starting extraction in multi process mode.")

            # The method is named ProcessSourceMulti because pylint 2.6.0 and
            # later gets confused about keyword arguments when ProcessSource
            # is used.
            processing_status = extraction_engine.ProcessSourceMulti(
                storage_writer,
                session.identifier,
                processing_configuration,
                system_configurations,
                self._file_system_path_specs,
                enable_sigsegv_handler=self._enable_sigsegv_handler,
                storage_file_path=self._storage_file_path,
            )

    finally:
        # Without a processing status the session is recorded as aborted.
        session.aborted = getattr(processing_status, "aborted", True)
        # Completion time in microseconds.
        session.completion_time = int(time.time() * 1000000)
        storage_writer.UpdateAttributeContainer(session)

    return processing_status
def _ReadParserPresetsFromFile(self):
    """Reads the parser presets from the presets.yaml file.

    The file is looked up in the data location and its path is stored in
    self._presets_file.

    Raises:
      BadConfigOption: if the parser presets file does not exist or cannot
          be read.
    """
    self._presets_file = os.path.join(self._data_location, self._PRESETS_FILE_NAME)
    if not os.path.isfile(self._presets_file):
        raise errors.BadConfigOption(
            f"No such parser presets file: {self._presets_file:s}"
        )

    try:
        self._presets_manager.ReadFromFile(self._presets_file)
    except errors.MalformedPresetError as exception:
        # Chain the original error so the root cause stays in the traceback.
        raise errors.BadConfigOption(
            f"Unable to read parser presets from file with error: {exception!s}"
        ) from exception
def _ScanSourceForArchive(self, path_spec):
    """Determines if a path specification references an archive file.

    A single layer of bzip2, gzip or xz compression is unwrapped first, so
    that a compressed archive is detected as well.

    Args:
      path_spec (dfvfs.PathSpec): path specification of the data stream.

    Returns:
      dfvfs.PathSpec: path specification of the archive file or None if not
          an archive file.
    """
    try:
        type_indicators = dfvfs_analyzer.Analyzer.GetCompressedStreamTypeIndicators(
            path_spec, resolver_context=self._resolver_context
        )
    except OSError:
        type_indicators = []

    if len(type_indicators) > 1:
        # Ambiguous compression format. This used to return False, which
        # contradicted the documented "PathSpec or None" contract.
        return None

    type_indicator = type_indicators[0] if type_indicators else None

    if type_indicator == dfvfs_definitions.TYPE_INDICATOR_BZIP2:
        path_spec = path_spec_factory.Factory.NewPathSpec(
            dfvfs_definitions.TYPE_INDICATOR_COMPRESSED_STREAM,
            compression_method=dfvfs_definitions.COMPRESSION_METHOD_BZIP2,
            parent=path_spec,
        )

    elif type_indicator == dfvfs_definitions.TYPE_INDICATOR_GZIP:
        path_spec = path_spec_factory.Factory.NewPathSpec(
            dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=path_spec
        )

    elif type_indicator == dfvfs_definitions.TYPE_INDICATOR_XZ:
        path_spec = path_spec_factory.Factory.NewPathSpec(
            dfvfs_definitions.TYPE_INDICATOR_COMPRESSED_STREAM,
            compression_method=dfvfs_definitions.COMPRESSION_METHOD_XZ,
            parent=path_spec,
        )

    try:
        type_indicators = dfvfs_analyzer.Analyzer.GetArchiveTypeIndicators(
            path_spec, resolver_context=self._resolver_context
        )
    except OSError:
        return None

    # Only an unambiguous archive type is accepted.
    if len(type_indicators) != 1:
        return None

    return path_spec_factory.Factory.NewPathSpec(
        type_indicators[0], location="/", parent=path_spec
    )
[docs]
def AddExtractionOptions(self, argument_group):
    """Registers the extraction options with the argument group.

    Args:
      argument_group (argparse._ArgumentGroup): argparse argument group.
    """
    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        argument_group, names=["codepage", "language"]
    )

    # The flag below defaults to True and is switched off when present.
    winevt_resources_help = (
        "Do not extract Windows EventLog resources such as event "
        "message template strings. By default Windows EventLog "
        "resources will be extracted when a Windows EventLog parser "
        "is enabled."
    )
    argument_group.add_argument(
        "--no_extract_winevt_resources",
        "--no-extract-winevt-resources",
        action="store_false",
        default=True,
        dest="extract_winevt_resources",
        help=winevt_resources_help,
    )

    # TODO: add preferred encoding

    # The time zone defaults to None so an unset option can be recognized.
    time_zone_help = (
        "preferred time zone of extracted date and time values that are "
        "stored without a time zone indicator. The time zone is determined "
        "based on the source data where possible otherwise it will default "
        'to UTC. Use "list" to see a list of available time zones.'
    )
    argument_group.add_argument(
        "-z",
        "--zone",
        "--timezone",
        action="store",
        default=None,
        dest="timezone",
        help=time_zone_help,
        metavar="TIME_ZONE",
        type=str,
    )
[docs]
def AddPerformanceOptions(self, argument_group):
    """Registers the performance options with the argument group.

    Args:
      argument_group (argparse._ArgumentGroup): argparse argument group.
    """
    buffer_size_help = "The buffer size for the output (defaults to 196MiB)."
    argument_group.add_argument(
        "--buffer_size",
        "--buffer-size",
        "--bs",
        action="store",
        default=0,
        dest="buffer_size",
        help=buffer_size_help,
    )

    queue_size_help = (
        f"The maximum number of queued items per worker (defaults to "
        f"{self._DEFAULT_QUEUE_SIZE:d})"
    )
    argument_group.add_argument(
        "--queue_size",
        "--queue-size",
        action="store",
        default=0,
        dest="queue_size",
        help=queue_size_help,
    )
[docs]
def AddProcessingOptions(self, argument_group):
    """Registers the processing options with the argument group.

    Args:
      argument_group (argparse._ArgumentGroup): argparse argument group.
    """
    single_process_help = "Indicate that the tool should run in a single process."
    argument_group.add_argument(
        "--single_process",
        "--single-process",
        action="store_true",
        default=False,
        dest="single_process",
        help=single_process_help,
    )

    helper_names = ["temporary_directory", "vfs_backend", "workers", "zeromq"]

    # The process resources options are only offered when a memory limit can
    # be enforced.
    if self._CanEnforceProcessMemoryLimit():
        helper_names = helper_names + ["process_resources"]

    helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
        argument_group, names=helper_names
    )
[docs]
def ExtractEventsFromSources(self):
    """Processes the sources and extracts events.

    Scans the source, promotes a single file that turns out to be an archive
    to an archive source, opens the storage writer, runs the extraction and
    prints a summary. The storage writer is always closed after it was
    opened successfully.

    Raises:
      BadConfigOption: if the storage file path is invalid or the storage
          format is not supported.
      OSError: if the storage could not be opened or the extraction engine
          could not write to the storage.
      SourceScannerError: if the source scanner could not find a supported
          file system.
      UserAbort: if the user initiated an abort.
    """
    self._CheckStorageFile(self._storage_file_path, warn_about_existing=True)

    try:
        self.ScanSource(self._source_path)
    except dfvfs_errors.UserAbort as exception:
        # Translate the dfVFS error into the plaso one, keeping the cause.
        raise errors.UserAbort(exception) from exception

    if self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE:
        archive_path_spec = self._ScanSourceForArchive(
            self._file_system_path_specs[0]
        )
        if archive_path_spec:
            self._file_system_path_specs = [archive_path_spec]
            self._source_type = definitions.SOURCE_TYPE_ARCHIVE

    self._status_view.SetMode(self._status_view_mode)
    self._status_view.SetStatusFile(self._status_view_file)
    self._status_view.SetSourceInformation(
        self._source_path,
        self._source_type,
        artifact_filters=self._artifact_filters,
        filter_file=self._filter_file,
    )

    self._output_writer.Write("\n")
    self._status_view.PrintExtractionStatusHeader(None)
    self._output_writer.Write("Processing started.\n")

    # TODO: attach processing configuration to session?
    session = engine.BaseEngine.CreateSession()

    storage_writer = storage_factory.StorageFactory.CreateStorageWriter(
        self._storage_format
    )
    if not storage_writer:
        raise errors.BadConfigOption(
            f"Unsupported storage format: {self._storage_format:s}"
        )

    try:
        storage_writer.Open(path=self._storage_file_path)
    except OSError as exception:
        raise OSError(
            f"Unable to open storage with error: {exception!s}"
        ) from exception

    processing_status = None
    number_of_extraction_warnings = 0

    try:
        # Warnings already in the storage are not counted for this run.
        stored_number_of_extraction_warnings = (
            storage_writer.GetNumberOfAttributeContainers("extraction_warning")
        )

        try:
            processing_status = self._ProcessSource(session, storage_writer)

        finally:
            number_of_extraction_warnings = (
                storage_writer.GetNumberOfAttributeContainers("extraction_warning")
                - stored_number_of_extraction_warnings
            )

    except OSError as exception:
        raise OSError(
            f"Unable to write to storage with error: {exception!s}"
        ) from exception

    finally:
        storage_writer.Close()

    self._status_view.PrintExtractionSummary(
        processing_status, number_of_extraction_warnings
    )
[docs]
def ListArchiveTypes(self):
    """Writes a table of the supported archive types to the output writer."""
    archive_types_table = views.ViewsFactory.GetTableView(
        self._views_format_type,
        title="Archive and storage media image types",
        column_names=["Name", "Description"],
    )

    supported_types = self._SUPPORTED_ARCHIVE_TYPES
    for type_name in sorted(supported_types):
        archive_types_table.AddRow([type_name, supported_types[type_name]])

    archive_types_table.Write(self._output_writer)
[docs]
def ListLanguageTags(self):
    """Writes a table of the known language tags to the output writer."""
    language_tags_table = views.ViewsFactory.GetTableView(
        self._views_format_type,
        title="Language tags",
        column_names=["Language tag", "Description"],
    )

    known_languages = language_tags.LanguageTagHelper.GetLanguages()
    for tag, tag_description in known_languages:
        language_tags_table.AddRow([tag, tag_description])

    language_tags_table.Write(self._output_writer)
[docs]
def ListParsersAndPlugins(self):
    """Writes tables of the parsers, parser plugins and parser presets."""

    def write_sorted_table(table_title, column_names, rows):
        # Renders one table with its rows in sorted order.
        table = views.ViewsFactory.GetTableView(
            self._views_format_type,
            column_names=column_names,
            title=table_title,
        )
        for row_name, row_description in sorted(rows):
            table.AddRow([row_name, row_description])
        table.Write(self._output_writer)

    manager = parsers_manager.ParsersManager

    # All parsers.
    write_sorted_table(
        "Parsers", ["Name", "Description"], manager.GetParsersInformation()
    )

    # One table per parser that has plugins.
    for parser_name in manager.GetNamesOfParsersWithPlugins():
        plugins_information = manager.GetParserPluginsInformation(
            parser_filter_expression=parser_name
        )
        write_sorted_table(
            f"Parser plugins: {parser_name:s}",
            ["Name", "Description"],
            plugins_information,
        )

    # The presets, with the presets file mentioned in the title when known.
    presets_title = "Parser presets"
    if self._presets_file:
        # Directory three levels above this file; a presets file below it is
        # shown relative to that directory.
        this_file = os.path.abspath(__file__)
        source_root = os.path.dirname(os.path.dirname(os.path.dirname(this_file)))

        shown_presets_file = self._presets_file
        if shown_presets_file.startswith(source_root):
            shown_presets_file = shown_presets_file[len(source_root) + 1 :]

        presets_title = f"{presets_title:s} ({shown_presets_file:s})"

    write_sorted_table(
        presets_title,
        ["Name", "Parsers and plugins"],
        self._presets_manager.GetPresetsInformation(),
    )