Source code for plaso.cli.log2timeline_tool
"""The log2timeline CLI tool."""
import argparse
import sys
import textwrap
import plaso
# The following import makes sure the output modules are registered.
from plaso import output # pylint: disable=unused-import
from plaso.analyzers.hashers import manager as hashers_manager
from plaso.cli import extraction_tool
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.lib import definitions
from plaso.lib import errors
from plaso.lib import loggers
from plaso.parsers import manager as parsers_manager
[docs]
class Log2TimelineTool(extraction_tool.ExtractionTool):
"""Log2timeline CLI tool.
Attributes:
dependencies_check (bool): True if the availability and versions of
dependencies should be checked.
list_archive_types (bool): True if the archive types should be listed.
list_hashers (bool): True if the hashers should be listed.
list_parsers_and_plugins (bool): True if the parsers and plugins should
be listed.
list_profilers (bool): True if the profilers should be listed.
show_info (bool): True if information about hashers, parsers, plugins,
etc. should be shown.
"""
NAME = "log2timeline"
DESCRIPTION = textwrap.dedent(
"\n".join(
[
"",
(
"log2timeline is a command line tool to extract events from "
"individual "
),
"files, recursing a directory (e.g. mount point) or storage media ",
"image or device.",
"",
"More information can be gathered from here:",
" https://plaso.readthedocs.io/en/latest/sources/user/"
"Using-log2timeline.html",
"",
]
)
)
EPILOG = textwrap.dedent(
"\n".join(
[
"",
"Example usage:",
"",
"Run the tool against a storage media image (full kitchen sink)",
" log2timeline.py /cases/mycase/storage.plaso ímynd.dd",
"",
"Instead of answering questions, indicate some of the options on the",
"command line (including data from particular VSS stores).",
" log2timeline.py --vss_stores 1,2 /cases/plaso_vss.plaso image.E01",
"",
"And that is how you build a timeline using log2timeline...",
"",
]
)
)
[docs]
def __init__(self, input_reader=None, output_writer=None):
"""Initializes a log2timeline CLI tool.
Args:
input_reader (Optional[InputReader]): input reader, where None indicates
that the stdin input reader should be used.
output_writer (Optional[OutputWriter]): output writer, where None
indicates that the stdout output writer should be used.
"""
super().__init__(input_reader=input_reader, output_writer=output_writer)
self._storage_serializer_format = definitions.SERIALIZER_FORMAT_JSON
self.dependencies_check = True
self.list_archive_types = False
self.list_hashers = False
self.list_parsers_and_plugins = False
self.list_profilers = False
self.show_info = False
def _GetPluginData(self):
"""Retrieves the version and various plugin information.
Returns:
dict[str, list[str]]: available parsers and plugins.
"""
return_dict = {}
return_dict["Versions"] = [
("plaso engine", plaso.__version__),
("python", sys.version),
]
hashers_information = hashers_manager.HashersManager.GetHashersInformation()
parsers_information = parsers_manager.ParsersManager.GetParsersInformation()
plugins_information = (
parsers_manager.ParsersManager.GetParserPluginsInformation()
)
presets_information = self._presets_manager.GetPresetsInformation()
return_dict["Hashers"] = hashers_information
return_dict["Parsers"] = parsers_information
return_dict["Parser Plugins"] = plugins_information
return_dict["Parser Presets"] = presets_information
return return_dict
[docs]
def AddStorageOptions(self, argument_group): # pylint: disable=arguments-renamed
"""Adds the storage options to the argument group.
Args:
argument_group (argparse._ArgumentGroup): argparse argument group.
"""
argument_group.add_argument(
"--storage_file",
"--storage-file",
dest="storage_file",
metavar="PATH",
type=str,
default=None,
help=(
"The path of the storage file. If not specified, one will be made "
"in the form <timestamp>-<source>.plaso"
),
)
[docs]
def ParseArguments(self, arguments):
"""Parses the command line arguments.
Args:
arguments (list[str]): command line arguments.
Returns:
bool: True if the arguments were successfully parsed.
"""
loggers.ConfigureLogging()
argument_parser = argparse.ArgumentParser(
description=self.DESCRIPTION,
epilog=self.EPILOG,
add_help=False,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
self.AddBasicOptions(argument_parser)
data_location_group = argument_parser.add_argument_group(
"data location arguments"
)
argument_helper_names = ["artifact_definitions", "data_location"]
helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
data_location_group, names=argument_helper_names
)
extraction_group = argument_parser.add_argument_group("extraction arguments")
argument_helper_names = [
"archives",
"artifact_filters",
"extraction",
"filter_file",
"hashers",
"parsers",
"yara_rules",
]
helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
extraction_group, names=argument_helper_names
)
self.AddStorageMediaImageOptions(extraction_group)
self.AddExtractionOptions(extraction_group)
self.AddVSSProcessingOptions(extraction_group)
self.AddCredentialOptions(extraction_group)
info_group = argument_parser.add_argument_group("informational arguments")
self.AddInformationalOptions(info_group)
info_group.add_argument(
"--info",
dest="show_info",
action="store_true",
default=False,
help="Print out information about supported plugins and parsers.",
)
info_group.add_argument(
"--use_markdown",
"--use-markdown",
dest="use_markdown",
action="store_true",
default=False,
help=(
"Output lists in Markdown format use in combination with "
'"--hashers list", "--parsers list" or "--timezone list"'
),
)
info_group.add_argument(
"--no_dependencies_check",
"--no-dependencies-check",
dest="dependencies_check",
action="store_false",
default=True,
help="Disable the dependencies check.",
)
self.AddLogFileOptions(info_group)
helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
info_group, names=["status_view"]
)
processing_group = argument_parser.add_argument_group("processing arguments")
self.AddPerformanceOptions(processing_group)
self.AddProcessingOptions(processing_group)
processing_group.add_argument(
"--sigsegv_handler",
"--sigsegv-handler",
dest="sigsegv_handler",
action="store_true",
default=False,
help=(
"Enables the SIGSEGV handler. WARNING this functionality is "
"experimental and will a deadlock worker process if a real "
"segfault is caught, but not signal SIGSEGV. This functionality "
"is therefore primarily intended for debugging purposes"
),
)
profiling_group = argument_parser.add_argument_group("profiling arguments")
helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
profiling_group, names=["profiling"]
)
storage_group = argument_parser.add_argument_group("storage arguments")
self.AddStorageOptions(storage_group)
helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
storage_group, names=["storage_format"]
)
argument_parser.add_argument(
self._SOURCE_OPTION,
action="store",
metavar="SOURCE",
nargs="?",
default=None,
type=str,
help=(
"Path to a source device, file or directory. If the source is "
"a supported storage media device or image file, archive file "
"or a directory, the files within are processed recursively."
),
)
try:
options = argument_parser.parse_args(arguments)
except UnicodeEncodeError:
# If we get here we are attempting to print help in a non-Unicode
# terminal.
self._output_writer.Write("\n")
self._output_writer.Write(argument_parser.format_help())
return False
# Properly prepare the attributes according to local encoding.
if self.preferred_encoding == "ascii":
self._PrintUserWarning(
(
"the preferred encoding of your system is ASCII, which is not "
"optimal for the typically non-ASCII characters that need to be "
"parsed and processed. This will most likely result in an error."
)
)
try:
self.ParseOptions(options)
except errors.BadConfigOption as exception:
self._output_writer.Write(f"ERROR: {exception!s}\n")
self._output_writer.Write("\n")
self._output_writer.Write(argument_parser.format_usage())
return False
self._command_line_arguments = self.GetCommandLineArguments()
self._WaitUserWarning()
loggers.ConfigureLogging(
debug_output=self._debug_mode,
filename=self._log_file,
quiet_mode=self._quiet_mode,
)
return True
[docs]
def ParseOptions(self, options):
"""Parses the options.
Args:
options (argparse.Namespace): command line arguments.
Raises:
BadConfigOption: if the options are invalid.
"""
# The extraction options are dependent on the data location.
helpers_manager.ArgumentHelperManager.ParseOptions(
options, self, names=["data_location"]
)
self._ReadParserPresetsFromFile()
# Check the list options first otherwise required options will raise.
argument_helper_names = ["archives", "hashers", "parsers", "profiling"]
helpers_manager.ArgumentHelperManager.ParseOptions(
options, self, names=argument_helper_names
)
self._ParseExtractionOptions(options)
self.list_archive_types = self._archive_types_string == "list"
self.list_hashers = self._hasher_names_string == "list"
self.list_parsers_and_plugins = self._parser_filter_expression == "list"
self.list_profilers = self._profilers == "list"
self.show_info = getattr(options, "show_info", False)
self.show_troubleshooting = getattr(options, "show_troubleshooting", False)
if getattr(options, "use_markdown", False):
self._views_format_type = views.ViewsFactory.FORMAT_TYPE_MARKDOWN
self.dependencies_check = getattr(options, "dependencies_check", True)
if (
self.list_archive_types
or self.list_hashers
or self.list_language_tags
or self.list_parsers_and_plugins
or self.list_profilers
or self.list_time_zones
or self.show_info
or self.show_troubleshooting
):
return
self._ParseInformationalOptions(options)
argument_helper_names = [
"artifact_definitions",
"artifact_filters",
"extraction",
"filter_file",
"status_view",
"storage_format",
"yara_rules",
]
helpers_manager.ArgumentHelperManager.ParseOptions(
options, self, names=argument_helper_names
)
self._ParseLogFileOptions(options)
self._ParseStorageMediaOptions(options)
self._ParsePerformanceOptions(options)
self._ParseProcessingOptions(options)
self._storage_file_path = self.ParseStringOption(options, "storage_file")
if not self._storage_file_path:
self._storage_file_path = self._GenerateStorageFileName()
if not self._storage_file_path:
raise errors.BadConfigOption("Missing storage file option.")
serializer_format = getattr(
options, "serializer_format", definitions.SERIALIZER_FORMAT_JSON
)
if serializer_format not in definitions.SERIALIZER_FORMATS:
raise errors.BadConfigOption(
f"Unsupported storage serializer format: {serializer_format:s}"
)
self._storage_serializer_format = serializer_format
helpers_manager.ArgumentHelperManager.ParseOptions(
options, self, names=["status_view"]
)
self._enable_sigsegv_handler = getattr(options, "sigsegv_handler", False)
self._EnforceProcessMemoryLimit(self._process_memory_limit)
[docs]
def ShowInfo(self):
"""Shows information about available hashers, parsers, plugins, etc."""
title = " log2timeline/plaso information "
self._output_writer.Write(f"{title:=^80s}\n")
plugin_list = self._GetPluginData()
for header, data in plugin_list.items():
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Name", "Description"],
title=header,
)
for entry_header, entry_data in sorted(data):
table_view.AddRow([entry_header, entry_data])
table_view.Write(self._output_writer)