# Source code for plaso.output.shared_opensearch
"""Shared functionality for OpenSearch output modules."""
import logging
import os
from acstore.containers import interface as containers_interface
from dfdatetime import interface as dfdatetime_interface
from dfdatetime import posix_time as dfdatetime_posix_time
from dfvfs.serializer.json_serializer import JsonPathSpecSerializer
try:
import opensearchpy
except ImportError:
opensearchpy = None
from plaso.lib import errors
from plaso.output import formatting_helper
from plaso.output import interface
from plaso.output import logger
# Configure the OpenSearch logger: when the optional opensearchpy package
# could be imported, limit its "opensearchpy.trace" logger to warnings and
# more severe messages.
if opensearchpy is not None:
    opensearch_logger = logging.getLogger("opensearchpy.trace")
    opensearch_logger.setLevel(logging.WARNING)
class SharedOpenSearchFieldFormattingHelper(formatting_helper.FieldFormattingHelper):
    """Field formatting helper shared by the OpenSearch output modules."""

    # Maps the name of a field to the name of the callback method that
    # formats the value of that field.
    _FIELD_FORMAT_CALLBACKS = {
        "datetime": "_FormatDateTime",
        "display_name": "_FormatDisplayName",
        "inode": "_FormatInode",
        "message": "_FormatMessage",
        "source_long": "_FormatSource",
        "source_short": "_FormatSourceShort",
        "tag": "_FormatTag",
        "timestamp": "_FormatTimestamp",
        "timestamp_desc": "_FormatTimestampDescription",
        "yara_match": "_FormatYaraMatch",
    }

    # The field format callback methods share one calling convention, so
    # most of them do not use every argument; the unused argument check is
    # therefore disabled for this section.
    # pylint: disable=unused-argument

    def _FormatDateTime(self, output_mediator, event, event_data, event_data_stream):
        """Formats the event timestamp as an ISO 8601 date and time string.

        The event timestamp is interpreted as a POSIX timestamp in
        microseconds.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: date and time field.
        """
        return dfdatetime_posix_time.PosixTimeInMicroseconds(
            timestamp=event.timestamp
        ).CopyToDateTimeStringISO8601()

    def _FormatTag(self, output_mediator, event_tag):
        """Formats an event tag field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event_tag (EventTag): event tag or None if not set.

        Returns:
          list[str]: event tag labels, or an empty list when the event tag is
              not set or has no labels.
        """
        labels = getattr(event_tag, "labels", None)
        return labels if labels else []

    def _FormatInode(self, output_mediator, event, event_data, event_data_stream):
        """Formats an inode field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: inode field. Integer inodes are converted to their decimal
              string representation, any other value (including None when the
              event data has no inode) is returned unchanged.
        """
        inode = getattr(event_data, "inode", None)
        return f"{inode:d}" if isinstance(inode, int) else inode

    def _FormatTimestamp(self, output_mediator, event, event_data, event_data_stream):
        """Formats a timestamp field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          int: timestamp of the event, passed through unmodified.
        """
        return event.timestamp

    def _FormatTimestampDescription(
        self, output_mediator, event, event_data, event_data_stream
    ):
        """Formats a timestamp description field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: timestamp description of the event, passed through unmodified.
        """
        return event.timestamp_desc

    def _FormatYaraMatch(self, output_mediator, event, event_data, event_data_stream):
        """Formats a Yara match field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          list[str]: Yara match field, or an empty list when the event data
              stream is not set or has no Yara matches.
        """
        yara_match = getattr(event_data_stream, "yara_match", None)
        return yara_match if yara_match else []

    # pylint: enable=unused-argument

    def GetFormattedField(
        self,
        output_mediator,
        field_name,
        event,
        event_data,
        event_data_stream,
        event_tag,
    ):
        """Formats the specified field.

        Fields with a registered callback are formatted by that callback.
        Other fields are read from the event data stream when it has an
        attribute of that name, and from the event data otherwise.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          field_name (str): name of the field.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.
          event_tag (EventTag): event tag.

        Returns:
          object: value of the field or None if not set.
        """
        callback_name = self._FIELD_FORMAT_CALLBACKS.get(field_name)

        # The tag callback is the only one that takes the event tag instead
        # of the event, event data and event data stream.
        if callback_name == "_FormatTag":
            return self._FormatTag(output_mediator, event_tag)

        formatter = getattr(self, callback_name, None) if callback_name else None
        if formatter:
            return formatter(output_mediator, event, event_data, event_data_stream)

        # No usable callback: prefer the event data stream attribute and fall
        # back to the event data.
        if hasattr(event_data_stream, field_name):
            value_source = event_data_stream
        else:
            value_source = event_data

        return getattr(value_source, field_name, None)
class SharedOpenSearchOutputModule(interface.OutputModule):
    """Shared functionality for an OpenSearch output module."""

    # pylint: disable=abstract-method

    NAME = "opensearch_shared"

    SUPPORTS_ADDITIONAL_FIELDS = True
    SUPPORTS_CUSTOM_FIELDS = True

    # Default for the flush interval, see SetFlushInterval().
    _DEFAULT_FLUSH_INTERVAL = 1000

    # Number of seconds to wait before a request to OpenSearch is timed out.
    _DEFAULT_REQUEST_TIMEOUT = 300

    _DEFAULT_FIELD_NAMES = [
        "datetime",
        "display_name",
        "message",
        "source_long",
        "source_short",
        "tag",
        "timestamp",
        "timestamp_desc",
    ]

    def __init__(self):
        """Initializes an output module."""
        super().__init__()
        self._client = None
        self._custom_fields = {}
        self._event_documents = []
        # Copy the defaults: SetAdditionalFields() and SetCustomFields()
        # extend this list in place, which must not alter the class-level
        # default shared by all instances.
        self._field_names = list(self._DEFAULT_FIELD_NAMES)
        self._field_formatting_helper = SharedOpenSearchFieldFormattingHelper()
        self._flush_interval = self._DEFAULT_FLUSH_INTERVAL
        self._host = None
        self._index_name = None
        self._mappings = None
        self._number_of_buffered_events = 0
        self._password = None
        self._port = None
        self._username = None
        self._use_ssl = None
        self._ca_certs = None
        self._url_prefix = None

    def _Connect(self):
        """Creates the OpenSearch client from the configured server settings.

        The host and port must have been set with SetServerInformation().
        HTTP authentication is only passed to the client when a username was
        set.
        """
        opensearch_host = {"host": self._host, "port": self._port}

        if self._url_prefix:
            opensearch_host["url_prefix"] = self._url_prefix

        opensearch_http_auth = None
        if self._username is not None:
            opensearch_http_auth = (self._username, self._password)

        self._client = opensearchpy.OpenSearch(
            [opensearch_host],
            http_auth=opensearch_http_auth,
            use_ssl=self._use_ssl,
            ca_certs=self._ca_certs,
        )

        logger.debug(
            f"Connected to OpenSearch server: {self._host:s} port: {self._port:d} "
            f"URL prefix: {self._url_prefix!s}."
        )

    def _CreateIndexIfNotExists(self, index_name, mappings):
        """Creates an OpenSearch index if it does not exist.

        Args:
          index_name (str): name of the index.
          mappings (dict[str, object]): mappings of the index.

        Raises:
          RuntimeError: if the OpenSearch index cannot be created because of
              a connection error.
        """
        try:
            # Try opensearch-py >= 2.5.0 calling convention.
            index_exists = self._client.indices.exists(index=index_name)
        except TypeError:
            # Fall back to passing the index name as positional argument.
            # pylint: disable=missing-kwoa,too-many-function-args
            index_exists = self._client.indices.exists(index_name)

        if index_exists:
            return

        try:
            self._client.indices.create(body={"mappings": mappings}, index=index_name)
        except opensearchpy.exceptions.ConnectionError as exception:
            # Chain the original exception so the cause stays visible in the
            # traceback.
            raise RuntimeError(
                f"Unable to create OpenSearch index with error: {exception!s}"
            ) from exception

    def _FlushEvents(self):
        """Inserts the buffered event documents into OpenSearch.

        A failed bulk insert is logged and otherwise ignored. The buffer is
        emptied in either case, so documents of a failed insert are dropped.
        """
        try:
            # pylint: disable=unexpected-keyword-arg
            bulk_arguments = {
                "body": self._event_documents,
                "index": self._index_name,
                "request_timeout": self._DEFAULT_REQUEST_TIMEOUT,
            }
            self._client.bulk(**bulk_arguments)

        except (ValueError, opensearchpy.exceptions.OpenSearchException) as exception:
            # Ignore problematic events
            logger.warning(f"Unable to bulk insert with error: {exception!s}")

        else:
            # Only report the insert when the bulk request did not fail.
            logger.debug(
                f"Inserted {self._number_of_buffered_events:d} events into OpenSearch"
            )

        self._event_documents = []
        self._number_of_buffered_events = 0

    def _SanitizeField(self, data_type, attribute_name, field):
        """Sanitizes a field for output.

        Args:
          data_type (str): event data type.
          attribute_name (str): name of the event attribute.
          field (object): value of the field to sanitize.

        Returns:
          object: sanitized value of the field. Bytes values are decoded as
              UTF-8, replacing undecodable bytes; other values are returned
              unchanged.
        """
        # Some parsers have written bytes values to storage.
        if isinstance(field, bytes):
            field = field.decode("utf-8", "replace")
            logger.warning(
                f"Found bytes value for attribute: {attribute_name:s} of data type: "
                f'{data_type!s}. Value was converted to UTF-8: "{field:s}"'
            )
        return field

    def Close(self):
        """Closes connection to OpenSearch.

        Inserts any remaining buffered event documents.
        """
        self._FlushEvents()
        self._client = None

    def GetFieldValues(
        self, output_mediator, event, event_data, event_data_stream, event_tag
    ):
        """Retrieves the output field values.

        The values are collected from the attributes of the event data and
        the event data stream, supplemented with the configured field names,
        and then formatted by the field formatting helper.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.
          event_tag (EventTag): event tag.

        Returns:
          dict[str, object]: output field values per name. Fields without a
              value are set to "-".
        """
        event_values = {}

        if event_data:
            for attribute_name, attribute_value in event_data.GetAttributes():
                # Ignore attribute container identifier and date and time values.
                if isinstance(
                    attribute_value,
                    (
                        containers_interface.AttributeContainerIdentifier,
                        dfdatetime_interface.DateTimeValues,
                    ),
                ):
                    continue

                # Ignore lists of date and time values; only the first
                # element of the list is inspected.
                if (
                    isinstance(attribute_value, list)
                    and attribute_value
                    and isinstance(
                        attribute_value[0], dfdatetime_interface.DateTimeValues
                    )
                ):
                    continue

                # Output _parser_chain as parser for backwards compatibility
                # and ignore all other protected internal only attributes.
                if attribute_name == "_parser_chain":
                    attribute_name = "parser"
                elif attribute_name.startswith("_"):
                    continue

                event_values[attribute_name] = attribute_value

        if event_data_stream:
            # Event data stream attributes take precedence over event data
            # attributes of the same name.
            event_values.update(event_data_stream.GetAttributes())

        # Make sure every configured field is present in the output.
        for field_name in self._field_names:
            event_values.setdefault(field_name, None)

        # The event data is optional (see the check above), hence the data
        # type cannot be read from it unconditionally.
        data_type = getattr(event_data, "data_type", None)

        field_values = {}
        for attribute_name, attribute_value in event_values.items():
            if attribute_name == "path_spec":
                try:
                    field_value = JsonPathSpecSerializer.WriteSerialized(
                        attribute_value
                    )
                except TypeError:
                    # Leave out path specifications that cannot be serialized.
                    continue

            else:
                field_value = self._field_formatting_helper.GetFormattedField(
                    output_mediator,
                    attribute_name,
                    event,
                    event_data,
                    event_data_stream,
                    event_tag,
                )

            # Custom field values are only used when the event itself does
            # not provide a value for the field.
            if field_value is None and attribute_name in self._custom_fields:
                field_value = self._custom_fields.get(attribute_name)

            if field_value is None:
                field_value = "-"

            field_values[attribute_name] = self._SanitizeField(
                data_type, attribute_name, field_value
            )

        return field_values

    def SetAdditionalFields(self, field_names):
        """Sets the names of additional fields to output.

        Args:
          field_names (list[str]): names of additional fields to output.
        """
        self._field_names.extend(field_names)

    def SetCustomFields(self, field_names_and_values):
        """Sets the names and values of custom fields to output.

        Args:
          field_names_and_values (list[tuple[str, str]]): names and values of
              custom fields to output.
        """
        self._custom_fields = dict(field_names_and_values)
        self._field_names.extend(self._custom_fields.keys())

    def SetFlushInterval(self, flush_interval):
        """Sets the flush interval.

        Args:
          flush_interval (int): number of events to buffer before doing a bulk
              insert.
        """
        self._flush_interval = flush_interval
        logger.debug(f"OpenSearch flush interval: {flush_interval:d}")

    def SetIndexName(self, index_name):
        """Sets the index name.

        Args:
          index_name (str): name of the index.
        """
        self._index_name = index_name
        logger.debug(f"OpenSearch index name: {index_name:s}")

    def SetMappings(self, mappings):
        """Sets the mappings.

        Args:
          mappings (dict[str, object]): mappings of the index.
        """
        self._mappings = mappings

    def SetPassword(self, password):
        """Sets the password.

        Args:
          password (str): password to authenticate with.
        """
        self._password = password
        # The password itself is deliberately kept out of the log.
        logger.debug("OpenSearch password: ********")

    def SetServerInformation(self, server, port):
        """Sets the server information.

        Args:
          server (str): IP address or hostname of the server.
          port (int): Port number of the server.
        """
        self._host = server
        self._port = port
        logger.debug(f"OpenSearch server: {server!s} port: {port:d}")

    def SetUsername(self, username):
        """Sets the username.

        Args:
          username (str): username to authenticate with.
        """
        self._username = username
        logger.debug(f"OpenSearch username: {username!s}")

    def SetUseSSL(self, use_ssl):
        """Sets the use of ssl.

        Args:
          use_ssl (bool): enforces use of ssl.
        """
        self._use_ssl = use_ssl
        logger.debug(f"OpenSearch use SSL/TLS: {use_ssl!s}")

    def SetCACertificatesPath(self, ca_certificates_path):
        """Sets the path to the CA certificates.

        An empty path or None is ignored and leaves the current setting
        unchanged.

        Args:
          ca_certificates_path (str): path to file containing a list of root
              certificates to trust.

        Raises:
          BadConfigOption: if the CA certificates file does not exist.
        """
        if not ca_certificates_path:
            return

        if not os.path.exists(ca_certificates_path):
            raise errors.BadConfigOption(
                f"No such certificate file: {ca_certificates_path:s}"
            )

        self._ca_certs = ca_certificates_path
        logger.debug(f"OpenSearch certificate file: {ca_certificates_path:s}")

    def SetURLPrefix(self, url_prefix):
        """Sets the URL prefix.

        Args:
          url_prefix (str): URL prefix.
        """
        self._url_prefix = url_prefix
        logger.debug(f"OpenSearch URL prefix: {url_prefix!s}")