# Source code for plaso.cli.pinfo_tool

# -*- coding: utf-8 -*-
"""The pinfo CLI tool."""

import argparse
import collections
import json
import os
import uuid

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.cli import logger
from plaso.cli import tool_options
from plaso.cli import tools
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.containers import events
from plaso.containers import event_sources
from plaso.containers import reports
from plaso.containers import warnings
from plaso.engine import path_helper
from plaso.lib import errors
from plaso.lib import loggers
from plaso.serializer import json_serializer
from plaso.storage import factory as storage_factory


class PinfoTool(tools.CLITool, tool_options.StorageFileOptions):
  """Pinfo CLI tool.

  Attributes:
    compare_storage_information (bool): True if the tool is used to compare
        stores.
    generate_report (bool): True if a predefined report type should be
        generated.
    list_reports (bool): True if the report types should be listed.
    list_sections (bool): True if the section types should be listed.
  """

  NAME = 'pinfo'
  DESCRIPTION = (
      'Shows information about a Plaso storage file, for example how it was '
      'collected, what information was extracted from a source, etc.')

  # Hash type used by the file_hashes report and its allowed values.
  _DEFAULT_HASH_TYPE = 'sha256'

  _HASH_CHOICES = ('md5', 'sha1', 'sha256')

  # Predefined report types mapped to their human readable descriptions.
  _REPORTS = {
      'browser_search': (
          'Report browser searches determined by the browser_search '
          'analysis plugin.'),
      'chrome_extension': (
          'Report Chrome extensions determined by the chrome_extension '
          'analysis plugin.'),
      'environment_variables': (
          'Report environment variables extracted during processing.'),
      'file_hashes': 'Report file hashes calculated during processing.',
      'windows_services': (
          'Report Windows services and drivers extracted during processing.'),
      'winevt_providers': (
          'Report Windows EventLog providers extracted during processing.')}

  _DEFAULT_REPORT_TYPE = 'none'

  # 'list' and 'none' are meta-choices in addition to the report types.
  _REPORT_CHOICES = sorted(list(_REPORTS.keys()) + ['list', 'none'])

  # Output sections mapped to their human readable descriptions.
  _SECTIONS = {
      'events': 'Show information about events.',
      'reports': 'Show information about analysis reports.',
      'sessions': 'Show information about sessions.',
      'sources': 'Show information about event sources.',
      'warnings': 'Show information about warnings during processing.'}

  _DEFAULT_OUTPUT_FORMAT = 'text'

  _SUPPORTED_OUTPUT_FORMATS = ('json', 'markdown', 'text')

  # Shortcuts for the attribute container type indicators used when querying
  # the storage reader.
  _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE

  _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE

  _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE

  _CONTAINER_TYPE_EXTRACTION_WARNING = warnings.ExtractionWarning.CONTAINER_TYPE

  _CONTAINER_TYPE_PREPROCESSING_WARNING = (
      warnings.PreprocessingWarning.CONTAINER_TYPE)

  _CONTAINER_TYPE_RECOVERY_WARNING = warnings.RecoveryWarning.CONTAINER_TYPE

  _CONTAINER_TYPE_TIMELINING_WARNING = warnings.TimeliningWarning.CONTAINER_TYPE
def __init__(self, input_reader=None, output_writer=None):
  """Initializes the CLI tool object.

  Args:
    input_reader (Optional[InputReader]): input reader, where None
        indicates that the stdin input reader should be used.
    output_writer (Optional[OutputWriter]): output writer, where None
        indicates that the stdout output writer should be used.
  """
  super(PinfoTool, self).__init__(
      input_reader=input_reader, output_writer=output_writer)
  # Internal option state, populated when arguments are parsed.
  self._compare_storage_file_path = None
  self._output_filename = None
  self._output_format = 'text'
  self._hash_type = self._DEFAULT_HASH_TYPE
  self._process_memory_limit = None
  self._report_type = self._DEFAULT_REPORT_TYPE
  self._sections = None
  self._storage_file_path = None
  self._verbose = False
  # Public flags read by the tool's entry point.
  self.compare_storage_information = False
  self.generate_report = False
  self.list_reports = False
  self.list_sections = False
def _CalculateStorageCounters(self, storage_reader): """Calculates the counters of the entire storage. Args: storage_reader (StorageReader): storage reader. Returns: dict[str, collections.Counter]: storage counters. """ # TODO: determine analysis report counter from actual stored analysis # reports or remove. analysis_reports_counter = collections.Counter() analysis_reports_counter_error = False event_labels_counter = {} if storage_reader.HasAttributeContainers('event_label_count'): event_labels_counter = { event_label_count.label: event_label_count.number_of_events for event_label_count in storage_reader.GetAttributeContainers( 'event_label_count')} event_labels_counter = collections.Counter(event_labels_counter) event_labels_counter_error = False parsers_counter = {} if storage_reader.HasAttributeContainers('parser_count'): parsers_counter = { parser_count.name: parser_count.number_of_events for parser_count in storage_reader.GetAttributeContainers( 'parser_count')} parsers_counter = collections.Counter(parsers_counter) parsers_counter_error = False storage_counters = {} extraction_warnings_by_path_spec = collections.Counter() extraction_warnings_by_parser_chain = collections.Counter() for warning in storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_EXTRACTION_WARNING): path_spec_string = self._GetPathSpecificationString(warning.path_spec) extraction_warnings_by_path_spec[path_spec_string] += 1 extraction_warnings_by_parser_chain[warning.parser_chain] += 1 storage_counters['extraction_warnings_by_path_spec'] = ( extraction_warnings_by_path_spec) storage_counters['extraction_warnings_by_parser_chain'] = ( extraction_warnings_by_parser_chain) recovery_warnings_by_path_spec = collections.Counter() recovery_warnings_by_parser_chain = collections.Counter() for warning in storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_RECOVERY_WARNING): path_spec_string = self._GetPathSpecificationString(warning.path_spec) 
recovery_warnings_by_path_spec[path_spec_string] += 1 recovery_warnings_by_parser_chain[warning.parser_chain] += 1 storage_counters['recovery_warnings_by_path_spec'] = ( recovery_warnings_by_path_spec) storage_counters['recovery_warnings_by_parser_chain'] = ( recovery_warnings_by_parser_chain) timelining_warnings_by_path_spec = collections.Counter() timelining_warnings_by_parser_chain = collections.Counter() if storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_TIMELINING_WARNING): for warning in storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_TIMELINING_WARNING): path_spec_string = self._GetPathSpecificationString(warning.path_spec) timelining_warnings_by_path_spec[path_spec_string] += 1 timelining_warnings_by_parser_chain[warning.parser_chain] += 1 storage_counters['timelining_warnings_by_path_spec'] = ( timelining_warnings_by_path_spec) storage_counters['timelining_warnings_by_parser_chain'] = ( timelining_warnings_by_parser_chain) if not analysis_reports_counter_error: storage_counters['analysis_reports'] = analysis_reports_counter if not event_labels_counter_error: storage_counters['event_labels'] = event_labels_counter if not parsers_counter_error: storage_counters['parsers'] = parsers_counter return storage_counters def _CheckStorageFile(self, storage_file_path, warn_about_existing=False): """Checks if the storage file path is valid. Args: storage_file_path (str): path of the storage file. warn_about_existing (bool): True if the user should be warned about the storage file already existing. Raises: BadConfigOption: if the storage file path is invalid. """ if os.path.exists(storage_file_path): if not os.path.isfile(storage_file_path): raise errors.BadConfigOption(( f'Storage file: {storage_file_path:s} already exists and is not ' f'a file.')) if warn_about_existing: logger.warning('Appending to an already existing storage file.') dirname = os.path.dirname(storage_file_path) if not dirname: dirname = '.' 
# TODO: add a more thorough check to see if the storage file really is # a plaso storage file. if not os.access(dirname, os.W_OK): raise errors.BadConfigOption( f'Unable to write to storage file: {storage_file_path:s}') def _CompareCounter(self, counter, compare_counter): """Compares two counters. Args: counter (collections.Counter): counter. compare_counter (collections.Counter): counter to compare against. Returns: dict[str, tuple[int, int]]: mismatching results per key. """ keys = set(counter.keys()) keys.union(compare_counter.keys()) differences = {} for key in keys: value = counter.get(key, 0) compare_value = compare_counter.get(key, 0) if value != compare_value: differences[key] = (value, compare_value) return differences def _CompareStores(self, storage_reader, compare_storage_reader): """Compares the contents of two stores. Args: storage_reader (StorageReader): storage reader. compare_storage_reader (StorageReader): storage to compare against. Returns: bool: True if the content of the stores is identical. """ stores_are_identical = True storage_counters = self._CalculateStorageCounters(storage_reader) compare_storage_counters = self._CalculateStorageCounters( compare_storage_reader) # Compare number of events. parsers_counter = storage_counters.get('parsers', collections.Counter()) compare_parsers_counter = compare_storage_counters.get( 'parsers', collections.Counter()) differences = self._CompareCounter(parsers_counter, compare_parsers_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Parser (plugin) name', 'Number of events'], title='Events generated per parser') # Compare extraction warnings by parser chain. 
warnings_counter = storage_counters.get( 'extraction_warnings_by_parser_chain', collections.Counter()) compare_warnings_counter = compare_storage_counters.get( 'extraction_warnings_by_parser_chain', collections.Counter()) differences = self._CompareCounter( warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Parser (plugin) name', 'Number of warnings'], title='Extraction warnings generated per parser') # Compare extraction warnings by path specification warnings_counter = storage_counters.get( 'extraction_warnings_by_path_spec', collections.Counter()) compare_warnings_counter = compare_storage_counters.get( 'extraction_warnings_by_path_spec', collections.Counter()) differences = self._CompareCounter( warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Number of warnings', 'Pathspec'], reverse=True, title='Pathspecs with most extraction warnings') # Compare recovery warnings by parser chain. 
warnings_counter = storage_counters.get( 'recovery_warnings_by_parser_chain', collections.Counter()) compare_warnings_counter = compare_storage_counters.get( 'recovery_warnings_by_parser_chain', collections.Counter()) differences = self._CompareCounter( warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Parser (plugin) name', 'Number of warnings'], title='Recovery warnings generated per parser') # Compare recovery warnings by path specification warnings_counter = storage_counters.get( 'recovery_warnings_by_path_spec', collections.Counter()) compare_warnings_counter = compare_storage_counters.get( 'recovery_warnings_by_path_spec', collections.Counter()) differences = self._CompareCounter( warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Number of warnings', 'Pathspec'], reverse=True, title='Pathspecs with most recovery warnings') # Compare timelining warnings by parser chain. warnings_counter = storage_counters.get( 'timelining_warnings_by_parser_chain', collections.Counter()) compare_warnings_counter = compare_storage_counters.get( 'timelining_warnings_by_parser_chain', collections.Counter()) differences = self._CompareCounter( warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Parser (plugin) name', 'Number of warnings'], title='Timelining warnings generated per parser') # Compare timelining warnings by path specification. 
warnings_counter = storage_counters.get( 'timelining_warnings_by_path_spec', collections.Counter()) compare_warnings_counter = compare_storage_counters.get( 'timelining_warnings_by_path_spec', collections.Counter()) differences = self._CompareCounter( warnings_counter, compare_warnings_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Number of warnings', 'Pathspec'], reverse=True, title='Pathspecs with most timelining warnings') # Compare event labels. labels_counter = storage_counters.get('event_labels', collections.Counter()) compare_labels_counter = compare_storage_counters.get( 'event_labels', collections.Counter()) differences = self._CompareCounter(labels_counter, compare_labels_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Label', 'Number of event tags'], title='Event tags generated per label') # Compare analysis reports. reports_counter = storage_counters.get( 'analysis_reports', collections.Counter()) compare_reports_counter = compare_storage_counters.get( 'analysis_reports', collections.Counter()) differences = self._CompareCounter(reports_counter, compare_reports_counter) if differences: stores_are_identical = False self._PrintCounterDifferences( differences, column_names=['Plugin name', 'Number of reports'], title='Reports generated per plugin') return stores_are_identical def _GenerateAnalysisResultsReport( self, storage_reader, json_base_type, column_titles, container_type, attribute_names, attribute_mappings): """Generates an analysis results report. Args: storage_reader (StorageReader): storage reader. json_base_type (str): JSON base type. column_titles (list[str]): column titles of the Markdown and tab separated tables. container_type (str): attribute container type. attribute_names (list[str]): names of the attributes to report. 
attribute_mappings (dict[str, dict[st, str]]): mappings of attribute values to human readable strings. """ self._GenerateReportHeader(json_base_type, column_titles) entry_format_string = self._GenerateReportEntryFormatString(attribute_names) if storage_reader.HasAttributeContainers(container_type): generator = storage_reader.GetAttributeContainers(container_type) for artifact_index, analysis_result in enumerate(generator): if self._output_format == 'json': if artifact_index > 0: self._output_writer.Write(',\n') attribute_values = analysis_result.CopyToDict() for key in attribute_names: value = attribute_values.get(key, None) if value is None: value = '' elif isinstance(value, int): value = attribute_mappings.get(key, {}).get(value, value) elif self._output_format == 'json': value = value.replace('\\', '\\\\') attribute_values[key] = value self._output_writer.Write(entry_format_string.format( **attribute_values)) self._GenerateReportFooter() def _GenerateFileHashesReport(self, storage_reader): """Generates a file hashes report. Args: storage_reader (StorageReader): storage reader. 
""" hash_type = self._hash_type.upper() self._GenerateReportHeader( 'file_hashes', [f'{hash_type:s} hash', 'Display name']) hash_attribute_name = f'{self._hash_type:s}_hash' entry_format_string = self._GenerateReportEntryFormatString([ hash_attribute_name, 'display_name']) generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_EVENT_DATA_STREAM) for event_data_stream_index, event_data_stream in enumerate(generator): if self._output_format == 'json': if event_data_stream_index > 0: self._output_writer.Write(',\n') hash_value = getattr( event_data_stream, hash_attribute_name, None) or 'N/A' display_name = 'N/A' if event_data_stream.path_spec: display_name = path_helper.PathHelper.GetDisplayNameForPathSpec( event_data_stream.path_spec) attribute_values = { 'display_name': display_name, hash_attribute_name: hash_value} self._output_writer.Write(entry_format_string.format( **attribute_values)) self._GenerateReportFooter() def _GenerateReportEntryFormatString(self, attribute_names): """Generates a report entry format string. Args: attribute_names (list[str]): names of the attributes to report. Returns: str: entry format string. """ if self._output_format == 'json': attributes_string = ', '.join([ f'"{name:s}": "{{{name:s}!s}}"' for name in attribute_names]) return f' {{{{{attributes_string:s}}}}}' if self._output_format == 'markdown': attributes_string = ' | '.join([ f'{{{name:s}!s}}' for name in attribute_names]) return f'{attributes_string:s}\n' if self._output_format == 'text': attributes_string = '\t'.join([ f'{{{name:s}!s}}' for name in attribute_names]) return f'{attributes_string:s}\n' return '' def _GenerateReportFooter(self): """Generates a report footer.""" if self._output_format == 'json': self._output_writer.Write('\n]}\n') def _GenerateReportHeader(self, json_base_type, column_titles): """Generates a report header. Args: json_base_type (str): JSON base type. column_titles (list[str]): column titles of the Markdown and tab separated tables. 
""" if self._output_format == 'json': self._output_writer.Write(f'{{"{json_base_type:s}": [\n') elif self._output_format == 'markdown': titles_string = ' | '.join(column_titles) self._output_writer.Write(f'{titles_string:s}\n') separators_string = ' | '.join(['---'] * len(column_titles)) self._output_writer.Write(f'{separators_string:s}\n') elif self._output_format == 'text': titles_string = '\t'.join(column_titles) self._output_writer.Write(f'{titles_string:s}\n') def _GenerateWinEvtProvidersReport(self, storage_reader): """Generates a Windows Event Log providers report. Args: storage_reader (StorageReader): storage reader. """ column_titles = [ 'Identifier', 'Log source(s)', 'Log type(s)', 'Event message file(s)', 'Parameter message file(s)'] self._GenerateReportHeader('winevt_providers', column_titles) attribute_names = [ 'identifier', 'log_sources', 'log_types', 'event_message_files', 'parameter_message_files'] entry_format_string = self._GenerateReportEntryFormatString(attribute_names) if storage_reader.HasAttributeContainers('windows_eventlog_provider'): generator = storage_reader.GetAttributeContainers( 'windows_eventlog_provider') for artifact_index, artifact in enumerate(generator): if self._output_format == 'json': if artifact_index > 0: self._output_writer.Write(',\n') attribute_values = { 'identifier': artifact.identifier or '', 'event_message_files': artifact.event_message_files or [], 'log_sources': artifact.log_sources or [], 'log_types': artifact.log_types or [], 'parameter_message_files': artifact.parameter_message_files or []} self._output_writer.Write(entry_format_string.format( **attribute_values)) self._GenerateReportFooter() def _GetStorageReader(self, path): """Retrieves a storage reader. Args: path (str): path of the storage file. Returns: StorageReader: storage reader or None if the storage file format is not supported. Raises: BadConfigOption: if the storage file format is not supported. 
""" storage_reader = ( storage_factory.StorageFactory.CreateStorageReaderForFile(path)) if not storage_reader: raise errors.BadConfigOption( f'Format of storage file: {path:s} not supported.') return storage_reader def _PrintAnalysisReportCounter( self, analysis_reports_counter, session_identifier=None): """Prints the analysis reports counter. Args: analysis_reports_counter (collections.Counter): number of analysis reports per analysis plugin. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == 'json': json_string = json.dumps(analysis_reports_counter) self._output_writer.Write(f'"analysis_reports": {json_string:s}') elif (self._output_format in ('markdown', 'text') and analysis_reports_counter): title = 'Reports generated per plugin' if session_identifier: title = ': '.join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Plugin name', 'Number of reports'], title=title) for key, value in sorted(analysis_reports_counter.items()): if key != 'total': table_view.AddRow([key, value]) try: total = analysis_reports_counter['total'] except KeyError: total = 'N/A' table_view.AddRow(['Total', total]) table_view.Write(self._output_writer) def _PrintAnalysisReportsDetails(self, storage_reader): """Prints the details of the analysis reports. Args: storage_reader (StorageReader): storage reader. 
""" has_analysis_reports = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_ANALYSIS_REPORT) if self._output_format == 'text' and not has_analysis_reports: self._output_writer.Write('\nNo analysis reports stored.\n') else: generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_ANALYSIS_REPORT) for index, analysis_report in enumerate(generator): date_time_string = None if analysis_report.time_compiled is not None: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=analysis_report.time_compiled) date_time_string = date_time.CopyToDateTimeStringISO8601() table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f'Analysis report: {index:d}') table_view.AddRow(['Name plugin', analysis_report.plugin_name or 'N/A']) table_view.AddRow(['Date and time', date_time_string or 'N/A']) table_view.AddRow([ 'Event filter', analysis_report.event_filter or 'N/A']) if not analysis_report.analysis_counter: table_view.AddRow(['Text', analysis_report.text or '']) else: table_view.AddRow(['Results', '']) for key, value in sorted(analysis_report.analysis_counter.items()): table_view.AddRow([key, value]) table_view.Write(self._output_writer) def _PrintAnalysisReportSection( self, storage_reader, analysis_reports_counter): """Prints the analysis reports section. Args: storage_reader (StorageReader): storage reader. analysis_reports_counter (collections.Counter): number of analysis reports per analysis plugin. """ self._PrintAnalysisReportCounter(analysis_reports_counter) if self._output_format in ('markdown', 'text'): self._PrintAnalysisReportsDetails(storage_reader) def _PrintCounterDifferences( self, differences, column_names=None, reverse=False, title=None): """Prints the counter differences. Args: differences (dict[str, tuple[int, int]]): mismatching results per key. column_names (Optional[list[str]]): column names. reverse (Optional[bool]): True if the key and values of differences should be printed in reverse order. 
title (Optional[str]): title. """ # TODO: add support for 3 column table? table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=column_names, title=title) for key, value in sorted(differences.items()): value_string = f'{value[0]:d} ({value[1]:d})' if reverse: table_view.AddRow([value_string, key]) else: table_view.AddRow([key, value_string]) table_view.Write(self._output_writer) self._output_writer.Write('\n') def _PrintEventLabelsCounter( self, event_labels_counter, session_identifier=None): """Prints the event labels counter. Args: event_labels_counter (collections.Counter): number of event tags per label. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == 'json': json_string = json.dumps(event_labels_counter) self._output_writer.Write(f'"event_labels": {json_string:s}') elif self._output_format in ('markdown', 'text'): if self._output_format == 'text' and not event_labels_counter: if not session_identifier: self._output_writer.Write('\nNo events labels stored.\n') else: title = 'Event tags generated per label' if session_identifier: title = ': '.join([title, session_identifier]) title_level = 4 else: title_level = 2 if not event_labels_counter: if not session_identifier: heading_marker = '#' * title_level self._output_writer.Write( f'{heading_marker:s} {title:s}\n\nN/A\n\n') else: table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Label', 'Number of event tags'], title=title, title_level=title_level) for key, value in sorted(event_labels_counter.items()): if key != 'total': table_view.AddRow([key, value]) try: total = event_labels_counter['total'] except KeyError: total = 'N/A' table_view.AddRow(['Total', total]) table_view.Write(self._output_writer) def _PrintParsersCounter(self, parsers_counter, session_identifier=None): """Prints the parsers counter. Args: parsers_counter (collections.Counter): number of events per parser or parser plugin. 
session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == 'json': if session_identifier: self._output_writer.Write(', ') json_string = json.dumps(parsers_counter) self._output_writer.Write(f'"parsers": {json_string:s}') elif self._output_format in ('markdown', 'text'): if self._output_format == 'text' and not parsers_counter: if not session_identifier: self._output_writer.Write('\nNo events stored.\n') else: title = 'Events generated per parser' if session_identifier: title = ': '.join([title, session_identifier]) title_level = 4 else: title_level = 2 if not parsers_counter: if not session_identifier: heading_marker = '#' * title_level self._output_writer.Write( f'{heading_marker:s} {title:s}\n\nN/A\n\n') else: table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Parser (plugin) name', 'Number of events'], title=title, title_level=title_level) for key, value in sorted(parsers_counter.items()): if key != 'total': table_view.AddRow([key, value]) table_view.AddRow(['Total', parsers_counter['total']]) table_view.Write(self._output_writer) def _PrintPreprocessingWarningsDetails(self, storage_reader): """Prints the details of the preprocessing warnings. Args: storage_reader (StorageReader): storage reader. """ generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_PREPROCESSING_WARNING) for index, warning in enumerate(generator): table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f'Preprocessing warning: {index:d}') table_view.AddRow(['Message', warning.message]) table_view.AddRow(['Plugin name', warning.plugin_name]) if warning.path_spec: # TODO: add helper method to format path spec as row. 
path_spec_string = self._GetPathSpecificationString(warning.path_spec) for path_index, line in enumerate(path_spec_string.split('\n')): if not line: continue if path_index == 0: table_view.AddRow(['Path specification', line]) else: table_view.AddRow(['', line]) table_view.Write(self._output_writer) def _PrintWarningsDetails(self, storage_reader, container_type, warning_type): """Prints the details of warnings. Args: storage_reader (StorageReader): storage reader. container_type (str): attribute container type. warning_type (str): warning type. """ generator = storage_reader.GetAttributeContainers(container_type) for index, warning in enumerate(generator): table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f'{warning_type:s} warning: {index:d}') table_view.AddRow(['Message', warning.message]) table_view.AddRow(['Parser chain', warning.parser_chain]) path_spec_string = self._GetPathSpecificationString(warning.path_spec) for path_index, line in enumerate(path_spec_string.split('\n')): if not line: continue if path_index == 0: table_view.AddRow(['Path specification', line]) else: table_view.AddRow(['', line]) table_view.Write(self._output_writer) def _PrintSessionDetailsAsJSON(self, session): """Prints the details of a session as JSON. Args: session (Session): session. """ json_string = ( json_serializer.JSONAttributeContainerSerializer.WriteSerialized( session)) self._output_writer.Write(f'"session": {json_string:s}') def _PrintSessionDetailsAsTable(self, session, session_identifier): """Prints the details of a session as a table. Args: session (Session): session. session_identifier (str): session identifier, formatted as a UUID. 
""" start_time = 'N/A' if session.start_time is not None: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=session.start_time) start_time = date_time.CopyToDateTimeStringISO8601() completion_time = 'N/A' if session.completion_time is not None: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=session.completion_time) completion_time = date_time.CopyToDateTimeStringISO8601() enabled_parser_names = 'N/A' if session.enabled_parser_names: enabled_parser_names = ', '.join(sorted(session.enabled_parser_names)) command_line_arguments = session.command_line_arguments or 'N/A' parser_filter_expression = session.parser_filter_expression or 'N/A' preferred_encoding = session.preferred_encoding or 'N/A' preferred_time_zone = session.preferred_time_zone or 'N/A' if session.artifact_filters: artifact_filters_string = ', '.join(session.artifact_filters) else: artifact_filters_string = 'N/A' filter_file = session.filter_file or 'N/A' table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=f'Session: {session_identifier:s}') table_view.AddRow(['Start time', start_time]) table_view.AddRow(['Completion time', completion_time]) table_view.AddRow(['Product name', session.product_name]) table_view.AddRow(['Product version', session.product_version]) table_view.AddRow(['Command line arguments', command_line_arguments]) table_view.AddRow(['Parser filter expression', parser_filter_expression]) table_view.AddRow(['Enabled parser and plugins', enabled_parser_names]) table_view.AddRow(['Preferred encoding', preferred_encoding]) table_view.AddRow(['Preferred time zone', preferred_time_zone]) table_view.AddRow(['Debug mode', session.debug_mode]) table_view.AddRow(['Artifact filters', artifact_filters_string]) table_view.AddRow(['Filter file', filter_file]) table_view.Write(self._output_writer) def _PrintSessionsDetails(self, storage_reader): """Prints the details of the sessions. Args: storage_reader (BaseStore): storage. 
""" if self._output_format == 'json': self._output_writer.Write('"sessions": {') for session_index, session in enumerate(storage_reader.GetSessions()): session_identifier = str(uuid.UUID(hex=session.identifier)) if self._output_format == 'json': if session_index != 0: self._output_writer.Write(', ') self._PrintSessionDetailsAsJSON(session) elif self._output_format in ('markdown', 'text'): self._PrintSessionDetailsAsTable(session, session_identifier) if self._verbose: system_configuration = storage_reader.GetAttributeContainerByIndex( 'system_configuration', session_index) if system_configuration: self._PrintSystemConfigurations( storage_reader, [system_configuration], session_identifier=session_identifier) if self._output_format == 'json': self._output_writer.Write('}') def _PrintSessionsOverview(self, storage_reader): """Prints a sessions overview. Args: storage_reader (StorageReader): storage reader. """ table_view = views.ViewsFactory.GetTableView( self._views_format_type, title='Sessions', title_level=2) for session in storage_reader.GetSessions(): date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=session.start_time) start_time = date_time.CopyToDateTimeStringISO8601() session_identifier = str(uuid.UUID(hex=session.identifier)) table_view.AddRow([session_identifier, start_time]) table_view.Write(self._output_writer) def _PrintSessionsSection(self, storage_reader): """Prints the session section. Args: storage_reader (StorageReader): storage reader. """ if self._output_format in ('markdown', 'text'): self._PrintSessionsOverview(storage_reader) if (self._output_format == 'json' or self._verbose or 'sessions' in self._sections): self._PrintSessionsDetails(storage_reader) def _PrintSystemConfiguration( self, storage_reader, system_configuration, session_identifier=None): """Prints the details of a system configuration. Args: storage_reader (StorageReader): storage reader. system_configuration (SystemConfiguration): system configuration. 
session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if not system_configuration: return title = 'System configuration' if session_identifier: title = ': '.join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=title) hostname = 'N/A' if system_configuration.hostname: hostname = system_configuration.hostname.name operating_system = system_configuration.operating_system or 'N/A' operating_system_product = ( system_configuration.operating_system_product or 'N/A') operating_system_version = ( system_configuration.operating_system_version or 'N/A') language = system_configuration.language or 'N/A' code_page = system_configuration.code_page or 'N/A' keyboard_layout = system_configuration.keyboard_layout or 'N/A' time_zone = system_configuration.time_zone or 'N/A' table_view.AddRow(['Hostname', hostname]) table_view.AddRow(['Operating system', operating_system]) table_view.AddRow(['Operating system product', operating_system_product]) table_view.AddRow(['Operating system version', operating_system_version]) table_view.AddRow(['Language', language]) table_view.AddRow(['Code page', code_page]) table_view.AddRow(['Keyboard layout', keyboard_layout]) table_view.AddRow(['Time zone', time_zone]) table_view.Write(self._output_writer) title = 'Available time zones' if session_identifier: title = ': '.join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Name', 'Offset from UTC'], title=title) # TODO: filter time zones on specific system configuration. 
for time_zone in storage_reader.GetAttributeContainers('time_zone'): hours_from_utc, minutes_from_utc = divmod(time_zone.offset, 60) if hours_from_utc < 0: sign = '+' hours_from_utc *= -1 else: sign = '-' time_zone_offset = f'{sign:s}{hours_from_utc:02d}:{minutes_from_utc:02d}' table_view.AddRow([time_zone.name, time_zone_offset]) table_view.Write(self._output_writer) title = 'User accounts' if session_identifier: title = ': '.join([title, session_identifier]) table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Username', 'User directory'], title=title) # TODO: filter user accounts on specific system configuration. for user_account in storage_reader.GetAttributeContainers('user_account'): table_view.AddRow([user_account.username, user_account.user_directory]) table_view.Write(self._output_writer) def _PrintSystemConfigurations( self, storage_reader, system_configurations, session_identifier=None): """Prints the details of system configurations. Args: storage_reader (StorageReader): storage reader. system_configurations (list[SystemConfiguration]): system configurations. session_identifier (Optional[str]): session identifier, formatted as a UUID. """ if self._output_format == 'json': self._output_writer.Write(', "system_configurations": {') for configuration_index, configuration in enumerate(system_configurations): if self._output_format == 'json': if configuration_index != 0: self._output_writer.Write(', ') json_string = ( json_serializer.JSONAttributeContainerSerializer.WriteSerialized( configuration.system_configuration)) self._output_writer.Write(f'"system_configuration": {json_string:s}') elif self._output_format in ('markdown', 'text'): self._PrintSystemConfiguration( storage_reader, configuration, session_identifier=session_identifier) if self._output_format == 'json': self._output_writer.Write('}') def _PrintStorageInformation(self, storage_reader): """Prints information about the store. 
Args: storage_reader (StorageReader): storage reader. """ if self._output_format == 'json': self._output_writer.Write('{') elif self._output_format in ('markdown', 'text'): self._PrintStorageOverviewAsTable(storage_reader) section_written = False if self._sections == 'all' or 'sessions' in self._sections: self._PrintSessionsSection(storage_reader) section_written = True if self._sections == 'all' or 'sources' in self._sections: if self._output_format == 'json' and section_written: self._output_writer.Write(', ') self._PrintSourcesOverview(storage_reader) section_written = True if self._output_format == 'json' and section_written: self._output_writer.Write(', ') storage_counters = self._CalculateStorageCounters(storage_reader) if self._output_format == 'json': self._output_writer.Write('"storage_counters": {') section_written = False if self._sections == 'all' or 'events' in self._sections: parsers = storage_counters.get('parsers', collections.Counter()) self._PrintParsersCounter(parsers) section_written = True event_labels = storage_counters.get( 'event_labels', collections.Counter()) if self._output_format == 'json' and section_written: self._output_writer.Write(', ') self._PrintEventLabelsCounter(event_labels) if self._sections == 'all' or 'warnings' in self._sections: if self._output_format == 'json' and section_written: self._output_writer.Write(', ') self._PrintWarningsSection(storage_reader, storage_counters) section_written = True if self._sections == 'all' or 'reports' in self._sections: if self._output_format == 'json' and section_written: self._output_writer.Write(', ') analysis_reports = storage_counters.get( 'analysis_reports', collections.Counter()) self._PrintAnalysisReportSection(storage_reader, analysis_reports) if self._output_format == 'json': self._output_writer.Write('}') if self._output_format == 'json': self._output_writer.Write('}') elif self._output_format in ('markdown', 'text'): self._output_writer.Write('\n') def 
_PrintSourcesOverview(self, storage_reader): """Prints a sources overview. Args: storage_reader (StorageReader): storage reader. """ if self._output_format == 'json': self._output_writer.Write('"event_sources": {') elif self._output_format in ('markdown', 'text'): table_view = views.ViewsFactory.GetTableView( self._views_format_type, title='Event sources', title_level=2) generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_EVENT_SOURCE) number_of_event_sources = 0 for source_index, source in enumerate(generator): if self._output_format == 'json': if source_index != 0: self._output_writer.Write(', ') json_string = ( json_serializer.JSONAttributeContainerSerializer.WriteSerialized( source)) self._output_writer.Write(f'"source": {json_string:s}') elif self._output_format in ('markdown', 'text'): if self._verbose or 'sources' in self._sections: path_spec_string = self._GetPathSpecificationString(source.path_spec) for path_index, line in enumerate(path_spec_string.split('\n')): if not line: continue if path_index == 0: table_view.AddRow([f'{source_index:d}', line]) else: table_view.AddRow(['', line]) number_of_event_sources += 1 if self._output_format == 'json': self._output_writer.Write('}') elif self._output_format in ('markdown', 'text'): if not (self._verbose or 'sources' in self._sections): table_view.AddRow(['Total', f'{number_of_event_sources:d}']) table_view.Write(self._output_writer) def _PrintStorageOverviewAsTable(self, storage_reader): """Prints a storage overview as a table. Args: storage_reader (StorageReader): storage reader. 
""" format_version = storage_reader.GetFormatVersion() serialization_format = storage_reader.GetSerializationFormat() table_view = views.ViewsFactory.GetTableView( self._views_format_type, title='Plaso Storage Information', title_level=1) table_view.AddRow(['Filename', os.path.basename(self._storage_file_path)]) table_view.AddRow(['Format version', format_version]) table_view.AddRow(['Serialization format', serialization_format]) table_view.Write(self._output_writer) def _PrintWarningCountersJSON( self, warnings_by_path_spec, warnings_by_parser_chain): """Prints JSON containing a summary of the number of warnings. Args: warnings_by_path_spec (collections.Counter): number of warnings per path specification. warnings_by_parser_chain (collections.Counter): number of warnings per parser chain. """ json_string = json.dumps(warnings_by_parser_chain) self._output_writer.Write(f'"warnings_by_parser": {json_string:s}') json_string = json.dumps(warnings_by_path_spec) self._output_writer.Write(f', "warnings_by_path_spec": {json_string:s}') def _PrintWarningCountersTable( self, description, warnings_by_path_spec, warnings_by_parser_chain): """Prints a table containing a summary of the number of warnings. Args: description (str): description of the type of warning. warnings_by_path_spec (collections.Counter): number of warnings per path specification. warnings_by_parser_chain (collections.Counter): number of warnings per parser chain. 
""" if warnings_by_parser_chain: title = description.title() table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Parser (plugin) name', 'Number of warnings'], title=f'{title:s} warnings generated per parser') for parser_chain, count in warnings_by_parser_chain.items(): parser_chain = parser_chain or '<No parser>' table_view.AddRow([parser_chain, f'{count:d}']) table_view.Write(self._output_writer) if warnings_by_path_spec: table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Number of warnings', 'Pathspec'], title=f'Path specifications with most {description:s} warnings') for path_spec, count in warnings_by_path_spec.most_common(10): for path_index, line in enumerate(path_spec.split('\n')): if not line: continue if path_index == 0: table_view.AddRow([f'{count:d}', line]) else: table_view.AddRow(['', line]) table_view.Write(self._output_writer) def _PrintWarningsSection(self, storage_reader, storage_counters): """Prints the warnings section. Args: storage_reader (StorageReader): storage reader. storage_counters (dict[str, collections.Counter]): storage counters. 
""" has_extraction_warnings = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_EXTRACTION_WARNING) has_preprocessing_warnings = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_PREPROCESSING_WARNING) has_recovery_warnings = storage_reader.HasAttributeContainers( self._CONTAINER_TYPE_RECOVERY_WARNING) if (self._output_format == 'text' and not has_extraction_warnings and not has_preprocessing_warnings and not has_recovery_warnings): self._output_writer.Write('\nNo warnings stored.\n') else: warnings_by_path_spec = storage_counters.get( 'extraction_warnings_by_path_spec', collections.Counter()) warnings_by_parser_chain = storage_counters.get( 'extraction_warnings_by_parser_chain', collections.Counter()) if self._output_format == 'json': self._PrintWarningCountersJSON( warnings_by_path_spec, warnings_by_parser_chain) elif self._output_format in ('markdown', 'text'): self._PrintWarningCountersTable( 'extraction', warnings_by_path_spec, warnings_by_parser_chain) if self._verbose or 'warnings' in self._sections: self._PrintWarningsDetails( storage_reader, self._CONTAINER_TYPE_EXTRACTION_WARNING, 'Extraction') warnings_by_path_spec = storage_counters.get( 'preprocessing_warnings_by_path_spec', collections.Counter()) warnings_by_parser_chain = storage_counters.get( 'preprocessing_warnings_by_parser_chain', collections.Counter()) # TODO: print preprocessing warnings as part of JSON output format. if self._output_format in ('markdown', 'text'): self._PrintWarningCountersTable( 'preprocessing', warnings_by_path_spec, warnings_by_parser_chain) if self._verbose or 'warnings' in self._sections: self._PrintPreprocessingWarningsDetails(storage_reader) warnings_by_path_spec = storage_counters.get( 'recovery_warnings_by_path_spec', collections.Counter()) warnings_by_parser_chain = storage_counters.get( 'recovery_warnings_by_parser_chain', collections.Counter()) # TODO: print recovery warnings as part of JSON output format. 
if self._output_format in ('markdown', 'text'): self._PrintWarningCountersTable( 'recovery', warnings_by_path_spec, warnings_by_parser_chain) if self._verbose or 'warnings' in self._sections: self._PrintWarningsDetails( storage_reader, self._CONTAINER_TYPE_RECOVERY_WARNING, 'Recovery') warnings_by_path_spec = storage_counters.get( 'timelining_warnings_by_path_spec', collections.Counter()) warnings_by_parser_chain = storage_counters.get( 'timelining_warnings_by_parser_chain', collections.Counter()) # TODO: print timelining warnings as part of JSON output format. if self._output_format in ('markdown', 'text'): self._PrintWarningCountersTable( 'timelining', warnings_by_path_spec, warnings_by_parser_chain) if self._verbose or 'warnings' in self._sections: self._PrintWarningsDetails( storage_reader, self._CONTAINER_TYPE_TIMELINING_WARNING, 'Timelining')
[docs] def CompareStores(self): """Compares the contents of two stores. Returns: bool: True if the content of the stores is identical. Raises: BadConfigOption: if the storage file format is not supported. """ storage_reader = self._GetStorageReader(self._storage_file_path) compare_storage_reader = self._GetStorageReader( self._compare_storage_file_path) try: result = self._CompareStores(storage_reader, compare_storage_reader) finally: compare_storage_reader.Close() storage_reader.Close() if result: self._output_writer.Write('Storage files are identical.\n') else: self._output_writer.Write('Storage files are different.\n') return result
[docs] def GenerateReport(self): """Generates a report. Raises: BadConfigOption: if the storage file format is not supported. """ storage_reader = self._GetStorageReader(self._storage_file_path) try: if self._report_type == 'browser_search': column_titles = ['Search engine', 'Search term', 'Number of queries'] attribute_names = ['search_engine', 'search_term', 'number_of_queries'] attribute_mappings = {} self._GenerateAnalysisResultsReport( storage_reader, 'browser_searches', column_titles, 'browser_search_analysis_result', attribute_names, attribute_mappings) elif self._report_type == 'chrome_extension': column_titles = ['Username', 'Extension identifier', 'Extension'] attribute_names = ['username', 'extension_identifier', 'extension'] attribute_mappings = {} self._GenerateAnalysisResultsReport( storage_reader, 'chrome_extensions', column_titles, 'chrome_extension_analysis_result', attribute_names, attribute_mappings) elif self._report_type == 'environment_variables': column_titles = ['Name', 'Value'] attribute_names = ['name', 'value'] attribute_mappings = {} self._GenerateAnalysisResultsReport( storage_reader, 'environment_variables', column_titles, 'environment_variable', attribute_names, attribute_mappings) elif self._report_type == 'file_hashes': self._GenerateFileHashesReport(storage_reader) elif self._report_type == 'windows_services': column_titles = ['Name', 'Service type', 'Start type', 'Image path'] attribute_names = ['name', 'service_type', 'start_type', 'image_path'] # TODO: consider using message formatting helpers? 
attribute_mappings = { 'service_type': { 0x01: 'Kernel device driver (0x01)', 0x02: 'File system driver (0x02)', 0x04: 'Adapter (0x04)', 0x10: 'Stand-alone service (0x10)', 0x20: 'Shared service (0x20)'}, 'start_type': { 0: 'Boot (0)', 1: 'System (1)', 2: 'Automatic (2)', 3: 'On demand (3)', 4: 'Disabled (4)'} } self._GenerateAnalysisResultsReport( storage_reader, 'windows_services', column_titles, 'windows_service_configuration', attribute_names, attribute_mappings) elif self._report_type == 'winevt_providers': self._GenerateWinEvtProvidersReport(storage_reader) finally: storage_reader.Close()
[docs] def ListReports(self): """Lists information about the available report types.""" table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Name', 'Description'], title='Reports') for name, description in sorted(self._REPORTS.items()): table_view.AddRow([name, description]) table_view.Write(self._output_writer)
[docs] def ListSections(self): """Lists information about the available sections.""" table_view = views.ViewsFactory.GetTableView( self._views_format_type, column_names=['Name', 'Description'], title='Sections') for name, description in sorted(self._SECTIONS.items()): table_view.AddRow([name, description]) table_view.Write(self._output_writer)
  def ParseArguments(self, arguments):
    """Parses the command line arguments.

    Args:
      arguments (list[str]): command line arguments.

    Returns:
      bool: True if the arguments were successfully parsed.
    """
    # Configure logging early with defaults so messages emitted during
    # argument parsing are not lost; reconfigured with the parsed options
    # at the end of this method.
    loggers.ConfigureLogging()

    argument_parser = argparse.ArgumentParser(
        description=self.DESCRIPTION, add_help=False,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    self.AddBasicOptions(argument_parser)
    self.AddStorageOptions(argument_parser)
    self.AddLogFileOptions(argument_parser)

    if self._CanEnforceProcessMemoryLimit():
      helpers_manager.ArgumentHelperManager.AddCommandLineArguments(
          argument_parser, names=['process_resources'])

    argument_parser.add_argument(
        '--compare', dest='compare_storage_file', type=str,
        action='store', default='', metavar='STORAGE_FILE', help=(
            'The path of the storage file to compare against.'))

    output_formats = ', '.join(sorted(self._SUPPORTED_OUTPUT_FORMATS))
    argument_parser.add_argument(
        '--output_format', '--output-format', dest='output_format', type=str,
        choices=self._SUPPORTED_OUTPUT_FORMATS, action='store',
        default=self._DEFAULT_OUTPUT_FORMAT, metavar='FORMAT', help=(
            f'Format of the output, the default is: '
            f'{self._DEFAULT_OUTPUT_FORMAT:s}. Supported options: '
            f'{output_formats:s}.'))

    hash_choices = ', '.join(self._HASH_CHOICES)
    argument_parser.add_argument(
        '--hash', dest='hash', choices=self._HASH_CHOICES, action='store',
        metavar='TYPE', default=self._DEFAULT_HASH_TYPE, help=(
            f'Type of hash to output in file_hashes report. Supported options: '
            f'{hash_choices:s}'))

    report_choices = ', '.join(self._REPORT_CHOICES)
    argument_parser.add_argument(
        '--report', dest='report', choices=self._REPORT_CHOICES,
        action='store', metavar='TYPE', default=self._DEFAULT_REPORT_TYPE,
        help=(
            f'Report on specific information. Supported options: '
            f'{report_choices:s}'))

    argument_parser.add_argument(
        '--sections', dest='sections', type=str, action='store', default='all',
        metavar='SECTIONS_LIST', help=(
            'List of sections to output. This is a comma separated list where '
            'each entry is the name of a section. Use "--sections list" to '
            'list the available sections and "--sections all" to show all '
            'available sections.'))

    argument_parser.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true', default=False,
        help='Print verbose output.')

    argument_parser.add_argument(
        '-w', '--write', metavar='OUTPUTFILE', dest='write',
        help='Output filename.')

    try:
      options = argument_parser.parse_args(arguments)
    except UnicodeEncodeError:
      # If we get here we are attempting to print help in a non-Unicode
      # terminal.
      self._output_writer.Write('\n')
      self._output_writer.Write(argument_parser.format_help())
      return False

    try:
      self.ParseOptions(options)
    except errors.BadConfigOption as exception:
      self._output_writer.Write(f'ERROR: {exception!s}\n')
      self._output_writer.Write('\n')
      self._output_writer.Write(argument_parser.format_usage())
      return False

    self._WaitUserWarning()

    # Reconfigure logging now that the debug, log file and quiet options
    # have been parsed.
    loggers.ConfigureLogging(
        debug_output=self._debug_mode, filename=self._log_file,
        quiet_mode=self._quiet_mode)

    return True
  def ParseOptions(self, options):
    """Parses the options.

    Args:
      options (argparse.Namespace): command line arguments.

    Raises:
      BadConfigOption: if the options are invalid.
    """
    self._ParseInformationalOptions(options)
    self._ParseLogFileOptions(options)

    self._verbose = getattr(options, 'verbose', False)

    self._report_type = getattr(
        options, 'report', self._DEFAULT_REPORT_TYPE)
    self._sections = getattr(options, 'sections', '')

    self.list_reports = self._report_type == 'list'
    self.list_sections = self._sections == 'list'
    self.show_troubleshooting = getattr(options, 'show_troubleshooting', False)
    # Listing and troubleshooting modes need no further validation, so the
    # remaining option checks are skipped.
    if self.list_reports or self.list_sections or self.show_troubleshooting:
      return

    if self._report_type != 'none':
      if self._report_type not in self._REPORTS:
        raise errors.BadConfigOption(
            f'Unsupported report type: {self._report_type:s}')
      self.generate_report = True

    # The special value 'all' is kept as a string; any other value becomes
    # a list of section names.
    if self._sections != 'all':
      self._sections = self._sections.split(',')

    self._hash_type = getattr(options, 'hash', self._DEFAULT_HASH_TYPE)
    self._output_filename = getattr(options, 'write', None)

    helpers_manager.ArgumentHelperManager.ParseOptions(
        options, self, names=['process_resources'])

    # TODO: move check into _CheckStorageFile.
    self._storage_file_path = self.ParseStringOption(options, 'storage_file')
    if not self._storage_file_path:
      raise errors.BadConfigOption('Missing storage file option.')

    if not os.path.isfile(self._storage_file_path):
      raise errors.BadConfigOption(
          f'No such storage file: {self._storage_file_path:s}')

    compare_storage_file_path = self.ParseStringOption(
        options, 'compare_storage_file')
    if compare_storage_file_path:
      if not os.path.isfile(compare_storage_file_path):
        raise errors.BadConfigOption(
            f'No such storage file: {compare_storage_file_path:s}')

      self._compare_storage_file_path = compare_storage_file_path
      self.compare_storage_information = True

    self._output_format = self.ParseStringOption(options, 'output_format')

    if self._output_filename:
      if os.path.exists(self._output_filename):
        raise errors.BadConfigOption(
            f'Output file: {self._output_filename:s} already exists.')

      # The file object is deliberately kept open for the lifetime of the
      # tool; the output writer takes ownership of it.
      output_file_object = open(self._output_filename, 'wb')  # pylint: disable=consider-using-with
      self._output_writer = tools.FileObjectOutputWriter(output_file_object)

    self._EnforceProcessMemoryLimit(self._process_memory_limit)
[docs] def PrintStorageInformation(self): """Prints the storage information. Raises: BadConfigOption: if the storage file format is not supported. """ if self._output_format == 'markdown': self._views_format_type = views.ViewsFactory.FORMAT_TYPE_MARKDOWN elif self._output_format == 'text': self._views_format_type = views.ViewsFactory.FORMAT_TYPE_CLI storage_reader = self._GetStorageReader(self._storage_file_path) try: self._PrintStorageInformation(storage_reader) finally: storage_reader.Close()