Source code for plaso.parsers.wincc

# -*- coding: utf-8 -*-
"""Text parser plugin for WinCC log files."""

from dfvfs.helpers import text_file

from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import manager

class SIMATICS7EventData(events.EventData):
  """Event data of a single SIMATIC S7 log entry.

  Attributes:
    body (str): the message content of the event.
    creation_time (dfdatetime.DateTimeValues): date and time the log entry
        was created.
  """

  DATA_TYPE = 'wincc:simatic_s7:entry'

  def __init__(self):
    """Initializes event data."""
    super().__init__(data_type=self.DATA_TYPE)
    # Both attributes are filled in by the parser once a line was read.
    self.body = None
    self.creation_time = None
class WinCCSysLogEventData(events.EventData):
  """Event data of a single WinCC Sys Log entry.

  Attributes:
    body (str): the content of the log's message.
    creation_time (dfdatetime.DateTimeValues): date and time the log entry
        was created.
    event_number (int): a number specifying the type of event.
    log_hostname (str): the hostname of the machine logging the event.
    log_identifier (int): identifier for this log file.
    source_device (str): which device generated the event.
  """

  DATA_TYPE = 'wincc:sys_log:entry'

  def __init__(self):
    """Initializes event data."""
    super().__init__(data_type=self.DATA_TYPE)
    # All attributes start out unset; the parser fills in what it can read.
    self.body = None
    self.creation_time = None
    self.event_number = None
    self.log_hostname = None
    self.log_identifier = None
    self.source_device = None
class SIMATICLogParser(interface.FileObjectParser):
  """Text parser plugin for SIMATIC S7 Log files."""

  NAME = 'simatic_s7'
  DATA_FORMAT = 'SIMATIC S7 Log file'

  DELIMITER = ','
  ENCODING = 'ascii'
  END_OF_LINE = '\r\n'

  _EXPECTED_FIRST_LINE_STRING = 'Log starting ...' + END_OF_LINE
  _EXPECTED_SECOND_LINE_STRING = '| LogFileName'
  _EXPECTED_THIRD_LINE_STRING = '| LogFileCount'

  def _ParseValues(self, parser_mediator, line_number, values):
    """Validates and extracts one line of a SIMATIC S7 log file.

    The first three lines (line numbers 0, 1 and 2) are checked against the
    expected header of the format; every line, header lines included, is
    turned into event data.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      line_number (int): number of the line the values were extracted from.
      values (list[str]): values extracted from the line.

    Raises:
      WrongParser: when the values cannot be parsed.
    """
    if len(values) < 2:
      raise errors.WrongParser(
          'Expected at least two values on line {0:d}'.format(line_number))

    # The first lines seem to always follow the following format:
    #
    # 2019-05-27 10:05:43,405 INFO Log starting ...
    # 2019-05-27 10:05:43,405 INFO | LogFileName : C:\.....
    # 2019-05-27 10:05:43,419 INFO | LogFileCount : 3
    header_candidate = values[1]

    if line_number == 0:
      if not header_candidate.endswith(self._EXPECTED_FIRST_LINE_STRING):
        raise errors.WrongParser(
            'Expected first line to end with "{0:s}"."{1!s}"'.format(
                self._EXPECTED_FIRST_LINE_STRING, header_candidate))

    elif line_number == 1:
      if self._EXPECTED_SECOND_LINE_STRING not in header_candidate:
        raise errors.WrongParser(
            'Expected second line to contain "{0:s}"'.format(
                self._EXPECTED_SECOND_LINE_STRING))

    elif line_number == 2:
      if self._EXPECTED_THIRD_LINE_STRING not in header_candidate:
        raise errors.WrongParser(
            'Expected third line to contain {0:s}'.format(
                self._EXPECTED_THIRD_LINE_STRING))

    event_data = SIMATICS7EventData()

    try:
      creation_time = dfdatetime_time_elements.TimeElements()
      creation_time.CopyFromDateTimeString(values[0])
      event_data.creation_time = creation_time
    except (TypeError, ValueError) as exception:
      # A bad timestamp is not fatal: the entry is still produced, without
      # a creation time.
      parser_mediator.ProduceExtractionWarning((
          'Unable to parse date time value with error: '
          '{0!s} on line {1:d} {2!s}').format(exception, line_number, values))

    # The message itself may contain the delimiter, so re-join the remainder.
    event_data.body = ','.join(values[1:]).strip()

    parser_mediator.ProduceEventData(event_data)

  def ParseFileObject(self, parser_mediator, file_object):
    """Parses a SIMATIC Log file-like object.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      file_object (dfvfs.FileIO): file-like object.

    Raises:
      WrongParser: when the file cannot be parsed.
    """
    reader = text_file.TextFile(
        file_object, encoding=self.ENCODING, end_of_line=self.END_OF_LINE)

    line_number = 0

    # Failing to decode the very first line means this is not our format.
    try:
      line = reader.readline()
    except UnicodeDecodeError as exception:
      raise errors.WrongParser(
          'unable to read line: {0:d} with error: {1!s}'.format(
              line_number, exception))

    while line:
      self._ParseValues(
          parser_mediator, line_number, line.split(self.DELIMITER))

      line_number += 1
      try:
        line = reader.readline()
      except UnicodeDecodeError as exception:
        # Later decoding problems only stop the extraction with a warning.
        parser_mediator.ProduceExtractionWarning(
            'unable to read line: {0:d} with error: {1!s}'.format(
                line_number, exception))
        break
class WinCCSysLogParser(interface.FileObjectParser):
  """Text parser plugin for WinCC Sys Log files."""

  NAME = 'wincc_sys'
  DATA_FORMAT = 'WinCC Sys Log file'

  DELIMITER = ','
  ENCODING = 'utf-16-le'

  _DISALLOWED_HOSTNAME_CHARS = ['\\', '/', ':', '*', '?', '"', '<', '>', '|']
  _END_OF_LOG_FILE_STRING = '======>'

  # A well-formed line is expected to split into at least this many values.
  _EXPECTED_NUMBER_OF_VALUES = 10

  # _ParseValues reads values[0] through values[7]; a line with fewer values
  # cannot be extracted at all.
  _MINIMUM_NUMBER_OF_VALUES = 8

  # Longest hostname accepted on the first line.
  _MAXIMUM_HOSTNAME_LENGTH = 15

  def _ParseValues(self, parser_mediator, line_number, values, first_line):
    """Parses WinCC log file values.

    On the first line every problem raises WrongParser, since the first line
    is used to decide whether the file is a WinCC Sys Log at all. On later
    lines problems are reported as extraction warnings and whatever could be
    extracted is still produced as event data.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      line_number (int): number of the line the values were extracted from.
      values (list[str]): values extracted from the line.
      first_line (bool): True if this is first line from which values were
          extracted.

    Raises:
      WrongParser: when the values cannot be parsed.
    """
    number_of_values = len(values)
    if number_of_values < self._EXPECTED_NUMBER_OF_VALUES:
      error_string = 'invalid number of values : {0:d} in line: {1:d}'.format(
          number_of_values, line_number)

      # On other lines, we might encounter times where the split() operation
      # produces more values.
      if first_line:
        raise errors.WrongParser(error_string)

      parser_mediator.ProduceExtractionWarning(error_string)

      # Without this guard the indexing below would raise an unhandled
      # IndexError on truncated lines.
      if number_of_values < self._MINIMUM_NUMBER_OF_VALUES:
        return

    event_data = WinCCSysLogEventData()

    try:
      event_data.log_identifier = int(values[0], 10)
    except ValueError as exception:
      error_string = (
          'Type of first value ({0!s}) should be an int in line: {1:d}').format(
              values[0], line_number)
      self._ParseValuesFail(
          parser_mediator, first_line, exception, error_string)

    try:
      date_string = values[1]
      time_string = values[2]

      # The date is formatted as "day.month.year" and the time as
      # "hours:minutes:seconds:milliseconds".
      day_of_month, month, year = [
          int(element, 10) for element in date_string.split('.')]
      hours, minutes, seconds, milliseconds = [
          int(element, 10) for element in time_string.split(':')]

      time_elements_tuple = (
          year, month, day_of_month, hours, minutes, seconds, milliseconds)
      date_time = dfdatetime_time_elements.TimeElementsInMilliseconds(
          time_elements_tuple=time_elements_tuple)

      # Unsure about whether the format is dependant on the system's Locale
      date_time.is_local_time = True

      event_data.creation_time = date_time
    except (TypeError, ValueError) as exception:
      # Report both the date and the time value, since either can be at fault.
      error_string = (
          'Unable to parse time elements with error: '
          '{0!s} on line {1:d} {2!s}').format(
              exception, line_number, values[1:3])
      self._ParseValuesFail(
          parser_mediator, first_line, exception, error_string)

    try:
      event_data.event_number = int(values[3], 10)
    except ValueError as exception:
      error_string = (
          'Type of event_number value ({0!s}) should be a decimal in line:'
          '{1:d}').format(values[3], line_number)
      self._ParseValuesFail(
          parser_mediator, first_line, exception, error_string)

    # Column 4 and 5 contain unknown values, we are not parsing them.

    # We are checking this only once, on the first line, as we expect all
    # following lines to contain the same hostname, as the logs are
    # collected from the system generating it.
    hostname = values[6]
    if first_line:
      # No exception is being handled here, so WrongParser is raised directly
      # instead of going through _ParseValuesFail.
      if not hostname:
        raise errors.WrongParser((
            'Hostname ({0!s}) needs to be at least 1 character on line:'
            '{1:d}').format(hostname, line_number))

      if len(hostname) > self._MAXIMUM_HOSTNAME_LENGTH:
        raise errors.WrongParser((
            'Hostname ({0!s}) can\'t be longer than 15 characters on line:'
            '{1:d}').format(hostname, line_number))

      for character in self._DISALLOWED_HOSTNAME_CHARS:
        if character in hostname:
          raise errors.WrongParser((
              'Hostname ({0!s}) can\'t contain the character {1:s} on line'
              '{2:d}').format(hostname, character, line_number))

    event_data.log_hostname = hostname
    event_data.source_device = values[7]

    # The second to last field is the one that might contain unquoted separator.
    # The last field can contain a string such as MSG_STATE_COME, MSG_STATE_GO,
    # but also sometimes contains a message string.
    event_data.body = ' '.join(values[8:])

    parser_mediator.ProduceEventData(event_data)

  def _ParseValuesFail(
      self, parser_mediator, first_line, parent_exception, error_string):
    """Reports a failure to parse a value.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      first_line (bool): True if the failure occurred on the first line.
      parent_exception (Exception): exception that caused the failure.
      error_string (str): description of the failure.

    Raises:
      WrongParser: when the failure occurred on the first line.
    """
    if first_line:
      raise errors.WrongParser(error_string) from parent_exception

    parser_mediator.ProduceExtractionWarning(error_string)

  def ParseFileObject(self, parser_mediator, file_object):
    """Parses a WinCC Sys Log file-like object.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      file_object (dfvfs.FileIO): file-like object.

    Raises:
      WrongParser: when the file cannot be parsed.
    """
    # Note that we cannot use the DSVParser here since this WinCC file format is
    # not strict and clean file format.
    line_reader = text_file.TextFile(
        file_object, encoding=self.ENCODING, end_of_line='\r\n')

    line_number = 0
    first_line = True

    # While the file is in UTF-16, the end_of_line character is '\r\n'.
    # We manually strip this out.
    try:
      line = line_reader.readline().strip()
    except UnicodeDecodeError as exception:
      raise errors.WrongParser(
          'unable to read line: {0:d} with error: {1!s}'.format(
              line_number, exception)) from exception

    while line:
      # It seems that sometimes WinCC logs will end with a line starting with
      # the end-of-log marker, with the name of the next log file.
      # But this is not always the case though. Such lines are skipped.
      if not line.startswith(self._END_OF_LOG_FILE_STRING):
        values = line.split(self.DELIMITER)
        self._ParseValues(parser_mediator, line_number, values, first_line)
        first_line = False

      # Every line read is counted, skipped marker lines included, so that
      # reported line numbers stay accurate.
      line_number += 1

      try:
        line = line_reader.readline().strip()
      except UnicodeDecodeError as exception:
        parser_mediator.ProduceExtractionWarning(
            'unable to read line: {0:d} with error: {1!s}'.format(
                line_number, exception))
        break
# Register both text parsers defined in this module with the parsers manager.
manager.ParsersManager.RegisterParsers([
    WinCCSysLogParser,
    SIMATICLogParser])