Source code for plaso.cli.tool_options

# -*- coding: utf-8 -*-
"""The CLI tool options mix-ins."""

import os
import pytz

from plaso.analysis import manager as analysis_manager
from plaso.analyzers.hashers import manager as hashers_manager
from plaso.cli import logger
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.cli.helpers import profiling
from plaso.formatters import yaml_formatters_file
from plaso.lib import errors
from plaso.output import manager as output_manager


# TODO: pass argument_parser instead of argument_group and add groups
# in mix-ins.


class AnalysisPluginOptions(object):
  """Analysis plugin options mix-in.

  The methods read the _analysis_plugins, _output_writer and
  _views_format_type attributes, which are not defined by this mix-in and
  are expected to be provided by the class it is combined with.
  """

  # pylint: disable=no-member

  def _CreateAnalysisPlugins(self, options):
    """Creates the analysis plugins.

    Args:
      options (argparse.Namespace): command line arguments.

    Returns:
      dict[str, AnalysisPlugin]: analysis plugins and their names, or an
          empty dictionary if no analysis plugins were requested.
    """
    if not self._analysis_plugins:
      return {}

    analysis_plugins = (
        analysis_manager.AnalysisPluginManager.GetPluginObjects(
            self._analysis_plugins))

    # Let the argument helpers configure every plugin from the command line
    # arguments.
    for analysis_plugin in analysis_plugins.values():
      helpers_manager.ArgumentHelperManager.ParseOptions(
          options, analysis_plugin)

    return analysis_plugins

  def ListAnalysisPlugins(self):
    """Lists the analysis modules.

    Writes a table with the name, description and type of every analysis
    plugin known to the analysis plugin manager to the output writer.
    """
    analysis_plugin_info = (
        analysis_manager.AnalysisPluginManager.GetAllPluginInformation())

    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, column_names=['Name', 'Description'],
        title='Analysis Plugins')

    # TODO: add support for a 3 column table.
    # Until then the plugin type is appended to the description column.
    for name, description, type_string in analysis_plugin_info:
      table_view.AddRow([name, f'{description:s} [{type_string:s}]'])

    table_view.Write(self._output_writer)
class HashersOptions(object):
  """Mix-in that adds hasher related options to a CLI tool.

  ListHashers reads the _output_writer and _views_format_type attributes,
  which are not defined by this mix-in.
  """

  # pylint: disable=no-member

  def __init__(self):
    """Initializes the hasher options with unset values."""
    super().__init__()
    self._hasher_file_size_limit = None
    self._hasher_names_string = None

  def ListHashers(self):
    """Writes a table of the available hashers to the output writer."""
    available_hashers = (
        hashers_manager.HashersManager.GetHashersInformation())

    hashers_table = views.ViewsFactory.GetTableView(
        self._views_format_type, column_names=['Name', 'Description'],
        title='Hashers')

    # Rows are added in sorted order so the listing is stable.
    for hasher_name, hasher_description in sorted(available_hashers):
      hashers_table.AddRow([hasher_name, hasher_description])

    hashers_table.Write(self._output_writer)
class OutputModuleOptions(object):
  """Output module options mix-in.

  The methods use ParseStringOption, _PrintUserWarning, _PromptUserForInput,
  _output_writer, _unattended_mode and _views_format_type, which are not
  defined by this mix-in and are expected to be provided by the class it is
  combined with.

  Attributes:
    list_time_zones (bool): True if the time zones should be listed.
  """

  # pylint: disable=no-member

  # Output format that have second-only date and time value and/or a limited
  # predefined set of output fields.
  _DEPRECATED_OUTPUT_FORMATS = frozenset(['l2tcsv', 'l2ttln', 'tln'])

  def __init__(self):
    """Initializes output module options."""
    super().__init__()
    self._output_additional_fields = []
    self._output_custom_fields = []
    self._output_custom_formatters_path = None
    self._output_dynamic_time = None
    self._output_filename = None
    self._output_format = None
    self._output_module = None
    self._output_time_zone = None

    self.list_time_zones = False

  def _CreateOutputModule(self, options):
    """Creates an output module.

    The output module is created for the configured output format, opened,
    configured from the command line arguments and, when not running
    unattended, the user is prompted for any parameters that are still
    missing.

    Args:
      options (argparse.Namespace): command line arguments.

    Returns:
      OutputModule: output module.

    Raises:
      BadConfigOption: if parameters are missing, or if the output format
          writes an output file and the output file is not set or already
          exists.
      RuntimeError: if the output module cannot be created.
    """
    if self._output_format in self._DEPRECATED_OUTPUT_FORMATS:
      self._PrintUserWarning((
          f'the output format: {self._output_format:s} has significant '
          'limitations such as second-only date and time values and/or '
          'a limited predefined set of output fields. It is strongly '
          'recommended to use an alternative output format like: dynamic.'))

    try:
      output_module = output_manager.OutputManager.NewOutputModule(
          self._output_format)

    except (KeyError, ValueError) as exception:
      # Chain explicitly so the original cause is marked as such in the
      # traceback.
      raise RuntimeError(
          f'Unable to create output module with error: '
          f'{exception!s}') from exception

    if output_module.WRITES_OUTPUT_FILE:
      if not self._output_filename:
        raise errors.BadConfigOption(
            f'Output format: {self._output_format:s} requires an output file')

      # Refuse to overwrite an existing output file.
      if os.path.exists(self._output_filename):
        raise errors.BadConfigOption(
            f'Output file already exists: {self._output_filename:s}')

      output_module.Open(path=self._output_filename)
    else:
      output_module.Open()

    helpers_manager.ArgumentHelperManager.ParseOptions(options, output_module)

    # Check if there are parameters that have not been defined and need to
    # in order for the output module to continue. Prompt user to supply
    # those that may be missing.
    missing_parameters = output_module.GetMissingArguments()
    if missing_parameters and self._unattended_mode:
      # Prompting is not possible in unattended mode.
      parameters_string = ', '.join(missing_parameters)
      raise errors.BadConfigOption((
          f'Unable to create output module missing parameters: '
          f'{parameters_string:s}'))

    # Keep prompting until the output module reports no missing parameters.
    while missing_parameters:
      self._PromptUserForMissingOutputModuleParameters(
          options, missing_parameters)

      helpers_manager.ArgumentHelperManager.ParseOptions(options, output_module)
      missing_parameters = output_module.GetMissingArguments()

    if output_module.SUPPORTS_ADDITIONAL_FIELDS:
      output_module.SetAdditionalFields(self._output_additional_fields)
    elif self._output_additional_fields:
      self._PrintUserWarning((
          f'output module: {self._output_format:s} does not support '
          'additional fields'))

    if output_module.SUPPORTS_CUSTOM_FIELDS:
      output_module.SetCustomFields(self._output_custom_fields)
    elif self._output_custom_fields:
      self._PrintUserWarning((
          f'output module: {self._output_format:s} does not support '
          'custom fields'))

    return output_module

  def _GetOutputModulesInformation(self):
    """Retrieves the output modules information.

    Returns:
      list[tuple[str, str]]: pairs of output module names and descriptions.
    """
    return [
        (name, output_class.DESCRIPTION)
        for name, output_class in (
            output_manager.OutputManager.GetOutputClasses())]

  def _ParseOutputOptions(self, options):
    """Parses the output options.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    additional_fields = self.ParseStringOption(options, 'additional_fields')
    if additional_fields:
      self._output_additional_fields = additional_fields.split(',')

    custom_fields = self.ParseStringOption(options, 'custom_fields')
    if custom_fields:
      for custom_field in custom_fields.split(','):
        # A custom field must consist of exactly one "name:value" pair.
        try:
          name, value = custom_field.split(':')
        except ValueError as exception:
          raise errors.BadConfigOption(
              f'Unsupported custom field: {custom_field:s}') from exception

        self._output_custom_fields.append((name, value))

    custom_formatters_path = self.ParseStringOption(
        options, 'custom_formatter_definitions_path')

    if custom_formatters_path and not os.path.isfile(custom_formatters_path):
      raise errors.BadConfigOption((
          f'Unable to determine path to custom formatter definitions: '
          f'{custom_formatters_path:s}'))

    if custom_formatters_path:
      logger.info(
          f'Custom formatter definitions path: {custom_formatters_path:s}')

      # The result of reading the file is not used here.
      message_formatters_file = yaml_formatters_file.YAMLFormattersFile()
      message_formatters_file.ReadFromFile(custom_formatters_path)

    self._output_custom_formatters_path = custom_formatters_path

    self._output_dynamic_time = getattr(options, 'dynamic_time', False)

    time_zone_string = self.ParseStringOption(options, 'output_time_zone')
    if isinstance(time_zone_string, str):
      if time_zone_string.lower() == 'list':
        # "list" is a request to list the time zones, not a time zone.
        self.list_time_zones = True

      elif time_zone_string:
        # Only used to validate the time zone name; the string is stored.
        try:
          pytz.timezone(time_zone_string)
        except pytz.UnknownTimeZoneError as exception:
          raise errors.BadConfigOption(
              f'Unknown time zone: {time_zone_string:s}') from exception

        self._output_time_zone = time_zone_string

  def _PromptUserForMissingOutputModuleParameters(
      self, options, missing_parameters):
    """Prompts the user for missing output module parameters.

    Every provided value is stored on options under the parameter name.

    Args:
      options (argparse.Namespace): command line arguments.
      missing_parameters (list[str]): names of missing output module
          parameters.
    """
    for parameter in missing_parameters:
      value = None
      # Repeat the prompt until a non-empty value is provided.
      while not value:
        value = self._PromptUserForInput(
            f'Please provide a value for {parameter:s}')

      setattr(options, parameter, value)

  def AddOutputOptions(self, argument_group):
    """Adds the output options to the argument group.

    Args:
      argument_group (argparse._ArgumentGroup): argparse argument group.
    """
    argument_group.add_argument(
        '--additional_fields', '--additional-fields',
        dest='additional_fields', type=str, action='store', default='',
        help=(
            'Defines additional fields to be included in the output besides '
            'the default fields. Multiple additional field names can be '
            'defined as a list of comma separated values. Output formats that '
            'support additional fields are: dynamic, opensearch and xlsx.'))

    argument_group.add_argument(
        '--custom_fields', '--custom-fields', dest='custom_fields', type=str,
        action='store', default='', help=(
            'Defines custom fields to be included in the output besides '
            'the default fields. A custom field is defined as "name:value". '
            'Multiple custom field names can be defined as list of comma '
            'separated values. Note that regular fields are favoured '
            'above custom fields with same name. Output formats that support '
            'this are: dynamic, opensearch and xlsx.'))

    argument_group.add_argument(
        '--custom_formatter_definitions', '--custom-formatter-definitions',
        dest='custom_formatter_definitions_path', type=str, metavar='PATH',
        action='store', help=(
            'Path to a file containing custom event formatter definitions, '
            'which is a .yaml file. Custom event formatter definitions can '
            'be used to customize event messages and override the built-in '
            'event formatter definitions.'))

    argument_group.add_argument(
        '--dynamic_time', '--dynamic-time', dest='dynamic_time',
        action='store_true', default=False, help=(
            'Indicate that the output should use dynamic time. Output formats '
            'that support dynamic time are: dynamic'))

    # Note the default here is None so we can determine if the time zone
    # option was set.
    argument_group.add_argument(
        '--output_time_zone', '--output-time-zone', dest='output_time_zone',
        action='store', metavar='TIME_ZONE', type=str, default=None, help=(
            'time zone of date and time values written to the output, if '
            'supported by the output format. Use "list" to see a list of '
            'available time zones. Output formats that support an output '
            'time zone are: dynamic and l2t_csv.'))

  def ListOutputModules(self):
    """Lists the output modules.

    Writes a table of the output modules to the output writer, followed by
    a table of the disabled output modules if there are any.
    """
    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, column_names=['Name', 'Description'],
        title='Output Modules')

    output_classes = sorted(
        output_manager.OutputManager.GetOutputClasses())
    for name, output_class in output_classes:
      table_view.AddRow([name, output_class.DESCRIPTION])
    table_view.Write(self._output_writer)

    disabled_classes = sorted(
        output_manager.OutputManager.GetDisabledOutputClasses())
    if not disabled_classes:
      return

    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, column_names=['Name', 'Description'],
        title='Disabled Output Modules')
    for name, output_class in disabled_classes:
      table_view.AddRow([name, output_class.DESCRIPTION])
    table_view.Write(self._output_writer)
class ProfilingOptions(object):
  """Mix-in that adds profiling related options to a CLI tool.

  ListProfilers reads the _output_writer and _views_format_type attributes,
  which are not defined by this mix-in.
  """

  # pylint: disable=no-member

  def __init__(self):
    """Initializes the profiling options.

    No profilers are selected, the profiling directory is unset and the
    sample rate is the default of the profiling arguments helper.
    """
    super().__init__()
    self._profilers = set()
    self._profiling_directory = None
    self._profiling_sample_rate = (
        profiling.ProfilingArgumentsHelper.DEFAULT_PROFILING_SAMPLE_RATE)

  def ListProfilers(self):
    """Writes a table of the available profilers to the output writer."""
    profilers_table = views.ViewsFactory.GetTableView(
        self._views_format_type, column_names=['Name', 'Description'],
        title='Profilers')

    helper_class = profiling.ProfilingArgumentsHelper
    # Rows are added in sorted order so the listing is stable.
    for profiler_name, profiler_description in sorted(
        helper_class.PROFILERS_INFORMATION.items()):
      profilers_table.AddRow([profiler_name, profiler_description])

    profilers_table.Write(self._output_writer)
class StorageFileOptions(object):
  """Mix-in that adds the storage file argument to a CLI tool."""

  def AddStorageOptions(self, argument_parser):
    """Registers the storage file argument on the argument parser.

    The argument is positional and may be omitted, in which case its value
    is None.

    Args:
      argument_parser (argparse.ArgumentParser): argparse argument parser.
    """
    storage_file_settings = {
        'default': None,
        'help': 'Path to a storage file.',
        'metavar': 'PATH',
        'nargs': '?',
        'type': str}

    argument_parser.add_argument('storage_file', **storage_file_settings)