"""JSON-L parser plugin for Google Cloud (GCP) log files."""
import re
from dfdatetime import time_elements as dfdatetime_time_elements
from plaso.containers import events
from plaso.parsers import jsonl_parser
from plaso.parsers.jsonl_plugins import interface
[docs]
class GCPLogEventData(events.EventData):
    """Google Cloud (GCP) log event data.

    Attributes:
        caller_ip (str): IP address of the client that requested the operation.
        container (str): value of the "container" field of the JSON payload.
        dcsa_emails (list[str]): default compute service account attached to a
            Google Compute Engine (GCE) instance.
        dcsa_scopes (list[str]): OAuth scopes granted to the default compute
            service account.
        delegation_chain (str): service account delegation chain, the entries
            of service_account_delegation joined with "->".
        event_subtype (str): JSON event sub type or protocol buffer method.
        event_type (str): value of the "event_type" field of the JSON payload.
        filename (str): value of the "filename" field of the JSON payload.
        firewall_rules (list[str]): firewall rules.
        firewall_source_ranges (list[str]): firewall source ranges.
        gcloud_command_identity (str): unique gcloud command identity.
        gcloud_command_partial (str): partial gcloud command.
        log_name (str): name of the log entry.
        message (str): value of the "message" field of the JSON payload.
        method_name (str): operation performed.
        permissions (list[str]): IAM permission used for the operation.
        policy_deltas (list[str]): policy binding deltas of the service data,
            each formatted as "<action> <member> with role <role>".
        principal_email (str): email address of the requester.
        principal_subject (str): subject name of the requester.
        recorded_time (dfdatetime.DateTimeValues): date and time the log entry
            was recorded.
        request_account_identifier (str): GCP account identifier of the request.
        request_address (str): IP address assigned to a Google Cloud Engine
            (GCE) instance.
        request_description (str): description of the request.
        request_direction (str): direction of the request.
        request_email (str): email address of the request.
        request_member (str): member of the request.
        request_metadata (list[str]): request metadata values.
        request_name (str): name of the request.
        request_target_tags (str): value of the "targetTags" field of the
            request, stored as parsed from JSON (TODO: confirm the type).
        resource_labels (list[str]): resource labels.
        resource_name (str): name of the resource.
        service_account_delegation (list[str]): service accounts delegation in
            the authentication.
        service_account_display_name (str): display name of the service account.
        service_account_key_name (str): service account key name used in
            authentication.
        service_name (str): name of the service.
        severity (str): log entry severity.
        source_images (list[str]): source images of disks attached to a compute
            engine instance.
        status_code (str): operation success or failure code.
        status_message (str): operation success or failure message.
        status_reasons (list[str]): reasons for operation failure.
        text_payload (str): text payload for logs not using a JSON or proto
            payload.
        user (str): value of the "user" field of the "actor" value of the JSON
            payload.
        user_agent (str): user agent used in the request.
    """

    DATA_TYPE = "gcp:log:entry"

    def __init__(self):
        """Initializes event data."""
        super().__init__(data_type=self.DATA_TYPE)
        # Attributes are kept in alphabetical order, matching the docstring.
        self.caller_ip = None
        self.container = None
        self.dcsa_emails = None
        self.dcsa_scopes = None
        self.delegation_chain = None
        self.event_subtype = None
        self.event_type = None
        self.filename = None
        self.firewall_rules = None
        self.firewall_source_ranges = None
        self.gcloud_command_identity = None
        self.gcloud_command_partial = None
        self.log_name = None
        self.message = None
        self.method_name = None
        self.permissions = None
        self.policy_deltas = None
        self.principal_email = None
        self.principal_subject = None
        self.recorded_time = None
        self.request_account_identifier = None
        self.request_address = None
        self.request_description = None
        self.request_direction = None
        self.request_email = None
        self.request_member = None
        self.request_metadata = None
        self.request_name = None
        self.request_target_tags = None
        self.resource_labels = None
        self.resource_name = None
        self.service_account_delegation = None
        self.service_account_display_name = None
        self.service_account_key_name = None
        self.service_name = None
        self.severity = None
        self.source_images = None
        self.status_code = None
        self.status_message = None
        self.status_reasons = None
        self.text_payload = None
        # Set by the plugin from jsonPayload.actor.user; declared here so the
        # attribute always exists on the event data.
        self.user = None
        self.user_agent = None
[docs]
class GCPLogJSONLPlugin(interface.JSONLPlugin):
    """JSON-L parser plugin for Google Cloud (GCP) log files."""

    NAME = "gcp_log"
    DATA_FORMAT = "Google Cloud (GCP) log"

    # Capture the non-whitespace token that follows "command/" and
    # "invocation-id/" in the caller supplied user agent string.
    _USER_AGENT_COMMAND_RE = re.compile(r"command/([^\s]+)")
    _USER_AGENT_INVOCATION_ID_RE = re.compile(r"invocation-id/([^\s]+)")

    # Placeholder used when a value needed to format a string is missing.
    _NOT_AVAILABLE = "N/A"

    def _GetJSONValueOrDefault(self, json_dict, name, default_value):
        """Retrieves a JSON value, falling back to the default when it is empty.

        Args:
            json_dict (dict): JSON dictionary.
            name (str): name of the value to retrieve.
            default_value (object): value to return instead of an empty value.

        Returns:
            object: the retrieved value, or default_value when the retrieved
                value is falsy (for example None).
        """
        # NOTE(review): assumes _GetJSONValue can return None, for example for
        # an explicit JSON null - confirm against the plugin interface. The
        # fallback keeps callers that iterate or index the result from failing.
        json_value = self._GetJSONValue(json_dict, name, default_value=default_value)
        return json_value or default_value

    def _ParseJSONPayload(self, json_dict, event_data):
        """Extracts information from a jsonPayload value.

        Args:
            json_dict (dict): JSON dictionary of the log record.
            event_data (GCPLogEventData): event data.
        """
        json_payload = self._GetJSONValue(json_dict, "jsonPayload")
        if not json_payload:
            return

        event_data.container = self._GetJSONValue(json_payload, "container")
        event_data.event_subtype = self._GetJSONValue(json_payload, "event_subtype")
        event_data.event_type = self._GetJSONValue(json_payload, "event_type")
        event_data.filename = self._GetJSONValue(json_payload, "filename")
        event_data.message = self._GetJSONValue(json_payload, "message")

        actor_json = self._GetJSONValue(json_payload, "actor")
        if actor_json:
            event_data.user = self._GetJSONValue(actor_json, "user")

    def _ParseAuthenticationInfo(self, proto_payload, event_data):
        """Extracts information from `protoPayload.authenticationInfo`.

        Args:
            proto_payload (dict): JSON dictionary of the `protoPayload` value.
            event_data (GCPLogEventData): event data.
        """
        authentication_info = self._GetJSONValue(proto_payload, "authenticationInfo")
        if not authentication_info:
            return

        # Only non-empty values are stored, the attributes otherwise keep the
        # value they already have.
        for attribute_name, json_name in (
            ("principal_email", "principalEmail"),
            ("principal_subject", "principalSubject"),
            ("service_account_key_name", "serviceAccountKeyName"),
        ):
            json_value = self._GetJSONValue(authentication_info, json_name)
            if json_value:
                setattr(event_data, attribute_name, json_value)

        delegations = []
        delegation_info_list = self._GetJSONValueOrDefault(
            authentication_info, "serviceAccountDelegationInfo", []
        )
        for delegation_info in delegation_info_list:
            first_party_principal = self._GetJSONValueOrDefault(
                delegation_info, "firstPartyPrincipal", {}
            )
            # Prefer the principal email and fall back to the principal subject.
            principal = self._GetJSONValue(
                first_party_principal, "principalEmail"
            ) or self._GetJSONValue(first_party_principal, "principalSubject")
            if principal:
                delegations.append(principal)

        if delegations:
            event_data.service_account_delegation = delegations
            event_data.delegation_chain = "->".join(delegations)

    def _ParseAuthorizationInfo(self, proto_payload, event_data):
        """Extracts information from `protoPayload.authorizationInfo`.

        Args:
            proto_payload (dict): JSON dictionary of the `protoPayload` value.
            event_data (GCPLogEventData): event data.
        """
        permissions = []
        authorization_info_list = self._GetJSONValueOrDefault(
            proto_payload, "authorizationInfo", []
        )
        for authorization_info in authorization_info_list:
            permission = self._GetJSONValue(authorization_info, "permission")
            if permission:
                permissions.append(permission)

        if permissions:
            event_data.permissions = permissions

    def _ParseRequestMetadata(self, proto_payload, event_data):
        """Extracts information from `protoPayload.requestMetadata`.

        Args:
            proto_payload (dict): JSON dictionary of the `protoPayload` value.
            event_data (GCPLogEventData): event data.
        """
        request_metadata = self._GetJSONValue(proto_payload, "requestMetadata")
        if not request_metadata:
            return

        event_data.caller_ip = self._GetJSONValue(request_metadata, "callerIp")
        event_data.user_agent = self._GetJSONValue(
            request_metadata, "callerSuppliedUserAgent"
        )
        if not event_data.user_agent:
            return

        # The regular expressions only match when their literal prefix is
        # present, hence no separate substring test is needed.
        command_match = self._USER_AGENT_COMMAND_RE.search(event_data.user_agent)
        if command_match:
            # The command is dot separated in the user agent string.
            event_data.gcloud_command_partial = command_match.group(1).replace(
                ".", " "
            )

        invocation_match = self._USER_AGENT_INVOCATION_ID_RE.search(
            event_data.user_agent
        )
        if invocation_match:
            event_data.gcloud_command_identity = invocation_match.group(1)

    def _ParseProtoPayloadStatus(self, proto_payload, event_data):
        """Extracts information from `protoPayload.status`.

        Args:
            proto_payload (dict): JSON dictionary of the `protoPayload` value.
            event_data (GCPLogEventData): event data.
        """
        status = self._GetJSONValue(proto_payload, "status")
        if not status:
            return

        # Non empty `protoPayload.status` field could have empty
        # `protoPayload.status.code` field.
        #
        # Empty `code` and `message` fields indicate the operation was successful.
        status_code = self._GetJSONValue(status, "code", default_value="")
        # Avoid storing the string "None" when no code is available.
        event_data.status_code = "" if status_code is None else str(status_code)
        event_data.status_message = self._GetJSONValue(status, "message")

        # `protoPayload.status.details[].reason` contains reason for an operation
        # failure.
        status_reasons = []
        for status_detail in self._GetJSONValueOrDefault(status, "details", []):
            status_reason = self._GetJSONValue(status_detail, "reason")
            if status_reason:
                status_reasons.append(status_reason)

        if status_reasons:
            event_data.status_reasons = status_reasons

    def _ParseComputeInsertRequest(self, request, event_data):
        """Extracts compute.instances.insert information.

        Args:
            request (dict): JSON dictionary of the `protoPayload.request` field.
            event_data (GCPLogEventData): event data.
        """
        # source_images hold Google Cloud source disk path used in creating a GCE
        # instance.
        source_images = []
        for disk in self._GetJSONValueOrDefault(request, "disks", []):
            initialize_params = self._GetJSONValueOrDefault(
                disk, "initializeParams", {}
            )
            source_image = self._GetJSONValue(initialize_params, "sourceImage")
            if source_image:
                source_images.append(source_image)

        if source_images:
            event_data.source_images = source_images

        # Default compute service account aka dcsa
        dcsa_emails = []
        dcsa_scopes = []
        service_account_list = self._GetJSONValueOrDefault(
            request, "serviceAccounts", []
        )
        for service_account in service_account_list:
            email = self._GetJSONValue(service_account, "email")
            if email:
                dcsa_emails.append(email)

            scopes = self._GetJSONValue(service_account, "scopes")
            if scopes:
                dcsa_scopes.extend(scopes)

        if dcsa_emails:
            event_data.dcsa_emails = dcsa_emails
        if dcsa_scopes:
            event_data.dcsa_scopes = dcsa_scopes

    def _ParseComputeProtoPayload(self, proto_payload, event_data):
        """Extracts compute.googleapis.com information.

        Args:
            proto_payload (dict): JSON dictionary of the `protoPayload` value.
            event_data (GCPLogEventData): event data.
        """
        request = self._GetJSONValue(proto_payload, "request")
        if not request:
            return

        # Only instance insert requests carry additional information to extract.
        request_type = self._GetJSONValue(request, "@type")
        if request_type == "type.googleapis.com/compute.instances.insert":
            self._ParseComputeInsertRequest(request, event_data)

    def _ParseProtoPayload(self, json_dict, event_data):
        """Extracts information from a protoPayload value.

        Args:
            json_dict (dict): JSON dictionary of the log record.
            event_data (GCPLogEventData): event data.
        """
        proto_payload = self._GetJSONValue(json_dict, "protoPayload")
        if not proto_payload:
            return

        event_data.service_name = self._GetJSONValue(proto_payload, "serviceName")
        event_data.resource_name = self._GetJSONValue(proto_payload, "resourceName")

        # The method name is always recorded; it only doubles as the event sub
        # type when the JSON payload did not provide one.
        method_name = self._GetJSONValue(proto_payload, "methodName")
        event_data.method_name = method_name
        if method_name and not event_data.event_subtype:
            event_data.event_subtype = method_name

        self._ParseAuthenticationInfo(proto_payload, event_data)
        self._ParseAuthorizationInfo(proto_payload, event_data)
        self._ParseRequestMetadata(proto_payload, event_data)
        self._ParseProtoPayloadStatus(proto_payload, event_data)
        self._ParseProtoPayloadRequest(proto_payload, event_data)
        self._ParseProtoPayloadServiceData(proto_payload, event_data)

        if event_data.service_name == "compute.googleapis.com":
            self._ParseComputeProtoPayload(proto_payload, event_data)

    def _ParseProtoPayloadRequest(self, proto_payload, event_data):
        """Extracts information from the request field of a protoPayload field.

        Args:
            proto_payload (dict): JSON dictionary of the `protoPayload` value.
            event_data (GCPLogEventData): event data.
        """
        request = self._GetJSONValue(proto_payload, "request")
        if not request:
            return

        event_data.request_account_identifier = self._GetJSONValue(
            request, "account_id"
        )
        event_data.request_address = self._GetJSONValue(request, "address")
        event_data.request_description = self._GetJSONValue(request, "description")
        event_data.request_direction = self._GetJSONValue(request, "direction")
        event_data.request_email = self._GetJSONValue(request, "email")
        event_data.request_member = self._GetJSONValue(request, "member")
        event_data.request_name = self._GetJSONValue(request, "name")
        event_data.request_target_tags = self._GetJSONValue(request, "targetTags")

        # Firewall specific attributes.
        event_data.firewall_source_ranges = self._GetJSONValue(request, "sourceRanges")

        # Allowed rules are listed before denied rules.
        firewall_rules = []
        for action, json_name in (("ALLOW", "alloweds"), ("DENY", "denieds")):
            for rule in self._GetJSONValueOrDefault(request, json_name, []):
                # A missing protocol must not fail the string formatting, use
                # the same placeholder as the policy deltas instead.
                ip_protocol = (
                    self._GetJSONValue(rule, "IPProtocol") or self._NOT_AVAILABLE
                )
                ports = self._GetJSONValue(rule, "ports", default_value="all")
                firewall_rules.append(f"{action:s}: {ip_protocol!s} {ports!s}")

        event_data.firewall_rules = firewall_rules or None

        # Service account specific attributes
        service_account = self._GetJSONValue(request, "service_account")
        if service_account:
            event_data.service_account_display_name = self._GetJSONValue(
                service_account, "display_name"
            )

    def _ParseProtoPayloadServiceData(self, proto_payload, event_data):
        """Extracts information from the serviceData in the protoPayload value.

        Args:
            proto_payload (dict): JSON dictionary of the `protoPayload` value.
            event_data (GCPLogEventData): event data.
        """
        service_data = self._GetJSONValue(proto_payload, "serviceData")
        if not service_data:
            return

        policy_delta = self._GetJSONValue(service_data, "policyDelta")
        if not policy_delta:
            return

        policy_deltas = []
        binding_deltas = self._GetJSONValueOrDefault(policy_delta, "bindingDeltas", [])
        for binding_delta in binding_deltas:
            # Missing values are replaced by a placeholder so that every delta
            # has the same layout.
            action, member, role = (
                self._GetJSONValue(binding_delta, json_name) or self._NOT_AVAILABLE
                for json_name in ("action", "member", "role")
            )
            policy_deltas.append(f"{action!s} {member!s} with role {role!s}")

        event_data.policy_deltas = policy_deltas or None

    def _ParseRecord(self, parser_mediator, json_dict):
        """Parses a Google Cloud (GCP) log record.

        Args:
            parser_mediator (ParserMediator): mediates interactions between
                parsers and other components, such as storage and dfVFS.
            json_dict (dict): JSON dictionary of the log record.
        """
        resource = self._GetJSONValueOrDefault(json_dict, "resource", {})
        labels = self._GetJSONValueOrDefault(resource, "labels", {})
        resource_labels = [f"{name:s}: {value!s}" for name, value in labels.items()]

        event_data = GCPLogEventData()
        event_data.log_name = self._GetJSONValue(json_dict, "logName")
        event_data.recorded_time = self._ParseISO8601DateTimeString(
            parser_mediator, json_dict, "timestamp"
        )
        event_data.resource_labels = resource_labels or None
        event_data.severity = self._GetJSONValue(json_dict, "severity")
        event_data.text_payload = self._GetJSONValue(json_dict, "textPayload")

        # The JSON payload is parsed first since the proto payload only sets
        # the event sub type when it has not been set yet.
        self._ParseJSONPayload(json_dict, event_data)
        self._ParseProtoPayload(json_dict, event_data)

        parser_mediator.ProduceEventData(event_data)
# Register the GCP log plugin with the JSON-L parser.
jsonl_parser.JSONLParser.RegisterPlugin(GCPLogJSONLPlugin)