# -*- coding: utf-8 -*-
"""The Apple Unified Logging (AUL) file parser."""
import abc
import base64
import collections
import os
import re
import struct
import uuid
import lz4.block
from dfdatetime import posix_time as dfdatetime_posix_time
from dfvfs.path import factory as path_spec_factory
from dtfabric.runtime import data_maps as dtfabric_data_maps
from plaso.containers import events
from plaso.helpers.macos import darwin
from plaso.lib import dtfabric_helper
from plaso.lib import errors
from plaso.lib import specification
from plaso.parsers import interface
from plaso.parsers import manager
[docs]
class UnifiedLoggingEventData(events.EventData):
"""Apple Unified Logging (AUL) event data.
Attributes:
activity_identifier (int): activity identifier.
boot_identifier (str): boot identifier.
category (str): event category.
event_message (str): event message.
event_type (str): event type.
message_type (str): message type.
process_identifier (int): process identifier (PID).
process_image_identifier (str): process image identifier.
process_image_identifier (str): process image identifier, contains an UUID.
recorded_time (dfdatetime.DateTimeValues): date and time the log entry was
recorded.
sender_image_identifier (str): (sender) image identifier, contains an UUID.
sender_image_path (str): path of the (sender) image.
signpost_identifier (int): signpost identifier.
signpost_name (str): signpost name.
subsystem (str): subsystem that produced the logging event.
thread_identifier (int): thread identifier.
ttl (int): log time to live (TTL).
"""
DATA_TYPE = 'macos:unified_logging:event'
[docs]
def __init__(self):
"""Initialise event data."""
super(UnifiedLoggingEventData, self).__init__(data_type=self.DATA_TYPE)
self.activity_identifier = None
self.boot_identifier = None
self.category = None
self.event_message = None
self.event_type = None
self.message_type = None
self.process_identifier = None
self.process_image_identifier = None
self.process_image_path = None
self.recorded_time = None
self.sender_image_identifier = None
self.sender_image_path = None
self.signpost_identifier = None
self.signpost_name = None
self.subsystem = None
self.thread_identifier = None
self.ttl = None
[docs]
class DSCRange(object):
"""Shared-Cache Strings (dsc) range.
Attributes:
data_offset (int): offset of the string data.
image_identifier (uuid.UUID): the image identifier.
image_path (str): the image path.
range_offset (int): the offset of the range.
range_sizes (int): the size of the range.
text_offset (int): the offset of the text.
text_size (int): the size of the text.
uuid_index (int): index of the dsc UUID.
"""
[docs]
def __init__(self):
"""Initializes a Shared-Cache Strings (dsc) range."""
super(DSCRange, self).__init__()
self.data_offset = None
self.image_identifier = None
self.image_path = None
self.range_offset = None
self.range_size = None
self.text_offset = None
self.text_size = None
self.uuid_index = None
[docs]
class DSCUUID(object):
"""Shared-Cache Strings (dsc) UUID.
Attributes:
image_identifier (uuid.UUID): the image identifier.
image_path (str): the image path.
text_offset (int): the offset of the text.
text_size (int): the size of the text.
"""
[docs]
def __init__(self):
"""Initializes a Shared-Cache Strings (dsc) UUID."""
super(DSCUUID, self).__init__()
self.image_identifier = None
self.image_path = None
self.text_offset = None
self.text_size = None
[docs]
class ImageValues(object):
"""Image values.
Attributes:
identifier (uuid.UUID): the identifier.
path (str): the path.
string (str): the string.
text_offset (int): the offset of the text.
"""
[docs]
def __init__(
self, identifier=None, path=None, string=None, text_offset=None):
"""Initializes image values.
Args:
identifier (Optional[uuid.UUID]): the identifier.
path (Optional[str]): the path.
string (Optional[str]): the string.
text_offset (Optional[int]): the offset of the text.
"""
super(ImageValues, self).__init__()
self._string_formatter = None
self.identifier = identifier
self.path = path
self.string = string
self.text_offset = text_offset
[docs]
class BacktraceFrame(object):
"""Backtrace frame.
Attributes:
image_identifier (str): image identifier, contains an UUID.
image_offset (int): image offset.
"""
[docs]
def __init__(self):
"""Initializes a backtrace frame."""
super(BacktraceFrame, self).__init__()
self.image_identifier = None
self.image_offset = None
[docs]
class LogEntry(object):
"""Log entry.
Attributes:
activity_identifier (int): activity identifier.
backtrace_frames (list[BacktraceFrame]): backtrace frames.
boot_identifier (uuid.UUID): boot identifier.
category (str): (sub system) category.
creator_activity_identifier (int): creator activity identifier.
event_message (str): event message.
event_type (str): event type.
format_string (str): format string.
loss_count (int): number of message lost.
loss_end_mach_timestamp (int): Mach timestamp of the end of the message
loss.
loss_end_timestamp (int): timestamp of the end of the message loss, in
number of nanoseconds since January 1, 1970 00:00:00.000000000
loss_start_mach_timestamp (int): Mach timestamp of the start of the message
loss.
loss_start_timestamp (int): timestamp of the start of the message loss, in
number of nanoseconds since January 1, 1970 00:00:00.000000000
mach_timestamp (int): Mach timestamp.
message_type (str): message type.
parent_activity_identifier (int): parent activity identifier.
process_identifier (int): process identifier (PID).
process_image_identifier (uuid.UUID): process image identifier.
process_image_path (str): path of the process image.
sender_image_identifier (uuid.UUID): (sender) image identifier.
sender_image_path (str): path of the (sender) image.
sender_program_counter (int): (sender) program counter.
signpost_identifier (int): signpost identifier.
signpost_name (str): signpost name.
signpost_scope (str): signpost scope.
signpost_type (str): signpost type.
sub_system (str): sub system.
thread_identifier (int): thread identifier.
timestamp (int): number of nanoseconds since January 1, 1970
00:00:00.000000000.
time_zone_name (str): name of the time zone.
trace_identifier (int): trace identifier.
ttl (int): Time to live (TTL) value.
"""
[docs]
def __init__(self):
"""Initializes a log entry."""
super(LogEntry, self).__init__()
self.activity_identifier = None
self.backtrace_frames = None
self.boot_identifier = None
self.category = None
self.creator_activity_identifier = None
self.event_message = None
self.event_type = None
self.format_string = None
self.loss_count = None
self.loss_end_mach_timestamp = None
self.loss_end_timestamp = None
self.loss_start_mach_timestamp = None
self.loss_start_timestamp = None
self.mach_timestamp = None
self.message_type = None
self.parent_activity_identifier = None
self.process_identifier = None
self.process_image_identifier = None
self.process_image_path = None
self.sender_image_identifier = None
self.sender_image_path = None
self.sender_program_counter = None
self.signpost_identifier = None
self.signpost_name = None
self.signpost_scope = None
self.signpost_type = None
self.sub_system = None
self.thread_identifier = None
self.timestamp = None
self.time_zone_name = None
self.trace_identifier = None
self.ttl = None
[docs]
class SignpostDescriptionAttributeFormatStringDecoder(BaseFormatStringDecoder):
"""Signpost description attribute value format string decoder."""
def _GetStringValue(self, value, format_string_operator=None):
"""Retrieves the values as a string.
Args:
value (bytes): Signpost value.
format_string_operator (Optional[FormatStringOperator]): format string
operator.
Returns:
str: formatted Signpost value.
"""
specifier = getattr(format_string_operator, 'specifier', 's')
value_size = len(value)
if not value:
string_value = ''
elif specifier in ('a', 'A', 'e', 'E', 'f', 'F', 'g', 'G'):
if value_size not in (4, 8):
return '<decode: unsupported value>'
if value_size == 4:
float_value = struct.unpack('<f', value)
else:
float_value = struct.unpack('<d', value)
string_value = '{0:.16g}'.format(float_value[0])
elif specifier in ('c', 'C', 'o', 'O', 'p', 'u', 'U', 'x', 'X'):
if value_size not in (1, 2, 4, 8):
return '<decode: unsupported value>'
integer_value = int.from_bytes(value, 'little', signed=False)
if format_string_operator:
format_string = format_string_operator.GetPythonFormatString()
else:
format_string = '{0:d}'
string_value = format_string.format(integer_value)
elif specifier in ('d', 'D', 'i', 'm'):
if value_size not in (1, 2, 4, 8):
return '<decode: unsupported value>'
integer_value = int.from_bytes(value, 'little', signed=True)
if format_string_operator:
format_string = format_string_operator.GetPythonFormatString()
else:
format_string = '{0:d}'
string_value = format_string.format(integer_value)
elif specifier in ('@', 's', 'S'):
try:
string_value = value.decode('utf-8').rstrip('\x00')
except UnicodeDecodeError:
return '<decode: unsupported value>'
if format_string_operator:
format_string = format_string_operator.GetPythonFormatString()
else:
format_string = '{0:s}'
string_value = format_string.format(string_value)
else:
return '<decode: unsupported value>'
return string_value
[docs]
def FormatValue(self, value, format_string_operator=None):
"""Formats a Signpost description attribute value.
Args:
value (bytes): Signpost description attribute value.
format_string_operator (Optional[FormatStringOperator]): format string
operator.
Returns:
str: formatted Signpost description attribute value.
"""
string_value = self._GetStringValue(
value, format_string_operator=format_string_operator)
return (f'__##__signpost.description#____#attribute'
f'#_##_#{string_value:s}##__##')
[docs]
class SignpostDescriptionTimeFormatStringDecoder(BaseFormatStringDecoder):
"""Signpost description time value format string decoder."""
[docs]
def __init__(self, time='begin'):
"""Initializes a Signpost description time value format string decoder.
Args:
time (Optional[str]): Signpost description time.
"""
super(SignpostDescriptionTimeFormatStringDecoder, self).__init__()
self._time = time
[docs]
def FormatValue(self, value, format_string_operator=None):
"""Formats a Signpost description time value.
Args:
value (bytes): Signpost description time value.
format_string_operator (Optional[FormatStringOperator]): format string
operator.
Returns:
str: formatted Signpost description time value.
"""
if len(value) != 8:
return '<decode: unsupported value>'
integer_value = int.from_bytes(value, 'little', signed=False)
return (f'__##__signpost.description#____#{self._time:s}_time'
f'#_##_#{integer_value:d}##__##')
[docs]
class SignpostTelemetryNumberFormatStringDecoder(BaseFormatStringDecoder):
"""Signpost telemetry number value format string decoder."""
[docs]
def __init__(self, number=1):
"""Initializes a Signpost telemetry number value format string decoder.
Args:
number (Optional[int]): Signpost telemetry number.
"""
super(SignpostTelemetryNumberFormatStringDecoder, self).__init__()
self._number = number
def _GetStringValue(self, value, format_string_operator=None):
"""Retrieves the values as a string.
Args:
value (bytes): Signpost value.
format_string_operator (Optional[FormatStringOperator]): format string
operator.
Returns:
str: formatted Signpost value.
"""
specifier = getattr(format_string_operator, 'specifier', 's')
value_size = len(value)
if not value:
string_value = ''
elif specifier in ('a', 'A', 'e', 'E', 'f', 'F', 'g', 'G'):
if value_size not in (4, 8):
return '<decode: unsupported value>'
if value_size == 4:
float_value = struct.unpack('<f', value)
else:
float_value = struct.unpack('<d', value)
string_value = '{0:.16g}'.format(float_value[0])
elif specifier in ('c', 'C', 'o', 'O', 'p', 'u', 'U', 'x', 'X'):
if value_size not in (1, 2, 4, 8):
return '<decode: unsupported value>'
integer_value = int.from_bytes(value, 'little', signed=False)
if format_string_operator:
format_string = format_string_operator.GetPythonFormatString()
else:
format_string = '{0:d}'
string_value = format_string.format(integer_value)
elif specifier in ('d', 'D', 'i', 'm'):
if value_size not in (1, 2, 4, 8):
return '<decode: unsupported value>'
integer_value = int.from_bytes(value, 'little', signed=True)
if format_string_operator:
format_string = format_string_operator.GetPythonFormatString()
else:
format_string = '{0:d}'
string_value = format_string.format(integer_value)
elif specifier in ('@', 's', 'S'):
try:
string_value = value.decode('utf-8').rstrip('\x00')
except UnicodeDecodeError:
return '<decode: unsupported value>'
if format_string_operator:
format_string = format_string_operator.GetPythonFormatString()
else:
format_string = '{0:s}'
string_value = format_string.format(string_value)
else:
return '<decode: unsupported value>'
return string_value
[docs]
def FormatValue(self, value, format_string_operator=None):
"""Formats a Signpost telemetry number value.
Args:
value (bytes): Signpost telemetry number value.
format_string_operator (Optional[FormatStringOperator]): format string
operator.
Returns:
str: formatted Signpost telemetry number value.
"""
string_value = self._GetStringValue(
value, format_string_operator=format_string_operator)
return (f'__##__signpost.telemetry#____#number{self._number}'
f'#_##_#{string_value:s}##__##')
[docs]
class SignpostTelemetryStringFormatStringDecoder(BaseFormatStringDecoder):
"""Signpost telemetry string value format string decoder."""
[docs]
def __init__(self, number=1):
"""Initializes a Signpost telemetry string value format string decoder.
Args:
number (Optional[int]): Signpost telemetry number.
"""
super(SignpostTelemetryStringFormatStringDecoder, self).__init__()
self._number = number
[docs]
def FormatValue(self, value, format_string_operator=None):
"""Formats a Signpost telemetry string value.
Args:
value (bytes): Signpost telemetry string value.
format_string_operator (Optional[FormatStringOperator]): format string
operator.
Returns:
str: formatted Signpost telemetry string value.
"""
try:
string_value = value.decode('utf-8').rstrip('\x00')
except UnicodeDecodeError:
return '<decode: unsupported value>'
return (f'__##__signpost.telemetry#____#string{self._number}'
f'#_##_#{string_value:s}##__##')
[docs]
class BaseUnifiedLoggingFile(dtfabric_helper.DtFabricHelper):
"""Shared functionality for Apple Unified Logging (AUL) files."""
[docs]
def __init__(self):
"""Initializes a Apple Unified Logging (AUL) file."""
super(BaseUnifiedLoggingFile, self).__init__()
self._file_entry = None
self._file_object = None
[docs]
def Close(self):
"""Closes an Apple Unified Logging (AUL) file.
Raises:
IOError: if the file is not opened.
OSError: if the file is not opened.
"""
if not self._file_object:
raise IOError('File not opened')
self._file_object = None
self._file_entry = None
[docs]
def Open(self, file_entry):
"""Opens an Apple Unified Logging (AUL) file.
Args:
file_entry (dfvfs.FileEntry): a file entry.
Raises:
IOError: if the file is already opened.
OSError: if the file is already opened.
"""
if self._file_object:
raise IOError('File already opened')
self._file_entry = file_entry
file_object = file_entry.GetFileObject()
self.ReadFileObject(file_object)
self._file_object = file_object
[docs]
@abc.abstractmethod
def ReadFileObject(self, file_object):
"""Reads an Apple Unified Logging (AUL) file-like object.
Args:
file_object (file): file-like object.
"""
[docs]
class DSCFile(BaseUnifiedLoggingFile):
"""Shared-Cache Strings (dsc) file.
Attributes:
ranges (list[DSCRange]): the ranges.
uuids (list[DSCUUID]): the UUIDs.
"""
_DEFINITION_FILE = os.path.join(os.path.dirname(__file__), 'aul_dsc.yaml')
_SUPPORTED_FORMAT_VERSIONS = ((1, 0), (2, 0))
[docs]
def __init__(self):
"""Initializes a shared-cache strings (dsc) file."""
super(DSCFile, self).__init__()
self.ranges = []
self.uuids = []
def _ReadFileHeader(self, file_object):
"""Reads a file header.
Args:
file_object (file): file-like object.
Returns:
dsc_file_header: a file header.
Raises:
ParseError: if the file header cannot be read.
"""
data_type_map = self._GetDataTypeMap('dsc_file_header')
file_header, _ = self._ReadStructureFromFileObject(
file_object, 0, data_type_map)
if file_header.signature != b'hcsd':
raise errors.ParseError('Unsupported signature.')
format_version = (
file_header.major_format_version, file_header.minor_format_version)
if format_version not in self._SUPPORTED_FORMAT_VERSIONS:
format_version_string = '.'.join([
f'{file_header.major_format_version:d}',
f'{file_header.minor_format_version:d}'])
raise errors.ParseError(
f'Unsupported format version: {format_version_string:s}')
return file_header
def _ReadImagePath(self, file_object, file_offset):
"""Reads an image path.
Args:
file_object (file): file-like object.
file_offset (int): offset of the image path data relative to the start
of the file.
Returns:
str: image path.
Raises:
ParseError: if the image path cannot be read.
"""
data_type_map = self._GetDataTypeMap('cstring')
image_path, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
return image_path
def _ReadRangeDescriptors(
self, file_object, file_offset, version, number_of_ranges):
"""Reads the range descriptors.
Args:
file_object (file): file-like object.
file_offset (int): offset of the start of range descriptors data relative
to the start of the file.
version (int): major version of the file.
number_of_ranges (int): the number of range descriptions to retrieve.
Yields:
DSCRange: a range.
Raises:
ParseError: if the file cannot be read.
"""
if version not in (1, 2):
raise errors.ParseError(f'Unsupported format version: {version:d}.')
if version == 1:
data_type_map_name = 'dsc_range_descriptor_v1'
else:
data_type_map_name = 'dsc_range_descriptor_v2'
data_type_map = self._GetDataTypeMap(data_type_map_name)
for _ in range(number_of_ranges):
range_descriptor, record_size = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
file_offset += record_size
dsc_range = DSCRange()
dsc_range.data_offset = range_descriptor.data_offset
dsc_range.range_offset = range_descriptor.range_offset
dsc_range.range_size = range_descriptor.range_size
dsc_range.uuid_index = range_descriptor.uuid_descriptor_index
yield dsc_range
def _ReadString(self, file_object, file_offset):
"""Reads a string.
Args:
file_object (file): file-like object.
file_offset (int): offset of the string data relative to the start
of the file.
Returns:
str: string.
Raises:
ParseError: if the string cannot be read.
"""
data_type_map = self._GetDataTypeMap('cstring')
format_string, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
return format_string
def _ReadUUIDDescriptors(
self, file_object, file_offset, version, number_of_uuids):
"""Reads the UUID descriptors.
Args:
file_object (file): file-like object.
file_offset (int): offset of the start of UUID descriptors data relative
to the start of the file.
version (int): major version of the file
number_of_uuids (int): the number of UUID descriptions to retrieve.
Yields:
DSCUUId: an UUID.
Raises:
ParseError: if the file cannot be read.
"""
if version not in (1, 2):
raise errors.ParseError(f'Unsupported format version: {version:d}.')
if version == 1:
data_type_map_name = 'dsc_uuid_descriptor_v1'
else:
data_type_map_name = 'dsc_uuid_descriptor_v2'
data_type_map = self._GetDataTypeMap(data_type_map_name)
for _ in range(number_of_uuids):
uuid_descriptor, record_size = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
file_offset += record_size
dsc_uuid = DSCUUID()
dsc_uuid.image_identifier = uuid_descriptor.image_identifier
dsc_uuid.text_offset = uuid_descriptor.text_offset
dsc_uuid.text_size = uuid_descriptor.text_size
dsc_uuid.image_path = self._ReadImagePath(
file_object, uuid_descriptor.path_offset)
yield dsc_uuid
[docs]
def GetImageValues(self, string_reference, is_dynamic):
"""Retrieves image values.
Args:
string_reference (int): reference of the string.
is_dynamic (bool): dynamic flag.
Returns:
ImageValues: image value or None if not available.
Raises:
ParseError: if the image values cannot be read.
"""
for dsc_range in self.ranges:
if is_dynamic:
range_offset = dsc_range.text_offset
range_size = dsc_range.text_size
else:
range_offset = dsc_range.range_offset
range_size = dsc_range.range_size
if string_reference < range_offset:
continue
relative_offset = string_reference - range_offset
if relative_offset <= range_size:
if is_dynamic:
string = '%s'
else:
file_offset = dsc_range.data_offset + relative_offset
string = self._ReadString(self._file_object, file_offset)
return ImageValues(
identifier=dsc_range.image_identifier, path=dsc_range.image_path,
string=string, text_offset=dsc_range.text_offset)
# TODO: if string_reference is invalid use:
# "<Invalid shared cache format string offset>"
return None
[docs]
def ReadFileObject(self, file_object):
"""Reads a shared-cache strings (dsc) file-like object.
Args:
file_object (file): file-like object.
Raises:
ParseError: if the file cannot be read.
"""
file_header = self._ReadFileHeader(file_object)
file_offset = file_object.tell()
self.ranges = list(self._ReadRangeDescriptors(
file_object, file_offset, file_header.major_format_version,
file_header.number_of_ranges))
file_offset = file_object.tell()
self.uuids = list(self._ReadUUIDDescriptors(
file_object, file_offset, file_header.major_format_version,
file_header.number_of_uuids))
for dsc_range in self.ranges:
dsc_uuid = self.uuids[dsc_range.uuid_index]
dsc_range.image_identifier = dsc_uuid.image_identifier
dsc_range.image_path = dsc_uuid.image_path
dsc_range.text_offset = dsc_uuid.text_offset
dsc_range.text_size = dsc_uuid.text_size
[docs]
class TimesyncDatabaseFile(BaseUnifiedLoggingFile):
"""Timesync database file."""
_DEFINITION_FILE = os.path.join(
os.path.dirname(__file__), 'aul_timesync.yaml')
_BOOT_RECORD_SIGNATURE = b'\xb0\xbb'
_SYNC_RECORD_SIGNATURE = b'Ts'
[docs]
def __init__(self):
"""Initializes a timesync database file."""
super(TimesyncDatabaseFile, self).__init__()
self._boot_record_data_type_map = self._GetDataTypeMap(
'timesync_boot_record')
self._sync_record_data_type_map = self._GetDataTypeMap(
'timesync_sync_record')
def _ReadRecord(self, file_object, file_offset):
"""Reads a boot or sync record.
Args:
file_object (file): file-like object.
file_offset (int): offset of the start of the record relative to the start
of the file.
Returns:
tuple[object, int]: boot or sync record and number of bytes read.
Raises:
ParseError: if the file cannot be read.
"""
signature = file_object.read(2)
if signature == self._BOOT_RECORD_SIGNATURE:
data_type_map = self._boot_record_data_type_map
elif signature == self._SYNC_RECORD_SIGNATURE:
data_type_map = self._sync_record_data_type_map
else:
signature = repr(signature)
raise errors.ParseError(f'Unsupported signature: {signature:s}.')
return self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
[docs]
def ReadFileObject(self, file_object):
"""Reads a timesync file-like object.
Args:
file_object (file): file-like object.
Raises:
ParseError: if the file cannot be read.
"""
return
[docs]
def ReadRecords(self):
"""Reads a timesync records.
Yields:
object: boot or sync record.
Raises:
ParseError: if the file cannot be read.
"""
file_offset = 0
while file_offset < self._file_entry.size:
record, _ = self._ReadRecord(self._file_object, file_offset)
yield record
file_offset += record.record_size
[docs]
class TraceV3File(BaseUnifiedLoggingFile):
"""Apple Unified Logging and Activity Tracing (tracev3) file."""
_DEFINITION_FILE = os.path.join(os.path.dirname(__file__), 'aul_tracev3.yaml')
_RECORD_TYPE_UNUSED = 0x00
_RECORD_TYPE_ACTIVITY = 0x02
_RECORD_TYPE_TRACE = 0x03
_RECORD_TYPE_LOG = 0x04
_RECORD_TYPE_SIGNPOST = 0x06
_RECORD_TYPE_LOSS = 0x07
_ACTIVITY_EVENT_TYPE_DESCRIPTIONS = {
0x01: 'activityCreateEvent',
0x03: 'userActionEvent'}
_EVENT_TYPE_DESCRIPTIONS = {
_RECORD_TYPE_LOG: 'logEvent',
_RECORD_TYPE_LOSS: 'lossEvent',
_RECORD_TYPE_SIGNPOST: 'signpostEvent',
_RECORD_TYPE_TRACE: 'traceEvent'}
_CHUNK_TAG_HEADER = 0x00001000
_CHUNK_TAG_FIREHOSE = 0x00006001
_CHUNK_TAG_OVERSIZE = 0x00006002
_CHUNK_TAG_STATEDUMP = 0x00006003
_CHUNK_TAG_SIMPLEDUMP = 0x00006004
_CHUNK_TAG_CATALOG = 0x0000600b
_CHUNK_TAG_CHUNK_SET = 0x0000600d
_DATA_ITEM_BINARY_DATA_VALUE_TYPES = (0x30, 0x32, 0xf2)
_DATA_ITEM_NUMERIC_VALUE_TYPES = (0x00, 0x02)
_DATA_ITEM_PRECISION_VALUE_TYPES = (0x10, 0x12)
_DATA_ITEM_PRIVATE_STRING_VALUE_TYPES = (0x21, 0x41)
_DATA_ITEM_PRIVATE_VALUE_TYPES = (
0x01, 0x21, 0x25, 0x31, 0x35, 0x41, 0x45)
_DATA_ITEM_STRING_VALUE_TYPES = (0x20, 0x22, 0x40, 0x42)
_FLAG_HAS_ACTIVITY_IDENTIFIER = 0x0001
_FLAG_HAS_LARGE_OFFSET = 0x0020
_FLAG_HAS_PRIVATE_STRINGS_RANGE = 0x0100
_SUPPORTED_STRINGS_FILE_TYPES = (0x0002, 0x0004, 0x0008, 0x000a, 0x000c)
_UUIDTEXT_STRINGS_FILE_TYPES = (0x0002, 0x0008, 0x000a)
_LOG_TYPE_DESCRIPTIONS = {
0x00: 'Default',
0x01: 'Info',
0x02: 'Debug',
0x03: 'Useraction',
0x10: 'Error',
0x11: 'Fault',
0x40: 'Thread Signpost Event',
0x41: 'Thread Signpost Start',
0x42: 'Thread Signpost End',
0x80: 'Process Signpost Event',
0x81: 'Process Signpost Start',
0x82: 'Process Signpost End',
0xc0: 'System Signpost Event',
0xc1: 'System Signpost Start',
0xc2: 'System Signpost End'}
_SIGNPOST_SCOPE_DESCRIPTIONS = {
0x04: 'thread',
0x08: 'process',
0x0c: 'system'}
_SIGNPOST_TYPE_DESCRIPTIONS = {
0x00: 'event',
0x01: 'begin',
0x02: 'end'}
_FORMAT_STRING_DECODERS = {
'bool': BooleanFormatStringDecoder(
false_value='false', true_value='true'),
'BOOL': BooleanFormatStringDecoder(
false_value='NO', true_value='YES'),
'darwin.errno': ExtendedErrorCodeFormatStringDecoder(),
'darwin.mode': FileModeFormatStringDecoder(),
'errno': ExtendedErrorCodeFormatStringDecoder(),
'in_addr': IPv4FormatStringDecoder(),
'in6_addr': IPv6FormatStringDecoder(),
'internal:f': FloatingPointFormatStringDecoder(),
'internal:i': SignedIntegerFormatStringDecoder(),
'internal:m': ErrorCodeFormatStringDecoder(),
'internal:s': StringFormatStringDecoder(),
'internal:u': UnsignedIntegerFormatStringDecoder(),
'location:_CLClientManagerStateTrackerState': (
LocationClientManagerStateFormatStringDecoder()),
'location:_CLLocationManagerStateTrackerState': (
LocationLocationManagerStateFormatStringDecoder()),
'location:CLClientAuthorizationStatus': (
LocationClientAuthorizationStatusFormatStringDecoder()),
'location:escape_only': LocationEscapeOnlyFormatStringDecoder(),
'location:SqliteResult': LocationSQLiteResultFormatStringDecoder(),
'mdns:acceptable': BooleanFormatStringDecoder(
false_value='unacceptable', true_value='acceptable'),
'mdns:addrmv': BooleanFormatStringDecoder(
false_value='rmv', true_value='add'),
'mdns:dns.counts': MDNSDNSCountersFormatStringDecoder(),
'mdns:dns.idflags': MDNSDNSIdentifierAndFlagsFormatStringDecoder(),
'mdns:dnshdr': MDNSDNSHeaderFormatStringDecoder(),
'mdns:protocol': MDNSProtocolFormatStringDecoder(),
'mdns:nreason': MDNSReasonFormatStringDecoder(),
'mdns:rrtype': MDNSResourceRecordTypeFormatStringDecoder(),
'mdns:yesno': BooleanFormatStringDecoder(
false_value='no', true_value='yes'),
'network:in_addr': IPv4FormatStringDecoder(),
'network:in6_addr': IPv6FormatStringDecoder(),
'network:sockaddr': SocketAddressFormatStringDecoder(),
'mask.hash': MaskHashFormatStringDecoder(),
'odtypes:mbr_details': (
OpenDirectoryMembershipDetailsFormatStringDecoder()),
'odtypes:mbridtype': OpenDirectoryMembershipTypeFormatStringDecoder(),
'odtypes:nt_sid_t': WindowsNTSecurityIdentifierFormatStringDecoder(),
'odtypes:ODError': OpenDirectoryErrorFormatStringDecoder(),
'signpost.description:attribute': (
SignpostDescriptionAttributeFormatStringDecoder()),
'signpost.description:begin_time': (
SignpostDescriptionTimeFormatStringDecoder(time='begin')),
'signpost.description:end_time': (
SignpostDescriptionTimeFormatStringDecoder(time='end')),
'signpost.telemetry:number1': (
SignpostTelemetryNumberFormatStringDecoder(number=1)),
'signpost.telemetry:number2': (
SignpostTelemetryNumberFormatStringDecoder(number=2)),
'signpost.telemetry:number3': (
SignpostTelemetryNumberFormatStringDecoder(number=3)),
'signpost.telemetry:string1': (
SignpostTelemetryStringFormatStringDecoder(number=1)),
'signpost.telemetry:string2': (
SignpostTelemetryStringFormatStringDecoder(number=2)),
'sockaddr': SocketAddressFormatStringDecoder(),
'time_t': DateTimeInSecondsFormatStringDecoder(),
'uuid_t': UUIDFormatStringDecoder()}
_FORMAT_STRING_DECODER_NAMES = frozenset(_FORMAT_STRING_DECODERS.keys())
_MAXIMUM_CACHED_FILES = 64
_MAXIMUM_CACHED_IMAGE_VALUES = 8192
_NANOSECONDS_PER_SECOND = 1000000000
ACTIVITY_IDENTIFIER_BITMASK = (1 << 63) - 1
[docs]
def __init__(self, file_system=None):
"""Initializes a tracev3 file.
Args:
file_system (Optional[dfvfs.FileSystem]): file system.
"""
super(TraceV3File, self).__init__()
self._boot_identifier = None
self._cached_dsc_files = collections.OrderedDict()
self._cached_image_values = collections.OrderedDict()
self._cached_uuidtext_files = collections.OrderedDict()
self._catalog = None
self._catalog_process_information_entries = {}
self._catalog_strings_map = {}
self._file_system = file_system
self._header_timebase = 1.0
self._header_timestamp = 0
self._sorted_timesync_sync_records = []
self._timesync_boot_record = None
self._timesync_path = None
self._timesync_sync_records = []
self._timesync_timebase = 1.0
self._uuidtext_path = None
def _BuildCatalogProcessInformationEntries(self, catalog):
"""Builds the catalog process information lookup table.
Args:
catalog (tracev3_catalog): catalog.
Raises:
ParseError: if a process information entry already exists.
"""
self._catalog_strings_map = self._GetCatalogSubSystemStringMap(catalog)
self._catalog_process_information_entries = {}
for process_information_entry in catalog.process_information_entries:
if process_information_entry.main_uuid_index >= 0:
process_information_entry.main_uuid = catalog.uuids[
process_information_entry.main_uuid_index]
if process_information_entry.dsc_uuid_index >= 0:
process_information_entry.dsc_uuid = catalog.uuids[
process_information_entry.dsc_uuid_index]
proc_id = (f'{process_information_entry.proc_id_upper:d}@'
f'{process_information_entry.proc_id_lower:d}')
if proc_id in self._catalog_process_information_entries:
raise errors.ParseError(f'proc_id: {proc_id:s} already set')
self._catalog_process_information_entries[proc_id] = (
process_information_entry)
def _CalculateFormatStringReference(
self, tracepoint_data_object, string_reference):
"""Calculates the format string reference.
Args:
tracepoint_data_object (object): firehose tracepoint data object.
string_reference (int): string reference.
Returns:
tuple[int, bool]: string reference and dynamic flag.
"""
is_dynamic = bool(string_reference & 0x80000000 != 0)
if is_dynamic:
string_reference &= 0x7fffffff
large_offset_data = getattr(
tracepoint_data_object, 'large_offset_data', None)
large_shared_cache_data = getattr(
tracepoint_data_object, 'large_shared_cache_data', None)
if large_shared_cache_data:
string_reference |= large_shared_cache_data << 31
elif large_offset_data:
string_reference |= large_offset_data << 31
return string_reference, is_dynamic
def _CalculateNameStringReference(self, tracepoint_data_object):
"""Calculates the name string reference.
Args:
tracepoint_data_object (object): firehose tracepoint data object.
Returns:
tuple[int, bool]: string reference and dynamic flag.
"""
string_reference = getattr(
tracepoint_data_object, 'name_string_reference_lower', None) or 0
is_dynamic = bool(string_reference & 0x80000000 != 0)
if is_dynamic:
string_reference &= 0x7fffffff
large_offset_data = getattr(
tracepoint_data_object, 'name_string_reference_upper', None)
if large_offset_data:
string_reference |= large_offset_data << 31
return string_reference, is_dynamic
def _CalculateProgramCounter(self, tracepoint_data_object, image_text_offset):
"""Calculates the program counter.
Args:
tracepoint_data_object (object): firehose tracepoint data object.
image_text_offset (int): image text offset.
Returns:
int: program counter.
"""
load_address = getattr(
tracepoint_data_object, 'load_address_upper', None) or 0
load_address <<= 32
load_address |= tracepoint_data_object.load_address_lower
large_offset_data = getattr(
tracepoint_data_object, 'large_offset_data', None)
large_shared_cache_data = getattr(
tracepoint_data_object, 'large_shared_cache_data', None)
if large_shared_cache_data:
calculated_large_offset_data = large_shared_cache_data >> 1
if (large_offset_data and
large_offset_data != calculated_large_offset_data):
load_address |= large_offset_data << 32
else:
load_address |= large_shared_cache_data << 31
elif large_offset_data:
load_address |= large_offset_data << 31
return load_address - image_text_offset
def _DecodeValue(
self, string_formatter, value_index, value_data, precision=None):
"""Decodes value data using the string formatter.
Args:
string_formatter (StringFormatter): string formatter.
value_index (int): index of the value in the string formatter.
value_data (bytes): value data.
precision (Optional[int]): value format string precision.
Returns:
str: decoded value.
"""
if not string_formatter:
return '<decode: missing string formatter>'
decoder_names = string_formatter.GetDecoderNamesByIndex(value_index)
if not decoder_names:
return '<decode: missing decoder>'
decoder_object = self._FORMAT_STRING_DECODERS.get(decoder_names[0], None)
if not decoder_object:
return f'<decode: unsupported decoder: {decoder_names[0]:s}>'
# TODO: add support for precision
_ = precision
format_string_operator = string_formatter.GetFormatStringOperator(
value_index)
return decoder_object.FormatValue(
value_data, format_string_operator=format_string_operator)
def _GetCatalogSubSystemStringMap(self, catalog):
"""Retrieves a map of the catalog sub system strings and offsets.
Args:
catalog (tracev3_catalog): catalog.
Returns:
dict[int, str]: catalog sub system string per offset.
"""
strings_map = {}
map_offset = 0
for string in catalog.sub_system_strings:
strings_map[map_offset] = string
map_offset += len(string) + 1
return strings_map
def _GetDataItemsAndValuesData(
self, proc_id, tracepoint_data_object, values_data, private_data,
oversize_chunks):
"""Retrieves the data items and values data.
Args:
proc_id (str): firehose tracepoint proc_id value.
tracepoint_data_object (object): firehose tracepoint data object.
values_data (bytes): (public) values data.
private_data (bytes): private data.
oversize_chunks (dict[str, oversize_chunk]): Oversize chunks per data
reference.
Returns:
tuple[list[tracev3_data_item], bytes, bytes]: data items and values
data and private data.
"""
data_reference = getattr(tracepoint_data_object, 'data_reference', None)
if not data_reference:
data_items = getattr(tracepoint_data_object, 'data_items', None)
return data_items, values_data, private_data
lookup_key = f'{proc_id:s}:{data_reference:04x}'
oversize_chunk = oversize_chunks.get(lookup_key, None)
if oversize_chunk:
return (oversize_chunk.data_items, oversize_chunk.values_data,
oversize_chunk.private_data)
# Seen in certain tracev3 files that oversize chunks can be missing.
# TODO: issue warning.
return None, values_data, private_data
def _GetDSCFile(self, uuid_string):
"""Retrieves a specific shared-cache strings (DSC) file.
Args:
uuid_string (str): string representation of the UUID.
Returns:
DSCFile: a shared-cache strings (DSC) file or None if not available.
"""
dsc_file = self._cached_dsc_files.get(uuid_string, None)
if not dsc_file:
dsc_file = self._OpenDSCFile(uuid_string)
if len(self._cached_dsc_files) >= self._MAXIMUM_CACHED_FILES:
_, cached_dsc_file = self._cached_dsc_files.popitem(last=True)
if cached_dsc_file:
cached_dsc_file.Close()
self._cached_dsc_files[uuid_string] = dsc_file
self._cached_dsc_files.move_to_end(uuid_string, last=False)
return dsc_file
def _GetImageValues(
self, process_information_entry, firehose_tracepoint,
tracepoint_data_object, string_reference, is_dynamic):
"""Retrieves image values.
Args:
process_information_entry (tracev3_catalog_process_information_entry):
process information entry.
firehose_tracepoint (tracev3_firehose_tracepoint): firehose tracepoint.
tracepoint_data_object (object): firehose tracepoint data object.
string_reference (int): string reference.
is_dynamic (bool): dynamic flag.
Returns:
ImageValues: image values.
Raises:
ParseError: if the image values cannot be retrieved.
"""
strings_file_type = firehose_tracepoint.flags & 0x000e
if strings_file_type not in self._SUPPORTED_STRINGS_FILE_TYPES:
raise errors.ParseError(
f'Unsupported strings file type: 0x{strings_file_type:04x}')
strings_file_identifier = None
image_text_offset = 0
if strings_file_type == 0x0002:
strings_file_identifier = process_information_entry.main_uuid
if strings_file_type in (0x0004, 0x000c):
strings_file_identifier = process_information_entry.dsc_uuid
if strings_file_type == 0x0008:
load_address_upper = tracepoint_data_object.load_address_upper or 0
load_address_lower = tracepoint_data_object.load_address_lower
for uuid_entry in process_information_entry.uuid_entries:
if (load_address_upper != uuid_entry.load_address_upper or
load_address_lower < uuid_entry.load_address_lower):
continue
if load_address_lower <= (
uuid_entry.load_address_lower + uuid_entry.size):
strings_file_identifier = self._catalog.uuids[uuid_entry.uuid_index]
image_text_offset = uuid_entry.load_address_lower | (
uuid_entry.load_address_upper << 32)
break
if not strings_file_identifier:
# ~~> no uuid found for absolute pc
return None
elif strings_file_type == 0x000a:
strings_file_identifier = tracepoint_data_object.uuidtext_file_identifier
if not strings_file_identifier:
raise errors.ParseError('Missing strings file identifier.')
uuid_string = strings_file_identifier.hex.upper()
lookup_key = f'{uuid_string:s}:0x{string_reference:x}'
image_values = self._cached_image_values.get(lookup_key, None)
if not image_values:
large_offset_data = getattr(
tracepoint_data_object, 'large_offset_data', None) or 0
if strings_file_type in self._UUIDTEXT_STRINGS_FILE_TYPES:
image_values = ImageValues(
identifier=strings_file_identifier, text_offset=image_text_offset)
uuidtext_file = self._GetUUIDTextFile(uuid_string)
if uuidtext_file:
image_values.path = uuidtext_file.GetImagePath()
if is_dynamic:
image_values.string = '%s'
else:
image_values.string = uuidtext_file.GetString(string_reference)
if image_values.string is None:
# ~~> Invalid bounds INTEGER for UUID
image_values.text_offset = large_offset_data << 31
else:
dsc_file = self._GetDSCFile(uuid_string)
if dsc_file:
image_values = dsc_file.GetImageValues(string_reference, is_dynamic)
if not image_values:
image_values = ImageValues(
identifier=strings_file_identifier, text_offset=image_text_offset)
large_shared_cache_data = getattr(
tracepoint_data_object, 'large_shared_cache_data', None)
if large_offset_data and large_shared_cache_data:
calculated_large_offset_data = large_shared_cache_data >> 1
if large_offset_data != calculated_large_offset_data:
# "<Invalid shared cache code pointer offset>"
image_values.identifier = strings_file_identifier
image_values.text_offset = 0
image_values.path = ''
if len(self._cached_image_values) >= self._MAXIMUM_CACHED_IMAGE_VALUES:
self._cached_image_values.popitem(last=True)
self._cached_image_values[lookup_key] = image_values
self._cached_image_values.move_to_end(lookup_key, last=False)
return image_values
def _GetProcessImageValues(self, process_information_entry):
"""Retrieves the process image value.
Args:
process_information_entry (tracev3_catalog_process_information_entry):
process information entry.
Returns:
tuple[uuid.UUID, str]: process image identifier and path or (None, None)
if not available.
"""
image_identifier = None
image_path = None
if process_information_entry and process_information_entry.main_uuid:
uuid_string = process_information_entry.main_uuid.hex.upper()
uuidtext_file = self._GetUUIDTextFile(uuid_string)
if uuidtext_file:
image_identifier = process_information_entry.main_uuid
image_path = uuidtext_file.GetImagePath()
return image_identifier, image_path
def _GetSubSystemStrings(
self, process_information_entry, sub_system_identifier):
"""Retrieves the sub system strings.
Args:
process_information_entry (tracev3_catalog_process_information_entry):
process information entry.
sub_system_identifier (int): sub system identifier.
Returns:
tuple[str, str]: category and sub system or (None, None) if not available.
"""
category = None
sub_system = None
# TODO: build a look up table of entries per identifier.
if process_information_entry and sub_system_identifier is not None:
for sub_system_entry in process_information_entry.sub_system_entries:
if sub_system_entry.identifier == sub_system_identifier:
category = self._catalog_strings_map.get(
sub_system_entry.category_offset, None)
sub_system = self._catalog_strings_map.get(
sub_system_entry.sub_system_offset, None)
return category, sub_system
def _GetTimestamp(self, continuous_time):
"""Determine the timestamp from a continuous time.
Args:
continuous_time (int): continuous time.
Returns:
int: timestamp containing the number of nanoseconds since January 1, 1970
00:00:00.000000000.
"""
timesync_record = self._GetTimesyncRecord(continuous_time)
if timesync_record:
continuous_time -= timesync_record.kernel_time
timestamp = timesync_record.timestamp + int(
continuous_time * self._timesync_timebase)
elif self._timesync_boot_record:
timestamp = self._timesync_boot_record.timestamp + int(
continuous_time * self._timesync_timebase)
else:
timestamp = self._header_timestamp + int(
continuous_time * self._header_timebase)
return timestamp
def _GetTraceIdentifier(self, firehose_tracepoint):
"""Determines a trace identifier.
Args:
firehose_tracepoint (tracev3_firehose_tracepoint): firehose tracepoint.
Returns:
int: trace identifier.
"""
trace_identifier = firehose_tracepoint.format_string_reference << 32
trace_identifier |= firehose_tracepoint.flags << 16
trace_identifier |= firehose_tracepoint.log_type << 8
trace_identifier |= firehose_tracepoint.record_type
return trace_identifier
def _GetTimesyncRecord(self, continuous_time):
"""Retrieves a timesync record corresponding to the continuous time.
Args:
continuous_time (int): continuous time.
Returns:
timesync_sync_record: timesync sync record or None if not available.
"""
for record in self._sorted_timesync_sync_records:
if continuous_time >= record.kernel_time:
return record
return None
def _GetUUIDTextFile(self, uuid_string):
"""Retrieves a specific uuidtext file.
Args:
uuid_string (str): string representation of the UUID.
Returns:
UUIDTextFile: an uuidtext file or None if not available.
"""
uuidtext_file = self._cached_uuidtext_files.get(uuid_string, None)
if not uuidtext_file:
uuidtext_file = self._OpenUUIDTextFile(uuid_string)
if len(self._cached_uuidtext_files) >= self._MAXIMUM_CACHED_FILES:
_, cached_uuidtext_file = self._cached_uuidtext_files.popitem(last=True)
if cached_uuidtext_file:
cached_uuidtext_file.Close()
self._cached_uuidtext_files[uuid_string] = uuidtext_file
self._cached_uuidtext_files.move_to_end(uuid_string, last=False)
return uuidtext_file
def _OpenDSCFile(self, uuid_string):
"""Opens a specific shared-cache strings (DSC) file.
Args:
uuid_string (str): string representation of the UUID.
Returns:
DSCFile: a shared-cache strings (DSC) file or None if not available.
"""
if not self._uuidtext_path:
return None
dsc_file_path = self._file_system.JoinPath([
self._uuidtext_path, 'dsc', uuid_string])
path_spec = path_spec_factory.Factory.NewPathSpec(
self._file_entry.type_indicator, location=dsc_file_path,
parent=self._file_entry.path_spec.parent)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
if not file_entry:
return None
dsc_file = DSCFile()
dsc_file.Open(file_entry)
return dsc_file
def _OpenTimesyncDatabaseFile(self, file_entry):
"""Opens a specific timesync database file.
Args:
file_entry (dfvfs.FileEntry): file entry of the timesync database file.
Returns:
TimesyncDatabaseFile: a timesync database file or None if not available.
"""
if not self._timesync_path:
return None
timesync_file = TimesyncDatabaseFile()
timesync_file.Open(file_entry)
return timesync_file
def _OpenUUIDTextFile(self, uuid_string):
"""Opens a specific uuidtext file.
Args:
uuid_string (str): string representation of the UUID.
Returns:
UUIDTextFile: an uuidtext file or None if not available.
"""
if not self._uuidtext_path:
return None
uuidtext_file_path = self._file_system.JoinPath([
self._uuidtext_path, uuid_string[0:2], uuid_string[2:]])
path_spec = path_spec_factory.Factory.NewPathSpec(
self._file_entry.type_indicator, location=uuidtext_file_path,
parent=self._file_entry.path_spec.parent)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
if not file_entry:
return None
uuidtext_file = UUIDTextFile()
uuidtext_file.Open(file_entry)
return uuidtext_file
def _ReadCatalog(self, file_object, file_offset):
"""Reads a catalog.
Args:
file_object (file): file-like object.
file_offset (int): offset of the catalog data relative to the start
of the file.
Returns:
tracev3_catalog: catalog.
Raises:
ParseError: if the chunk header cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_catalog')
catalog, bytes_read = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
file_offset += bytes_read
data_type_map = self._GetDataTypeMap(
'tracev3_catalog_process_information_entry')
catalog.process_information_entries = []
for _ in range(catalog.number_of_process_information_entries):
process_information_entry, bytes_read = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
file_offset += bytes_read
catalog.process_information_entries.append(process_information_entry)
return catalog
def _ReadChunkHeader(self, file_object, file_offset):
"""Reads a chunk header.
Args:
file_object (file): file-like object.
file_offset (int): offset of the chunk header relative to the start
of the file.
Returns:
tracev3_chunk_header: a chunk header.
Raises:
ParseError: if the chunk header cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_chunk_header')
chunk_header, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
return chunk_header
def _ReadChunkSet(
self, file_object, file_offset, chunk_header, oversize_chunks):
"""Reads a chunk set.
Args:
file_object (file): file-like object.
file_offset (int): offset of the chunk set data relative to the start
of the file.
chunk_header (tracev3_chunk_header): the chunk header of the chunk set.
oversize_chunks (dict[str, oversize_chunk]): Oversize chunks per data
reference.
Yields:
LogEntry: a log entry.
Raises:
ParseError: if the chunk header cannot be read.
"""
chunk_data = file_object.read(chunk_header.chunk_data_size)
data_type_map = self._GetDataTypeMap('tracev3_lz4_block_header')
lz4_block_header, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
if lz4_block_header.signature == b'bv41':
end_of_data_offset = 12 + lz4_block_header.compressed_data_size
uncompressed_data = lz4.block.decompress(
chunk_data[12:end_of_data_offset],
uncompressed_size=lz4_block_header.uncompressed_data_size)
elif lz4_block_header.signature == b'bv4-':
end_of_data_offset = 8 + lz4_block_header.uncompressed_data_size
uncompressed_data = chunk_data[8:end_of_data_offset]
else:
raise errors.ParseError('Unsupported start of LZ4 block marker')
end_of_lz4_block_marker = chunk_data[
end_of_data_offset:end_of_data_offset + 4]
if end_of_lz4_block_marker != b'bv4$':
raise errors.ParseError('Unsupported end of LZ4 block marker')
data_type_map = self._GetDataTypeMap('tracev3_chunk_header')
data_offset = 0
while data_offset < lz4_block_header.uncompressed_data_size:
chunkset_chunk_header = self._ReadStructureFromByteStream(
uncompressed_data[data_offset:], data_offset, data_type_map)
data_offset += 16
data_end_offset = data_offset + chunkset_chunk_header.chunk_data_size
chunkset_chunk_data = uncompressed_data[data_offset:data_end_offset]
if chunkset_chunk_header.chunk_tag == self._CHUNK_TAG_FIREHOSE:
yield from self._ReadFirehoseChunkData(
chunkset_chunk_data, data_offset, oversize_chunks)
elif chunkset_chunk_header.chunk_tag == self._CHUNK_TAG_OVERSIZE:
oversize_chunk = self._ReadOversizeChunkData(
chunkset_chunk_data, data_offset)
lookup_key = (f'{oversize_chunk.proc_id_upper:d}@'
f'{oversize_chunk.proc_id_lower:d}:'
f'{oversize_chunk.data_reference:04x}')
oversize_chunks[lookup_key] = oversize_chunk
elif chunkset_chunk_header.chunk_tag == self._CHUNK_TAG_STATEDUMP:
yield from self._ReadStateDumpChunkData(
chunkset_chunk_data, data_offset)
elif chunkset_chunk_header.chunk_tag == self._CHUNK_TAG_SIMPLEDUMP:
yield from self._ReadSimpleDumpChunkData(
chunkset_chunk_data, data_offset)
data_offset = data_end_offset
_, alignment = divmod(data_offset, 8)
if alignment > 0:
alignment = 8 - alignment
data_offset += alignment
def _ReadBacktraceData(self, flags, backtrace_data, data_offset):
"""Reads firehose tracepoint backtrace data.
Args:
flags (int): firehose tracepoint flags.
backtrace_data (bytes): firehose tracepoint backtrace data.
data_offset (int): offset of the firehose tracepoint backtrace data
relative to the start of the chunk set.
Returns:
list[BacktraceFrame]: backtrace frames.
Raises:
ParseError: if the backtrace data cannot be read.
"""
if flags & 0x1000 == 0:
return []
data_type_map = self._GetDataTypeMap(
'tracev3_firehose_tracepoint_backtrace_data')
backtrace_data = self._ReadStructureFromByteStream(
backtrace_data, data_offset, data_type_map)
backtrace_frames = []
for frame_index in range(backtrace_data.number_of_frames):
identifier_index = backtrace_data.indexes[frame_index]
backtrace_frame = BacktraceFrame()
backtrace_frame.image_offset = backtrace_data.offsets[frame_index]
if identifier_index == 0xff:
backtrace_frame.image_identifier = uuid.UUID(
'00000000-0000-0000-0000-000000000000')
else:
backtrace_frame.image_identifier = backtrace_data.identifiers[
identifier_index]
backtrace_frames.append(backtrace_frame)
return backtrace_frames
def _ReadDataItems(
self, data_items, values_data, private_data, private_data_range_offset,
string_formatter):
"""Reads data items.
Args:
data_items (list[tracev3_data_item]): data items.
values_data (bytes): (public) values data.
private_data (bytes): firehose private data.
private_data_range_offset (int): offset of the private data range
relative to the start of the private data.
string_formatter (StringFormatter): string formatter.
Returns:
list[str]: values formatted as strings.
Raises:
ParseError: if the data items cannot be read.
"""
values = []
value_index = 0
precision = None
for data_item in data_items:
value_data = None
if data_item.value_type in self._DATA_ITEM_NUMERIC_VALUE_TYPES:
value_data = data_item.data
elif data_item.value_type in self._DATA_ITEM_PRECISION_VALUE_TYPES:
value_data = data_item.data
elif data_item.value_type in self._DATA_ITEM_STRING_VALUE_TYPES:
value_data_offset = data_item.value_data_offset
value_data = values_data[
value_data_offset:value_data_offset + data_item.value_data_size]
elif data_item.value_type in self._DATA_ITEM_PRIVATE_STRING_VALUE_TYPES:
value_data_offset = (
private_data_range_offset + data_item.value_data_offset)
value_data = private_data[
value_data_offset:value_data_offset + data_item.value_data_size]
elif data_item.value_type in self._DATA_ITEM_BINARY_DATA_VALUE_TYPES:
value_data_offset = data_item.value_data_offset
value_data = values_data[
value_data_offset:value_data_offset + data_item.value_data_size]
if data_item.value_type not in (
0x00, 0x01, 0x02, 0x10, 0x12, 0x20, 0x21, 0x22, 0x25, 0x30, 0x31,
0x32, 0x35, 0x40, 0x41, 0x42, 0x45, 0x72, 0xf2):
raise errors.ParseError((
f'Unsupported data item value type: '
f'0x{data_item.value_type:02x}.'))
if data_item.value_type in self._DATA_ITEM_PRECISION_VALUE_TYPES:
precision = int.from_bytes(value_data, 'little', signed=False)
continue
value = None
if data_item.value_type in self._DATA_ITEM_PRIVATE_VALUE_TYPES:
if not value_data:
value = '<private>'
if not value:
value = self._DecodeValue(
string_formatter, value_index, value_data, precision=precision)
precision = None
values.append(value)
value_index += 1
return values
def _ReadFirehoseChunkData(self, chunk_data, data_offset, oversize_chunks):
"""Reads firehose chunk data.
Args:
chunk_data (bytes): firehose chunk data.
data_offset (int): offset of the firehose chunk relative to the start
of the chunk set.
oversize_chunks (dict[str, oversize_chunk]): Oversize chunks per data
reference.
Yields:
LogEntry: a log entry.
Raises:
ParseError: if the firehose chunk cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_firehose_header')
firehose_header = self._ReadStructureFromByteStream(
chunk_data, data_offset, data_type_map)
proc_id = (f'{firehose_header.proc_id_upper:d}@'
f'{firehose_header.proc_id_lower:d}')
process_information_entry = (
self._catalog_process_information_entries.get(proc_id, None))
if not process_information_entry:
raise errors.ParseError((
f'Unable to retrieve process information entry: {proc_id:s} from '
f'catalog'))
chunk_data_offset = 32
while chunk_data_offset < firehose_header.public_data_size:
firehose_tracepoint, bytes_read = self._ReadFirehoseTracepointData(
chunk_data[chunk_data_offset:], data_offset + chunk_data_offset)
record_type = firehose_tracepoint.record_type
if record_type not in (
self._RECORD_TYPE_UNUSED, self._RECORD_TYPE_ACTIVITY,
self._RECORD_TYPE_TRACE, self._RECORD_TYPE_LOG,
self._RECORD_TYPE_SIGNPOST, self._RECORD_TYPE_LOSS):
raise errors.ParseError(
f'Unsupported record type: 0x{record_type:02x}.')
if record_type == self._RECORD_TYPE_UNUSED:
chunk_data_offset += bytes_read
continue
chunk_data_offset += 24
tracepoint_data_offset = data_offset + chunk_data_offset
tracepoint_data_object = None
bytes_read = 0
if record_type == self._RECORD_TYPE_ACTIVITY:
if firehose_tracepoint.log_type not in (0x01, 0x03):
raise errors.ParseError(
f'Unsupported log type: 0x{firehose_tracepoint.log_type:02x}.')
tracepoint_data_object, bytes_read = (
self._ReadFirehoseTracepointActivityData(
firehose_tracepoint.log_type, firehose_tracepoint.flags,
firehose_tracepoint.data, tracepoint_data_offset))
elif record_type == self._RECORD_TYPE_TRACE:
if firehose_tracepoint.log_type not in (0x00, ):
raise errors.ParseError(
f'Unsupported log type: 0x{firehose_tracepoint.log_type:02x}.')
tracepoint_data_object, bytes_read = (
self._ReadFirehoseTracepointTraceData(
firehose_tracepoint.flags, firehose_tracepoint.data,
tracepoint_data_offset))
elif record_type == self._RECORD_TYPE_LOG:
if firehose_tracepoint.log_type not in (0x00, 0x01, 0x02, 0x10, 0x11):
raise errors.ParseError(
f'Unsupported log type: 0x{firehose_tracepoint.log_type:02x}.')
tracepoint_data_object, bytes_read = (
self._ReadFirehoseTracepointLogData(
firehose_tracepoint.flags, firehose_tracepoint.data,
tracepoint_data_offset))
elif record_type == self._RECORD_TYPE_SIGNPOST:
if firehose_tracepoint.log_type not in (
0x40, 0x41, 0x42, 0x80, 0x81, 0x82, 0xc0, 0xc1, 0xc2):
raise errors.ParseError(
f'Unsupported log type: 0x{firehose_tracepoint.log_type:02x}.')
tracepoint_data_object, bytes_read = (
self._ReadFirehoseTracepointSignpostData(
firehose_tracepoint.flags, firehose_tracepoint.data,
tracepoint_data_offset))
elif record_type == self._RECORD_TYPE_LOSS:
if firehose_tracepoint.log_type not in (0x00, ):
raise errors.ParseError(
f'Unsupported log type: 0x{firehose_tracepoint.log_type:02x}.')
tracepoint_data_object, bytes_read = (
self._ReadFirehoseTracepointLossData(
firehose_tracepoint.flags, firehose_tracepoint.data,
tracepoint_data_offset))
continuous_time = firehose_tracepoint.continuous_time_lower | (
firehose_tracepoint.continuous_time_upper << 32)
continuous_time += firehose_header.base_continuous_time
process_image_identifier, process_image_path = (
self._GetProcessImageValues(process_information_entry))
log_entry = LogEntry()
log_entry.boot_identifier = self._boot_identifier
log_entry.mach_timestamp = continuous_time
log_entry.process_image_identifier = process_image_identifier
log_entry.thread_identifier = firehose_tracepoint.thread_identifier
log_entry.timestamp = self._GetTimestamp(continuous_time)
log_entry.trace_identifier = self._GetTraceIdentifier(firehose_tracepoint)
if record_type == self._RECORD_TYPE_ACTIVITY:
log_entry.event_type = self._ACTIVITY_EVENT_TYPE_DESCRIPTIONS.get(
firehose_tracepoint.log_type, None)
else:
log_entry.event_type = self._EVENT_TYPE_DESCRIPTIONS.get(
firehose_tracepoint.record_type, None)
if record_type == self._RECORD_TYPE_LOSS:
loss_count = tracepoint_data_object.number_of_messages or 0
loss_start_time = tracepoint_data_object.start_time or 0
loss_end_time = tracepoint_data_object.end_time or 0
log_entry.event_message = (
f'lost >={loss_count:d} unreliable messages from '
f'{loss_start_time:d}-{loss_end_time:d} (Mach continuous exact '
f'start-approx. end)')
log_entry.loss_count = loss_count
log_entry.loss_end_mach_timestamp = loss_end_time
log_entry.loss_end_timestamp = self._GetTimestamp(loss_end_time)
log_entry.loss_start_mach_timestamp = loss_start_time
log_entry.loss_start_timestamp = self._GetTimestamp(loss_start_time)
# TODO: add support for lossCountSaturated
else:
values_data_offset = bytes_read
values_data = firehose_tracepoint.data[values_data_offset:]
string_reference, is_dynamic = self._CalculateFormatStringReference(
tracepoint_data_object, firehose_tracepoint.format_string_reference)
image_values = self._GetImageValues(
process_information_entry, firehose_tracepoint,
tracepoint_data_object, string_reference, is_dynamic)
if image_values:
string_formatter = image_values.GetStringFormatter()
else:
string_formatter = None
backtrace_frames = []
values = []
if record_type == self._RECORD_TYPE_TRACE:
values = self._ReadFirehoseTracepointTraceValuesData(
tracepoint_data_object, values_data, string_formatter)
else:
private_data_virtual_offset = (
firehose_header.private_data_virtual_offset & 0x0fff)
if not private_data_virtual_offset:
private_data = b''
else:
private_data_size = 4096 - private_data_virtual_offset
private_data = chunk_data[-private_data_size:]
data_items, values_data, private_data = (
self._GetDataItemsAndValuesData(
proc_id, tracepoint_data_object, values_data, private_data,
oversize_chunks))
backtrace_frames = self._ReadBacktraceData(
firehose_tracepoint.flags, values_data,
tracepoint_data_offset + values_data_offset)
if data_items:
private_data_range = getattr(
tracepoint_data_object, 'private_data_range', None)
if private_data_range is None:
private_data_offset = 0
else:
# TODO: error if private_data_virtual_offset >
# private_data_range.offset
private_data_offset = (
private_data_range.offset - private_data_virtual_offset)
# TODO: calculate item data offset for debugging purposes.
values = self._ReadDataItems(
data_items, values_data, private_data, private_data_offset,
string_formatter)
sub_system_identifier = getattr(
tracepoint_data_object, 'sub_system_identifier', None)
category, sub_system = self._GetSubSystemStrings(
process_information_entry, sub_system_identifier)
text_offset = getattr(image_values, 'text_offset', None) or 0
program_counter = self._CalculateProgramCounter(
tracepoint_data_object, text_offset)
if not backtrace_frames and image_values:
backtrace_frame = BacktraceFrame()
backtrace_frame.image_identifier = image_values.identifier
backtrace_frame.image_offset = program_counter or 0
backtrace_frames.append(backtrace_frame)
log_entry.backtrace_frames = backtrace_frames
log_entry.category = category
log_entry.format_string = getattr(image_values, 'string', None)
log_entry.process_identifier = getattr(
process_information_entry, 'process_identifier', None) or 0
log_entry.process_image_path = process_image_path
log_entry.sender_image_identifier = getattr(
image_values, 'identifier', None)
log_entry.sender_image_path = getattr(image_values, 'path', None)
log_entry.sender_program_counter = program_counter or 0
log_entry.sub_system = sub_system
log_entry.time_zone_name = None
log_entry.ttl = getattr(tracepoint_data_object, 'ttl', None) or 0
if firehose_tracepoint.record_type != self._RECORD_TYPE_SIGNPOST:
log_entry.message_type = self._LOG_TYPE_DESCRIPTIONS.get(
firehose_tracepoint.log_type, None)
if firehose_tracepoint.record_type == self._RECORD_TYPE_ACTIVITY:
new_activity_identifier = getattr(
tracepoint_data_object, 'new_activity_identifier', None) or 0
log_entry.activity_identifier = (
new_activity_identifier & self.ACTIVITY_IDENTIFIER_BITMASK)
if firehose_tracepoint.log_type == 0x01:
current_activity_identifier = getattr(
tracepoint_data_object, 'current_activity_identifier',
None) or 0
# Note that the creator activity identifier is not masked in
# the output.
log_entry.creator_activity_identifier = current_activity_identifier
other_activity_identifier = getattr(
tracepoint_data_object, 'other_activity_identifier', None) or 0
log_entry.parent_activity_identifier = (
other_activity_identifier & self.ACTIVITY_IDENTIFIER_BITMASK)
else:
current_activity_identifier = getattr(
tracepoint_data_object, 'current_activity_identifier', None) or 0
log_entry.activity_identifier = (
current_activity_identifier & self.ACTIVITY_IDENTIFIER_BITMASK)
if firehose_tracepoint.record_type == self._RECORD_TYPE_SIGNPOST:
name_string_reference, is_dynamic = (
self._CalculateNameStringReference(tracepoint_data_object))
if not name_string_reference:
name_string = None
else:
name_image_values = self._GetImageValues(
process_information_entry, firehose_tracepoint,
tracepoint_data_object, name_string_reference, is_dynamic)
name_string = name_image_values.string
log_entry.signpost_identifier = getattr(
tracepoint_data_object, 'signpost_identifier', None)
log_entry.signpost_name = name_string
log_entry.signpost_scope = self._SIGNPOST_SCOPE_DESCRIPTIONS.get(
firehose_tracepoint.log_type >> 4, None)
log_entry.signpost_type = self._SIGNPOST_TYPE_DESCRIPTIONS.get(
firehose_tracepoint.log_type & 0x0f, None)
if string_formatter:
log_entry.event_message = string_formatter.FormatString(values)
else:
log_entry.event_message = (
'<compose failure [missing precomposed log]>')
yield log_entry
chunk_data_offset += firehose_tracepoint.data_size
_, alignment = divmod(chunk_data_offset, 8)
if alignment > 0:
alignment = 8 - alignment
chunk_data_offset += alignment
def _ReadFirehoseTracepointData(self, tracepoint_data, data_offset):
"""Reads firehose tracepoint data.
Args:
tracepoint_data (bytes): firehose tracepoint data.
data_offset (int): offset of the firehose tracepoint relative to
the start of the chunk set.
Returns:
tuple[tracev3_firehose_tracepoint, int]: firehose tracepoint and number
of bytes read.
Raises:
ParseError: if the firehose tracepoint cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_firehose_tracepoint')
context = dtfabric_data_maps.DataTypeMapContext()
firehose_tracepoint = self._ReadStructureFromByteStream(
tracepoint_data, data_offset, data_type_map, context=context)
return firehose_tracepoint, context.byte_size
def _ReadFirehoseTracepointActivityData(
self, log_type, flags, tracepoint_data, data_offset):
"""Reads firehose tracepoint activity data.
Args:
log_type (int): firehose tracepoint log type.
flags (int): firehose tracepoint flags.
tracepoint_data (bytes): firehose tracepoint data.
data_offset (int): offset of the firehose tracepoint data relative to
the start of the chunk set.
Returns:
tuple[tracev3_firehose_tracepoint_activity, int]: activity and the number
of bytes read.
Raises:
ParseError: if the activity data cannot be read.
"""
supported_flags = 0x0001 | 0x000e | 0x0010 | 0x0020 | 0x0100 | 0x0200
if flags & ~supported_flags != 0:
raise errors.ParseError(f'Unsupported flags: 0x{flags:04x}.')
data_type_map = self._GetDataTypeMap('tracev3_firehose_tracepoint_activity')
context = dtfabric_data_maps.DataTypeMapContext(values={
'tracev3_firehose_tracepoint_flags': flags,
'tracev3_firehose_tracepoint_log_type': log_type,
'tracev3_firehose_tracepoint_strings_file_type': flags & 0x000e})
activity = self._ReadStructureFromByteStream(
tracepoint_data, data_offset, data_type_map, context=context)
return activity, context.byte_size
def _ReadFirehoseTracepointLogData(self, flags, tracepoint_data, data_offset):
"""Reads firehose tracepoint log data.
Args:
flags (int): firehose tracepoint flags.
tracepoint_data (bytes): firehose tracepoint data.
data_offset (int): offset of the firehose tracepoint data relative to
the start of the chunk set.
Returns:
tuple[tracev3_firehose_tracepoint_log, int]: log and the number of bytes
read.
Raises:
ParseError: if the log data cannot be read.
"""
supported_flags = (
0x0001 | 0x000e | 0x0020 | 0x0100 | 0x0200 | 0x0400 | 0x0800 | 0x1000)
if flags & ~supported_flags != 0:
raise errors.ParseError(f'Unsupported flags: 0x{flags:04x}.')
data_type_map = self._GetDataTypeMap('tracev3_firehose_tracepoint_log')
context = dtfabric_data_maps.DataTypeMapContext(values={
'tracev3_firehose_tracepoint_flags': flags,
'tracev3_firehose_tracepoint_strings_file_type': flags & 0x000e})
log = self._ReadStructureFromByteStream(
tracepoint_data, data_offset, data_type_map, context=context)
return log, context.byte_size
def _ReadFirehoseTracepointLossData(
self, flags, tracepoint_data, data_offset):
"""Reads firehose tracepoint loss data.
Args:
flags (int): firehose tracepoint flags.
tracepoint_data (bytes): firehose tracepoint data.
data_offset (int): offset of the firehose tracepoint data relative to
the start of the chunk set.
Returns:
tuple[tracev3_firehose_tracepoint_loss, int]: loss and the number of bytes
read.
Raises:
ParseError: if the loss data cannot be read.
"""
supported_flags = 0x0000
if flags & ~supported_flags != 0:
raise errors.ParseError(f'Unsupported flags: 0x{flags:04x}.')
data_type_map = self._GetDataTypeMap('tracev3_firehose_tracepoint_loss')
context = dtfabric_data_maps.DataTypeMapContext(values={
'tracev3_firehose_tracepoint_flags': flags,
'tracev3_firehose_tracepoint_strings_file_type': flags & 0x000e})
loss = self._ReadStructureFromByteStream(
tracepoint_data, data_offset, data_type_map, context=context)
return loss, context.byte_size
def _ReadFirehoseTracepointSignpostData(
self, flags, tracepoint_data, data_offset):
"""Reads firehose tracepoint signpost data.
Args:
flags (int): firehose tracepoint flags.
tracepoint_data (bytes): firehose tracepoint data.
data_offset (int): offset of the firehose tracepoint data relative to
the start of the chunk set.
Returns:
tuple[tracev3_firehose_tracepoint_signpost, int]: signpost and the number
of bytes read.
Raises:
ParseError: if the signpost data cannot be read.
"""
supported_flags = (
0x0001 | 0x000e | 0x0020 | 0x0100 | 0x0200 | 0x0400 | 0x0800 | 0x8000)
if flags & ~supported_flags != 0:
raise errors.ParseError(f'Unsupported flags: 0x{flags:04x}.')
data_type_map = self._GetDataTypeMap('tracev3_firehose_tracepoint_signpost')
context = dtfabric_data_maps.DataTypeMapContext(values={
'tracev3_firehose_tracepoint_flags': flags,
'tracev3_firehose_tracepoint_strings_file_type': flags & 0x000e})
signpost = self._ReadStructureFromByteStream(
tracepoint_data, data_offset, data_type_map, context=context)
return signpost, context.byte_size
def _ReadFirehoseTracepointTraceData(
self, flags, tracepoint_data, data_offset):
"""Reads firehose tracepoint trace data.
Args:
flags (int): firehose tracepoint flags.
tracepoint_data (bytes): firehose tracepoint data.
data_offset (int): offset of the firehose tracepoint data relative to
the start of the chunk set.
Returns:
tuple[tracev3_firehose_tracepoint_trace, int]: trace and the number of
bytes read.
Raises:
ParseError: if the trace data cannot be read.
"""
supported_flags = 0x0002
if flags & ~supported_flags != 0:
raise errors.ParseError(f'Unsupported flags: 0x{flags:04x}.')
data_type_map = self._GetDataTypeMap('tracev3_firehose_tracepoint_trace')
context = dtfabric_data_maps.DataTypeMapContext(values={
'tracev3_firehose_tracepoint_flags': flags,
'tracev3_firehose_tracepoint_strings_file_type': flags & 0x000e})
trace = self._ReadStructureFromByteStream(
tracepoint_data, data_offset, data_type_map, context=context)
trace.number_of_values = tracepoint_data[-1]
return trace, context.byte_size
def _ReadFirehoseTracepointTraceValuesData(
self, trace, values_data, string_formatter):
"""Reads firehose tracepoint trace values data.
Args:
trace (tracev3_firehose_tracepoint_trace): trace.
values_data (bytes): (public) values data.
string_formatter (StringFormatter): string formatter.
Returns:
list[str]: values formatted as strings.
Raises:
ParseError: if the values cannot be read.
"""
if not trace.number_of_values:
return []
values = []
value_data_offset = 0
value_size_offset = -(1 + trace.number_of_values)
for value_index in range(trace.number_of_values):
value_data_size = values_data[value_size_offset]
if value_data_size not in (4, 8):
raise errors.ParseError(f'Unsupported value size: {value_data_size:d}')
value_data = values_data[
value_data_offset:value_data_offset + value_data_size]
value = self._DecodeValue(string_formatter, value_index, value_data)
values.append(value)
value_data_offset += value_data_size
value_size_offset += 1
return values
def _ReadHeaderChunk(self, file_object, file_offset):
"""Reads a header chunk.
Args:
file_object (file): file-like object.
file_offset (int): offset of the chunk relative to the start of the file.
Returns:
header_chunk: a header chunk.
Raises:
ParseError: if the header chunk cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_header_chunk')
header_chunk, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
if header_chunk.flags & 0x0001 == 0:
raise errors.ParseError(
f'Unsupported header chunk flags: 0x{header_chunk.flags:04x}.')
self._boot_identifier = header_chunk.generation.boot_identifier
return header_chunk
def _ReadOversizeChunkData(self, chunk_data, data_offset):
"""Reads Oversize chunk data.
Args:
chunk_data (bytes): Oversize chunk data.
data_offset (int): offset of the Oversize chunk relative to the start
of the chunk set.
Returns:
oversize_chunk: an Oversize chunk.
Raises:
ParseError: if the chunk cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_oversize_chunk')
context = dtfabric_data_maps.DataTypeMapContext()
oversize_chunk = self._ReadStructureFromByteStream(
chunk_data, data_offset, data_type_map, context=context)
data_offset = context.byte_size
data_size = data_offset + oversize_chunk.data_size
oversize_chunk.values_data = chunk_data[data_offset:data_size]
data_offset += oversize_chunk.data_size
data_size = data_offset + oversize_chunk.private_data_size
oversize_chunk.private_data = chunk_data[data_offset:data_size]
return oversize_chunk
def _ReadSimpleDumpChunkData(self, chunk_data, data_offset):
"""Reads SimpleDump chunk data.
Args:
chunk_data (bytes): SimpleDump chunk data.
data_offset (int): offset of the SimpleDump chunk relative to the start
of the chunk set.
Yields:
LogEntry: a log entry.
Raises:
ParseError: if the chunk cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_simpledump_chunk')
context = dtfabric_data_maps.DataTypeMapContext()
simpledump_chunk = self._ReadStructureFromByteStream(
chunk_data, data_offset, data_type_map, context=context)
proc_id = (f'{simpledump_chunk.proc_id_upper:d}@'
f'{simpledump_chunk.proc_id_lower:d}')
process_information_entry = (
self._catalog_process_information_entries.get(proc_id, None))
if not process_information_entry:
raise errors.ParseError((
f'Unable to retrieve process information entry: {proc_id:s} from '
f'catalog'))
backtrace_frame = BacktraceFrame()
backtrace_frame.image_identifier = simpledump_chunk.sender_image_identifier
backtrace_frame.image_offset = simpledump_chunk.load_address
process_image_identifier, process_image_path = (
self._GetProcessImageValues(process_information_entry))
log_entry = LogEntry()
log_entry.backtrace_frames = [backtrace_frame]
log_entry.boot_identifier = self._boot_identifier
log_entry.event_message = simpledump_chunk.message_string
log_entry.event_type = 'logEvent'
log_entry.mach_timestamp = simpledump_chunk.continuous_time
log_entry.message_type = self._LOG_TYPE_DESCRIPTIONS.get(
simpledump_chunk.log_type, None)
log_entry.process_identifier = getattr(
process_information_entry, 'process_identifier', None) or 0
log_entry.process_image_identifier = process_image_identifier
log_entry.process_image_path = process_image_path
log_entry.sub_system = simpledump_chunk.sub_system_string
log_entry.sender_image_identifier = simpledump_chunk.sender_image_identifier
# TODO: implement
log_entry.sender_image_path = None
log_entry.sender_program_counter = simpledump_chunk.load_address
log_entry.thread_identifier = simpledump_chunk.thread_identifier
log_entry.timestamp = self._GetTimestamp(simpledump_chunk.continuous_time)
log_entry.trace_identifier = 0
yield log_entry
def _ReadStateDumpChunkData(self, chunk_data, data_offset):
"""Reads StateDump chunk data.
Args:
chunk_data (bytes): StateDump chunk data.
data_offset (int): offset of the StateDump chunk relative to the start
of the chunk set.
Yields:
LogEntry: a log entry.
Raises:
ParseError: if the chunk cannot be read.
"""
data_type_map = self._GetDataTypeMap('tracev3_statedump_chunk')
context = dtfabric_data_maps.DataTypeMapContext()
statedump_chunk = self._ReadStructureFromByteStream(
chunk_data, data_offset, data_type_map, context=context)
proc_id = (f'{statedump_chunk.proc_id_upper:d}@'
f'{statedump_chunk.proc_id_lower:d}')
process_information_entry = (
self._catalog_process_information_entries.get(proc_id, None))
if not process_information_entry:
raise errors.ParseError((
f'Unable to retrieve process information entry: {proc_id:s} from '
f'catalog'))
event_message = ''
if statedump_chunk.state_data_type == 0x03:
library_data = statedump_chunk.library.split(b'\x00', maxsplit=1)[0]
library = library_data.decode('utf8')
decoder_type_data = statedump_chunk.decoder_type.split(
b'\x00', maxsplit=1)[0]
decoder_type = decoder_type_data.decode('utf8')
decoder_name = f'{library:s}:{decoder_type:s}'
decoder_class = self._FORMAT_STRING_DECODERS.get(decoder_name, None)
if not decoder_class:
value = f'<decode: unsupported decoder: {decoder_name:s}>'
else:
value = decoder_class.FormatValue(statedump_chunk.state_data)
event_message = f'{statedump_chunk.title:s}\n{value:s}'
activity_identifier = statedump_chunk.activity_identifier or 0
backtrace_frame = BacktraceFrame()
backtrace_frame.image_identifier = statedump_chunk.sender_image_identifier
backtrace_frame.image_offset = 0
process_image_identifier, process_image_path = (
self._GetProcessImageValues(process_information_entry))
log_entry = LogEntry()
log_entry.activity_identifier = (
activity_identifier & self.ACTIVITY_IDENTIFIER_BITMASK)
log_entry.backtrace_frames = [backtrace_frame]
log_entry.boot_identifier = self._boot_identifier
log_entry.event_message = event_message
log_entry.event_type = 'stateEvent'
log_entry.mach_timestamp = statedump_chunk.continuous_time
log_entry.process_identifier = getattr(
process_information_entry, 'process_identifier', None) or 0
log_entry.process_image_identifier = process_image_identifier
log_entry.process_image_path = process_image_path
log_entry.sender_image_identifier = statedump_chunk.sender_image_identifier
log_entry.timestamp = self._GetTimestamp(statedump_chunk.continuous_time)
log_entry.trace_identifier = 0
yield log_entry
def _ReadTimesyncRecords(self, boot_identifier):
"""Reads the timesync records corresponding to the boot identifier.
Args:
boot_identifier (str): boot identifier (UUID).
"""
self._timesync_boot_record = None
self._timesync_sync_records = []
if not self._timesync_path:
return
path_spec = path_spec_factory.Factory.NewPathSpec(
self._file_entry.type_indicator, location=self._timesync_path,
parent=self._file_entry.path_spec.parent)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
if not file_entry:
return
for sub_file_entry in file_entry.sub_file_entries:
if self._timesync_boot_record:
break
lower_name = sub_file_entry.name.lower()
if not lower_name.endswith('.timesync'):
continue
timesync_file = self._OpenTimesyncDatabaseFile(sub_file_entry)
for record in timesync_file.ReadRecords():
record_boot_identifier = getattr(record, 'boot_identifier', None)
if self._timesync_boot_record:
if record_boot_identifier:
break
self._timesync_sync_records.append(record)
elif record_boot_identifier == boot_identifier:
self._timesync_boot_record = record
if self._timesync_boot_record:
self._timesync_timebase = (
self._timesync_boot_record.timebase_numerator /
self._timesync_boot_record.timebase_denominator)
# Sort the timesync records starting with the largest kernel time.
self._sorted_timesync_sync_records = sorted(
self._timesync_sync_records, key=lambda record: record.kernel_time,
reverse=True)
[docs]
def Close(self):
"""Closes a tracev3 file.
Raises:
IOError: if the file is not opened.
OSError: if the file is not opened.
"""
for dsc_file in self._cached_dsc_files.values():
if dsc_file:
dsc_file.Close()
for uuidtext_file in self._cached_uuidtext_files.values():
if uuidtext_file:
uuidtext_file.Close()
super(TraceV3File, self).Close()
[docs]
def ReadFileObject(self, file_object):
"""Reads a tracev3 file-like object.
Args:
file_object (file): file-like object.
Raises:
ParseError: if the file cannot be read.
"""
# The uuidtext files can be stored in multiple locations relative from
# the tracev3 file.
# * in ../ for *.logarchive/logdata.LiveData.tracev3
# * in ../../ for .logarchive/*/*.tracev3
# * in ../../uuidtext/ for /private/var/db/diagnostics/*/*.tracev3
path_segments = self._file_system.SplitPath(
self._file_entry.path_spec.location)
# Remove the filename from the path.
path_segments.pop(-1)
lower_last_path_segment = path_segments[-1].lower()
if not lower_last_path_segment.endswith('.logarchive'):
path_segments.pop(-1)
if path_segments:
lower_last_path_segment = path_segments[-1].lower()
if not lower_last_path_segment.endswith('.logarchive'):
path_segments.pop(-1)
if path_segments:
lower_last_path_segment = path_segments[-1].lower()
if lower_last_path_segment.endswith('.logarchive'):
path_segments.append('dsc')
dsc_path = self._file_system.JoinPath(path_segments)
path_spec = path_spec_factory.Factory.NewPathSpec(
self._file_entry.type_indicator, location=dsc_path,
parent=self._file_entry.path_spec.parent)
if self._file_system.FileEntryExistsByPathSpec(path_spec):
path_segments.pop(-1)
self._uuidtext_path = self._file_system.JoinPath(path_segments)
else:
path_segments.append('uuidtext')
uuidtext_path = self._file_system.JoinPath(path_segments)
path_spec = path_spec_factory.Factory.NewPathSpec(
self._file_entry.type_indicator, location=uuidtext_path,
parent=self._file_entry.path_spec.parent)
if self._file_system.FileEntryExistsByPathSpec(path_spec):
self._uuidtext_path = uuidtext_path
path_segments.pop(-1)
# The timesync files can be stored in multiple locations relative from
# the tracev3 file.
# * in ../timesync/ for *.logarchive/logdata.LiveData.tracev3
# * in ../../timesync/ for .logarchive/*/*.tracev3
# * in ../timesync/ for /private/var/db/diagnostics/*/*.tracev3
path_segments.append('timesync')
timesync_path = self._file_system.JoinPath(path_segments)
path_spec = path_spec_factory.Factory.NewPathSpec(
self._file_entry.type_indicator, location=timesync_path,
parent=self._file_entry.path_spec.parent)
if not self._file_system.FileEntryExistsByPathSpec(path_spec):
path_segments.insert(-1, 'diagnostics')
timesync_path = self._file_system.JoinPath(path_segments)
path_spec = path_spec_factory.Factory.NewPathSpec(
self._file_entry.type_indicator, location=timesync_path,
parent=self._file_entry.path_spec.parent)
if self._file_system.FileEntryExistsByPathSpec(path_spec):
self._timesync_path = timesync_path
file_offset = 0
chunk_header = self._ReadChunkHeader(file_object, file_offset)
file_offset += 16
header_chunk = self._ReadHeaderChunk(file_object, file_offset)
file_offset += chunk_header.chunk_data_size
_, alignment = divmod(file_offset, 8)
if alignment > 0:
alignment = 8 - alignment
file_offset += alignment
file_object.seek(file_offset, os.SEEK_SET)
self._header_timestamp = (
header_chunk.timestamp * self._NANOSECONDS_PER_SECOND)
self._header_timebase = (
header_chunk.timebase_numerator / header_chunk.timebase_denominator)
self._ReadTimesyncRecords(self._boot_identifier)
[docs]
def ReadLogEntries(self):
"""Reads log traces.
Yields:
LogEntry: a log entry.
Raises:
ParseError: if the file cannot be read.
"""
if self._timesync_boot_record:
boot_identifier_string = str(self._boot_identifier).upper()
log_entry = LogEntry()
log_entry.boot_identifier = self._boot_identifier
log_entry.event_message = f'=== system boot: {boot_identifier_string:s}'
log_entry.event_type = 'timesyncEvent'
log_entry.mach_timestamp = 0
log_entry.thread_identifier = 0
log_entry.timestamp = self._timesync_boot_record.timestamp
log_entry.trace_identifier = 0
yield log_entry
# TODO: generate timesyncEvent LogEntry
# "=== log class: persist begins"
# "=== log class: in-memory begins"
# are these determined based on the base continuous time of the first
# firehose chunk?
for record in self._timesync_sync_records:
boot_identifier_string = str(self._boot_identifier).upper()
log_entry = LogEntry()
log_entry.boot_identifier = self._boot_identifier
log_entry.event_message = '=== system wallclock time adjusted'
log_entry.event_type = 'timesyncEvent'
log_entry.mach_timestamp = record.kernel_time
log_entry.parent_activity_identifier = 0
log_entry.thread_identifier = 0
log_entry.timestamp = record.timestamp
log_entry.trace_identifier = 0
yield log_entry
file_offset = self._file_object.tell()
oversize_chunks = {}
while file_offset < self._file_entry.size:
chunk_header = self._ReadChunkHeader(self._file_object, file_offset)
file_offset += 16
if chunk_header.chunk_tag == self._CHUNK_TAG_CATALOG:
self._catalog = self._ReadCatalog(self._file_object, file_offset)
self._BuildCatalogProcessInformationEntries(self._catalog)
elif chunk_header.chunk_tag == self._CHUNK_TAG_CHUNK_SET:
yield from self._ReadChunkSet(
self._file_object, file_offset, chunk_header, oversize_chunks)
else:
raise errors.ParseError(
f'Unsupported chunk tag: 0x{chunk_header.chunk_tag:04x}.')
file_offset += chunk_header.chunk_data_size
_, alignment = divmod(file_offset, 8)
if alignment > 0:
alignment = 8 - alignment
file_offset += alignment
[docs]
class UUIDTextFile(BaseUnifiedLoggingFile):
"""Apple Unified Logging and Activity Tracing (uuidtext) file."""
_DEFINITION_FILE = os.path.join(
os.path.dirname(__file__), 'aul_uuidtext.yaml')
[docs]
def __init__(self):
"""Initializes an uuidtext file."""
super(UUIDTextFile, self).__init__()
self._entry_descriptors = []
self._file_footer = None
def _ReadFileFooter(self, file_object, file_offset):
"""Reads a file footer.
Args:
file_object (file): file-like object.
file_offset (int): offset of the file footer relative to the start
of the file.
Returns:
uuidtext_file_footer: a file footer.
Raises:
ParseError: if the file footer cannot be read.
"""
data_type_map = self._GetDataTypeMap('uuidtext_file_footer')
file_footer, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
return file_footer
def _ReadFileHeader(self, file_object):
"""Reads a file header.
Args:
file_object (file): file-like object.
Returns:
uuidtext_file_header: a file header.
Raises:
ParseError: if the file header cannot be read.
"""
data_type_map = self._GetDataTypeMap('uuidtext_file_header')
file_header, _ = self._ReadStructureFromFileObject(
file_object, 0, data_type_map)
format_version = (
file_header.major_format_version, file_header.minor_format_version)
if format_version != (2, 1):
format_version_string = '.'.join([
f'{file_header.major_format_version:d}',
f'{file_header.minor_format_version:d}'])
raise errors.ParseError(
f'Unsupported format version: {format_version_string:s}')
return file_header
def _ReadString(self, file_object, file_offset):
"""Reads a string.
Args:
file_object (file): file-like object.
file_offset (int): offset of the string data relative to the start
of the file.
Returns:
str: string.
Raises:
ParseError: if the string cannot be read.
"""
data_type_map = self._GetDataTypeMap('cstring')
format_string, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map)
return format_string
[docs]
def GetString(self, string_reference):
"""Retrieves a string.
Args:
string_reference (int): reference of the string.
Returns:
str: string or None if not available.
Raises:
ParseError: if the string cannot be read.
"""
for file_offset, entry_descriptor in self._entry_descriptors:
if string_reference < entry_descriptor.offset:
continue
relative_offset = string_reference - entry_descriptor.offset
if relative_offset <= entry_descriptor.data_size:
file_offset += relative_offset
return self._ReadString(self._file_object, file_offset)
return None
[docs]
def GetImagePath(self):
"""Retrieves the image path.
Returns:
str: image path or None if not available.
"""
return getattr(self._file_footer, 'image_path', None)
[docs]
def ReadFileObject(self, file_object):
"""Reads an uuidtext file-like object.
Args:
file_object (file): file-like object.
Raises:
ParseError: if the file cannot be read.
"""
file_header = self._ReadFileHeader(file_object)
self._entry_descriptors = []
file_offset = file_object.tell()
for entry_descriptor in file_header.entry_descriptors:
self._entry_descriptors.append((file_offset, entry_descriptor))
file_offset += entry_descriptor.data_size
self._file_footer = self._ReadFileFooter(file_object, file_offset)
[docs]
class UnifiedLoggingParser(interface.FileEntryParser):
"""Parses Apple Unified Logging (AUL) tracev3 files."""
NAME = 'unified_logging'
DATA_FORMAT = 'Apple Unified Logging (AUL) 64-bit tracev3 file'
[docs]
def ParseFileEntry(self, parser_mediator, file_entry):
"""Parses an Apple Unified Logging (AUL) tracev3 file entry:
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
file_entry (dfvfs.FileEntry): a file entry to parse.
Raises:
WrongParser: when the file cannot be parsed.
"""
file_system = file_entry.GetFileSystem()
# TODO: extract timesync events
tracev3_file = TraceV3File(file_system=file_system)
try:
tracev3_file.Open(file_entry)
except errors.ParseError as exception:
raise errors.WrongParser(
'Unable to open tracev3 file with error: {0!s}'.format(exception))
try:
for log_entry in tracev3_file.ReadLogEntries():
activity_identifier = log_entry.activity_identifier or 0
event_data = UnifiedLoggingEventData()
event_data.activity_identifier = (
activity_identifier & tracev3_file.ACTIVITY_IDENTIFIER_BITMASK)
event_data.boot_identifier = str(log_entry.boot_identifier).upper()
event_data.category = log_entry.category
event_data.event_message = log_entry.event_message
event_data.event_type = log_entry.event_type
event_data.message_type = log_entry.message_type
event_data.process_identifier = log_entry.process_identifier
event_data.process_image_identifier = str(
log_entry.process_image_identifier).upper()
event_data.process_image_path = log_entry.process_image_path
event_data.recorded_time = dfdatetime_posix_time.PosixTimeInNanoseconds(
timestamp=log_entry.timestamp)
event_data.signpost_identifier = log_entry.signpost_identifier
event_data.signpost_name = log_entry.signpost_name
event_data.sender_image_identifier = str(
log_entry.sender_image_identifier).upper()
event_data.sender_image_path = log_entry.sender_image_path
event_data.subsystem = log_entry.sub_system
event_data.thread_identifier = log_entry.thread_identifier
event_data.ttl = log_entry.ttl
parser_mediator.ProduceEventData(event_data)
finally:
tracev3_file.Close()
manager.ParsersManager.RegisterParser(UnifiedLoggingParser)