Source code for plaso.parsers.text_plugins.cri

"""Text file parser plugin for Container Runtime Interface (CRI) log format.

This is a text-based log format used in kubernetes/GKE.

Also see:
  https://github.com/kubernetes/design-proposals-archive/blob/main/node/kubelet-cri-logging.md
"""

import pyparsing

from dfdatetime import time_elements

from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import text_parser
from plaso.parsers.text_plugins import interface


[docs] class CRIEventData(events.EventData): """CRI log event data. Attributes: body (str): the log message body. event_datetime (time_elements.TimeElementsInNanoseconds): the datetime of the log message. stream (str): the log stream. Currently only 'stdout' and 'stderr' are supported. tag (str): the log tag. Currently only 'P' (partial) and 'F' (full) are supported. """ DATA_TYPE = 'cri:container:log:entry'
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.body = None self.event_datetime = None self.stream = None self.tag = None
[docs] class CRITextPlugin(interface.TextPlugin): """Text file parser plugin for CRI log files.""" NAME = 'cri_log' DATA_FORMAT = 'Container Runtime Interface log file' ENCODING = 'utf-8' # Date and time values are formatted as: 2016-10-06T00:17:09.669794202Z _DATE_AND_TIME = ( pyparsing.Regex(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{1,9}Z') ).setResultsName('date_time') _STREAM = ( pyparsing.Literal('stderr') ^ pyparsing.Literal('stdout') ).setResultsName('stream') # P indicates a partial log, # F indicates a complete or the end of a multiline log. _TAG = pyparsing.oneOf(['P', 'F']).setResultsName('tag') _LOG = ( pyparsing.restOfLine() + pyparsing.Suppress(pyparsing.LineEnd()) ).setResultsName('body') _LOG_LINE = _DATE_AND_TIME + _STREAM + _TAG + _LOG _LINE_STRUCTURES = [('log_line', _LOG_LINE)] VERIFICATION_GRAMMAR = _LOG_LINE def _ParseRecord(self, parser_mediator, key, structure): """Parses a pyparsing structure. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. key (str): name of the parsed structure. structure (pyparsing.ParseResults): tokens from a parsed log line. Raises: ParseError: if the structure cannot be parsed. """ if key == 'log_line': date_time = time_elements.TimeElementsInNanoseconds() date_time.CopyFromStringISO8601(self._GetValueFromStructure( structure, 'date_time')) event_data = CRIEventData() event_data.event_datetime = date_time event_data.body = self._GetValueFromStructure( structure, 'body')[0] event_data.stream = self._GetValueFromStructure(structure, 'stream') event_data.tag = self._GetValueFromStructure(structure, 'tag') parser_mediator.ProduceEventData(event_data)
[docs] def CheckRequiredFormat(self, parser_mediator, text_reader): """Check if the log record has the minimal structure required by the parser. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. Returns: bool: True if this is the correct parser, False otherwise. """ try: self._VerifyString(text_reader.lines) except errors.ParseError: return False return True
text_parser.TextLogParser.RegisterPlugin(CRITextPlugin)