"""Shared functionality for JSON based output modules."""
import abc
import base64
from acstore.containers import interface as containers_interface
from dfdatetime import interface as dfdatetime_interface
from plaso.output import formatting_helper
from plaso.output import text_file
from plaso.serializer import json_serializer
[docs]
class SharedJSONOutputModule(text_file.TextFileOutputModule):
"""Shared functionality for JSON based output modules."""
_JSON_SERIALIZER = json_serializer.JSONAttributeContainerSerializer
_GENERATED_FIELD_VALUES = ["display_name", "filename", "inode"]
[docs]
def __init__(self):
"""Initializes an output module."""
super().__init__()
self._field_formatting_helper = JSONFieldFormattingHelper()
[docs]
def GetFieldValues(
self, output_mediator, event, event_data, event_data_stream, event_tag
):
"""Retrieves the output field values.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
event_tag (EventTag): event tag.
Returns:
dict[str, str]: output field values per name.
"""
field_values = {"__container_type__": "event", "__type__": "AttributeContainer"}
if event_data:
for attribute_name, attribute_value in event_data.GetAttributes():
# Ignore attribute container identifier and date and time values.
if isinstance(
attribute_value,
(
containers_interface.AttributeContainerIdentifier,
dfdatetime_interface.DateTimeValues,
),
):
continue
if (
isinstance(attribute_value, list)
and attribute_value
and isinstance(
attribute_value[0], dfdatetime_interface.DateTimeValues
)
):
continue
# Ignore protected internal only attributes.
if attribute_name[0] == "_" and attribute_name != "_parser_chain":
continue
field_value = self._field_formatting_helper.GetFormattedField(
output_mediator,
attribute_name,
event,
event_data,
event_data_stream,
event_tag,
)
# Output _parser_chain as parser for backwards compatibility.
if attribute_name == "_parser_chain":
attribute_name = "parser"
field_values[attribute_name] = field_value
if event_data_stream:
for attribute_name, attribute_value in event_data_stream.GetAttributes():
# Output path_spec as pathspec for backwards compatibility.
if attribute_name == "path_spec":
attribute_name = "pathspec"
attribute_value = self._JSON_SERIALIZER.WriteSerializedDict(
attribute_value
)
field_values[attribute_name] = attribute_value
if event:
for attribute_name, attribute_value in event.GetAttributes():
# Ignore attribute container identifier values.
if isinstance(
attribute_value, containers_interface.AttributeContainerIdentifier
):
continue
if attribute_name == "date_time":
attribute_value = self._JSON_SERIALIZER.WriteSerializedDict(
attribute_value
)
field_values[attribute_name] = attribute_value
for field_name in self._GENERATED_FIELD_VALUES:
if field_name not in field_values:
field_value = field_values.get(field_name)
if field_value is None:
field_value = self._field_formatting_helper.GetFormattedField(
output_mediator,
field_name,
event,
event_data,
event_data_stream,
event_tag,
)
field_values[field_name] = field_value
field_values["message"] = self._field_formatting_helper.GetFormattedField(
output_mediator, "message", event, event_data, event_data_stream, event_tag
)
if event_tag:
event_tag_values = {
"__container_type__": "event_tag",
"__type__": "AttributeContainer",
}
for attribute_name, attribute_value in event_tag.GetAttributes():
# Ignore attribute container identifier values.
if isinstance(
attribute_value, containers_interface.AttributeContainerIdentifier
):
continue
event_tag_values[attribute_name] = attribute_value
field_values["tag"] = event_tag_values
return field_values
[docs]
@abc.abstractmethod
def WriteFieldValues(self, output_mediator, field_values):
"""Writes field values to the output.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
field_values (dict[str, str]): output field values per name.
"""