Source code for plaso.parsers.utmp

# -*- coding: utf-8 -*-
"""Parser for Linux utmp files."""

import os

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.containers import events
from plaso.lib import definitions
from plaso.lib import dtfabric_helper
from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import manager


[docs] class UtmpEventData(events.EventData): """Linux libc6 utmp event data. Attributes: exit_status (int): exit status. hostname (str): hostname or IP address. ip_address (str): IP address from the connection. offset (int): offset of the utmp record relative to the start of the file, from which the event data was extracted. pid (int): process identifier (PID). terminal_identifier (int): inittab identifier. terminal (str): type of terminal. type (int): type of login. username (str): user name. written_time (dfdatetime.DateTimeValues): entry written date and time. """ DATA_TYPE = 'linux:utmp:event'
[docs] def __init__(self): """Initializes event data.""" super(UtmpEventData, self).__init__(data_type=self.DATA_TYPE) self.exit_status = None self.hostname = None self.ip_address = None self.offset = None self.pid = None self.terminal_identifier = None self.terminal = None self.type = None self.username = None self.written_time = None
[docs] class UtmpParser(interface.FileObjectParser, dtfabric_helper.DtFabricHelper): """Parser for Linux libc6 utmp files.""" NAME = 'utmp' DATA_FORMAT = 'Linux libc6 utmp file' _DEFINITION_FILE = os.path.join( os.path.dirname(__file__), 'utmp.yaml') _EMPTY_IP_ADDRESS = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) _SUPPORTED_TYPES = frozenset(range(0, 10)) _DEAD_PROCESS_TYPE = 8 def _ReadEntry(self, parser_mediator, file_object, file_offset): """Reads an utmp entry. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfvfs. file_object (dfvfs.FileIO): a file-like object. file_offset (int): offset of the data relative to the start of the file-like object. Returns: tuple: containing: UtmpEventData: event data of the utmp entry read. list[str]: warning messages emitted by the parser. Raises: ParseError: if the entry cannot be parsed. """ entry_map = self._GetDataTypeMap('linux_libc6_utmp_entry') warning_strings = [] try: entry, _ = self._ReadStructureFromFileObject( file_object, file_offset, entry_map) except (ValueError, errors.ParseError) as exception: raise errors.ParseError(( 'Unable to parse utmp entry at offset: 0x{0:08x} with error: ' '{1!s}.').format(file_offset, exception)) if entry.type not in self._SUPPORTED_TYPES: raise errors.ParseError('Unsupported type: {0:d}'.format(entry.type)) code_page = parser_mediator.GetCodePage() try: username = entry.username.split(b'\x00')[0] username = username.decode(code_page).rstrip() except UnicodeDecodeError: warning_strings.append('unable to decode username string') username = None try: terminal = entry.terminal.split(b'\x00')[0] terminal = terminal.decode(code_page).rstrip() except UnicodeDecodeError: warning_strings.append('unable to decode terminal string') terminal = None if terminal == '~': terminal = 'system boot' try: hostname = entry.hostname.split(b'\x00')[0] hostname = hostname.decode(code_page).rstrip() except UnicodeDecodeError: warning_strings.append('unable to decode hostname string') hostname = None if not hostname or hostname == ':0': hostname = 'localhost' if entry.ip_address[4:] == self._EMPTY_IP_ADDRESS[4:]: ip_address = self._FormatPackedIPv4Address(entry.ip_address[:4]) else: ip_address = self._FormatPackedIPv6Address(entry.ip_address) timestamp = entry.microseconds + ( entry.timestamp * definitions.MICROSECONDS_PER_SECOND) # TODO: add termination status. event_data = UtmpEventData() event_data.hostname = hostname event_data.exit_status = entry.exit_status event_data.ip_address = ip_address event_data.offset = file_offset event_data.pid = entry.pid event_data.terminal = terminal or None event_data.terminal_identifier = entry.terminal_identifier event_data.type = entry.type event_data.username = username or None event_data.written_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=timestamp) return event_data, warning_strings
[docs] def ParseFileObject(self, parser_mediator, file_object): """Parses an utmp file-like object. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfvfs. file_object (dfvfs.FileIO): a file-like object. Raises: WrongParser: when the file cannot be parsed. """ file_offset = 0 try: event_data, warning_strings = self._ReadEntry( parser_mediator, file_object, file_offset) except errors.ParseError as exception: raise errors.WrongParser( 'Unable to parse first utmp entry with error: {0!s}'.format( exception)) if not event_data.written_time: raise errors.WrongParser( 'Unable to parse first utmp entry with error: missing written time') if not event_data.username and event_data.type != self._DEAD_PROCESS_TYPE: raise errors.WrongParser( 'Unable to parse first utmp entry with error: missing username') if warning_strings: all_warnings = ', '.join(warning_strings) raise errors.WrongParser( 'Unable to parse first utmp entry with error: {0:s}'.format( all_warnings)) parser_mediator.ProduceEventData(event_data) file_offset = file_object.tell() file_size = file_object.get_size() while file_offset < file_size: if parser_mediator.abort: break try: event_data, warning_strings = self._ReadEntry( parser_mediator, file_object, file_offset) except errors.ParseError: # Note that the utmp file can contain trailing data. break parser_mediator.ProduceEventData(event_data) for warning_string in warning_strings: parser_mediator.ProduceExtractionWarning(warning_string) file_offset = file_object.tell()
manager.ParsersManager.RegisterParser(UtmpParser)