Source code for plaso.output.text_file

# -*- coding: utf-8 -*-
"""Shared functionality for text file based output modules."""

import abc
import heapq
import os

from plaso.output import interface


class SortedStringHeap(object):
  """Heap to sort output strings.

  Strings are stored together with a sort key as (sort key, string) tuples
  and are popped in ascending tuple order.
  """

  # Number of strings at which IsFull() starts reporting True.
  _MAXIMUM_NUMBER_OF_STRINGS = 100000

  def __init__(self):
    """Initializes a heap."""
    super(SortedStringHeap, self).__init__()
    self._heap = []

  def IsFull(self):
    """Determines if the heap is full.

    Returns:
      bool: True if the heap is full.
    """
    return len(self._heap) >= self._MAXIMUM_NUMBER_OF_STRINGS

  def PopString(self):
    """Pops a string from the heap.

    Returns:
      str: string or None if the heap is empty.
    """
    try:
      _, string = heapq.heappop(self._heap)
    except IndexError:
      return None

    return string

  def PopStrings(self):
    """Pops strings from the heap.

    Yields:
      str: string.
    """
    string = self.PopString()
    # PopString signals an empty heap with None. Testing truthiness instead
    # would also stop on an empty string and leave the remaining strings
    # stranded on the heap.
    while string is not None:
      yield string
      string = self.PopString()

  def PushString(self, sort_key, string):
    """Pushes a string onto the heap.

    Args:
      sort_key (str): key for the sort order.
      string (str): string.
    """
    heapq.heappush(self._heap, (sort_key, string))
class TextFileOutputModule(interface.OutputModule):
  """Base class for output modules that write their output to a text file."""

  WRITES_OUTPUT_FILE = True

  # Encoding used when opening the output file.
  _ENCODING = 'utf-8'

  def __init__(self):
    """Initializes an output module that writes to a text file."""
    super().__init__()
    self._file_object = None

  def Close(self):
    """Closes the output file, if one is open."""
    if not self._file_object:
      return

    self._file_object.close()
    self._file_object = None

  @abc.abstractmethod
  def GetFieldValues(
      self, output_mediator, event, event_data, event_data_stream, event_tag):
    """Retrieves the output field values.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
      event_tag (EventTag): event tag.

    Returns:
      dict[str, str]: output field values per name.
    """

  def Open(self, path=None, **kwargs):  # pylint: disable=arguments-differ
    """Opens the output file for writing text.

    Args:
      path (Optional[str]): path of the output file.

    Raises:
      IOError: if the specified output file already exists.
      OSError: if the specified output file already exists.
      ValueError: if path is not set.
    """
    if not path:
      raise ValueError('Missing path.')

    # Refuse to overwrite an existing regular file.
    if os.path.isfile(path):
      error_message = (
          'Unable to use an already existing file for output '
          '[{0:s}]').format(path)
      raise IOError(error_message)

    # The file object is kept open until Close() is called.
    self._file_object = open(  # pylint: disable=consider-using-with
        path, 'wt', encoding=self._ENCODING)

  @abc.abstractmethod
  def WriteFieldValues(self, output_mediator, field_values):
    """Writes field values to the output.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      field_values (dict[str, str]): output field values per name.
    """

  def WriteLine(self, text):
    """Writes a line of text, followed by a newline, to the output file.

    Args:
      text (str): text to output.
    """
    write = self._file_object.write
    write('{0:s}\n'.format(text))

  def WriteText(self, text):
    """Writes text to the output file as-is.

    Args:
      text (str): text to output.
    """
    write = self._file_object.write
    write(text)
class SortedTextFileOutputModule(TextFileOutputModule):
  """Base class for output modules that write sorted text to a file.

  Output strings are buffered on a sorted strings heap and written out when
  the primary sort key changes, when the heap is full or when the footer is
  written.
  """

  # Field names that make up the sort key; the first one is the primary key.
  _SORT_KEY_FIELD_NAMES = ['time']

  def __init__(self, event_formatting_helper):
    """Initializes an output module that writes to a text file.

    Args:
      event_formatting_helper (EventFormattingHelper): event formatting helper.
    """
    super().__init__()
    self._event_formatting_helper = event_formatting_helper
    self._last_primary_sort_key = None
    self._sorted_strings_heap = SortedStringHeap()

  def _FlushSortedStringsHeap(self):
    """Writes the strings on the heap and resets the last primary sort key."""
    pending_strings = self._sorted_strings_heap.PopStrings()
    for pending_string in pending_strings:
      self.WriteText(pending_string)

    self._last_primary_sort_key = None

  @abc.abstractmethod
  def _GetString(self, output_mediator, field_values):
    """Retrieves an output string.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      field_values (dict[str, str]): output field values per name.

    Returns:
      str: output string.
    """

  def GetFieldValues(
      self, output_mediator, event, event_data, event_data_stream, event_tag):
    """Retrieves the output field values from the event formatting helper.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
      event_tag (EventTag): event tag.

    Returns:
      dict[str, str]: output field values per name.
    """
    formatting_helper = self._event_formatting_helper
    return formatting_helper.GetFieldValues(
        output_mediator, event, event_data, event_data_stream, event_tag)

  def WriteFieldValues(self, output_mediator, field_values):
    """Buffers the output string for the field values on the sorted heap.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      field_values (dict[str, str]): output field values per name.
    """
    primary_field_name = self._SORT_KEY_FIELD_NAMES[0]
    primary_sort_key = field_values.get(primary_field_name)

    # Start tracking a new primary sort key after a flush or on first use.
    if self._last_primary_sort_key is None:
      self._last_primary_sort_key = primary_sort_key

    # Flush the buffered strings when the primary sort key changes or the
    # heap has reached its maximum size.
    key_changed = primary_sort_key != self._last_primary_sort_key
    if key_changed or self._sorted_strings_heap.IsFull():
      self._FlushSortedStringsHeap()

    output_text = self._GetString(output_mediator, field_values)
    if not output_text:
      return

    # Missing or empty field values contribute an empty string to the key.
    sort_key_parts = []
    for field_name in self._SORT_KEY_FIELD_NAMES:
      sort_key_parts.append(field_values.get(field_name) or '')

    self._sorted_strings_heap.PushString(' '.join(sort_key_parts), output_text)

  def WriteFooter(self):
    """Writes the footer to the output.

    Flushes the strings that are still buffered on the sorted strings heap
    after the last event was written.
    """
    self._FlushSortedStringsHeap()