Source code for plaso.parsers.winjob

"""Parser for Windows Scheduled Task job files."""

import os

from dfdatetime import definitions as dfdatetime_definitions
from dfdatetime import systemtime as dfdatetime_systemtime
from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import events
from plaso.lib import dtfabric_helper
from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import manager


[docs] class WinJobEventData(events.EventData): """Windows Scheduled Task event data. Attributes: application (str): path to job executable. comment (str): description of the scheduled task. last_run_time (dfdatetime.DateTimeValues): executable (binary) last run date and time. parameters (str): application command line parameters. username (str): username that scheduled the task. working_directory (str): working directory of the scheduled task. """ DATA_TYPE = "windows:tasks:job"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.application = None self.comment = None self.last_run_time = None self.parameters = None self.username = None self.working_directory = None
[docs] class WinJobTriggerEventData(events.EventData): """Windows Scheduled Task trigger event data. Attributes: application (str): path to job executable. comment (str): description of the scheduled task. end_time (dfdatetime.DateTimeValues): date and time the end of the trigger. parameters (str): application command line parameters. start_time (dfdatetime.DateTimeValues): date and time the start of the trigger. trigger_type (int): trigger type. username (str): username that scheduled the task. working_directory (str): working directory of the scheduled task. """ DATA_TYPE = "windows:tasks:trigger"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.application = None self.comment = None self.end_time = None self.parameters = None self.start_time = None self.trigger_type = None self.username = None self.working_directory = None
[docs] class WinJobParser(interface.FileObjectParser, dtfabric_helper.DtFabricHelper): """Parse Windows Scheduled Task files for job events.""" NAME = "winjob" DATA_FORMAT = "Windows Scheduled Task job (or at-job) file" _DEFINITION_FILE = os.path.join(os.path.dirname(__file__), "winjob.yaml") _EMPTY_SYSTEM_TIME_TUPLE = (0, 0, 0, 0, 0, 0, 0, 0) _PRODUCT_VERSIONS = { 0x0400: "Windows NT 4.0", 0x0500: "Windows 2000", 0x0501: "Windows XP", 0x0600: "Windows Vista", 0x0601: "Windows 7", 0x0602: "Windows 8", 0x0603: "Windows 8.1", 0x0A00: "Windows 10", } def _ParseEventData(self, variable_length_section): """Parses the event data form a variable-length data section. Args: variable_length_section (job_variable_length_data_section): a Windows Scheduled Task job variable-length data section. Returns: WinJobEventData: event data of the job file. """ event_data = WinJobEventData() event_data.application = variable_length_section.application_name or None event_data.comment = variable_length_section.comment or None event_data.parameters = variable_length_section.parameters or None event_data.username = variable_length_section.author or None event_data.working_directory = variable_length_section.working_directory or None return event_data def _ParseLastRunTime(self, parser_mediator, fixed_length_section): """Parses the last run time from a fixed-length data section. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. fixed_length_section (job_fixed_length_data_section): a Windows Scheduled Task job fixed-length data section. Returns: dfdatetime.DateTimeValues: last run date and time or None if not available. """ systemtime_struct = fixed_length_section.last_run_time system_time_tuple = ( systemtime_struct.year, systemtime_struct.month, systemtime_struct.weekday, systemtime_struct.day_of_month, systemtime_struct.hours, systemtime_struct.minutes, systemtime_struct.seconds, systemtime_struct.milliseconds, ) date_time = None if system_time_tuple != self._EMPTY_SYSTEM_TIME_TUPLE: try: date_time = dfdatetime_systemtime.Systemtime( system_time_tuple=system_time_tuple ) except ValueError: parser_mediator.ProduceExtractionWarning( f"invalid last run time: {system_time_tuple!s}" ) return date_time def _ParseTriggerEventData(self, variable_length_section): """Parses the trigger event data form a variable-length data section. Args: variable_length_section (job_variable_length_data_section): a Windows Scheduled Task job variable-length data section. Returns: WinJobTriggerEventData: event data of the job file. """ event_data = WinJobTriggerEventData() event_data.application = variable_length_section.application_name or None event_data.comment = variable_length_section.comment or None event_data.parameters = variable_length_section.parameters or None event_data.username = variable_length_section.author or None event_data.working_directory = variable_length_section.working_directory or None return event_data def _ParseTriggerEndTime(self, parser_mediator, trigger): """Parses the end time from a trigger. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. trigger (job_trigger): a trigger. Returns: dfdatetime.DateTimeValues: last run date and time or None if not available. """ time_elements_tuple = ( trigger.end_date.year, trigger.end_date.month, trigger.end_date.day_of_month, 0, 0, 0, ) date_time = None if time_elements_tuple != (0, 0, 0, 0, 0, 0): try: date_time = dfdatetime_time_elements.TimeElements( precision=dfdatetime_definitions.PRECISION_1_DAY, time_elements_tuple=time_elements_tuple, ) date_time.is_local_time = True except ValueError: parser_mediator.ProduceExtractionWarning( f"invalid trigger end time: {time_elements_tuple!s}" ) return date_time def _ParseTriggerStartTime(self, parser_mediator, trigger): """Parses the start time from a trigger. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. trigger (job_trigger): a trigger. Returns: dfdatetime.DateTimeValues: last run date and time or None if not available. """ time_elements_tuple = ( trigger.start_date.year, trigger.start_date.month, trigger.start_date.day_of_month, trigger.start_time.hours, trigger.start_time.minutes, 0, ) date_time = None if time_elements_tuple != (0, 0, 0, 0, 0, 0): try: date_time = dfdatetime_time_elements.TimeElements( precision=dfdatetime_definitions.PRECISION_1_MINUTE, time_elements_tuple=time_elements_tuple, ) date_time.is_local_time = True except ValueError: parser_mediator.ProduceExtractionWarning( f"invalid trigger start time: {time_elements_tuple!s}" ) return date_time
[docs] def ParseFileObject(self, parser_mediator, file_object): """Parses a Windows job file-like object. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. file_object (dfvfs.FileIO): a file-like object. Raises: WrongParser: when the file cannot be parsed. """ fixed_section_data_map = self._GetDataTypeMap("job_fixed_length_data_section") try: fixed_length_section, file_offset = self._ReadStructureFromFileObject( file_object, 0, fixed_section_data_map ) except (ValueError, errors.ParseError) as exception: raise errors.WrongParser( f"Unable to parse fixed-length data section with error: " f"{exception!s}" ) if not fixed_length_section.product_version in self._PRODUCT_VERSIONS: raise errors.WrongParser( f"Unsupported product version in: " f"0x{fixed_length_section.product_version:04x}" ) if not fixed_length_section.format_version == 1: raise errors.WrongParser( f"Unsupported format version in: " f"{fixed_length_section.format_version!s}" ) variable_section_data_map = self._GetDataTypeMap( "job_variable_length_data_section" ) try: variable_length_section, data_size = self._ReadStructureFromFileObject( file_object, file_offset, variable_section_data_map ) except (ValueError, errors.ParseError) as exception: raise errors.WrongParser( f"Unable to parse variable-length data section with error: " f"{exception!s}" ) file_offset += data_size event_data = self._ParseEventData(variable_length_section) event_data.last_run_time = self._ParseLastRunTime( parser_mediator, fixed_length_section ) parser_mediator.ProduceEventData(event_data) trigger_data_map = self._GetDataTypeMap("job_trigger") for trigger_index in range(variable_length_section.number_of_triggers): try: trigger, data_size = self._ReadStructureFromFileObject( file_object, file_offset, trigger_data_map ) except (ValueError, errors.ParseError) as exception: parser_mediator.ProduceExtractionWarning( f"unable to parse trigger: {trigger_index:d} with " f"error: {exception!s}" ) break file_offset += data_size event_data = self._ParseTriggerEventData(variable_length_section) event_data.end_time = self._ParseTriggerEndTime(parser_mediator, trigger) event_data.start_time = self._ParseTriggerStartTime( parser_mediator, trigger ) event_data.trigger_type = trigger.trigger_type parser_mediator.ProduceEventData(event_data)
manager.ParsersManager.RegisterParser(WinJobParser)