Source code for plaso.parsers.jsonl_plugins.aws_cloudtrail_log

"""JSON-L parser plugin for AWS CloudTrail log files."""

import json

from json import decoder as json_decoder

from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import events
from plaso.parsers import jsonl_parser
from plaso.parsers.jsonl_plugins import interface


[docs] class AWSCloudTrailEventData(events.EventData): """AWS CloudTrail log event data. Attributes: access_key (str): access key identifier. account_identifier (str): AWS account identifier. cloud_trail_event (str): CloudTrail event. event_name (str): event name. event_source (str): AWS service. recorded_time (dfdatetime.DateTimeValues): date and time the log entry was recorded. resources (str): resources. source_ip (str): source IP address. user_identity_arn (str): AWS ARN of the user. user_name (str): name of the AWS user. """ DATA_TYPE = "aws:cloudtrail:entry"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.access_key = None self.account_identifier = None self.cloud_trail_event = None self.event_name = None self.event_source = None self.recorded_time = None self.resources = None self.source_ip = None self.user_identity_arn = None self.user_name = None
[docs] class AWSCloudTrailLogJSONLPlugin(interface.JSONLPlugin): """JSON-L parser plugin for AWS CloudTrail log files.""" NAME = "aws_cloudtrail_log" DATA_FORMAT = "AWS CloudTrail Log" def _ParseRecord(self, parser_mediator, json_dict): """Parses an AWS CloudTrail log record. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. json_dict (dict): JSON dictionary of the log record. """ date_time = None event_time = self._GetJSONValue(json_dict, "EventTime") if event_time: try: date_time = dfdatetime_time_elements.TimeElementsInMicroseconds() date_time.CopyFromDateTimeString(event_time) except ValueError as exception: parser_mediator.ProduceExtractionWarning( f"Unable to parse EventTime value: {event_time:s} with error: " f"{exception!s}" ) date_time = None resource_list = self._GetJSONValue(json_dict, "Resources", default_value=[]) # Flatten multiple resources into a string of resource names. resources = ", ".join([resource["ResourceName"] for resource in resource_list]) cloud_trail_event = self._GetJSONValue( json_dict, "CloudTrailEvent", default_value="{}" ) try: cloud_trail_event_json = json.loads(cloud_trail_event) except json_decoder.JSONDecodeError as exception: parser_mediator.ProduceExtractionWarning( f"Unable to decode CloudTrail event with error: {exception!s}" ) cloud_trail_event_json = {} user_identity_json = self._GetJSONValue(cloud_trail_event_json, "userIdentity") event_data = AWSCloudTrailEventData() event_data.access_key = self._GetJSONValue(json_dict, "AccessKeyId") event_data.account_identifier = self._GetJSONValue( cloud_trail_event_json, "accountId" ) event_data.cloud_trail_event = cloud_trail_event event_data.event_name = self._GetJSONValue(json_dict, "EventName") event_data.event_source = self._GetJSONValue(json_dict, "EventSource") event_data.recorded_time = date_time event_data.resources = resources or None event_data.source_ip = self._GetJSONValue( cloud_trail_event_json, "sourceIPAddress" ) event_data.user_identity_arn = self._GetJSONValue(user_identity_json, "arn") event_data.user_name = self._GetJSONValue(json_dict, "Username") parser_mediator.ProduceEventData(event_data)
[docs] def CheckRequiredFormat(self, json_dict): """Check if the log record has the minimal structure required by the plugin. Args: json_dict (dict): JSON dictionary of the log record. Returns: bool: True if this is the correct parser, False otherwise. """ cloud_trail_event = self._GetJSONValue(json_dict, "CloudTrailEvent") event_time = self._GetJSONValue(json_dict, "EventTime") if None in (cloud_trail_event, event_time): return False date_time = dfdatetime_time_elements.TimeElementsInMicroseconds() try: date_time.CopyFromDateTimeString(event_time) except ValueError: return False return True
jsonl_parser.JSONLParser.RegisterPlugin(AWSCloudTrailLogJSONLPlugin)