Source code for plaso.cli.tools
# -*- coding: utf-8 -*-
"""The command line interface (CLI) tools classes."""
import abc
import codecs
import datetime
import locale
import sys
import time
import textwrap
import pytz
try:
import resource
except ImportError:
resource = None
import plaso
from plaso.cli import logger
from plaso.cli import views
from plaso.lib import definitions
from plaso.lib import errors
[docs]
class CLITool(object):
"""Command line interface tool.
Attributes:
preferred_encoding (str): preferred encoding of single-byte or multi-byte
character strings, sometimes referred to as extended ASCII.
show_troubleshooting (bool): True if troubleshooting information should
be shown.
"""
NAME = ''
# The maximum number of characters of a line written to the output writer.
_LINE_LENGTH = 80
# The fall back preferred encoding.
_PREFERRED_ENCODING = 'utf-8'
[docs]
def __init__(self, input_reader=None, output_writer=None):
"""Initializes a command line interface tool.
Args:
input_reader (Optional[CLIInputReader]): input reader, where None
indicates that the stdin input reader should be used.
output_writer (Optional[CLIOutputWriter]): output writer, where None
indicates that the stdout output writer should be used.
"""
super(CLITool, self).__init__()
preferred_encoding = locale.getpreferredencoding()
if not preferred_encoding:
preferred_encoding = self._PREFERRED_ENCODING
elif isinstance(preferred_encoding, bytes):
preferred_encoding = preferred_encoding.decode('utf-8')
if not input_reader:
input_reader = StdinInputReader(encoding=preferred_encoding)
if not output_writer:
output_writer = StdoutOutputWriter(encoding=preferred_encoding)
self._data_location = None
self._debug_mode = False
self._encode_errors = 'strict'
self._has_user_warning = False
self._input_reader = input_reader
self._log_file = None
self._output_writer = output_writer
self._quiet_mode = False
self._unattended_mode = False
self._views_format_type = views.ViewsFactory.FORMAT_TYPE_CLI
self._vfs_back_end = 'auto'
self.preferred_encoding = preferred_encoding
self.show_troubleshooting = False
@property
def data_location(self):
"""str: path of the data files."""
return self._data_location
def _CanEnforceProcessMemoryLimit(self):
"""Determines if a process memory limit can be enforced.
Returns:
bool: True if a process memory limit can be enforced, False otherwise.
"""
return bool(resource)
def _EncodeString(self, string):
"""Encodes a string in the preferred encoding.
Returns:
bytes: encoded string.
"""
try:
# Note that encode() will first convert string into a Unicode string
# if necessary.
encoded_string = string.encode(
self.preferred_encoding, errors=self._encode_errors)
except UnicodeEncodeError:
if self._encode_errors == 'strict':
logger.error(
'Unable to properly write output due to encoding error. '
'Switching to error tolerant encoding which can result in '
'non Basic Latin (C0) characters to be replaced with "?" or '
'"\\ufffd".')
self._encode_errors = 'replace'
encoded_string = string.encode(
self.preferred_encoding, errors=self._encode_errors)
return encoded_string
def _EnforceProcessMemoryLimit(self, memory_limit):
"""Enforces a process memory limit.
Args:
memory_limit (int): maximum number of bytes the process is allowed
to allocate, where 0 represents no limit and None a default of
4 GiB.
"""
# Resource is not supported on Windows.
if resource:
if memory_limit is None:
memory_limit = 4 * 1024 * 1024 * 1024
elif memory_limit == 0:
memory_limit = resource.RLIM_INFINITY
try:
resource.setrlimit(resource.RLIMIT_DATA, (memory_limit, memory_limit))
except ValueError:
current_limit = resource.getrlimit(resource.RLIMIT_DATA)[0]
logger.warning((
f'Unable to set memory limit to {memory_limit!s} current limit '
f'is {current_limit!s}.'))
def _GetPathSpecificationString(self, path_spec):
"""Retrieves a printable string representation of the path specification.
Args:
path_spec (dfvfs.PathSpec): path specification.
Returns:
str: printable string representation of the path specification.
"""
if not path_spec:
return 'N/A'
return '\n'.join([
line.translate(definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)
for line in path_spec.comparable.split('\n')])
def _ParseInformationalOptions(self, options):
"""Parses the informational options.
Args:
options (argparse.Namespace): command line arguments.
"""
self._debug_mode = getattr(options, 'debug', False)
self._quiet_mode = getattr(options, 'quiet', False)
self._unattended_mode = getattr(options, 'unattended', False)
if self._debug_mode and self._quiet_mode:
logger.warning(
'Cannot use debug and quiet mode at the same time, defaulting to '
'debug output.')
def _ParseLogFileOptions(self, options):
"""Parses the log file options.
Args:
options (argparse.Namespace): command line arguments.
"""
self._log_file = self.ParseStringOption(options, 'log_file')
if not self._log_file:
local_date_time = datetime.datetime.now()
self._log_file = (
f'{self.NAME:s}-{local_date_time.year:04d}{local_date_time.month:02d}'
f'{local_date_time.day:02d}T{local_date_time.hour:02d}'
f'{local_date_time.minute:02d}{local_date_time.second:02d}.log.gz')
def _PrintUserWarning(self, warning_text):
"""Prints a warning to the user.
Args:
warning_text (str): text used to warn the user.
"""
warning_text = textwrap.wrap(f'WARNING: {warning_text:s}', 80)
print('\n'.join(warning_text), file=sys.stderr)
print('', file=sys.stderr)
self._has_user_warning = True
def _PromptUserForInput(self, input_text):
"""Prompts user for an input.
Args:
input_text (str): text used for prompting the user for input.
Returns:
str: input read from the user.
"""
self._output_writer.Write(f'{input_text:s}: ')
return self._input_reader.Read()
def _WaitUserWarning(self):
"""Waits 15 seconds after printing warnings to the user."""
if self._has_user_warning:
print('Waiting for 15 second to give you time to cancel.')
print('')
time.sleep(15)
[docs]
def AddBasicOptions(self, argument_group):
"""Adds the basic options to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
version_string = self.GetVersionInformation()
# We want a custom help message and not the default argparse one.
argument_group.add_argument(
'-h', '--help', action='help',
help='Show this help message and exit.')
argument_group.add_argument(
'--troubles', dest='show_troubleshooting', action='store_true',
default=False, help='Show troubleshooting information.')
argument_group.add_argument(
'-V', '--version', dest='version', action='version',
version=version_string, help='Show the version information.')
[docs]
def AddInformationalOptions(self, argument_group):
"""Adds the informational options to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
argument_group.add_argument(
'-d', '--debug', dest='debug', action='store_true', default=False,
help='Enable debug output.')
argument_group.add_argument(
'-q', '--quiet', dest='quiet', action='store_true', default=False,
help='Disable informational output.')
argument_group.add_argument(
'-u', '--unattended', dest='unattended', action='store_true',
default=False, help=(
'Enable unattended mode and do not ask the user for additional '
'input when needed, but terminate with an error instead.'))
[docs]
def AddLogFileOptions(self, argument_group):
"""Adds the log file option to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
argument_group.add_argument(
'--logfile', '--log_file', '--log-file', action='store',
metavar='FILENAME', dest='log_file', type=str, default='', help=(
f'Path of the file in which to store log messages, by default '
f'this file will be named: "{self.NAME:s}-YYYYMMDDThhmmss.log.gz". '
f'Note that the file will be gzip compressed if the extension is '
f'".gz".'))
[docs]
def CheckOutDated(self):
"""Checks if the version of plaso is outdated and warns the user."""
version_date_time = datetime.datetime(
int(plaso.__version__[0:4], 10),
int(plaso.__version__[4:6], 10),
int(plaso.__version__[6:8], 10))
date_time_delta = datetime.datetime.utcnow() - version_date_time
if date_time_delta.days > 180:
logger.warning((
'This version of plaso is more than 6 months old. We strongly '
'recommend to update it.'))
self._PrintUserWarning((
'the version of plaso you are using is more than 6 months old. We '
'strongly recommend to update it.'))
[docs]
def GetCommandLineArguments(self):
"""Retrieves the command line arguments.
Returns:
str: command line arguments.
"""
command_line_arguments = sys.argv
if not command_line_arguments:
return ''
if isinstance(command_line_arguments[0], bytes):
encoding = sys.stdin.encoding
# Note that sys.stdin.encoding can be None.
if not encoding:
encoding = self.preferred_encoding
try:
command_line_arguments = [
argument.decode(encoding) for argument in command_line_arguments]
except UnicodeDecodeError:
logger.error(
'Unable to properly read command line input due to encoding '
'error. Replacing non Basic Latin (C0) characters with "?" or '
'"\\ufffd".')
command_line_arguments = [
argument.decode(encoding, errors='replace')
for argument in command_line_arguments]
return ' '.join(command_line_arguments)
[docs]
def GetVersionInformation(self):
"""Retrieves the version information.
Returns:
str: version information.
"""
return f'plaso - {self.NAME:s} version {plaso.__version__:s}'
[docs]
def ListTimeZones(self):
"""Lists the time zones."""
max_length = 0
for time_zone_name in pytz.all_timezones:
max_length = max(max_length, len(time_zone_name))
utc_date_time = datetime.datetime.utcnow()
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, column_names=['Time zone', 'UTC Offset'],
title='Zones')
for time_zone_name in pytz.all_timezones:
try:
local_time_zone = pytz.timezone(time_zone_name)
except AssertionError as exception:
logger.error((
f'Unable to determine information about time zone: '
f'{time_zone_name:s} with error: {exception!s}'))
continue
local_date_string = str(local_time_zone.localize(utc_date_time))
if '+' in local_date_string:
_, _, utc_offset = local_date_string.rpartition('+')
utc_offset_string = f'+{utc_offset:s}'
else:
_, _, utc_offset = local_date_string.rpartition('-')
utc_offset_string = '-{utc_offset:s}'
table_view.AddRow([time_zone_name, utc_offset_string])
table_view.Write(self._output_writer)
[docs]
def ParseNumericOption(self, options, name, base=10, default_value=None):
"""Parses a numeric option.
If the option is not set the default value is returned.
Args:
options (argparse.Namespace): command line arguments.
name (str): name of the numeric option.
base (Optional[int]): base of the numeric value.
default_value (Optional[object]): default value.
Returns:
int: numeric value.
Raises:
BadConfigOption: if the options are invalid.
"""
numeric_value = getattr(options, name, None)
if not numeric_value:
return default_value
try:
return int(numeric_value, base)
except (TypeError, ValueError):
name = name.replace('_', ' ')
raise errors.BadConfigOption(
f'Unsupported numeric value {name:s}: {numeric_value!s}.')
[docs]
def ParseStringOption(self, options, argument_name, default_value=None):
"""Parses a string command line argument.
Args:
options (argparse.Namespace): command line arguments.
argument_name (str): name of the command line argument.
default_value (Optional[object]): default value of the command line
argument.
Returns:
object: command line argument value. If the command line argument is
not set the default value will be returned.
Raises:
BadConfigOption: if the command line argument value cannot be converted
to a Unicode string.
"""
argument_value = getattr(options, argument_name, None)
if not argument_value:
return default_value
if isinstance(argument_value, bytes):
encoding = sys.stdin.encoding
# Note that sys.stdin.encoding can be None.
if not encoding:
encoding = self.preferred_encoding
try:
argument_value = codecs.decode(argument_value, encoding)
except UnicodeDecodeError as exception:
raise errors.BadConfigOption((
f'Unable to convert option: {argument_name:s} to Unicode with '
f'error: {exception!s}.'))
elif not isinstance(argument_value, str):
raise errors.BadConfigOption(
f'Unsupported option: {argument_name:s} string type required.')
return argument_value
[docs]
def PrintSeparatorLine(self):
"""Prints a separator line."""
self._output_writer.Write('-' * self._LINE_LENGTH)
self._output_writer.Write('\n')
[docs]
class CLIInputReader(object):
"""Command line interface input reader interface."""
[docs]
def __init__(self, encoding='utf-8'):
"""Initializes an input reader.
Args:
encoding (Optional[str]): input encoding.
"""
super(CLIInputReader, self).__init__()
self._encoding = encoding
# pylint: disable=redundant-returns-doc
[docs]
@abc.abstractmethod
def Read(self):
"""Reads a string from the input.
Returns:
str: input.
"""
[docs]
class CLIOutputWriter(object):
"""Command line interface output writer interface."""
[docs]
def __init__(self, encoding='utf-8'):
"""Initializes an output writer.
Args:
encoding (Optional[str]): output encoding.
"""
super(CLIOutputWriter, self).__init__()
self._encoding = encoding
[docs]
@abc.abstractmethod
def Write(self, string):
"""Writes a string to the output.
Args:
string (str): output.
"""
[docs]
class FileObjectInputReader(CLIInputReader):
"""File object command line interface input reader.
This input reader relies on the file-like object having a readline method.
"""
[docs]
def __init__(self, file_object, encoding='utf-8'):
"""Initializes a file object command line interface input reader.
Args:
file_object (file): file-like object to read from.
encoding (Optional[str]): input encoding.
"""
super(FileObjectInputReader, self).__init__(encoding=encoding)
self._errors = 'strict'
self._file_object = file_object
[docs]
def Read(self):
"""Reads a string from the input.
Returns:
str: input.
"""
encoded_string = self._file_object.readline()
if isinstance(encoded_string, str):
return encoded_string
try:
string = codecs.decode(encoded_string, self._encoding, self._errors)
except UnicodeDecodeError:
if self._errors == 'strict':
logger.error(
'Unable to properly read input due to encoding error. '
'Switching to error tolerant encoding which can result in '
'non Basic Latin (C0) characters to be replaced with "?" or '
'"\\ufffd".')
self._errors = 'replace'
string = codecs.decode(encoded_string, self._encoding, self._errors)
return string
[docs]
class StdinInputReader(FileObjectInputReader):
"""Stdin command line interface input reader."""
[docs]
def __init__(self, encoding='utf-8'):
"""Initializes an stdin input reader.
Args:
encoding (Optional[str]): input encoding.
"""
super(StdinInputReader, self).__init__(sys.stdin, encoding=encoding)
[docs]
def Read(self):
"""Reads a string from the input.
Returns:
str: input.
"""
# Flush stdout to guarantee that all output has been provided before waiting
# for input.
sys.stdout.flush()
return super(StdinInputReader, self).Read()
[docs]
class FileObjectOutputWriter(CLIOutputWriter):
"""File object command line interface output writer.
This output writer relies on the file-like object having a write method.
"""
[docs]
def __init__(self, file_object, encoding='utf-8'):
"""Initializes a file object command line interface output writer.
Args:
file_object (file): file-like object to read from.
encoding (Optional[str]): output encoding.
"""
super(FileObjectOutputWriter, self).__init__(encoding=encoding)
self._errors = 'strict'
self._file_object = file_object
[docs]
def Write(self, string):
"""Writes a string to the output.
Args:
string (str): output.
"""
try:
# Note that encode() will first convert string into a Unicode string
# if necessary.
encoded_string = codecs.encode(string, self._encoding, self._errors)
except UnicodeEncodeError:
if self._errors == 'strict':
logger.error(
'Unable to properly write output due to encoding error. '
'Switching to error tolerant encoding which can result in '
'non Basic Latin (C0) characters to be replaced with "?" or '
'"\\ufffd".')
self._errors = 'replace'
encoded_string = codecs.encode(string, self._encoding, self._errors)
self._file_object.write(encoded_string)
[docs]
class StdoutOutputWriter(FileObjectOutputWriter):
"""Stdout command line interface output writer."""
[docs]
def __init__(self, encoding='utf-8'):
"""Initializes a stdout output writer.
Args:
encoding (Optional[str]): output encoding.
"""
super(StdoutOutputWriter, self).__init__(sys.stdout, encoding=encoding)
[docs]
def Write(self, string):
"""Writes a string to the output.
Args:
string (str): output.
"""
sys.stdout.write(string)