# -*- coding: utf-8 -*-
"""Shared functionality for an extraction CLI tool."""
import datetime
import os
import time
import pytz
from dfvfs.analyzer import analyzer as dfvfs_analyzer
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import context as dfvfs_context
# The following import makes sure the analyzers are registered.
from plaso import analyzers # pylint: disable=unused-import
# The following import makes sure the parsers are registered.
from plaso import parsers # pylint: disable=unused-import
from plaso.cli import logger
from plaso.cli import status_view
from plaso.cli import storage_media_tool
from plaso.cli import tool_options
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.containers import artifacts
from plaso.engine import configurations
from plaso.engine import engine
from plaso.single_process import extraction_engine as single_extraction_engine
from plaso.filters import parser_filter
from plaso.helpers import language_tags
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import extraction_engine as multi_extraction_engine
from plaso.parsers import manager as parsers_manager
from plaso.parsers import presets as parsers_presets
from plaso.storage import factory as storage_factory
class ExtractionTool(
storage_media_tool.StorageMediaTool,
tool_options.HashersOptions,
tool_options.ProfilingOptions,
tool_options.StorageFileOptions):
"""Extraction CLI tool.
Attributes:
list_language_tags (bool): True if the language tags should be listed.
list_time_zones (bool): True if the time zones should be listed.
"""
_BYTES_IN_A_MIB = 1024 * 1024
# Approximately 250 MB of queued items per worker.
_DEFAULT_QUEUE_SIZE = 125000
_PRESETS_FILE_NAME = 'presets.yaml'
_SOURCE_TYPES_TO_PREPROCESS = frozenset([
dfvfs_definitions.SOURCE_TYPE_DIRECTORY,
dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE])
_SUPPORTED_ARCHIVE_TYPES = {
'iso9660': 'ISO-9660 disk image (.iso) file',
'modi': 'MacOS disk image (.dmg) file',
'tar': 'tape archive (.tar) file',
'vhdi': 'Virtual Hard Disk image (.vhd, .vhdx) file',
'zip': 'ZIP archive (.zip) file'}
def __init__(self, input_reader=None, output_writer=None):
"""Initializes an CLI tool.
Args:
input_reader (Optional[InputReader]): input reader, where None indicates
that the stdin input reader should be used.
output_writer (Optional[OutputWriter]): output writer, where None
indicates that the stdout output writer should be used.
"""
super(ExtractionTool, self).__init__(
input_reader=input_reader, output_writer=output_writer)
self._archive_types_string = 'none'
self._artifacts_registry = None
self._buffer_size = 0
self._command_line_arguments = None
self._enable_sigsegv_handler = False
self._expanded_parser_filter_expression = None
self._extract_winevt_resources = True
self._extract_winreg_binary = True
self._number_of_extraction_workers = 0
self._parser_filter_expression = None
self._preferred_codepage = None
self._preferred_language = None
self._preferred_time_zone = None
self._preferred_year = None
self._presets_file = None
self._presets_manager = parsers_presets.ParserPresetsManager()
self._process_compressed_streams = True
self._process_memory_limit = None
self._queue_size = self._DEFAULT_QUEUE_SIZE
self._resolver_context = dfvfs_context.Context()
self._single_process_mode = False
self._status_view = status_view.StatusView(self._output_writer, self.NAME)
self._status_view_file = 'status.info'
self._status_view_interval = 0.5
self._status_view_mode = status_view.StatusView.MODE_WINDOW
self._storage_file_path = None
self._storage_format = definitions.STORAGE_FORMAT_SQLITE
self._task_storage_format = definitions.STORAGE_FORMAT_SQLITE
self._temporary_directory = None
self._worker_memory_limit = None
self._worker_timeout = None
self._yara_rules_string = None
self.list_language_tags = False
self.list_time_zones = False
def _CheckStorageFile(self, storage_file_path, warn_about_existing=False):
"""Checks if the storage file path is valid.
Args:
storage_file_path (str): path of the storage file.
warn_about_existing (bool): True if the user should be warned about
the storage file already existing.
Raises:
BadConfigOption: if the storage file path is invalid.
"""
if os.path.exists(storage_file_path):
if not os.path.isfile(storage_file_path):
raise errors.BadConfigOption((
f'Storage file: {storage_file_path:s} already exists and is not '
f'a file.'))
if warn_about_existing:
logger.warning('Appending to an already existing storage file.')
dirname = os.path.dirname(storage_file_path)
if not dirname:
dirname = '.'
# TODO: add a more thorough check to see if the storage file really is
# a plaso storage file.
if not os.access(dirname, os.W_OK):
raise errors.BadConfigOption(
f'Unable to write to storage file: {storage_file_path:s}')
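# Behavior sketch for _CheckStorageFile (illustrative, using a hypothetical
# path): for storage_file_path='timeline.plaso', an existing regular file only
# triggers a warning when warn_about_existing is True; an existing directory
# of that name, or an unwritable parent directory, raises BadConfigOption.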
def _CreateExtractionEngine(self, single_process_mode):
"""Creates an extraction engine.
Args:
single_process_mode (bool): True if the engine should use single process
mode.
Returns:
BaseEngine: extraction engine.
"""
status_update_callback = (
self._status_view.GetExtractionStatusUpdateCallback())
if single_process_mode:
extraction_engine = single_extraction_engine.SingleProcessEngine(
status_update_callback=status_update_callback)
else:
extraction_engine = multi_extraction_engine.ExtractionMultiProcessEngine(
number_of_worker_processes=self._number_of_extraction_workers,
status_update_callback=status_update_callback,
worker_memory_limit=self._worker_memory_limit,
worker_timeout=self._worker_timeout)
extraction_engine.SetStatusUpdateInterval(self._status_view_interval)
return extraction_engine
def _CreateExtractionProcessingConfiguration(self):
"""Creates an extraction processing configuration.
Returns:
ProcessingConfiguration: extraction processing configuration.
"""
configuration = configurations.ProcessingConfiguration()
configuration.artifact_definitions_path = self._artifact_definitions_path
configuration.custom_artifacts_path = self._custom_artifacts_path
configuration.data_location = self._data_location
configuration.extraction.archive_types_string = self._archive_types_string
configuration.artifact_filters = self._artifact_filters
configuration.credentials = self._credential_configurations
configuration.debug_output = self._debug_mode
configuration.extraction.hasher_file_size_limit = (
self._hasher_file_size_limit)
configuration.extraction.extract_winevt_resources = (
self._extract_winevt_resources)
configuration.extraction.extract_winreg_binary = self._extract_winreg_binary
configuration.extraction.hasher_names_string = self._hasher_names_string
configuration.extraction.process_compressed_streams = (
self._process_compressed_streams)
configuration.extraction.yara_rules_string = self._yara_rules_string
configuration.filter_file = self._filter_file
configuration.log_filename = self._log_file
configuration.parser_filter_expression = (
self._expanded_parser_filter_expression)
configuration.preferred_codepage = self._preferred_codepage
configuration.preferred_language = self._preferred_language
configuration.preferred_time_zone = self._preferred_time_zone
configuration.preferred_year = self._preferred_year
configuration.profiling.directory = self._profiling_directory
configuration.profiling.sample_rate = self._profiling_sample_rate
configuration.profiling.profilers = self._profilers
configuration.task_storage_format = self._task_storage_format
configuration.temporary_directory = self._temporary_directory
return configuration
def _GenerateStorageFileName(self):
"""Generates a name for the storage file.
The result uses a timestamp and the basename of the source path.
Returns:
str: a filename for the storage file in the form <time>-<source>.plaso
Raises:
BadConfigOption: if the source path is not set.
"""
if not self._source_path:
raise errors.BadConfigOption('Please define a source (--source).')
timestamp = datetime.datetime.now()
datetime_string = timestamp.strftime('%Y%m%dT%H%M%S')
source_path = os.path.abspath(self._source_path)
if source_path.endswith(os.path.sep):
source_path = os.path.dirname(source_path)
source_name = os.path.basename(source_path)
if not source_name or source_name in ('/', '\\'):
# The user passed the file system's root as the source.
source_name = 'ROOT'
return f'{datetime_string:s}-{source_name:s}.plaso'
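# For example (hypothetical source path): a source of '/cases/evidence.dd'
# scanned at 2023-08-04 13:00:00 local time yields the storage file name
# '20230804T130000-evidence.dd.plaso'.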
def _GetExpandedParserFilterExpression(self, system_configuration):
"""Determines the expanded parser filter expression.
Args:
system_configuration (SystemConfigurationArtifact): system configuration.
Returns:
str: expanded parser filter expression.
Raises:
BadConfigOption: if presets in the parser filter expression could not
be expanded or if an invalid parser or plugin name is specified.
"""
parser_filter_expression = self._parser_filter_expression
if not parser_filter_expression and system_configuration:
operating_system_artifact = artifacts.OperatingSystemArtifact(
family=system_configuration.operating_system,
product=system_configuration.operating_system_product,
version=system_configuration.operating_system_version)
preset_definitions = self._presets_manager.GetPresetsByOperatingSystem(
operating_system_artifact)
if preset_definitions:
self._parser_filter_expression = ','.join([
preset_definition.name
for preset_definition in preset_definitions])
logger.debug((
f'Parser filter expression set to preset: '
f'{self._parser_filter_expression:s}'))
parser_filter_helper = parser_filter.ParserFilterExpressionHelper()
try:
parser_filter_expression = parser_filter_helper.ExpandPresets(
self._presets_manager, self._parser_filter_expression)
parser_filter_string = parser_filter_expression or 'N/A'
logger.debug(f'Parser filter expression set to: {parser_filter_string:s}')
except RuntimeError as exception:
raise errors.BadConfigOption((
f'Unable to expand presets in parser filter expression with '
f'error: {exception!s}'))
parser_elements, invalid_parser_elements = (
parsers_manager.ParsersManager.CheckFilterExpression(
parser_filter_expression))
if invalid_parser_elements:
invalid_parser_names_string = ','.join(invalid_parser_elements)
raise errors.BadConfigOption((
f'Unknown parser or plugin names in element(s): '
f'"{invalid_parser_names_string:s}" of parser filter expression: '
f'{parser_filter_expression:s}'))
return ','.join(sorted(parser_elements))
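# Illustration (preset and parser names are examples only): given a parser
# filter expression 'win7' that names a preset, ExpandPresets() replaces the
# preset with its member parsers, for example 'filestat,winevtx,winreg', and
# the method returns the checked elements sorted and comma-separated.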
def _ParseExtractionOptions(self, options):
"""Parses the extraction options.
Args:
options (argparse.Namespace): command line arguments.
Raises:
BadConfigOption: if the options are invalid.
"""
helpers_manager.ArgumentHelperManager.ParseOptions(
options, self, names=['codepage', 'language'])
# TODO: add preferred encoding
self.list_language_tags = self._preferred_language == 'list'
self._extract_winevt_resources = getattr(
options, 'extract_winevt_resources', True)
time_zone_string = self.ParseStringOption(options, 'timezone')
if isinstance(time_zone_string, str):
if time_zone_string.lower() == 'list':
self.list_time_zones = True
elif time_zone_string:
try:
pytz.timezone(time_zone_string)
except pytz.UnknownTimeZoneError:
raise errors.BadConfigOption(
f'Unknown time zone: {time_zone_string:s}')
self._preferred_time_zone = time_zone_string
def _ParsePerformanceOptions(self, options):
"""Parses the performance options.
Args:
options (argparse.Namespace): command line arguments.
Raises:
BadConfigOption: if the options are invalid.
"""
self._buffer_size = getattr(options, 'buffer_size', 0)
if self._buffer_size:
# TODO: turn this into a generic function that supports more size
# suffixes both MB and MiB and also that does not allow m as a valid
# indicator for MiB since m represents milli not Mega.
try:
if self._buffer_size[-1].lower() == 'm':
self._buffer_size = int(self._buffer_size[:-1], 10)
self._buffer_size *= self._BYTES_IN_A_MIB
else:
self._buffer_size = int(self._buffer_size, 10)
except ValueError:
raise errors.BadConfigOption(
f'Invalid buffer size: {self._buffer_size!s}.')
self._queue_size = self.ParseNumericOption(options, 'queue_size')
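# Parsing sketch for the logic above: '--buffer_size 4m' is read as 4 MiB
# (4 * 1024 * 1024 = 4194304 bytes), '--buffer_size 4096' as 4096 bytes, and
# a value with any other suffix raises BadConfigOption via ValueError.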
def _ParseProcessingOptions(self, options):
"""Parses the processing options.
Args:
options (argparse.Namespace): command line arguments.
Raises:
BadConfigOption: if the options are invalid.
"""
self._single_process_mode = getattr(options, 'single_process', False)
argument_helper_names = [
'process_resources', 'temporary_directory', 'vfs_backend', 'workers',
'zeromq']
helpers_manager.ArgumentHelperManager.ParseOptions(
options, self, names=argument_helper_names)
if self._vfs_back_end == 'fsext':
dfvfs_definitions.PREFERRED_EXT_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_EXT)
elif self._vfs_back_end == 'fsfat':
dfvfs_definitions.PREFERRED_FAT_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_FAT)
elif self._vfs_back_end == 'fshfs':
dfvfs_definitions.PREFERRED_HFS_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_HFS)
elif self._vfs_back_end == 'fsntfs':
dfvfs_definitions.PREFERRED_NTFS_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_NTFS)
elif self._vfs_back_end == 'tsk':
dfvfs_definitions.PREFERRED_EXT_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_TSK)
dfvfs_definitions.PREFERRED_FAT_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_TSK)
dfvfs_definitions.PREFERRED_GPT_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION)
dfvfs_definitions.PREFERRED_HFS_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_TSK)
dfvfs_definitions.PREFERRED_NTFS_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_TSK)
elif self._vfs_back_end == 'vsgpt':
dfvfs_definitions.PREFERRED_GPT_BACK_END = (
dfvfs_definitions.TYPE_INDICATOR_GPT)
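# For example, selecting the 'tsk' back-end via the 'vfs_backend' argument
# helper routes the EXT, FAT, GPT, HFS and NTFS file system back-ends to
# their SleuthKit (TSK) implementations, as set above.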
def _ProcessSource(self, session, storage_writer):
"""Processes the source and extract events.
Args:
session (Session): session in which the source is processed.
storage_writer (StorageWriter): storage writer to store extracted events.
Returns:
ProcessingStatus: processing status.
Raises:
BadConfigOption: if an invalid collection filter was specified.
"""
single_process_mode = self._single_process_mode
if self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE:
single_process_mode = True
extraction_engine = self._CreateExtractionEngine(single_process_mode)
extraction_engine.BuildArtifactsRegistry(
self._artifact_definitions_path, self._custom_artifacts_path)
source_configuration = artifacts.SourceConfigurationArtifact(
path=self._source_path, source_type=self._source_type)
# TODO: check if the source was processed previously.
# TODO: add check for modification time of source.
# If the source is a directory or a storage media image, run pre-processing.
system_configurations = []
if self._source_type in self._SOURCE_TYPES_TO_PREPROCESS:
try:
logger.debug('Starting preprocessing.')
system_configurations = extraction_engine.PreprocessSource(
self._file_system_path_specs, storage_writer,
resolver_context=self._resolver_context)
logger.debug('Preprocessing done.')
except IOError as exception:
system_configurations = []
logger.error(f'Unable to preprocess with error: {exception!s}')
# TODO: check if the source was processed previously and if system
# configuration differs.
system_configuration = None
if system_configurations:
system_configuration = system_configurations[0]
# TODO: add support for more than 1 system configuration.
self._expanded_parser_filter_expression = (
self._GetExpandedParserFilterExpression(system_configuration))
enabled_parser_names = self._expanded_parser_filter_expression.split(',')
number_of_enabled_parsers = len(enabled_parser_names)
force_parser = False
if (self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE and
number_of_enabled_parsers == 1):
force_parser = True
self._extract_winevt_resources = False
elif ('winevt' not in enabled_parser_names and
'winevtx' not in enabled_parser_names):
self._extract_winevt_resources = False
elif (self._extract_winevt_resources and
'pe' not in enabled_parser_names):
logger.warning(
'A Windows EventLog parser is enabled in combination with '
'extraction of Windows EventLog resources, but the Portable '
'Executable (PE) parser is disabled. Therefore Windows EventLog '
'resources cannot be extracted.')
self._extract_winevt_resources = False
processing_configuration = (
self._CreateExtractionProcessingConfiguration())
processing_configuration.force_parser = force_parser
environment_variables = (
extraction_engine.knowledge_base.GetEnvironmentVariables())
user_accounts = list(storage_writer.GetAttributeContainers('user_account'))
try:
extraction_engine.BuildCollectionFilters(
environment_variables, user_accounts,
artifact_filter_names=self._artifact_filters,
filter_file_path=self._filter_file)
except errors.InvalidFilter as exception:
raise errors.BadConfigOption(
f'Unable to build collection filters with error: {exception!s}')
session.artifact_filters = self._artifact_filters
session.command_line_arguments = self._command_line_arguments
session.debug_mode = self._debug_mode
session.enabled_parser_names = enabled_parser_names
session.extract_winevt_resources = self._extract_winevt_resources
session.filter_file_path = self._filter_file
session.parser_filter_expression = self._parser_filter_expression
session.preferred_codepage = self._preferred_codepage
session.preferred_encoding = self.preferred_encoding
session.preferred_language = self._preferred_language or 'en-US'
session.preferred_time_zone = self._preferred_time_zone
session.preferred_year = self._preferred_year
storage_writer.AddAttributeContainer(session)
processing_status = None
try:
storage_writer.AddAttributeContainer(source_configuration)
for system_configuration in system_configurations:
storage_writer.AddAttributeContainer(system_configuration)
if single_process_mode:
logger.debug('Starting extraction in single process mode.')
processing_status = extraction_engine.ProcessSource(
storage_writer, self._resolver_context, processing_configuration,
system_configurations, self._file_system_path_specs)
else:
logger.debug('Starting extraction in multi process mode.')
# The method is named ProcessSourceMulti because pylint 2.6.0 and
# later gets confused about keyword arguments when ProcessSource
# is used.
processing_status = extraction_engine.ProcessSourceMulti(
storage_writer, session.identifier, processing_configuration,
system_configurations, self._file_system_path_specs,
enable_sigsegv_handler=self._enable_sigsegv_handler,
storage_file_path=self._storage_file_path)
finally:
session.aborted = getattr(processing_status, 'aborted', True)
session.completion_time = int(time.time() * 1000000)
storage_writer.UpdateAttributeContainer(session)
return processing_status
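# Summary of the Windows EventLog resource decision in _ProcessSource:
# resources are extracted only when a 'winevt' or 'winevtx' parser is enabled
# together with the 'pe' parser, and not when a single parser is forced on an
# individual file source.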
def _ReadParserPresetsFromFile(self):
"""Reads the parser presets from the presets.yaml file.
Raises:
BadConfigOption: if the parser presets file cannot be read.
"""
self._presets_file = os.path.join(
self._data_location, self._PRESETS_FILE_NAME)
if not os.path.isfile(self._presets_file):
raise errors.BadConfigOption(
f'No such parser presets file: {self._presets_file:s}')
try:
self._presets_manager.ReadFromFile(self._presets_file)
except errors.MalformedPresetError as exception:
raise errors.BadConfigOption(
f'Unable to read parser presets from file with error: {exception!s}')
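# A presets.yaml definition has roughly this form (illustrative entry; see
# the plaso data directory for the authoritative file):
#
#   name: win_gen
#   description: Preset for generic Windows.
#   parsers:
#   - filestat
#   - winevtx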
def _ScanSourceForArchive(self, path_spec):
"""Determines if a path specification references an archive file.
Args:
path_spec (dfvfs.PathSpec): path specification of the data stream.
Returns:
dfvfs.PathSpec: path specification of the archive file or None if not
an archive file.
"""
try:
type_indicators = (
dfvfs_analyzer.Analyzer.GetCompressedStreamTypeIndicators(
path_spec, resolver_context=self._resolver_context))
except IOError:
type_indicators = []
if len(type_indicators) > 1:
return None
if type_indicators:
type_indicator = type_indicators[0]
else:
type_indicator = None
if type_indicator == dfvfs_definitions.TYPE_INDICATOR_BZIP2:
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_COMPRESSED_STREAM,
compression_method=dfvfs_definitions.COMPRESSION_METHOD_BZIP2,
parent=path_spec)
elif type_indicator == dfvfs_definitions.TYPE_INDICATOR_GZIP:
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=path_spec)
elif type_indicator == dfvfs_definitions.TYPE_INDICATOR_XZ:
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_COMPRESSED_STREAM,
compression_method=dfvfs_definitions.COMPRESSION_METHOD_XZ,
parent=path_spec)
try:
type_indicators = dfvfs_analyzer.Analyzer.GetArchiveTypeIndicators(
path_spec, resolver_context=self._resolver_context)
except IOError:
return None
if len(type_indicators) != 1:
return None
return path_spec_factory.Factory.NewPathSpec(
type_indicators[0], location='/', parent=path_spec)
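# Sketch of the resulting path specification chain for a (hypothetical)
# gzip-compressed tar archive 'evidence.tar.gz':
#
#   OS path spec of evidence.tar.gz
#     -> GZIP path spec
#       -> TAR path spec with location '/'
#
# Zero or multiple archive type indicators yield None instead.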
def AddExtractionOptions(self, argument_group):
"""Adds the extraction options to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
argument_group, names=['codepage', 'language'])
# Note: the time zone option defaults to None so we can determine if it
# was set.
argument_group.add_argument(
'--no_extract_winevt_resources', '--no-extract-winevt-resources',
dest='extract_winevt_resources', action='store_false', default=True,
help=('Do not extract Windows EventLog resources such as event '
'message template strings. By default Windows EventLog '
'resources will be extracted when a Windows EventLog parser '
'is enabled.'))
# TODO: add preferred encoding
argument_group.add_argument(
'-z', '--zone', '--timezone', dest='timezone', action='store',
metavar='TIME_ZONE', type=str, default=None, help=(
'preferred time zone of extracted date and time values that are '
'stored without a time zone indicator. The time zone is determined '
'based on the source data where possible otherwise it will default '
'to UTC. Use "list" to see a list of available time zones.'))
def AddPerformanceOptions(self, argument_group):
"""Adds the performance options to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
argument_group.add_argument(
'--buffer_size', '--buffer-size', '--bs', dest='buffer_size',
action='store', default=0, help=(
'The buffer size for the output (defaults to 196MiB).'))
argument_group.add_argument(
'--queue_size', '--queue-size', dest='queue_size', action='store',
default=0, help=(
f'The maximum number of queued items per worker (defaults to '
f'{self._DEFAULT_QUEUE_SIZE:d})'))
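# Example (hypothetical invocation): '--buffer_size 4m' sets a 4 MiB buffer
# and '--queue_size 250000' doubles the default per-worker queue size.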
def AddProcessingOptions(self, argument_group):
"""Adds the processing options to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
argument_group.add_argument(
'--single_process', '--single-process', dest='single_process',
action='store_true', default=False, help=(
'Indicate that the tool should run in a single process.'))
argument_helper_names = [
'temporary_directory', 'vfs_backend', 'workers', 'zeromq']
if self._CanEnforceProcessMemoryLimit():
argument_helper_names.append('process_resources')
helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
argument_group, names=argument_helper_names)
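# Example (hypothetical invocation): '--single_process' forces the single
# process engine; the 'workers' argument helper adds flags controlling the
# number of worker processes used by the multi-process engine.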
def ExtractEventsFromSources(self):
"""Processes the sources and extracts events.
Raises:
BadConfigOption: if the storage file path is invalid, or the storage
format is not supported, or there was a failure writing to the
storage.
IOError: if the extraction engine could not write to the storage.
OSError: if the extraction engine could not write to the storage.
SourceScannerError: if the source scanner could not find a supported
file system.
UserAbort: if the user initiated an abort.
"""
self._CheckStorageFile(self._storage_file_path, warn_about_existing=True)
try:
self.ScanSource(self._source_path)
except dfvfs_errors.UserAbort as exception:
raise errors.UserAbort(exception)
if self._source_type == dfvfs_definitions.SOURCE_TYPE_FILE:
archive_path_spec = self._ScanSourceForArchive(
self._file_system_path_specs[0])
if archive_path_spec:
self._file_system_path_specs = [archive_path_spec]
self._source_type = definitions.SOURCE_TYPE_ARCHIVE
self._status_view.SetMode(self._status_view_mode)
self._status_view.SetStatusFile(self._status_view_file)
self._status_view.SetSourceInformation(
self._source_path, self._source_type,
artifact_filters=self._artifact_filters,
filter_file=self._filter_file)
self._output_writer.Write('\n')
self._status_view.PrintExtractionStatusHeader(None)
self._output_writer.Write('Processing started.\n')
# TODO: attach processing configuration to session?
session = engine.BaseEngine.CreateSession()
storage_writer = storage_factory.StorageFactory.CreateStorageWriter(
self._storage_format)
if not storage_writer:
raise errors.BadConfigOption(
f'Unsupported storage format: {self._storage_format:s}')
try:
storage_writer.Open(path=self._storage_file_path)
except IOError as exception:
raise IOError(f'Unable to open storage with error: {exception!s}')
processing_status = None
number_of_extraction_warnings = 0
try:
stored_number_of_extraction_warnings = (
storage_writer.GetNumberOfAttributeContainers('extraction_warning'))
try:
processing_status = self._ProcessSource(session, storage_writer)
finally:
number_of_extraction_warnings = (
storage_writer.GetNumberOfAttributeContainers(
'extraction_warning') - stored_number_of_extraction_warnings)
except IOError as exception:
raise IOError(f'Unable to write to storage with error: {exception!s}')
finally:
storage_writer.Close()
self._status_view.PrintExtractionSummary(
processing_status, number_of_extraction_warnings)
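# Processing flow sketch for ExtractEventsFromSources: check the storage
# file, scan the source (rewriting a single-file source as an archive when
# one is detected), create a session and storage writer, run _ProcessSource()
# and finally print an extraction summary including the number of new
# extraction warnings.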
def ListArchiveTypes(self):
"""Lists information about supported archive types."""
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, column_names=['Name', 'Description'],
title='Archive and storage media image types')
for name, description in sorted(self._SUPPORTED_ARCHIVE_TYPES.items()):
table_view.AddRow([name, description])
table_view.Write(self._output_writer)
def ListLanguageTags(self):
"""Lists the language tags."""
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, column_names=['Language tag', 'Description'],
title='Language tags')
for language_tag, description in (
language_tags.LanguageTagHelper.GetLanguages()):
table_view.AddRow([language_tag, description])
table_view.Write(self._output_writer)
def ListParsersAndPlugins(self):
"""Lists information about the available parsers and plugins."""
parsers_information = parsers_manager.ParsersManager.GetParsersInformation()
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, column_names=['Name', 'Description'],
title='Parsers')
for name, description in sorted(parsers_information):
table_view.AddRow([name, description])
table_view.Write(self._output_writer)
parser_names = parsers_manager.ParsersManager.GetNamesOfParsersWithPlugins()
for parser_name in parser_names:
plugins_information = (
parsers_manager.ParsersManager.GetParserPluginsInformation(
parser_filter_expression=parser_name))
table_title = f'Parser plugins: {parser_name:s}'
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, column_names=['Name', 'Description'],
title=table_title)
for name, description in sorted(plugins_information):
table_view.AddRow([name, description])
table_view.Write(self._output_writer)
title = 'Parser presets'
if self._presets_file:
source_path = os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__))))
presets_file = self._presets_file
if presets_file.startswith(source_path):
presets_file = presets_file[len(source_path) + 1:]
title = f'{title:s} ({presets_file:s})'
presets_information = self._presets_manager.GetPresetsInformation()
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, column_names=['Name', 'Parsers and plugins'],
title=title)
for name, description in sorted(presets_information):
table_view.AddRow([name, description])
table_view.Write(self._output_writer)