Source code for plaso.parsers.ivanti_vc0

"""Parser for Ivanti Connect Secure (.vc0) log files."""

import ipaddress
import re
import os

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import manager


[docs] class IvantiVC0EventData(events.EventData): """Ivanti Connect Secure (.vc0) log record. Attributes: authentication_realm (str): authentication realm. hostname (str): appliance hostname. ip_address (str): IP address found in the record values. line_number (str): line number. log_type (str): log family, "admin", "access", "diagnosticlog", "events", "policytrace" or "sensorslog". message_code (str): Ivanti message code. message_body (str): message body. recorded_time (dfdatetime.DateTimeValues): record timestamp. record_identifier (str): original record identifier in the format: "{timestamp:08x}.{line_number:08x}". username (str): username found in the record values. """ DATA_TYPE = "ivanti:connect_secure:vc0:record"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.authentication_realm = None self.hostname = None self.ip_address = None self.line_number = None self.log_type = None self.message_code = None self.message_body = None self.recorded_time = None self.record_identifier = None self.username = None
[docs] class VC0FileEntryFilter(interface.BaseFileEntryFilter): """File entry filter for Ivanti Connect Secure (.vc0) log files.""" _FILENAME_RE = re.compile( ( r"^.*[.](access|admin|diagnosticlog|events|policytrace|sensorslog)" r"[.]vc0(?:[.]old)?$" ), flags=re.IGNORECASE, )
[docs] def Match(self, file_entry): """Determines if a file entry is an Ivanti .vc0 log file. Args: file_entry (dfvfs.FileEntry): file entry. Returns: bool: True if the file entry matches. """ if not file_entry: return False return bool(self._FILENAME_RE.match(file_entry.name))
[docs] class IvantiVC0Parser(interface.FileObjectParser): """Parser for Ivanti Connect Secure .vc0 log files.""" NAME = "ivanti_vc0" DATA_FORMAT = "Ivanti Connect Secure (.vc0) log file" FILTERS = frozenset([VC0FileEntryFilter()]) _CHUNK_SIZE = 1024 * 1024 _HEADER_SIZE = 8192 _HEADER_SIGNATURE = b"\x05\x00\x00\x00\x01\x00\x00\x00" _MAXIMUM_BODY_VALUES = 8 _LOG_FILENAME_RE = re.compile(r"^.*[.](?P<log_type>.+?)[.]vc0(?:[.]old)?$") _MESSAGE_CODE_RE = re.compile(r"^[A-Z]{3}\d{5}$") _RECORD_SEPARATOR_RE = re.compile(b"\\x05") def _CheckHeader(self, file_object, file_size): """Checks if the file-like object contains a .vc0 header. Args: file_object (dfvfs.FileIO): a file-like object. file_size (int): file size. Returns: bool: True if a file-like object contains .vc0 header. """ if file_size < self._HEADER_SIZE: return False file_object.seek(0, os.SEEK_SET) header_data = file_object.read(self._HEADER_SIZE) return header_data[0:8] == self._HEADER_SIGNATURE and not any(header_data[8:]) def _CreateMessageBody( self, record_values, authentication_realm, ip_address, username ): """Builds the message body. Args: record_values (list[str]): values after the context field. authentication_realm (str): authentication realm. ip_address (str): IP address found in the record values. username (str): username found in the record values. Returns: str: message body or None if not available. """ body_values = [ value.strip() for value in record_values if value and value.strip() ] leading_metadata_values = { value for value in (ip_address, authentication_realm, username) if value } while body_values and body_values[0] in leading_metadata_values: body_values.pop(0) if not body_values: return None number_of_extra_values = len(body_values) - self._MAXIMUM_BODY_VALUES if number_of_extra_values > 0: body_values = body_values[: self._MAXIMUM_BODY_VALUES] body_values.append(f"... ({number_of_extra_values:d} more fields)") return " | ".join(body_values) def _GetLogType(self, filename): """Determines the log type from the filename. Args: filename (str): file name. Returns: str: log type or None if not available. """ if filename: match = self._LOG_FILENAME_RE.match(filename) if match: return match.group("log_type").lower() return None def _ExtractIPAddress(self, value): """Extracts an IP address from a value. Args: value (str): field value. Returns: str: IP address or None if not available. """ try: ip_address = ipaddress.ip_address(value.strip()) except ValueError: return None return str(ip_address) def _ExtractMessageCode(self, fields): """Extracts an Ivanti message code from record fields. Args: fields (list[str]): record fields. Returns: str: message code or None if not available. """ if len(fields) > 2 and self._MESSAGE_CODE_RE.match(fields[2].strip()): return fields[2].strip() for field in fields: field = field.strip() if self._MESSAGE_CODE_RE.match(field): return field return None def _ReadRecords(self, file_object, file_size): """Reads records. Args: file_object (dfvfs.FileIO): a file-like object. file_size (int): file size. Yields: tuple: containing: bytes: record data. int: offset of the record data relative to the start of the file. """ file_data = b"" record_offset = self._HEADER_SIZE file_object.seek(record_offset, os.SEEK_SET) while record_offset < file_size: data = file_object.read(self._CHUNK_SIZE) if not data: break file_data = b"".join([file_data, data]) records = self._RECORD_SEPARATOR_RE.split(file_data) for record_data in records[:-1]: if record_data: yield record_data, record_offset record_offset += len(record_data) + 1 file_data = records[-1] if file_data: yield file_data, record_offset def _ParseRecord(self, parser_mediator, log_type, record_data, record_offset): """Parses a record. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. log_type (str): log type. record_data (bytes): record data. record_offset (int): offset of the record data relative to the start of the file. """ try: record = record_data.decode("utf-8") except UnicodeDecodeError: parser_mediator.ProduceExtractionWarning( f"unable to decode record at offset: 0x{record_offset:08x} as UTF-8. " f"Unsupported code points are escaped." ) record = record_data.decode("utf-8", errors="backslashreplace") fields = record.split("\t") number_of_fields = len(fields) if number_of_fields < 3: parser_mediator.ProduceExtractionWarning( f"Invalid record at offset: 0x{record_offset:08x} - unsupported number " f"of fields: {number_of_fields:d}" ) return record_identifier = fields[0] if "." not in record_identifier: parser_mediator.ProduceExtractionWarning( f"Invalid record at offset: 0x{record_offset:08x} - record identifier: " f"{record_identifier:s} missing '.'" ) return timestamp_string, _, line_number_string = record_identifier.partition(".") if not timestamp_string: parser_mediator.ProduceExtractionWarning( f"Invalid record at offset: 0x{record_offset:08x} - record identifier: " f"{record_identifier:s} missing timestamp" ) return if not line_number_string: parser_mediator.ProduceExtractionWarning( f"Invalid record at offset: 0x{record_offset:08x} - record identifier: " f"{record_identifier:s} missing line number" ) return try: timestamp = int(timestamp_string, 16) except ValueError: parser_mediator.ProduceExtractionWarning( f"Invalid record at offset: 0x{record_offset:08x} - record identifier: " f"{record_identifier:s} unsupported timestamp" ) return try: line_number = int(line_number_string, 16) except ValueError: parser_mediator.ProduceExtractionWarning( f"Invalid record at offset: 0x{record_offset:08x} - record identifier: " f"{record_identifier:s} unsupported line number" ) return message_code = self._ExtractMessageCode(fields) if not message_code: parser_mediator.ProduceExtractionWarning( f"Invalid record at offset: 0x{record_offset:08x} - unable to extract " f"message code" ) event_data = IvantiVC0EventData() event_data.hostname = fields[1] or None event_data.line_number = line_number event_data.log_type = log_type event_data.message_code = message_code event_data.recorded_time = dfdatetime_posix_time.PosixTime(timestamp=timestamp) event_data.record_identifier = record_identifier if number_of_fields > 4: event_data.authentication_realm = fields[4] or None if number_of_fields > 5: event_data.ip_address = self._ExtractIPAddress(fields[5]) if number_of_fields > 6: event_data.username = fields[6] or None if number_of_fields > 5: event_data.message_body = self._CreateMessageBody( fields[5:], event_data.authentication_realm, event_data.ip_address, event_data.username, ) parser_mediator.ProduceEventData(event_data)
[docs] def ParseFileObject(self, parser_mediator, file_object): """Parses an Ivanti Connect Secure (.vc0) log file-like object. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. file_object (dfvfs.FileIO): a file-like object. Raises: WrongParser: when the file cannot be parsed. """ file_size = file_object.get_size() if not self._CheckHeader(file_object, file_size): raise errors.WrongParser("Not an Ivanti Connect Secure (.vc0) log file.") if file_size > self._HEADER_SIZE: filename = parser_mediator.GetFilename() log_type = self._GetLogType(filename) for record_data, record_offset in self._ReadRecords(file_object, file_size): if parser_mediator.abort: break self._ParseRecord(parser_mediator, log_type, record_data, record_offset)
manager.ParsersManager.RegisterParser(IvantiVC0Parser)