"""The CLI tool options mix-ins."""
import os
import pytz
from plaso.analysis import manager as analysis_manager
from plaso.analyzers.hashers import manager as hashers_manager
from plaso.cli import logger
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.cli.helpers import profiling
from plaso.formatters import yaml_formatters_file
from plaso.lib import errors
from plaso.output import manager as output_manager
# TODO: pass argument_parser instead of argument_group and add groups
# in mix-ins.
[docs]
class AnalysisPluginOptions:
"""Analysis plugin options mix-in."""
# pylint: disable=no-member
def _CreateAnalysisPlugins(self, options):
"""Creates the analysis plugins.
Args:
options (argparse.Namespace): command line arguments.
Returns:
dict[str, AnalysisPlugin]: analysis plugins and their names.
"""
if not self._analysis_plugins:
return {}
analysis_plugins = analysis_manager.AnalysisPluginManager.GetPluginObjects(
self._analysis_plugins
)
for analysis_plugin in analysis_plugins.values():
helpers_manager.ArgumentHelperManager.ParseOptions(options, analysis_plugin)
return analysis_plugins
[docs]
def ListAnalysisPlugins(self):
"""Lists the analysis modules."""
analysis_plugin_info = (
analysis_manager.AnalysisPluginManager.GetAllPluginInformation()
)
column_width = 10
for name, _, _ in analysis_plugin_info:
column_width = max(column_width, len(name))
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Name", "Description"],
title="Analysis Plugins",
)
# TODO: add support for a 3 column table.
for name, description, type_string in analysis_plugin_info:
table_view.AddRow([name, f"{description:s} [{type_string:s}]"])
table_view.Write(self._output_writer)
[docs]
class HashersOptions:
"""Hashers options mix-in."""
# pylint: disable=no-member
[docs]
def __init__(self):
"""Initializes hasher options."""
super().__init__()
self._hasher_file_size_limit = None
self._hasher_names_string = None
[docs]
def ListHashers(self):
"""Lists information about the available hashers."""
hashers_information = hashers_manager.HashersManager.GetHashersInformation()
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Name", "Description"],
title="Hashers",
)
for name, description in sorted(hashers_information):
table_view.AddRow([name, description])
table_view.Write(self._output_writer)
[docs]
class OutputModuleOptions:
"""Output module options mix-in.
Attributes:
list_time_zones (bool): True if the time zones should be listed.
"""
# pylint: disable=no-member
# Output format that have second-only date and time value and/or a limited
# predefined set of output fields.
_DEPRECATED_OUTPUT_FORMATS = frozenset(["l2tcsv", "l2ttln", "tln"])
[docs]
def __init__(self):
"""Initializes output module options."""
super().__init__()
self._output_additional_fields = []
self._output_custom_fields = []
self._output_custom_formatters_path = None
self._output_dynamic_time = None
self._output_filename = None
self._output_format = None
self._output_module = None
self._output_time_zone = None
self.list_time_zones = False
def _CreateOutputModule(self, options):
"""Creates an output module.
Args:
options (argparse.Namespace): command line arguments.
Returns:
OutputModule: output module.
Raises:
BadConfigOption: if parameters are missing.
RuntimeError: if the output module cannot be created.
"""
if self._output_format in self._DEPRECATED_OUTPUT_FORMATS:
self._PrintUserWarning(
(
f"the output format: {self._output_format:s} has significant "
f"limitations such as second-only date and time values and/or "
f"a limited predefined set of output fields. It is strongly "
f"recommend to use an alternative output format like: dynamic."
)
)
try:
output_module = output_manager.OutputManager.NewOutputModule(
self._output_format
)
except (KeyError, ValueError) as exception:
raise RuntimeError(
f"Unable to create output module with error: {exception!s}"
)
if output_module.WRITES_OUTPUT_FILE:
if not self._output_filename:
raise errors.BadConfigOption(
f"Output format: {self._output_format:s} requires an output file"
)
if os.path.exists(self._output_filename):
raise errors.BadConfigOption(
f"Output file already exists: {self._output_filename:s}"
)
output_module.Open(path=self._output_filename)
else:
output_module.Open()
helpers_manager.ArgumentHelperManager.ParseOptions(options, output_module)
# Check if there are parameters that have not been defined and need to
# in order for the output module to continue. Prompt user to supply
# those that may be missing.
missing_parameters = output_module.GetMissingArguments()
if missing_parameters and self._unattended_mode:
parameters_string = ", ".join(missing_parameters)
raise errors.BadConfigOption(
(
f"Unable to create output module missing parameters: "
f"{parameters_string:s}"
)
)
while missing_parameters:
self._PromptUserForMissingOutputModuleParameters(
options, missing_parameters
)
helpers_manager.ArgumentHelperManager.ParseOptions(options, output_module)
missing_parameters = output_module.GetMissingArguments()
if output_module.SUPPORTS_ADDITIONAL_FIELDS:
output_module.SetAdditionalFields(self._output_additional_fields)
elif self._output_additional_fields:
self._PrintUserWarning(
(
f"output module: {self._output_format:s} does not support "
f"additional fields"
)
)
if output_module.SUPPORTS_CUSTOM_FIELDS:
output_module.SetCustomFields(self._output_custom_fields)
elif self._output_custom_fields:
self._PrintUserWarning(
(
f"output module: {self._output_format:s} does not support "
f"custom fields"
)
)
return output_module
def _GetOutputModulesInformation(self):
"""Retrieves the output modules information.
Returns:
list[tuple[str, str]]: pairs of output module names and descriptions.
"""
output_modules_information = []
for name, output_class in output_manager.OutputManager.GetOutputClasses():
output_modules_information.append((name, output_class.DESCRIPTION))
return output_modules_information
def _ParseOutputOptions(self, options):
"""Parses the output options.
Args:
options (argparse.Namespace): command line arguments.
Raises:
BadConfigOption: if the options are invalid.
"""
additional_fields = self.ParseStringOption(options, "additional_fields")
if additional_fields:
self._output_additional_fields = additional_fields.split(",")
custom_fields = self.ParseStringOption(options, "custom_fields")
if custom_fields:
for custom_field in custom_fields.split(","):
try:
name, value = custom_field.split(":")
except ValueError:
raise errors.BadConfigOption(
f"Unsupported custom field: {custom_field:s}"
)
self._output_custom_fields.append((name, value))
custom_formatters_path = self.ParseStringOption(
options, "custom_formatter_definitions_path"
)
if custom_formatters_path and not os.path.isfile(custom_formatters_path):
raise errors.BadConfigOption(
(
f"Unable to determine path to custom formatter definitions: "
f"{custom_formatters_path:s}"
)
)
if custom_formatters_path:
logger.info(
f"Custom formatter definitions path: {custom_formatters_path:s}"
)
if custom_formatters_path:
message_formatters_file = yaml_formatters_file.YAMLFormattersFile()
message_formatters_file.ReadFromFile(custom_formatters_path)
self._output_custom_formatters_path = custom_formatters_path
self._output_dynamic_time = getattr(options, "dynamic_time", False)
time_zone_string = self.ParseStringOption(options, "output_time_zone")
if isinstance(time_zone_string, str):
if time_zone_string.lower() == "list":
self.list_time_zones = True
elif time_zone_string:
try:
pytz.timezone(time_zone_string)
except pytz.UnknownTimeZoneError:
raise errors.BadConfigOption(
f"Unknown time zone: {time_zone_string:s}"
)
self._output_time_zone = time_zone_string
def _PromptUserForMissingOutputModuleParameters(self, options, missing_parameters):
"""Prompts the user for missing output module parameters.
Args:
options (argparse.Namespace): command line arguments.
missing_parameters (list[str]): names of missing output module parameters.
"""
for parameter in missing_parameters:
value = None
while not value:
value = self._PromptUserForInput(
f"Please provide a value for {parameter:s}"
)
setattr(options, parameter, value)
[docs]
def AddOutputOptions(self, argument_group):
"""Adds the output options to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
argument_group.add_argument(
"--additional_fields",
"--additional-fields",
dest="additional_fields",
type=str,
action="store",
default="",
help=(
"Defines additional fields to be included in the output besides "
"the default fields. Multiple additional field names can be "
"defined as a list of comma separated values. Output formats that "
"support additional fields are: dynamic, opensearch and xlsx."
),
)
argument_group.add_argument(
"--custom_fields",
"--custom-fields",
dest="custom_fields",
type=str,
action="store",
default="",
help=(
"Defines custom fields to be included in the output besides "
'the default fields. A custom field is defined as "name:value". '
"Multiple custom field names can be defined as list of comma "
"separated values. Note that regular fields will are favoured "
"above custom fields with same name. Output formats that support "
"this are: dynamic, opensearch and xlsx."
),
)
argument_group.add_argument(
"--custom_formatter_definitions",
"--custom-formatter-definitions",
dest="custom_formatter_definitions_path",
type=str,
metavar="PATH",
action="store",
help=(
"Path to a file containing custom event formatter definitions, "
"which is a .yaml file. Custom event formatter definitions can "
"be used to customize event messages and override the built-in "
"event formatter definitions."
),
)
argument_group.add_argument(
"--dynamic_time",
"--dynamic-time",
dest="dynamic_time",
action="store_true",
default=False,
help=(
"Indicate that the output should use dynamic time. Output formats "
"that support dynamic time are: dynamic"
),
)
# Note the default here is None so we can determine if the time zone
# option was set.
argument_group.add_argument(
"--output_time_zone",
"--output-time-zone",
dest="output_time_zone",
action="store",
metavar="TIME_ZONE",
type=str,
default=None,
help=(
"time zone of date and time values written to the output, if "
'supported by the output format. Use "list" to see a list of '
"available time zones. Output formats that support an output "
"time zone are: dynamic and l2t_csv."
),
)
[docs]
def ListOutputModules(self):
"""Lists the output modules."""
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Name", "Description"],
title="Output Modules",
)
output_classes = sorted(output_manager.OutputManager.GetOutputClasses())
for name, output_class in output_classes:
table_view.AddRow([name, output_class.DESCRIPTION])
table_view.Write(self._output_writer)
disabled_classes = sorted(
output_manager.OutputManager.GetDisabledOutputClasses()
)
if not disabled_classes:
return
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Name", "Description"],
title="Disabled Output Modules",
)
for name, output_class in disabled_classes:
table_view.AddRow([name, output_class.DESCRIPTION])
table_view.Write(self._output_writer)
[docs]
class ProfilingOptions:
"""Profiling options mix-in."""
# pylint: disable=no-member
[docs]
def __init__(self):
"""Initializes profiling options."""
super().__init__()
self._profilers = set()
self._profiling_directory = None
self._profiling_sample_rate = (
profiling.ProfilingArgumentsHelper.DEFAULT_PROFILING_SAMPLE_RATE
)
[docs]
def ListProfilers(self):
"""Lists information about the available profilers."""
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Name", "Description"],
title="Profilers",
)
profilers_information = sorted(
profiling.ProfilingArgumentsHelper.PROFILERS_INFORMATION.items()
)
for name, description in profilers_information:
table_view.AddRow([name, description])
table_view.Write(self._output_writer)
[docs]
class StorageFileOptions:
"""Storage file options mix-in."""
[docs]
def AddStorageOptions(self, argument_parser):
"""Adds the storage options to the argument group.
Args:
argument_parser (argparse.ArgumentParser): argparse argument parser.
"""
argument_parser.add_argument(
"storage_file",
metavar="PATH",
nargs="?",
type=str,
default=None,
help="Path to a storage file.",
)