Source code for plaso.engine.artifacts_trie

"""Trie structure for storing artifact paths."""

import fnmatch
import glob
import os

from plaso.engine import logger
from plaso.engine import path_helper


[docs] class TrieNode: """Represents a node in the Trie. Attributes: artifacts_names (list[str]): Names of artifacts associated with this node. children (dict[str, TrieNode]): Child nodes, keyed by path segment. is_root (bool): True if this is the root node. path_separator (str): Path separator used in the Trie, default is '/'. """
[docs] def __init__(self, path_separator="/", is_root=False): """Initializes a trie node object. Args: path_separator (Optional[str]): the path separator used in paths stored in the Trie, default is '/'. is_root (Optional[bool]): True if this node is the root node. """ super().__init__() self.artifacts_names = [] self.children = {} self.is_root = is_root self.path_separator = path_separator
[docs] class ArtifactsTrie: """Trie structure for storing artifact paths. Attributes: artifacts_paths (dict[str, list[str]]): Artifact paths for glob expansion, keyed by artifact name. root (TrieNode): Root node of the Trie. """
[docs] def __init__(self): """Initializes an artifact trie object.""" super().__init__() self.artifacts_paths = {} self.root = TrieNode(is_root=True)
[docs] def AddPath(self, artifact_name, path, path_separator): """Adds a path from an artifact definition to the Trie. Args: artifact_name (str): name of the artifact. path (str): path from the artifact definition. path_separator (str): path separator. """ logger.debug(f'Adding path: "{path:s}" to artifact: "{artifact_name:s}"') path_list = self.artifacts_paths.setdefault(artifact_name, []) path_list.append(path) current_node = self.root # Add a path separator node if this is a new separator. if path_separator not in current_node.children: current_node.children[path_separator] = TrieNode( path_separator=path_separator ) current_node = current_node.children[path_separator] # Handle the case when the input path is equal to the path separator. if path == path_separator: current_node.artifacts_names.append(artifact_name) return path_segments = path.strip(path_separator).split(path_separator) for path_segment in path_segments: # Store the path_separator for each node. if not hasattr(current_node, "path_separator"): current_node.path_separator = path_separator if path_segment not in current_node.children: current_node.children[path_segment] = TrieNode(path_separator) current_node = current_node.children[path_segment] current_node.artifacts_names.append(artifact_name)
[docs] def GetMatchingArtifacts(self, path, path_separator): """Retrieves the artifact names that match the given path. Args: path (str): path to match against the Trie. path_separator (str): path separator. Returns: list[str]: artifact names that match the path. """ # Start at the root's child that matches the path_separator. if path_separator not in self.root.children: return [] sub_root_node = self.root.children[path_separator] # Handle the case when the input path is equal to the path_separator. if path == path_separator: matching_artifacts = set() if sub_root_node.artifacts_names: matching_artifacts.update(sub_root_node.artifacts_names) return list(matching_artifacts) path_segments = path.strip(path_separator).split(path_separator) matching_artifacts = set() # Update self.artifacts_paths before starting the search. self.artifacts_paths = self._GetArtifactsPaths(sub_root_node) self._SearchTrie( sub_root_node, "", path_segments, path_separator, matching_artifacts ) return list(matching_artifacts)
def _SearchTrie( self, node, current_path, segments, path_separator, matching_artifacts ): """Searches the trie for paths matching the given path segments. Args: node (TrieNode): current trie node being traversed. current_path (str): path represented by the current node. segments (list[str]): remaining path segments to match. path_separator (str): path separator. matching_artifacts (set[str]): Set to store matching artifact names. """ if node.artifacts_names: for artifact_name in node.artifacts_names: for artifact_path in self.artifacts_paths.get(artifact_name, []): # Note that the sanitized path is used here to ensure consistency # with the exported path. # TODO: consider moving path sanitation out of engine. if self._ComparePathIfSanitized( current_path, path_separator, artifact_path, node.path_separator ): matching_artifacts.add(artifact_name) elif glob.has_magic(artifact_path): if self._MatchesGlobPattern( artifact_path, current_path, node.path_separator ): matching_artifacts.add(artifact_name) if not segments: return segment = segments[0] remaining_segments = segments[1:] # Handle glob characters in the current segment. for child_segment, child_node in node.children.items(): # Compare the sanitized version of the path segment stored in # the tree to the path segment from to the tool output as it # sanitizes path segments before writing data to disk. sanitized_child_segment = path_helper.PathHelper.SanitizePathSegments( [child_segment] ).pop() # If the child is an exact match, continue traversal. if segment in (child_segment, sanitized_child_segment): custom_path = self._CustomPathJoin( path_separator, current_path, child_segment ) self._SearchTrie( child_node, custom_path, remaining_segments, path_separator, matching_artifacts, ) # If the child is a glob, see if it matches. elif glob.has_magic(child_segment): if self._MatchesGlobPattern( child_segment, segment, child_node.path_separator ): custom_path = self._CustomPathJoin( path_separator, current_path, segment ) self._SearchTrie( child_node, custom_path, remaining_segments, path_separator, matching_artifacts, ) self._SearchTrie( node, custom_path, remaining_segments, path_separator, matching_artifacts, ) def _ComparePathIfSanitized( self, current_path, path_separator, artifact_path, artifact_path_seperator ): """Compares a current path with an artifact path, handling sanitization. This method checks if the current_path matches the artifact_path, considering that the artifact_path might have been sanitized. Args: current_path (str): The current path being checked. path_separator (str): Path separator for the current path. artifact_path (str): The artifact path to compare against. artifact_path_seperator (str): Path separator for the artifact path. Returns: bool: True if the current path matches the artifact path or its sanitized version, False otherwise. """ artifact_path_segments = self._GetNonEmptyPathSegments( artifact_path, artifact_path_seperator ) sanitized_path_segments = path_helper.PathHelper.SanitizePathSegments( artifact_path_segments ) return self._GetNonEmptyPathSegments(current_path, path_separator) in [ artifact_path_segments, sanitized_path_segments, ] def _GetNonEmptyPathSegments(self, path, separator): """Splits a path into segments and remove non-empty segments. Args: path (str): The path string to be split. separator (str): The path separator. Returns: list[str]: A list of non-empty path segments. """ return [s for s in path.split(separator) if s] def _GetArtifactsPaths(self, node): """Retrieves a mapping of artifact names to their paths. Args: node (TrieNode): current trie node being traversed. Returns: dict: dictionary mapping artifact names to their paths. """ artifacts_paths = {} self._collect_paths(node, "", artifacts_paths) return artifacts_paths def _collect_paths(self, node, current_path, artifacts): """Collects paths from the trie. Args: node (TrieNode): current node. current_path (str): path leading to this node. artifacts (dict): dictionary to store artifact paths. """ if node.artifacts_names: for artifact_name in node.artifacts_names: path_list = artifacts.setdefault(artifact_name, []) path_list.append(current_path) for segment, child_node in node.children.items(): # Ensure the path_separator attribute exists. if not hasattr(child_node, "path_separator"): child_node.path_separator = node.path_separator # Construct the next path segment. if current_path == child_node.path_separator: # Means it is the root folder, i.e. `/` next_path = current_path + segment else: next_path = current_path + child_node.path_separator + segment self._collect_paths(child_node, next_path, artifacts) def _CustomPathJoin(self, separator, current_path, new_segment): """Joins path components using a custom separator, replacing os.sep. Args: separator (str): The custom separator to use. current_path (str): The current path. new_segment (str): The new segment to add to it. Returns: str: The joined path with the custom separator. """ current_path = current_path.replace(separator, os.sep) joined_path = os.path.join(current_path, new_segment) return joined_path.replace(os.sep, separator) def _MatchesGlobPattern(self, glob_pattern, path, path_separator): """Checks if a path matches a given glob pattern. Args: glob_pattern (str): The glob pattern to match against. path (str): The path to check. path_separator (str): The path separator used in the glob pattern. Returns: True if the path matches the glob pattern, False otherwise. """ # Normalize paths using the appropriate separators glob_pattern = glob_pattern.strip(path_separator).split(path_separator) path = path.strip(path_separator).split(path_separator) glob_index = 0 path_index = 0 while glob_index < len(glob_pattern) and path_index < len(path): if glob_pattern[glob_index] == "**": # If ** is the last part, it matches everything remaining if glob_index == len(glob_pattern) - 1: return True # Move to the next part after ** glob_index += 1 # Keep advancing in the path until the next part matches while path_index < len(path) and not fnmatch.fnmatch( path[path_index], glob_pattern[glob_index] ): path_index += 1 elif not fnmatch.fnmatch(path[path_index], glob_pattern[glob_index]): # Mismatch return False else: glob_index += 1 path_index += 1 return glob_index == len(glob_pattern) and path_index == len(path)