# Source code for plaso.parsers.text_plugins.atlassian_jira

"""Text parser plugin for Atlassian Jira log files.

This is for the atlassian-jira.log file, one of multiple log files
produced by a Jira DC/Server installation.
"""

import pyparsing

from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import text_parser
from plaso.parsers.text_plugins import interface


class AtlassianJiraEventData(events.EventData):
  """Event data for a single line of the atlassian-jira.log file.

  Attributes:
    body (str): the freeform body of the log line.
    level (str): the logging level of the event.
    logger_class (str): the Jira class responsible for logging.
    logger_method (str): name of the method within the class.
    thread (str): the Jira thread from which the log event originated.
    written_time (dfdatetime.DateTimeValues): entry written date and time.
  """

  DATA_TYPE = 'atlassian:jira:line'

  def __init__(self):
    """Initializes event data with every attribute unset."""
    super(AtlassianJiraEventData, self).__init__(data_type=self.DATA_TYPE)
    # All values start out as None and are filled in by the parser plugin.
    self.body = None
    self.level = None
    self.logger_class = None
    self.logger_method = None
    self.thread = None
    self.written_time = None


class AtlassianJiraTextPlugin(interface.TextPlugin):
  """Text parser plugin for Atlassian Jira log files.

  A log line is expected to have the form:
    <timestamp> <level> [<thread>] [<logger_class>] <method> <message>
  """

  NAME = 'atlassian_jira'
  DATA_FORMAT = 'Atlassian Jira log file'

  ENCODING = 'utf-8'

  # Fixed-width numeric fields, converted to int by the parse action.
  _TWO_DIGITS = pyparsing.Word(pyparsing.nums, exact=2).set_parse_action(
      lambda tokens: int(tokens[0], 10))

  _THREE_DIGITS = pyparsing.Word(pyparsing.nums, exact=3).set_parse_action(
      lambda tokens: int(tokens[0], 10))

  _FOUR_DIGITS = pyparsing.Word(pyparsing.nums, exact=4).set_parse_action(
      lambda tokens: int(tokens[0], 10))

  # Jira log levels
  _JIRA_LEVELS = [
      'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL']

  # Date and time format: 2022-07-12 01:08:59,489
  # Format: YYYY-MM-DD HH:MM:SS,mmm (comma-separated milliseconds)
  _DATE_TIME = (
      _FOUR_DIGITS + pyparsing.Suppress('-') +
      _TWO_DIGITS + pyparsing.Suppress('-') +
      _TWO_DIGITS +
      _TWO_DIGITS + pyparsing.Suppress(':') +
      _TWO_DIGITS + pyparsing.Suppress(':') +
      _TWO_DIGITS + pyparsing.Suppress(',') +
      _THREE_DIGITS).set_results_name('date_time')

  # Log level (DEBUG, INFO, WARN, ERROR, FATAL)
  # Uses the PEP 8 name one_of, in line with set_parse_action and
  # set_results_name used by the rest of this grammar.
  _LOG_LEVEL = pyparsing.one_of(_JIRA_LEVELS).set_results_name('level')

  # Thread name enclosed in brackets: [http-nio-8080-exec-1]
  # or [Caesium-1-4] or [scheduler_Worker-1]
  # Allows alphanumerics, hyphens, underscores, dots, colons, and spaces
  _JIRA_THREAD = (
      pyparsing.Suppress('[') +
      pyparsing.Word(
          pyparsing.alphanums + '-_.:' + ' ').set_results_name('thread') +
      pyparsing.Suppress(']'))

  # Logger class name enclosed in brackets
  # [com.atlassian.jira.startup.JiraStartupLogger]
  _JIRA_LOGGER = (
      pyparsing.Suppress('[') +
      pyparsing.SkipTo(']').set_results_name('logger_class') +
      pyparsing.Suppress(']'))

  # Logger method name: start, doFilter, <init>, lambda$execute$0
  _JIRA_LOGGER_METHOD = (
      pyparsing.Word(
          pyparsing.alphanums + '_$<>').set_results_name('logger_method'))

  # Log message body - rest of line
  _JIRA_LOG_MESSAGE = pyparsing.SkipTo(
      pyparsing.LineEnd()).set_results_name('body')

  # Complete log line structure
  # Format: <timestamp> <level> [<thread>] [<logger_class>] <method>
  # <message>
  _JIRA_LOG_LINE = (
      _DATE_TIME + _LOG_LEVEL + _JIRA_THREAD + _JIRA_LOGGER +
      _JIRA_LOGGER_METHOD + _JIRA_LOG_MESSAGE)

  _LINE_STRUCTURES = [('log_entry', _JIRA_LOG_LINE)]

  VERIFICATION_GRAMMAR = _JIRA_LOG_LINE

  def _ParseRecord(self, parser_mediator, key, structure):
    """Parses a pyparsing structure.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      key (str): name of the parsed structure.
      structure (pyparsing.ParseResults): tokens from a parsed log line.

    Raises:
      ParseError: if the structure cannot be parsed.
    """
    if key != 'log_entry':
      raise errors.ParseError(
          f'Unable to parse record, unknown structure: {key:s}')

    time_elements_structure = self._GetValueFromStructure(
        structure, 'date_time')

    event_data = AtlassianJiraEventData()
    # A body that is empty or whitespace only is stored as None.
    event_data.body = self._GetValueFromStructure(
        structure, 'body', default_value='').strip() or None
    event_data.level = self._GetValueFromStructure(structure, 'level')
    event_data.logger_class = self._GetValueFromStructure(
        structure, 'logger_class')
    event_data.logger_method = self._GetValueFromStructure(
        structure, 'logger_method')
    event_data.thread = self._GetValueFromStructure(structure, 'thread')
    event_data.written_time = self._ParseTimeElements(time_elements_structure)

    parser_mediator.ProduceEventData(event_data)

  def _ParseTimeElements(self, time_elements_structure):
    """Parses date and time elements of a log line.

    Args:
      time_elements_structure (pyparsing.ParseResults): date and time elements
          of a log line.

    Returns:
      dfdatetime.TimeElementsInMilliseconds: date and time value.

    Raises:
      ParseError: if a valid date and time value cannot be derived from
          the time elements.
    """
    try:
      date_time = dfdatetime_time_elements.TimeElementsInMilliseconds(
          time_elements_tuple=time_elements_structure)
      # The timestamp grammar has no time zone component, hence the value is
      # flagged as local time.
      date_time.is_local_time = True

      return date_time

    except (IndexError, TypeError, ValueError) as exception:
      # Chain the original exception so the root cause is preserved.
      raise errors.ParseError(
          f'Unable to parse time elements with error: {exception!s}'
      ) from exception

  def CheckRequiredFormat(self, parser_mediator, text_reader):
    """Check if the log record has the minimal structure required.

    The first lines must match the verification grammar and the matched
    timestamp must convert to a valid date and time value.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      text_reader (EncodedTextReader): text reader.

    Returns:
      bool: True if this is the correct plugin, False otherwise.
    """
    try:
      structure = self._VerifyString(text_reader.lines)
    except errors.ParseError:
      return False

    time_elements_structure = self._GetValueFromStructure(
        structure, 'date_time')

    try:
      self._ParseTimeElements(time_elements_structure)
    except errors.ParseError:
      return False

    return True


# Register the Atlassian Jira plugin class with the text log parser.
text_parser.TextLogParser.RegisterPlugin(AtlassianJiraTextPlugin)