Source code for plaso.parsers.text_plugins.santa

"""Text file parser plugin for Santa log files."""

import pyparsing

from dfdatetime import time_elements as dfdatetime_time_elements
from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import text_parser
from plaso.parsers.text_plugins import interface


[docs] class SantaMountEventData(events.EventData): """Santa mount event data. Attributes: action (str): event type recorded by Santa. appearance_time (dfdatetime.DateTimeValues): date and time the disk appeared. bsd_name (str): disk BSD name. bus (str): device protocol. dmg_path (str): DMG file path. fs (str): disk volume kind. last_written_time (dfdatetime.DateTimeValues): entry last written date and time. model (str): disk model. mount (str): disk mount point. serial (str): disk serial. volume (str): disk volume name. """ DATA_TYPE = "santa:diskmount"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.action = None self.appearance_time = None self.bsd_name = None self.bus = None self.dmg_path = None self.fs = None self.last_written_time = None self.model = None self.mount = None self.serial = None self.volume = None
[docs] class SantaExecutionEventData(events.EventData): """Santa execution event data. Attributes: action (str): action recorded by Santa. certificate_common_name (str): certificate common name. certificate_hash (str): SHA256 hash for the certificate associated with the executed process. decision (str): if the process was allowed or blocked. gid (str): group identifier associated with the executed process. group (str): group name associated with the executed process. last_run_time (dfdatetime.DateTimeValues): executable (binary) last run date and time. long_reason (str): further explanation behind Santa decision to execute or block a process. mode (str): Santa execution mode, for example Monitor or Lockdown. pid (str): process identifier for the process. pid_version (str): the process identifier version extracted from the Mach audit token. The version can sed to identify process identifier rollovers. ppid (str): parent process identifier for the executed process. process_arguments (str): executed process with its arguments. process_hash (str): SHA256 hash for the executed process. process_path (str): process file path. reason (str): reason behind Santa decision to execute or block a process. uid (str): user identifier associated with the executed process. user (str): user name associated with the executed process. """ DATA_TYPE = "santa:execution"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.action = None self.certificate_common_name = None self.certificate_hash = None self.decision = None self.gid = None self.group = None self.last_run_time = None self.long_reason = None self.mode = None self.pid = None self.pid_version = None self.ppid = None self.process_arguments = None self.process_hash = None self.process_path = None self.quarantine_url = None self.reason = None self.uid = None self.user = None
[docs] class SantaFileSystemEventData(events.EventData): """Santa file system event data. Attributes: action (str): event type recorded by Santa. file_new_path (str): new file path and name for RENAME events. file_path (str): file path and name for WRITE/DELETE events. gid (str): group identifier associated with the executed process. group (str): group name associated with the executed process. last_written_time (dfdatetime.DateTimeValues): entry last written date and time. pid (str): process identifier for the process. pid_version (str): the process identifier version extracted from the Mach audit token. The version can be used to identify process identifier rollovers. ppid (str): parent process identifier for the executed process. process_path (str): process file path. process (str): process name. uid (str): user identifier associated with the executed process. user (str): user name associated with the executed process. """ DATA_TYPE = "santa:file_system_event"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.action = None self.file_new_path = None self.file_path = None self.gid = None self.group = None self.last_written_time = None self.pid = None self.pid_version = None self.ppid = None self.process = None self.process_path = None self.uid = None self.user = None
[docs] class SantaProcessExitEventData(events.EventData): """Santa process exit event data. Attributes: action (str): action recorded by Santa. exit_time (dfdatetime.DateTimeValues): process exit date and time. gid (str): group identifier associated with the executed process. pid (str): process identifier for the process. pid_version (str): the process identifier version extracted from the Mach audit token. The version can be used to identify process identifier rollovers. ppid (str): parent process identifier for the executed process. uid (str): user identifier associated with the executed process. """ DATA_TYPE = "santa:process_exit"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.action = None self.exit_time = None self.gid = None self.pid = None self.pid_version = None self.ppid = None self.uid = None
[docs] class SantaTextPlugin(interface.TextPlugin): """Text file parser plugin for Santa log files.""" NAME = "santa" DATA_FORMAT = "Santa log (santa.log) file" ENCODING = "utf-8" _TWO_DIGITS = pyparsing.Word(pyparsing.nums, exact=2).set_parse_action( lambda tokens: int(tokens[0], 10) ) _THREE_DIGITS = pyparsing.Word(pyparsing.nums, exact=3).set_parse_action( lambda tokens: int(tokens[0], 10) ) _FOUR_DIGITS = pyparsing.Word(pyparsing.nums, exact=4).set_parse_action( lambda tokens: int(tokens[0], 10) ) _SEPARATOR = pyparsing.Suppress("|") # Using CharsNotIn is more efficient than SkipTo('|'). Optional is needed # since CharsNotIn does not support a minimum of 0. _SKIP_TO_SEPARATOR = pyparsing.Optional(pyparsing.CharsNotIn("|\n")) # Date and time values are formatted as: 2018-08-19T03:09:13.120Z _DATE_AND_TIME = ( _FOUR_DIGITS + pyparsing.Suppress("-") + _TWO_DIGITS + pyparsing.Suppress("-") + _TWO_DIGITS + pyparsing.Suppress("T") + _TWO_DIGITS + pyparsing.Suppress(":") + _TWO_DIGITS + pyparsing.Suppress(":") + _TWO_DIGITS + pyparsing.Suppress(".") + _THREE_DIGITS + pyparsing.Suppress("Z") ).set_results_name("date_time") _DATE_TIME_BLOCK = ( pyparsing.Suppress("[") + _DATE_AND_TIME + pyparsing.Suppress("]") ) _END_OF_LINE = pyparsing.Suppress(pyparsing.LineEnd()) _SANTAD_PREAMBLE = pyparsing.Suppress("I santad:") _APPEARANCE = pyparsing.Suppress("|appearance=") + _DATE_AND_TIME.set_results_name( "appearance" ) _ARGS = pyparsing.Suppress("|args=") + _SKIP_TO_SEPARATOR.set_results_name("args") _BSD_NAME = pyparsing.Suppress("|bsdname=") + _SKIP_TO_SEPARATOR.set_results_name( "bsd_name" ) _BUS = pyparsing.Suppress("|bus=") + _SKIP_TO_SEPARATOR.set_results_name("bus") _CERT_CN = pyparsing.Suppress("|cert_cn=") + _SKIP_TO_SEPARATOR.set_results_name( "cert_cn" ) _CERT_SHA256 = pyparsing.Suppress( "|cert_sha256=" ) + _SKIP_TO_SEPARATOR.set_results_name("cert_sha256") _DECISION = pyparsing.Suppress("|decision=") + _SKIP_TO_SEPARATOR.set_results_name( "decision" ) _DMG_PATH = pyparsing.Suppress("|dmgpath=") + _SKIP_TO_SEPARATOR.set_results_name( "dmg_path" ) _EXPLAIN = pyparsing.Suppress("|explain=") + _SKIP_TO_SEPARATOR.set_results_name( "explain" ) _FILE_SYSTEM = pyparsing.Suppress("|fs=") + _SKIP_TO_SEPARATOR.set_results_name( "fs" ) _GID = pyparsing.Suppress("|gid=") + _SKIP_TO_SEPARATOR.set_results_name("gid") _GROUP = pyparsing.Suppress("|group=") + _SKIP_TO_SEPARATOR.set_results_name( "group" ) _MODE = pyparsing.Suppress("|mode=") + _SKIP_TO_SEPARATOR.set_results_name("mode") _MODEL = pyparsing.Suppress("|model=") + _SKIP_TO_SEPARATOR.set_results_name( "model" ) _MOUNT = pyparsing.Suppress("|mount=") + _SKIP_TO_SEPARATOR.set_results_name( "mount" ) _NEW_PATH = pyparsing.Suppress("|newpath=") + _SKIP_TO_SEPARATOR.set_results_name( "newpath" ) _QUARANTINE_URL = pyparsing.Suppress( "|quarantine_url=" ) + _SKIP_TO_SEPARATOR.set_results_name("quarantine_url") _PATH = pyparsing.Suppress("|path=") + _SKIP_TO_SEPARATOR.set_results_name("path") _PID = pyparsing.Suppress("|pid=") + _SKIP_TO_SEPARATOR.set_results_name("pid") _PID_VERSION = pyparsing.Suppress( "|pidversion=" ) + _SKIP_TO_SEPARATOR.set_results_name("pidversion") _PPID = pyparsing.Suppress("|ppid=") + _SKIP_TO_SEPARATOR.set_results_name("ppid") _PROCESS = pyparsing.Suppress("|process=") + _SKIP_TO_SEPARATOR.set_results_name( "process" ) _PROCESS_PATH = pyparsing.Suppress( "|processpath=" ) + _SKIP_TO_SEPARATOR.set_results_name("processpath") _REASON = pyparsing.Suppress("|reason=") + _SKIP_TO_SEPARATOR.set_results_name( "reason" ) _SERIAL = pyparsing.Suppress("|serial=") + _SKIP_TO_SEPARATOR.set_results_name( "serial" ) _SHA256 = pyparsing.Suppress("|sha256=") + _SKIP_TO_SEPARATOR.set_results_name( "sha256" ) _UID = pyparsing.Suppress("|uid=") + _SKIP_TO_SEPARATOR.set_results_name("uid") _USER = pyparsing.Suppress("|user=") + _SKIP_TO_SEPARATOR.set_results_name("user") _VOLUME = pyparsing.Suppress("|volume=") + _SKIP_TO_SEPARATOR.set_results_name( "volume" ) _DISK_MOUNT_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress("action=") + pyparsing.Literal("DISKAPPEAR").set_results_name("action") + _MOUNT + _VOLUME + _BSD_NAME + _FILE_SYSTEM + _MODEL + _SERIAL + _BUS + _DMG_PATH + _APPEARANCE + _END_OF_LINE ) _DISK_UNMOUNT_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress("action=") + pyparsing.Literal("DISKDISAPPEAR").set_results_name("action") + _MOUNT + _VOLUME + _BSD_NAME + _END_OF_LINE ) _EXECUTION_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress("action=") + pyparsing.Literal("EXEC").set_results_name("action") + _DECISION + _REASON + pyparsing.Optional(_EXPLAIN) + _SHA256 + pyparsing.Optional(_CERT_SHA256) + pyparsing.Optional(_CERT_CN) + pyparsing.Optional(_QUARANTINE_URL) + _PID + pyparsing.Optional(_PID_VERSION) + _PPID + _UID + _USER + _GID + _GROUP + _MODE + _PATH + pyparsing.Optional(_ARGS) + _END_OF_LINE ) _FILE_OPERATION_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress("action=") + ( pyparsing.Literal("DELETE") ^ pyparsing.Literal("LINK") ^ pyparsing.Literal("RENAME") ^ pyparsing.Literal("WRITE") ).set_results_name("action") + _PATH + pyparsing.Optional(_NEW_PATH) + _PID + pyparsing.Optional(_PID_VERSION) + _PPID + _PROCESS + _PROCESS_PATH + _UID + _USER + _GID + _GROUP + _END_OF_LINE ) _PROCESS_EXIT_LINE = ( _DATE_TIME_BLOCK + _SANTAD_PREAMBLE + pyparsing.Suppress("action=") + pyparsing.Literal("EXIT").set_results_name("action") + _PID + _PID_VERSION + _PPID + _UID + _GID + _END_OF_LINE ) _QUOTA_EXCEEDED_LINE = ( _DATE_TIME_BLOCK + pyparsing.Literal( ( "*** LOG MESSAGE QUOTA EXCEEDED - SOME MESSAGES FROM THIS PROCESS " "HAVE BEEN DISCARDED ***" ) ) + _END_OF_LINE ) _LINE_STRUCTURES = [ ("execution_line", _EXECUTION_LINE), ("file_system_event_line", _FILE_OPERATION_LINE), ("mount_line", _DISK_MOUNT_LINE), ("process_exit_line", _PROCESS_EXIT_LINE), ("quota_exceeded_line", _QUOTA_EXCEEDED_LINE), ("unmount_line", _DISK_UNMOUNT_LINE), ] VERIFICATION_GRAMMAR = ( _DISK_MOUNT_LINE ^ _DISK_UNMOUNT_LINE ^ _EXECUTION_LINE ^ _FILE_OPERATION_LINE ^ _PROCESS_EXIT_LINE ) def _ParseRecord(self, parser_mediator, key, structure): """Parses a pyparsing structure. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. key (str): name of the parsed structure. structure (pyparsing.ParseResults): tokens from a parsed log line. Raises: ParseError: if the structure cannot be parsed. """ if key == "quota_exceeded_line": # skip this line return time_elements_structure = self._GetValueFromStructure(structure, "date_time") date_time = self._ParseTimeElements(time_elements_structure) if key == "execution_line": event_data = SantaExecutionEventData() event_data.action = self._GetValueFromStructure(structure, "action") event_data.certificate_common_name = self._GetValueFromStructure( structure, "cert_cn" ) event_data.certificate_hash = self._GetValueFromStructure( structure, "cert_sha256" ) event_data.decision = self._GetValueFromStructure(structure, "decision") event_data.gid = self._GetValueFromStructure(structure, "gid") event_data.group = self._GetValueFromStructure(structure, "group") event_data.last_run_time = date_time event_data.long_reason = self._GetValueFromStructure(structure, "explain") event_data.mode = self._GetValueFromStructure(structure, "mode") event_data.quarantine_url = self._GetValueFromStructure( structure, "quarantine_url" ) event_data.pid = self._GetValueFromStructure(structure, "pid") event_data.pid_version = self._GetValueFromStructure( structure, "pidversion" ) event_data.ppid = self._GetValueFromStructure(structure, "ppid") event_data.process_arguments = self._GetValueFromStructure( structure, "args" ) event_data.process_hash = self._GetValueFromStructure(structure, "sha256") event_data.process_path = self._GetValueFromStructure(structure, "path") event_data.reason = self._GetValueFromStructure(structure, "reason") event_data.uid = self._GetValueFromStructure(structure, "uid") event_data.user = self._GetValueFromStructure(structure, "user") if key == "process_exit_line": event_data = SantaProcessExitEventData() event_data.action = self._GetValueFromStructure(structure, "action") event_data.exit_time = date_time event_data.pid = self._GetValueFromStructure(structure, "pid") event_data.pid_version = self._GetValueFromStructure( structure, "pidversion" ) event_data.ppid = self._GetValueFromStructure(structure, "ppid") event_data.uid = self._GetValueFromStructure(structure, "uid") event_data.gid = self._GetValueFromStructure(structure, "gid") elif key == "file_system_event_line": event_data = SantaFileSystemEventData() event_data.action = self._GetValueFromStructure(structure, "action") event_data.file_new_path = self._GetValueFromStructure(structure, "newpath") event_data.file_path = self._GetValueFromStructure(structure, "path") event_data.gid = self._GetValueFromStructure(structure, "gid") event_data.group = self._GetValueFromStructure(structure, "group") event_data.last_written_time = date_time event_data.pid = self._GetValueFromStructure(structure, "pid") event_data.pid_version = self._GetValueFromStructure( structure, "pidversion" ) event_data.ppid = self._GetValueFromStructure(structure, "ppid") event_data.process = self._GetValueFromStructure(structure, "process") event_data.process_path = self._GetValueFromStructure( structure, "processpath" ) event_data.uid = self._GetValueFromStructure(structure, "uid") event_data.user = self._GetValueFromStructure(structure, "user") elif key == "unmount_line": event_data = SantaMountEventData() event_data.action = self._GetValueFromStructure(structure, "action") event_data.bsd_name = self._GetValueFromStructure(structure, "bsd_name") event_data.last_written_time = date_time event_data.mount = self._GetValueFromStructure(structure, "mount") or None event_data.volume = self._GetValueFromStructure(structure, "volume") elif key == "mount_line": event_data = SantaMountEventData() event_data.action = self._GetValueFromStructure(structure, "action") event_data.bsd_name = self._GetValueFromStructure(structure, "bsd_name") event_data.bus = self._GetValueFromStructure(structure, "bus") event_data.dmg_path = self._GetValueFromStructure(structure, "dmg_path") event_data.fs = self._GetValueFromStructure(structure, "fs") event_data.last_written_time = date_time event_data.model = self._GetValueFromStructure(structure, "model") event_data.mount = self._GetValueFromStructure(structure, "mount") or None event_data.serial = self._GetValueFromStructure(structure, "serial") or None event_data.volume = self._GetValueFromStructure(structure, "volume") appearance = self._GetValueFromStructure(structure, "appearance") if appearance: # pylint: disable=attribute-defined-outside-init event_data.appearance_time = self._ParseTimeElements(appearance) parser_mediator.ProduceEventData(event_data) def _ParseTimeElements(self, time_elements_structure): """Parses date and time elements of a log line. Args: time_elements_structure (pyparsing.ParseResults): date and time elements of a log line. Returns: dfdatetime.TimeElements: date and time value. Raises: ParseError: if a valid date and time value cannot be derived from the time elements. """ try: year, month, day_of_month, hours, minutes, seconds, milliseconds = ( time_elements_structure ) # Ensure time_elements_tuple is not a pyparsing.ParseResults otherwise # copy.deepcopy() of the dfDateTime object will fail on Python 3.8 with: # "TypeError: 'str' object is not callable" due to pyparsing.ParseResults # overriding __getattr__ with a function that returns an empty string # when named token does not exist. time_elements_tuple = ( year, month, day_of_month, hours, minutes, seconds, milliseconds, ) return dfdatetime_time_elements.TimeElementsInMilliseconds( time_elements_tuple=time_elements_tuple ) except (IndexError, TypeError, ValueError) as exception: raise errors.ParseError( f"Unable to parse time elements with error: {exception!s}" )
[docs] def CheckRequiredFormat(self, parser_mediator, text_reader): """Check if the log record has the minimal structure required by the plugin. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. Returns: bool: True if this is the correct plugin, False otherwise. """ try: structure = self._VerifyString(text_reader.lines) except errors.ParseError: return False time_elements_structure = self._GetValueFromStructure(structure, "date_time") try: self._ParseTimeElements(time_elements_structure) except errors.ParseError: return False return True
text_parser.TextLogParser.RegisterPlugin(SantaTextPlugin)