Source code for plaso.parsers.text_plugins.santa

# -*- coding: utf-8 -*-
"""Text file parser plugin for Santa log files."""

import pyparsing

from dfdatetime import time_elements as dfdatetime_time_elements
from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import text_parser
from plaso.parsers.text_plugins import interface


[docs] class SantaMountEventData(events.EventData): """Santa mount event data. Attributes: action (str): event type recorded by Santa. appearance_time (dfdatetime.DateTimeValues): date and time the disk appeared. bsd_name (str): disk BSD name. bus (str): device protocol. dmg_path (str): DMG file path. fs (str): disk volume kind. last_written_time (dfdatetime.DateTimeValues): entry last written date and time. model (str): disk model. mount (str): disk mount point. serial (str): disk serial. volume (str): disk volume name. """ DATA_TYPE = 'santa:diskmount'
[docs] def __init__(self): """Initializes event data.""" super(SantaMountEventData, self).__init__(data_type=self.DATA_TYPE) self.action = None self.appearance_time = None self.bsd_name = None self.bus = None self.dmg_path = None self.fs = None self.last_written_time = None self.model = None self.mount = None self.serial = None self.volume = None
[docs] class SantaExecutionEventData(events.EventData): """Santa execution event data. Attributes: action (str): action recorded by Santa. certificate_common_name (str): certificate common name. certificate_hash (str): SHA256 hash for the certificate associated with the executed process. decision (str): if the process was allowed or blocked. gid (str): group identifier associated with the executed process. group (str): group name associated with the executed process. last_run_time (dfdatetime.DateTimeValues): executable (binary) last run date and time. long_reason (str): further explanation behind Santa decision to execute or block a process. mode (str): Santa execution mode, for example Monitor or Lockdown. pid (str): process identifier for the process. pid_version (str): the process identifier version extracted from the Mach audit token. The version can sed to identify process identifier rollovers. ppid (str): parent process identifier for the executed process. process_arguments (str): executed process with its arguments. process_hash (str): SHA256 hash for the executed process. process_path (str): process file path. reason (str): reason behind Santa decision to execute or block a process. uid (str): user identifier associated with the executed process. user (str): user name associated with the executed process. """ DATA_TYPE = 'santa:execution'
[docs] def __init__(self): """Initializes event data.""" super(SantaExecutionEventData, self).__init__(data_type=self.DATA_TYPE) self.action = None self.certificate_common_name = None self.certificate_hash = None self.decision = None self.gid = None self.group = None self.last_run_time = None self.long_reason = None self.mode = None self.pid = None self.pid_version = None self.ppid = None self.process_arguments = None self.process_hash = None self.process_path = None self.quarantine_url = None self.reason = None self.uid = None self.user = None
[docs] class SantaFileSystemEventData(events.EventData): """Santa file system event data. Attributes: action (str): event type recorded by Santa. file_new_path (str): new file path and name for RENAME events. file_path (str): file path and name for WRITE/DELETE events. gid (str): group identifier associated with the executed process. group (str): group name associated with the executed process. last_written_time (dfdatetime.DateTimeValues): entry last written date and time. pid (str): process identifier for the process. pid_version (str): the process identifier version extracted from the Mach audit token. The version can be used to identify process identifier rollovers. ppid (str): parent process identifier for the executed process. process_path (str): process file path. process (str): process name. uid (str): user identifier associated with the executed process. user (str): user name associated with the executed process. """ DATA_TYPE = 'santa:file_system_event'
[docs] def __init__(self): """Initializes event data.""" super(SantaFileSystemEventData, self).__init__(data_type=self.DATA_TYPE) self.action = None self.file_new_path = None self.file_path = None self.gid = None self.group = None self.last_written_time = None self.pid = None self.pid_version = None self.ppid = None self.process = None self.process_path = None self.uid = None self.user = None
[docs] class SantaProcessExitEventData(events.EventData): """Santa process exit event data. Attributes: action (str): action recorded by Santa. exit_time (dfdatetime.DateTimeValues): process exit date and time. gid (str): group identifier associated with the executed process. pid (str): process identifier for the process. pid_version (str): the process identifier version extracted from the Mach audit token. The version can be used to identify process identifier rollovers. ppid (str): parent process identifier for the executed process. uid (str): user identifier associated with the executed process. """ DATA_TYPE = 'santa:process_exit'
[docs] def __init__(self): """Initializes event data.""" super(SantaProcessExitEventData, self).__init__(data_type=self.DATA_TYPE) self.action = None self.exit_time = None self.gid = None self.pid = None self.pid_version = None self.ppid = None self.uid = None
[docs] class SantaTextPlugin(interface.TextPlugin): """Text file parser plugin for Santa log files.""" NAME = 'santa' DATA_FORMAT = 'Santa log (santa.log) file' ENCODING = 'utf-8' _TWO_DIGITS = pyparsing.Word(pyparsing.nums, exact=2).set_parse_action( lambda tokens: int(tokens[0], 10)) _THREE_DIGITS = pyparsing.Word(pyparsing.nums, exact=3).set_parse_action( lambda tokens: int(tokens[0], 10)) _FOUR_DIGITS = pyparsing.Word(pyparsing.nums, exact=4).set_parse_action( lambda tokens: int(tokens[0], 10)) _SEPARATOR = pyparsing.Suppress('|') # Using CharsNotIn is more efficient than SkipTo('|'). Optional is needed # since CharsNotIn does not support a minimum of 0. _SKIP_TO_SEPARATOR = pyparsing.Optional(pyparsing.CharsNotIn('|\n')) # Date and time values are formatted as: 2018-08-19T03:09:13.120Z _DATE_AND_TIME = ( _FOUR_DIGITS + pyparsing.Suppress('-') + _TWO_DIGITS + pyparsing.Suppress('-') + _TWO_DIGITS + pyparsing.Suppress('T') + _TWO_DIGITS + pyparsing.Suppress(':') + _TWO_DIGITS + pyparsing.Suppress(':') + _TWO_DIGITS + pyparsing.Suppress('.') + _THREE_DIGITS + pyparsing.Suppress('Z')).set_results_name('date_time') _DATE_TIME_BLOCK = ( pyparsing.Suppress('[') + _DATE_AND_TIME + pyparsing.Suppress(']')) _END_OF_LINE = pyparsing.Suppress(pyparsing.LineEnd()) _SANTAD_PREAMBLE = pyparsing.Suppress('I santad:') _APPEARANCE = (pyparsing.Suppress('|appearance=') + _DATE_AND_TIME.set_results_name('appearance')) _ARGS = (pyparsing.Suppress('|args=') + _SKIP_TO_SEPARATOR.set_results_name('args')) _BSD_NAME = (pyparsing.Suppress('|bsdname=') + _SKIP_TO_SEPARATOR.set_results_name('bsd_name')) _BUS = pyparsing.Suppress('|bus=') + _SKIP_TO_SEPARATOR.set_results_name( 'bus') _CERT_CN = (pyparsing.Suppress('|cert_cn=') + _SKIP_TO_SEPARATOR.set_results_name('cert_cn')) _CERT_SHA256 = (pyparsing.Suppress('|cert_sha256=') + _SKIP_TO_SEPARATOR.set_results_name('cert_sha256')) _DECISION = (pyparsing.Suppress('|decision=') + _SKIP_TO_SEPARATOR.set_results_name('decision')) _DMG_PATH = (pyparsing.Suppress('|dmgpath=') + _SKIP_TO_SEPARATOR.set_results_name('dmg_path')) _EXPLAIN = (pyparsing.Suppress('|explain=') + _SKIP_TO_SEPARATOR.set_results_name('explain')) _FILE_SYSTEM = (pyparsing.Suppress('|fs=') + _SKIP_TO_SEPARATOR.set_results_name('fs')) _GID = (pyparsing.Suppress('|gid=') + _SKIP_TO_SEPARATOR.set_results_name('gid')) _GROUP = (pyparsing.Suppress('|group=') + _SKIP_TO_SEPARATOR.set_results_name('group')) _MODE = (pyparsing.Suppress('|mode=') + _SKIP_TO_SEPARATOR.set_results_name('mode')) _MODEL = (pyparsing.Suppress('|model=') + _SKIP_TO_SEPARATOR.set_results_name('model')) _MOUNT = (pyparsing.Suppress('|mount=') + _SKIP_TO_SEPARATOR.set_results_name('mount')) _NEW_PATH = (pyparsing.Suppress('|newpath=') + _SKIP_TO_SEPARATOR.set_results_name('newpath')) _QUARANTINE_URL = (pyparsing.Suppress('|quarantine_url=') + _SKIP_TO_SEPARATOR.set_results_name('quarantine_url')) _PATH = (pyparsing.Suppress('|path=') + _SKIP_TO_SEPARATOR.set_results_name('path')) _PID = (pyparsing.Suppress('|pid=') + _SKIP_TO_SEPARATOR.set_results_name('pid')) _PID_VERSION = (pyparsing.Suppress('|pidversion=') + _SKIP_TO_SEPARATOR.set_results_name('pidversion')) _PPID = (pyparsing.Suppress('|ppid=') + _SKIP_TO_SEPARATOR.set_results_name('ppid')) _PROCESS = (pyparsing.Suppress('|process=') + _SKIP_TO_SEPARATOR.set_results_name('process')) _PROCESS_PATH = (pyparsing.Suppress('|processpath=') + _SKIP_TO_SEPARATOR.set_results_name('processpath')) _REASON = (pyparsing.Suppress('|reason=') + _SKIP_TO_SEPARATOR.set_results_name('reason')) _SERIAL = (pyparsing.Suppress('|serial=') + _SKIP_TO_SEPARATOR.set_results_name('serial')) _SHA256 = (pyparsing.Suppress('|sha256=') + _SKIP_TO_SEPARATOR.set_results_name('sha256')) _UID = (pyparsing.Suppress('|uid=') + _SKIP_TO_SEPARATOR.set_results_name('uid')) _USER = (pyparsing.Suppress('|user=') + _SKIP_TO_SEPARATOR.set_results_name('user')) _VOLUME = (pyparsing.Suppress('|volume=') + _SKIP_TO_SEPARATOR.set_results_name('volume')) _DISK_MOUNT_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress('action=') + pyparsing.Literal('DISKAPPEAR').set_results_name('action') + _MOUNT + _VOLUME + _BSD_NAME + _FILE_SYSTEM + _MODEL + _SERIAL + _BUS + _DMG_PATH + _APPEARANCE + _END_OF_LINE) _DISK_UNMOUNT_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress('action=') + pyparsing.Literal('DISKDISAPPEAR').set_results_name('action') + _MOUNT + _VOLUME + _BSD_NAME + _END_OF_LINE) _EXECUTION_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress('action=') + pyparsing.Literal('EXEC').set_results_name('action') + _DECISION + _REASON + pyparsing.Optional(_EXPLAIN) + _SHA256 + pyparsing.Optional(_CERT_SHA256) + pyparsing.Optional(_CERT_CN) + pyparsing.Optional(_QUARANTINE_URL) + _PID + pyparsing.Optional(_PID_VERSION) + _PPID + _UID + _USER + _GID + _GROUP + _MODE + _PATH + pyparsing.Optional(_ARGS) + _END_OF_LINE) _FILE_OPERATION_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress('action=') + ( pyparsing.Literal('DELETE') ^ pyparsing.Literal('LINK') ^ pyparsing.Literal('RENAME') ^ pyparsing.Literal('WRITE')).set_results_name('action') + _PATH + pyparsing.Optional(_NEW_PATH) + _PID + pyparsing.Optional(_PID_VERSION) + _PPID + _PROCESS + _PROCESS_PATH + _UID + _USER + _GID + _GROUP + _END_OF_LINE) _PROCESS_EXIT_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress('action=') + pyparsing.Literal('EXIT').set_results_name('action') + _PID + _PID_VERSION + _PPID + _UID + _GID + _END_OF_LINE) _QUOTA_EXCEEDED_LINE = ( _DATE_TIME_BLOCK + pyparsing.Literal(( '*** LOG MESSAGE QUOTA EXCEEDED - SOME MESSAGES FROM THIS PROCESS ' 'HAVE BEEN DISCARDED ***')) + _END_OF_LINE) _LINE_STRUCTURES = [ ('execution_line', _EXECUTION_LINE), ('file_system_event_line', _FILE_OPERATION_LINE), ('mount_line', _DISK_MOUNT_LINE), ('process_exit_line', _PROCESS_EXIT_LINE), ('quota_exceeded_line', _QUOTA_EXCEEDED_LINE), ('unmount_line', _DISK_UNMOUNT_LINE)] VERIFICATION_GRAMMAR = ( _DISK_MOUNT_LINE ^ _DISK_UNMOUNT_LINE ^ _EXECUTION_LINE ^ _FILE_OPERATION_LINE ^ _PROCESS_EXIT_LINE) def _ParseRecord(self, parser_mediator, key, structure): """Parses a pyparsing structure. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. key (str): name of the parsed structure. structure (pyparsing.ParseResults): tokens from a parsed log line. Raises: ParseError: if the structure cannot be parsed. """ if key == 'quota_exceeded_line': # skip this line return time_elements_structure = self._GetValueFromStructure( structure, 'date_time') date_time = self._ParseTimeElements(time_elements_structure) if key == 'execution_line': event_data = SantaExecutionEventData() event_data.action = self._GetValueFromStructure(structure, 'action') event_data.certificate_common_name = self._GetValueFromStructure( structure, 'cert_cn') event_data.certificate_hash = self._GetValueFromStructure( structure, 'cert_sha256') event_data.decision = self._GetValueFromStructure(structure, 'decision') event_data.gid = self._GetValueFromStructure(structure, 'gid') event_data.group = self._GetValueFromStructure(structure, 'group') event_data.last_run_time = date_time event_data.long_reason = self._GetValueFromStructure(structure, 'explain') event_data.mode = self._GetValueFromStructure(structure, 'mode') event_data.quarantine_url = self._GetValueFromStructure( structure, 'quarantine_url') event_data.pid = self._GetValueFromStructure(structure, 'pid') event_data.pid_version = self._GetValueFromStructure( structure, 'pidversion') event_data.ppid = self._GetValueFromStructure(structure, 'ppid') event_data.process_arguments = self._GetValueFromStructure( structure, 'args') event_data.process_hash = self._GetValueFromStructure(structure, 'sha256') event_data.process_path = self._GetValueFromStructure(structure, 'path') event_data.reason = self._GetValueFromStructure(structure, 'reason') event_data.uid = self._GetValueFromStructure(structure, 'uid') event_data.user = self._GetValueFromStructure(structure, 'user') if key == 'process_exit_line': event_data = SantaProcessExitEventData() event_data.action = self._GetValueFromStructure(structure, 'action') event_data.exit_time = date_time event_data.pid = self._GetValueFromStructure(structure, 'pid') event_data.pid_version = self._GetValueFromStructure( structure, 'pidversion') event_data.ppid = self._GetValueFromStructure(structure, 'ppid') event_data.uid = self._GetValueFromStructure(structure, 'uid') event_data.gid = self._GetValueFromStructure(structure, 'gid') elif key == 'file_system_event_line': event_data = SantaFileSystemEventData() event_data.action = self._GetValueFromStructure(structure, 'action') event_data.file_new_path = self._GetValueFromStructure( structure, 'newpath') event_data.file_path = self._GetValueFromStructure(structure, 'path') event_data.gid = self._GetValueFromStructure(structure, 'gid') event_data.group = self._GetValueFromStructure(structure, 'group') event_data.last_written_time = date_time event_data.pid = self._GetValueFromStructure(structure, 'pid') event_data.pid_version = self._GetValueFromStructure( structure, 'pidversion') event_data.ppid = self._GetValueFromStructure(structure, 'ppid') event_data.process = self._GetValueFromStructure(structure, 'process') event_data.process_path = self._GetValueFromStructure( structure, 'processpath') event_data.uid = self._GetValueFromStructure(structure, 'uid') event_data.user = self._GetValueFromStructure(structure, 'user') elif key == 'unmount_line': event_data = SantaMountEventData() event_data.action = self._GetValueFromStructure(structure, 'action') event_data.bsd_name = self._GetValueFromStructure(structure, 'bsd_name') event_data.last_written_time = date_time event_data.mount = self._GetValueFromStructure(structure, 'mount') or None event_data.volume = self._GetValueFromStructure(structure, 'volume') elif key == 'mount_line': event_data = SantaMountEventData() event_data.action = self._GetValueFromStructure(structure, 'action') event_data.bsd_name = self._GetValueFromStructure(structure, 'bsd_name') event_data.bus = self._GetValueFromStructure(structure, 'bus') event_data.dmg_path = self._GetValueFromStructure(structure, 'dmg_path') event_data.fs = self._GetValueFromStructure(structure, 'fs') event_data.last_written_time = date_time event_data.model = self._GetValueFromStructure(structure, 'model') event_data.mount = self._GetValueFromStructure(structure, 'mount') or None event_data.serial = self._GetValueFromStructure( structure, 'serial') or None event_data.volume = self._GetValueFromStructure(structure, 'volume') appearance = self._GetValueFromStructure(structure, 'appearance') if appearance: # pylint: disable=attribute-defined-outside-init event_data.appearance_time = self._ParseTimeElements(appearance) parser_mediator.ProduceEventData(event_data) def _ParseTimeElements(self, time_elements_structure): """Parses date and time elements of a log line. Args: time_elements_structure (pyparsing.ParseResults): date and time elements of a log line. Returns: dfdatetime.TimeElements: date and time value. Raises: ParseError: if a valid date and time value cannot be derived from the time elements. """ try: (year, month, day_of_month, hours, minutes, seconds, milliseconds) = ( time_elements_structure) # Ensure time_elements_tuple is not a pyparsing.ParseResults otherwise # copy.deepcopy() of the dfDateTime object will fail on Python 3.8 with: # "TypeError: 'str' object is not callable" due to pyparsing.ParseResults # overriding __getattr__ with a function that returns an empty string # when named token does not exist. time_elements_tuple = ( year, month, day_of_month, hours, minutes, seconds, milliseconds) return dfdatetime_time_elements.TimeElementsInMilliseconds( time_elements_tuple=time_elements_tuple) except (TypeError, ValueError) as exception: raise errors.ParseError( 'Unable to parse time elements with error: {0!s}'.format(exception))
[docs] def CheckRequiredFormat(self, parser_mediator, text_reader): """Check if the log record has the minimal structure required by the plugin. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. Returns: bool: True if this is the correct plugin, False otherwise. """ try: structure = self._VerifyString(text_reader.lines) except errors.ParseError: return False time_elements_structure = self._GetValueFromStructure( structure, 'date_time') try: self._ParseTimeElements(time_elements_structure) except errors.ParseError: return False return True
text_parser.TextLogParser.RegisterPlugin(SantaTextPlugin)