Source code for plaso.output.rawpy

"""Output module for the native (or "raw") Python format."""

from acstore.containers import interface as containers_interface

from dfdatetime import interface as dfdatetime_interface
from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.output import dynamic
from plaso.output import logger
from plaso.output import manager
from plaso.output import text_file


[docs] class NativePythonOutputModule(text_file.TextFileOutputModule): """Output module for native (or "raw") Python output format.""" NAME = "rawpy" DESCRIPTION = 'native (or "raw") Python output.' _GENERATED_FIELD_VALUES = ["display_name", "filename", "inode"] # Note that native Python output defines certain fields as part of the format. _RESERVED_FIELDS = frozenset( [ "body", "data_type", "date_time", "display_name", "filename", "hostname", "http_headers", "inode", "mapped_files", "metadata", "offset", "parser", "path_spec", "query", "source_long", "source_short", "tag", "timestamp", "timestamp_desc", "timezone", "username", ] )
[docs] def __init__(self): """Initializes an output module.""" super().__init__() self._field_formatting_helper = dynamic.DynamicFieldFormattingHelper()
def _GetString(self, field_values): """Retrieves an output string. Args: field_values (dict[str, str]): output field values per name. Returns: str: output string. """ reserved_attributes = [] additional_attributes = [] for field_name, field_value in sorted(field_values.items()): if field_name in ( "_event_identifier", "_event_tag_labels", "_timestamp", "path_spec", ): continue field_string = f" {{{field_name!s}}} {field_value!s}" if field_name in self._RESERVED_FIELDS: reserved_attributes.append(field_string) else: additional_attributes.append(field_string) timestamp_value = field_values["_timestamp"] lines_of_text = ["+-" * 40, "[Timestamp]:", f" {timestamp_value:s}"] path_specification = field_values.get("path_spec") if path_specification: lines_of_text.extend(["", "[Pathspec]:"]) lines_of_text.extend( [f" {line:s}" for line in path_specification.comparable.split("\n")] ) # Remove additional empty line. lines_of_text.pop() lines_of_text.extend(["", "[Reserved attributes]:"]) lines_of_text.extend(reserved_attributes) lines_of_text.extend(["", "[Additional attributes]:"]) lines_of_text.extend(additional_attributes) event_tag_labels = field_values.get("_event_tag_labels") if event_tag_labels: labels_string = ", ".join(f"'{label:s}'" for label in event_tag_labels) lines_of_text.extend(["", "[Tag]:", f" {{labels}} [{labels_string:s}]"]) lines_of_text.append("") return "\n".join(lines_of_text)
[docs] def GetFieldValues( self, output_mediator, event, event_data, event_data_stream, event_tag ): """Retrieves the output field values. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. event_tag (EventTag): event tag. Returns: dict[str, str]: output field values per name. """ event_identifier = event.GetIdentifier() event_identifier_string = event_identifier.CopyToString() date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=event.timestamp ) date_time_string = date_time.CopyToDateTimeStringISO8601() field_values = { "_event_identifier": event_identifier_string, "_timestamp": date_time_string, } event_attributes = list(event_data.GetAttributes()) if event_data_stream: event_attributes.extend(event_data_stream.GetAttributes()) for attribute_name, attribute_value in sorted(event_attributes): # Ignore attribute container identifier and date and time values. if isinstance( attribute_value, ( containers_interface.AttributeContainerIdentifier, dfdatetime_interface.DateTimeValues, ), ): continue if ( isinstance(attribute_value, list) and attribute_value and isinstance(attribute_value[0], dfdatetime_interface.DateTimeValues) ): continue # Ignore protected internal only attributes. if attribute_name[0] == "_" and attribute_name != "_parser_chain": continue # Some parsers have written bytes values to storage. if isinstance(attribute_value, bytes): attribute_value = attribute_value.decode("utf-8", "replace") logger.warning( f'Found bytes value for attribute "{attribute_name:s}" for data ' f"type: {event_data.data_type!s}. Value was converted to UTF-8: " f'"{attribute_value:s}"' ) # Output _parser_chain as parser for backwards compatibility. if attribute_name == "_parser_chain": attribute_name = "parser" field_values[attribute_name] = attribute_value for field_name in self._GENERATED_FIELD_VALUES: if field_name not in field_values: field_value = field_values.get(field_name) if field_value is None: field_value = self._field_formatting_helper.GetFormattedField( output_mediator, field_name, event, event_data, event_data_stream, event_tag, ) field_values[field_name] = field_value if event_tag: field_values["_event_tag_labels"] = event_tag.labels return field_values
[docs] def WriteFieldValues(self, output_mediator, field_values): """Writes field values to the output. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. field_values (dict[str, str]): output field values per name. """ output_text = self._GetString(field_values) self.WriteLine(output_text)
manager.OutputManager.RegisterOutput(NativePythonOutputModule)