Source code for plaso.parsers.cups_ipp

"""The CUPS IPP files parser.

CUPS IPP version 1.0:
* https://datatracker.ietf.org/doc/html/rfc2565
* https://datatracker.ietf.org/doc/html/rfc2566
* https://datatracker.ietf.org/doc/html/rfc2567
* https://datatracker.ietf.org/doc/html/rfc2568
* https://datatracker.ietf.org/doc/html/rfc2569
* https://datatracker.ietf.org/doc/html/rfc2639

CUPS IPP version 1.1:
* https://datatracker.ietf.org/doc/html/rfc2910
* https://datatracker.ietf.org/doc/html/rfc2911
* https://datatracker.ietf.org/doc/html/rfc3196
* https://datatracker.ietf.org/doc/html/rfc3510

CUPS IPP version 2.0:
* N/A
"""

import os

from dfdatetime import posix_time as dfdatetime_posix_time
from dfdatetime import rfc2579_date_time as dfdatetime_rfc2579_date_time

from plaso.containers import events
from plaso.lib import dtfabric_helper
from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import logger
from plaso.parsers import manager

# TODO: RFC pending types: resolution, dateTime, rangeOfInteger.
#       "dateTime" is not used by Mac OS, instead it uses integer types.
# TODO: Only tested against CUPS IPP MacOS.


[docs] class CupsIppEventData(events.EventData): """CUPS IPP event data. Attributes: application (str): application that prints the document. computer_name (str): name of the computer. copies (int): number of copies. creation_time (dfdatetime.DateTimeValues): date and time the print job was created (added). doc_type (str): type of document. end_time (dfdatetime.DateTimeValues): date and time the print job was stopped. job_id (str): job identifier. job_name (str): job name. owner (str): real name of the user. printer_id (str): identification name of the print. start_time (dfdatetime.DateTimeValues): date and time the print job was started. uri (str): URL of the CUPS service. user (str): system user name. """ DATA_TYPE = "cups:ipp:event"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.application = None self.computer_name = None self.copies = None self.creation_time = None self.doc_type = None self.end_time = None self.job_id = None self.job_name = None self.owner = None self.printer_id = None self.start_time = None self.uri = None self.user = None
[docs] class CupsIppParser(interface.FileObjectParser, dtfabric_helper.DtFabricHelper): """Parser for CUPS IPP files.""" NAME = "cups_ipp" DATA_FORMAT = "CUPS IPP file" _DEFINITION_FILE = os.path.join(os.path.dirname(__file__), "cups_ipp.yaml") _SUPPORTED_FORMAT_VERSIONS = ("1.0", "1.1", "2.0") _DELIMITER_TAG_OPERATION_ATTRIBUTES = 0x01 _DELIMITER_TAG_JOB_ATTRIBUTES = 0x02 _DELIMITER_TAG_END_OF_ATTRIBUTES = 0x03 _DELIMITER_TAG_PRINTER_ATTRIBUTES = 0x04 _DELIMITER_TAG_UNSUPPORTED_ATTRIBUTES = 0x05 _DELIMITER_TAGS = frozenset( [ _DELIMITER_TAG_OPERATION_ATTRIBUTES, _DELIMITER_TAG_JOB_ATTRIBUTES, _DELIMITER_TAG_PRINTER_ATTRIBUTES, _DELIMITER_TAG_UNSUPPORTED_ATTRIBUTES, ] ) _TAG_VALUE_NONE = 0x13 _TAG_VALUE_INTEGER = 0x21 _TAG_VALUE_BOOLEAN = 0x22 _TAG_VALUE_ENUM = 0x23 _TAG_VALUE_DATE_TIME = 0x31 _TAG_VALUE_RESOLUTION = 0x32 _TAG_VALUE_TEXT_WITHOUT_LANGUAGE = 0x41 _TAG_VALUE_NAME_WITHOUT_LANGUAGE = 0x42 _TAG_VALUE_KEYWORD = 0x44 _TAG_VALUE_URI = 0x45 _TAG_VALUE_URI_SCHEME = 0x46 _TAG_VALUE_CHARSET = 0x47 _TAG_VALUE_NATURAL_LANGUAGE = 0x48 _TAG_VALUE_MEDIA_TYPE = 0x49 _ASCII_STRING_VALUES = frozenset( [ _TAG_VALUE_KEYWORD, _TAG_VALUE_URI, _TAG_VALUE_URI_SCHEME, _TAG_VALUE_CHARSET, _TAG_VALUE_NATURAL_LANGUAGE, _TAG_VALUE_MEDIA_TYPE, ] ) _INTEGER_TAG_VALUES = frozenset([_TAG_VALUE_INTEGER, _TAG_VALUE_ENUM]) _STRING_WITHOUT_LANGUAGE_VALUES = frozenset( [_TAG_VALUE_TEXT_WITHOUT_LANGUAGE, _TAG_VALUE_NAME_WITHOUT_LANGUAGE] ) _ATTRIBUTE_NAME_TRANSLATION = { "com.apple.print.JobInfo.PMApplicationName": "application", "com.apple.print.JobInfo.PMJobOwner": "owner", "DestinationPrinterID": "printer_id", "document-format": "doc_type", "job-name": "job_name", "job-originating-host-name": "computer_name", "job-originating-user-name": "user", "job-uuid": "job_id", "printer-uri": "uri", }
[docs] def __init__(self): """Initializes a CUPS IPP file parser.""" super().__init__() self._last_charset_attribute = "ascii"
def _GetStringValue(self, cupp_ipp_values, name, default_value=None): """Retrieves a specific string value from the data dict. Args: cupp_ipp_values (dict[str, list[str]]): CUPP IPP values per name. name (str): name of the value to retrieve. default_value (Optional[object]): default value if no CUPP IPP value is available. Returns: object: value represented as a string or default value. """ values = cupp_ipp_values.get(name) if not values: return default_value for index, value in enumerate(values): if "," in value: values[index] = f'"{value:s}"' return ", ".join(values) def _ParseAttribute(self, file_object): """Parses a CUPS IPP attribute from a file-like object. Args: file_object (dfvfs.FileIO): file-like object. Returns: tuple[str, object]: attribute name and value. Raises: ParseError: if the attribute cannot be parsed. """ file_offset = file_object.tell() attribute_map = self._GetDataTypeMap("cups_ipp_attribute") try: attribute, _ = self._ReadStructureFromFileObject( file_object, file_offset, attribute_map ) except (ValueError, errors.ParseError) as exception: raise errors.ParseError( f"Unable to parse attribute with error: {exception!s}" ) if attribute.tag_value == self._TAG_VALUE_NONE: value = None elif attribute.tag_value in self._INTEGER_TAG_VALUES: # TODO: correct file offset to point to the start of value_data. value = self._ParseIntegerValue(attribute.value_data, file_offset) elif attribute.tag_value == self._TAG_VALUE_BOOLEAN: value = self._ParseBooleanValue(attribute.value_data) elif attribute.tag_value == self._TAG_VALUE_DATE_TIME: # TODO: correct file offset to point to the start of value_data. value = self._ParseDateTimeValue(attribute.value_data, file_offset) elif attribute.tag_value in self._STRING_WITHOUT_LANGUAGE_VALUES: value = attribute.value_data.decode(self._last_charset_attribute) elif attribute.tag_value in self._ASCII_STRING_VALUES: value = attribute.value_data.decode("ascii") if attribute.tag_value == self._TAG_VALUE_CHARSET: self._last_charset_attribute = value else: value = attribute.value_data return attribute.name, value def _ParseAttributesGroup(self, file_object): """Parses a CUPS IPP attributes group from a file-like object. Args: file_object (dfvfs.FileIO): file-like object. Yields: tuple[str, object]: attribute name and value. Raises: ParseError: if the attributes group cannot be parsed. """ tag_value_map = self._GetDataTypeMap("int8") tag_value = 0 while tag_value != self._DELIMITER_TAG_END_OF_ATTRIBUTES: file_offset = file_object.tell() tag_value, _ = self._ReadStructureFromFileObject( file_object, file_offset, tag_value_map ) if tag_value >= 0x10: file_object.seek(file_offset, os.SEEK_SET) yield self._ParseAttribute(file_object) elif ( tag_value != self._DELIMITER_TAG_END_OF_ATTRIBUTES and tag_value not in self._DELIMITER_TAGS ): raise errors.ParseError( ( f"Unsupported attributes groups start tag value: " f"0x{tag_value:02x}." ) ) def _ParseBooleanValue(self, byte_stream): """Parses a boolean value. Args: byte_stream (bytes): byte stream. Returns: bool: boolean value. Raises: ParseError: when the boolean value cannot be parsed. """ if byte_stream == b"\x00": return False if byte_stream == b"\x01": return True raise errors.ParseError("Unsupported boolean value.") def _ParseDateTimeValue(self, byte_stream, file_offset): """Parses a CUPS IPP RFC2579 date-time value from a byte stream. Args: byte_stream (bytes): byte stream. file_offset (int): offset of the attribute data relative to the start of the file-like object. Returns: dfdatetime.RFC2579DateTime: RFC2579 date-time stored in the value. Raises: ParseError: when the RFC2579 date-time value cannot be parsed. """ datetime_value_map = self._GetDataTypeMap("cups_ipp_datetime_value") try: value = self._ReadStructureFromByteStream( byte_stream, file_offset, datetime_value_map ) except (ValueError, errors.ParseError) as exception: raise errors.ParseError( f"Unable to parse datetime value with error: {exception!s}" ) direction_from_utc = chr(value.direction_from_utc) rfc2579_date_time_tuple = ( value.year, value.month, value.day_of_month, value.hours, value.minutes, value.seconds, value.deciseconds, direction_from_utc, value.hours_from_utc, value.minutes_from_utc, ) return dfdatetime_rfc2579_date_time.RFC2579DateTime( rfc2579_date_time_tuple=rfc2579_date_time_tuple ) def _ParseIntegerValue(self, byte_stream, file_offset): """Parses an integer value. Args: byte_stream (bytes): byte stream. file_offset (int): offset of the attribute data relative to the start of the file-like object. Returns: int: integer value. Raises: ParseError: when the integer value cannot be parsed. """ data_type_map = self._GetDataTypeMap("int32be") try: return self._ReadStructureFromByteStream( byte_stream, file_offset, data_type_map ) except (ValueError, errors.ParseError) as exception: raise errors.ParseError( f"Unable to parse integer value with error: {exception!s}" ) def _ParseHeader(self, parser_mediator, file_object): """Parses a CUPS IPP header from a file-like object. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. file_object (dfvfs.FileIO): file-like object. Raises: WrongParser: when the header cannot be parsed. """ header_map = self._GetDataTypeMap("cups_ipp_header") try: header, _ = self._ReadStructureFromFileObject(file_object, 0, header_map) except (ValueError, errors.ParseError) as exception: raise errors.WrongParser( f"[{self.NAME:s}] Unable to parse header with error: {exception!s}" ) format_version = f"{header.major_version:d}.{header.minor_version:d}" if format_version not in self._SUPPORTED_FORMAT_VERSIONS: raise errors.WrongParser( f"[{self.NAME:s}] Unsupported format version {format_version:s}." ) if header.operation_identifier != 5: # TODO: generate ExtractionWarning instead of printing debug output. display_name = parser_mediator.GetDisplayName() logger.debug( f"[{self.NAME:s}] Non-standard operation identifier: " f"0x{header.operation_identifier:08x} in file header " f"of: {display_name:s}." )
[docs] def ParseFileObject(self, parser_mediator, file_object): """Parses a CUPS IPP file-like object. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. file_object (dfvfs.FileIO): file-like object. Raises: WrongParser: when the file cannot be parsed. """ self._last_charset_attribute = "ascii" self._ParseHeader(parser_mediator, file_object) cupp_ipp_values = {} is_first_attribute_group = True try: for name, value in self._ParseAttributesGroup(file_object): if value is None: continue name = self._ATTRIBUTE_NAME_TRANSLATION.get(name, name) cupp_ipp_values.setdefault(name, []).append(value) is_first_attribute_group = False except (ValueError, errors.ParseError) as exception: error_message = f"unable to parse attribute group with error: {exception!s}" if is_first_attribute_group: raise errors.WrongParser(error_message) parser_mediator.ProduceExtractionWarning(error_message) return event_data = CupsIppEventData() event_data.application = self._GetStringValue(cupp_ipp_values, "application") event_data.computer_name = self._GetStringValue( cupp_ipp_values, "computer_name" ) event_data.copies = cupp_ipp_values.get("copies", [0])[0] event_data.doc_type = self._GetStringValue(cupp_ipp_values, "doc_type") event_data.job_id = self._GetStringValue(cupp_ipp_values, "job_id") event_data.job_name = self._GetStringValue(cupp_ipp_values, "job_name") event_data.user = self._GetStringValue(cupp_ipp_values, "user") event_data.owner = self._GetStringValue(cupp_ipp_values, "owner") event_data.printer_id = self._GetStringValue(cupp_ipp_values, "printer_id") event_data.uri = self._GetStringValue(cupp_ipp_values, "uri") # CUPS IPP version 1.1 date and time values event_data.creation_time = cupp_ipp_values.get("date-time-at-creation", None) event_data.end_time = cupp_ipp_values.get("date-time-at-completed") event_data.start_time = cupp_ipp_values.get("date-time-at-processing") # CUPS IPP version 1.0 date and time values if not event_data.creation_time: timestamp = cupp_ipp_values.get("time-at-creation", []) if timestamp: event_data.creation_time = dfdatetime_posix_time.PosixTime( timestamp=timestamp[0] ) if not event_data.end_time: timestamp = cupp_ipp_values.get("time-at-completed", []) if timestamp: event_data.end_time = dfdatetime_posix_time.PosixTime( timestamp=timestamp[0] ) if not event_data.start_time: timestamp = cupp_ipp_values.get("time-at-processing", []) if timestamp: event_data.start_time = dfdatetime_posix_time.PosixTime( timestamp=timestamp[0] ) parser_mediator.ProduceEventData(event_data)
manager.ParsersManager.RegisterParser(CupsIppParser)