# Source code for plaso.engine.extractors
"""Extractor classes, used to extract information from sources."""
import copy
import pysigscan
from dfvfs.helpers import file_system_searcher
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.resolver import resolver as path_spec_resolver
from plaso.engine import logger
from plaso.lib import definitions
from plaso.lib import errors
from plaso.parsers import interface as parsers_interface
from plaso.parsers import manager as parsers_manager
# [docs] (documentation link label left over from the rendered page)
class EventDataExtractor:
    """Event data extractor.

    Chooses which parsers to run on a file entry, or on one of its data
    streams, based on a signature scan of the content and runs them through
    a parser mediator.
    """

    # Result codes exchanged between the internal parse helpers.
    _PARSE_RESULT_FAILURE = 1
    _PARSE_RESULT_SUCCESS = 2
    _PARSE_RESULT_UNSUPPORTED = 3

    def __init__(self, force_parser=False, parser_filter_expression=None):
        """Initializes an event data extractor.

        Args:
          force_parser (Optional[bool]): True if a specified parser should be
              forced to be used to extract events.
          parser_filter_expression (Optional[str]): parser filter expression,
              where None represents all parsers and plugins.

              A parser filter expression is a comma separated value string
              that denotes which parsers and plugins should be used. See
              filters/parser_filter.py for details of the expression syntax.
        """
        super().__init__()
        self._force_parser = force_parser

        # All of the following are filled in by _InitializeParserObjects.
        self._filestat_parser = None
        self._mft_parser = None
        self._usnjrnl_parser = None
        self._format_scanner = None
        self._formats_with_signatures = None
        self._non_sigscan_parser_names = None
        self._parsers = None

        self._InitializeParserObjects(
            parser_filter_expression=parser_filter_expression
        )

    def _CheckParserCanProcessFileEntry(self, parser, file_entry):
        """Determines if a parser can process a file entry.

        Args:
          parser (BaseParser): parser, whose FILTERS are consulted.
          file_entry (dfvfs.FileEntry): file entry.

        Returns:
          bool: True if at least one of the parser's filters matches the file
              entry, False otherwise.
        """
        return any(
            filter_object.Match(file_entry) for filter_object in parser.FILTERS
        )

    def _GetSignatureMatchParserNames(self, file_object):
        """Determines which known signatures match a file-like object.

        Args:
          file_object (file): file-like object whose contents will be checked
              for known signatures.

        Returns:
          list[str]: names of the parsers whose known signatures match the
              contents of the file-like object, without duplicates and in
              the order the scan results were reported.
        """
        scan_state = pysigscan.scan_state()
        self._format_scanner.scan_file_object(scan_state, file_object)

        matched_names = []
        for scan_result in iter(scan_state.scan_results):
            specification = (
                self._formats_with_signatures.GetSpecificationBySignature(
                    scan_result.identifier
                )
            )
            identifier = specification.identifier
            if identifier in matched_names:
                continue
            matched_names.append(identifier)

        return matched_names

    def _InitializeParserObjects(self, parser_filter_expression=None):
        """Initializes the parser objects.

        The filestat and usnjrnl parsers are kept in dedicated attributes and
        removed from the general parser mapping. The mft parser is kept in a
        dedicated attribute as well but stays in the mapping.

        Args:
          parser_filter_expression (Optional[str]): parser filter expression,
              where None represents all parsers and plugins.

              A parser filter expression is a comma separated value string
              that denotes which parsers and plugins should be used. See
              filters/parser_filter.py for details of the expression syntax.
        """
        manager = parsers_manager.ParsersManager

        formats_with_signatures, candidate_names = (
            manager.GetFormatsWithSignatures(
                parser_filter_expression=parser_filter_expression
            )
        )
        self._formats_with_signatures = formats_with_signatures

        # filestat and usnjrnl are never run as generic non-signature parsers.
        self._non_sigscan_parser_names = {
            name for name in candidate_names if name not in ("filestat", "usnjrnl")
        }

        self._format_scanner = manager.CreateSignatureScanner(
            self._formats_with_signatures
        )
        self._parsers = manager.GetParserObjects(
            parser_filter_expression=parser_filter_expression
        )

        active_parser_names = ", ".join(sorted(self._parsers))
        logger.debug(f"Active parsers: {active_parser_names:s}")

        self._filestat_parser = self._parsers.pop("filestat", None)
        self._mft_parser = self._parsers.get("mft")
        self._usnjrnl_parser = self._parsers.pop("usnjrnl", None)

    def _ParseDataStreamWithParser(
        self, parser_mediator, parser, file_entry, data_stream_name
    ):
        """Parses a data stream of a file entry with a specific parser.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          parser (BaseParser): parser.
          file_entry (dfvfs.FileEntry): file entry.
          data_stream_name (str): data stream name.

        Raises:
          RuntimeError: if the file-like object is missing.
        """
        stream_object = file_entry.GetFileObject(data_stream_name=data_stream_name)
        if not stream_object:
            raise RuntimeError("Unable to retrieve file-like object from file entry.")

        self._ParseFileEntryWithParser(
            parser_mediator, parser, file_entry, file_object=stream_object
        )

    def _ParseFileEntryWithParser(
        self, parser_mediator, parser, file_entry, file_object=None
    ):
        """Parses a file entry with a specific parser.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          parser (BaseParser): parser.
          file_entry (dfvfs.FileEntry): file entry.
          file_object (Optional[file]): file-like object to parse. It is only
              handed to file-object parsers; file-entry parsers are called
              with the parser mediator alone.

        Returns:
          int: parse result which is _PARSE_RESULT_FAILURE if the file entry
              could not be parsed, _PARSE_RESULT_SUCCESS if the file entry
              successfully was parsed or _PARSE_RESULT_UNSUPPORTED when
              WrongParser was raised.

        Raises:
          TypeError: if parser object is not a supported parser type.
        """
        supported_types = (
            parsers_interface.FileEntryParser,
            parsers_interface.FileObjectParser,
        )
        if not isinstance(parser, supported_types):
            raise TypeError("Unsupported parser object type.")

        parser_mediator.ClearParserChain()

        try:
            if isinstance(parser, parsers_interface.FileEntryParser):
                parser.Parse(parser_mediator)
            else:
                # The type check above leaves only file-object parsers here.
                parser.Parse(parser_mediator, file_object)

        # OSError is caught here so the failing parser can be named in the log.
        except (OSError, dfvfs_errors.BackEndError) as exception:
            display_name = parser_mediator.GetDisplayName(file_entry=file_entry)
            message = (
                f"{parser.NAME:s} unable to parse file: {display_name:s} with error: "
                f"{exception!s}"
            )
            logger.warning(message)
            outcome = self._PARSE_RESULT_FAILURE

        except errors.WrongParser as exception:
            display_name = parser_mediator.GetDisplayName(file_entry=file_entry)
            message = (
                f"{parser.NAME:s} unable to parse file: {display_name:s} with error: "
                f"{exception!s}"
            )
            logger.debug(message)
            outcome = self._PARSE_RESULT_UNSUPPORTED

        else:
            outcome = self._PARSE_RESULT_SUCCESS

        parser_mediator.SampleMemoryUsage(parser.NAME)
        return outcome

    def _ParseFileEntryWithParsers(
        self, parser_mediator, parser_names, file_entry, file_object=None
    ):
        """Parses a file entry with a set of named parsers.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          parser_names (set[str]): names of parsers.
          file_entry (dfvfs.FileEntry): file entry.
          file_object (Optional[file]): file-like object to parse, passed on
              unchanged to every parser that is run.

        Returns:
          int: parse result which is _PARSE_RESULT_FAILURE as soon as one
              parser fails, _PARSE_RESULT_SUCCESS if at least one parser
              succeeded or was skipped by its filters, or
              _PARSE_RESULT_UNSUPPORTED when every parser raised WrongParser
              or no names of parsers were provided.

        Raises:
          RuntimeError: if the parser object is missing.
        """
        overall_result = self._PARSE_RESULT_UNSUPPORTED

        for parser_name in parser_names:
            parser = self._parsers.get(parser_name)
            if not parser:
                raise RuntimeError(
                    f"Parser object missing for parser: {parser_name:s}"
                )

            # A parser rejected by its own filters is recorded as a success.
            if parser.FILTERS and not self._CheckParserCanProcessFileEntry(
                parser, file_entry
            ):
                overall_result = self._PARSE_RESULT_SUCCESS
                continue

            display_name = parser_mediator.GetDisplayName(file_entry=file_entry)
            logger.debug(
                f"[ParseFileEntryWithParsers] parsing file: {display_name:s} with "
                f"parser: {parser_name:s}"
            )

            outcome = self._ParseFileEntryWithParser(
                parser_mediator, parser, file_entry, file_object=file_object
            )
            if outcome == self._PARSE_RESULT_FAILURE:
                return self._PARSE_RESULT_FAILURE

            if outcome == self._PARSE_RESULT_SUCCESS:
                overall_result = self._PARSE_RESULT_SUCCESS

        return overall_result

    def ParseDataStream(self, parser_mediator, file_entry, data_stream_name):
        """Parses a data stream of a file entry with the enabled parsers.

        Parsers whose signatures match the content are tried first. The
        parsers without signatures are only tried when the signature based
        parsers did not report a failure or a success.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          file_entry (dfvfs.FileEntry): file entry.
          data_stream_name (str): data stream name.

        Raises:
          RuntimeError: if the file-like object or the parser object is
              missing.
        """
        file_object = file_entry.GetFileObject(data_stream_name=data_stream_name)
        if not file_object:
            raise RuntimeError("Unable to retrieve file-like object from file entry.")

        parser_mediator.SampleFormatCheckStartTiming("format_scanner")
        try:
            parser_names = self._GetSignatureMatchParserNames(file_object)
        finally:
            parser_mediator.SampleFormatCheckStopTiming("format_scanner")

        handled_by_signature = False
        if parser_names:
            parse_result = self._ParseFileEntryWithParsers(
                parser_mediator, parser_names, file_entry, file_object=file_object
            )
            handled_by_signature = parse_result in (
                self._PARSE_RESULT_FAILURE,
                self._PARSE_RESULT_SUCCESS,
            )

        if not handled_by_signature:
            self._ParseFileEntryWithParsers(
                parser_mediator,
                self._non_sigscan_parser_names,
                file_entry,
                file_object=file_object,
            )

        # NOTE(review): the nesting of this check was not recoverable from the
        # de-indented source; it is placed at method level here. Confirm it is
        # not meant to run only when the non-signature parsers are tried.
        if self._force_parser and self._usnjrnl_parser:
            # TODO: the usnjrnl needs to be adjusted to be used on an export of
            # $UsnJrnl:$J
            self._ParseFileEntryWithParser(
                parser_mediator,
                self._usnjrnl_parser,
                file_entry,
                file_object=file_object,
            )

    def ParseFileEntryMetadata(self, parser_mediator, file_entry):
        """Parses the file entry metadata such as file system data.

        Does nothing when no filestat parser is available.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          file_entry (dfvfs.FileEntry): file entry.
        """
        if not self._filestat_parser:
            return

        self._ParseFileEntryWithParser(
            parser_mediator, self._filestat_parser, file_entry
        )

    def ParseMetadataFile(self, parser_mediator, file_entry, data_stream_name):
        """Parses a metadata file.

        Handles $MFT and $MFTMirr (default data stream) with the mft parser
        and $UsnJrnl:$J with the usnjrnl parser. In both cases the path
        specification of the file entry must have a parent.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          file_entry (dfvfs.FileEntry): file entry.
          data_stream_name (str): data stream name.
        """
        parent_path_spec = getattr(file_entry.path_spec, "parent", None)
        upper_name = file_entry.name.upper()

        is_mft_file = upper_name in ("$MFT", "$MFTMIRR") and not data_stream_name
        is_usn_journal = upper_name == "$USNJRNL" and data_stream_name == "$J"

        if self._mft_parser and parent_path_spec and is_mft_file:
            self._ParseDataStreamWithParser(
                parser_mediator, self._mft_parser, file_entry, ""
            )

        elif self._usnjrnl_parser and parent_path_spec and is_usn_journal:
            # To be able to ignore the sparse data ranges the UsnJrnl parser
            # needs to read directly from the volume.
            volume_file_object = path_spec_resolver.Resolver.OpenFileObject(
                parent_path_spec, resolver_context=parser_mediator.resolver_context
            )
            self._ParseFileEntryWithParser(
                parser_mediator,
                self._usnjrnl_parser,
                file_entry,
                file_object=volume_file_object,
            )
# [docs] (documentation link label left over from the rendered page)
class PathSpecExtractor:
    """Path specification extractor.

    A path specification extractor extracts path specifications from a source
    directory, file or storage media device or image.
    """

    # Directory depth at which extraction gives up instead of descending.
    _MAXIMUM_DEPTH = 255

    def _ExtractPathSpecsFromDirectory(self, file_entry, depth=0):
        """Extracts path specifications from a directory.

        Unallocated entries and links are skipped, as is the /$OrphanFiles
        directory directly under the root of TSK-based file entries.

        Args:
          file_entry (dfvfs.FileEntry): file entry that refers to the
              directory.
          depth (Optional[int]): current depth where 0 represents the file
              system root.

        Yields:
          dfvfs.PathSpec: path specification of a file entry found in the
              directory.

        Raises:
          MaximumRecursionDepth: when the maximum recursion depth is reached.
        """
        if depth >= self._MAXIMUM_DEPTH:
            raise errors.MaximumRecursionDepth("Maximum recursion depth reached.")

        # Sub directories are collected first and only descended into after
        # every entry of this directory has been handled.
        pending_directories = []

        for child_entry in file_entry.sub_file_entries:
            try:
                skip_entry = not child_entry.IsAllocated() or child_entry.IsLink()
            except dfvfs_errors.BackEndError as exception:
                path_spec_string = self._GetPathSpecificationString(
                    child_entry.path_spec
                )
                logger.warning(
                    f"Unable to process file: {path_spec_string:s} with error: "
                    f"{exception!s}"
                )
                continue

            if skip_entry:
                continue

            # For TSK-based file entries only, ignore the virtual /$OrphanFiles
            # directory.
            if (
                child_entry.type_indicator == dfvfs_definitions.TYPE_INDICATOR_TSK
                and file_entry.IsRoot()
                and child_entry.name == "$OrphanFiles"
            ):
                continue

            if child_entry.IsDirectory():
                pending_directories.append(child_entry)

            yield from self._ExtractPathSpecsFromFile(child_entry)

        for directory_entry in pending_directories:
            try:
                yield from self._ExtractPathSpecsFromDirectory(
                    directory_entry, depth=depth + 1
                )
            except (
                OSError,
                dfvfs_errors.AccessError,
                dfvfs_errors.BackEndError,
                dfvfs_errors.PathSpecError,
            ) as exception:
                logger.warning(f"{exception!s}")

    def _ExtractPathSpecsFromFile(self, file_entry):
        """Extracts path specifications from a file.

        One path specification is produced per data stream. When none of the
        data streams is the unnamed one, the path specification of the file
        entry itself is produced last.

        Args:
          file_entry (dfvfs.FileEntry): file entry that refers to the file.

        Yields:
          dfvfs.PathSpec: path specification of a file entry found in the
              file.
        """
        main_stream_seen = False

        for data_stream in file_entry.data_streams:
            stream_name = data_stream.name

            # Work on a copy so that path specifications that were already
            # produced are not altered afterwards.
            stream_path_spec = copy.deepcopy(file_entry.path_spec)
            if stream_name:
                stream_path_spec.data_stream = stream_name

            yield stream_path_spec

            if not stream_name:
                main_stream_seen = True

        if not main_stream_seen:
            yield file_entry.path_spec

    def _ExtractPathSpecsFromFileSystem(
        self,
        path_spec,
        find_specs=None,
        recurse_file_system=True,
        resolver_context=None,
    ):
        """Extracts path specifications from a file system within a source.

        Errors raised while opening or walking the file system are logged
        and end the extraction instead of being propagated.

        Args:
          path_spec (dfvfs.PathSpec): path specification of the root of
              the file system.
          find_specs (Optional[list[dfvfs.FindSpec]]): find specifications
              used in path specification extraction. When set they take
              precedence over recursing into the file system.
          recurse_file_system (Optional[bool]): True if extraction should
              recurse into a file system. When False, and no find
              specifications are set, only path_spec itself is produced.
          resolver_context (Optional[dfvfs.Context]): resolver context.

        Yields:
          dfvfs.PathSpec: path specification of a file entry found in
              the file system.
        """
        handled_errors = (
            dfvfs_errors.AccessError,
            dfvfs_errors.BackEndError,
            dfvfs_errors.PathSpecError,
        )

        try:
            file_system = path_spec_resolver.Resolver.OpenFileSystem(
                path_spec, resolver_context=resolver_context
            )
        except handled_errors as exception:
            logger.error(f"Unable to open file system with error: {exception!s}")
            file_system = None

        if not file_system:
            return

        try:
            if find_specs:
                searcher = file_system_searcher.FileSystemSearcher(
                    file_system, path_spec
                )
                yield from searcher.Find(find_specs=find_specs)

            elif recurse_file_system:
                root_entry = file_system.GetFileEntryByPathSpec(path_spec)
                if root_entry:
                    yield from self._ExtractPathSpecsFromDirectory(root_entry)

            else:
                yield path_spec

        except handled_errors as exception:
            logger.warning(f"{exception!s}")

    def _GetPathSpecificationString(self, path_spec):
        """Retrieves a printable string representation of a path specification.

        Args:
          path_spec (dfvfs.PathSpec): path specification.

        Returns:
          str: the comparable of the path specification with every line run
              through the non-printable character translation table.
        """
        comparable_lines = path_spec.comparable.split("\n")
        return "\n".join(
            line.translate(definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)
            for line in comparable_lines
        )

    def ExtractPathSpecs(
        self,
        path_spec,
        find_specs=None,
        recurse_file_system=True,
        resolver_context=None,
    ):
        """Extracts path specifications from a specific source.

        A source that is a file produces only path_spec itself. A directory
        or device is handled as a file system. Anything else, or a source
        that cannot be opened, is logged and produces nothing.

        Args:
          path_spec (dfvfs.PathSpec): path specification.
          find_specs (Optional[list[dfvfs.FindSpec]]): find specifications
              used in path specification extraction.
          recurse_file_system (Optional[bool]): True if extraction should
              recurse into a file system.
          resolver_context (Optional[dfvfs.Context]): resolver context.

        Yields:
          dfvfs.PathSpec: path specification of a file entry found in the
              source.
        """
        try:
            file_entry = path_spec_resolver.Resolver.OpenFileEntry(
                path_spec, resolver_context=resolver_context
            )
        except (
            dfvfs_errors.AccessError,
            dfvfs_errors.BackEndError,
            dfvfs_errors.PathSpecError,
        ) as exception:
            logger.error(f"Unable to open file entry with error: {exception!s}")
            file_entry = None

        if not file_entry:
            path_spec_string = self._GetPathSpecificationString(path_spec)
            logger.warning(f"Unable to open: {path_spec_string:s}")
            return

        if not (
            file_entry.IsDirectory() or file_entry.IsFile() or file_entry.IsDevice()
        ):
            path_spec_string = self._GetPathSpecificationString(path_spec)
            logger.warning(
                f"Source path specification not a device, file or directory.\n"
                f"{path_spec_string:s}"
            )
            return

        if file_entry.IsFile():
            yield path_spec
            return

        yield from self._ExtractPathSpecsFromFileSystem(
            path_spec,
            find_specs=find_specs,
            recurse_file_system=recurse_file_system,
            resolver_context=resolver_context,
        )