Source code for plaso.filters.file_entry

# -*- coding: utf-8 -*-
"""File entry filters."""

import abc
import collections

import pysigscan

from dfdatetime import time_elements

from plaso.filters import logger


class FileEntryFilter(abc.ABC):
  """File entry filter interface.

  The class derives from abc.ABC so that the abstractmethod decorators below
  are enforced: the interface itself, or a subclass that does not override
  both Matches and Print, cannot be instantiated.
  """

  # pylint: disable=redundant-returns-doc

  @abc.abstractmethod
  def Matches(self, file_entry):
    """Compares the file entry against the filter.

    Args:
      file_entry (dfvfs.FileEntry): file entry to compare.

    Returns:
      bool: True if the file entry matches the filter, False if not or
          None if the filter does not apply.
    """

  @abc.abstractmethod
  def Print(self, output_writer):
    """Prints a human readable version of the filter.

    Args:
      output_writer (CLIOutputWriter): output writer.
    """
class DateTimeFileEntryFilter(FileEntryFilter):
  """Date and time-based file entry filter.

  A file entry matches when none of the configured date time ranges rejects
  the corresponding file entry timestamp. Ranges for which the file entry
  has no timestamp are skipped.
  """

  _DATE_TIME_RANGE_TUPLE = collections.namedtuple(
      'date_time_range_tuple', 'time_value start_date_time end_date_time')

  # Maps the time value of the date time range to a file entry attribute name.
  _TIME_VALUE_MAPPINGS = {
      'atime': 'access_time',
      'bkup': 'backup_time',
      'ctime': 'change_time',
      'crtime': 'creation_time',
      'dtime': 'deletion_time',
      'mtime': 'modification_time'}

  _SUPPORTED_TIME_VALUES = frozenset(_TIME_VALUE_MAPPINGS.keys())

  def __init__(self):
    """Initializes a date and time-based file entry filter."""
    super(DateTimeFileEntryFilter, self).__init__()
    self._date_time_ranges = []

  def AddDateTimeRange(
      self, time_value, start_time_string=None, end_time_string=None):
    """Adds a date time filter range.

    The time strings are formatted as:
    YYYY-MM-DD hh:mm:ss.######[+-]##:##
    Where # are numeric digits ranging from 0 to 9 and the seconds
    fraction can be either 3 or 6 digits. The time of day, seconds fraction
    and timezone offset are optional. The default timezone is UTC.

    Args:
      time_value (str): time value, such as, atime, ctime, crtime, dtime, bkup
          and mtime.
      start_time_string (str): start date and time value string.
      end_time_string (str): end date and time value string.

    Raises:
      ValueError: If the filter is badly formed, which is the case when
          the time value is not a supported string, when neither a start
          nor an end date time value is provided or when the start is later
          than the end.
    """
    if not isinstance(time_value, str):
      raise ValueError('Filter type must be a string.')

    # Empty strings are treated the same as None. Otherwise a range without
    # any bound would be stored, which Print cannot render.
    if not start_time_string and not end_time_string:
      raise ValueError(
          'Filter must have either a start or an end date time value.')

    time_value_lower = time_value.lower()
    if time_value_lower not in self._SUPPORTED_TIME_VALUES:
      raise ValueError('Unsupported time value: {0:s}.'.format(time_value))

    start_date_time = None
    if start_time_string:
      start_date_time = time_elements.TimeElementsInMicroseconds()
      start_date_time.CopyFromDateTimeString(start_time_string)

    end_date_time = None
    if end_time_string:
      end_date_time = time_elements.TimeElementsInMicroseconds()
      end_date_time.CopyFromDateTimeString(end_time_string)

    # A range that ends before it starts is rejected, it is not silently
    # reversed.
    if (None not in (start_date_time, end_date_time) and
        start_date_time > end_date_time):
      raise ValueError(
          'Invalid date time value start must be earlier than end.')

    self._date_time_ranges.append(self._DATE_TIME_RANGE_TUPLE(
        time_value_lower, start_date_time, end_date_time))

  def Matches(self, file_entry):
    """Compares the file entry against the filter.

    Args:
      file_entry (dfvfs.FileEntry): file entry to compare.

    Returns:
      bool: True if the file entry matches the filter, False if not or
          None if the filter does not apply.
    """
    if not self._date_time_ranges:
      return None

    for date_time_range in self._date_time_ranges:
      time_attribute = self._TIME_VALUE_MAPPINGS.get(
          date_time_range.time_value, None)
      if not time_attribute:
        continue

      # The file entry does not necessarily provide every timestamp, ranges
      # on a missing timestamp are ignored.
      timestamp = getattr(file_entry, time_attribute, None)
      if timestamp is None:
        continue

      if (date_time_range.start_date_time is not None and
          timestamp < date_time_range.start_date_time):
        return False

      if (date_time_range.end_date_time is not None and
          timestamp > date_time_range.end_date_time):
        return False

    return True

  def Print(self, output_writer):
    """Prints a human readable version of the filter.

    Args:
      output_writer (CLIOutputWriter): output writer.
    """
    for date_time_range in self._date_time_ranges:
      time_value = date_time_range.time_value
      start_date_time = date_time_range.start_date_time
      end_date_time = date_time_range.end_date_time

      if start_date_time is None:
        # Only an end is defined: Matches accepts timestamps up to the end,
        # hence the range reads as "before" the end.
        description = '\t{0:s} before {1:s}\n'.format(
            time_value, end_date_time.CopyToDateTimeString())

      elif end_date_time is None:
        # Only a start is defined: Matches accepts timestamps from the start
        # onwards, hence the range reads as "after" the start.
        description = '\t{0:s} after {1:s}\n'.format(
            time_value, start_date_time.CopyToDateTimeString())

      else:
        description = '\t{0:s} between {1:s} and {2:s}\n'.format(
            time_value, start_date_time.CopyToDateTimeString(),
            end_date_time.CopyToDateTimeString())

      output_writer.Write(description)
class ExtensionsFileEntryFilter(FileEntryFilter):
  """Extensions-based file entry filter.

  Extensions are compared case insensitively.
  """

  def __init__(self, extensions):
    """Initializes an extensions-based file entry filter.

    An extension is defined as "pdf" as in "document.pdf".

    Args:
      extensions (list[str]): a list of extension strings.
    """
    super(ExtensionsFileEntryFilter, self).__init__()
    # Kept as provided, this is what Print shows.
    self._extensions = extensions
    # Matches lower cases the extension of the file entry, so the configured
    # extensions are lower cased as well; otherwise an extension such as
    # "PDF" could never match. Built once to keep Matches cheap.
    self._lowercase_extensions = frozenset(
        extension.lower() for extension in extensions or ())

  def Matches(self, file_entry):
    """Compares the file entry against the filter.

    Args:
      file_entry (dfvfs.FileEntry): file entry to compare.

    Returns:
      bool: True if the file entry matches the filter, False if not or
          None if the filter does not apply, which is the case when
          the path specification of the file entry has no location.
    """
    location = getattr(file_entry.path_spec, 'location', None)
    if not location:
      return None

    # The extension is the part after the last dot of the location.
    _, separator, extension = location.rpartition('.')
    if not separator:
      return False

    return extension.lower() in self._lowercase_extensions

  def Print(self, output_writer):
    """Prints a human readable version of the filter.

    Args:
      output_writer (CLIOutputWriter): output writer.
    """
    if self._extensions:
      output_writer.Write('\textensions: {0:s}\n'.format(
          ', '.join(self._extensions)))
class NamesFileEntryFilter(FileEntryFilter):
  """Names-based file entry filter.

  Names are compared case insensitively.
  """

  def __init__(self, names):
    """Initializes a names-based file entry filter.

    Args:
      names (list[str]): names.
    """
    super(NamesFileEntryFilter, self).__init__()
    # Kept as provided, this is what Print shows.
    self._names = names
    # Matches lower cases the name of the file entry, so the configured
    # names are lower cased as well; otherwise a name containing upper case
    # characters could never match. Built once to keep Matches cheap.
    self._lowercase_names = frozenset(name.lower() for name in names or ())

  def Matches(self, file_entry):
    """Compares the file entry against the filter.

    Args:
      file_entry (dfvfs.FileEntry): file entry to compare.

    Returns:
      bool: True if the file entry matches the filter. False if no names
          are defined or if the file entry is not a file.
    """
    if not self._names or not file_entry.IsFile():
      return False

    return file_entry.name.lower() in self._lowercase_names

  def Print(self, output_writer):
    """Prints a human readable version of the filter.

    Args:
      output_writer (CLIOutputWriter): output writer.
    """
    if self._names:
      output_writer.Write('\tnames: {0:s}\n'.format(
          ', '.join(self._names)))
class SignaturesFileEntryFilter(FileEntryFilter):
  """Signature-based file entry filter."""

  def __init__(self, specification_store, signature_identifiers):
    """Initializes a signature-based file entry filter.

    Args:
      specification_store (FormatSpecificationStore): a specification store.
      signature_identifiers (list[str]): signature identifiers.
    """
    super(SignaturesFileEntryFilter, self).__init__()
    self._file_scanner = None
    # Filled by _GetScanner with the identifiers of the format specifications
    # that were added to the scanner.
    self._signature_identifiers = []

    self._file_scanner = self._GetScanner(
        specification_store, signature_identifiers)

  def _GetScanner(self, specification_store, signature_identifiers):
    """Initializes the scanner from the specification store.

    Only the format specifications whose identifier is listed in
    signature_identifiers are added to the scanner.

    Args:
      specification_store (FormatSpecificationStore): a specification store.
      signature_identifiers (list[str]): signature identifiers.

    Returns:
      pysigscan.scanner: signature scanner or None.
    """
    if not specification_store:
      return None

    scanner = pysigscan.scanner()

    for specification in specification_store.specifications:
      if specification.identifier in signature_identifiers:
        for signature in specification.signatures:
          offset = signature.offset

          if offset is None:
            flags = pysigscan.signature_flags.NO_OFFSET
          elif offset < 0:
            # A negative offset is passed to the scanner as a positive
            # offset relative from the end.
            offset = -offset
            flags = pysigscan.signature_flags.RELATIVE_FROM_END
          else:
            flags = pysigscan.signature_flags.RELATIVE_FROM_START

          scanner.add_signature(
              signature.identifier, offset, signature.pattern, flags)

        self._signature_identifiers.append(specification.identifier)

    return scanner

  def Matches(self, file_entry):
    """Compares the file entry against the filter.

    Args:
      file_entry (dfvfs.FileEntry): file entry to compare.

    Returns:
      bool: True if the file entry matches the filter, False if not or
          None if the filter does not apply, which is the case when there
          is no scanner or the file entry is not a file.
    """
    if not (self._file_scanner and file_entry.IsFile()):
      return None

    file_object = file_entry.GetFileObject()
    if not file_object:
      return False

    try:
      scan_state = pysigscan.scan_state()
      self._file_scanner.scan_file_object(scan_state, file_object)

    except IOError as exception:
      # TODO: replace location by display name.
      location = getattr(file_entry.path_spec, 'location', '')
      message = (
          '[skipping] unable to scan file: {0:s} for signatures '
          'with error: {1!s}').format(location, exception)
      logger.error(message)
      return False

    # At least one scan result means one of the signatures was found.
    return scan_state.number_of_scan_results > 0

  def Print(self, output_writer):
    """Prints a human readable version of the filter.

    Args:
      output_writer (CLIOutputWriter): output writer.
    """
    if not self._file_scanner:
      return

    identifiers = ', '.join(self._signature_identifiers)
    output_writer.Write(
        '\tsignature identifiers: {0:s}\n'.format(identifiers))
class FileEntryFilterCollection(object):
  """Collection of file entry filters."""

  def __init__(self):
    """Initializes a file entry filter collection."""
    super(FileEntryFilterCollection, self).__init__()
    self._filters = []

  def AddFilter(self, file_entry_filter):
    """Adds a file entry filter to the collection.

    Args:
      file_entry_filter (FileEntryFilter): file entry filter.
    """
    self._filters.append(file_entry_filter)

  def HasFilters(self):
    """Determines if filters are defined.

    Returns:
      bool: True if filters are defined.
    """
    return len(self._filters) > 0

  def Matches(self, file_entry):
    """Compares the file entry against the filter collection.

    Args:
      file_entry (dfvfs.FileEntry): file entry to compare.

    Returns:
      bool: True if the file entry matches one of the filters. If no filters
          are provided or applicable the result will be True.
    """
    if not self._filters:
      return True

    # Every filter is consulted, the outcome is only determined afterwards.
    outcomes = [
        file_entry_filter.Matches(file_entry)
        for file_entry_filter in self._filters]

    if True in outcomes:
      return True

    # Without an explicit match, the file entry is only accepted when no
    # filter rejected it, meaning none of the filters applied.
    return False not in outcomes

  def Print(self, output_writer):
    """Prints a human readable version of the filter.

    Args:
      output_writer (CLIOutputWriter): output writer.
    """
    if not self._filters:
      return

    output_writer.Write('Filters:\n')
    for file_entry_filter in self._filters:
      file_entry_filter.Print(output_writer)