Source code for plaso.cli.tools

"""The command line interface (CLI) tools classes."""

import abc
import codecs
import datetime
import locale
import sys
import time
import textwrap

import pytz

try:
    import resource
except ImportError:
    resource = None

import plaso

from plaso.cli import logger
from plaso.cli import views
from plaso.lib import definitions
from plaso.lib import errors


[docs] class CLITool: """Command line interface tool. Attributes: preferred_encoding (str): preferred encoding of single-byte or multi-byte character strings, sometimes referred to as extended ASCII. show_troubleshooting (bool): True if troubleshooting information should be shown. """ NAME = "" # The maximum number of characters of a line written to the output writer. _LINE_LENGTH = 80 # The fall back preferred encoding. _PREFERRED_ENCODING = "utf-8"
[docs] def __init__(self, input_reader=None, output_writer=None): """Initializes a command line interface tool. Args: input_reader (Optional[CLIInputReader]): input reader, where None indicates that the stdin input reader should be used. output_writer (Optional[CLIOutputWriter]): output writer, where None indicates that the stdout output writer should be used. """ super().__init__() preferred_encoding = locale.getpreferredencoding() if not preferred_encoding: preferred_encoding = self._PREFERRED_ENCODING elif isinstance(preferred_encoding, bytes): preferred_encoding = preferred_encoding.decode("utf-8") if not input_reader: input_reader = StdinInputReader(encoding=preferred_encoding) if not output_writer: output_writer = StdoutOutputWriter(encoding=preferred_encoding) self._data_location = None self._debug_mode = False self._encode_errors = "strict" self._has_user_warning = False self._input_reader = input_reader self._log_file = None self._output_writer = output_writer self._quiet_mode = False self._unattended_mode = False self._views_format_type = views.ViewsFactory.FORMAT_TYPE_CLI self._vfs_back_end = "auto" self.preferred_encoding = preferred_encoding self.show_troubleshooting = False
@property def data_location(self): """str: path of the data files.""" return self._data_location def _CanEnforceProcessMemoryLimit(self): """Determines if a process memory limit can be enforced. Returns: bool: True if a process memory limit can be enforced, False otherwise. """ return bool(resource) def _EncodeString(self, string): """Encodes a string in the preferred encoding. Returns: bytes: encoded string. """ try: # Note that encode() will first convert string into a Unicode string # if necessary. encoded_string = string.encode( self.preferred_encoding, errors=self._encode_errors ) except UnicodeEncodeError: if self._encode_errors == "strict": logger.error( "Unable to properly write output due to encoding error. " "Switching to error tolerant encoding which can result in " 'non Basic Latin (C0) characters to be replaced with "?" or ' '"\\ufffd".' ) self._encode_errors = "replace" encoded_string = string.encode( self.preferred_encoding, errors=self._encode_errors ) return encoded_string def _EnforceProcessMemoryLimit(self, memory_limit): """Enforces a process memory limit. Args: memory_limit (int): maximum number of bytes the process is allowed to allocate, where 0 represents no limit and None a default of 4 GiB. """ # Resource is not supported on Windows. if resource: if memory_limit is None: memory_limit = 4 * 1024 * 1024 * 1024 elif memory_limit == 0: memory_limit = resource.RLIM_INFINITY try: resource.setrlimit(resource.RLIMIT_DATA, (memory_limit, memory_limit)) except ValueError: current_limit = resource.getrlimit(resource.RLIMIT_DATA)[0] logger.warning( ( f"Unable to set memory limit to {memory_limit!s} current limit " f"is {current_limit!s}." ) ) def _GetPathSpecificationString(self, path_spec): """Retrieves a printable string representation of the path specification. Args: path_spec (dfvfs.PathSpec): path specification. Returns: str: printable string representation of the path specification. """ if not path_spec: return "N/A" return "\n".join( [ line.translate(definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) for line in path_spec.comparable.split("\n") ] ) def _ParseInformationalOptions(self, options): """Parses the informational options. Args: options (argparse.Namespace): command line arguments. """ self._debug_mode = getattr(options, "debug", False) self._quiet_mode = getattr(options, "quiet", False) self._unattended_mode = getattr(options, "unattended", False) if self._debug_mode and self._quiet_mode: logger.warning( "Cannot use debug and quiet mode at the same time, defaulting to " "debug output." ) def _ParseLogFileOptions(self, options): """Parses the log file options. Args: options (argparse.Namespace): command line arguments. """ self._log_file = self.ParseStringOption(options, "log_file") if not self._log_file: local_date_time = datetime.datetime.now() self._log_file = ( f"{self.NAME:s}-{local_date_time.year:04d}{local_date_time.month:02d}" f"{local_date_time.day:02d}T{local_date_time.hour:02d}" f"{local_date_time.minute:02d}{local_date_time.second:02d}.log.gz" ) def _PrintUserWarning(self, warning_text): """Prints a warning to the user. Args: warning_text (str): text used to warn the user. """ warning_text = textwrap.wrap(f"WARNING: {warning_text:s}", 80) print("\n".join(warning_text), file=sys.stderr) print("", file=sys.stderr) self._has_user_warning = True def _PromptUserForInput(self, input_text): """Prompts user for an input. Args: input_text (str): text used for prompting the user for input. Returns: str: input read from the user. """ self._output_writer.Write(f"{input_text:s}: ") return self._input_reader.Read() def _WaitUserWarning(self): """Waits 15 seconds after printing warnings to the user.""" if self._has_user_warning: print("Waiting for 15 second to give you time to cancel.") print("") time.sleep(15)
[docs] def AddBasicOptions(self, argument_group): """Adds the basic options to the argument group. Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ version_string = self.GetVersionInformation() # We want a custom help message and not the default argparse one. argument_group.add_argument( "-h", "--help", action="help", help="Show this help message and exit." ) argument_group.add_argument( "--troubles", dest="show_troubleshooting", action="store_true", default=False, help="Show troubleshooting information.", ) argument_group.add_argument( "-V", "--version", dest="version", action="version", version=version_string, help="Show the version information.", )
[docs] def AddInformationalOptions(self, argument_group): """Adds the informational options to the argument group. Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ argument_group.add_argument( "-d", "--debug", dest="debug", action="store_true", default=False, help="Enable debug output.", ) argument_group.add_argument( "-q", "--quiet", dest="quiet", action="store_true", default=False, help="Disable informational output.", ) argument_group.add_argument( "-u", "--unattended", dest="unattended", action="store_true", default=False, help=( "Enable unattended mode and do not ask the user for additional " "input when needed, but terminate with an error instead." ), )
[docs] def AddLogFileOptions(self, argument_group): """Adds the log file option to the argument group. Args: argument_group (argparse._ArgumentGroup): argparse argument group. """ argument_group.add_argument( "--logfile", "--log_file", "--log-file", action="store", metavar="FILENAME", dest="log_file", type=str, default="", help=( f"Path of the file in which to store log messages, by default " f'this file will be named: "{self.NAME:s}-YYYYMMDDThhmmss.log.gz". ' f"Note that the file will be gzip compressed if the extension is " f'".gz".' ), )
[docs] def CheckOutDated(self): """Checks if the version of plaso is outdated and warns the user.""" version_date_time = datetime.datetime( int(plaso.__version__[0:4], 10), int(plaso.__version__[4:6], 10), int(plaso.__version__[6:8], 10), ) date_time_delta = datetime.datetime.utcnow() - version_date_time if date_time_delta.days > 180: logger.warning( ( "This version of plaso is more than 6 months old. We strongly " "recommend to update it." ) ) self._PrintUserWarning( ( "the version of plaso you are using is more than 6 months old. We " "strongly recommend to update it." ) )
[docs] def GetCommandLineArguments(self): """Retrieves the command line arguments. Returns: str: command line arguments. """ command_line_arguments = sys.argv if not command_line_arguments: return "" if isinstance(command_line_arguments[0], bytes): encoding = sys.stdin.encoding # Note that sys.stdin.encoding can be None. if not encoding: encoding = self.preferred_encoding try: command_line_arguments = [ argument.decode(encoding) for argument in command_line_arguments ] except UnicodeDecodeError: logger.error( "Unable to properly read command line input due to encoding " 'error. Replacing non Basic Latin (C0) characters with "?" or ' '"\\ufffd".' ) command_line_arguments = [ argument.decode(encoding, errors="replace") for argument in command_line_arguments ] return " ".join(command_line_arguments)
[docs] def GetVersionInformation(self): """Retrieves the version information. Returns: str: version information. """ return f"plaso - {self.NAME:s} version {plaso.__version__:s}"
[docs] def ListTimeZones(self): """Lists the time zones.""" max_length = 0 for time_zone_name in pytz.all_timezones: max_length = max(max_length, len(time_zone_name)) utc_date_time = datetime.datetime.utcnow() table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Time zone", "UTC Offset"], title="Zones", ) for time_zone_name in pytz.all_timezones: try: local_time_zone = pytz.timezone(time_zone_name) except AssertionError as exception: logger.error( ( f"Unable to determine information about time zone: " f"{time_zone_name:s} with error: {exception!s}" ) ) continue local_date_string = str(local_time_zone.localize(utc_date_time)) if "+" in local_date_string: _, _, utc_offset = local_date_string.rpartition("+") utc_offset_string = f"+{utc_offset:s}" else: _, _, utc_offset = local_date_string.rpartition("-") utc_offset_string = f"-{utc_offset:s}" table_view.AddRow([time_zone_name, utc_offset_string]) table_view.Write(self._output_writer)
[docs] def ParseNumericOption(self, options, name, base=10, default_value=None): """Parses a numeric option. If the option is not set the default value is returned. Args: options (argparse.Namespace): command line arguments. name (str): name of the numeric option. base (Optional[int]): base of the numeric value. default_value (Optional[object]): default value. Returns: int: numeric value. Raises: BadConfigOption: if the options are invalid. """ numeric_value = getattr(options, name, None) if not numeric_value: return default_value try: return int(numeric_value, base) except (TypeError, ValueError): name = name.replace("_", " ") raise errors.BadConfigOption( f"Unsupported numeric value {name:s}: {numeric_value!s}." )
[docs] def ParseStringOption(self, options, argument_name, default_value=None): """Parses a string command line argument. Args: options (argparse.Namespace): command line arguments. argument_name (str): name of the command line argument. default_value (Optional[object]): default value of the command line argument. Returns: object: command line argument value. If the command line argument is not set the default value will be returned. Raises: BadConfigOption: if the command line argument value cannot be converted to a Unicode string. """ argument_value = getattr(options, argument_name, None) if not argument_value: return default_value if isinstance(argument_value, bytes): encoding = sys.stdin.encoding # Note that sys.stdin.encoding can be None. if not encoding: encoding = self.preferred_encoding try: argument_value = codecs.decode(argument_value, encoding) except UnicodeDecodeError as exception: raise errors.BadConfigOption( ( f"Unable to convert option: {argument_name:s} to Unicode with " f"error: {exception!s}." ) ) elif not isinstance(argument_value, str): raise errors.BadConfigOption( f"Unsupported option: {argument_name:s} string type required." ) return argument_value
[docs] def PrintSeparatorLine(self): """Prints a separator line.""" self._output_writer.Write("-" * self._LINE_LENGTH) self._output_writer.Write("\n")
[docs] class CLIInputReader: """Command line interface input reader interface."""
[docs] def __init__(self, encoding="utf-8"): """Initializes an input reader. Args: encoding (Optional[str]): input encoding. """ super().__init__() self._encoding = encoding
# pylint: disable=redundant-returns-doc
[docs] @abc.abstractmethod def Read(self): """Reads a string from the input. Returns: str: input. """
[docs] class CLIOutputWriter: """Command line interface output writer interface."""
[docs] def __init__(self, encoding="utf-8"): """Initializes an output writer. Args: encoding (Optional[str]): output encoding. """ super().__init__() self._encoding = encoding
[docs] @abc.abstractmethod def Write(self, string): """Writes a string to the output. Args: string (str): output. """
[docs] class FileObjectInputReader(CLIInputReader): """File object command line interface input reader. This input reader relies on the file-like object having a readline method. """
[docs] def __init__(self, file_object, encoding="utf-8"): """Initializes a file object command line interface input reader. Args: file_object (file): file-like object to read from. encoding (Optional[str]): input encoding. """ super().__init__(encoding=encoding) self._errors = "strict" self._file_object = file_object
[docs] def Read(self): """Reads a string from the input. Returns: str: input. """ encoded_string = self._file_object.readline() if isinstance(encoded_string, str): return encoded_string try: string = codecs.decode(encoded_string, self._encoding, self._errors) except UnicodeDecodeError: if self._errors == "strict": logger.error( "Unable to properly read input due to encoding error. " "Switching to error tolerant encoding which can result in " 'non Basic Latin (C0) characters to be replaced with "?" or ' '"\\ufffd".' ) self._errors = "replace" string = codecs.decode(encoded_string, self._encoding, self._errors) return string
[docs] class StdinInputReader(FileObjectInputReader): """Stdin command line interface input reader."""
[docs] def __init__(self, encoding="utf-8"): """Initializes an stdin input reader. Args: encoding (Optional[str]): input encoding. """ super().__init__(sys.stdin, encoding=encoding)
[docs] def Read(self): """Reads a string from the input. Returns: str: input. """ # Flush stdout to guarantee that all output has been provided before waiting # for input. sys.stdout.flush() return super().Read()
[docs] class FileObjectOutputWriter(CLIOutputWriter): """File object command line interface output writer. This output writer relies on the file-like object having a write method. """
[docs] def __init__(self, file_object, encoding="utf-8"): """Initializes a file object command line interface output writer. Args: file_object (file): file-like object to read from. encoding (Optional[str]): output encoding. """ super().__init__(encoding=encoding) self._errors = "strict" self._file_object = file_object
[docs] def Write(self, string): """Writes a string to the output. Args: string (str): output. """ try: # Note that encode() will first convert string into a Unicode string # if necessary. encoded_string = codecs.encode(string, self._encoding, self._errors) except UnicodeEncodeError: if self._errors == "strict": logger.error( "Unable to properly write output due to encoding error. " "Switching to error tolerant encoding which can result in " 'non Basic Latin (C0) characters to be replaced with "?" or ' '"\\ufffd".' ) self._errors = "replace" encoded_string = codecs.encode(string, self._encoding, self._errors) self._file_object.write(encoded_string)
[docs] class StdoutOutputWriter(FileObjectOutputWriter): """Stdout command line interface output writer."""
[docs] def __init__(self, encoding="utf-8"): """Initializes a stdout output writer. Args: encoding (Optional[str]): output encoding. """ super().__init__(sys.stdout, encoding=encoding)
[docs] def Write(self, string): """Writes a string to the output. Args: string (str): output. """ sys.stdout.write(string)