"""Windows EventLog resources database reader."""
import collections
import os
import sqlite3
from acstore import sqlite_store
from acstore.containers import interface as containers_interface
from plaso.engine import path_helper
from plaso.helpers.windows import languages
from plaso.helpers.windows import resource_files
from plaso.output import logger
[docs]
class Sqlite3DatabaseFile:
"""Class that defines a sqlite3 database file."""
_HAS_TABLE_QUERY = (
'SELECT name FROM sqlite_master '
'WHERE type = "table" AND name = "{0:s}"')
[docs]
def __init__(self):
"""Initializes the database file object."""
super().__init__()
self._connection = None
self._cursor = None
self.filename = None
self.read_only = None
[docs]
def Close(self):
"""Closes the database file.
Raises:
RuntimeError: if the database is not opened.
"""
if not self._connection:
raise RuntimeError('Cannot close database not opened.')
# We need to run commit or not all data is stored in the database.
self._connection.commit()
self._connection.close()
self._connection = None
self._cursor = None
self.filename = None
self.read_only = None
[docs]
def HasTable(self, table_name):
"""Determines if a specific table exists.
Args:
table_name (str): table name.
Returns:
bool: True if the table exists.
Raises:
RuntimeError: if the database is not opened.
"""
if not self._connection:
raise RuntimeError(
'Cannot determine if table exists database not opened.')
sql_query = self._HAS_TABLE_QUERY.format(table_name)
self._cursor.execute(sql_query)
if self._cursor.fetchone():
return True
return False
[docs]
def GetValues(self, table_names, column_names, condition):
"""Retrieves values from a table.
Args:
table_names (list[str]): table names.
column_names (list[str]): column names.
condition (str): query condition such as
"log_source == 'Application Error'".
Yields:
sqlite3.row: row.
Raises:
RuntimeError: if the database is not opened.
"""
if not self._connection:
raise RuntimeError('Cannot retrieve values database not opened.')
if condition:
condition = f' WHERE {condition:s}'
else:
condition = ''
table_names_string = ', '.join(table_names)
column_names_string = ', '.join(column_names)
sql_query = (
f'SELECT {column_names_string:s} FROM {table_names_string:s}'
f'{condition:s}')
self._cursor.execute(sql_query)
# TODO: have a look at https://docs.python.org/2/library/
# sqlite3.html#sqlite3.Row.
for row in self._cursor:
yield {
column_name: row[column_index]
for column_index, column_name in enumerate(column_names)}
[docs]
def Open(self, filename, read_only=False):
"""Opens the database file.
Args:
filename (str): filename of the database.
read_only (Optional[bool]): True if the database should be opened in
read-only mode. Since sqlite3 does not support a real read-only
mode we fake it by only permitting SELECT queries.
Returns:
bool: True if successful.
Raises:
RuntimeError: if the database is already opened.
"""
if self._connection:
raise RuntimeError('Cannot open database already opened.')
self.filename = filename
self.read_only = read_only
try:
self._connection = sqlite3.connect(filename)
except sqlite3.OperationalError:
return False
if not self._connection:
return False
self._cursor = self._connection.cursor()
if not self._cursor:
return False
return True
[docs]
class WinevtResourcesSqlite3DatabaseReader:
"""Windows EventLog resources SQLite database reader."""
[docs]
def __init__(self):
"""Initializes a Windows EventLog resources SQLite database reader."""
super().__init__()
self._database_file = Sqlite3DatabaseFile()
self._resouce_file_helper = resource_files.WindowsResourceFileHelper
self._string_format = 'wrc'
def _GetEventLogProviderKey(self, log_source):
"""Retrieves the EventLog provider key.
Args:
log_source (str): EventLog source.
Returns:
str: EventLog provider key or None if not available.
Raises:
RuntimeError: if more than one value is found in the database.
"""
table_names = ['event_log_providers']
column_names = ['event_log_provider_key']
condition = f'log_source == "{log_source:s}"'
values_list = list(self._database_file.GetValues(
table_names, column_names, condition))
number_of_values = len(values_list)
if number_of_values == 0:
return None
if number_of_values == 1:
values = values_list[0]
return values['event_log_provider_key']
raise RuntimeError('More than one value found in database.')
def _GetMessage(self, message_file_key, lcid, message_identifier):
"""Retrieves a specific message from a specific message table.
Args:
message_file_key (int): message file key.
lcid (int): language code identifier (LCID).
message_identifier (int): message identifier.
Returns:
str: message string or None if not available.
Raises:
RuntimeError: if more than one value is found in the database.
"""
table_name = f'message_table_{message_file_key:d}_0x{lcid:08x}'
has_table = self._database_file.HasTable(table_name)
if not has_table:
return None
column_names = ['message_string']
condition = f'message_identifier == "0x{message_identifier:08x}"'
values = list(self._database_file.GetValues(
[table_name], column_names, condition))
number_of_values = len(values)
if number_of_values == 0:
return None
if number_of_values == 1:
return values[0]['message_string']
raise RuntimeError('More than one value found in database.')
def _GetMessageFileKeys(self, event_log_provider_key):
"""Retrieves the message file keys.
Args:
event_log_provider_key (int): EventLog provider key.
Yields:
int: message file key.
"""
table_names = ['message_file_per_event_log_provider']
column_names = ['message_file_key']
condition = f'event_log_provider_key == {event_log_provider_key:d}'
generator = self._database_file.GetValues(
table_names, column_names, condition)
for values in generator:
yield values['message_file_key']
[docs]
def Close(self):
"""Closes the database reader object."""
self._database_file.Close()
[docs]
def GetMessage(self, log_source, lcid, message_identifier):
"""Retrieves a specific message for a specific EventLog source.
Args:
log_source (str): EventLog source, such as "Application Error".
lcid (int): language code identifier (LCID).
message_identifier (int): message identifier.
Returns:
str: message string or None if not available.
"""
event_log_provider_key = self._GetEventLogProviderKey(log_source)
if not event_log_provider_key:
return None
generator = self._GetMessageFileKeys(event_log_provider_key)
if not generator:
return None
message_string = None
for message_file_key in generator:
message_string = self._GetMessage(
message_file_key, lcid, message_identifier)
if message_string:
break
if self._string_format == 'wrc':
message_string = self._resouce_file_helper.FormatMessageStringInPEP3101(
message_string)
return message_string
[docs]
def Open(self, filename):
"""Opens the database reader object.
Args:
filename (str): filename of the database.
Returns:
bool: True if successful.
Raises:
RuntimeError: if the version or string format of the database
is not supported.
"""
if not self._database_file.Open(filename, read_only=True):
return False
version = self.GetMetadataAttribute('version')
if not version or version != '20150315':
raise RuntimeError(f'Unsupported version: {version:s}')
string_format = self.GetMetadataAttribute('string_format')
if not string_format:
string_format = 'wrc'
if string_format not in ('pep3101', 'wrc'):
raise RuntimeError(f'Unsupported string format: {string_format:s}')
self._string_format = string_format
return True
[docs]
class WinevtResourcesEventLogProvider(containers_interface.AttributeContainer):
"""Windows Event Log provider.
Attributes:
additional_identifier (str): additional identifier of the provider,
contains a GUID.
category_message_files (set[str]): paths of the category message files.
event_message_files (set[str]): paths of the event message files.
identifier (str): identifier of the provider, contains a GUID.
log_sources (list[str]): names of the corresponding Event Log sources.
log_types (list[str]): Windows Event Log types.
name (str): name of the provider.
parameter_message_files (set[str]): paths of the parameter message
files.
windows_version (str): Windows version.
"""
CONTAINER_TYPE = 'winevtrc_eventlog_provider'
SCHEMA = {
'additional_identifier': 'str',
'category_message_files': 'List[str]',
'event_message_files': 'List[str]',
'identifier': 'str',
'log_sources': 'List[str]',
'log_types': 'List[str]',
'name': 'str',
'parameter_message_files': 'List[str]',
'windows_version': 'str'}
[docs]
def __init__(self):
"""Initializes the Windows Event Log provider."""
super().__init__()
self.additional_identifier = None
self.category_message_files = set()
self.event_message_files = set()
self.identifier = None
self.log_sources = []
self.log_types = []
self.name = None
self.parameter_message_files = set()
self.windows_version = None
[docs]
class WinevtResourcesMessageFile(containers_interface.AttributeContainer):
"""Windows Event Log message file.
Attributes:
file_version (str): file version.
product_version (str): product version.
windows_path (str): path as defined by the Window Event Log provider.
windows_version (str): Windows version.
"""
CONTAINER_TYPE = 'winevtrc_message_file'
SCHEMA = {
'file_version': 'str',
'product_version': 'str',
'windows_path': 'str',
'windows_version': 'str'}
[docs]
def __init__(
self, file_version=None, product_version=None, windows_path=None,
windows_version=None):
"""Initializes a Windows Event Log message file.
Args:
file_version (Optional[str]): file version.
product_version (Optional[str]): product version.
windows_path (Optional[str]): path as defined by the Window Event Log
provider.
windows_version (Optional[str]): Windows version.
"""
super().__init__()
self.file_version = file_version
self.product_version = product_version
self.windows_path = windows_path
self.windows_version = windows_version
[docs]
class WinevtResourcesMessageString(containers_interface.AttributeContainer):
"""Windows Event Log message string.
Attributes:
identifier (int): message identifier.
text (str): message text.
"""
CONTAINER_TYPE = 'winevtrc_message_string'
SCHEMA = {
'_message_table_identifier': 'AttributeContainerIdentifier',
'language_identifier': 'str',
'message_identifier': 'int',
'text': 'str'}
_SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_message_table_identifier']
[docs]
def __init__(self, message_identifier=None, text=None):
"""Initializes a Windows Event Log message string.
Args:
message_identifier (Optional[int]): message identifier.
text (Optional[int]): message text.
"""
super().__init__()
self._message_table_identifier = None
self.message_identifier = message_identifier
self.text = text
[docs]
def GetMessageTableIdentifier(self):
"""Retrieves the identifier of the associated message table.
Returns:
AttributeContainerIdentifier: message table identifier or None when not
set.
"""
return self._message_table_identifier
[docs]
def SetMessageTableIdentifier(self, message_table_identifier):
"""Sets the identifier of the associated message table.
Args:
message_table_identifier (AttributeContainerIdentifier): message table
identifier.
"""
self._message_table_identifier = message_table_identifier
[docs]
class WinevtResourcesMessageStringMapping(
containers_interface.AttributeContainer):
"""Windows Event Log message string mapping.
Attributes:
event_identifier (int): event identifier.
event_version (int): event version.
message_identifier (int): message identifier.
provider_identifier (str): Event Log provider identifier.
"""
CONTAINER_TYPE = 'winevtrc_message_string_mapping'
SCHEMA = {
'_message_file_identifier': 'AttributeContainerIdentifier',
'event_identifier': 'int',
'event_version': 'int',
'message_identifier': 'int',
'provider_identifier': 'str'}
_SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_message_file_identifier']
[docs]
def __init__(
self, event_identifier=None, event_version=None, message_identifier=None,
provider_identifier=None):
"""Initializes a Windows Event Log message string mapping.
Args:
event_identifier (Optional[int]): event identifier.
event_version (Optional[int]): event version.
message_identifier (Optional[int]): message identifier.
provider_identifier (Optional[str]): Event Log provider identifier.
"""
super().__init__()
self._message_file_identifier = None
self.event_identifier = event_identifier
self.event_version = event_version
self.message_identifier = message_identifier
self.provider_identifier = provider_identifier
[docs]
def GetMessageFileIdentifier(self):
"""Retrieves the identifier of the associated message file.
Returns:
AttributeContainerIdentifier: message file identifier or None when
not set.
"""
return self._message_file_identifier
[docs]
def SetMessageFileIdentifier(self, message_file_identifier):
"""Sets the identifier of the associated message file.
Args:
message_file_identifier (AttributeContainerIdentifier): message file
identifier.
"""
self._message_file_identifier = message_file_identifier
[docs]
class WinevtResourcesMessageTable(containers_interface.AttributeContainer):
"""Windows Event Log message table.
Attributes:
language_identifier (int): language identifier (LCID).
"""
CONTAINER_TYPE = 'winevtrc_message_table'
SCHEMA = {
'_message_file_identifier': 'AttributeContainerIdentifier',
'language_identifier': 'int'}
_SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_message_file_identifier']
[docs]
def __init__(self, language_identifier=None):
"""Initializes a Windows Event Log message table descriptor.
Args:
language_identifier (Optional[int]): language identifier (LCID).
"""
super().__init__()
self._message_file_identifier = None
self.language_identifier = language_identifier
[docs]
def GetMessageFileIdentifier(self):
"""Retrieves the identifier of the associated message file.
Returns:
AttributeContainerIdentifier: message file identifier or None when not
set.
"""
return self._message_file_identifier
[docs]
def SetMessageFileIdentifier(self, message_file_identifier):
"""Sets the identifier of the associated message file.
Args:
message_file_identifier (AttributeContainerIdentifier): message file
identifier.
"""
self._message_file_identifier = message_file_identifier
[docs]
class WinevtResourcesAttributeContainerStore(
sqlite_store.SQLiteAttributeContainerStore):
"""Windows EventLog resources attribute container store.
Attributes:
format_version (int): storage format version.
serialization_format (str): serialization format.
string_format (str): string format.
"""
_FORMAT_VERSION = 20240929
_APPEND_COMPATIBLE_FORMAT_VERSION = 20240929
_UPGRADE_COMPATIBLE_FORMAT_VERSION = 20240929
_READ_COMPATIBLE_FORMAT_VERSION = 20240929
[docs]
def __init__(self, string_format='wrc'):
"""Initializes a message resource attribute container store.
Args:
string_format (Optional[str]): string format. The default is the Windows
Resource (wrc) format.
"""
super().__init__()
self.string_format = string_format
self._containers_manager.RegisterAttributeContainers([
WinevtResourcesEventLogProvider, WinevtResourcesMessageFile,
WinevtResourcesMessageString, WinevtResourcesMessageStringMapping,
WinevtResourcesMessageTable])
def _ReadAndCheckStorageMetadata(self, check_readable_only=False):
"""Reads storage metadata and checks that the values are valid.
Args:
check_readable_only (Optional[bool]): whether the store should only be
checked to see if it can be read. If False, the store will be checked
to see if it can be read and written to.
Raises:
OSError: when there is an error querying the attribute container store.
"""
metadata_values = self._ReadMetadata()
self._CheckStorageMetadata(
metadata_values, check_readable_only=check_readable_only)
string_format = metadata_values.get('string_format')
if string_format not in ('pep3101', 'wrc'):
raise OSError(f'Unsupported string format: {string_format:s}')
self.format_version = metadata_values['format_version']
self.serialization_format = metadata_values['serialization_format']
self.string_format = metadata_values['string_format']
[docs]
class WinevtResourcesHelper:
"""Windows EventLog resources helper."""
# LCID 0x0409 is en-US.
DEFAULT_LCID = 0x0409
_DEFAULT_PARAMETER_MESSAGE_FILES = (
'%SystemRoot%\\System32\\MsObjs.dll',
'%SystemRoot%\\System32\\kernel32.dll')
# The maximum number of cached message strings
_MAXIMUM_CACHED_MESSAGE_STRINGS = 64 * 1024
_WINEVT_RC_DATABASE = 'winevt-rc.db'
[docs]
def __init__(self, storage_reader, data_location, lcid):
"""Initializes Windows EventLog resources helper.
Args:
storage_reader (StorageReader): storage reader.
data_location (str): data location of the winevt-rc database.
lcid (int): Windows Language Code Identifier (LCID).
"""
language_tag = languages.WindowsLanguageHelper.GetLanguageTagForLCID(
lcid or self.DEFAULT_LCID)
super().__init__()
self._data_location = data_location
self._environment_variables = None
self._language_tag = language_tag.lower()
self._lcid = lcid or self.DEFAULT_LCID
self._message_string_cache = collections.OrderedDict()
self._resouce_file_helper = resource_files.WindowsResourceFileHelper
self._storage_reader = None
self._windows_eventlog_message_files = None
self._windows_eventlog_providers = None
self._winevt_database_reader = None
if storage_reader and storage_reader.HasAttributeContainers(
'windows_eventlog_provider'):
self._storage_reader = storage_reader
def _CacheMessageString(
self, provider_identifier, log_source, message_identifier,
event_version, message_string):
"""Caches a specific message string.
Args:
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
message_identifier (int): message identifier.
event_version (int): event version or None if not set.
message_string (str): message string.
"""
if len(self._message_string_cache) >= self._MAXIMUM_CACHED_MESSAGE_STRINGS:
self._message_string_cache.popitem(last=True)
if provider_identifier:
lookup_key = f'{provider_identifier:s}:0x{message_identifier:08x}'
if event_version is not None:
lookup_key = f'{lookup_key:s}:{event_version:d}'
self._message_string_cache[lookup_key] = message_string
self._message_string_cache.move_to_end(lookup_key, last=False)
if log_source:
lookup_key = f'{log_source:s}:0x{message_identifier:08x}'
if event_version is not None:
lookup_key = f'{lookup_key:s}:{event_version:d}'
self._message_string_cache[lookup_key] = message_string
self._message_string_cache.move_to_end(lookup_key, last=False)
def _GetCachedMessageString(
self, provider_identifier, log_source, message_identifier, event_version):
"""Retrieves a specific cached message string.
Args:
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
message_identifier (int): message identifier.
event_version (int): event version or None if not set.
Returns:
str: message string or None if not available.
"""
lookup_key = None
message_string = None
if provider_identifier:
lookup_key = f'{provider_identifier:s}:0x{message_identifier:08x}'
if event_version is not None:
lookup_key = f'{lookup_key:s}:{event_version:d}'
message_string = self._message_string_cache.get(lookup_key)
if not message_string and log_source:
lookup_key = f'{log_source:s}:0x{message_identifier:08x}'
if event_version is not None:
lookup_key = f'{lookup_key:s}:{event_version:d}'
message_string = self._message_string_cache.get(lookup_key)
if message_string:
self._message_string_cache.move_to_end(lookup_key, last=False)
return message_string
def _GetEventMessageFileIdentifiers(self, message_files):
"""Retrieves event message file identifiers.
Args:
message_files (list[str]): Windows EventLog message files.
Returns:
list[str]: message file identifiers.
"""
message_file_identifiers = []
for windows_path in message_files or []:
path, filename = path_helper.PathHelper.GetWindowsSystemPath(
windows_path, self._environment_variables)
lookup_path = '\\'.join([path, filename]).lower()
message_file_identifier = self._windows_eventlog_message_files.get(
lookup_path, None)
if message_file_identifier:
message_file_identifier = message_file_identifier.CopyToString()
message_file_identifiers.append(message_file_identifier)
mui_filename = f'{filename:s}.mui'
lookup_path = '\\'.join([path, self._language_tag, mui_filename]).lower()
message_file_identifier = self._windows_eventlog_message_files.get(
lookup_path, None)
if message_file_identifier:
message_file_identifier = message_file_identifier.CopyToString()
message_file_identifiers.append(message_file_identifier)
return message_file_identifiers
def _GetMappedMessageIdentifier(
self, storage_reader, provider_identifier, message_identifier,
event_version):
"""Retrieves a WEVT_TEMPLATE mapped message identifier if available.
Args:
storage_reader (StorageReader): storage reader.
provider_identifier (str): EventLog provider identifier.
message_identifier (int): message identifier.
event_version (int): event version or None if not set.
Returns:
int: message identifier.
"""
# Map the event identifier to a message identifier as defined by the
# WEVT_TEMPLATE event definition.
if provider_identifier and storage_reader.HasAttributeContainers(
'windows_wevt_template_event'):
# TODO: add message_file_identifiers to filter_expression
filter_expression = (
f'provider_identifier == "{provider_identifier:s}" and '
f'identifier == {message_identifier:d}')
if event_version is not None:
filter_expression = (
f'{filter_expression:s} and version == {event_version:d}')
for event_definition in storage_reader.GetAttributeContainers(
'windows_wevt_template_event', filter_expression=filter_expression):
logger.debug((
f'Message: 0x{message_identifier:08x} of provider: '
f'{provider_identifier:s} maps to: '
f'0x{event_definition.message_identifier:08x}'))
return event_definition.message_identifier
return message_identifier
def _GetMessageStrings(
self, storage_reader, message_file_identifiers, message_identifier):
"""Retrieves message strings.
Args:
storage_reader (StorageReader): storage reader.
message_file_identifiers (list[str]): message file identifiers.
message_identifier (int): message identifier.
Returns:
list[str]: message strings.
"""
message_strings = []
# TODO: add message_file_identifiers to filter_expression
filter_expression = (
f'language_identifier == {self._lcid:d} and '
f'message_identifier == {message_identifier:d}')
for message_string in storage_reader.GetAttributeContainers(
'windows_eventlog_message_string', filter_expression=filter_expression):
identifier = message_string.GetMessageFileIdentifier()
identifier = identifier.CopyToString()
if identifier in message_file_identifiers:
message_strings.append(message_string)
return message_strings
def _GetMessageStringsWithMessageTable(
self, storage_reader, message_file_identifiers, message_identifier):
"""Retrieves message strings.
Args:
storage_reader (StorageReader): storage reader.
message_file_identifiers (list[str]): message file identifiers.
message_identifier (int): message identifier.
Returns:
list[str]: message strings.
"""
message_strings = []
for message_file_identifier in message_file_identifiers:
filter_expression = (
f'_message_file_identifier == "{message_file_identifier:s}" and '
f'language_identifier == {self._lcid:d}')
for message_table in storage_reader.GetAttributeContainers(
'winevtrc_message_table', filter_expression=filter_expression):
if not message_table:
continue
identifier = message_table.GetIdentifier()
message_table_identifier = identifier.CopyToString()
filter_expression = (
f'_message_table_identifier == "{message_table_identifier:s}" and '
f'message_identifier == {message_identifier:d}')
identifier = message_table.GetMessageFileIdentifier()
message_file_identifier = identifier.CopyToString()
for message_string in storage_reader.GetAttributeContainers(
'winevtrc_message_string', filter_expression=filter_expression):
if message_file_identifier in message_file_identifiers:
message_strings.append(message_string)
return message_strings
def _GetWindowsEventLogProvider(self, provider_identifier, log_source):
"""Retrieves a Windows EventLog provider.
Args:
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
Returns:
tuple[WindowsEventLogProviderArtifact, str]: Windows EventLog provider
or None if not available, and provider lookup key.
"""
provider = None
if provider_identifier:
lookup_key = provider_identifier.lower()
provider = self._windows_eventlog_providers.get(lookup_key)
if not provider:
lookup_key = log_source.lower()
provider = self._windows_eventlog_providers.get(lookup_key)
return provider, lookup_key
def _GetWinevtRcDatabaseReader(self):
"""Opens the Windows EventLog resource database reader.
Returns:
WinevtResourcesSqlite3DatabaseReader: Windows EventLog resource
database reader or None.
"""
if not self._winevt_database_reader and self._data_location:
logger.warning((
f'Falling back to {self._WINEVT_RC_DATABASE:s}. Please make sure '
f'the Windows EventLog message strings in the database correspond '
f'to those in the EventLog files.'))
database_path = os.path.join(
self._data_location, self._WINEVT_RC_DATABASE)
if not os.path.isfile(database_path):
return None
try:
self._winevt_database_reader = WinevtResourcesSqlite3DatabaseReader()
result = self._winevt_database_reader.Open(database_path)
except sqlite3.OperationalError:
result = False
if not result:
try:
self._winevt_database_reader = (
WinevtResourcesAttributeContainerStore())
self._winevt_database_reader.Open(path=database_path, read_only=True) # pylint: disable=no-value-for-parameter,unexpected-keyword-arg
result = True
except OSError:
result = False
if not result:
self._winevt_database_reader = None
return self._winevt_database_reader
def _GetWinevtRcDatabaseMessageString(
self, provider_identifier, log_source, message_identifier, event_version):
"""Retrieves a specific Windows EventLog resource database message string.
Args:
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
message_identifier (int): message identifier.
event_version (int): event version or None if not set.
Returns:
str: message string or None if not available.
"""
database_reader = self._GetWinevtRcDatabaseReader()
if not database_reader:
return None
if isinstance(database_reader, WinevtResourcesSqlite3DatabaseReader):
return database_reader.GetMessage(
log_source, self._lcid, message_identifier)
if self._windows_eventlog_providers is None:
self._ReadWindowsEventLogProviders(
database_reader, container_type='winevtrc_eventlog_provider')
if self._windows_eventlog_message_files is None:
self._ReadWindowsEventLogMessageFiles(
database_reader, container_type='winevtrc_message_file',
path_attribute='windows_path')
provider, provider_lookup_key = self._GetWindowsEventLogProvider(
provider_identifier, log_source)
if not provider:
return None
original_message_identifier = message_identifier
# TODO: pass message_file_identifiers
message_identifier = self._GetMappedMessageIdentifier(
database_reader, provider_identifier, message_identifier, event_version)
message_file_identifiers = self._GetEventMessageFileIdentifiers(
provider.event_message_files)
if not message_file_identifiers:
logger.warning((
f'No event message file for identifier: 0x{message_identifier:08x} '
f'(original: 0x{original_message_identifier:08x}) '
f'of provider: {provider_lookup_key:s}'))
return None
message_strings = self._GetMessageStringsWithMessageTable(
database_reader, message_file_identifiers, message_identifier)
if not message_strings:
logger.warning((
f'No message string for identifier: 0x{message_identifier:08x} '
f'(original: 0x{original_message_identifier:08x}) '
f'of provider: {provider_lookup_key:s}'))
return None
message_string = message_strings[0].text
if database_reader.string_format == 'wrc':
message_string = self._resouce_file_helper.FormatMessageStringInPEP3101(
message_string)
return message_string
def _ReadEnvironmentVariables(self, storage_reader):
"""Reads the environment variables.
Args:
storage_reader (StorageReader): storage reader.
"""
# TODO: get environment variables related to the source.
self._environment_variables = list(storage_reader.GetAttributeContainers(
'environment_variable'))
def _ReadEventMessageString(
self, storage_reader, provider_identifier, log_source,
message_identifier, event_version):
"""Reads an event message string.
Args:
storage_reader (StorageReader): storage reader.
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
message_identifier (int): message identifier.
event_version (int): event version or None if not set.
Returns:
str: message string or None if not available.
"""
if self._environment_variables is None:
self._ReadEnvironmentVariables(storage_reader)
if self._windows_eventlog_providers is None:
self._ReadWindowsEventLogProviders(storage_reader)
if self._windows_eventlog_message_files is None:
self._ReadWindowsEventLogMessageFiles(storage_reader)
provider, provider_lookup_key = self._GetWindowsEventLogProvider(
provider_identifier, log_source)
if not provider:
return None
if not storage_reader.HasAttributeContainers(
'windows_eventlog_message_string'):
return None
original_message_identifier = message_identifier
# TODO: pass message_file_identifiers
message_identifier = self._GetMappedMessageIdentifier(
storage_reader, provider_identifier, message_identifier, event_version)
message_file_identifiers = self._GetEventMessageFileIdentifiers(
provider.event_message_files)
if not message_file_identifiers:
logger.warning((
f'No event message file for identifier: 0x{message_identifier:08x} '
f'(original: 0x{original_message_identifier:08x}) '
f'of provider: {provider_lookup_key:s}'))
return None
message_strings = self._GetMessageStrings(
storage_reader, message_file_identifiers, message_identifier)
if not message_strings:
logger.warning((
f'No message string for identifier: 0x{message_identifier:08x} '
f'(original: 0x{original_message_identifier:08x}) '
f'of provider: {provider_lookup_key:s}'))
return None
return message_strings[0].string
def _ReadParameterMessageString(
self, storage_reader, provider_identifier, log_source,
message_identifier):
"""Reads a parameter message string.
Args:
storage_reader (StorageReader): storage reader.
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
message_identifier (int): message identifier.
Returns:
str: parameter string or None if not available.
"""
if self._environment_variables is None:
self._ReadEnvironmentVariables(storage_reader)
if self._windows_eventlog_providers is None:
self._ReadWindowsEventLogProviders(storage_reader)
if self._windows_eventlog_message_files is None:
self._ReadWindowsEventLogMessageFiles(storage_reader)
provider, provider_lookup_key = self._GetWindowsEventLogProvider(
provider_identifier, log_source)
if not provider:
return None
if not storage_reader.HasAttributeContainers(
'windows_eventlog_message_string'):
return None
message_files = provider.parameter_message_files
if not message_files:
# If no parameter message files are defined fallback to the event
# message files and default parameter message files.
message_files = list(provider.event_message_files)
message_files.extend(self._DEFAULT_PARAMETER_MESSAGE_FILES)
message_file_identifiers = self._GetEventMessageFileIdentifiers(
message_files)
if not message_file_identifiers:
logger.warning((
f'No parameter message file for identifier: '
f'0x{message_identifier:08x} of provider: {provider_lookup_key:s}'))
return None
message_strings = self._GetMessageStrings(
storage_reader, message_file_identifiers, message_identifier)
if not message_strings:
logger.warning((
f'No parameter string for identifier: 0x{message_identifier:08x} '
f'of provider: {provider_lookup_key:s}'))
return None
return message_strings[0].string
def _ReadWindowsEventLogMessageFiles(
self, attribute_store, container_type='windows_eventlog_message_file',
path_attribute='path'):
"""Reads the Windows EventLog message files.
Args:
attribute_store (AttributeContainerStore): attribute container store.
container_type (Optional[str]): attribute container type.
path_attribute (Optional[str]): name of the attribute containing the path.
"""
# TODO: get windows eventlog message files related to the source.
self._windows_eventlog_message_files = {}
if attribute_store.HasAttributeContainers(container_type):
for message_file in attribute_store.GetAttributeContainers(
container_type):
message_file_path = getattr(message_file, path_attribute, None)
path, filename = path_helper.PathHelper.GetWindowsSystemPath(
message_file_path, self._environment_variables)
lookup_path = '\\'.join([path, filename]).lower()
message_file_identifier = message_file.GetIdentifier()
self._windows_eventlog_message_files[lookup_path] = (
message_file_identifier)
def _ReadWindowsEventLogProviders(
self, attribute_store, container_type='windows_eventlog_provider'):
"""Reads Windows EventLog provider attribute containers.
Args:
attribute_store (AttributeContainerStore): attribute container store.
container_type (Optional[str]): attribute container type.
"""
self._windows_eventlog_providers = {}
if attribute_store.HasAttributeContainers(container_type):
for provider in attribute_store.GetAttributeContainers(container_type):
if provider.identifier:
self._windows_eventlog_providers[provider.identifier] = provider
for log_source in provider.log_sources:
log_source = log_source.lower()
self._windows_eventlog_providers[log_source] = provider
[docs]
def GetMessageString(
self, provider_identifier, log_source, message_identifier, event_version):
"""Retrieves a specific Windows EventLog message string.
Args:
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
message_identifier (int): message identifier.
event_version (int): event version or None if not set.
Returns:
str: message string or None if not available.
"""
message_string = self._GetCachedMessageString(
provider_identifier, log_source, message_identifier, event_version)
if not message_string:
if self._storage_reader:
message_string = self._ReadEventMessageString(
self._storage_reader, provider_identifier, log_source,
message_identifier, event_version)
else:
message_string = self._GetWinevtRcDatabaseMessageString(
provider_identifier, log_source, message_identifier, event_version)
if message_string:
self._CacheMessageString(
provider_identifier, log_source, message_identifier, event_version,
message_string)
return message_string
[docs]
def GetParameterString(
self, provider_identifier, log_source, message_identifier):
"""Retrieves a specific Windows EventLog parameter string.
Args:
provider_identifier (str): EventLog provider identifier.
log_source (str): EventLog source, such as "Application Error".
message_identifier (int): parameter identifier.
Returns:
str: parameter string or None if not available.
"""
message_string = self._GetCachedMessageString(
provider_identifier, log_source, message_identifier, None)
if not message_string:
if self._storage_reader:
message_string = self._ReadParameterMessageString(
self._storage_reader, provider_identifier, log_source,
message_identifier)
if message_string:
self._CacheMessageString(
provider_identifier, log_source, message_identifier,
None, message_string)
return message_string