Source code for plaso.preprocessors.linux

# -*- coding: utf-8 -*-
"""Linux preprocessor plugins."""

import csv
import datetime

from dateutil import tz

from dfvfs.helpers import text_file as dfvfs_text_file

from plaso.containers import artifacts
from plaso.lib import errors
from plaso.lib import line_reader_file
from plaso.preprocessors import interface
from plaso.preprocessors import manager


class LinuxHostnamePlugin(interface.FileArtifactPreprocessorPlugin):
  """Preprocessor plugin that determines the hostname of a Linux system."""

  ARTIFACT_DEFINITION_NAME = 'LinuxHostnameFile'

  def _ParseFileData(self, mediator, file_object):
    """Extracts the hostname from the first line of the artifact file.

    The first line is read as UTF-8 text and surrounding whitespace is
    removed. A hostname artifact is only added if the result is not empty.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      file_object (dfvfs.FileIO): file-like object that contains the artifact
          value data.

    Raises:
      errors.PreProcessFail: if the preprocessing fails.
    """
    reader = dfvfs_text_file.TextFile(file_object, encoding='utf-8')

    first_line = reader.readline().strip()
    if not first_line:
      return

    mediator.AddHostname(artifacts.HostnameArtifact(name=first_line))
class LinuxDistributionPlugin(interface.FileArtifactPreprocessorPlugin):
  """Preprocessor plugin that reads the Linux distribution release file."""

  ARTIFACT_DEFINITION_NAME = 'LinuxDistributionRelease'

  def _ParseFileData(self, mediator, file_object):
    """Extracts the system product from the first line of the artifact file.

    The first line is read as UTF-8 text and surrounding whitespace is
    removed. The 'operating_system_product' value is only set if the result
    is not empty.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      file_object (dfvfs.FileIO): file-like object that contains the artifact
          value data.

    Raises:
      errors.PreProcessFail: if the preprocessing fails.
    """
    reader = dfvfs_text_file.TextFile(file_object, encoding='utf-8')

    release = reader.readline().strip()
    if not release:
      return

    mediator.SetValue('operating_system_product', release)
class LinuxIssueFilePlugin(interface.FileArtifactPreprocessorPlugin):
  """Preprocessor plugin that reads the Linux issue file."""

  ARTIFACT_DEFINITION_NAME = 'LinuxIssueFile'

  def _ParseFileData(self, mediator, file_object):
    """Extracts the system product from the first line of the issue file.

    Only a first line that starts with 'Debian GNU/Linux ' is supported. The
    product is the part of that line before the first backslash, with
    trailing whitespace removed.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      file_object (dfvfs.FileIO): file-like object that contains the artifact
          value data.

    Raises:
      errors.PreProcessFail: if the preprocessing fails.
    """
    reader = dfvfs_text_file.TextFile(file_object, encoding='utf-8')
    first_line = reader.readline()

    # Only parse known default /etc/issue file contents.
    if not first_line.startswith('Debian GNU/Linux '):
      return

    # Everything from the first backslash onwards is dropped.
    product = first_line.partition('\\')[0].rstrip()
    if product:
      mediator.SetValue('operating_system_product', product)
class LinuxStandardBaseReleasePlugin(interface.FileArtifactPreprocessorPlugin):
  """The Linux standard base (LSB) release plugin."""

  ARTIFACT_DEFINITION_NAME = 'LinuxLSBRelease'

  def _ParseFileData(self, mediator, file_object):
    """Parses file content (data) for system product preprocessing attribute.

    Every "KEY=value" line of the file is collected and the value of
    DISTRIB_DESCRIPTION, if present and not empty, is stored as the
    'operating_system_product' value. Blank lines, comment lines and lines
    without a '=' separator are ignored.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      file_object (dfvfs.FileIO): file-like object that contains the artifact
          value data.

    Raises:
      errors.PreProcessFail: if the preprocessing fails.
    """
    text_file_object = dfvfs_text_file.TextFile(file_object, encoding='utf-8')

    product_values = {}
    for line in text_file_object.readlines():
      line = line.strip()
      if not line or line.startswith('#'):
        continue

      # Split on the first '=' only, so that a value that itself contains
      # '=' does not raise a ValueError when unpacking.
      key, separator, value = line.partition('=')
      if not separator:
        # Not a key value pair, for example a stray word on a line.
        continue

      key = key.strip().upper()
      value = value.strip().strip('"')
      product_values[key] = value

    system_product = product_values.get('DISTRIB_DESCRIPTION', None)
    if system_product:
      mediator.SetValue('operating_system_product', system_product)
class LinuxSystemdOperatingSystemPlugin(
    interface.FileArtifactPreprocessorPlugin):
  """The Linux systemd operating system release plugin."""

  ARTIFACT_DEFINITION_NAME = 'LinuxSystemdOSRelease'

  def _ParseFileData(self, mediator, file_object):
    """Parses file content (data) for system product preprocessing attribute.

    Every "KEY=value" line of the file is collected and the value of
    PRETTY_NAME, if present and not empty, is stored as the
    'operating_system_product' value. Comment lines and lines without a '='
    separator are ignored.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      file_object (dfvfs.FileIO): file-like object that contains the artifact
          value data.

    Raises:
      errors.PreProcessFail: if the preprocessing fails.
    """
    text_file_object = dfvfs_text_file.TextFile(file_object, encoding='utf-8')

    product_values = {}
    for line in text_file_object.readlines():
      line = line.strip()

      # Ignore comments and lines that do not define a key value pair.
      if line.startswith('#') or '=' not in line:
        continue

      # Split on the first '=' only: values such as URLs with a query string
      # contain '=' themselves and would otherwise raise a ValueError when
      # unpacking.
      key, value = line.split('=', 1)
      key = key.upper()
      value = value.strip('"')
      product_values[key] = value

    system_product = product_values.get('PRETTY_NAME', None)
    if system_product:
      mediator.SetValue('operating_system_product', system_product)
class LinuxTimeZonePlugin(interface.FileEntryArtifactPreprocessorPlugin):
  """Preprocessor plugin that determines the time zone of a Linux system."""

  ARTIFACT_DEFINITION_NAME = 'LinuxLocalTime'

  def _ParseFileEntry(self, mediator, file_entry):
    """Determines the time zone from the local time file entry.

    If the file entry is a link, the time zone is the part of the link target
    that follows 'zoneinfo/'. Otherwise the file content is read as a time
    zone information file and the time zone name reported for 2017-01-01 is
    used. A preprocessing warning is produced if the file cannot be read or
    the time zone cannot be set.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      file_entry (dfvfs.FileEntry): file entry that contains the artifact
          value data.

    Raises:
      errors.PreProcessFail: if the preprocessing fails.
    """
    link_target = file_entry.link
    if link_target:
      # Determine the timezone based on the file path.
      time_zone = link_target.partition('zoneinfo/')[2]

    else:
      # Determine the timezone based on the timezone information file.
      time_zone = None
      file_object = file_entry.GetFileObject()

      try:
        time_zone = tz.tzfile(file_object).tzname(
            datetime.datetime(2017, 1, 1))

      except ValueError:
        mediator.ProducePreprocessingWarning(
            self.ARTIFACT_DEFINITION_NAME,
            'Unable to read time zone information file.')

    if not time_zone:
      return

    try:
      mediator.SetTimeZone(time_zone)
    except ValueError:
      mediator.ProducePreprocessingWarning(
          self.ARTIFACT_DEFINITION_NAME,
          'Unable to set time zone in knowledge base.')
class LinuxUserAccountsPlugin(interface.FileArtifactPreprocessorPlugin):
  """Preprocessor plugin that collects user accounts from a passwd file."""

  ARTIFACT_DEFINITION_NAME = 'LinuxPasswdFile'

  def _DecodeValue(self, mediator, value, warning_message):
    """Decodes an UTF-8 encoded value of a passwd file entry.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      value (bytes): encoded value, which can be empty.
      warning_message (str): message of the preprocessing warning that is
          produced if the value cannot be decoded.

    Returns:
      str: decoded value or None if the value is empty or cannot be decoded.
    """
    if not value:
      return None

    try:
      return value.decode('utf-8')
    except UnicodeDecodeError:
      mediator.ProducePreprocessingWarning(
          self.ARTIFACT_DEFINITION_NAME, warning_message)

    return None

  def _ParseFileData(self, mediator, file_object):
    """Parses file content (data) for user account preprocessing attributes.

    Every line is split on ':' and needs at least 7 values, with a non-empty
    username (value 0) and user identifier (value 2). Lines that do not meet
    these requirements, or whose username or user identifier cannot be
    decoded, are skipped with a preprocessing warning. The remaining values
    (group identifier, full name, user directory and shell) are optional and
    left unset if empty or not decodable.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage.
      file_object (dfvfs.FileIO): file-like object that contains the artifact
          value data.

    Raises:
      errors.PreProcessFail: if the preprocessing fails.
    """
    line_reader = line_reader_file.BinaryLineReader(file_object)

    try:
      dsv_reader = line_reader_file.BinaryDSVReader(line_reader, b':')
    except csv.Error as exception:
      raise errors.PreProcessFail((
          f'Unable to read: {self.ARTIFACT_DEFINITION_NAME:s} with error: '
          f'{exception!s}'))

    for line_number, values in enumerate(dsv_reader):
      if len(values) < 7 or not (values[0] and values[2]):
        mediator.ProducePreprocessingWarning(
            self.ARTIFACT_DEFINITION_NAME,
            f'Unsupported number of values in line: {line_number:d}.')
        continue

      # The username and user identifier are known to be non-empty here,
      # hence None can only mean that decoding failed.
      username = self._DecodeValue(
          mediator, values[0], 'Unable to decode username.')
      if username is None:
        continue

      identifier = self._DecodeValue(
          mediator, values[2], 'Unable to decode user identifier.')
      if identifier is None:
        continue

      user_account = artifacts.UserAccountArtifact(
          identifier=identifier, username=username)

      user_account.group_identifier = self._DecodeValue(
          mediator, values[3], 'Unable to decode group identifier.')
      user_account.full_name = self._DecodeValue(
          mediator, values[4], 'Unable to decode full name.')
      user_account.user_directory = self._DecodeValue(
          mediator, values[5], 'Unable to decode user directory.')
      user_account.shell = self._DecodeValue(
          mediator, values[6], 'Unable to decode shell.')

      try:
        mediator.AddUserAccount(user_account)
      except KeyError as exception:
        mediator.ProducePreprocessingWarning(
            self.ARTIFACT_DEFINITION_NAME,
            f'Unable to add user account with error: {exception!s}')
# Register the Linux preprocessor plugins defined in this module with the
# preprocess plugins manager.
manager.PreprocessPluginsManager.RegisterPlugins([ LinuxHostnamePlugin, LinuxDistributionPlugin, LinuxIssueFilePlugin, LinuxStandardBaseReleasePlugin, LinuxSystemdOperatingSystemPlugin, LinuxTimeZonePlugin, LinuxUserAccountsPlugin])