Source code for plaso.output.formatting_helper
"""Output module field formatting helper."""
import abc
import datetime
import math
import pytz
from dfdatetime import posix_time as dfdatetime_posix_time
from dfvfs.lib import definitions as dfvfs_definitions
from plaso.containers import events
from plaso.formatters import default
from plaso.output import logger
[docs]
class EventFormattingHelper:
"""Output module event formatting helper."""
[docs]
@abc.abstractmethod
def GetFieldValues(
self, output_mediator, event, event_data, event_data_stream, event_tag
):
"""Retrieves the output field values.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
event_tag (EventTag): event tag.
Returns:
list[str]: output field values.
"""
[docs]
class FieldFormattingHelper:
"""Output module field formatting helper."""
_DEFAULT_MESSAGE_FORMATTER = default.DefaultEventFormatter()
# Maps the name of a field to callback function that formats the field value.
_FIELD_FORMAT_CALLBACKS = {}
[docs]
def __init__(self):
"""Initializes a field formatting helper."""
event_data_stream = events.EventDataStream()
super().__init__()
self._callback_functions = {}
self._event_data_stream_field_names = event_data_stream.GetAttributeNames()
self._event_tag_field_names = []
for field_name, callback_name in self._FIELD_FORMAT_CALLBACKS.items():
if callback_name == "_FormatTag":
self._event_tag_field_names.append(field_name)
else:
self._callback_functions[field_name] = getattr(
self, callback_name, None
)
# The field format callback methods require specific arguments hence
# the check for unused arguments is disabled here.
# pylint: disable=unused-argument
def _FormatDateTime(self, output_mediator, event, event_data, event_data_stream):
"""Formats a date and time field in ISO 8601 format.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: date and time field with time zone offset, semantic time.
"""
if output_mediator.dynamic_time and event.date_time:
iso8601_string = getattr(event.date_time, "string", None)
if iso8601_string:
return iso8601_string
date_time = event.date_time
if event.date_time.is_local_time:
# TODO: replace this by a more generic solution.
if event.date_time.precision == "1s":
timestamp, _ = divmod(event.timestamp, 1000000)
date_time = dfdatetime_posix_time.PosixTime(timestamp=timestamp)
else:
date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
timestamp=event.timestamp
)
iso8601_string = date_time.CopyToDateTimeStringISO8601()
if not iso8601_string:
return "Invalid"
if iso8601_string[-1] == "Z":
iso8601_string = f"{iso8601_string[:-1]:s}+00:00"
if output_mediator.time_zone != pytz.UTC or date_time.time_zone_offset:
# For output in a specific time zone overwrite the date, time in
# seconds and time zone offset in the UTC ISO8601 string.
year, month, day_of_month, hours, minutes, seconds = (
date_time.GetDateWithTimeOfDay()
)
try:
datetime_object = datetime.datetime(
year,
month,
day_of_month,
hours,
minutes,
seconds,
tzinfo=pytz.UTC,
)
datetime_object = datetime_object.astimezone(
output_mediator.time_zone
)
isoformat_string = datetime_object.isoformat()
iso8601_string = "".join(
[
isoformat_string[:19],
iso8601_string[19:-6],
isoformat_string[-6:],
]
)
except (OSError, OverflowError, TypeError, ValueError):
return "Invalid"
else:
if not event.date_time or event.date_time.is_local_time:
timestamp = event.timestamp
else:
timestamp, fraction_of_second = (
event.date_time.CopyToPosixTimestampWithFractionOfSecond()
)
timestamp = (timestamp or 0) * 1000000
if fraction_of_second:
while fraction_of_second < 1000000:
fraction_of_second *= 10
while fraction_of_second >= 1000000:
fraction_of_second /= 10
if timestamp < 0:
timestamp -= math.ceil(fraction_of_second)
else:
timestamp += math.ceil(fraction_of_second)
# For now check if event.timestamp is set, to mimic existing behavior of
# using 0000-00-00T00:00:00.000000+00:00 for 0 timestamp values
if not timestamp:
return "0000-00-00T00:00:00.000000+00:00"
try:
# Note that setting tzinfo to None can have unexpected results on
# certain platforms.
datetime_object = datetime.datetime(
1970, 1, 1, tzinfo=pytz.UTC
) + datetime.timedelta(microseconds=timestamp)
datetime_object = datetime_object.astimezone(output_mediator.time_zone)
iso8601_string = datetime_object.isoformat()
iso8601_string = (
f"{iso8601_string[:19]:s}.{datetime_object.microsecond:06d}"
f"{iso8601_string[-6:]:s}"
)
except (OSError, OverflowError, TypeError, ValueError) as exception:
iso8601_string = "0000-00-00T00:00:00.000000+00:00"
self._ReportEventError(
event,
event_data,
(
f"unable to copy timestamp: {event.timestamp!s} to a human "
f"readable date and time with error: {exception!s}. Defaulting "
f'to: "{iso8601_string:s}"'
),
)
return iso8601_string
def _FormatDisplayName(self, output_mediator, event, event_data, event_data_stream):
"""Formats the display name.
The display_name field can be set as an attribute to event_data otherwise
it is derived from the path specification.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: display name field.
"""
display_name = getattr(event_data, "display_name", None)
if not display_name:
path_spec = getattr(event_data_stream, "path_spec", None)
if path_spec:
display_name = output_mediator.GetDisplayNameForPathSpec(path_spec)
else:
display_name = "-"
return display_name
def _FormatFilename(self, output_mediator, event, event_data, event_data_stream):
"""Formats the filename.
The filename field can be set as an attribute to event_data otherwise
it is derived from the path specification.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: filename field.
"""
filename = getattr(event_data, "filename", None)
if not filename:
path_spec = getattr(event_data_stream, "path_spec", None)
if path_spec:
filename = output_mediator.GetRelativePathForPathSpec(path_spec)
else:
filename = "-"
return filename
def _FormatHostname(self, output_mediator, event, event_data, event_data_stream):
"""Formats a hostname field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: hostname field.
"""
return output_mediator.GetHostname(event_data)
def _FormatInode(self, output_mediator, event, event_data, event_data_stream):
"""Formats an inode field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: inode field.
"""
inode = getattr(event_data, "inode", None)
# Note that inode can contain 0.
if inode is None:
path_specification = getattr(event_data_stream, "path_spec", None)
if path_specification:
if path_specification.type_indicator in (
dfvfs_definitions.TYPE_INDICATOR_APFS,
dfvfs_definitions.TYPE_INDICATOR_HFS,
):
inode = getattr(path_specification, "identifier", None)
elif path_specification.type_indicator == (
dfvfs_definitions.TYPE_INDICATOR_NTFS
):
inode = getattr(path_specification, "mft_entry", None)
elif path_specification.type_indicator in (
dfvfs_definitions.TYPE_INDICATOR_EXT,
dfvfs_definitions.TYPE_INDICATOR_TSK,
):
# Note that inode can contain a TSK metadata address.
inode = getattr(path_specification, "inode", None)
if inode is None:
inode = "-"
elif isinstance(inode, int):
inode = f"{inode:d}"
return inode
def _FormatMACB(self, output_mediator, event, event_data, event_data_stream):
"""Formats a legacy MACB representation field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: MACB field.
"""
return output_mediator.GetMACBRepresentation(event, event_data)
def _FormatMessage(self, output_mediator, event, event_data, event_data_stream):
"""Formats a message field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: message field.
"""
message_formatter = output_mediator.GetMessageFormatter(event_data.data_type)
if not message_formatter:
logger.warning(
f"Using default message formatter for data type: "
f"{event_data.data_type:s}"
)
message_formatter = self._DEFAULT_MESSAGE_FORMATTER
event_values = event_data.CopyToDict()
message_formatter.FormatEventValues(output_mediator, event_values)
return message_formatter.GetMessage(event_values)
def _FormatMessageShort(
self, output_mediator, event, event_data, event_data_stream
):
"""Formats a short message field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: short message field.
"""
message_formatter = output_mediator.GetMessageFormatter(event_data.data_type)
if not message_formatter:
logger.warning(
f"Using default message formatter for data type: "
f"{event_data.data_type:s}"
)
message_formatter = self._DEFAULT_MESSAGE_FORMATTER
event_values = event_data.CopyToDict()
message_formatter.FormatEventValues(output_mediator, event_values)
return message_formatter.GetMessageShort(event_values)
def _FormatParser(self, output_mediator, event, event_data, event_data_stream):
"""Formats a parser field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: parser field.
"""
return getattr(event_data, "_parser_chain", None) or "-"
def _FormatSource(self, output_mediator, event, event_data, event_data_stream):
"""Formats a source field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: source field.
"""
data_type = getattr(event_data, "data_type", None) or "-"
_, source = output_mediator.GetSourceMapping(data_type)
return source or "N/A"
def _FormatSourceShort(self, output_mediator, event, event_data, event_data_stream):
"""Formats a short source field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: short source field.
"""
data_type = getattr(event_data, "data_type", None) or "-"
source_short, _ = output_mediator.GetSourceMapping(data_type)
return source_short or "N/A"
def _FormatTag(self, output_mediator, event_tag):
"""Formats an event tag field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event_tag (EventTag): event tag or None if not set.
Returns:
str: event tag labels or "-" if event tag is not set.
"""
if not event_tag:
return "-"
return " ".join(event_tag.labels)
def _FormatTime(self, output_mediator, event, event_data, event_data_stream):
"""Formats a time field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: time in seconds formatted as "HH:MM:SS" or "--:--:--" on error.
"""
# For now check if event.timestamp is set, to mimic existing behavior of
# using --:--:-- for 0 timestamp values.
if not event.timestamp:
return "--:--:--"
date_time = event.date_time
if not date_time or date_time.is_local_time:
date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
timestamp=event.timestamp
)
year, month, day_of_month, hours, minutes, seconds = (
date_time.GetDateWithTimeOfDay()
)
if output_mediator.time_zone != pytz.UTC:
try:
datetime_object = datetime.datetime(
year, month, day_of_month, hours, minutes, seconds, tzinfo=pytz.UTC
)
datetime_object = datetime_object.astimezone(output_mediator.time_zone)
hours, minutes, seconds = (
datetime_object.hour,
datetime_object.minute,
datetime_object.second,
)
except (OSError, OverflowError, TypeError, ValueError):
hours, minutes, seconds = (None, None, None)
if None in (hours, minutes, seconds):
message = (
f"unable to copy timestamp: {event.timestamp!s} to a human readable "
f'time. Defaulting to: "--:--:--"'
)
self._ReportEventError(
event,
event_data,
message,
)
return "--:--:--"
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def _FormatTimeZone(self, output_mediator, event, event_data, event_data_stream):
"""Formats a time zone field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: time zone field.
"""
if not event.timestamp:
return "-"
date_time = event.date_time
if not date_time or date_time.is_local_time:
date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
timestamp=event.timestamp
)
if output_mediator.time_zone == pytz.UTC:
return "UTC"
year, month, day_of_month, hours, minutes, seconds = (
date_time.GetDateWithTimeOfDay()
)
try:
# For tzname to work the datetime object must be naive (without
# a time zone).
datetime_object = datetime.datetime(
year, month, day_of_month, hours, minutes, seconds
)
return output_mediator.time_zone.tzname(datetime_object)
except (OverflowError, TypeError, ValueError):
message = (
f"unable to copy timestamp: {event.timestamp!s} to a human readable "
f'time zone. Defaulting to: "-"'
)
self._ReportEventError(
event,
event_data,
message,
)
return "-"
def _FormatUsername(self, output_mediator, event, event_data, event_data_stream):
"""Formats an username field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: username field.
"""
return output_mediator.GetUsername(event_data)
def _FormatValues(self, output_mediator, event, event_data, event_data_stream):
"""Formats a values.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: values field.
"""
values = getattr(event_data, "values", None)
if isinstance(values, list) and event_data.data_type in (
"windows:registry:key_value",
"windows:registry:service",
):
value_strings = []
for name, data_type, data in sorted(values):
name_string = name or "(default)"
data_string = data or "(empty)"
value_strings.append(
f"{name_string:s}: [{data_type:s}] {data_string:s}"
)
values = " ".join(value_strings)
return values
# pylint: enable=unused-argument
def _ReportEventError(self, event, event_data, error_message):
"""Reports an event related error.
Args:
event (EventObject): event.
event_data (EventData): event data.
error_message (str): error message.
"""
event_identifier = event.GetIdentifier()
event_identifier_string = event_identifier.CopyToString()
display_name = getattr(event_data, "display_name", None) or "N/A"
parser_chain = getattr(event_data, "_parser_chain", None) or "N/A"
logger.error(
f"Event: {event_identifier_string!s} description: "
f"{event.timestamp_desc:s} data type: {event_data.data_type:s} "
f"display name: {display_name:s} parser chain: {parser_chain:s} with "
f"error: {error_message:s}"
)
[docs]
def GetFormattedField(
self,
output_mediator,
field_name,
event,
event_data,
event_data_stream,
event_tag,
):
"""Formats the specified field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
field_name (str): name of the field.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
event_tag (EventTag): event tag.
Returns:
str: value of the field or None if not available.
"""
if field_name in self._event_tag_field_names:
return self._FormatTag(output_mediator, event_tag)
callback_function = self._callback_functions.get(field_name)
if callback_function:
output_value = callback_function(
output_mediator, event, event_data, event_data_stream
)
elif field_name in self._event_data_stream_field_names:
output_value = getattr(event_data_stream, field_name, None)
else:
output_value = getattr(event_data, field_name, None)
if output_value is not None and not isinstance(output_value, str):
output_value = f"{output_value!s}"
return output_value