Source code for plaso.output.shared_opensearch

# -*- coding: utf-8 -*-
"""Shared functionality for OpenSearch output modules."""

import logging
import os

from acstore.containers import interface as containers_interface

from dfdatetime import interface as dfdatetime_interface
from dfdatetime import posix_time as dfdatetime_posix_time

from dfvfs.serializer.json_serializer import JsonPathSpecSerializer

try:
  import opensearchpy
except ImportError:
  opensearchpy = None

from plaso.lib import errors
from plaso.output import formatting_helper
from plaso.output import interface
from plaso.output import logger


# Configure the OpenSearch logger.
if opensearchpy:
  opensearch_logger = logging.getLogger('opensearchpy.trace')
  opensearch_logger.setLevel(logging.WARNING)


class SharedOpenSearchFieldFormattingHelper(
    formatting_helper.FieldFormattingHelper):
  """Shared OpenSearch output module field formatting helper."""

  # Maps the name of a field to a callback function that formats
  # the field value.
  _FIELD_FORMAT_CALLBACKS = {
      'datetime': '_FormatDateTime',
      'display_name': '_FormatDisplayName',
      'inode': '_FormatInode',
      'message': '_FormatMessage',
      'source_long': '_FormatSource',
      'source_short': '_FormatSourceShort',
      'tag': '_FormatTag',
      'timestamp': '_FormatTimestamp',
      'timestamp_desc': '_FormatTimestampDescription',
      'yara_match': '_FormatYaraMatch'}

  # The field format callback methods require specific arguments hence
  # the check for unused arguments is disabled here.
  # pylint: disable=unused-argument

  def _FormatDateTime(
      self, output_mediator, event, event_data, event_data_stream):
    """Formats a date and time field in ISO 8601 format.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      str: date and time field.
    """
    date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
        timestamp=event.timestamp)
    return date_time.CopyToDateTimeStringISO8601()

  def _FormatTag(self, output_mediator, event_tag):
    """Formats an event tag field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event_tag (EventTag): event tag or None if not set.

    Returns:
      list[str]: event tag labels.
    """
    return getattr(event_tag, 'labels', None) or []

  def _FormatInode(
      self, output_mediator, event, event_data, event_data_stream):
    """Formats an inode field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      str: inode field.
    """
    inode = getattr(event_data, 'inode', None)
    if isinstance(inode, int):
      inode = f'{inode:d}'

    return inode

  def _FormatTimestamp(
      self, output_mediator, event, event_data, event_data_stream):
    """Formats a timestamp field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      int: timestamp field.
    """
    return event.timestamp

  def _FormatTimestampDescription(
      self, output_mediator, event, event_data, event_data_stream):
    """Formats a timestamp description field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      str: timestamp description field.
    """
    return event.timestamp_desc

  def _FormatYaraMatch(
      self, output_mediator, event, event_data, event_data_stream):
    """Formats a Yara match field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      list[str]: Yara match field.
    """
    return getattr(event_data_stream, 'yara_match', None) or []

  # pylint: enable=unused-argument

  def GetFormattedField(
      self, output_mediator, field_name, event, event_data, event_data_stream,
      event_tag):
    """Formats the specified field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      field_name (str): name of the field.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
      event_tag (EventTag): event tag.

    Returns:
      object: value of the field or None if not set.
    """
    callback_name = self._FIELD_FORMAT_CALLBACKS.get(field_name, None)

    if callback_name == '_FormatTag':
      return self._FormatTag(output_mediator, event_tag)

    callback_function = None
    if callback_name:
      callback_function = getattr(self, callback_name, None)

    if callback_function:
      output_value = callback_function(
          output_mediator, event, event_data, event_data_stream)
    elif hasattr(event_data_stream, field_name):
      output_value = getattr(event_data_stream, field_name, None)
    else:
      output_value = getattr(event_data, field_name, None)

    return output_value
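

# Illustrative sketch (not part of plaso) of how GetFormattedField resolves a
# field: names present in _FIELD_FORMAT_CALLBACKS dispatch to the matching
# callback method, anything else falls back to getattr() on the event data
# stream and then the event data. The event objects below are hypothetical
# stand-ins for values read from plaso storage.
#
#   helper = SharedOpenSearchFieldFormattingHelper()
#   # 'datetime' maps to _FormatDateTime and yields an ISO 8601 string.
#   value = helper.GetFormattedField(
#       output_mediator, 'datetime', event, event_data, event_data_stream,
#       event_tag)
#   # 'filename' is unmapped, so the attribute value is returned as-is.
#   value = helper.GetFormattedField(
#       output_mediator, 'filename', event, event_data, event_data_stream,
#       event_tag)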


class SharedOpenSearchOutputModule(interface.OutputModule):
  """Shared functionality for an OpenSearch output module."""

  # pylint: disable=abstract-method

  NAME = 'opensearch_shared'

  SUPPORTS_ADDITIONAL_FIELDS = True
  SUPPORTS_CUSTOM_FIELDS = True

  _DEFAULT_FLUSH_INTERVAL = 1000

  # Number of seconds to wait before a request to OpenSearch is timed out.
  _DEFAULT_REQUEST_TIMEOUT = 300

  _DEFAULT_FIELD_NAMES = [
      'datetime',
      'display_name',
      'message',
      'source_long',
      'source_short',
      'tag',
      'timestamp',
      'timestamp_desc']

  def __init__(self):
    """Initializes an output module."""
    super(SharedOpenSearchOutputModule, self).__init__()
    self._client = None
    self._custom_fields = {}
    self._event_documents = []
    # Copy the default field names so that extending the list later does not
    # mutate the shared class attribute.
    self._field_names = list(self._DEFAULT_FIELD_NAMES)
    self._field_formatting_helper = SharedOpenSearchFieldFormattingHelper()
    self._flush_interval = self._DEFAULT_FLUSH_INTERVAL
    self._host = None
    self._index_name = None
    self._mappings = None
    self._number_of_buffered_events = 0
    self._password = None
    self._port = None
    self._username = None
    self._use_ssl = None
    self._ca_certs = None
    self._url_prefix = None

  def _Connect(self):
    """Connects to an OpenSearch server.

    Raises:
      RuntimeError: if the OpenSearch version is not supported or the server
          cannot be reached.
    """
    opensearch_host = {'host': self._host, 'port': self._port}
    if self._url_prefix:
      opensearch_host['url_prefix'] = self._url_prefix

    opensearch_http_auth = None
    if self._username is not None:
      opensearch_http_auth = (self._username, self._password)

    self._client = opensearchpy.OpenSearch(
        [opensearch_host], http_auth=opensearch_http_auth,
        use_ssl=self._use_ssl, ca_certs=self._ca_certs)

    logger.debug((
        f'Connected to OpenSearch server: {self._host:s} port: '
        f'{self._port:d} URL prefix: {self._url_prefix!s}.'))

  def _CreateIndexIfNotExists(self, index_name, mappings):
    """Creates an OpenSearch index if it does not exist.

    Args:
      index_name (str): name of the index.
      mappings (dict[str, object]): mappings of the index.

    Raises:
      RuntimeError: if the OpenSearch index cannot be created.
    """
    try:
      if not self._client.indices.exists(index_name):
        self._client.indices.create(
            body={'mappings': mappings}, index=index_name)

    except opensearchpy.exceptions.ConnectionError as exception:
      raise RuntimeError(
          f'Unable to create OpenSearch index with error: {exception!s}')

  def _FlushEvents(self):
    """Inserts the buffered event documents into OpenSearch."""
    try:
      # pylint: disable=unexpected-keyword-arg
      bulk_arguments = {
          'body': self._event_documents,
          'index': self._index_name,
          'request_timeout': self._DEFAULT_REQUEST_TIMEOUT}

      self._client.bulk(**bulk_arguments)

    except (ValueError,
            opensearchpy.exceptions.OpenSearchException) as exception:
      # Ignore problematic events
      logger.warning(f'Unable to bulk insert with error: {exception!s}')

    logger.debug(
        f'Inserted {self._number_of_buffered_events:d} events into '
        f'OpenSearch')

    self._event_documents = []
    self._number_of_buffered_events = 0

  def _SanitizeField(self, data_type, attribute_name, field):
    """Sanitizes a field for output.

    Args:
      data_type (str): event data type.
      attribute_name (str): name of the event attribute.
      field (object): value of the field to sanitize.

    Returns:
      object: sanitized value of the field.
    """
    # Some parsers have written bytes values to storage.
    if isinstance(field, bytes):
      field = field.decode('utf-8', 'replace')
      logger.warning((
          f'Found bytes value for attribute: {attribute_name:s} of data '
          f'type: {data_type!s}. Value was converted to UTF-8: "{field:s}"'))

    return field
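
  # Note on the bulk request body: _FlushEvents() passes _event_documents as
  # a list, which opensearchpy serializes into the newline-delimited
  # action/source format of the bulk API. A subclass buffering an event would
  # typically append an action line followed by the document, for example
  # (hypothetical sketch, field values abbreviated):
  #
  #   self._event_documents.append({'index': {'_index': self._index_name}})
  #   self._event_documents.append({'datetime': '2024-01-01T00:00:00+00:00'})
  #   self._number_of_buffered_events += 1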

  def Close(self):
    """Closes connection to OpenSearch.

    Inserts any remaining buffered event documents.
    """
    self._FlushEvents()

    self._client = None

  def GetFieldValues(
      self, output_mediator, event, event_data, event_data_stream, event_tag):
    """Retrieves the output field values.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
      event_tag (EventTag): event tag.

    Returns:
      dict[str, str]: output field values per name.
    """
    event_values = {}
    if event_data:
      for attribute_name, attribute_value in event_data.GetAttributes():
        # Ignore attribute container identifier and date and time values.
        if isinstance(attribute_value, (
            containers_interface.AttributeContainerIdentifier,
            dfdatetime_interface.DateTimeValues)):
          continue

        if (isinstance(attribute_value, list) and attribute_value and
            isinstance(attribute_value[0],
                       dfdatetime_interface.DateTimeValues)):
          continue

        # Ignore protected internal only attributes.
        if attribute_name[0] == '_' and attribute_name != '_parser_chain':
          continue

        # Output _parser_chain as parser for backwards compatibility.
        if attribute_name == '_parser_chain':
          attribute_name = 'parser'

        event_values[attribute_name] = attribute_value

    if event_data_stream:
      for attribute_name, attribute_value in (
          event_data_stream.GetAttributes()):
        event_values[attribute_name] = attribute_value

    for attribute_name in self._field_names:
      if attribute_name not in event_values:
        event_values[attribute_name] = None

    field_values = {}
    for attribute_name, attribute_value in event_values.items():
      if attribute_name == 'path_spec':
        try:
          field_value = JsonPathSpecSerializer.WriteSerialized(
              attribute_value)
        except TypeError:
          continue

      else:
        field_value = self._field_formatting_helper.GetFormattedField(
            output_mediator, attribute_name, event, event_data,
            event_data_stream, event_tag)

      if field_value is None and attribute_name in self._custom_fields:
        field_value = self._custom_fields.get(attribute_name, None)

      if field_value is None:
        field_value = '-'

      field_values[attribute_name] = self._SanitizeField(
          event_data.data_type, attribute_name, field_value)

    return field_values

  def SetAdditionalFields(self, field_names):
    """Sets the names of additional fields to output.

    Args:
      field_names (list[str]): names of additional fields to output.
    """
    self._field_names.extend(field_names)

  def SetCustomFields(self, field_names_and_values):
    """Sets the names and values of custom fields to output.

    Args:
      field_names_and_values (list[tuple[str, str]]): names and values of
          custom fields to output.
    """
    self._custom_fields = dict(field_names_and_values)
    self._field_names.extend(self._custom_fields.keys())
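
  # For example (hypothetical names and values):
  #
  #   output_module.SetCustomFields([('case_number', '12345')])
  #
  # adds 'case_number' to the output field names and emits '12345' whenever
  # the formatting helper returns None for that field.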

  def SetFlushInterval(self, flush_interval):
    """Sets the flush interval.

    Args:
      flush_interval (int): number of events to buffer before doing a bulk
          insert.
    """
    self._flush_interval = flush_interval
    logger.debug(f'OpenSearch flush interval: {flush_interval:d}')

  def SetIndexName(self, index_name):
    """Sets the index name.

    Args:
      index_name (str): name of the index.
    """
    self._index_name = index_name
    logger.debug(f'OpenSearch index name: {index_name:s}')

  def SetMappings(self, mappings):
    """Sets the mappings.

    Args:
      mappings (dict[str, object]): mappings of the index.
    """
    self._mappings = mappings
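
  # A minimal mappings sketch (assumed, using standard OpenSearch mapping
  # syntax; plaso normally loads its mappings from a data file):
  #
  #   output_module.SetMappings({
  #       'properties': {
  #           'datetime': {'type': 'date'},
  #           'message': {'type': 'text'},
  #           'timestamp': {'type': 'long'}}})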

  def SetPassword(self, password):
    """Sets the password.

    Args:
      password (str): password to authenticate with.
    """
    self._password = password
    logger.debug('OpenSearch password: ********')

  def SetServerInformation(self, server, port):
    """Sets the server information.

    Args:
      server (str): IP address or hostname of the server.
      port (int): port number of the server.
    """
    self._host = server
    self._port = port
    logger.debug(f'OpenSearch server: {server!s} port: {port:d}')

  def SetUsername(self, username):
    """Sets the username.

    Args:
      username (str): username to authenticate with.
    """
    self._username = username
    logger.debug(f'OpenSearch username: {username!s}')

  def SetUseSSL(self, use_ssl):
    """Sets the use of SSL.

    Args:
      use_ssl (bool): enforces use of SSL.
    """
    self._use_ssl = use_ssl
    logger.debug(f'OpenSearch use SSL/TLS: {use_ssl!s}')

  def SetCACertificatesPath(self, ca_certificates_path):
    """Sets the path to the CA certificates.

    Args:
      ca_certificates_path (str): path to file containing a list of root
          certificates to trust.

    Raises:
      BadConfigOption: if the CA certificates file does not exist.
    """
    if not ca_certificates_path:
      return

    if not os.path.exists(ca_certificates_path):
      raise errors.BadConfigOption(
          f'No such certificate file: {ca_certificates_path:s}')

    self._ca_certs = ca_certificates_path
    logger.debug(f'OpenSearch certificate file: {ca_certificates_path:s}')

  def SetURLPrefix(self, url_prefix):
    """Sets the URL prefix.

    Args:
      url_prefix (str): URL prefix.
    """
    self._url_prefix = url_prefix
    logger.debug(f'OpenSearch URL prefix: {url_prefix!s}')
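

# Illustrative configuration sequence (a sketch, not part of plaso): a
# concrete subclass that implements WriteFieldValues would typically be
# configured along these lines, with _Connect() and _CreateIndexIfNotExists()
# called from its WriteHeader(). The subclass name and values below are
# hypothetical.
#
#   output_module = MyOpenSearchOutputModule()
#   output_module.SetServerInformation('opensearch.example.com', 9200)
#   output_module.SetUsername('plaso')
#   output_module.SetPassword('secret')
#   output_module.SetUseSSL(True)
#   output_module.SetCACertificatesPath('/etc/ssl/certs/ca-certificates.crt')
#   output_module.SetIndexName('plaso-events')
#   output_module.SetFlushInterval(1000)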