Source code for plaso.engine.extractors

"""Extractor classes, used to extract information from sources."""

import copy

import pysigscan

from dfvfs.helpers import file_system_searcher
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.resolver import resolver as path_spec_resolver

from plaso.engine import logger
from plaso.lib import definitions
from plaso.lib import errors
from plaso.parsers import interface as parsers_interface
from plaso.parsers import manager as parsers_manager


class EventDataExtractor:
    """Extractor that runs the enabled parsers against file entries.

    The filestat, mft and usnjrnl parsers are kept in dedicated attributes
    because they are invoked through the metadata specific entry points
    instead of the generic signature or non-signature based parser loops.
    """

    _PARSE_RESULT_FAILURE = 1
    _PARSE_RESULT_SUCCESS = 2
    _PARSE_RESULT_UNSUPPORTED = 3

    def __init__(self, force_parser=False, parser_filter_expression=None):
        """Initializes an event data extractor.

        Args:
          force_parser (Optional[bool]): True if a specified parser should be
              forced to be used to extract events.
          parser_filter_expression (Optional[str]): parser filter expression,
              where None represents all parsers and plugins.

              A parser filter expression is a comma separated value string
              that denotes which parsers and plugins should be used. See
              filters/parser_filter.py for details of the expression syntax.
        """
        super().__init__()
        self._filestat_parser = None
        self._force_parser = force_parser
        self._format_scanner = None
        self._formats_with_signatures = None
        self._mft_parser = None
        self._non_sigscan_parser_names = None
        self._parsers = None
        self._usnjrnl_parser = None

        self._InitializeParserObjects(
            parser_filter_expression=parser_filter_expression
        )

    def _CheckParserCanProcessFileEntry(self, parser, file_entry):
        """Determines if a parser can process a file entry.

        Args:
          parser (BaseParser): parser.
          file_entry (dfvfs.FileEntry): file entry.

        Returns:
          bool: True if at least one of the filters of the parser matches the
              file entry, False otherwise.
        """
        return any(
            filter_object.Match(file_entry) for filter_object in parser.FILTERS
        )

    def _GetSignatureMatchParserNames(self, file_object):
        """Determines which known signatures a file-like object matches.

        Args:
          file_object (file): file-like object whose contents will be checked
              for known signatures.

        Returns:
          list[str]: names of the parsers whose known signatures match the
              contents of the file-like object, without duplicates and in the
              order the scan results were reported.
        """
        scan_state = pysigscan.scan_state()
        self._format_scanner.scan_file_object(scan_state, file_object)

        matched_names = []
        for scan_result in scan_state.scan_results:
            specification = (
                self._formats_with_signatures.GetSpecificationBySignature(
                    scan_result.identifier
                )
            )
            identifier = specification.identifier
            # Multiple signatures can map to the same format specification.
            if identifier in matched_names:
                continue
            matched_names.append(identifier)

        return matched_names

    def _InitializeParserObjects(self, parser_filter_expression=None):
        """Initializes the parser objects and the signature scanner.

        Args:
          parser_filter_expression (Optional[str]): parser filter expression,
              where None represents all parsers and plugins.

              A parser filter expression is a comma separated value string
              that denotes which parsers and plugins should be used. See
              filters/parser_filter.py for details of the expression syntax.
        """
        manager = parsers_manager.ParsersManager

        self._formats_with_signatures, names_without_signatures = (
            manager.GetFormatsWithSignatures(
                parser_filter_expression=parser_filter_expression
            )
        )

        # filestat and usnjrnl are never run as part of the generic
        # non-signature parser loop.
        self._non_sigscan_parser_names = {
            name
            for name in names_without_signatures
            if name not in ("filestat", "usnjrnl")
        }

        self._format_scanner = manager.CreateSignatureScanner(
            self._formats_with_signatures
        )

        self._parsers = manager.GetParserObjects(
            parser_filter_expression=parser_filter_expression
        )

        # The active parsers are logged before the special-purpose parsers
        # are taken out of the lookup table.
        active_parser_names = ", ".join(sorted(self._parsers))
        logger.debug(f"Active parsers: {active_parser_names:s}")

        self._filestat_parser = self._parsers.pop("filestat", None)

        # Unlike filestat and usnjrnl the mft parser remains in the table.
        self._mft_parser = self._parsers.get("mft")

        self._usnjrnl_parser = self._parsers.pop("usnjrnl", None)

    def _ParseDataStreamWithParser(
        self, parser_mediator, parser, file_entry, data_stream_name
    ):
        """Parses a data stream of a file entry with a specific parser.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          parser (BaseParser): parser.
          file_entry (dfvfs.FileEntry): file entry.
          data_stream_name (str): data stream name.

        Raises:
          RuntimeError: if the file-like object is missing.
        """
        stream_object = file_entry.GetFileObject(
            data_stream_name=data_stream_name
        )
        if not stream_object:
            raise RuntimeError(
                "Unable to retrieve file-like object from file entry."
            )

        self._ParseFileEntryWithParser(
            parser_mediator, parser, file_entry, file_object=stream_object
        )

    def _ParseFileEntryWithParser(
        self, parser_mediator, parser, file_entry, file_object=None
    ):
        """Parses a file entry with a specific parser.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          parser (BaseParser): parser.
          file_entry (dfvfs.FileEntry): file entry.
          file_object (Optional[file]): file-like object to parse, which is
              only passed on to file-object based parsers.

        Returns:
          int: parse result which is _PARSE_RESULT_FAILURE if the file entry
              could not be parsed, _PARSE_RESULT_SUCCESS if the file entry
              successfully was parsed or _PARSE_RESULT_UNSUPPORTED when
              WrongParser was raised.

        Raises:
          TypeError: if parser object is not a supported parser type.
        """
        supported_types = (
            parsers_interface.FileEntryParser,
            parsers_interface.FileObjectParser,
        )
        if not isinstance(parser, supported_types):
            raise TypeError("Unsupported parser object type.")

        parser_mediator.ClearParserChain()

        try:
            if isinstance(parser, parsers_interface.FileEntryParser):
                parser.Parse(parser_mediator)
            else:
                # The type check above guarantees a file-object parser here.
                parser.Parse(parser_mediator, file_object)

        # We catch OSError so we can determine the parser that generated the
        # error.
        except (OSError, dfvfs_errors.BackEndError) as exception:
            display_name = parser_mediator.GetDisplayName(file_entry=file_entry)
            logger.warning(
                f"{parser.NAME:s} unable to parse file: {display_name:s} with error: "
                f"{exception!s}"
            )
            result = self._PARSE_RESULT_FAILURE

        except errors.WrongParser as exception:
            display_name = parser_mediator.GetDisplayName(file_entry=file_entry)
            logger.debug(
                f"{parser.NAME:s} unable to parse file: {display_name:s} with error: "
                f"{exception!s}"
            )
            result = self._PARSE_RESULT_UNSUPPORTED

        else:
            result = self._PARSE_RESULT_SUCCESS

        parser_mediator.SampleMemoryUsage(parser.NAME)

        return result

    def _ParseFileEntryWithParsers(
        self, parser_mediator, parser_names, file_entry, file_object=None
    ):
        """Parses a file entry with a set of named parsers.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          parser_names (set[str]): names of parsers.
          file_entry (dfvfs.FileEntry): file entry.
          file_object (Optional[file]): file-like object to parse, which is
              only passed on to file-object based parsers.

        Returns:
          int: parse result which is _PARSE_RESULT_FAILURE as soon as one
              parser failed, _PARSE_RESULT_SUCCESS if at least one parser
              succeeded or was skipped because its filters did not match, or
              _PARSE_RESULT_UNSUPPORTED when every parser raised WrongParser
              or no names of parsers were provided.

        Raises:
          RuntimeError: if the parser object is missing.
        """
        overall_result = self._PARSE_RESULT_UNSUPPORTED

        for parser_name in parser_names:
            parser = self._parsers.get(parser_name)
            if not parser:
                raise RuntimeError(
                    f"Parser object missing for parser: {parser_name:s}"
                )

            if parser.FILTERS and not self._CheckParserCanProcessFileEntry(
                parser, file_entry
            ):
                # A parser that is filtered out counts as handled.
                overall_result = self._PARSE_RESULT_SUCCESS
                continue

            display_name = parser_mediator.GetDisplayName(file_entry=file_entry)
            logger.debug(
                f"[ParseFileEntryWithParsers] parsing file: {display_name:s} with "
                f"parser: {parser_name:s}"
            )

            outcome = self._ParseFileEntryWithParser(
                parser_mediator, parser, file_entry, file_object=file_object
            )

            # A single failure aborts the remaining parsers.
            if outcome == self._PARSE_RESULT_FAILURE:
                return self._PARSE_RESULT_FAILURE

            if outcome == self._PARSE_RESULT_SUCCESS:
                overall_result = self._PARSE_RESULT_SUCCESS

        return overall_result

    def ParseDataStream(self, parser_mediator, file_entry, data_stream_name):
        """Parses a data stream of a file entry with the enabled parsers.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          file_entry (dfvfs.FileEntry): file entry.
          data_stream_name (str): data stream name.

        Raises:
          RuntimeError: if the file-like object or the parser object is
              missing.
        """
        file_object = file_entry.GetFileObject(data_stream_name=data_stream_name)
        if not file_object:
            raise RuntimeError(
                "Unable to retrieve file-like object from file entry."
            )

        parser_mediator.SampleFormatCheckStartTiming("format_scanner")
        try:
            signature_parser_names = self._GetSignatureMatchParserNames(
                file_object
            )
        finally:
            parser_mediator.SampleFormatCheckStopTiming("format_scanner")

        signature_result = None
        if signature_parser_names:
            signature_result = self._ParseFileEntryWithParsers(
                parser_mediator,
                signature_parser_names,
                file_entry,
                file_object=file_object,
            )

        # Only fall back to the parsers without signatures when no signature
        # based parser either succeeded or failed.
        conclusive_results = (
            self._PARSE_RESULT_FAILURE,
            self._PARSE_RESULT_SUCCESS,
        )
        if signature_result not in conclusive_results:
            self._ParseFileEntryWithParsers(
                parser_mediator,
                self._non_sigscan_parser_names,
                file_entry,
                file_object=file_object,
            )

        if self._force_parser and self._usnjrnl_parser:
            # TODO: the usnjrnl needs to be adjusted to be used on an export of
            # $UsnJrnl:$J
            self._ParseFileEntryWithParser(
                parser_mediator,
                self._usnjrnl_parser,
                file_entry,
                file_object=file_object,
            )

    def ParseFileEntryMetadata(self, parser_mediator, file_entry):
        """Parses the file entry metadata such as file system data.

        Nothing is done when the filestat parser is not enabled.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          file_entry (dfvfs.FileEntry): file entry.
        """
        if not self._filestat_parser:
            return

        self._ParseFileEntryWithParser(
            parser_mediator, self._filestat_parser, file_entry
        )

    def ParseMetadataFile(self, parser_mediator, file_entry, data_stream_name):
        """Parses a metadata file.

        Handles the default data stream of $MFT and $MFTMirr with the mft
        parser and the $J data stream of $UsnJrnl with the usnjrnl parser.
        Other file entries are ignored.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          file_entry (dfvfs.FileEntry): file entry.
          data_stream_name (str): data stream name.
        """
        parent_path_spec = getattr(file_entry.path_spec, "parent", None)
        filename_upper = file_entry.name.upper()

        # Both supported metadata files require a parent path specification.
        if not parent_path_spec:
            return

        if (
            self._mft_parser
            and filename_upper in ("$MFT", "$MFTMIRR")
            and not data_stream_name
        ):
            self._ParseDataStreamWithParser(
                parser_mediator, self._mft_parser, file_entry, ""
            )
            return

        if (
            self._usnjrnl_parser
            and filename_upper == "$USNJRNL"
            and data_stream_name == "$J"
        ):
            # To be able to ignore the sparse data ranges the UsnJrnl parser
            # needs to read directly from the volume.
            volume_file_object = path_spec_resolver.Resolver.OpenFileObject(
                parent_path_spec,
                resolver_context=parser_mediator.resolver_context,
            )
            self._ParseFileEntryWithParser(
                parser_mediator,
                self._usnjrnl_parser,
                file_entry,
                file_object=volume_file_object,
            )
class PathSpecExtractor:
    """Path specification extractor.

    A path specification extractor extracts path specifications from a source
    directory, file or storage media device or image.
    """

    _MAXIMUM_DEPTH = 255

    def _ExtractPathSpecsFromDirectory(self, file_entry, depth=0):
        """Extracts path specifications from a directory.

        Args:
          file_entry (dfvfs.FileEntry): file entry that refers to the
              directory.
          depth (Optional[int]): current depth where 0 represents the file
              system root.

        Yields:
          dfvfs.PathSpec: path specification of a file entry found in the
              directory.

        Raises:
          MaximumRecursionDepth: when the maximum recursion depth is reached.
        """
        if depth >= self._MAXIMUM_DEPTH:
            raise errors.MaximumRecursionDepth(
                "Maximum recursion depth reached."
            )

        # Need to do a breadth-first search otherwise we'll hit the Python
        # maximum recursion depth.
        pending_directories = []

        for sub_file_entry in file_entry.sub_file_entries:
            try:
                skip_entry = (
                    not sub_file_entry.IsAllocated() or sub_file_entry.IsLink()
                )
            except dfvfs_errors.BackEndError as exception:
                path_spec_string = self._GetPathSpecificationString(
                    sub_file_entry.path_spec
                )
                logger.warning(
                    f"Unable to process file: {path_spec_string:s} with error: "
                    f"{exception!s}"
                )
                continue

            # Unallocated entries and links are not extracted.
            if skip_entry:
                continue

            # For TSK-based file entries only, ignore the virtual
            # /$OrphanFiles directory.
            if (
                sub_file_entry.type_indicator
                == dfvfs_definitions.TYPE_INDICATOR_TSK
                and file_entry.IsRoot()
                and sub_file_entry.name == "$OrphanFiles"
            ):
                continue

            if sub_file_entry.IsDirectory():
                pending_directories.append(sub_file_entry)

            # Directories also produce their own path specifications.
            yield from self._ExtractPathSpecsFromFile(sub_file_entry)

        for directory_entry in pending_directories:
            try:
                yield from self._ExtractPathSpecsFromDirectory(
                    directory_entry, depth=depth + 1
                )
            except (
                OSError,
                dfvfs_errors.AccessError,
                dfvfs_errors.BackEndError,
                dfvfs_errors.PathSpecError,
            ) as exception:
                logger.warning(f"{exception!s}")

    def _ExtractPathSpecsFromFile(self, file_entry):
        """Extracts path specifications from a file.

        Args:
          file_entry (dfvfs.FileEntry): file entry that refers to the file.

        Yields:
          dfvfs.PathSpec: one path specification per data stream of the file
              entry, followed by the path specification of the file entry
              itself if none of the data streams was unnamed.
        """
        has_default_stream = False

        for data_stream in file_entry.data_streams:
            stream_name = data_stream.name

            # Make a copy so we don't make the changes on a path specification
            # directly. Otherwise already produced path specifications can be
            # altered in the process.
            stream_path_spec = copy.deepcopy(file_entry.path_spec)
            if stream_name:
                stream_path_spec.data_stream = stream_name

            yield stream_path_spec

            if not stream_name:
                has_default_stream = True

        if not has_default_stream:
            yield file_entry.path_spec

    def _ExtractPathSpecsFromFileSystem(
        self,
        path_spec,
        find_specs=None,
        recurse_file_system=True,
        resolver_context=None,
    ):
        """Extracts path specifications from a file system within a source.

        Args:
          path_spec (dfvfs.PathSpec): path specification of the root of the
              file system.
          find_specs (Optional[list[dfvfs.FindSpec]]): find specifications
              used in path specification extraction.
          recurse_file_system (Optional[bool]): True if extraction should
              recurse into a file system.
          resolver_context (Optional[dfvfs.Context]): resolver context.

        Yields:
          dfvfs.PathSpec: path specification of a file entry found in the
              file system.
        """
        handled_errors = (
            dfvfs_errors.AccessError,
            dfvfs_errors.BackEndError,
            dfvfs_errors.PathSpecError,
        )

        try:
            file_system = path_spec_resolver.Resolver.OpenFileSystem(
                path_spec, resolver_context=resolver_context
            )
        except handled_errors as exception:
            logger.error(f"Unable to open file system with error: {exception!s}")
            return

        if not file_system:
            return

        try:
            if find_specs:
                searcher = file_system_searcher.FileSystemSearcher(
                    file_system, path_spec
                )
                yield from searcher.Find(find_specs=find_specs)

            elif recurse_file_system:
                root_entry = file_system.GetFileEntryByPathSpec(path_spec)
                if root_entry:
                    yield from self._ExtractPathSpecsFromDirectory(root_entry)

            else:
                yield path_spec

        except handled_errors as exception:
            logger.warning(f"{exception!s}")

    def _GetPathSpecificationString(self, path_spec):
        """Retrieves a printable string representation of a path specification.

        Args:
          path_spec (dfvfs.PathSpec): path specification.

        Returns:
          str: the comparable of the path specification, where every line was
              translated with the non-printable character translation table.
        """
        translation_table = definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE
        printable_lines = (
            line.translate(translation_table)
            for line in path_spec.comparable.split("\n")
        )
        return "\n".join(printable_lines)

    def ExtractPathSpecs(
        self,
        path_spec,
        find_specs=None,
        recurse_file_system=True,
        resolver_context=None,
    ):
        """Extracts path specifications from a specific source.

        Args:
          path_spec (dfvfs.PathSpec): path specification.
          find_specs (Optional[list[dfvfs.FindSpec]]): find specifications
              used in path specification extraction.
          recurse_file_system (Optional[bool]): True if extraction should
              recurse into a file system.
          resolver_context (Optional[dfvfs.Context]): resolver context.

        Yields:
          dfvfs.PathSpec: path specification of a file entry found in the
              source.
        """
        file_entry = None
        try:
            file_entry = path_spec_resolver.Resolver.OpenFileEntry(
                path_spec, resolver_context=resolver_context
            )
        except (
            dfvfs_errors.AccessError,
            dfvfs_errors.BackEndError,
            dfvfs_errors.PathSpecError,
        ) as exception:
            logger.error(f"Unable to open file entry with error: {exception!s}")

        if not file_entry:
            path_spec_string = self._GetPathSpecificationString(path_spec)
            logger.warning(f"Unable to open: {path_spec_string:s}")
            return

        if not (
            file_entry.IsDirectory()
            or file_entry.IsFile()
            or file_entry.IsDevice()
        ):
            path_spec_string = self._GetPathSpecificationString(path_spec)
            logger.warning(
                f"Source path specification not a device, file or directory.\n"
                f"{path_spec_string:s}"
            )
            return

        # A single file is its own and only path specification.
        if file_entry.IsFile():
            yield path_spec
            return

        yield from self._ExtractPathSpecsFromFileSystem(
            path_spec,
            find_specs=find_specs,
            recurse_file_system=recurse_file_system,
            resolver_context=resolver_context,
        )