"""The status view."""
import ctypes
import sys
import time
try:
import win32api
import win32console
except ImportError:
win32console = None
from dfvfs.lib import definitions as dfvfs_definitions
import plaso
from plaso.cli import tools
from plaso.cli import views
from plaso.lib import definitions
[docs]
class StatusView:
"""Processing status view."""
MODE_FILE = "file"
MODE_LINEAR = "linear"
MODE_WINDOW = "window"
_SOURCE_TYPES = {
definitions.SOURCE_TYPE_ARCHIVE: "archive",
dfvfs_definitions.SOURCE_TYPE_DIRECTORY: "directory",
dfvfs_definitions.SOURCE_TYPE_FILE: "single file",
dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE: ("storage media device"),
dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE: ("storage media image"),
}
_UNITS_1024 = ["B", "KiB", "MiB", "GiB", "TiB", "EiB", "ZiB", "YiB"]
_WINAPI_STD_OUTPUT_HANDLE = -11
_WINAPI_ENABLE_PROCESSED_INPUT = 1
_WINAPI_ENABLE_LINE_INPUT = 2
_WINAPI_ENABLE_ECHO_INPUT = 4
_WINAPI_ANSI_CONSOLE_MODE = (
_WINAPI_ENABLE_PROCESSED_INPUT
| _WINAPI_ENABLE_LINE_INPUT
| _WINAPI_ENABLE_ECHO_INPUT
)
[docs]
def __init__(self, output_writer, tool_name):
"""Initializes a status view.
Args:
output_writer (OutputWriter): output writer.
tool_name (str): name of the tool.
"""
super().__init__()
self._artifact_filters = None
self._filter_file = None
self._have_ansi_support = not win32console
self._mode = self.MODE_WINDOW
self._output_writer = output_writer
self._source_path = None
self._source_type = None
self._status_file = "status.info"
self._stdout_output_writer = isinstance(output_writer, tools.StdoutOutputWriter)
self._storage_file_path = None
self._tool_name = tool_name
if win32console:
kernel32 = ctypes.windll.kernel32
stdout_handle = kernel32.GetStdHandle(self._WINAPI_STD_OUTPUT_HANDLE)
result = kernel32.SetConsoleMode(
stdout_handle, self._WINAPI_ANSI_CONSOLE_MODE
)
self._have_ansi_support = result != 0
def _AddsAnalysisProcessStatusTableRow(self, process_status, table_view):
"""Adds an analysis process status table row.
Args:
process_status (ProcessStatus): processing status.
table_view (CLITabularTableView): table view.
"""
used_memory = self._FormatSizeInUnitsOf1024(process_status.used_memory)
events = ""
if (
process_status.number_of_consumed_events is not None
and process_status.number_of_consumed_events_delta is not None
):
events = (
f"{process_status.number_of_consumed_events:d} "
f"({process_status.number_of_consumed_events_delta:d})"
)
event_tags = ""
if (
process_status.number_of_produced_event_tags is not None
and process_status.number_of_produced_event_tags_delta is not None
):
event_tags = (
f"{process_status.number_of_produced_event_tags:d} "
f"({process_status.number_of_produced_event_tags_delta:d})"
)
reports = ""
if (
process_status.number_of_produced_reports is not None
and process_status.number_of_produced_reports_delta is not None
):
reports = (
f"{process_status.number_of_produced_reports:d} "
f"({process_status.number_of_produced_reports_delta:d})"
)
table_view.AddRow(
[
process_status.identifier,
process_status.pid,
process_status.status,
used_memory,
events,
event_tags,
reports,
]
)
def _AddExtractionProcessStatusTableRow(
self, process_status, table_view, show_events
):
"""Adds an extraction process status table row.
Args:
process_status (ProcessStatus): processing status.
table_view (CLITabularTableView): table view.
show_events (bool): True if number of events should be shown instead of
number of event data.
"""
used_memory = self._FormatSizeInUnitsOf1024(process_status.used_memory)
sources = ""
if (
process_status.number_of_produced_sources is not None
and process_status.number_of_produced_sources_delta is not None
):
sources = (
f"{process_status.number_of_produced_sources:d} "
f"({process_status.number_of_produced_sources_delta:d})"
)
events_or_event_data = ""
if show_events:
if (
process_status.number_of_produced_events is not None
and process_status.number_of_produced_events_delta is not None
):
events_or_event_data = (
f"{process_status.number_of_produced_events:d} "
f"({process_status.number_of_produced_events_delta:d})"
)
else:
if (
process_status.number_of_produced_event_data is not None
and process_status.number_of_produced_event_data_delta is not None
):
events_or_event_data = (
f"{process_status.number_of_produced_event_data:d} "
f"({process_status.number_of_produced_event_data_delta:d})"
)
# TODO: shorten display name to fit in 80 chars and show the filename.
table_view.AddRow(
[
process_status.identifier,
process_status.pid,
process_status.status,
used_memory,
sources,
events_or_event_data,
process_status.display_name,
]
)
def _ClearScreen(self):
"""Clears the terminal/console screen."""
if self._have_ansi_support:
# ANSI escape sequence to clear screen.
self._output_writer.Write("\033[2J")
# ANSI escape sequence to move cursor to top left.
self._output_writer.Write("\033[H")
elif win32console:
# This version of Windows cmd.exe does not support ANSI escape codes, thus
# instead we fill the console screen buffer with spaces. The downside of
# this approach is an annoying flicker.
top_left_coordinate = win32console.PyCOORDType(0, 0)
screen_buffer = win32console.GetStdHandle(win32api.STD_OUTPUT_HANDLE)
screen_buffer_information = screen_buffer.GetConsoleScreenBufferInfo()
screen_buffer_attributes = screen_buffer_information["Attributes"]
screen_buffer_size = screen_buffer_information["Size"]
console_size = screen_buffer_size.X * screen_buffer_size.Y
screen_buffer.FillConsoleOutputCharacter(
" ", console_size, top_left_coordinate
)
screen_buffer.FillConsoleOutputAttribute(
screen_buffer_attributes, console_size, top_left_coordinate
)
screen_buffer.SetConsoleCursorPosition(top_left_coordinate)
# TODO: remove update flicker. For win32console we could set the cursor
# top left, write the table, clean the remainder of the screen buffer
# and set the cursor at the end of the table.
def _FormatSizeInUnitsOf1024(self, size):
"""Represents a number of bytes in units of 1024.
Args:
size (int): size in bytes.
Returns:
str: human readable string of the size.
"""
magnitude_1024 = 0
used_memory_1024 = float(size)
while used_memory_1024 >= 1024:
used_memory_1024 /= 1024
magnitude_1024 += 1
if 0 < magnitude_1024 <= 7:
units = self._UNITS_1024[magnitude_1024]
return f"{used_memory_1024:.1f} {units:s}"
return f"{size:d} B"
def _FormatProcessingTime(self, processing_status):
"""Formats the processing time.
Args:
processing_status (ProcessingStatus): processing status.
Returns:
str: processing time formatted as: "5 days, 12:34:56".
"""
processing_time = 0
if processing_status:
processing_time = time.time() - processing_status.start_time
processing_time, seconds = divmod(int(processing_time), 60)
processing_time, minutes = divmod(processing_time, 60)
days, hours = divmod(processing_time, 24)
if days == 0:
days_string = ""
elif days == 1:
days_string = "1 day, "
else:
days_string = f"{days:d} days, "
return f"{days_string:s}{hours:02d}:{minutes:02d}:{seconds:02d}"
def _PrintAnalysisStatusHeader(self, processing_status):
"""Prints the analysis status header.
Args:
processing_status (ProcessingStatus): processing status.
"""
self._output_writer.Write(f"Storage file\t\t: {self._storage_file_path:s}\n")
processing_time = self._FormatProcessingTime(processing_status)
self._output_writer.Write(f"Processing time\t\t: {processing_time:s}\n")
if processing_status and processing_status.events_status:
self._PrintEventsStatus(processing_status.events_status)
self._output_writer.Write("\n")
def _GetPathSpecificationString(self, path_spec):
"""Retrieves a printable string representation of the path specification.
Args:
path_spec (dfvfs.PathSpec): path specification.
Returns:
str: printable string representation of the path specification.
"""
return "\n".join(
[
line.translate(definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)
for line in path_spec.comparable.split("\n")
]
)
def _PrintAnalysisStatusUpdateFile(self, processing_status):
"""Prints an analysis status update in file mode.
Args:
processing_status (ProcessingStatus): processing status.
"""
if processing_status and processing_status.events_status:
events_status = processing_status.events_status
with open(self._status_file, "w", encoding="utf-8") as file_object:
file_object.write(
f"Events: Filtered: {events_status.number_of_filtered_events:d} In "
f"time slice: {events_status.number_of_events_from_time_slice:d} "
f"Duplicates: {events_status.number_of_duplicate_events:d} MACB "
f"grouped: {events_status.number_of_macb_grouped_events:d} Total: "
f"{events_status.total_number_of_events:d}\n"
)
def _PrintAnalysisStatusUpdateLinear(self, processing_status):
"""Prints an analysis status update in linear mode.
Args:
processing_status (ProcessingStatus): processing status.
"""
processing_time = self._FormatProcessingTime(processing_status)
self._output_writer.Write(f"Processing time: {processing_time:s}\n")
self._output_writer.Write(
f"{processing_status.foreman_status.identifier:s} (PID: "
f"{processing_status.foreman_status.pid:d}) status: "
f"{processing_status.foreman_status.status:s}, events consumed: "
f"{processing_status.foreman_status.number_of_consumed_events:d}\n"
)
for worker_status in processing_status.workers_status:
self._output_writer.Write(
f"{worker_status.identifier:s} (PID: {worker_status.pid:d}) status: "
f"{worker_status.status:s}, events consumed: "
f"{worker_status.number_of_consumed_events:d}\n"
)
self._output_writer.Write("\n")
def _PrintAnalysisStatusUpdateWindow(self, processing_status):
"""Prints an analysis status update in window mode.
Args:
processing_status (ProcessingStatus): processing status.
"""
if self._stdout_output_writer:
self._ClearScreen()
self._output_writer.Write(
f"plaso - {self._tool_name:s} version {plaso.__version__:s}\n\n"
)
self._PrintAnalysisStatusHeader(processing_status)
table_view = views.CLITabularTableView(
column_names=[
"Identifier",
"PID",
"Status",
"Memory",
"Events",
"Tags",
"Reports",
],
column_sizes=[23, 7, 15, 15, 15, 15, 0],
)
self._AddsAnalysisProcessStatusTableRow(
processing_status.foreman_status, table_view
)
for worker_status in processing_status.workers_status:
self._AddsAnalysisProcessStatusTableRow(worker_status, table_view)
table_view.Write(self._output_writer)
self._output_writer.Write("\n")
if processing_status.aborted:
self._output_writer.Write("Processing aborted - waiting for clean up.\n\n")
if self._stdout_output_writer:
# We need to explicitly flush stdout to prevent partial status updates.
sys.stdout.flush()
def _PrintExtractionStatusUpdateFile(self, processing_status):
"""Prints an extraction status update in file mode.
Args:
processing_status (ProcessingStatus): processing status.
"""
if processing_status and processing_status.tasks_status:
tasks_status = processing_status.tasks_status
with open(self._status_file, "w", encoding="utf-8") as file_object:
file_object.write(
f"Tasks: Queued: {tasks_status.number_of_queued_tasks:d} "
f"Processing: {tasks_status.number_of_tasks_processing:d} "
f"Merging: {tasks_status.number_of_tasks_pending_merge:d} "
f"Abandoned: {tasks_status.number_of_abandoned_tasks:d} "
f"Total: {tasks_status.total_number_of_tasks:d}\n"
)
def _PrintExtractionStatusUpdateLinear(self, processing_status):
"""Prints an extraction status update in linear mode.
Args:
processing_status (ProcessingStatus): processing status.
"""
processing_time = self._FormatProcessingTime(processing_status)
self._output_writer.Write(f"Processing time: {processing_time:s}\n")
self._output_writer.Write(
f"{processing_status.foreman_status.identifier:s} (PID: "
f"{processing_status.foreman_status.pid:d}) status: "
f"{processing_status.foreman_status.status:s}, event data produced: "
f"{processing_status.foreman_status.number_of_produced_event_data:d}, "
f"events produced: "
f"{processing_status.foreman_status.number_of_produced_events:d}, "
f"file: {processing_status.foreman_status.display_name:s}\n"
)
for worker_status in processing_status.workers_status:
self._output_writer.Write(
f"{worker_status.identifier:s} (PID: {worker_status.pid:d}) status: "
f"{worker_status.status:s}, event data produced: "
f"{worker_status.number_of_produced_events:d}, file: "
f"{worker_status.display_name:s}\n"
)
self._output_writer.Write("\n")
def _PrintExtractionStatusUpdateWindow(self, processing_status):
"""Prints an extraction status update in window mode.
Args:
processing_status (ProcessingStatus): processing status.
"""
if self._stdout_output_writer:
self._ClearScreen()
workers_status = processing_status.workers_status
show_events = bool(
processing_status.foreman_status.number_of_produced_events
and not workers_status
)
self._output_writer.Write(
f"plaso - {self._tool_name:s} version {plaso.__version__:s}\n\n"
)
self.PrintExtractionStatusHeader(processing_status)
if show_events:
column_name = [
"Identifier",
"PID",
"Status",
"Memory",
"Sources",
"Events",
"File",
]
else:
column_name = [
"Identifier",
"PID",
"Status",
"Memory",
"Sources",
"Event Data",
"File",
]
table_view = views.CLITabularTableView(
column_names=column_name, column_sizes=[15, 7, 15, 15, 15, 15, 0]
)
self._AddExtractionProcessStatusTableRow(
processing_status.foreman_status, table_view, show_events
)
for worker_status in workers_status:
self._AddExtractionProcessStatusTableRow(
worker_status, table_view, show_events
)
table_view.Write(self._output_writer)
self._output_writer.Write("\n")
if processing_status.aborted:
self._output_writer.Write("Processing aborted - waiting for clean up.\n\n")
# TODO: remove update flicker. For win32console we could set the cursor
# top left, write the table, clean the remainder of the screen buffer
# and set the cursor at the end of the table.
if self._stdout_output_writer:
# We need to explicitly flush stdout to prevent partial status updates.
sys.stdout.flush()
def _PrintEventsStatus(self, events_status):
"""Prints the status of the events.
Args:
events_status (EventsStatus): events status.
"""
if events_status:
table_view = views.CLITabularTableView(
column_names=[
"Events:",
"Filtered",
"In time slice",
"Duplicates",
"MACB grouped",
"Total",
],
column_sizes=[15, 15, 15, 15, 15, 0],
)
table_view.AddRow(
[
"",
events_status.number_of_filtered_events,
events_status.number_of_events_from_time_slice,
events_status.number_of_duplicate_events,
events_status.number_of_macb_grouped_events,
events_status.total_number_of_events,
]
)
self._output_writer.Write("\n")
table_view.Write(self._output_writer)
def _PrintTasksStatus(self, processing_status):
"""Prints the status of the tasks.
Args:
processing_status (ProcessingStatus): processing status.
"""
if processing_status and processing_status.tasks_status:
tasks_status = processing_status.tasks_status
table_view = views.CLITabularTableView(
column_names=[
"Tasks:",
"Queued",
"Processing",
"Merging",
"Abandoned",
"Total",
],
column_sizes=[15, 7, 15, 15, 15, 0],
)
table_view.AddRow(
[
"",
tasks_status.number_of_queued_tasks,
tasks_status.number_of_tasks_processing,
tasks_status.number_of_tasks_pending_merge,
tasks_status.number_of_abandoned_tasks,
tasks_status.total_number_of_tasks,
]
)
self._output_writer.Write("\n")
table_view.Write(self._output_writer)
[docs]
def GetAnalysisStatusUpdateCallback(self):
"""Retrieves the analysis status update callback function.
Returns:
function: status update callback function or None if not available.
"""
if self._mode == self.MODE_FILE:
return self._PrintAnalysisStatusUpdateFile
if self._mode == self.MODE_LINEAR:
return self._PrintAnalysisStatusUpdateLinear
if self._mode == self.MODE_WINDOW:
return self._PrintAnalysisStatusUpdateWindow
return None
# TODO: refactor to protected method.
[docs]
def SetMode(self, mode):
"""Sets the mode.
Args:
mode (str): status view mode.
"""
self._mode = mode
[docs]
def SetStatusFile(self, path):
"""Sets the status file.
Args:
path (str): path of the status file.
"""
self._status_file = path