# -*- coding: utf-8 -*-
"""JSON-L parser plugin for Docker layer configuration files."""
from dfdatetime import semantic_time as dfdatetime_semantic_time
from dfdatetime import time_elements as dfdatetime_time_elements
from plaso.containers import events
from plaso.containers import time_events
from plaso.lib import definitions
from plaso.parsers import jsonl_parser
from plaso.parsers.jsonl_plugins import interface
[docs]class DockerLayerConfigurationEventData(events.EventData):
"""Docker layer configuration event data.
Attributes:
command: the command used which made Docker create a new layer.
layer_identifier: the identifier of the current Docker layer (SHA-1).
"""
DATA_TYPE = 'docker:layer:configuration'
def __init__(self):
"""Initializes event data."""
super(DockerLayerConfigurationEventData, self).__init__(
data_type=self.DATA_TYPE)
self.command = None
self.layer_identifier = None
[docs]class DockerLayerConfigurationJSONLPlugin(interface.JSONLPlugin):
"""JSON-L parser plugin for Docker layer configuration files.
This parser handles per Docker layer configuration files stored in:
DOCKER_DIR/graph/<layer_identifier>/json
"""
NAME = 'docker_layer_config'
DATA_FORMAT = 'Docker layer configuration file'
def _GetLayerIdentifierFromPath(self, parser_mediator):
"""Extracts a layer (or graph) identifier from a path.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
Returns:
str: layer identifier.
"""
file_entry = parser_mediator.GetFileEntry()
file_system = file_entry.GetFileSystem()
path_segments = file_system.SplitPath(file_entry.path_spec.location)
# TODO: validate format of layer identifier.
return path_segments[-2]
def _ParseRecord(self, parser_mediator, json_dict):
"""Parses a Docker layer configuration record.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
json_dict (dict): JSON dictionary of the configuration record.
"""
created = self._GetJSONValue(json_dict, 'created')
if created:
container_configuration = self._GetJSONValue(
json_dict, 'container_config', default_value={})
commands = self._GetJSONValue(
container_configuration, 'Cmd', default_value=[])
event_data = DockerLayerConfigurationEventData()
event_data.command = ' '.join([
command.replace('\t', '').strip() for command in commands])
event_data.layer_identifier = self._GetLayerIdentifierFromPath(
parser_mediator)
try:
date_time = dfdatetime_time_elements.TimeElementsInMicroseconds()
date_time.CopyFromStringISO8601(created)
except ValueError as exception:
parser_mediator.ProduceExtractionWarning((
'Unable to parse created time string: {0:s} with error: '
'{1!s}').format(created, exception))
date_time = dfdatetime_semantic_time.InvalidTime()
event = time_events.DateTimeValuesEvent(
date_time, definitions.TIME_DESCRIPTION_ADDED)
parser_mediator.ProduceEventWithEventData(event, event_data)
jsonl_parser.JSONLParser.RegisterPlugin(DockerLayerConfigurationJSONLPlugin)