# Source code for plaso.preprocessors.linux

"""Linux preprocessor plugins."""

import csv
import datetime

from dateutil import tz

from dfvfs.helpers import text_file as dfvfs_text_file

from plaso.containers import artifacts
from plaso.lib import errors
from plaso.lib import line_reader_file
from plaso.preprocessors import interface
from plaso.preprocessors import manager


class LinuxHostnamePlugin(interface.FileArtifactPreprocessorPlugin):
    """Preprocessor plugin that extracts the hostname of a Linux system."""

    ARTIFACT_DEFINITION_NAME = "LinuxHostnameFile"

    def _ParseFileData(self, mediator, file_object):
        """Stores the first line of the artifact file as the hostname.

        The file is read as UTF-8 text. Only the first line is considered
        and surrounding whitespace is removed; nothing is stored when the
        resulting value is empty.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            file_object (dfvfs.FileIO): file-like object that contains the
                artifact value data.
        """
        reader = dfvfs_text_file.TextFile(file_object, encoding="utf-8")
        first_line = reader.readline().strip()
        if not first_line:
            return

        mediator.AddHostname(artifacts.HostnameArtifact(name=first_line))
class LinuxDistributionPlugin(interface.FileArtifactPreprocessorPlugin):
    """Preprocessor plugin that reads a Linux distribution release file."""

    ARTIFACT_DEFINITION_NAME = "LinuxDistributionRelease"

    def _ParseFileData(self, mediator, file_object):
        """Stores the first line of the artifact file as the system product.

        The file is read as UTF-8 text; the first line, with surrounding
        whitespace removed, is stored under the "operating_system_product"
        value. An empty first line is ignored.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            file_object (dfvfs.FileIO): file-like object that contains the
                artifact value data.
        """
        reader = dfvfs_text_file.TextFile(file_object, encoding="utf-8")
        release_name = reader.readline().strip()
        if not release_name:
            return

        mediator.SetValue("operating_system_product", release_name)
class LinuxIssueFilePlugin(interface.FileArtifactPreprocessorPlugin):
    """Preprocessor plugin that reads the Linux issue file."""

    ARTIFACT_DEFINITION_NAME = "LinuxIssueFile"

    def _ParseFileData(self, mediator, file_object):
        """Derives the system product from the first line of the issue file.

        Only a first line that starts with "Debian GNU/Linux " is recognized.
        Everything from the first backslash onwards is discarded, as is
        trailing whitespace, and the remainder is stored under the
        "operating_system_product" value.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            file_object (dfvfs.FileIO): file-like object that contains the
                artifact value data.
        """
        reader = dfvfs_text_file.TextFile(file_object, encoding="utf-8")
        first_line = reader.readline()

        # Only parse known default /etc/issue file contents.
        if not first_line.startswith("Debian GNU/Linux "):
            return

        # Keep the text in front of the first backslash.
        product_name = first_line.partition("\\")[0].rstrip()
        if product_name:
            mediator.SetValue("operating_system_product", product_name)
class LinuxStandardBaseReleasePlugin(interface.FileArtifactPreprocessorPlugin):
    """The Linux standard base (LSB) release plugin."""

    ARTIFACT_DEFINITION_NAME = "LinuxLSBRelease"

    def _ParseFileData(self, mediator, file_object):
        """Parses file content (data) for system product preprocessing attribute.

        The file is read as UTF-8 text and interpreted as "KEY=value" lines.
        Blank lines, lines starting with "#" and lines without a "=" are
        skipped. Keys are compared case-insensitively (upper-cased) and
        double quotes around values are removed. The value of
        DISTRIB_DESCRIPTION, when present and non-empty, is stored under the
        "operating_system_product" value.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            file_object (dfvfs.FileIO): file-like object that contains the
                artifact value data.
        """
        text_file_object = dfvfs_text_file.TextFile(file_object, encoding="utf-8")

        product_values = {}
        for line in text_file_object.readlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            # Split on the first "=" only, so that a value containing "="
            # is kept intact and a line without "=" does not raise
            # ValueError on unpacking.
            key, separator, value = line.partition("=")
            if not separator:
                continue

            key = key.strip().upper()
            value = value.strip().strip('"')
            product_values[key] = value

        system_product = product_values.get("DISTRIB_DESCRIPTION")
        if system_product:
            mediator.SetValue("operating_system_product", system_product)
class LinuxSystemdOperatingSystemPlugin(interface.FileArtifactPreprocessorPlugin):
    """The Linux systemd operating system release plugin."""

    ARTIFACT_DEFINITION_NAME = "LinuxSystemdOSRelease"

    def _ParseFileData(self, mediator, file_object):
        """Parses file content (data) for system product preprocessing attribute.

        The file is read as UTF-8 text and interpreted as "KEY=value" lines.
        Lines without a "=" are skipped. Keys are upper-cased and double
        quotes around values are removed. The value of PRETTY_NAME, when
        present and non-empty, is stored under the
        "operating_system_product" value.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            file_object (dfvfs.FileIO): file-like object that contains the
                artifact value data.
        """
        text_file_object = dfvfs_text_file.TextFile(file_object, encoding="utf-8")

        product_values = {}
        for line in text_file_object.readlines():
            line = line.strip()

            # Split on the first "=" only: values such as URLs with a query
            # string can themselves contain "=", and unpacking the result
            # of an unbounded split would raise ValueError for them.
            key, separator, value = line.partition("=")

            # Ignore lines that do not define a key value pair.
            if not separator:
                continue

            key = key.upper()
            value = value.strip('"')
            product_values[key] = value

        system_product = product_values.get("PRETTY_NAME")
        if system_product:
            mediator.SetValue("operating_system_product", system_product)
class LinuxTimeZonePlugin(interface.FileEntryArtifactPreprocessorPlugin):
    """Preprocessor plugin that determines the time zone of a Linux system."""

    ARTIFACT_DEFINITION_NAME = "LinuxLocalTime"

    def _ParseFileEntry(self, mediator, file_entry):
        """Determines the time zone from the artifact file entry.

        When the file entry is a link, the time zone is the part of the link
        target that follows "zoneinfo/". Otherwise the file content is read
        as a time zone information file and the zone name it reports for
        January 1, 2017 is used. Failures to read the file or to set the
        time zone are reported as preprocessing warnings.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            file_entry (dfvfs.FileEntry): file entry that contains the
                artifact value data.
        """
        link_target = file_entry.link
        if link_target:
            # Determine the timezone based on the file path.
            time_zone = link_target.partition("zoneinfo/")[2]
        else:
            # Determine the timezone based on the timezone information file.
            time_zone = None
            tzif_file_object = file_entry.GetFileObject()
            try:
                reference_time = datetime.datetime(2017, 1, 1)
                time_zone = tz.tzfile(tzif_file_object).tzname(reference_time)
            except ValueError:
                mediator.ProducePreprocessingWarning(
                    self.ARTIFACT_DEFINITION_NAME,
                    "Unable to read time zone information file.",
                )

        if not time_zone:
            return

        try:
            mediator.SetTimeZone(time_zone)
        except ValueError:
            mediator.ProducePreprocessingWarning(
                self.ARTIFACT_DEFINITION_NAME,
                "Unable to set time zone in knowledge base.",
            )
class LinuxUserAccountsPlugin(interface.FileArtifactPreprocessorPlugin):
    """The Linux user accounts plugin."""

    ARTIFACT_DEFINITION_NAME = "LinuxPasswdFile"

    def _DecodeValue(self, mediator, value, description):
        """Decodes a single colon-separated value as UTF-8.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            value (bytes): raw value, which can be empty.
            description (str): name of the value used in the warning that
                is produced when the value cannot be decoded.

        Returns:
            str: decoded value or None if the value is empty or cannot be
                decoded.
        """
        if not value:
            return None

        try:
            return value.decode("utf-8")
        except UnicodeDecodeError:
            mediator.ProducePreprocessingWarning(
                self.ARTIFACT_DEFINITION_NAME,
                f"Unable to decode {description:s}.",
            )
            return None

    def _ParseFileData(self, mediator, file_object):
        """Parses file content (data) for user account preprocessing attributes.

        Every row of the colon-separated file that has at least 7 values, a
        username (value 0) and a user identifier (value 2) is turned into a
        user account artifact. Rows that do not meet these requirements, or
        whose username or user identifier cannot be decoded, are skipped
        with a preprocessing warning. The remaining values (group
        identifier, full name, user directory and shell) are optional and
        left unset when empty or undecodable.

        Args:
            mediator (PreprocessMediator): mediates interactions between
                preprocess plugins and other components, such as storage.
            file_object (dfvfs.FileIO): file-like object that contains the
                artifact value data.

        Raises:
            errors.PreProcessFail: if the preprocessing fails.
        """
        line_reader = line_reader_file.BinaryLineReader(file_object)

        try:
            reader = line_reader_file.BinaryDSVReader(line_reader, b":")
        except csv.Error as exception:
            # Chain the original error so the cause stays visible in the
            # traceback.
            raise errors.PreProcessFail(
                (
                    f"Unable to read: {self.ARTIFACT_DEFINITION_NAME:s} with error: "
                    f"{exception!s}"
                )
            ) from exception

        # Note that line_number is the zero-based index of the row.
        for line_number, row in enumerate(reader):
            if len(row) < 7 or not row[0] or not row[2]:
                mediator.ProducePreprocessingWarning(
                    self.ARTIFACT_DEFINITION_NAME,
                    f"Unsupported number of values in line: {line_number:d}.",
                )
                continue

            # The username and user identifier are required and known to be
            # non-empty here, so None means the value could not be decoded.
            username = self._DecodeValue(mediator, row[0], "username")
            if username is None:
                continue

            identifier = self._DecodeValue(mediator, row[2], "user identifier")
            if identifier is None:
                continue

            user_account = artifacts.UserAccountArtifact(
                identifier=identifier, username=username
            )
            user_account.group_identifier = self._DecodeValue(
                mediator, row[3], "group identifier"
            )
            user_account.full_name = self._DecodeValue(
                mediator, row[4], "full name"
            )
            user_account.user_directory = self._DecodeValue(
                mediator, row[5], "user directory"
            )
            user_account.shell = self._DecodeValue(mediator, row[6], "shell")

            try:
                mediator.AddUserAccount(user_account)
            except KeyError as exception:
                mediator.ProducePreprocessingWarning(
                    self.ARTIFACT_DEFINITION_NAME,
                    f"Unable to add user account with error: {exception!s}",
                )
# Make all Linux preprocessor plugins of this module known to the manager.
manager.PreprocessPluginsManager.RegisterPlugins(
    [
        LinuxHostnamePlugin,
        LinuxDistributionPlugin,
        LinuxIssueFilePlugin,
        LinuxStandardBaseReleasePlugin,
        LinuxSystemdOperatingSystemPlugin,
        LinuxTimeZonePlugin,
        LinuxUserAccountsPlugin,
    ]
)