# -*- coding: utf-8 -*-
"""Parser for Advanced Packaging Tool (APT) History log files."""
from __future__ import unicode_literals
import pyparsing
from dfdatetime import time_elements as dfdatetime_time_elements
from plaso.containers import events
from plaso.containers import time_events
from plaso.lib import errors
from plaso.lib import definitions
from plaso.parsers import logger
from plaso.parsers import manager
from plaso.parsers import text_parser
[docs]class APTHistoryLogEventData(events.EventData):
"""APT History log event data.
Attributes:
command (str): command exectued
error (str): reported error.
packages (str): list of packages being affected.
requester (str): user requesting the activity.
"""
DATA_TYPE = 'apt:history:line'
def __init__(self):
"""Initializes event data."""
super(APTHistoryLogEventData, self).__init__(data_type=self.DATA_TYPE)
self.command = None
self.error = None
self.packages = None
self.requester = None
[docs]class APTHistoryLogParser(text_parser.PyparsingSingleLineTextParser):
"""Parses events from APT History log files."""
NAME = 'apt_history'
DESCRIPTION = 'Parser for APT History log files.'
# APT History log lines can be very long.
MAX_LINE_LENGTH = 65536
_ENCODING = 'utf-8'
_HYPHEN = text_parser.PyparsingConstants.HYPHEN
_FOUR_DIGITS = text_parser.PyparsingConstants.FOUR_DIGITS
_TWO_DIGITS = text_parser.PyparsingConstants.TWO_DIGITS
_APTHISTORY_DATE_TIME = pyparsing.Group(
_FOUR_DIGITS + _HYPHEN +
_TWO_DIGITS + _HYPHEN +
_TWO_DIGITS +
_TWO_DIGITS + pyparsing.Suppress(':') +
_TWO_DIGITS + pyparsing.Suppress(':') +
_TWO_DIGITS
)
_RECORD_START = (
# APT History logs may start with empty lines
pyparsing.ZeroOrMore(pyparsing.lineEnd()) +
pyparsing.Literal('Start-Date:') +
_APTHISTORY_DATE_TIME.setResultsName('start_date') +
pyparsing.lineEnd())
_RECORD_BODY = (
pyparsing.MatchFirst([
pyparsing.Literal('Commandline:'),
pyparsing.Literal('Downgrade:'),
pyparsing.Literal('Error:'),
pyparsing.Literal('Install:'),
pyparsing.Literal('Purge:'),
pyparsing.Literal('Remove:'),
pyparsing.Literal('Requested-By:'),
pyparsing.Literal('Upgrade:')]) +
pyparsing.restOfLine())
_RECORD_END = (
pyparsing.Literal('End-Date:') +
_APTHISTORY_DATE_TIME.setResultsName('end_date') +
pyparsing.OneOrMore(pyparsing.lineEnd()))
LINE_STRUCTURES = [
('record_start', _RECORD_START),
('record_body', _RECORD_BODY),
('record_end', _RECORD_END)]
def __init__(self):
"""Initializes an APT History parser."""
super(APTHistoryLogParser, self).__init__()
self._date_time = None
self._event_data = None
self._downgrade = None
self._install = None
self._purge = None
self._remove = None
self._upgrade = None
@staticmethod
def _BuildDateTime(time_elements_structure):
"""Builds time elements from an APT History time stamp.
Args:
time_elements_structure (pyparsing.ParseResults): structure of tokens
derived from an APT History time stamp.
Returns:
dfdatetime.TimeElements: date and time extracted from the structure or
None f the structure does not represent a valid string.
"""
try:
date_time = dfdatetime_time_elements.TimeElements(
time_elements_tuple=time_elements_structure)
# APT History logs store date and time values in local time.
date_time.is_local_time = True
return date_time
except ValueError:
return None
def _ParseRecordStart(self, parser_mediator, structure):
"""Parses the first line of a log record.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
structure (pyparsing.ParseResults): structure of tokens derived from
a log entry.
"""
time_elements_structure = self._GetValueFromStructure(
structure, 'start_date')
self._date_time = self._BuildDateTime(time_elements_structure)
if not self._date_time:
parser_mediator.ProduceExtractionWarning(
'invalid date time value: {0!s}'.format(time_elements_structure))
return
self._event_data = APTHistoryLogEventData()
return
def _ParseRecordBody(self, structure):
"""Parses a line from the body of a log record.
Args:
structure (pyparsing.ParseResults): structure of tokens derived from
a log entry.
Raises:
ParseError: when the structure type is unknown.
"""
if not self._date_time:
raise errors.ParseError('Unable to parse, record incomplete.')
# Command data
if structure[0] == 'Commandline:':
self._event_data.command = ''.join(structure)
elif structure[0] == 'Error:':
self._event_data.error = ''.join(structure)
elif structure[0] == 'Requested-By:':
self._event_data.requester = ''.join(structure)
# Package lists
elif structure[0] == 'Downgrade:':
self._downgrade = ''.join(structure)
elif structure[0] == 'Install:':
self._install = ''.join(structure)
elif structure[0] == 'Purge:':
self._purge = ''.join(structure)
elif structure[0] == 'Remove:':
self._remove = ''.join(structure)
elif structure[0] == 'Upgrade:':
self._upgrade = ''.join(structure)
def _ParseRecordEnd(self, parser_mediator):
"""Parses the last line of a log record.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
Raises:
ParseError: when the structure type is unknown.
"""
if not self._date_time:
raise errors.ParseError('Unable to parse, record incomplete.')
# Create relevant events for record
if self._downgrade:
self._event_data.packages = self._downgrade
event = time_events.DateTimeValuesEvent(
self._date_time,
definitions.TIME_DESCRIPTION_DOWNGRADE,
time_zone=parser_mediator.timezone)
parser_mediator.ProduceEventWithEventData(event, self._event_data)
if self._install:
self._event_data.packages = self._install
event = time_events.DateTimeValuesEvent(
self._date_time,
definitions.TIME_DESCRIPTION_INSTALLATION,
time_zone=parser_mediator.timezone)
parser_mediator.ProduceEventWithEventData(event, self._event_data)
if self._purge:
self._event_data.packages = self._purge
event = time_events.DateTimeValuesEvent(
self._date_time,
definitions.TIME_DESCRIPTION_DELETED,
time_zone=parser_mediator.timezone)
parser_mediator.ProduceEventWithEventData(event, self._event_data)
if self._remove:
self._event_data.packages = self._remove
event = time_events.DateTimeValuesEvent(
self._date_time,
definitions.TIME_DESCRIPTION_DELETED,
time_zone=parser_mediator.timezone)
parser_mediator.ProduceEventWithEventData(event, self._event_data)
if self._upgrade:
self._event_data.packages = self._upgrade
event = time_events.DateTimeValuesEvent(
self._date_time,
definitions.TIME_DESCRIPTION_UPDATE,
time_zone=parser_mediator.timezone)
parser_mediator.ProduceEventWithEventData(event, self._event_data)
def _ResetState(self):
"""Resets stored values in the parser."""
self._date_time = None
self._downgrade = None
self._event_data = None
self._install = None
self._purge = None
self._remove = None
self._upgrade = None
[docs] def ParseRecord(self, parser_mediator, key, structure):
"""Parses a log record structure and produces events.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
key (str): identifier of the structure of tokens.
structure (pyparsing.ParseResults): structure of tokens derived from
a log entry.
"""
if key == 'record_start':
self._ParseRecordStart(parser_mediator, structure)
return
if key == 'record_body':
self._ParseRecordBody(structure)
return
if key == 'record_end':
self._ParseRecordEnd(parser_mediator)
# Reset for next record.
self._ResetState()
return
raise errors.ParseError(
'Unable to parse record, unknown structure: {0:s}'.format(key))
[docs] def VerifyStructure(self, parser_mediator, line):
"""Verify that this file is an APT History log file.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfvfs.
line (str): single line from the text file.
Returns:
bool: True if this is the correct parser, False otherwise.
"""
try:
self._RECORD_START.parseString(line)
# Reset stored values for parsing a new file.
self._ResetState()
except pyparsing.ParseException as exception:
logger.debug('Not an APT History log file: {0!s}'.format(exception))
return False
return True
manager.ParsersManager.RegisterParser(APTHistoryLogParser)