"""Linux preprocessor plugins."""
import csv
import datetime
from dateutil import tz
from dfvfs.helpers import text_file as dfvfs_text_file
from plaso.containers import artifacts
from plaso.lib import errors
from plaso.lib import line_reader_file
from plaso.preprocessors import interface
from plaso.preprocessors import manager
[docs]
class LinuxHostnamePlugin(interface.FileArtifactPreprocessorPlugin):
    """Preprocessor plugin that determines the hostname of a Linux system."""

    ARTIFACT_DEFINITION_NAME = "LinuxHostnameFile"

    def _ParseFileData(self, mediator, file_object):
        """Extracts the hostname from the first line of the artifact file.

        The first line is read as UTF-8 text and stripped of surrounding
        whitespace. A hostname artifact is only added when the result is not
        empty.

        Args:
          mediator (PreprocessMediator): mediates interactions between preprocess
              plugins and other components, such as storage.
          file_object (dfvfs.FileIO): file-like object that contains the artifact
              value data.

        Raises:
          errors.PreProcessFail: if the preprocessing fails.
        """
        reader = dfvfs_text_file.TextFile(file_object, encoding="utf-8")
        first_line = reader.readline().strip()
        if not first_line:
            # Nothing usable on the first line, so no hostname is recorded.
            return

        mediator.AddHostname(artifacts.HostnameArtifact(name=first_line))
[docs]
class LinuxDistributionPlugin(interface.FileArtifactPreprocessorPlugin):
    """Preprocessor plugin that reads the Linux distribution release file."""

    ARTIFACT_DEFINITION_NAME = "LinuxDistributionRelease"

    def _ParseFileData(self, mediator, file_object):
        """Extracts the system product from the first line of the artifact file.

        The first line is read as UTF-8 text and stripped of surrounding
        whitespace. The "operating_system_product" value is only set when the
        result is not empty.

        Args:
          mediator (PreprocessMediator): mediates interactions between preprocess
              plugins and other components, such as storage.
          file_object (dfvfs.FileIO): file-like object that contains the artifact
              value data.

        Raises:
          errors.PreProcessFail: if the preprocessing fails.
        """
        reader = dfvfs_text_file.TextFile(file_object, encoding="utf-8")
        release_line = reader.readline().strip()
        if not release_line:
            return

        mediator.SetValue("operating_system_product", release_line)
[docs]
class LinuxIssueFilePlugin(interface.FileArtifactPreprocessorPlugin):
    """Preprocessor plugin that reads the Linux issue file."""

    ARTIFACT_DEFINITION_NAME = "LinuxIssueFile"

    def _ParseFileData(self, mediator, file_object):
        """Extracts the system product from the first line of the issue file.

        Only a first line that starts with "Debian GNU/Linux " is used. The
        product is the text before the first backslash, with trailing
        whitespace removed.

        Args:
          mediator (PreprocessMediator): mediates interactions between preprocess
              plugins and other components, such as storage.
          file_object (dfvfs.FileIO): file-like object that contains the artifact
              value data.

        Raises:
          errors.PreProcessFail: if the preprocessing fails.
        """
        reader = dfvfs_text_file.TextFile(file_object, encoding="utf-8")
        first_line = reader.readline()

        # Only parse known default /etc/issue file contents.
        if not first_line.startswith("Debian GNU/Linux "):
            return

        # Drop everything from the first backslash onwards.
        product_name = first_line.partition("\\")[0].rstrip()
        if product_name:
            mediator.SetValue("operating_system_product", product_name)
[docs]
class LinuxStandardBaseReleasePlugin(interface.FileArtifactPreprocessorPlugin):
    """The Linux standard base (LSB) release plugin."""

    ARTIFACT_DEFINITION_NAME = "LinuxLSBRelease"

    def _ParseFileData(self, mediator, file_object):
        """Parses file content (data) for system product preprocessing attribute.

        The file is read as UTF-8 text and interpreted as "KEY=value" lines.
        Empty lines, lines starting with "#" and lines without a "=" are
        ignored. Only the first "=" separates the key from the value, so a
        value may itself contain "=". The value of DISTRIB_DESCRIPTION, if
        present and not empty, is stored as "operating_system_product".

        Args:
          mediator (PreprocessMediator): mediates interactions between preprocess
              plugins and other components, such as storage.
          file_object (dfvfs.FileIO): file-like object that contains the artifact
              value data.

        Raises:
          errors.PreProcessFail: if the preprocessing fails.
        """
        text_file_object = dfvfs_text_file.TextFile(file_object, encoding="utf-8")

        product_values = {}
        for line in text_file_object.readlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            # partition() never raises, unlike unpacking the result of
            # split("="), which fails on lines with zero or multiple "=".
            key, separator, value = line.partition("=")
            if not separator:
                # Not a key value pair.
                continue

            key = key.strip().upper()
            value = value.strip().strip('"')
            product_values[key] = value

        system_product = product_values.get("DISTRIB_DESCRIPTION")
        if system_product:
            mediator.SetValue("operating_system_product", system_product)
[docs]
class LinuxSystemdOperatingSystemPlugin(interface.FileArtifactPreprocessorPlugin):
    """The Linux systemd operating system release plugin."""

    ARTIFACT_DEFINITION_NAME = "LinuxSystemdOSRelease"

    def _ParseFileData(self, mediator, file_object):
        """Parses file content (data) for system product preprocessing attribute.

        The file is read as UTF-8 text and interpreted as "KEY=value" lines.
        Lines starting with "#" and lines without a "=" are ignored. Only the
        first "=" separates the key from the value, so a value may itself
        contain "=" (for example a URL with a query string). The value of
        PRETTY_NAME, if present and not empty, is stored as
        "operating_system_product".

        Args:
          mediator (PreprocessMediator): mediates interactions between preprocess
              plugins and other components, such as storage.
          file_object (dfvfs.FileIO): file-like object that contains the artifact
              value data.

        Raises:
          errors.PreProcessFail: if the preprocessing fails.
        """
        text_file_object = dfvfs_text_file.TextFile(file_object, encoding="utf-8")

        product_values = {}
        for line in text_file_object.readlines():
            line = line.strip()
            if line.startswith("#"):
                continue

            # partition() never raises, unlike unpacking the result of
            # split("="), which fails when the value contains another "=".
            key, separator, value = line.partition("=")
            if not separator:
                # Ignore lines that do not define a key value pair.
                continue

            key = key.upper()
            value = value.strip('"')
            product_values[key] = value

        system_product = product_values.get("PRETTY_NAME")
        if system_product:
            mediator.SetValue("operating_system_product", system_product)
[docs]
class LinuxTimeZonePlugin(interface.FileEntryArtifactPreprocessorPlugin):
    """Preprocessor plugin that determines the time zone of a Linux system."""

    ARTIFACT_DEFINITION_NAME = "LinuxLocalTime"

    def _ParseFileEntry(self, mediator, file_entry):
        """Determines the time zone from the local time file entry.

        When the file entry has a link target, the time zone is the part of
        the target after "zoneinfo/". Otherwise the entry is read as a time
        zone information file and the time zone name for 2017-01-01 is used.
        A preprocessing warning is produced when the file cannot be read or
        the time zone cannot be set.

        Args:
          mediator (PreprocessMediator): mediates interactions between preprocess
              plugins and other components, such as storage.
          file_entry (dfvfs.FileEntry): file entry that contains the artifact
              value data.

        Raises:
          errors.PreProcessFail: if the preprocessing fails.
        """
        if file_entry.link:
            # Determine the timezone based on the file path.
            zone_name = file_entry.link.partition("zoneinfo/")[2]
        else:
            # Determine the timezone based on the timezone information file.
            tzif_object = file_entry.GetFileObject()
            reference_date = datetime.datetime(2017, 1, 1)

            zone_name = None
            try:
                zone_name = tz.tzfile(tzif_object).tzname(reference_date)
            except ValueError:
                mediator.ProducePreprocessingWarning(
                    self.ARTIFACT_DEFINITION_NAME,
                    "Unable to read time zone information file.",
                )

        if not zone_name:
            return

        try:
            mediator.SetTimeZone(zone_name)
        except ValueError:
            mediator.ProducePreprocessingWarning(
                self.ARTIFACT_DEFINITION_NAME,
                "Unable to set time zone in knowledge base.",
            )
[docs]
class LinuxUserAccountsPlugin(interface.FileArtifactPreprocessorPlugin):
    """Preprocessor plugin that extracts user accounts from a passwd file."""

    ARTIFACT_DEFINITION_NAME = "LinuxPasswdFile"

    def _ParseFileData(self, mediator, file_object):
        """Extracts user accounts from colon-separated rows of the artifact file.

        Each row needs at least 7 values, with a non-empty first (username)
        and third (user identifier) value; other rows are skipped with a
        warning. A row is also skipped when the username or user identifier
        cannot be decoded as UTF-8. The fourth to seventh values (group
        identifier, full name, user directory and shell) are optional: when
        empty or undecodable they are left as None, with a warning in the
        latter case.

        Args:
          mediator (PreprocessMediator): mediates interactions between preprocess
              plugins and other components, such as storage.
          file_object (dfvfs.FileIO): file-like object that contains the artifact
              value data.

        Raises:
          errors.PreProcessFail: if the preprocessing fails.
        """
        artifact_name = self.ARTIFACT_DEFINITION_NAME

        binary_lines = line_reader_file.BinaryLineReader(file_object)
        try:
            rows = line_reader_file.BinaryDSVReader(binary_lines, b":")
        except csv.Error as exception:
            raise errors.PreProcessFail(
                (
                    f"Unable to read: {self.ARTIFACT_DEFINITION_NAME:s} with error: "
                    f"{exception!s}"
                )
            )

        # Row index, user account attribute name and decode failure warning
        # of the optional values, in the order in which they are processed.
        optional_fields = (
            (3, "group_identifier", "Unable to decode group identifier."),
            (4, "full_name", "Unable to decode full name."),
            (5, "user_directory", "Unable to decode user directory."),
            (6, "shell", "Unable to decode shell."),
        )

        for line_number, row in enumerate(rows):
            has_required_values = len(row) >= 7 and row[0] and row[2]
            if not has_required_values:
                mediator.ProducePreprocessingWarning(
                    artifact_name,
                    f"Unsupported number of values in line: {line_number:d}.",
                )
                continue

            try:
                username = row[0].decode("utf-8")
            except UnicodeDecodeError:
                mediator.ProducePreprocessingWarning(
                    artifact_name, "Unable to decode username."
                )
                continue

            try:
                identifier = row[2].decode("utf-8")
            except UnicodeDecodeError:
                mediator.ProducePreprocessingWarning(
                    artifact_name, "Unable to decode user identifier."
                )
                continue

            optional_values = {}
            for index, attribute_name, warning in optional_fields:
                decoded_value = None
                if row[index]:
                    try:
                        decoded_value = row[index].decode("utf-8")
                    except UnicodeDecodeError:
                        # An undecodable optional value does not discard the row.
                        mediator.ProducePreprocessingWarning(artifact_name, warning)
                optional_values[attribute_name] = decoded_value

            user_account = artifacts.UserAccountArtifact(
                identifier=identifier, username=username
            )
            for attribute_name, decoded_value in optional_values.items():
                setattr(user_account, attribute_name, decoded_value)

            try:
                mediator.AddUserAccount(user_account)
            except KeyError as exception:
                mediator.ProducePreprocessingWarning(
                    artifact_name,
                    f"Unable to add user account with error: {exception!s}",
                )
# Register the Linux preprocessor plugin classes defined in this module with
# the preprocess plugins manager. This runs when the module is imported.
manager.PreprocessPluginsManager.RegisterPlugins(
[
LinuxHostnamePlugin,
LinuxDistributionPlugin,
LinuxIssueFilePlugin,
LinuxStandardBaseReleasePlugin,
LinuxSystemdOperatingSystemPlugin,
LinuxTimeZonePlugin,
LinuxUserAccountsPlugin,
]
)