Source code for plaso.cli.pinfo_tool

"""The pinfo CLI tool."""

import argparse
import collections
import json
import os
import uuid

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.cli import logger
from plaso.cli import tool_options
from plaso.cli import tools
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.containers import events
from plaso.containers import event_sources
from plaso.containers import reports
from plaso.containers import warnings
from plaso.engine import path_helper
from plaso.lib import errors
from plaso.lib import loggers
from plaso.serializer import json_serializer
from plaso.storage import factory as storage_factory


[docs] class PinfoTool(tools.CLITool, tool_options.StorageFileOptions): """Pinfo CLI tool. Attributes: compare_storage_information (bool): True if the tool is used to compare stores. generate_report (bool): True if a predefined report type should be generated. list_reports (bool): True if the report types should be listed. list_sections (bool): True if the section types should be listed. """ NAME = "pinfo" DESCRIPTION = ( "Shows information about a Plaso storage file, for example how it was " "collected, what information was extracted from a source, etc." ) _DEFAULT_HASH_TYPE = "sha256" _HASH_CHOICES = ("md5", "sha1", "sha256") _REPORTS = { "browser_search": ( "Report browser searches determined by the browser_search " "analysis plugin." ), "chrome_extension": ( "Report Chrome extensions determined by the chrome_extension " "analysis plugin." ), "environment_variables": ( "Report environment variables extracted during processing." ), "file_hashes": "Report file hashes calculated during processing.", "windows_services": ( "Report Windows services and drivers extracted during processing." ), "winevt_providers": ( "Report Windows EventLog providers extracted during processing." ), } _DEFAULT_REPORT_TYPE = "none" _REPORT_CHOICES = sorted(list(_REPORTS.keys()) + ["list", "none"]) _SECTIONS = { "events": "Show information about events.", "reports": "Show information about analysis reports.", "sessions": "Show information about sessions.", "sources": "Show information about event sources.", "warnings": "Show information about warnings during processing.", } _DEFAULT_OUTPUT_FORMAT = "text" _SUPPORTED_OUTPUT_FORMATS = ("json", "markdown", "text") _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE _CONTAINER_TYPE_EXTRACTION_WARNING = warnings.ExtractionWarning.CONTAINER_TYPE _CONTAINER_TYPE_PREPROCESSING_WARNING = warnings.PreprocessingWarning.CONTAINER_TYPE _CONTAINER_TYPE_RECOVERY_WARNING = warnings.RecoveryWarning.CONTAINER_TYPE _CONTAINER_TYPE_TIMELINING_WARNING = warnings.TimeliningWarning.CONTAINER_TYPE
[docs] def __init__(self, input_reader=None, output_writer=None): """Initializes the CLI tool object. Args: input_reader (Optional[InputReader]): input reader, where None indicates that the stdin input reader should be used. output_writer (Optional[OutputWriter]): output writer, where None indicates that the stdout output writer should be used. """ super().__init__(input_reader=input_reader, output_writer=output_writer) self._compare_storage_file_path = None self._output_filename = None self._output_format = "text" self._hash_type = self._DEFAULT_HASH_TYPE self._process_memory_limit = None self._report_type = self._DEFAULT_REPORT_TYPE self._sections = None self._storage_file_path = None self._verbose = False self.compare_storage_information = False self.generate_report = False self.list_reports = False self.list_sections = False
def _CalculateStorageCounters(self, storage_reader): """Calculates the counters of the entire storage. Args: storage_reader (StorageReader): storage reader. Returns: dict[str, collections.Counter]: storage counters. """ # TODO: determine analysis report counter from actual stored analysis # reports or remove. analysis_reports_counter = collections.Counter() analysis_reports_counter_error = False data_types_counter = {} if storage_reader.HasAttributeContainers("data_type_count"): data_types_counter = { data_type_count.name: data_type_count.number_of_events for data_type_count in storage_reader.GetAttributeContainers( "data_type_count" ) } data_types_counter = collections.Counter(data_types_counter) data_types_counter_error = False event_labels_counter = {} if storage_reader.HasAttributeContainers("event_label_count"): event_labels_counter = { event_label_count.label: event_label_count.number_of_events for event_label_count in storage_reader.GetAttributeContainers( "event_label_count" ) } event_labels_counter = collections.Counter(event_labels_counter) event_labels_counter_error = False parsers_counter = {} if storage_reader.HasAttributeContainers("parser_count"): parsers_counter = { parser_count.name: parser_count.number_of_events for parser_count in storage_reader.GetAttributeContainers( "parser_count" ) } parsers_counter = collections.Counter(parsers_counter) parsers_counter_error = False storage_counters = {} extraction_warnings_by_path_spec = collections.Counter() extraction_warnings_by_parser_chain = collections.Counter() for warning in storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_EXTRACTION_WARNING ): path_spec_string = self._GetPathSpecificationString(warning.path_spec) extraction_warnings_by_path_spec[path_spec_string] += 1 extraction_warnings_by_parser_chain[warning.parser_chain] += 1 storage_counters["extraction_warnings_by_path_spec"] = ( extraction_warnings_by_path_spec ) storage_counters["extraction_warnings_by_parser_chain"] = ( extraction_warnings_by_parser_chain ) recovery_warnings_by_path_spec = collections.Counter() recovery_warnings_by_parser_chain = collections.Counter() for warning in storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_RECOVERY_WARNING ): path_spec_string = self._GetPathSpecificationString(warning.path_spec) recovery_warnings_by_path_spec[path_spec_string] += 1 recovery_warnings_by_parser_chain[warning.parser_chain] += 1 storage_counters["recovery_warnings_by_path_spec"] = ( recovery_warnings_by_path_spec ) storage_counters["recovery_warnings_by_parser_chain"] = ( recovery_warnings_by_parser_chain ) timelining_warnings_by_path_spec = collections.Counter() timelining_warnings_by_parser_chain = collections.Counter() if storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_TIMELINING_WARNING ): for warning in storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_TIMELINING_WARNING ): path_spec_string = self._GetPathSpecificationString(warning.path_spec) timelining_warnings_by_path_spec[path_spec_string] += 1 timelining_warnings_by_parser_chain[warning.parser_chain] += 1 storage_counters["timelining_warnings_by_path_spec"] = ( timelining_warnings_by_path_spec ) storage_counters["timelining_warnings_by_parser_chain"] = ( timelining_warnings_by_parser_chain ) if not analysis_reports_counter_error: storage_counters["analysis_reports"] = analysis_reports_counter if not data_types_counter_error: storage_counters["data_types"] = data_types_counter if not event_labels_counter_error: storage_counters["event_labels"] = event_labels_counter if not parsers_counter_error: storage_counters["parsers"] = parsers_counter return storage_counters def _CheckStorageFile(self, storage_file_path, warn_about_existing=False): """Checks if the storage file path is valid. Args: storage_file_path (str): path of the storage file. warn_about_existing (bool): True if the user should be warned about the storage file already existing. Raises: BadConfigOption: if the storage file path is invalid. """ if os.path.exists(storage_file_path): if not os.path.isfile(storage_file_path): raise errors.BadConfigOption( f"Storage file: {storage_file_path:s} already exists and is not " f"a file." ) if warn_about_existing: logger.warning("Appending to an already existing storage file.") dirname = os.path.dirname(storage_file_path) if not dirname: dirname = "." # TODO: add a more thorough check to see if the storage file really is # a plaso storage file. if not os.access(dirname, os.W_OK): raise errors.BadConfigOption( f"Unable to write to storage file: {storage_file_path:s}" ) def _CompareCounter(self, counter, compare_counter): """Compares two counters. Args: counter (collections.Counter): counter. compare_counter (collections.Counter): counter to compare against. Returns: dict[str, tuple[int, int]]: mismatching results per key. """ keys = set(counter.keys()) keys.union(compare_counter.keys()) differences = {} for key in keys: value = counter.get(key, 0) compare_value = compare_counter.get(key, 0) if value != compare_value: differences[key] = (value, compare_value) return differences def _CompareStores(self, storage_reader, compare_storage_reader): """Compares the contents of two stores. Args: storage_reader (StorageReader): storage reader. compare_storage_reader (StorageReader): storage to compare against. Returns: bool: True if the content of the stores is identical. """ stores_are_identical = True storage_counters = self._CalculateStorageCounters(storage_reader) compare_storage_counters = self._CalculateStorageCounters( compare_storage_reader ) # Compare number of events by data type. data_types_counter = storage_counters.get("data_types", collections.Counter()) compare_data_types_counter = compare_storage_counters.get( "data_types", collections.Counter() ) differences = self._CompareCounter( data_types_counter, compare_data_types_counter ) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Data type name", "Number of events"], title="Events generated per data type", ) # Compare number of events. parsers_counter = storage_counters.get("parsers", collections.Counter()) compare_parsers_counter = compare_storage_counters.get( "parsers", collections.Counter() ) differences = self._CompareCounter(parsers_counter, compare_parsers_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Parser (plugin) name", "Number of events"], title="Events generated per parser", ) # Compare extraction warnings by parser chain. warnings_counter = storage_counters.get( "extraction_warnings_by_parser_chain", collections.Counter() ) compare_warnings_counter = compare_storage_counters.get( "extraction_warnings_by_parser_chain", collections.Counter() ) differences = self._CompareCounter(warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Parser (plugin) name", "Number of warnings"], title="Extraction warnings generated per parser", ) # Compare extraction warnings by path specification warnings_counter = storage_counters.get( "extraction_warnings_by_path_spec", collections.Counter() ) compare_warnings_counter = compare_storage_counters.get( "extraction_warnings_by_path_spec", collections.Counter() ) differences = self._CompareCounter(warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Number of warnings", "Pathspec"], reverse=True, title="Pathspecs with most extraction warnings", ) # Compare recovery warnings by parser chain. warnings_counter = storage_counters.get( "recovery_warnings_by_parser_chain", collections.Counter() ) compare_warnings_counter = compare_storage_counters.get( "recovery_warnings_by_parser_chain", collections.Counter() ) differences = self._CompareCounter(warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Parser (plugin) name", "Number of warnings"], title="Recovery warnings generated per parser", ) # Compare recovery warnings by path specification warnings_counter = storage_counters.get( "recovery_warnings_by_path_spec", collections.Counter() ) compare_warnings_counter = compare_storage_counters.get( "recovery_warnings_by_path_spec", collections.Counter() ) differences = self._CompareCounter(warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Number of warnings", "Pathspec"], reverse=True, title="Pathspecs with most recovery warnings", ) # Compare timelining warnings by parser chain. warnings_counter = storage_counters.get( "timelining_warnings_by_parser_chain", collections.Counter() ) compare_warnings_counter = compare_storage_counters.get( "timelining_warnings_by_parser_chain", collections.Counter() ) differences = self._CompareCounter(warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Parser (plugin) name", "Number of warnings"], title="Timelining warnings generated per parser", ) # Compare timelining warnings by path specification. warnings_counter = storage_counters.get( "timelining_warnings_by_path_spec", collections.Counter() ) compare_warnings_counter = compare_storage_counters.get( "timelining_warnings_by_path_spec", collections.Counter() ) differences = self._CompareCounter(warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Number of warnings", "Pathspec"], reverse=True, title="Pathspecs with most timelining warnings", ) # Compare event labels. labels_counter = storage_counters.get("event_labels", collections.Counter()) compare_labels_counter = compare_storage_counters.get( "event_labels", collections.Counter() ) differences = self._CompareCounter(labels_counter, compare_labels_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Label", "Number of event tags"], title="Event tags generated per label", ) # Compare analysis reports. reports_counter = storage_counters.get( "analysis_reports", collections.Counter() ) compare_reports_counter = compare_storage_counters.get( "analysis_reports", collections.Counter() ) differences = self._CompareCounter(reports_counter, compare_reports_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=["Plugin name", "Number of reports"], title="Reports generated per plugin", ) return stores_are_identical def _GenerateAnalysisResultsReport( self, storage_reader, json_base_type, column_titles, container_type, attribute_names, attribute_mappings, ): """Generates an analysis results report. Args: storage_reader (StorageReader): storage reader. json_base_type (str): JSON base type. column_titles (list[str]): column titles of the Markdown and tab separated tables. container_type (str): attribute container type. attribute_names (list[str]): names of the attributes to report. attribute_mappings (dict[str, dict[st, str]]): mappings of attribute values to human readable strings. """ self._GenerateReportHeader(json_base_type, column_titles) entry_format_string = self._GenerateReportEntryFormatString(attribute_names) if storage_reader.HasAttributeContainers(container_type): generator = storage_reader.GetAttributeContainers(container_type) for artifact_index, analysis_result in enumerate(generator): if self._output_format == "json": if artifact_index > 0: self._output_writer.Write(",\n") attribute_values = analysis_result.CopyToDict() for key in attribute_names: value = attribute_values.get(key) if value is None: value = "" elif isinstance(value, int): value = attribute_mappings.get(key, {}).get(value, value) elif self._output_format == "json": value = value.replace("\\", "\\\\") attribute_values[key] = value self._output_writer.Write( entry_format_string.format(**attribute_values) ) self._GenerateReportFooter() def _GenerateFileHashesReport(self, storage_reader): """Generates a file hashes report. Args: storage_reader (StorageReader): storage reader. """ hash_type = self._hash_type.upper() self._GenerateReportHeader( "file_hashes", [f"{hash_type:s} hash", "Display name"] ) hash_attribute_name = f"{self._hash_type:s}_hash" entry_format_string = self._GenerateReportEntryFormatString( [hash_attribute_name, "display_name"] ) generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_EVENT_DATA_STREAM ) for event_data_stream_index, event_data_stream in enumerate(generator): if self._output_format == "json": if event_data_stream_index > 0: self._output_writer.Write(",\n") hash_value = getattr(event_data_stream, hash_attribute_name, None) or "N/A" display_name = "N/A" if event_data_stream.path_spec: display_name = path_helper.PathHelper.GetDisplayNameForPathSpec( event_data_stream.path_spec ) attribute_values = { "display_name": display_name, hash_attribute_name: hash_value, } self._output_writer.Write(entry_format_string.format(**attribute_values)) self._GenerateReportFooter() def _GenerateReportEntryFormatString(self, attribute_names): """Generates a report entry format string. Args: attribute_names (list[str]): names of the attributes to report. Returns: str: entry format string. """ if self._output_format == "json": attributes_string = ", ".join( [f'"{name:s}": "{{{name:s}!s}}"' for name in attribute_names] ) return f" {{{{{attributes_string:s}}}}}" if self._output_format == "markdown": attributes_string = " | ".join( [f"{{{name:s}!s}}" for name in attribute_names] ) return f"{attributes_string:s}\n" if self._output_format == "text": attributes_string = "\t".join( [f"{{{name:s}!s}}" for name in attribute_names] ) return f"{attributes_string:s}\n" return "" def _GenerateReportFooter(self): """Generates a report footer.""" if self._output_format == "json": self._output_writer.Write("\n]}\n") def _GenerateReportHeader(self, json_base_type, column_titles): """Generates a report header. Args: json_base_type (str): JSON base type. column_titles (list[str]): column titles of the Markdown and tab separated tables. """ if self._output_format == "json": self._output_writer.Write(f'{{"{json_base_type:s}": [\n') elif self._output_format == "markdown": titles_string = " | ".join(column_titles) self._output_writer.Write(f"{titles_string:s}\n") separators_string = " | ".join(["---"] * len(column_titles)) self._output_writer.Write(f"{separators_string:s}\n") elif self._output_format == "text": titles_string = "\t".join(column_titles) self._output_writer.Write(f"{titles_string:s}\n") def _GenerateWinEvtProvidersReport(self, storage_reader): """Generates a Windows Event Log providers report. Args: storage_reader (StorageReader): storage reader. """ column_titles = [ "Identifier", "Log source(s)", "Log type(s)", "Event message file(s)", "Parameter message file(s)", ] self._GenerateReportHeader("winevt_providers", column_titles) attribute_names = [ "identifier", "log_sources", "log_types", "event_message_files", "parameter_message_files", ] entry_format_string = self._GenerateReportEntryFormatString(attribute_names) if storage_reader.HasAttributeContainers("windows_eventlog_provider"): generator = storage_reader.GetAttributeContainers( "windows_eventlog_provider" ) for artifact_index, artifact in enumerate(generator): if self._output_format == "json": if artifact_index > 0: self._output_writer.Write(",\n") attribute_values = { "identifier": artifact.identifier or "", "event_message_files": artifact.event_message_files or [], "log_sources": artifact.log_sources or [], "log_types": artifact.log_types or [], "parameter_message_files": artifact.parameter_message_files or [], } self._output_writer.Write( entry_format_string.format(**attribute_values) ) self._GenerateReportFooter() def _GetStorageReader(self, path): """Retrieves a storage reader. Args: path (str): path of the storage file. Returns: StorageReader: storage reader or None if the storage file format is not supported. Raises: BadConfigOption: if the storage file format is not supported. """ storage_reader = storage_factory.StorageFactory.CreateStorageReaderForFile(path) if not storage_reader: raise errors.BadConfigOption( f"Format of storage file: {path:s} not supported." ) return storage_reader def _PrintAnalysisReportCounter( self, analysis_reports_counter, session_identifier=None ): """Prints the analysis reports counter. Args: analysis_reports_counter (collections.Counter): number of analysis reports per analysis plugin. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == "json": json_string = json.dumps(analysis_reports_counter) self._output_writer.Write(f'"analysis_reports": {json_string:s}') elif self._output_format in ("markdown", "text") and analysis_reports_counter: title = "Reports generated per plugin" if session_identifier: title = ": ".join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Plugin name", "Number of reports"], title=title, ) for key, value in sorted(analysis_reports_counter.items()): if key != "total": table_view.AddRow([key, value]) try: total = analysis_reports_counter["total"] except KeyError: total = "N/A" table_view.AddRow(["Total", total]) table_view.Write(self._output_writer) def _PrintAnalysisReportsDetails(self, storage_reader): """Prints the details of the analysis reports. Args: storage_reader (StorageReader): storage reader. """ has_analysis_reports = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_ANALYSIS_REPORT ) if self._output_format == "text" and not has_analysis_reports: self._output_writer.Write("\nNo analysis reports stored.\n") else: generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_ANALYSIS_REPORT ) for index, analysis_report in enumerate(generator): date_time_string = None if analysis_report.time_compiled is not None: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=analysis_report.time_compiled ) date_time_string = date_time.CopyToDateTimeStringISO8601() table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f"Analysis report: {index:d}" ) table_view.AddRow(["Name plugin", analysis_report.plugin_name or "N/A"]) table_view.AddRow(["Date and time", date_time_string or "N/A"]) table_view.AddRow( ["Event filter", analysis_report.event_filter or "N/A"] ) if not analysis_report.analysis_counter: table_view.AddRow(["Text", analysis_report.text or ""]) else: table_view.AddRow(["Results", ""]) for key, value in sorted(analysis_report.analysis_counter.items()): table_view.AddRow([key, value]) table_view.Write(self._output_writer) def _PrintAnalysisReportSection(self, storage_reader, analysis_reports_counter): """Prints the analysis reports section. Args: storage_reader (StorageReader): storage reader. analysis_reports_counter (collections.Counter): number of analysis reports per analysis plugin. """ self._PrintAnalysisReportCounter(analysis_reports_counter) if self._output_format in ("markdown", "text"): self._PrintAnalysisReportsDetails(storage_reader) def _PrintCounterDifferences( self, differences, column_names=None, reverse=False, title=None ): """Prints the counter differences. Args: differences (dict[str, tuple[int, int]]): mismatching results per key. column_names (Optional[list[str]]): column names. reverse (Optional[bool]): True if the key and values of differences should be printed in reverse order. title (Optional[str]): title. """ # TODO: add support for 3 column table? table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=column_names, title=title ) for key, value in sorted(differences.items()): value_string = f"{value[0]:d} ({value[1]:d})" if reverse: table_view.AddRow([value_string, key]) else: table_view.AddRow([key, value_string]) table_view.Write(self._output_writer) self._output_writer.Write("\n") def _PrintDataTypesCounter(self, data_types_counter, session_identifier=None): """Prints the data types counter. Args: data_types_counter (collections.Counter): number of events per data type. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == "json": if session_identifier: self._output_writer.Write(", ") json_string = json.dumps(data_types_counter) self._output_writer.Write(f'"data_types": {json_string:s}') elif self._output_format in ("markdown", "text"): if self._output_format == "text" and not data_types_counter: if not session_identifier: self._output_writer.Write("\nNo events stored.\n") else: title = "Events generated per data type" if session_identifier: title = f"{title:s}: {session_identifier:s}" title_level = 4 else: title_level = 2 if not data_types_counter: if not session_identifier: heading_start = "#" * title_level self._output_writer.Write( f"{heading_start:s} {title:s}\n\nN/A\n\n" ) else: table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Data type name", "Number of events"], title=title, title_level=title_level, ) for key, value in sorted(data_types_counter.items()): if key != "total": table_view.AddRow([key, value]) table_view.AddRow(["Total", data_types_counter["total"]]) table_view.Write(self._output_writer) def _PrintEventLabelsCounter(self, event_labels_counter, session_identifier=None): """Prints the event labels counter. Args: event_labels_counter (collections.Counter): number of event tags per label. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == "json": json_string = json.dumps(event_labels_counter) self._output_writer.Write(f'"event_labels": {json_string:s}') elif self._output_format in ("markdown", "text"): if self._output_format == "text" and not event_labels_counter: if not session_identifier: self._output_writer.Write("\nNo events labels stored.\n") else: title = "Event tags generated per label" if session_identifier: title = ": ".join([title, session_identifier]) title_level = 4 else: title_level = 2 if not event_labels_counter: if not session_identifier: heading_marker = "#" * title_level self._output_writer.Write( f"{heading_marker:s} {title:s}\n\nN/A\n\n" ) else: table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Label", "Number of event tags"], title=title, title_level=title_level, ) for key, value in sorted(event_labels_counter.items()): if key != "total": table_view.AddRow([key, value]) try: total = event_labels_counter["total"] except KeyError: total = "N/A" table_view.AddRow(["Total", total]) table_view.Write(self._output_writer) def _PrintParsersCounter(self, parsers_counter, session_identifier=None): """Prints the parsers counter. Args: parsers_counter (collections.Counter): number of events per parser or parser plugin. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == "json": if session_identifier: self._output_writer.Write(", ") json_string = json.dumps(parsers_counter) self._output_writer.Write(f'"parsers": {json_string:s}') elif self._output_format in ("markdown", "text"): if self._output_format == "text" and not parsers_counter: if not session_identifier: self._output_writer.Write("\nNo events stored.\n") else: title = "Events generated per parser" if session_identifier: title = ": ".join([title, session_identifier]) title_level = 4 else: title_level = 2 if not parsers_counter: if not session_identifier: heading_marker = "#" * title_level self._output_writer.Write( f"{heading_marker:s} {title:s}\n\nN/A\n\n" ) else: table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Parser (plugin) name", "Number of events"], title=title, title_level=title_level, ) for key, value in sorted(parsers_counter.items()): if key != "total": table_view.AddRow([key, value]) table_view.AddRow(["Total", parsers_counter["total"]]) table_view.Write(self._output_writer) def _PrintPreprocessingWarningsDetails(self, storage_reader): """Prints the details of the preprocessing warnings. Args: storage_reader (StorageReader): storage reader. """ generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_PREPROCESSING_WARNING ) for index, warning in enumerate(generator): table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f"Preprocessing warning: {index:d}" ) table_view.AddRow(["Message", warning.message]) table_view.AddRow(["Plugin name", warning.plugin_name]) if warning.path_spec: # TODO: add helper method to format path spec as row. path_spec_string = self._GetPathSpecificationString(warning.path_spec) for path_index, line in enumerate(path_spec_string.split("\n")): if not line: continue if path_index == 0: table_view.AddRow(["Path specification", line]) else: table_view.AddRow(["", line]) table_view.Write(self._output_writer) def _PrintWarningsDetails(self, storage_reader, container_type, warning_type): """Prints the details of warnings. Args: storage_reader (StorageReader): storage reader. container_type (str): attribute container type. warning_type (str): warning type. """ generator = storage_reader.GetAttributeContainers(container_type) for index, warning in enumerate(generator): table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f"{warning_type:s} warning: {index:d}" ) table_view.AddRow(["Message", warning.message]) table_view.AddRow(["Parser chain", warning.parser_chain]) path_spec_string = self._GetPathSpecificationString(warning.path_spec) for path_index, line in enumerate(path_spec_string.split("\n")): if not line: continue if path_index == 0: table_view.AddRow(["Path specification", line]) else: table_view.AddRow(["", line]) table_view.Write(self._output_writer) def _PrintSessionDetailsAsJSON(self, session): """Prints the details of a session as JSON. Args: session (Session): session. """ json_string = json_serializer.JSONAttributeContainerSerializer.WriteSerialized( session ) self._output_writer.Write(f'"session": {json_string:s}') def _PrintSessionDetailsAsTable(self, session, session_identifier): """Prints the details of a session as a table. Args: session (Session): session. session_identifier (str): session identifier, formatted as a UUID. """ start_time = "N/A" if session.start_time is not None: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=session.start_time ) start_time = date_time.CopyToDateTimeStringISO8601() completion_time = "N/A" if session.completion_time is not None: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=session.completion_time ) completion_time = date_time.CopyToDateTimeStringISO8601() enabled_parser_names = "N/A" if session.enabled_parser_names: enabled_parser_names = ", ".join(sorted(session.enabled_parser_names)) command_line_arguments = session.command_line_arguments or "N/A" parser_filter_expression = session.parser_filter_expression or "N/A" preferred_encoding = session.preferred_encoding or "N/A" preferred_time_zone = session.preferred_time_zone or "N/A" if session.artifact_filters: artifact_filters_string = ", ".join(session.artifact_filters) else: artifact_filters_string = "N/A" filter_file = session.filter_file or "N/A" table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f"Session: {session_identifier:s}" ) table_view.AddRow(["Start time", start_time]) table_view.AddRow(["Completion time", completion_time]) table_view.AddRow(["Product name", session.product_name]) table_view.AddRow(["Product version", session.product_version]) table_view.AddRow(["Command line arguments", command_line_arguments]) table_view.AddRow(["Parser filter expression", parser_filter_expression]) table_view.AddRow(["Enabled parser and plugins", enabled_parser_names]) table_view.AddRow(["Preferred encoding", preferred_encoding]) table_view.AddRow(["Preferred time zone", preferred_time_zone]) table_view.AddRow(["Debug mode", session.debug_mode]) table_view.AddRow(["Artifact filters", artifact_filters_string]) table_view.AddRow(["Filter file", filter_file]) table_view.Write(self._output_writer) def _PrintSessionsDetails(self, storage_reader): """Prints the details of the sessions. Args: storage_reader (BaseStore): storage. """ if self._output_format == "json": self._output_writer.Write('"sessions": {') for session_index, session in enumerate(storage_reader.GetSessions()): session_identifier = str(uuid.UUID(hex=session.identifier)) if self._output_format == "json": if session_index != 0: self._output_writer.Write(", ") self._PrintSessionDetailsAsJSON(session) elif self._output_format in ("markdown", "text"): self._PrintSessionDetailsAsTable(session, session_identifier) if self._verbose: system_configuration = storage_reader.GetAttributeContainerByIndex( "system_configuration", session_index ) if system_configuration: self._PrintSystemConfigurations( storage_reader, [system_configuration], session_identifier=session_identifier, ) if self._output_format == "json": self._output_writer.Write("}") def _PrintSessionsOverview(self, storage_reader): """Prints a sessions overview. Args: storage_reader (StorageReader): storage reader. """ table_view = views.ViewsFactory.GetTableView( self._views_format_type, title="Sessions", title_level=2 ) for session in storage_reader.GetSessions(): date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=session.start_time ) start_time = date_time.CopyToDateTimeStringISO8601() session_identifier = str(uuid.UUID(hex=session.identifier)) table_view.AddRow([session_identifier, start_time]) table_view.Write(self._output_writer) def _PrintSessionsSection(self, storage_reader): """Prints the session section. Args: storage_reader (StorageReader): storage reader. """ if self._output_format in ("markdown", "text"): self._PrintSessionsOverview(storage_reader) if ( self._output_format == "json" or self._verbose or "sessions" in self._sections ): self._PrintSessionsDetails(storage_reader) def _PrintSystemConfiguration( self, storage_reader, system_configuration, session_identifier=None ): """Prints the details of a system configuration. Args: storage_reader (StorageReader): storage reader. system_configuration (SystemConfiguration): system configuration. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if not system_configuration: return title = "System configuration" if session_identifier: title = ": ".join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=title ) hostname = "N/A" if system_configuration.hostname: hostname = system_configuration.hostname.name operating_system = system_configuration.operating_system or "N/A" operating_system_product = ( system_configuration.operating_system_product or "N/A" ) operating_system_version = ( system_configuration.operating_system_version or "N/A" ) language = system_configuration.language or "N/A" code_page = system_configuration.code_page or "N/A" keyboard_layout = system_configuration.keyboard_layout or "N/A" time_zone = system_configuration.time_zone or "N/A" table_view.AddRow(["Hostname", hostname]) table_view.AddRow(["Operating system", operating_system]) table_view.AddRow(["Operating system product", operating_system_product]) table_view.AddRow(["Operating system version", operating_system_version]) table_view.AddRow(["Language", language]) table_view.AddRow(["Code page", code_page]) table_view.AddRow(["Keyboard layout", keyboard_layout]) table_view.AddRow(["Time zone", time_zone]) table_view.Write(self._output_writer) title = "Available time zones" if session_identifier: title = ": ".join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Name", "Offset from UTC"], title=title, ) # TODO: filter time zones on specific system configuration. for time_zone in storage_reader.GetAttributeContainers("time_zone"): hours_from_utc, minutes_from_utc = divmod(time_zone.offset, 60) if hours_from_utc < 0: sign = "+" hours_from_utc *= -1 else: sign = "-" time_zone_offset = f"{sign:s}{hours_from_utc:02d}:{minutes_from_utc:02d}" table_view.AddRow([time_zone.name, time_zone_offset]) table_view.Write(self._output_writer) title = "User accounts" if session_identifier: title = ": ".join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Username", "User directory"], title=title, ) # TODO: filter user accounts on specific system configuration. for user_account in storage_reader.GetAttributeContainers("user_account"): table_view.AddRow([user_account.username, user_account.user_directory]) table_view.Write(self._output_writer) def _PrintSystemConfigurations( self, storage_reader, system_configurations, session_identifier=None ): """Prints the details of system configurations. Args: storage_reader (StorageReader): storage reader. system_configurations (list[SystemConfiguration]): system configurations. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == "json": self._output_writer.Write(', "system_configurations": {') for configuration_index, configuration in enumerate(system_configurations): if self._output_format == "json": if configuration_index != 0: self._output_writer.Write(", ") json_string = ( json_serializer.JSONAttributeContainerSerializer.WriteSerialized( configuration.system_configuration ) ) self._output_writer.Write(f'"system_configuration": {json_string:s}') elif self._output_format in ("markdown", "text"): self._PrintSystemConfiguration( storage_reader, configuration, session_identifier=session_identifier ) if self._output_format == "json": self._output_writer.Write("}") def _PrintStorageInformation(self, storage_reader): """Prints information about the store. Args: storage_reader (StorageReader): storage reader. """ if self._output_format == "json": self._output_writer.Write("{") elif self._output_format in ("markdown", "text"): self._PrintStorageOverviewAsTable(storage_reader) section_written = False if self._sections == "all" or "sessions" in self._sections: self._PrintSessionsSection(storage_reader) section_written = True if self._sections == "all" or "sources" in self._sections: if self._output_format == "json" and section_written: self._output_writer.Write(", ") self._PrintSourcesOverview(storage_reader) section_written = True if self._output_format == "json" and section_written: self._output_writer.Write(", ") storage_counters = self._CalculateStorageCounters(storage_reader) if self._output_format == "json": self._output_writer.Write('"storage_counters": {') section_written = False if self._sections == "all" or "events" in self._sections: data_types = storage_counters.get("data_types", collections.Counter()) self._PrintDataTypesCounter(data_types) section_written = True if self._output_format == "json" and section_written: self._output_writer.Write(", ") section_written = False parsers = storage_counters.get("parsers", collections.Counter()) self._PrintParsersCounter(parsers) section_written = True event_labels = storage_counters.get("event_labels", collections.Counter()) if self._output_format == "json" and section_written: self._output_writer.Write(", ") self._PrintEventLabelsCounter(event_labels) if self._sections == "all" or "warnings" in self._sections: if self._output_format == "json" and section_written: self._output_writer.Write(", ") self._PrintWarningsSection(storage_reader, storage_counters) section_written = True if self._sections == "all" or "reports" in self._sections: if self._output_format == "json" and section_written: self._output_writer.Write(", ") analysis_reports = storage_counters.get( "analysis_reports", collections.Counter() ) self._PrintAnalysisReportSection(storage_reader, analysis_reports) if self._output_format == "json": self._output_writer.Write("}") if self._output_format == "json": self._output_writer.Write("}") elif self._output_format in ("markdown", "text"): self._output_writer.Write("\n") def _PrintSourcesOverview(self, storage_reader): """Prints a sources overview. Args: storage_reader (StorageReader): storage reader. """ if self._output_format == "json": self._output_writer.Write('"event_sources": {') elif self._output_format in ("markdown", "text"): table_view = views.ViewsFactory.GetTableView( self._views_format_type, title="Event sources", title_level=2 ) generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_EVENT_SOURCE ) number_of_event_sources = 0 for source_index, source in enumerate(generator): if self._output_format == "json": if source_index != 0: self._output_writer.Write(", ") json_string = ( json_serializer.JSONAttributeContainerSerializer.WriteSerialized( source ) ) self._output_writer.Write(f'"source": {json_string:s}') elif self._output_format in ("markdown", "text"): if self._verbose or "sources" in self._sections: path_spec_string = self._GetPathSpecificationString( source.path_spec ) for path_index, line in enumerate(path_spec_string.split("\n")): if not line: continue if path_index == 0: table_view.AddRow([f"{source_index:d}", line]) else: table_view.AddRow(["", line]) number_of_event_sources += 1 if self._output_format == "json": self._output_writer.Write("}") elif self._output_format in ("markdown", "text"): if not (self._verbose or "sources" in self._sections): table_view.AddRow(["Total", f"{number_of_event_sources:d}"]) table_view.Write(self._output_writer) def _PrintStorageOverviewAsTable(self, storage_reader): """Prints a storage overview as a table. Args: storage_reader (StorageReader): storage reader. """ format_version = storage_reader.GetFormatVersion() serialization_format = storage_reader.GetSerializationFormat() table_view = views.ViewsFactory.GetTableView( self._views_format_type, title="Plaso Storage Information", title_level=1 ) table_view.AddRow(["Filename", os.path.basename(self._storage_file_path)]) table_view.AddRow(["Format version", format_version]) table_view.AddRow(["Serialization format", serialization_format]) table_view.Write(self._output_writer) def _PrintWarningCountersJSON( self, warnings_by_path_spec, warnings_by_parser_chain ): """Prints JSON containing a summary of the number of warnings. Args: warnings_by_path_spec (collections.Counter): number of warnings per path specification. warnings_by_parser_chain (collections.Counter): number of warnings per parser chain. """ json_string = json.dumps(warnings_by_parser_chain) self._output_writer.Write(f'"warnings_by_parser": {json_string:s}') json_string = json.dumps(warnings_by_path_spec) self._output_writer.Write(f', "warnings_by_path_spec": {json_string:s}') def _PrintWarningCountersTable( self, description, warnings_by_path_spec, warnings_by_parser_chain ): """Prints a table containing a summary of the number of warnings. Args: description (str): description of the type of warning. warnings_by_path_spec (collections.Counter): number of warnings per path specification. warnings_by_parser_chain (collections.Counter): number of warnings per parser chain. """ if warnings_by_parser_chain: title = description.title() table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Parser (plugin) name", "Number of warnings"], title=f"{title:s} warnings generated per parser", ) for parser_chain, count in warnings_by_parser_chain.items(): parser_chain = parser_chain or "<No parser>" table_view.AddRow([parser_chain, f"{count:d}"]) table_view.Write(self._output_writer) if warnings_by_path_spec: table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Number of warnings", "Pathspec"], title=f"Path specifications with most {description:s} warnings", ) for path_spec, count in warnings_by_path_spec.most_common(10): for path_index, line in enumerate(path_spec.split("\n")): if not line: continue if path_index == 0: table_view.AddRow([f"{count:d}", line]) else: table_view.AddRow(["", line]) table_view.Write(self._output_writer) def _PrintWarningsSection(self, storage_reader, storage_counters): """Prints the warnings section. Args: storage_reader (StorageReader): storage reader. storage_counters (dict[str, collections.Counter]): storage counters. """ has_extraction_warnings = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_EXTRACTION_WARNING ) has_preprocessing_warnings = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_PREPROCESSING_WARNING ) has_recovery_warnings = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_RECOVERY_WARNING ) if ( self._output_format == "text" and not has_extraction_warnings and not has_preprocessing_warnings and not has_recovery_warnings ): self._output_writer.Write("\nNo warnings stored.\n") else: warnings_by_path_spec = storage_counters.get( "extraction_warnings_by_path_spec", collections.Counter() ) warnings_by_parser_chain = storage_counters.get( "extraction_warnings_by_parser_chain", collections.Counter() ) if self._output_format == "json": self._PrintWarningCountersJSON( warnings_by_path_spec, warnings_by_parser_chain ) elif self._output_format in ("markdown", "text"): self._PrintWarningCountersTable( "extraction", warnings_by_path_spec, warnings_by_parser_chain ) if self._verbose or "warnings" in self._sections: self._PrintWarningsDetails( storage_reader, self._CONTAINER_TYPE_EXTRACTION_WARNING, "Extraction", ) warnings_by_path_spec = storage_counters.get( "preprocessing_warnings_by_path_spec", collections.Counter() ) warnings_by_parser_chain = storage_counters.get( "preprocessing_warnings_by_parser_chain", collections.Counter() ) # TODO: print preprocessing warnings as part of JSON output format. if self._output_format in ("markdown", "text"): self._PrintWarningCountersTable( "preprocessing", warnings_by_path_spec, warnings_by_parser_chain ) if self._verbose or "warnings" in self._sections: self._PrintPreprocessingWarningsDetails(storage_reader) warnings_by_path_spec = storage_counters.get( "recovery_warnings_by_path_spec", collections.Counter() ) warnings_by_parser_chain = storage_counters.get( "recovery_warnings_by_parser_chain", collections.Counter() ) # TODO: print recovery warnings as part of JSON output format. if self._output_format in ("markdown", "text"): self._PrintWarningCountersTable( "recovery", warnings_by_path_spec, warnings_by_parser_chain ) if self._verbose or "warnings" in self._sections: self._PrintWarningsDetails( storage_reader, self._CONTAINER_TYPE_RECOVERY_WARNING, "Recovery", ) warnings_by_path_spec = storage_counters.get( "timelining_warnings_by_path_spec", collections.Counter() ) warnings_by_parser_chain = storage_counters.get( "timelining_warnings_by_parser_chain", collections.Counter() ) # TODO: print timelining warnings as part of JSON output format. if self._output_format in ("markdown", "text"): self._PrintWarningCountersTable( "timelining", warnings_by_path_spec, warnings_by_parser_chain ) if self._verbose or "warnings" in self._sections: self._PrintWarningsDetails( storage_reader, self._CONTAINER_TYPE_TIMELINING_WARNING, "Timelining", )
[docs] def CompareStores(self): """Compares the contents of two stores. Returns: bool: True if the content of the stores is identical. Raises: BadConfigOption: if the storage file format is not supported. """ storage_reader = self._GetStorageReader(self._storage_file_path) compare_storage_reader = self._GetStorageReader(self._compare_storage_file_path) try: result = self._CompareStores(storage_reader, compare_storage_reader) finally: compare_storage_reader.Close() storage_reader.Close() if result: self._output_writer.Write("Storage files are identical.\n") else: self._output_writer.Write("Storage files are different.\n") return result
[docs] def GenerateReport(self): """Generates a report. Raises: BadConfigOption: if the storage file format is not supported. """ storage_reader = self._GetStorageReader(self._storage_file_path) try: if self._report_type == "browser_search": column_titles = ["Search engine", "Search term", "Number of queries"] attribute_names = ["search_engine", "search_term", "number_of_queries"] attribute_mappings = {} self._GenerateAnalysisResultsReport( storage_reader, "browser_searches", column_titles, "browser_search_analysis_result", attribute_names, attribute_mappings, ) elif self._report_type == "chrome_extension": column_titles = ["Username", "Extension identifier", "Extension"] attribute_names = ["username", "extension_identifier", "extension"] attribute_mappings = {} self._GenerateAnalysisResultsReport( storage_reader, "chrome_extensions", column_titles, "chrome_extension_analysis_result", attribute_names, attribute_mappings, ) elif self._report_type == "environment_variables": column_titles = ["Name", "Value"] attribute_names = ["name", "value"] attribute_mappings = {} self._GenerateAnalysisResultsReport( storage_reader, "environment_variables", column_titles, "environment_variable", attribute_names, attribute_mappings, ) elif self._report_type == "file_hashes": self._GenerateFileHashesReport(storage_reader) elif self._report_type == "windows_services": column_titles = ["Name", "Service type", "Start type", "Image path"] attribute_names = ["name", "service_type", "start_type", "image_path"] # TODO: consider using message formatting helpers? attribute_mappings = { "service_type": { 0x01: "Kernel device driver (0x01)", 0x02: "File system driver (0x02)", 0x04: "Adapter (0x04)", 0x10: "Stand-alone service (0x10)", 0x20: "Shared service (0x20)", }, "start_type": { 0: "Boot (0)", 1: "System (1)", 2: "Automatic (2)", 3: "On demand (3)", 4: "Disabled (4)", }, } self._GenerateAnalysisResultsReport( storage_reader, "windows_services", column_titles, "windows_service_configuration", attribute_names, attribute_mappings, ) elif self._report_type == "winevt_providers": self._GenerateWinEvtProvidersReport(storage_reader) finally: storage_reader.Close()
[docs] def ListReports(self): """Lists information about the available report types.""" table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Name", "Description"], title="Reports", ) for name, description in sorted(self._REPORTS.items()): table_view.AddRow([name, description]) table_view.Write(self._output_writer)
[docs] def ListSections(self): """Lists information about the available sections.""" table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=["Name", "Description"], title="Sections", ) for name, description in sorted(self._SECTIONS.items()): table_view.AddRow([name, description]) table_view.Write(self._output_writer)
[docs] def ParseArguments(self, arguments): """Parses the command line arguments. Args: arguments (list[str]): command line arguments. Returns: bool: True if the arguments were successfully parsed. """ loggers.ConfigureLogging() argument_parser = argparse.ArgumentParser( description=self.DESCRIPTION, add_help=False, formatter_class=argparse.RawDescriptionHelpFormatter, ) self.AddBasicOptions(argument_parser) self.AddStorageOptions(argument_parser) self.AddLogFileOptions(argument_parser) if self._CanEnforceProcessMemoryLimit(): helpers_manager.ArgumentHelperManager.AddCommandLineArguments( argument_parser, names=["process_resources"] ) argument_parser.add_argument( "--compare", dest="compare_storage_file", type=str, action="store", default="", metavar="STORAGE_FILE", help=("The path of the storage file to compare against."), ) output_formats = ", ".join(sorted(self._SUPPORTED_OUTPUT_FORMATS)) argument_parser.add_argument( "--output_format", "--output-format", dest="output_format", type=str, choices=self._SUPPORTED_OUTPUT_FORMATS, action="store", default=self._DEFAULT_OUTPUT_FORMAT, metavar="FORMAT", help=( f"Format of the output, the default is: " f"{self._DEFAULT_OUTPUT_FORMAT:s}. Supported options: " f"{output_formats:s}." ), ) hash_choices = ", ".join(self._HASH_CHOICES) argument_parser.add_argument( "--hash", dest="hash", choices=self._HASH_CHOICES, action="store", metavar="TYPE", default=self._DEFAULT_HASH_TYPE, help=( f"Type of hash to output in file_hashes report. Supported options: " f"{hash_choices:s}" ), ) report_choices = ", ".join(self._REPORT_CHOICES) argument_parser.add_argument( "--report", dest="report", choices=self._REPORT_CHOICES, action="store", metavar="TYPE", default=self._DEFAULT_REPORT_TYPE, help=( f"Report on specific information. Supported options: " f"{report_choices:s}" ), ) argument_parser.add_argument( "--sections", dest="sections", type=str, action="store", default="all", metavar="SECTIONS_LIST", help=( "List of sections to output. This is a comma separated list where " 'each entry is the name of a section. Use "--sections list" to ' 'list the available sections and "--sections all" to show all ' "available sections." ), ) argument_parser.add_argument( "-v", "--verbose", dest="verbose", action="store_true", default=False, help="Print verbose output.", ) argument_parser.add_argument( "-w", "--write", metavar="OUTPUTFILE", dest="write", help="Output filename." ) try: options = argument_parser.parse_args(arguments) except UnicodeEncodeError: # If we get here we are attempting to print help in a non-Unicode # terminal. self._output_writer.Write("\n") self._output_writer.Write(argument_parser.format_help()) return False try: self.ParseOptions(options) except errors.BadConfigOption as exception: self._output_writer.Write(f"ERROR: {exception!s}\n") self._output_writer.Write("\n") self._output_writer.Write(argument_parser.format_usage()) return False self._WaitUserWarning() loggers.ConfigureLogging( debug_output=self._debug_mode, filename=self._log_file, quiet_mode=self._quiet_mode, ) return True
[docs] def ParseOptions(self, options): """Parses the options. Args: options (argparse.Namespace): command line arguments. Raises: BadConfigOption: if the options are invalid. """ self._ParseInformationalOptions(options) self._ParseLogFileOptions(options) self._verbose = getattr(options, "verbose", False) self._report_type = getattr(options, "report", self._DEFAULT_REPORT_TYPE) self._sections = getattr(options, "sections", "") self.list_reports = self._report_type == "list" self.list_sections = self._sections == "list" self.show_troubleshooting = getattr(options, "show_troubleshooting", False) if self.list_reports or self.list_sections or self.show_troubleshooting: return if self._report_type != "none": if self._report_type not in self._REPORTS: raise errors.BadConfigOption( f"Unsupported report type: {self._report_type:s}" ) self.generate_report = True if self._sections != "all": self._sections = self._sections.split(",") self._hash_type = getattr(options, "hash", self._DEFAULT_HASH_TYPE) self._output_filename = getattr(options, "write", None) helpers_manager.ArgumentHelperManager.ParseOptions( options, self, names=["process_resources"] ) # TODO: move check into _CheckStorageFile. self._storage_file_path = self.ParseStringOption(options, "storage_file") if not self._storage_file_path: raise errors.BadConfigOption("Missing storage file option.") if not os.path.isfile(self._storage_file_path): raise errors.BadConfigOption( f"No such storage file: {self._storage_file_path:s}" ) compare_storage_file_path = self.ParseStringOption( options, "compare_storage_file" ) if compare_storage_file_path: if not os.path.isfile(compare_storage_file_path): raise errors.BadConfigOption( f"No such storage file: {compare_storage_file_path:s}" ) self._compare_storage_file_path = compare_storage_file_path self.compare_storage_information = True self._output_format = self.ParseStringOption(options, "output_format") if self._output_filename: if os.path.exists(self._output_filename): raise errors.BadConfigOption( f"Output file: {self._output_filename:s} already exists." ) # pylint: disable=consider-using-with output_file_object = open(self._output_filename, "wb") self._output_writer = tools.FileObjectOutputWriter(output_file_object) self._EnforceProcessMemoryLimit(self._process_memory_limit)
[docs] def PrintStorageInformation(self): """Prints the storage information. Raises: BadConfigOption: if the storage file format is not supported. """ if self._output_format == "markdown": self._views_format_type = views.ViewsFactory.FORMAT_TYPE_MARKDOWN elif self._output_format == "text": self._views_format_type = views.ViewsFactory.FORMAT_TYPE_CLI storage_reader = self._GetStorageReader(self._storage_file_path) try: self._PrintStorageInformation(storage_reader) finally: storage_reader.Close()