Source code for plaso.parsers.jsonl_plugins.gcp_log

"""JSON-L parser plugin for Google Cloud (GCP) log files."""

import re

from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import events
from plaso.parsers import jsonl_parser
from plaso.parsers.jsonl_plugins import interface


[docs] class GCPLogEventData(events.EventData): """Google Cloud (GCP) log event data. Attributes: caller_ip (str): IP address of the client that requested the operation. container (str): TODO dcsa_emails (list[str]): default compute service account attached to a Google Compute Engine (GCE) instance. dcsa_scopes (list[str]): OAuth scopes granted to the default compute service account. delegation_chain (str): service account delegation chain. event_subtype (str): JSON event sub type or protocol buffer method. event_type (str): TODO filename (str): TODO firewall_rules (list[str]): firewall rules. firewall_source_ranges (list[str]): firewall source ranges. gcloud_command_identity (str): unique gcloud command identity. gcloud_command_partial (str): partial gcloud command. log_name (str): name of the log entry. message (str): TODO method_name (str): operation performed. permissions (list[str]): IAM permission used for the operation. policy_deltas (list[str]): TODO principal_email (str): email address of the requester. principal_subject (str): subject name of the requester. recorded_time (dfdatetime.DateTimeValues): date and time the log entry was recorded. request_account_identifier (str): GCP account identifier of the request. request_address (str): IP address assigned to a Google Cloud Engine (GCE) instance. request_description (str): description of the request. request_direction (str): direction of the request. request_email (str): email address of the request. request_member (str): member of the request. request_metadata (list[str]): request metadata values. request_name (str): name of the request. request_target_tags (str): TODO resource_labels (list[str]): resource labels. resource_name (str): name of the resource. service_account_delegation (list[str]): service accounts delegation in the authentication. service_account_display_name (str): display name of the service account. service_account_key_name (str): service account key name used in authentication. service_name (str): name of the service. severity (str): log entry severity. source_images (list[str]): source images of disks attached to a compute engine instance. status_code (str): operation success or failure code. status_message (str): operation success or failure message. status_reasons (list[str]): reasons for operation failure. text_payload (str): text payload for logs not using a JSON or proto payload. user_agent (str): user agent used in the request. """ DATA_TYPE = "gcp:log:entry"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.caller_ip = None self.container = None self.dcsa_emails = None self.dcsa_scopes = None self.delegation_chain = None self.event_subtype = None self.event_type = None self.filename = None self.firewall_rules = None self.firewall_source_ranges = None self.gcloud_command_identity = None self.gcloud_command_partial = None self.log_name = None self.method_name = None self.message = None self.permissions = None self.policy_deltas = None self.principal_email = None self.principal_subject = None self.recorded_time = None self.request_account_identifier = None self.request_address = None self.request_description = None self.request_direction = None self.request_email = None self.request_member = None self.request_metadata = None self.request_name = None self.request_target_tags = None self.resource_labels = None self.resource_name = None self.service_account_delegation = None self.service_account_display_name = None self.service_account_key_name = None self.service_name = None self.severity = None self.source_images = None self.status_code = None self.status_message = None self.status_reasons = None self.text_payload = None self.user_agent = None
[docs] class GCPLogJSONLPlugin(interface.JSONLPlugin): """JSON-L parser plugin for Google Cloud (GCP) log files.""" NAME = "gcp_log" DATA_FORMAT = "Google Cloud (GCP) log" _USER_AGENT_COMMAND_RE = re.compile(r"command/([^\s]+)") _USER_AGENT_INVOCATION_ID_RE = re.compile(r"invocation-id/([^\s]+)") def _ParseJSONPayload(self, json_dict, event_data): """Extracts information from a jsonPayload value. Args: json_dict (dict): JSON dictionary of the log record. event_data (GCPLogEventData): event data. """ json_payload = self._GetJSONValue(json_dict, "jsonPayload") if not json_payload: return event_data.container = self._GetJSONValue(json_payload, "container") event_data.event_subtype = self._GetJSONValue(json_payload, "event_subtype") event_data.event_type = self._GetJSONValue(json_payload, "event_type") event_data.filename = self._GetJSONValue(json_payload, "filename") event_data.message = self._GetJSONValue(json_payload, "message") actor_json = self._GetJSONValue(json_payload, "actor") if actor_json: event_data.user = self._GetJSONValue(actor_json, "user") def _ParseAuthenticationInfo(self, proto_payload, event_data): """Extracts information from `protoPayload.authenticationInfo`. Args: proto_payload (dict): JSON dictionary of the `protoPayload` value. event_data (GCPLogEventData): event data. """ authentication_info = self._GetJSONValue(proto_payload, "authenticationInfo") if not authentication_info: return principal_email = self._GetJSONValue(authentication_info, "principalEmail") if principal_email: event_data.principal_email = principal_email principal_subject = self._GetJSONValue(authentication_info, "principalSubject") if principal_subject: event_data.principal_subject = principal_subject service_account_key_name = self._GetJSONValue( authentication_info, "serviceAccountKeyName" ) if service_account_key_name: event_data.service_account_key_name = service_account_key_name delegations = [] delegation_info_list = self._GetJSONValue( authentication_info, "serviceAccountDelegationInfo", [] ) for delegation_info in delegation_info_list: first_party_principal = self._GetJSONValue( delegation_info, "firstPartyPrincipal", {} ) first_party_principal_email = self._GetJSONValue( first_party_principal, "principalEmail" ) if first_party_principal_email: delegations.append(first_party_principal_email) else: first_party_principal_subject = self._GetJSONValue( first_party_principal, "principalSubject" ) if first_party_principal_subject: delegations.append(first_party_principal_subject) if delegations: event_data.service_account_delegation = delegations event_data.delegation_chain = "->".join(delegations) def _ParseAuthorizationInfo(self, proto_payload, event_data): """Extracts information from `protoPayload.authorizationInfo`. Args: proto_payload (dict): JSON dictionary of the `protoPayload` value. event_data (GCPLogEventData): event data. """ permissions = [] authorization_info_list = self._GetJSONValue( proto_payload, "authorizationInfo", [] ) for authorization_info in authorization_info_list: permission = self._GetJSONValue(authorization_info, "permission") if permission: permissions.append(permission) if permissions: event_data.permissions = permissions def _ParseRequestMetadata(self, proto_payload, event_data): """Extracts information from `protoPayload.requestMetadata`. Args: proto_payload (dict): JSON dictionary of the `protoPayload` value. event_data (GCPLogEventData): event data. """ request_metadata = self._GetJSONValue(proto_payload, "requestMetadata") if not request_metadata: return event_data.caller_ip = self._GetJSONValue(request_metadata, "callerIp") event_data.user_agent = self._GetJSONValue( request_metadata, "callerSuppliedUserAgent" ) if event_data.user_agent: if "command/" in event_data.user_agent: matches = self._USER_AGENT_COMMAND_RE.search(event_data.user_agent) if matches: command_string = matches.group(1).replace(".", " ") event_data.gcloud_command_partial = command_string if "invocation-id" in event_data.user_agent: matches = self._USER_AGENT_INVOCATION_ID_RE.search( event_data.user_agent ) if matches: event_data.gcloud_command_identity = matches.group(1) def _ParseProtoPayloadStatus(self, proto_payload, event_data): """Extracts information from `protoPayload.status`. Args: proto_payload (dict): JSON dictionary of the `protoPayload` value. event_data (GCPLogEventData): event data. """ status = self._GetJSONValue(proto_payload, "status") if status: # Non empty `protoPayload.status` field could have empty # `protoPayload.status.code` field. # # Empty `code` and `message` fields indicate the operation was successful. event_data.status_code = str(self._GetJSONValue(status, "code", "")) event_data.status_message = self._GetJSONValue(status, "message") # `protoPayload.status.details[].reason` contains reason for an operation # failure. status_reasons = [] for status_detail in self._GetJSONValue(status, "details", []): status_reason = self._GetJSONValue(status_detail, "reason") if status_reason: status_reasons.append(status_reason) if status_reasons: event_data.status_reasons = status_reasons def _ParseComputeInsertRequest(self, request, event_data): """Extracts compute.instances.insert information. Args: request (dict): JSON dictionary of the `protoPayload.request` field. event_data (GCPLogEventData): event data. """ # source_images hold Google Cloud source disk path used in creating a GCE # instance. source_images = [] for disk in self._GetJSONValue(request, "disks", []): initialize_params = self._GetJSONValue(disk, "initializeParams", {}) source_image = self._GetJSONValue(initialize_params, "sourceImage") if source_image: source_images.append(source_image) if source_images: event_data.source_images = source_images # Default compute service account aka dcsa dcsa_emails = [] dcsa_scopes = [] service_account_list = self._GetJSONValue(request, "serviceAccounts", []) for service_account in service_account_list: email = self._GetJSONValue(service_account, "email") if email: dcsa_emails.append(email) scopes = self._GetJSONValue(service_account, "scopes") if scopes: dcsa_scopes.extend(scopes) if dcsa_emails: event_data.dcsa_emails = dcsa_emails if dcsa_scopes: event_data.dcsa_scopes = dcsa_scopes def _ParseComputeProtoPayload(self, proto_payload, event_data): """Extracts compute.googleapis.com information. Args: proto_payload (dict): JSON dictionary of the `protoPayload` value. event_data (GCPLogEventData): event data. """ request = self._GetJSONValue(proto_payload, "request") if not request: return request_type = self._GetJSONValue(request, "@type") if not request_type: return if request_type == "type.googleapis.com/compute.instances.insert": self._ParseComputeInsertRequest(request, event_data) def _ParseProtoPayload(self, json_dict, event_data): """Extracts information from a protoPayload value. Args: json_dict (dict): JSON dictionary of the log record. event_data (GCPLogEventData): event data. """ proto_payload = self._GetJSONValue(json_dict, "protoPayload") if not proto_payload: return event_data.service_name = self._GetJSONValue(proto_payload, "serviceName") event_data.resource_name = self._GetJSONValue(proto_payload, "resourceName") method_name = self._GetJSONValue(proto_payload, "methodName") if method_name and not event_data.event_subtype: event_data.event_subtype = method_name event_data.method_name = method_name self._ParseAuthenticationInfo(proto_payload, event_data) self._ParseAuthorizationInfo(proto_payload, event_data) self._ParseRequestMetadata(proto_payload, event_data) self._ParseProtoPayloadStatus(proto_payload, event_data) self._ParseProtoPayloadRequest(proto_payload, event_data) self._ParseProtoPayloadServiceData(proto_payload, event_data) if event_data.service_name == "compute.googleapis.com": self._ParseComputeProtoPayload(proto_payload, event_data) def _ParseProtoPayloadRequest(self, proto_payload, event_data): """Extracts information from the request field of a protoPayload field. Args: proto_payload (dict): JSON dictionary of the `protoPayload` value. event_data (GCPLogEventData): event data. """ request = self._GetJSONValue(proto_payload, "request") if not request: return event_data.request_account_identifier = self._GetJSONValue( request, "account_id" ) event_data.request_address = self._GetJSONValue(request, "address") event_data.request_description = self._GetJSONValue(request, "description") event_data.request_direction = self._GetJSONValue(request, "direction") event_data.request_email = self._GetJSONValue(request, "email") event_data.request_member = self._GetJSONValue(request, "member") event_data.request_name = self._GetJSONValue(request, "name") event_data.request_target_tags = self._GetJSONValue(request, "targetTags") # Firewall specific attributes. event_data.firewall_source_ranges = self._GetJSONValue(request, "sourceRanges") firewall_rules = [] alloweds = self._GetJSONValue(request, "alloweds", default_value=[]) for allowed in alloweds: ip_protocol = self._GetJSONValue(allowed, "IPProtocol") ports = self._GetJSONValue(allowed, "ports", default_value="all") firewall_rules.append(f"ALLOW: {ip_protocol:s} {ports!s}") denieds = self._GetJSONValue(request, "denieds", default_value=[]) for denied in denieds: ip_protocol = self._GetJSONValue(denied, "IPProtocol") ports = self._GetJSONValue(denied, "ports", default_value="all") firewall_rules.append(f"DENY: {ip_protocol:s} {ports!s}") event_data.firewall_rules = firewall_rules or None # Service account specific attributes service_account = self._GetJSONValue(request, "service_account") if service_account: event_data.service_account_display_name = self._GetJSONValue( service_account, "display_name" ) def _ParseProtoPayloadServiceData(self, proto_payload, event_data): """Extracts information from the serviceData in the protoPayload value. Args: proto_payload (dict): JSON dictionary of the `protoPayload` value. event_data (GCPLogEventData): event data. """ service_data = self._GetJSONValue(proto_payload, "serviceData") if not service_data: return policy_delta = self._GetJSONValue(service_data, "policyDelta") if not policy_delta: return policy_deltas = [] binding_deltas = self._GetJSONValue( policy_delta, "bindingDeltas", default_value=[] ) for binding_delta_value in binding_deltas: action = self._GetJSONValue(binding_delta_value, "action") or "N/A" member = self._GetJSONValue(binding_delta_value, "member") or "N/A" role = self._GetJSONValue(binding_delta_value, "role") or "N/A" policy_deltas.append(f"{action:s} {member:s} with role {role:s}") event_data.policy_deltas = policy_deltas or None def _ParseRecord(self, parser_mediator, json_dict): """Parses a Google Cloud (GCP) log record. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. json_dict (dict): JSON dictionary of the log record. """ resource = self._GetJSONValue(json_dict, "resource", default_value={}) labels = self._GetJSONValue(resource, "labels", default_value={}) resource_labels = [f"{name:s}: {value!s}" for name, value in labels.items()] event_data = GCPLogEventData() event_data.log_name = self._GetJSONValue(json_dict, "logName") event_data.recorded_time = self._ParseISO8601DateTimeString( parser_mediator, json_dict, "timestamp" ) event_data.resource_labels = resource_labels or None event_data.severity = self._GetJSONValue(json_dict, "severity") event_data.text_payload = self._GetJSONValue(json_dict, "textPayload") self._ParseJSONPayload(json_dict, event_data) self._ParseProtoPayload(json_dict, event_data) parser_mediator.ProduceEventData(event_data)
[docs] def CheckRequiredFormat(self, json_dict): """Check if the log record has the minimal structure required by the plugin. Args: json_dict (dict): JSON dictionary of the log record. Returns: bool: True if this is the correct parser, False otherwise. """ log_name = json_dict.get("logName") or None timestamp = json_dict.get("timestamp") or None if None in (log_name, timestamp): return False date_time = dfdatetime_time_elements.TimeElementsInMicroseconds() try: date_time.CopyFromStringISO8601(timestamp) except ValueError: return False return True
jsonl_parser.JSONLParser.RegisterPlugin(GCPLogJSONLPlugin)