Source code for plaso.cli.pinfo_tool
"""The pinfo CLI tool."""
import argparse
import collections
import json
import os
import uuid
from dfdatetime import posix_time as dfdatetime_posix_time
from plaso.cli import logger
from plaso.cli import tool_options
from plaso.cli import tools
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.containers import events
from plaso.containers import event_sources
from plaso.containers import reports
from plaso.containers import warnings
from plaso.engine import path_helper
from plaso.lib import errors
from plaso.lib import loggers
from plaso.serializer import json_serializer
from plaso.storage import factory as storage_factory
[docs]
class PinfoTool(tools.CLITool, tool_options.StorageFileOptions):
"""Pinfo CLI tool.
Attributes:
compare_storage_information (bool): True if the tool is used to compare
stores.
generate_report (bool): True if a predefined report type should be
generated.
list_reports (bool): True if the report types should be listed.
list_sections (bool): True if the section types should be listed.
"""
# Name and description of the tool.
NAME = "pinfo"
DESCRIPTION = (
"Shows information about a Plaso storage file, for example how it was "
"collected, what information was extracted from a source, etc."
)
# Initial value of self._hash_type (see __init__); the hash type selects the
# "<hash type>_hash" attribute read by the file hashes report.
_DEFAULT_HASH_TYPE = "sha256"
# Hash type names; presumably the accepted values of a hash type option -
# the option parsing is not part of this section, confirm there.
_HASH_CHOICES = ("md5", "sha1", "sha256")
# Maps a report type name to its human readable description.
_REPORTS = {
"browser_search": (
"Report browser searches determined by the browser_search "
"analysis plugin."
),
"chrome_extension": (
"Report Chrome extensions determined by the chrome_extension "
"analysis plugin."
),
"environment_variables": (
"Report environment variables extracted during processing."
),
"file_hashes": "Report file hashes calculated during processing.",
"windows_services": (
"Report Windows services and drivers extracted during processing."
),
"winevt_providers": (
"Report Windows EventLog providers extracted during processing."
),
}
# Initial value of self._report_type (see __init__).
_DEFAULT_REPORT_TYPE = "none"
# Sorted report type names extended with the special values "list" and "none".
_REPORT_CHOICES = sorted(list(_REPORTS.keys()) + ["list", "none"])
# Maps a section name to its human readable description.
_SECTIONS = {
"events": "Show information about events.",
"reports": "Show information about analysis reports.",
"sessions": "Show information about sessions.",
"sources": "Show information about event sources.",
"warnings": "Show information about warnings during processing.",
}
# Output format names; the methods of this class branch on "json", "markdown"
# and "text".
_DEFAULT_OUTPUT_FORMAT = "text"
_SUPPORTED_OUTPUT_FORMATS = ("json", "markdown", "text")
# Shorthands for the attribute container types read from the storage.
_CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE
_CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE
_CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE
_CONTAINER_TYPE_EXTRACTION_WARNING = warnings.ExtractionWarning.CONTAINER_TYPE
_CONTAINER_TYPE_PREPROCESSING_WARNING = warnings.PreprocessingWarning.CONTAINER_TYPE
_CONTAINER_TYPE_RECOVERY_WARNING = warnings.RecoveryWarning.CONTAINER_TYPE
_CONTAINER_TYPE_TIMELINING_WARNING = warnings.TimeliningWarning.CONTAINER_TYPE
[docs]
def __init__(self, input_reader=None, output_writer=None):
    """Initializes the CLI tool object.

    Args:
      input_reader (Optional[InputReader]): input reader, where None indicates
          that the stdin input reader should be used.
      output_writer (Optional[OutputWriter]): output writer, where None
          indicates that the stdout output writer should be used.
    """
    super().__init__(input_reader=input_reader, output_writer=output_writer)
    self._compare_storage_file_path = None
    self._output_filename = None
    # Use the class constant instead of repeating the "text" literal, the same
    # way the hash and report types are initialized from their defaults.
    self._output_format = self._DEFAULT_OUTPUT_FORMAT
    self._hash_type = self._DEFAULT_HASH_TYPE
    self._process_memory_limit = None
    self._report_type = self._DEFAULT_REPORT_TYPE
    self._sections = None
    self._storage_file_path = None
    self._verbose = False

    self.compare_storage_information = False
    self.generate_report = False
    self.list_reports = False
    self.list_sections = False
def _CalculateStorageCounters(self, storage_reader):
"""Calculates the counters of the entire storage.
Args:
storage_reader (StorageReader): storage reader.
Returns:
dict[str, collections.Counter]: storage counters.
"""
# TODO: determine analysis report counter from actual stored analysis
# reports or remove.
analysis_reports_counter = collections.Counter()
analysis_reports_counter_error = False
data_types_counter = {}
if storage_reader.HasAttributeContainers("data_type_count"):
data_types_counter = {
data_type_count.name: data_type_count.number_of_events
for data_type_count in storage_reader.GetAttributeContainers(
"data_type_count"
)
}
data_types_counter = collections.Counter(data_types_counter)
data_types_counter_error = False
event_labels_counter = {}
if storage_reader.HasAttributeContainers("event_label_count"):
event_labels_counter = {
event_label_count.label: event_label_count.number_of_events
for event_label_count in storage_reader.GetAttributeContainers(
"event_label_count"
)
}
event_labels_counter = collections.Counter(event_labels_counter)
event_labels_counter_error = False
parsers_counter = {}
if storage_reader.HasAttributeContainers("parser_count"):
parsers_counter = {
parser_count.name: parser_count.number_of_events
for parser_count in storage_reader.GetAttributeContainers(
"parser_count"
)
}
parsers_counter = collections.Counter(parsers_counter)
parsers_counter_error = False
storage_counters = {}
extraction_warnings_by_path_spec = collections.Counter()
extraction_warnings_by_parser_chain = collections.Counter()
for warning in storage_reader.GetAttributeContainers(
self._CONTAINER_TYPE_EXTRACTION_WARNING
):
path_spec_string = self._GetPathSpecificationString(warning.path_spec)
extraction_warnings_by_path_spec[path_spec_string] += 1
extraction_warnings_by_parser_chain[warning.parser_chain] += 1
storage_counters["extraction_warnings_by_path_spec"] = (
extraction_warnings_by_path_spec
)
storage_counters["extraction_warnings_by_parser_chain"] = (
extraction_warnings_by_parser_chain
)
recovery_warnings_by_path_spec = collections.Counter()
recovery_warnings_by_parser_chain = collections.Counter()
for warning in storage_reader.GetAttributeContainers(
self._CONTAINER_TYPE_RECOVERY_WARNING
):
path_spec_string = self._GetPathSpecificationString(warning.path_spec)
recovery_warnings_by_path_spec[path_spec_string] += 1
recovery_warnings_by_parser_chain[warning.parser_chain] += 1
storage_counters["recovery_warnings_by_path_spec"] = (
recovery_warnings_by_path_spec
)
storage_counters["recovery_warnings_by_parser_chain"] = (
recovery_warnings_by_parser_chain
)
timelining_warnings_by_path_spec = collections.Counter()
timelining_warnings_by_parser_chain = collections.Counter()
if storage_reader.HasAttributeContainers(
self._CONTAINER_TYPE_TIMELINING_WARNING
):
for warning in storage_reader.GetAttributeContainers(
self._CONTAINER_TYPE_TIMELINING_WARNING
):
path_spec_string = self._GetPathSpecificationString(warning.path_spec)
timelining_warnings_by_path_spec[path_spec_string] += 1
timelining_warnings_by_parser_chain[warning.parser_chain] += 1
storage_counters["timelining_warnings_by_path_spec"] = (
timelining_warnings_by_path_spec
)
storage_counters["timelining_warnings_by_parser_chain"] = (
timelining_warnings_by_parser_chain
)
if not analysis_reports_counter_error:
storage_counters["analysis_reports"] = analysis_reports_counter
if not data_types_counter_error:
storage_counters["data_types"] = data_types_counter
if not event_labels_counter_error:
storage_counters["event_labels"] = event_labels_counter
if not parsers_counter_error:
storage_counters["parsers"] = parsers_counter
return storage_counters
def _CheckStorageFile(self, storage_file_path, warn_about_existing=False):
"""Checks if the storage file path is valid.
Args:
storage_file_path (str): path of the storage file.
warn_about_existing (bool): True if the user should be warned about
the storage file already existing.
Raises:
BadConfigOption: if the storage file path is invalid.
"""
if os.path.exists(storage_file_path):
if not os.path.isfile(storage_file_path):
raise errors.BadConfigOption(
f"Storage file: {storage_file_path:s} already exists and is not "
f"a file."
)
if warn_about_existing:
logger.warning("Appending to an already existing storage file.")
dirname = os.path.dirname(storage_file_path)
if not dirname:
dirname = "."
# TODO: add a more thorough check to see if the storage file really is
# a plaso storage file.
if not os.access(dirname, os.W_OK):
raise errors.BadConfigOption(
f"Unable to write to storage file: {storage_file_path:s}"
)
def _CompareCounter(self, counter, compare_counter):
"""Compares two counters.
Args:
counter (collections.Counter): counter.
compare_counter (collections.Counter): counter to compare against.
Returns:
dict[str, tuple[int, int]]: mismatching results per key.
"""
keys = set(counter.keys())
keys.union(compare_counter.keys())
differences = {}
for key in keys:
value = counter.get(key, 0)
compare_value = compare_counter.get(key, 0)
if value != compare_value:
differences[key] = (value, compare_value)
return differences
def _CompareStores(self, storage_reader, compare_storage_reader):
"""Compares the contents of two stores.
Args:
storage_reader (StorageReader): storage reader.
compare_storage_reader (StorageReader): storage to compare against.
Returns:
bool: True if the content of the stores is identical.
"""
stores_are_identical = True
storage_counters = self._CalculateStorageCounters(storage_reader)
compare_storage_counters = self._CalculateStorageCounters(
compare_storage_reader
)
# Compare number of events by data type.
data_types_counter = storage_counters.get("data_types", collections.Counter())
compare_data_types_counter = compare_storage_counters.get(
"data_types", collections.Counter()
)
differences = self._CompareCounter(
data_types_counter, compare_data_types_counter
)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Data type name", "Number of events"],
title="Events generated per data type",
)
# Compare number of events.
parsers_counter = storage_counters.get("parsers", collections.Counter())
compare_parsers_counter = compare_storage_counters.get(
"parsers", collections.Counter()
)
differences = self._CompareCounter(parsers_counter, compare_parsers_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Parser (plugin) name", "Number of events"],
title="Events generated per parser",
)
# Compare extraction warnings by parser chain.
warnings_counter = storage_counters.get(
"extraction_warnings_by_parser_chain", collections.Counter()
)
compare_warnings_counter = compare_storage_counters.get(
"extraction_warnings_by_parser_chain", collections.Counter()
)
differences = self._CompareCounter(warnings_counter, compare_warnings_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Parser (plugin) name", "Number of warnings"],
title="Extraction warnings generated per parser",
)
# Compare extraction warnings by path specification
warnings_counter = storage_counters.get(
"extraction_warnings_by_path_spec", collections.Counter()
)
compare_warnings_counter = compare_storage_counters.get(
"extraction_warnings_by_path_spec", collections.Counter()
)
differences = self._CompareCounter(warnings_counter, compare_warnings_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Number of warnings", "Pathspec"],
reverse=True,
title="Pathspecs with most extraction warnings",
)
# Compare recovery warnings by parser chain.
warnings_counter = storage_counters.get(
"recovery_warnings_by_parser_chain", collections.Counter()
)
compare_warnings_counter = compare_storage_counters.get(
"recovery_warnings_by_parser_chain", collections.Counter()
)
differences = self._CompareCounter(warnings_counter, compare_warnings_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Parser (plugin) name", "Number of warnings"],
title="Recovery warnings generated per parser",
)
# Compare recovery warnings by path specification
warnings_counter = storage_counters.get(
"recovery_warnings_by_path_spec", collections.Counter()
)
compare_warnings_counter = compare_storage_counters.get(
"recovery_warnings_by_path_spec", collections.Counter()
)
differences = self._CompareCounter(warnings_counter, compare_warnings_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Number of warnings", "Pathspec"],
reverse=True,
title="Pathspecs with most recovery warnings",
)
# Compare timelining warnings by parser chain.
warnings_counter = storage_counters.get(
"timelining_warnings_by_parser_chain", collections.Counter()
)
compare_warnings_counter = compare_storage_counters.get(
"timelining_warnings_by_parser_chain", collections.Counter()
)
differences = self._CompareCounter(warnings_counter, compare_warnings_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Parser (plugin) name", "Number of warnings"],
title="Timelining warnings generated per parser",
)
# Compare timelining warnings by path specification.
warnings_counter = storage_counters.get(
"timelining_warnings_by_path_spec", collections.Counter()
)
compare_warnings_counter = compare_storage_counters.get(
"timelining_warnings_by_path_spec", collections.Counter()
)
differences = self._CompareCounter(warnings_counter, compare_warnings_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Number of warnings", "Pathspec"],
reverse=True,
title="Pathspecs with most timelining warnings",
)
# Compare event labels.
labels_counter = storage_counters.get("event_labels", collections.Counter())
compare_labels_counter = compare_storage_counters.get(
"event_labels", collections.Counter()
)
differences = self._CompareCounter(labels_counter, compare_labels_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Label", "Number of event tags"],
title="Event tags generated per label",
)
# Compare analysis reports.
reports_counter = storage_counters.get(
"analysis_reports", collections.Counter()
)
compare_reports_counter = compare_storage_counters.get(
"analysis_reports", collections.Counter()
)
differences = self._CompareCounter(reports_counter, compare_reports_counter)
if differences:
stores_are_identical = False
self._PrintCounterDifferences(
differences,
column_names=["Plugin name", "Number of reports"],
title="Reports generated per plugin",
)
return stores_are_identical
def _GenerateAnalysisResultsReport(
self,
storage_reader,
json_base_type,
column_titles,
container_type,
attribute_names,
attribute_mappings,
):
"""Generates an analysis results report.
Args:
storage_reader (StorageReader): storage reader.
json_base_type (str): JSON base type.
column_titles (list[str]): column titles of the Markdown and tab
separated tables.
container_type (str): attribute container type.
attribute_names (list[str]): names of the attributes to report.
attribute_mappings (dict[str, dict[st, str]]): mappings of attribute
values to human readable strings.
"""
self._GenerateReportHeader(json_base_type, column_titles)
entry_format_string = self._GenerateReportEntryFormatString(attribute_names)
if storage_reader.HasAttributeContainers(container_type):
generator = storage_reader.GetAttributeContainers(container_type)
for artifact_index, analysis_result in enumerate(generator):
if self._output_format == "json":
if artifact_index > 0:
self._output_writer.Write(",\n")
attribute_values = analysis_result.CopyToDict()
for key in attribute_names:
value = attribute_values.get(key)
if value is None:
value = ""
elif isinstance(value, int):
value = attribute_mappings.get(key, {}).get(value, value)
elif self._output_format == "json":
value = value.replace("\\", "\\\\")
attribute_values[key] = value
self._output_writer.Write(
entry_format_string.format(**attribute_values)
)
self._GenerateReportFooter()
def _GenerateFileHashesReport(self, storage_reader):
"""Generates a file hashes report.
Args:
storage_reader (StorageReader): storage reader.
"""
hash_type = self._hash_type.upper()
self._GenerateReportHeader(
"file_hashes", [f"{hash_type:s} hash", "Display name"]
)
hash_attribute_name = f"{self._hash_type:s}_hash"
entry_format_string = self._GenerateReportEntryFormatString(
[hash_attribute_name, "display_name"]
)
generator = storage_reader.GetAttributeContainers(
self._CONTAINER_TYPE_EVENT_DATA_STREAM
)
for event_data_stream_index, event_data_stream in enumerate(generator):
if self._output_format == "json":
if event_data_stream_index > 0:
self._output_writer.Write(",\n")
hash_value = getattr(event_data_stream, hash_attribute_name, None) or "N/A"
display_name = "N/A"
if event_data_stream.path_spec:
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
event_data_stream.path_spec
)
attribute_values = {
"display_name": display_name,
hash_attribute_name: hash_value,
}
self._output_writer.Write(entry_format_string.format(**attribute_values))
self._GenerateReportFooter()
def _GenerateReportEntryFormatString(self, attribute_names):
"""Generates a report entry format string.
Args:
attribute_names (list[str]): names of the attributes to report.
Returns:
str: entry format string.
"""
if self._output_format == "json":
attributes_string = ", ".join(
[f'"{name:s}": "{{{name:s}!s}}"' for name in attribute_names]
)
return f" {{{{{attributes_string:s}}}}}"
if self._output_format == "markdown":
attributes_string = " | ".join(
[f"{{{name:s}!s}}" for name in attribute_names]
)
return f"{attributes_string:s}\n"
if self._output_format == "text":
attributes_string = "\t".join(
[f"{{{name:s}!s}}" for name in attribute_names]
)
return f"{attributes_string:s}\n"
return ""
def _GenerateReportFooter(self):
"""Generates a report footer."""
if self._output_format == "json":
self._output_writer.Write("\n]}\n")
def _GenerateReportHeader(self, json_base_type, column_titles):
"""Generates a report header.
Args:
json_base_type (str): JSON base type.
column_titles (list[str]): column titles of the Markdown and tab
separated tables.
"""
if self._output_format == "json":
self._output_writer.Write(f'{{"{json_base_type:s}": [\n')
elif self._output_format == "markdown":
titles_string = " | ".join(column_titles)
self._output_writer.Write(f"{titles_string:s}\n")
separators_string = " | ".join(["---"] * len(column_titles))
self._output_writer.Write(f"{separators_string:s}\n")
elif self._output_format == "text":
titles_string = "\t".join(column_titles)
self._output_writer.Write(f"{titles_string:s}\n")
def _GenerateWinEvtProvidersReport(self, storage_reader):
"""Generates a Windows Event Log providers report.
Args:
storage_reader (StorageReader): storage reader.
"""
column_titles = [
"Identifier",
"Log source(s)",
"Log type(s)",
"Event message file(s)",
"Parameter message file(s)",
]
self._GenerateReportHeader("winevt_providers", column_titles)
attribute_names = [
"identifier",
"log_sources",
"log_types",
"event_message_files",
"parameter_message_files",
]
entry_format_string = self._GenerateReportEntryFormatString(attribute_names)
if storage_reader.HasAttributeContainers("windows_eventlog_provider"):
generator = storage_reader.GetAttributeContainers(
"windows_eventlog_provider"
)
for artifact_index, artifact in enumerate(generator):
if self._output_format == "json":
if artifact_index > 0:
self._output_writer.Write(",\n")
attribute_values = {
"identifier": artifact.identifier or "",
"event_message_files": artifact.event_message_files or [],
"log_sources": artifact.log_sources or [],
"log_types": artifact.log_types or [],
"parameter_message_files": artifact.parameter_message_files or [],
}
self._output_writer.Write(
entry_format_string.format(**attribute_values)
)
self._GenerateReportFooter()
def _GetStorageReader(self, path):
    """Creates a storage reader for a storage file.

    Args:
      path (str): path of the storage file.

    Returns:
      StorageReader: storage reader.

    Raises:
      BadConfigOption: if the storage file format is not supported.
    """
    factory = storage_factory.StorageFactory
    storage_reader = factory.CreateStorageReaderForFile(path)
    if storage_reader:
        return storage_reader

    # No reader was created for the file.
    raise errors.BadConfigOption(
        f"Format of storage file: {path:s} not supported."
    )
def _PrintAnalysisReportCounter(
self, analysis_reports_counter, session_identifier=None
):
"""Prints the analysis reports counter.
Args:
analysis_reports_counter (collections.Counter): number of analysis
reports per analysis plugin.
session_identifier (Optional[str]): session identifier, formatted as
a UUID.
"""
if self._output_format == "json":
json_string = json.dumps(analysis_reports_counter)
self._output_writer.Write(f'"analysis_reports": {json_string:s}')
elif self._output_format in ("markdown", "text") and analysis_reports_counter:
title = "Reports generated per plugin"
if session_identifier:
title = ": ".join([title, session_identifier])
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Plugin name", "Number of reports"],
title=title,
)
for key, value in sorted(analysis_reports_counter.items()):
if key != "total":
table_view.AddRow([key, value])
try:
total = analysis_reports_counter["total"]
except KeyError:
total = "N/A"
table_view.AddRow(["Total", total])
table_view.Write(self._output_writer)
def _PrintAnalysisReportsDetails(self, storage_reader):
"""Prints the details of the analysis reports.
Args:
storage_reader (StorageReader): storage reader.
"""
has_analysis_reports = storage_reader.HasAttributeContainers(
self._CONTAINER_TYPE_ANALYSIS_REPORT
)
if self._output_format == "text" and not has_analysis_reports:
self._output_writer.Write("\nNo analysis reports stored.\n")
else:
generator = storage_reader.GetAttributeContainers(
self._CONTAINER_TYPE_ANALYSIS_REPORT
)
for index, analysis_report in enumerate(generator):
date_time_string = None
if analysis_report.time_compiled is not None:
date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
timestamp=analysis_report.time_compiled
)
date_time_string = date_time.CopyToDateTimeStringISO8601()
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, title=f"Analysis report: {index:d}"
)
table_view.AddRow(["Name plugin", analysis_report.plugin_name or "N/A"])
table_view.AddRow(["Date and time", date_time_string or "N/A"])
table_view.AddRow(
["Event filter", analysis_report.event_filter or "N/A"]
)
if not analysis_report.analysis_counter:
table_view.AddRow(["Text", analysis_report.text or ""])
else:
table_view.AddRow(["Results", ""])
for key, value in sorted(analysis_report.analysis_counter.items()):
table_view.AddRow([key, value])
table_view.Write(self._output_writer)
def _PrintAnalysisReportSection(self, storage_reader, analysis_reports_counter):
"""Prints the analysis reports section.
Args:
storage_reader (StorageReader): storage reader.
analysis_reports_counter (collections.Counter): number of analysis
reports per analysis plugin.
"""
self._PrintAnalysisReportCounter(analysis_reports_counter)
if self._output_format in ("markdown", "text"):
self._PrintAnalysisReportsDetails(storage_reader)
def _PrintCounterDifferences(
    self, differences, column_names=None, reverse=False, title=None
):
    """Prints a table of the differences between two counters.

    Each value is printed as "<count> (<compare count>)".

    Args:
      differences (dict[str, tuple[int, int]]): mismatching results per key.
      column_names (Optional[list[str]]): column names.
      reverse (Optional[bool]): True if the key and values of differences
          should be printed in reverse order.
      title (Optional[str]): title.
    """
    # TODO: add support for 3 column table?
    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, column_names=column_names, title=title
    )

    for key, value in sorted(differences.items()):
        value_string = "{0:d} ({1:d})".format(value[0], value[1])
        row = [value_string, key] if reverse else [key, value_string]
        table_view.AddRow(row)

    table_view.Write(self._output_writer)
    self._output_writer.Write("\n")
def _PrintDataTypesCounter(self, data_types_counter, session_identifier=None):
"""Prints the data types counter.
Args:
data_types_counter (collections.Counter): number of events per data type.
session_identifier (Optional[str]): session identifier, formatted as
a UUID.
"""
if self._output_format == "json":
if session_identifier:
self._output_writer.Write(", ")
json_string = json.dumps(data_types_counter)
self._output_writer.Write(f'"data_types": {json_string:s}')
elif self._output_format in ("markdown", "text"):
if self._output_format == "text" and not data_types_counter:
if not session_identifier:
self._output_writer.Write("\nNo events stored.\n")
else:
title = "Events generated per data type"
if session_identifier:
title = f"{title:s}: {session_identifier:s}"
title_level = 4
else:
title_level = 2
if not data_types_counter:
if not session_identifier:
heading_start = "#" * title_level
self._output_writer.Write(
f"{heading_start:s} {title:s}\n\nN/A\n\n"
)
else:
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Data type name", "Number of events"],
title=title,
title_level=title_level,
)
for key, value in sorted(data_types_counter.items()):
if key != "total":
table_view.AddRow([key, value])
table_view.AddRow(["Total", data_types_counter["total"]])
table_view.Write(self._output_writer)
def _PrintEventLabelsCounter(self, event_labels_counter, session_identifier=None):
"""Prints the event labels counter.
Args:
event_labels_counter (collections.Counter): number of event tags per
label.
session_identifier (Optional[str]): session identifier, formatted as
a UUID.
"""
if self._output_format == "json":
json_string = json.dumps(event_labels_counter)
self._output_writer.Write(f'"event_labels": {json_string:s}')
elif self._output_format in ("markdown", "text"):
if self._output_format == "text" and not event_labels_counter:
if not session_identifier:
self._output_writer.Write("\nNo events labels stored.\n")
else:
title = "Event tags generated per label"
if session_identifier:
title = ": ".join([title, session_identifier])
title_level = 4
else:
title_level = 2
if not event_labels_counter:
if not session_identifier:
heading_marker = "#" * title_level
self._output_writer.Write(
f"{heading_marker:s} {title:s}\n\nN/A\n\n"
)
else:
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Label", "Number of event tags"],
title=title,
title_level=title_level,
)
for key, value in sorted(event_labels_counter.items()):
if key != "total":
table_view.AddRow([key, value])
try:
total = event_labels_counter["total"]
except KeyError:
total = "N/A"
table_view.AddRow(["Total", total])
table_view.Write(self._output_writer)
def _PrintParsersCounter(self, parsers_counter, session_identifier=None):
"""Prints the parsers counter.
Args:
parsers_counter (collections.Counter): number of events per parser or
parser plugin.
session_identifier (Optional[str]): session identifier, formatted as
a UUID.
"""
if self._output_format == "json":
if session_identifier:
self._output_writer.Write(", ")
json_string = json.dumps(parsers_counter)
self._output_writer.Write(f'"parsers": {json_string:s}')
elif self._output_format in ("markdown", "text"):
if self._output_format == "text" and not parsers_counter:
if not session_identifier:
self._output_writer.Write("\nNo events stored.\n")
else:
title = "Events generated per parser"
if session_identifier:
title = ": ".join([title, session_identifier])
title_level = 4
else:
title_level = 2
if not parsers_counter:
if not session_identifier:
heading_marker = "#" * title_level
self._output_writer.Write(
f"{heading_marker:s} {title:s}\n\nN/A\n\n"
)
else:
table_view = views.ViewsFactory.GetTableView(
self._views_format_type,
column_names=["Parser (plugin) name", "Number of events"],
title=title,
title_level=title_level,
)
for key, value in sorted(parsers_counter.items()):
if key != "total":
table_view.AddRow([key, value])
table_view.AddRow(["Total", parsers_counter["total"]])
table_view.Write(self._output_writer)
def _PrintPreprocessingWarningsDetails(self, storage_reader):
"""Prints the details of the preprocessing warnings.
Args:
storage_reader (StorageReader): storage reader.
"""
generator = storage_reader.GetAttributeContainers(
self._CONTAINER_TYPE_PREPROCESSING_WARNING
)
for index, warning in enumerate(generator):
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, title=f"Preprocessing warning: {index:d}"
)
table_view.AddRow(["Message", warning.message])
table_view.AddRow(["Plugin name", warning.plugin_name])
if warning.path_spec:
# TODO: add helper method to format path spec as row.
path_spec_string = self._GetPathSpecificationString(warning.path_spec)
for path_index, line in enumerate(path_spec_string.split("\n")):
if not line:
continue
if path_index == 0:
table_view.AddRow(["Path specification", line])
else:
table_view.AddRow(["", line])
table_view.Write(self._output_writer)
def _PrintWarningsDetails(self, storage_reader, container_type, warning_type):
"""Prints the details of warnings.
Args:
storage_reader (StorageReader): storage reader.
container_type (str): attribute container type.
warning_type (str): warning type.
"""
generator = storage_reader.GetAttributeContainers(container_type)
for index, warning in enumerate(generator):
table_view = views.ViewsFactory.GetTableView(
self._views_format_type, title=f"{warning_type:s} warning: {index:d}"
)
table_view.AddRow(["Message", warning.message])
table_view.AddRow(["Parser chain", warning.parser_chain])
path_spec_string = self._GetPathSpecificationString(warning.path_spec)
for path_index, line in enumerate(path_spec_string.split("\n")):
if not line:
continue
if path_index == 0:
table_view.AddRow(["Path specification", line])
else:
table_view.AddRow(["", line])
table_view.Write(self._output_writer)
def _PrintSessionDetailsAsJSON(self, session):
    """Writes a session as a "session" JSON member.

    Args:
      session (Session): session.
    """
    serializer = json_serializer.JSONAttributeContainerSerializer
    serialized_session = serializer.WriteSerialized(session)
    self._output_writer.Write('"session": {0:s}'.format(serialized_session))
def _PrintSessionDetailsAsTable(self, session, session_identifier):
    """Prints the details of a session as a table.

    Values that are not set are printed as "N/A".

    Args:
      session (Session): session.
      session_identifier (str): session identifier, formatted as a UUID.
    """

    def _FormatTimestamp(timestamp):
        """Formats a POSIX timestamp in microseconds as an ISO 8601 string."""
        if timestamp is None:
            return "N/A"
        date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
            timestamp=timestamp
        )
        return date_time.CopyToDateTimeStringISO8601()

    if session.enabled_parser_names:
        enabled_parser_names = ", ".join(sorted(session.enabled_parser_names))
    else:
        enabled_parser_names = "N/A"

    artifact_filters_string = "N/A"
    if session.artifact_filters:
        artifact_filters_string = ", ".join(session.artifact_filters)

    rows = [
        ["Start time", _FormatTimestamp(session.start_time)],
        ["Completion time", _FormatTimestamp(session.completion_time)],
        ["Product name", session.product_name],
        ["Product version", session.product_version],
        ["Command line arguments", session.command_line_arguments or "N/A"],
        ["Parser filter expression", session.parser_filter_expression or "N/A"],
        ["Enabled parser and plugins", enabled_parser_names],
        ["Preferred encoding", session.preferred_encoding or "N/A"],
        ["Preferred time zone", session.preferred_time_zone or "N/A"],
        ["Debug mode", session.debug_mode],
        ["Artifact filters", artifact_filters_string],
        ["Filter file", session.filter_file or "N/A"],
    ]

    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, title=f"Session: {session_identifier:s}"
    )
    for row in rows:
        table_view.AddRow(row)

    table_view.Write(self._output_writer)
def _PrintSessionsDetails(self, storage_reader):
    """Prints the details of the sessions.

    With the JSON output format the sessions are written as members of a
    "sessions" object, separated by commas. With the markdown and text output
    formats every session is printed as a table. In verbose mode the system
    configuration stored at the same index as the session, if any, is printed
    directly after the session.

    Args:
        storage_reader (BaseStore): storage.
    """
    as_json = self._output_format == "json"
    as_table = self._output_format in ("markdown", "text")

    if as_json:
        self._output_writer.Write('"sessions": {')

    for index, session in enumerate(storage_reader.GetSessions()):
        identifier_string = str(uuid.UUID(hex=session.identifier))

        if as_json:
            # Every member after the first is preceded by a separator.
            if index > 0:
                self._output_writer.Write(", ")
            self._PrintSessionDetailsAsJSON(session)
        elif as_table:
            self._PrintSessionDetailsAsTable(session, identifier_string)

        if not self._verbose:
            continue

        configuration = storage_reader.GetAttributeContainerByIndex(
            "system_configuration", index
        )
        if configuration:
            self._PrintSystemConfigurations(
                storage_reader,
                [configuration],
                session_identifier=identifier_string,
            )

    if as_json:
        self._output_writer.Write("}")
def _PrintSessionsOverview(self, storage_reader):
    """Prints a sessions overview.

    The overview is a single table with one row per session that holds the
    session identifier, formatted as a UUID, and the session start time as an
    ISO 8601 string. A session without a start time is listed as "N/A".

    Args:
        storage_reader (StorageReader): storage reader.
    """
    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, title="Sessions", title_level=2
    )
    for session in storage_reader.GetSessions():
        # A session is not guaranteed to have a start time, fall back to the
        # same placeholder the session details table uses.
        start_time = "N/A"
        if session.start_time is not None:
            date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
                timestamp=session.start_time
            )
            start_time = date_time.CopyToDateTimeStringISO8601()

        session_identifier = str(uuid.UUID(hex=session.identifier))
        table_view.AddRow([session_identifier, start_time])

    table_view.Write(self._output_writer)
def _PrintSessionsSection(self, storage_reader):
    """Prints the session section.

    The sessions overview is only printed for the markdown and text output
    formats. The session details are printed for the JSON output format, in
    verbose mode or when the "sessions" section was explicitly requested.

    Args:
        storage_reader (StorageReader): storage reader.
    """
    output_format = self._output_format

    if output_format in ("markdown", "text"):
        self._PrintSessionsOverview(storage_reader)

    print_details = (
        output_format == "json" or self._verbose or "sessions" in self._sections
    )
    if print_details:
        self._PrintSessionsDetails(storage_reader)
def _PrintSystemConfiguration(
    self, storage_reader, system_configuration, session_identifier=None
):
    """Prints the details of a system configuration.

    Three tables are written: the system configuration itself, the available
    time zones and the user accounts. The time zones and user accounts are
    read from the storage reader and are not yet filtered on the system
    configuration.

    Args:
        storage_reader (StorageReader): storage reader.
        system_configuration (SystemConfiguration): system configuration.
        session_identifier (Optional[str]): session identifier, formatted as
            a UUID.
    """
    if not system_configuration:
        return

    def _GetTitle(title):
        """Appends the session identifier, when set, to a table title."""
        if session_identifier:
            return ": ".join([title, session_identifier])
        return title

    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type, title=_GetTitle("System configuration")
    )

    hostname = "N/A"
    if system_configuration.hostname:
        hostname = system_configuration.hostname.name

    operating_system = system_configuration.operating_system or "N/A"
    operating_system_product = (
        system_configuration.operating_system_product or "N/A"
    )
    operating_system_version = (
        system_configuration.operating_system_version or "N/A"
    )
    language = system_configuration.language or "N/A"
    code_page = system_configuration.code_page or "N/A"
    keyboard_layout = system_configuration.keyboard_layout or "N/A"
    time_zone = system_configuration.time_zone or "N/A"

    table_view.AddRow(["Hostname", hostname])
    table_view.AddRow(["Operating system", operating_system])
    table_view.AddRow(["Operating system product", operating_system_product])
    table_view.AddRow(["Operating system version", operating_system_version])
    table_view.AddRow(["Language", language])
    table_view.AddRow(["Code page", code_page])
    table_view.AddRow(["Keyboard layout", keyboard_layout])
    table_view.AddRow(["Time zone", time_zone])
    table_view.Write(self._output_writer)

    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type,
        column_names=["Name", "Offset from UTC"],
        title=_GetTitle("Available time zones"),
    )
    # TODO: filter time zones on specific system configuration.
    for available_time_zone in storage_reader.GetAttributeContainers("time_zone"):
        offset = available_time_zone.offset

        # The printed sign is the inverse of the sign of the stored offset.
        # NOTE(review): presumably the offset is the number of minutes to add
        # to local time to get UTC - confirm against the time_zone container.
        sign = "+" if offset < 0 else "-"

        # Split the magnitude, not the signed value: divmod() floors towards
        # negative infinity, so divmod(-330, 60) is (-6, 30) and would be
        # printed as 06:30 instead of 05:30.
        hours_from_utc, minutes_from_utc = divmod(abs(offset), 60)

        time_zone_offset = f"{sign:s}{hours_from_utc:02d}:{minutes_from_utc:02d}"
        table_view.AddRow([available_time_zone.name, time_zone_offset])
    table_view.Write(self._output_writer)

    table_view = views.ViewsFactory.GetTableView(
        self._views_format_type,
        column_names=["Username", "User directory"],
        title=_GetTitle("User accounts"),
    )
    # TODO: filter user accounts on specific system configuration.
    for user_account in storage_reader.GetAttributeContainers("user_account"):
        table_view.AddRow([user_account.username, user_account.user_directory])
    table_view.Write(self._output_writer)
def _PrintSystemConfigurations(
    self, storage_reader, system_configurations, session_identifier=None
):
    """Prints the details of system configurations.

    With the JSON output format the system configurations are written as
    members of a "system_configurations" object that is preceded by a member
    separator. With the markdown and text output formats every system
    configuration is printed as tables.

    Args:
        storage_reader (StorageReader): storage reader.
        system_configurations (list[SystemConfiguration]): system configurations.
        session_identifier (Optional[str]): session identifier, formatted as
            a UUID.
    """
    as_json = self._output_format == "json"
    as_table = self._output_format in ("markdown", "text")

    if as_json:
        self._output_writer.Write(', "system_configurations": {')

    for configuration_index, configuration in enumerate(system_configurations):
        if as_json:
            if configuration_index != 0:
                self._output_writer.Write(", ")

            # The elements are system configurations themselves, which is how
            # the table output uses them. Only unwrap an element when it
            # actually carries a nested system_configuration attribute,
            # otherwise serialize the element instead of failing with an
            # AttributeError.
            container = getattr(configuration, "system_configuration", configuration)
            json_string = (
                json_serializer.JSONAttributeContainerSerializer.WriteSerialized(
                    container
                )
            )
            self._output_writer.Write(f'"system_configuration": {json_string:s}')

        elif as_table:
            self._PrintSystemConfiguration(
                storage_reader, configuration, session_identifier=session_identifier
            )

    if as_json:
        self._output_writer.Write("}")
def _PrintStorageInformation(self, storage_reader):
    """Prints information about the store.

    With the JSON output format a single object is written that holds the
    requested sections followed by a "storage_counters" object. With the
    markdown and text output formats a storage overview table is printed
    first, followed by the requested sections and a trailing newline.

    Args:
        storage_reader (StorageReader): storage reader.
    """
    as_json = self._output_format == "json"
    as_table = self._output_format in ("markdown", "text")

    def _IsSelected(section_name):
        """Determines if a section should be part of the output."""
        return self._sections == "all" or section_name in self._sections

    def _WriteSeparator(has_preceding_member):
        """Writes a JSON member separator if a member was written before."""
        if as_json and has_preceding_member:
            self._output_writer.Write(", ")

    if as_json:
        self._output_writer.Write("{")
    elif as_table:
        self._PrintStorageOverviewAsTable(storage_reader)

    section_written = False

    if _IsSelected("sessions"):
        self._PrintSessionsSection(storage_reader)
        section_written = True

    if _IsSelected("sources"):
        _WriteSeparator(section_written)
        self._PrintSourcesOverview(storage_reader)
        section_written = True

    # Separates the sections written so far from the storage counters.
    _WriteSeparator(section_written)

    storage_counters = self._CalculateStorageCounters(storage_reader)

    if as_json:
        self._output_writer.Write('"storage_counters": {')

    # Tracks the members written inside of the storage counters object.
    section_written = False

    if _IsSelected("events"):
        data_types = storage_counters.get("data_types", collections.Counter())
        self._PrintDataTypesCounter(data_types)

        _WriteSeparator(True)
        parsers = storage_counters.get("parsers", collections.Counter())
        self._PrintParsersCounter(parsers)

        event_labels = storage_counters.get("event_labels", collections.Counter())
        _WriteSeparator(True)
        self._PrintEventLabelsCounter(event_labels)

        section_written = True

    if _IsSelected("warnings"):
        _WriteSeparator(section_written)
        self._PrintWarningsSection(storage_reader, storage_counters)
        section_written = True

    if _IsSelected("reports"):
        _WriteSeparator(section_written)
        analysis_reports = storage_counters.get(
            "analysis_reports", collections.Counter()
        )
        self._PrintAnalysisReportSection(storage_reader, analysis_reports)

    if as_json:
        # Closes the storage counters object and then the top-level object.
        self._output_writer.Write("}")
        self._output_writer.Write("}")
    elif as_table:
        self._output_writer.Write("\n")
def _PrintSourcesOverview(self, storage_reader):
    """Prints a sources overview.

    With the JSON output format every event source is serialized as a member
    of an "event_sources" object. With the markdown and text output formats
    an "Event sources" table is printed that lists the path specification of
    every event source in verbose mode or when the "sources" section was
    explicitly requested, and only the total number of event sources
    otherwise.

    Args:
        storage_reader (StorageReader): storage reader.
    """
    as_json = self._output_format == "json"
    as_table = self._output_format in ("markdown", "text")

    if as_json:
        self._output_writer.Write('"event_sources": {')
    elif as_table:
        table_view = views.ViewsFactory.GetTableView(
            self._views_format_type, title="Event sources", title_level=2
        )

    event_sources = storage_reader.GetAttributeContainers(
        self._CONTAINER_TYPE_EVENT_SOURCE
    )

    number_of_event_sources = 0
    for source_index, source in enumerate(event_sources):
        if as_json:
            if source_index > 0:
                self._output_writer.Write(", ")

            serialized_source = (
                json_serializer.JSONAttributeContainerSerializer.WriteSerialized(
                    source
                )
            )
            self._output_writer.Write(f'"source": {serialized_source:s}')

        elif as_table and (self._verbose or "sources" in self._sections):
            path_spec_lines = self._GetPathSpecificationString(
                source.path_spec
            ).split("\n")

            for line_index, line in enumerate(path_spec_lines):
                if line:
                    # Only the first line carries the index of the source.
                    index_column = f"{source_index:d}" if line_index == 0 else ""
                    table_view.AddRow([index_column, line])

        number_of_event_sources += 1

    if as_json:
        self._output_writer.Write("}")
    elif as_table:
        if not self._verbose and "sources" not in self._sections:
            table_view.AddRow(["Total", f"{number_of_event_sources:d}"])
        table_view.Write(self._output_writer)
def _PrintStorageOverviewAsTable(self, storage_reader):
    """Prints a storage overview as a table.

    The table lists the base name of the storage file, the format version
    and the serialization format reported by the storage reader.

    Args:
        storage_reader (StorageReader): storage reader.
    """
    format_version = storage_reader.GetFormatVersion()
    serialization_format = storage_reader.GetSerializationFormat()

    overview_table = views.ViewsFactory.GetTableView(
        self._views_format_type, title="Plaso Storage Information", title_level=1
    )

    filename = os.path.basename(self._storage_file_path)
    rows = (
        ("Filename", filename),
        ("Format version", format_version),
        ("Serialization format", serialization_format),
    )
    for label, value in rows:
        overview_table.AddRow([label, value])

    overview_table.Write(self._output_writer)
def _PrintWarningCountersJSON(
    self, warnings_by_path_spec, warnings_by_parser_chain
):
    """Prints JSON containing a summary of the number of warnings.

    Two members are written without an enclosing object: "warnings_by_parser"
    followed by "warnings_by_path_spec".

    Args:
        warnings_by_path_spec (collections.Counter): number of warnings per
            path specification.
        warnings_by_parser_chain (collections.Counter): number of warnings per
            parser chain.
    """
    members = (
        ('"warnings_by_parser": ', warnings_by_parser_chain),
        (', "warnings_by_path_spec": ', warnings_by_path_spec),
    )
    for member_prefix, counter in members:
        serialized_counter = json.dumps(counter)
        self._output_writer.Write(f"{member_prefix:s}{serialized_counter:s}")
def _PrintWarningCountersTable(
    self, description, warnings_by_path_spec, warnings_by_parser_chain
):
    """Prints a table containing a summary of the number of warnings.

    A table with the number of warnings per parser chain is printed when
    there are such warnings, followed by a table with the 10 path
    specifications that have the most warnings when there are such warnings.

    Args:
        description (str): description of the type of warning.
        warnings_by_path_spec (collections.Counter): number of warnings per
            path specification.
        warnings_by_parser_chain (collections.Counter): number of warnings per
            parser chain.
    """
    if warnings_by_parser_chain:
        capitalized_description = description.title()
        parser_table = views.ViewsFactory.GetTableView(
            self._views_format_type,
            column_names=["Parser (plugin) name", "Number of warnings"],
            title=f"{capitalized_description:s} warnings generated per parser",
        )
        for chain_name, number_of_warnings in warnings_by_parser_chain.items():
            parser_table.AddRow(
                [chain_name or "<No parser>", f"{number_of_warnings:d}"]
            )
        parser_table.Write(self._output_writer)

    if not warnings_by_path_spec:
        return

    path_spec_table = views.ViewsFactory.GetTableView(
        self._views_format_type,
        column_names=["Number of warnings", "Pathspec"],
        title=f"Path specifications with most {description:s} warnings",
    )
    for path_spec_string, number_of_warnings in warnings_by_path_spec.most_common(10):
        for line_index, line in enumerate(path_spec_string.split("\n")):
            if line:
                # Only the first line carries the number of warnings.
                count_column = f"{number_of_warnings:d}" if line_index == 0 else ""
                path_spec_table.AddRow([count_column, line])
    path_spec_table.Write(self._output_writer)
def _PrintWarningsSection(self, storage_reader, storage_counters):
    """Prints the warnings section.

    With the text output format "No warnings stored." is printed when the
    store contains no extraction, preprocessing, recovery or timelining
    warnings. Otherwise the warning counters are printed per type of warning
    and, for the markdown and text output formats, the warning details in
    verbose mode or when the "warnings" section was explicitly requested.
    With the JSON output format only the extraction warning counters are
    written.

    Args:
        storage_reader (StorageReader): storage reader.
        storage_counters (dict[str, collections.Counter]): storage counters.
    """
    has_extraction_warnings = storage_reader.HasAttributeContainers(
        self._CONTAINER_TYPE_EXTRACTION_WARNING
    )
    has_preprocessing_warnings = storage_reader.HasAttributeContainers(
        self._CONTAINER_TYPE_PREPROCESSING_WARNING
    )
    has_recovery_warnings = storage_reader.HasAttributeContainers(
        self._CONTAINER_TYPE_RECOVERY_WARNING
    )
    # Timelining warnings are printed below as well, hence they must take
    # part in the check, otherwise a store that only contains timelining
    # warnings is reported as having no warnings.
    has_timelining_warnings = storage_reader.HasAttributeContainers(
        self._CONTAINER_TYPE_TIMELINING_WARNING
    )
    has_warnings = (
        has_extraction_warnings
        or has_preprocessing_warnings
        or has_recovery_warnings
        or has_timelining_warnings
    )

    if self._output_format == "text" and not has_warnings:
        self._output_writer.Write("\nNo warnings stored.\n")
        return

    warnings_by_path_spec = storage_counters.get(
        "extraction_warnings_by_path_spec", collections.Counter()
    )
    warnings_by_parser_chain = storage_counters.get(
        "extraction_warnings_by_parser_chain", collections.Counter()
    )
    if self._output_format == "json":
        self._PrintWarningCountersJSON(
            warnings_by_path_spec, warnings_by_parser_chain
        )
    elif self._output_format in ("markdown", "text"):
        self._PrintWarningCountersTable(
            "extraction", warnings_by_path_spec, warnings_by_parser_chain
        )
        if self._verbose or "warnings" in self._sections:
            self._PrintWarningsDetails(
                storage_reader,
                self._CONTAINER_TYPE_EXTRACTION_WARNING,
                "Extraction",
            )

    warnings_by_path_spec = storage_counters.get(
        "preprocessing_warnings_by_path_spec", collections.Counter()
    )
    warnings_by_parser_chain = storage_counters.get(
        "preprocessing_warnings_by_parser_chain", collections.Counter()
    )
    # TODO: print preprocessing warnings as part of JSON output format.
    if self._output_format in ("markdown", "text"):
        self._PrintWarningCountersTable(
            "preprocessing", warnings_by_path_spec, warnings_by_parser_chain
        )
        if self._verbose or "warnings" in self._sections:
            self._PrintPreprocessingWarningsDetails(storage_reader)

    warnings_by_path_spec = storage_counters.get(
        "recovery_warnings_by_path_spec", collections.Counter()
    )
    warnings_by_parser_chain = storage_counters.get(
        "recovery_warnings_by_parser_chain", collections.Counter()
    )
    # TODO: print recovery warnings as part of JSON output format.
    if self._output_format in ("markdown", "text"):
        self._PrintWarningCountersTable(
            "recovery", warnings_by_path_spec, warnings_by_parser_chain
        )
        if self._verbose or "warnings" in self._sections:
            self._PrintWarningsDetails(
                storage_reader,
                self._CONTAINER_TYPE_RECOVERY_WARNING,
                "Recovery",
            )

    warnings_by_path_spec = storage_counters.get(
        "timelining_warnings_by_path_spec", collections.Counter()
    )
    warnings_by_parser_chain = storage_counters.get(
        "timelining_warnings_by_parser_chain", collections.Counter()
    )
    # TODO: print timelining warnings as part of JSON output format.
    if self._output_format in ("markdown", "text"):
        self._PrintWarningCountersTable(
            "timelining", warnings_by_path_spec, warnings_by_parser_chain
        )
        if self._verbose or "warnings" in self._sections:
            self._PrintWarningsDetails(
                storage_reader,
                self._CONTAINER_TYPE_TIMELINING_WARNING,
                "Timelining",
            )
[docs]
def CompareStores(self):
    """Compares the contents of two stores.

    Both stores are closed again before the result is reported, also when
    opening the store to compare against or the comparison itself fails.

    Returns:
        bool: True if the content of the stores is identical.

    Raises:
        BadConfigOption: if the storage file format is not supported.
    """
    storage_reader = self._GetStorageReader(self._storage_file_path)
    try:
        # Opened inside of the try block so that the first storage reader is
        # closed when the second store cannot be opened.
        compare_storage_reader = self._GetStorageReader(
            self._compare_storage_file_path
        )
        try:
            result = self._CompareStores(storage_reader, compare_storage_reader)
        finally:
            compare_storage_reader.Close()
    finally:
        storage_reader.Close()

    if result:
        self._output_writer.Write("Storage files are identical.\n")
    else:
        self._output_writer.Write("Storage files are different.\n")

    return result
[docs]
def GenerateReport(self):
    """Generates a report.

    The file hashes and Windows EventLog providers reports have dedicated
    generators. The other report types are generated from the attribute
    containers of a single type, described by the report name, the column
    titles, the container type, the attribute names and the mappings of
    attribute values to descriptions. No report is generated for other
    report types. The storage reader is closed afterwards.

    Raises:
        BadConfigOption: if the storage file format is not supported.
    """
    storage_reader = self._GetStorageReader(self._storage_file_path)
    try:
        report_type = self._report_type

        # TODO: consider using message formatting helpers?
        windows_services_mappings = {
            "service_type": {
                0x01: "Kernel device driver (0x01)",
                0x02: "File system driver (0x02)",
                0x04: "Adapter (0x04)",
                0x10: "Stand-alone service (0x10)",
                0x20: "Shared service (0x20)",
            },
            "start_type": {
                0: "Boot (0)",
                1: "System (1)",
                2: "Automatic (2)",
                3: "On demand (3)",
                4: "Disabled (4)",
            },
        }

        # Report name, column titles, container type, attribute names and
        # attribute mappings per report type.
        report_definitions = {
            "browser_search": (
                "browser_searches",
                ["Search engine", "Search term", "Number of queries"],
                "browser_search_analysis_result",
                ["search_engine", "search_term", "number_of_queries"],
                {},
            ),
            "chrome_extension": (
                "chrome_extensions",
                ["Username", "Extension identifier", "Extension"],
                "chrome_extension_analysis_result",
                ["username", "extension_identifier", "extension"],
                {},
            ),
            "environment_variables": (
                "environment_variables",
                ["Name", "Value"],
                "environment_variable",
                ["name", "value"],
                {},
            ),
            "windows_services": (
                "windows_services",
                ["Name", "Service type", "Start type", "Image path"],
                "windows_service_configuration",
                ["name", "service_type", "start_type", "image_path"],
                windows_services_mappings,
            ),
        }

        if report_type == "file_hashes":
            self._GenerateFileHashesReport(storage_reader)
        elif report_type == "winevt_providers":
            self._GenerateWinEvtProvidersReport(storage_reader)
        else:
            report_definition = report_definitions.get(report_type)
            if report_definition is not None:
                self._GenerateAnalysisResultsReport(
                    storage_reader, *report_definition
                )
    finally:
        storage_reader.Close()
[docs]
def ListReports(self):
    """Lists information about the available report types.

    Writes a "Reports" table with the name and description of every report
    type, ordered by name.
    """
    reports_table = views.ViewsFactory.GetTableView(
        self._views_format_type,
        column_names=["Name", "Description"],
        title="Reports",
    )
    for report_row in sorted(self._REPORTS.items()):
        reports_table.AddRow(list(report_row))
    reports_table.Write(self._output_writer)
[docs]
def ListSections(self):
    """Lists information about the available sections.

    Writes a "Sections" table with the name and description of every
    section, ordered by name.
    """
    sections_table = views.ViewsFactory.GetTableView(
        self._views_format_type,
        column_names=["Name", "Description"],
        title="Sections",
    )
    for section_row in sorted(self._SECTIONS.items()):
        sections_table.AddRow(list(section_row))
    sections_table.Write(self._output_writer)
[docs]
def ParseArguments(self, arguments):
    """Parses the command line arguments.

    Builds the argument parser of the tool, parses the arguments and passes
    the resulting options to ParseOptions. When the options are invalid the
    error and the usage are written to the output writer.

    Args:
        arguments (list[str]): command line arguments.

    Returns:
        bool: True if the arguments were successfully parsed.
    """
    # Logging is first configured with the defaults and reconfigured at the
    # end, once the logging related options have been parsed.
    loggers.ConfigureLogging()

    # NOTE(review): add_help is disabled, presumably because AddBasicOptions
    # registers the help option itself - confirm.
    argument_parser = argparse.ArgumentParser(
        description=self.DESCRIPTION,
        add_help=False,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Options shared with other tools.
    self.AddBasicOptions(argument_parser)
    self.AddStorageOptions(argument_parser)
    self.AddLogFileOptions(argument_parser)

    # The process resources options are only offered when a process memory
    # limit can be enforced.
    if self._CanEnforceProcessMemoryLimit():
        helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
            argument_parser, names=["process_resources"]
        )

    # Options specific to pinfo.
    argument_parser.add_argument(
        "--compare",
        dest="compare_storage_file",
        type=str,
        action="store",
        default="",
        metavar="STORAGE_FILE",
        help=("The path of the storage file to compare against."),
    )

    output_formats = ", ".join(sorted(self._SUPPORTED_OUTPUT_FORMATS))
    argument_parser.add_argument(
        "--output_format",
        "--output-format",
        dest="output_format",
        type=str,
        choices=self._SUPPORTED_OUTPUT_FORMATS,
        action="store",
        default=self._DEFAULT_OUTPUT_FORMAT,
        metavar="FORMAT",
        help=(
            f"Format of the output, the default is: "
            f"{self._DEFAULT_OUTPUT_FORMAT:s}. Supported options: "
            f"{output_formats:s}."
        ),
    )

    hash_choices = ", ".join(self._HASH_CHOICES)
    argument_parser.add_argument(
        "--hash",
        dest="hash",
        choices=self._HASH_CHOICES,
        action="store",
        metavar="TYPE",
        default=self._DEFAULT_HASH_TYPE,
        help=(
            f"Type of hash to output in file_hashes report. Supported options: "
            f"{hash_choices:s}"
        ),
    )

    report_choices = ", ".join(self._REPORT_CHOICES)
    argument_parser.add_argument(
        "--report",
        dest="report",
        choices=self._REPORT_CHOICES,
        action="store",
        metavar="TYPE",
        default=self._DEFAULT_REPORT_TYPE,
        help=(
            f"Report on specific information. Supported options: "
            f"{report_choices:s}"
        ),
    )

    # The sections are not restricted with choices since the value is a
    # comma separated list, which ParseOptions splits.
    argument_parser.add_argument(
        "--sections",
        dest="sections",
        type=str,
        action="store",
        default="all",
        metavar="SECTIONS_LIST",
        help=(
            "List of sections to output. This is a comma separated list where "
            'each entry is the name of a section. Use "--sections list" to '
            'list the available sections and "--sections all" to show all '
            "available sections."
        ),
    )

    argument_parser.add_argument(
        "-v",
        "--verbose",
        dest="verbose",
        action="store_true",
        default=False,
        help="Print verbose output.",
    )

    argument_parser.add_argument(
        "-w", "--write", metavar="OUTPUTFILE", dest="write", help="Output filename."
    )

    try:
        options = argument_parser.parse_args(arguments)
    except UnicodeEncodeError:
        # If we get here we are attempting to print help in a non-Unicode
        # terminal.
        self._output_writer.Write("\n")
        self._output_writer.Write(argument_parser.format_help())
        return False

    try:
        self.ParseOptions(options)
    except errors.BadConfigOption as exception:
        # Report the invalid option together with the usage instead of
        # propagating the exception to the caller.
        self._output_writer.Write(f"ERROR: {exception!s}\n")
        self._output_writer.Write("\n")
        self._output_writer.Write(argument_parser.format_usage())
        return False

    self._WaitUserWarning()

    # Reconfigure logging now that the debug, log file and quiet mode
    # options are known.
    loggers.ConfigureLogging(
        debug_output=self._debug_mode,
        filename=self._log_file,
        quiet_mode=self._quiet_mode,
    )
    return True
[docs]
def ParseOptions(self, options):
    """Parses the options.

    Nothing but the informational, log file, verbose, report and sections
    options are parsed when the report types or sections should be listed or
    the troubleshooting information should be shown. When an output file is
    requested it is created here and used as the output writer.

    Args:
        options (argparse.Namespace): command line arguments.

    Raises:
        BadConfigOption: if the options are invalid.
    """
    self._ParseInformationalOptions(options)
    self._ParseLogFileOptions(options)

    self._verbose = getattr(options, "verbose", False)
    self._report_type = getattr(options, "report", self._DEFAULT_REPORT_TYPE)
    self._sections = getattr(options, "sections", "")

    self.list_reports = self._report_type == "list"
    self.list_sections = self._sections == "list"
    self.show_troubleshooting = getattr(options, "show_troubleshooting", False)

    # The remaining options, such as the storage file, are not needed to
    # list report types or sections or to show troubleshooting information.
    if self.list_reports or self.list_sections or self.show_troubleshooting:
        return

    if self._report_type != "none":
        if self._report_type not in self._REPORTS:
            raise errors.BadConfigOption(
                f"Unsupported report type: {self._report_type:s}"
            )
        self.generate_report = True

    # The sections are kept as the string "all" or as a list of names.
    if self._sections != "all":
        self._sections = self._sections.split(",")

    self._hash_type = getattr(options, "hash", self._DEFAULT_HASH_TYPE)
    self._output_filename = getattr(options, "write", None)

    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=["process_resources"]
    )

    # TODO: move check into _CheckStorageFile.
    self._storage_file_path = self.ParseStringOption(options, "storage_file")
    if not self._storage_file_path:
        raise errors.BadConfigOption("Missing storage file option.")

    if not os.path.isfile(self._storage_file_path):
        raise errors.BadConfigOption(
            f"No such storage file: {self._storage_file_path:s}"
        )

    compare_storage_file_path = self.ParseStringOption(
        options, "compare_storage_file"
    )
    if compare_storage_file_path:
        if not os.path.isfile(compare_storage_file_path):
            raise errors.BadConfigOption(
                f"No such storage file: {compare_storage_file_path:s}"
            )

        self._compare_storage_file_path = compare_storage_file_path
        self.compare_storage_information = True

    self._output_format = self.ParseStringOption(options, "output_format")

    if self._output_filename:
        if os.path.exists(self._output_filename):
            raise errors.BadConfigOption(
                f"Output file: {self._output_filename:s} already exists."
            )

        try:
            # Exclusive creation ("x") makes the open fail if the file was
            # created after the check above, instead of truncating it.
            # pylint: disable=consider-using-with
            output_file_object = open(self._output_filename, "xb")
        except FileExistsError as exception:
            raise errors.BadConfigOption(
                f"Output file: {self._output_filename:s} already exists."
            ) from exception

        self._output_writer = tools.FileObjectOutputWriter(output_file_object)

    self._EnforceProcessMemoryLimit(self._process_memory_limit)
[docs]
def PrintStorageInformation(self):
    """Prints the storage information.

    The views format type is set to match the markdown or text output
    format; it is left unchanged for other output formats. The storage
    reader is closed after the information has been printed.

    Raises:
        BadConfigOption: if the storage file format is not supported.
    """
    output_format = self._output_format
    if output_format in ("markdown", "text"):
        views_factory = views.ViewsFactory
        if output_format == "markdown":
            self._views_format_type = views_factory.FORMAT_TYPE_MARKDOWN
        else:
            self._views_format_type = views_factory.FORMAT_TYPE_CLI

    storage_reader = self._GetStorageReader(self._storage_file_path)
    try:
        self._PrintStorageInformation(storage_reader)
    finally:
        storage_reader.Close()