Source code for plaso.engine.profilers

# -*- coding: utf-8 -*-
"""The profiler classes."""

import codecs
import gzip
import os
import time


[docs] class CPUTimeMeasurement(object): """The CPU time measurement. Attributes: start_sample_time (float): start sample time or None if not set. total_cpu_time (float): total CPU time or None if not set. """
[docs] def __init__(self): """Initializes the CPU time measurement.""" super(CPUTimeMeasurement, self).__init__() self._start_cpu_time = None self.start_sample_time = None self.total_cpu_time = None
[docs] def SampleStart(self): """Starts measuring the CPU time.""" self._start_cpu_time = time.perf_counter() self.start_sample_time = time.time() self.total_cpu_time = 0
[docs] def SampleStop(self): """Stops measuring the CPU time.""" if self._start_cpu_time is not None: self.total_cpu_time += time.perf_counter() - self._start_cpu_time
[docs] class SampleFileProfiler(object): """Shared functionality for sample file-based profilers.""" # This constant must be overridden by subclasses. _FILENAME_PREFIX = None # This constant must be overridden by subclasses. _FILE_HEADER = None
[docs] def __init__(self, identifier, configuration): """Initializes a sample file profiler. Sample files are gzip compressed UTF-8 encoded CSV files. Args: identifier (str): identifier of the profiling session used to create the sample filename. configuration (ProfilingConfiguration): profiling configuration. """ super(SampleFileProfiler, self).__init__() self._identifier = identifier self._path = configuration.directory self._profile_measurements = {} self._sample_file = None self._start_time = None
def _WritesString(self, content): """Writes a string to the sample file. Args: content (str): content to write to the sample file. """ content_bytes = codecs.encode(content, 'utf-8') self._sample_file.write(content_bytes)
[docs] @classmethod def IsSupported(cls): """Determines if the profiler is supported. Returns: bool: True if the profiler is supported. """ return True
[docs] def Start(self): """Starts the profiler.""" filename = f'{self._FILENAME_PREFIX:s}-{self._identifier:s}.csv.gz' if self._path: filename = os.path.join(self._path, filename) self._sample_file = gzip.open(filename, 'wb') self._WritesString(self._FILE_HEADER) self._start_time = time.time()
[docs] def Stop(self): """Stops the profiler.""" self._sample_file.close() self._sample_file = None
[docs] class CPUTimeProfiler(SampleFileProfiler): """The CPU time profiler.""" _FILENAME_PREFIX = 'cputime' _FILE_HEADER = 'Time\tName\tProcessing time\n'
[docs] def StartTiming(self, profile_name): """Starts timing CPU time. Args: profile_name (str): name of the profile to sample. """ if profile_name not in self._profile_measurements: self._profile_measurements[profile_name] = CPUTimeMeasurement() self._profile_measurements[profile_name].SampleStart()
[docs] def StopTiming(self, profile_name): """Stops timing CPU time. Args: profile_name (str): name of the profile to sample. """ measurements = self._profile_measurements.get(profile_name) if measurements: measurements.SampleStop() self._WritesString(( f'{measurements.start_sample_time:f}\t{profile_name:s}\t' f'{measurements.total_cpu_time:f}\n'))
[docs] class MemoryProfiler(SampleFileProfiler): """The memory profiler.""" _FILENAME_PREFIX = 'memory' _FILE_HEADER = 'Time\tName\tUsed memory\n'
[docs] def Sample(self, profile_name, used_memory): """Takes a sample for profiling. Args: profile_name (str): name of the profile to sample. used_memory (int): amount of used memory in bytes. """ sample_time = time.time() self._WritesString(f'{sample_time:f}\t{profile_name:s}\t{used_memory:d}\n')
[docs] class AnalyzersProfiler(CPUTimeProfiler): """The analyzers profiler.""" _FILENAME_PREFIX = 'analyzers'
[docs] class ProcessingProfiler(CPUTimeProfiler): """The processing profiler.""" _FILENAME_PREFIX = 'processing'
[docs] class SerializersProfiler(CPUTimeProfiler): """The serializers profiler.""" _FILENAME_PREFIX = 'serializers'
[docs] class StorageProfiler(SampleFileProfiler): """The storage profiler.""" _FILENAME_PREFIX = 'storage' _FILE_HEADER = ( 'Time\tName\tOperation\tDescription\tProcessing time\tData size\t' 'Compressed data size\n')
[docs] def StartTiming(self, profile_name): """Starts timing CPU time. Args: profile_name (str): name of the profile to sample. """ if profile_name not in self._profile_measurements: self._profile_measurements[profile_name] = CPUTimeMeasurement() self._profile_measurements[profile_name].SampleStart()
[docs] def StopTiming(self, profile_name): """Stops timing CPU time. Args: profile_name (str): name of the profile to sample. """ measurements = self._profile_measurements.get(profile_name) if measurements: measurements.SampleStop()
[docs] def Sample( self, profile_name, operation, description, data_size, compressed_data_size): """Takes a sample of data read or written for profiling. Args: profile_name (str): name of the profile to sample. operation (str): operation, either 'read' or 'write'. description (str): description of the data read. data_size (int): size of the data read in bytes. compressed_data_size (int): size of the compressed data read in bytes. """ measurements = self._profile_measurements.get(profile_name) if measurements: sample_time = measurements.start_sample_time processing_time = measurements.total_cpu_time else: sample_time = time.time() processing_time = 0.0 self._WritesString(( f'{sample_time:f}\t{profile_name:s}\t{operation:s}\t{description:s}\t' f'{processing_time:f}\t{data_size:d}\t{compressed_data_size:d}\n'))
[docs] class TaskQueueProfiler(SampleFileProfiler): """The task queue profiler.""" _FILENAME_PREFIX = 'task_queue' _FILE_HEADER = ( 'Time\tQueued\tProcessing\tTo merge\tAbandoned\tTotal\n')
[docs] def Sample(self, tasks_status): """Takes a sample of the status of queued tasks for profiling. Args: tasks_status (TasksStatus): status information about tasks. """ sample_time = time.time() self._WritesString(( f'{sample_time:f}\t{tasks_status.number_of_queued_tasks:d}\t' f'{tasks_status.number_of_tasks_processing:d}\t' f'{tasks_status.number_of_tasks_pending_merge:d}\t' f'{tasks_status.number_of_abandoned_tasks:d}\t' f'{tasks_status.total_number_of_tasks:d}\n'))
[docs] class TasksProfiler(SampleFileProfiler): """The tasks profiler.""" _FILENAME_PREFIX = 'tasks' _FILE_HEADER = 'Time\tIdentifier\tStatus\n'
[docs] def Sample(self, task, status): """Takes a sample of the status of a task for profiling. Args: task (Task): a task. status (str): status. """ sample_time = time.time() self._WritesString(f'{sample_time:f}\t{task.identifier:s}\t{status:s}\n')