# Source code for plaso.cli.tool_options

"""The CLI tool options mix-ins."""

import os
import pytz

from plaso.analysis import manager as analysis_manager
from plaso.analyzers.hashers import manager as hashers_manager
from plaso.cli import logger
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.cli.helpers import profiling
from plaso.formatters import yaml_formatters_file
from plaso.lib import errors
from plaso.output import manager as output_manager

# TODO: pass argument_parser instead of argument_group and add groups
# in mix-ins.


class AnalysisPluginOptions:
    """Analysis plugin options mix-in.

    The methods read the _analysis_plugins, _output_writer and
    _views_format_type attributes, which are not defined in this mix-in and
    therefore have to be provided by the class it is mixed into.
    """

    # pylint: disable=no-member

    def _CreateAnalysisPlugins(self, options):
        """Creates the analysis plugins.

        Args:
          options (argparse.Namespace): command line arguments.

        Returns:
          dict[str, AnalysisPlugin]: analysis plugins and their names, or an
              empty dictionary if no analysis plugins were requested.
        """
        if not self._analysis_plugins:
            return {}

        analysis_plugins = analysis_manager.AnalysisPluginManager.GetPluginObjects(
            self._analysis_plugins
        )
        # Let every plugin pick up its own settings from the command line
        # arguments.
        for analysis_plugin in analysis_plugins.values():
            helpers_manager.ArgumentHelperManager.ParseOptions(options, analysis_plugin)

        return analysis_plugins

    def ListAnalysisPlugins(self):
        """Lists the analysis modules."""
        analysis_plugin_info = (
            analysis_manager.AnalysisPluginManager.GetAllPluginInformation()
        )

        table_view = views.ViewsFactory.GetTableView(
            self._views_format_type,
            column_names=["Name", "Description"],
            title="Analysis Plugins",
        )
        # TODO: add support for a 3 column table.
        # Until then the plugin type is appended to the description column.
        for name, description, type_string in analysis_plugin_info:
            table_view.AddRow([name, f"{description:s} [{type_string:s}]"])

        table_view.Write(self._output_writer)
class HashersOptions:
    """Mix-in that holds the hasher related options of a CLI tool."""

    # pylint: disable=no-member

    def __init__(self):
        """Sets the hasher options to their unset (None) state."""
        super().__init__()
        self._hasher_file_size_limit = None
        self._hasher_names_string = None

    def ListHashers(self):
        """Writes a table with the name and description of every hasher."""
        hashers_table = views.ViewsFactory.GetTableView(
            self._views_format_type,
            column_names=["Name", "Description"],
            title="Hashers",
        )

        available_hashers = hashers_manager.HashersManager.GetHashersInformation()
        for hasher_name, hasher_description in sorted(available_hashers):
            hashers_table.AddRow([hasher_name, hasher_description])

        hashers_table.Write(self._output_writer)
class OutputModuleOptions:
    """Output module options mix-in.

    The methods call ParseStringOption, _PrintUserWarning and
    _PromptUserForInput and read the _output_writer, _unattended_mode and
    _views_format_type attributes. None of these are defined in this mix-in,
    so they have to be provided by the class it is mixed into.

    Attributes:
      list_time_zones (bool): True if the time zones should be listed.
    """

    # pylint: disable=no-member

    # Output formats that have second-only date and time values and/or a
    # limited predefined set of output fields.
    _DEPRECATED_OUTPUT_FORMATS = frozenset(["l2tcsv", "l2ttln", "tln"])

    def __init__(self):
        """Initializes output module options."""
        super().__init__()
        self._output_additional_fields = []
        self._output_custom_fields = []
        self._output_custom_formatters_path = None
        self._output_dynamic_time = None
        self._output_filename = None
        self._output_format = None
        self._output_module = None
        self._output_time_zone = None

        self.list_time_zones = False

    def _CreateOutputModule(self, options):
        """Creates an output module.

        Args:
          options (argparse.Namespace): command line arguments.

        Returns:
          OutputModule: output module.

        Raises:
          BadConfigOption: if parameters are missing.
          RuntimeError: if the output module cannot be created.
        """
        if self._output_format in self._DEPRECATED_OUTPUT_FORMATS:
            self._PrintUserWarning(
                (
                    f"the output format: {self._output_format:s} has significant "
                    f"limitations such as second-only date and time values and/or "
                    f"a limited predefined set of output fields. It is strongly "
                    f"recommend to use an alternative output format like: dynamic."
                )
            )

        try:
            output_module = output_manager.OutputManager.NewOutputModule(
                self._output_format
            )
        except (KeyError, ValueError) as exception:
            # Chain the original error so the root cause stays in the traceback.
            raise RuntimeError(
                f"Unable to create output module with error: {exception!s}"
            ) from exception

        if output_module.WRITES_OUTPUT_FILE:
            if not self._output_filename:
                raise errors.BadConfigOption(
                    f"Output format: {self._output_format:s} requires an output file"
                )

            # Refuse to overwrite an existing output file.
            if os.path.exists(self._output_filename):
                raise errors.BadConfigOption(
                    f"Output file already exists: {self._output_filename:s}"
                )

            output_module.Open(path=self._output_filename)
        else:
            output_module.Open()

        helpers_manager.ArgumentHelperManager.ParseOptions(options, output_module)

        # Check whether the output module still lacks required parameters.
        # In unattended mode this is an error, otherwise the user is prompted
        # until no parameters are missing.
        missing_parameters = output_module.GetMissingArguments()
        if missing_parameters and self._unattended_mode:
            parameters_string = ", ".join(missing_parameters)
            raise errors.BadConfigOption(
                (
                    f"Unable to create output module missing parameters: "
                    f"{parameters_string:s}"
                )
            )

        while missing_parameters:
            self._PromptUserForMissingOutputModuleParameters(
                options, missing_parameters
            )

            helpers_manager.ArgumentHelperManager.ParseOptions(options, output_module)
            missing_parameters = output_module.GetMissingArguments()

        if output_module.SUPPORTS_ADDITIONAL_FIELDS:
            output_module.SetAdditionalFields(self._output_additional_fields)
        elif self._output_additional_fields:
            self._PrintUserWarning(
                (
                    f"output module: {self._output_format:s} does not support "
                    f"additional fields"
                )
            )

        if output_module.SUPPORTS_CUSTOM_FIELDS:
            output_module.SetCustomFields(self._output_custom_fields)
        elif self._output_custom_fields:
            self._PrintUserWarning(
                (
                    f"output module: {self._output_format:s} does not support "
                    f"custom fields"
                )
            )

        return output_module

    def _GetOutputModulesInformation(self):
        """Retrieves the output modules information.

        Returns:
          list[tuple[str, str]]: pairs of output module names and descriptions.
        """
        return [
            (name, output_class.DESCRIPTION)
            for name, output_class in output_manager.OutputManager.GetOutputClasses()
        ]

    def _ParseOutputOptions(self, options):
        """Parses the output options.

        Args:
          options (argparse.Namespace): command line arguments.

        Raises:
          BadConfigOption: if the options are invalid.
        """
        additional_fields = self.ParseStringOption(options, "additional_fields")
        if additional_fields:
            self._output_additional_fields = additional_fields.split(",")

        custom_fields = self.ParseStringOption(options, "custom_fields")
        if custom_fields:
            for custom_field in custom_fields.split(","):
                # A custom field must consist of exactly one "name:value" pair.
                try:
                    name, value = custom_field.split(":")
                except ValueError as exception:
                    raise errors.BadConfigOption(
                        f"Unsupported custom field: {custom_field:s}"
                    ) from exception

                self._output_custom_fields.append((name, value))

        custom_formatters_path = self.ParseStringOption(
            options, "custom_formatter_definitions_path"
        )
        if custom_formatters_path:
            if not os.path.isfile(custom_formatters_path):
                raise errors.BadConfigOption(
                    (
                        f"Unable to determine path to custom formatter definitions: "
                        f"{custom_formatters_path:s}"
                    )
                )

            logger.info(
                f"Custom formatter definitions path: {custom_formatters_path:s}"
            )

            # Read the file once up front; the result is not kept.
            message_formatters_file = yaml_formatters_file.YAMLFormattersFile()
            message_formatters_file.ReadFromFile(custom_formatters_path)

        self._output_custom_formatters_path = custom_formatters_path

        self._output_dynamic_time = getattr(options, "dynamic_time", False)

        time_zone_string = self.ParseStringOption(options, "output_time_zone")
        if isinstance(time_zone_string, str):
            if time_zone_string.lower() == "list":
                self.list_time_zones = True

            elif time_zone_string:
                try:
                    pytz.timezone(time_zone_string)
                except pytz.UnknownTimeZoneError as exception:
                    raise errors.BadConfigOption(
                        f"Unknown time zone: {time_zone_string:s}"
                    ) from exception

                # Only a validated time zone name is stored; the special value
                # "list" is not.
                self._output_time_zone = time_zone_string

    def _PromptUserForMissingOutputModuleParameters(self, options, missing_parameters):
        """Prompts the user for missing output module parameters.

        The provided values are stored as attributes of options.

        Args:
          options (argparse.Namespace): command line arguments.
          missing_parameters (list[str]): names of missing output module
              parameters.
        """
        for parameter in missing_parameters:
            value = None
            # Keep asking until a non-empty value has been provided.
            while not value:
                value = self._PromptUserForInput(
                    f"Please provide a value for {parameter:s}"
                )

            setattr(options, parameter, value)

    def _WriteOutputClassesTable(self, title, output_classes):
        """Writes a name and description table of output classes.

        Args:
          title (str): title of the table.
          output_classes (list[tuple[str, type]]): pairs of output module names
              and classes, where each class has a DESCRIPTION attribute.
        """
        table_view = views.ViewsFactory.GetTableView(
            self._views_format_type,
            column_names=["Name", "Description"],
            title=title,
        )
        for name, output_class in output_classes:
            table_view.AddRow([name, output_class.DESCRIPTION])

        table_view.Write(self._output_writer)

    def AddOutputOptions(self, argument_group):
        """Adds the output options to the argument group.

        Args:
          argument_group (argparse._ArgumentGroup): argparse argument group.
        """
        argument_group.add_argument(
            "--additional_fields",
            "--additional-fields",
            dest="additional_fields",
            type=str,
            action="store",
            default="",
            help=(
                "Defines additional fields to be included in the output besides "
                "the default fields. Multiple additional field names can be "
                "defined as a list of comma separated values. Output formats that "
                "support additional fields are: dynamic, opensearch and xlsx."
            ),
        )

        argument_group.add_argument(
            "--custom_fields",
            "--custom-fields",
            dest="custom_fields",
            type=str,
            action="store",
            default="",
            help=(
                "Defines custom fields to be included in the output besides "
                'the default fields. A custom field is defined as "name:value". '
                "Multiple custom field names can be defined as list of comma "
                "separated values. Note that regular fields are favoured "
                "above custom fields with the same name. Output formats that "
                "support this are: dynamic, opensearch and xlsx."
            ),
        )

        argument_group.add_argument(
            "--custom_formatter_definitions",
            "--custom-formatter-definitions",
            dest="custom_formatter_definitions_path",
            type=str,
            metavar="PATH",
            action="store",
            help=(
                "Path to a file containing custom event formatter definitions, "
                "which is a .yaml file. Custom event formatter definitions can "
                "be used to customize event messages and override the built-in "
                "event formatter definitions."
            ),
        )

        argument_group.add_argument(
            "--dynamic_time",
            "--dynamic-time",
            dest="dynamic_time",
            action="store_true",
            default=False,
            help=(
                "Indicate that the output should use dynamic time. Output formats "
                "that support dynamic time are: dynamic"
            ),
        )

        # Note the default here is None so we can determine if the time zone
        # option was set.
        argument_group.add_argument(
            "--output_time_zone",
            "--output-time-zone",
            dest="output_time_zone",
            action="store",
            metavar="TIME_ZONE",
            type=str,
            default=None,
            help=(
                "time zone of date and time values written to the output, if "
                'supported by the output format. Use "list" to see a list of '
                "available time zones. Output formats that support an output "
                "time zone are: dynamic and l2tcsv."
            ),
        )

    def ListOutputModules(self):
        """Lists the output modules.

        The table of disabled output modules is only written when there are
        disabled output modules.
        """
        output_classes = sorted(output_manager.OutputManager.GetOutputClasses())
        self._WriteOutputClassesTable("Output Modules", output_classes)

        disabled_classes = sorted(
            output_manager.OutputManager.GetDisabledOutputClasses()
        )
        if disabled_classes:
            self._WriteOutputClassesTable("Disabled Output Modules", disabled_classes)
class ProfilingOptions:
    """Mix-in that holds the profiling related options of a CLI tool."""

    # pylint: disable=no-member

    def __init__(self):
        """Sets up empty profiler settings with the default sample rate."""
        super().__init__()
        self._profilers = set()
        self._profiling_directory = None
        self._profiling_sample_rate = (
            profiling.ProfilingArgumentsHelper.DEFAULT_PROFILING_SAMPLE_RATE
        )

    def ListProfilers(self):
        """Writes a table with the name and description of every profiler."""
        profilers_table = views.ViewsFactory.GetTableView(
            self._views_format_type,
            column_names=["Name", "Description"],
            title="Profilers",
        )

        known_profilers = profiling.ProfilingArgumentsHelper.PROFILERS_INFORMATION
        for profiler_name, profiler_description in sorted(known_profilers.items()):
            profilers_table.AddRow([profiler_name, profiler_description])

        profilers_table.Write(self._output_writer)
class StorageFileOptions:
    """Mix-in that provides the storage file command line argument."""

    def AddStorageOptions(self, argument_parser):
        """Registers the optional storage file argument on the argument parser.

        Args:
          argument_parser (argparse.ArgumentParser): argparse argument parser.
        """
        # The positional argument may be omitted, in which case it is None.
        storage_file_settings = {
            "metavar": "PATH",
            "nargs": "?",
            "type": str,
            "default": None,
            "help": "Path to a storage file.",
        }
        argument_parser.add_argument("storage_file", **storage_file_settings)