Source code for plaso.cli.analysis_tool

# -*- coding: utf-8 -*-
"""Shared functionality for an analysis CLI tool."""

import time

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.analysis import manager as analysis_manager
from plaso.cli import tool_options
from plaso.cli import tools
from plaso.cli import views
from plaso.containers import reports
from plaso.multi_process import analysis_engine as multi_analysis_engine
from plaso.storage import factory as storage_factory


[docs] class AnalysisTool( tools.CLITool, tool_options.AnalysisPluginOptions, tool_options.ProfilingOptions, tool_options.StorageFileOptions): """Analysis CLI tool. Attributes: list_analysis_plugins (bool): True if information about the analysis plugins should be shown. """ _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE
[docs] def __init__(self, input_reader=None, output_writer=None): """Initializes the CLI tool object. Args: input_reader (Optional[InputReader]): input reader, where None indicates that the stdin input reader should be used. output_writer (Optional[OutputWriter]): output writer, where None indicates that the stdout output writer should be used. """ super(AnalysisTool, self).__init__( input_reader=input_reader, output_writer=output_writer) self._analysis_manager = analysis_manager.AnalysisPluginManager self._analysis_plugins = None self._analysis_plugins_output_format = None self._command_line_arguments = None self._event_filter_expression = None self._event_filter = None self._number_of_stored_analysis_reports = 0 self._status_view_interval = 0.5 self._storage_file_path = None self._worker_memory_limit = None self._worker_timeout = None self.list_analysis_plugins = False
def _AnalyzeEvents(self, session, configuration, status_update_callback=None): """Analyzes events in a Plaso storage. Args: session (Session): session in which the events are analyzed. configuration (ProcessingConfiguration): processing configuration. status_update_callback (Optional[function]): callback function for status updates. Returns: ProcessingStatus: processing status. Raises: RuntimeError: if a non-recoverable situation is encountered. """ storage_writer = storage_factory.StorageFactory.CreateStorageWriterForFile( self._storage_file_path) if not storage_writer: raise RuntimeError('Unable to create storage writer.') # TODO: add single process analysis engine support. analysis_engine = multi_analysis_engine.AnalysisMultiProcessEngine( worker_memory_limit=self._worker_memory_limit, worker_timeout=self._worker_timeout) analysis_engine.SetStatusUpdateInterval(self._status_view_interval) storage_writer.Open(path=self._storage_file_path) processing_status = None session.command_line_arguments = self._command_line_arguments session.debug_mode = self._debug_mode try: storage_writer.AddAttributeContainer(session) try: processing_status = analysis_engine.AnalyzeEvents( session, storage_writer, self._data_location, self._analysis_plugins, configuration, event_filter=self._event_filter, event_filter_expression=self._event_filter_expression, status_update_callback=status_update_callback, storage_file_path=self._storage_file_path) finally: session.aborted = getattr(processing_status, 'aborted', True) session.completion_time = int(time.time() * 1000000) storage_writer.UpdateAttributeContainer(session) finally: storage_writer.Close() return processing_status def _PrintAnalysisReportsDetails(self, storage_reader): """Prints the details of the analysis reports. Args: storage_reader (StorageReader): storage reader. """ generator = storage_reader.GetAttributeContainers( self._CONTAINER_TYPE_ANALYSIS_REPORT) for index, analysis_report in enumerate(generator): if index + 1 <= self._number_of_stored_analysis_reports: continue date_time_string = None if analysis_report.time_compiled is not None: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=analysis_report.time_compiled) date_time_string = date_time.CopyToDateTimeStringISO8601() title = f'Analysis report: {analysis_report.plugin_name:s}' table_view = views.ViewsFactory.GetTableView( self._views_format_type, title=title) table_view.AddRow(['Date and time', date_time_string or 'N/A']) table_view.AddRow(['Event filter', analysis_report.event_filter or 'N/A']) if not analysis_report.analysis_counter: table_view.AddRow(['Text', analysis_report.text or '']) else: text = 'Results' for key, value in sorted(analysis_report.analysis_counter.items()): table_view.AddRow([text, f'{key:s}: {value:d}']) text = '' table_view.Write(self._output_writer)