Source code for flexlogger.automation._event_payloads

from ._event_type import EventType
from ._severity_level import SeverityLevel
from .proto import Events_pb2
from datetime import datetime
import json


[docs] class EventPayload: """Represents an event payload.""" def __init__(self, event_response: Events_pb2.SubscribeToEventsResponse) -> None: self._event_type = EventType.from_event_type_pb2(event_response.event_type) self._event_name = event_response.event_name self._payload = event_response.payload self._timestamp = event_response.timestamp @property def event_type(self) -> EventType: """The type of event received.""" return self._event_type @property def event_name(self) -> str: """The name of the event received.""" return self._event_name @property def timestamp(self) -> datetime: """The time the event was received.""" return self._timestamp
[docs] class AlarmPayload(EventPayload): """Represents an alarm event payload.""" def __init__(self, event_response: Events_pb2.SubscribeToEventsResponse) -> None: super().__init__(event_response) json_payload = json.loads(self._payload) self._alarm_id = json_payload['AlarmId'] self._active = json_payload['Active'] self._acknowledged = json_payload['Acknowledged'] self._acknowledged_at = json_payload['AcknowledgedAt'] self._occurred_at = json_payload['OccurredAt'] self._severity_level = json_payload['SeverityLevel'] self._updated_at = json_payload['UpdatedAt'] self._channel = json_payload['Channel'] self._condition = json_payload['Condition'] self._display_name = json_payload['DisplayName'] self._description = json_payload['Description'] @property def alarm_id(self) -> str: """The alarm ID.""" return self._alarm_id @property def active(self) -> bool: """Whether the alarm is active.""" return self._active @property def acknowledged(self) -> bool: """Whether the alarm has been acknowledged.""" return self._acknowledged @property def acknowledged_at(self) -> datetime: """When the alarm was acknowledged.""" return self._acknowledged_at @property def occurred_at(self) -> datetime: """When the alarm occurred.""" return self._occurred_at @property def severity_level(self) -> SeverityLevel: """The alarm's severity level.""" return self._severity_level @property def updated_at(self) -> datetime: """When the alarm was last updated.""" return self._updated_at @property def channel(self) -> str: """Channel associated with the alarm.""" return self._channel @property def condition(self) -> str: """The alarm condition.""" return self._condition @property def display_name(self) -> str: """The alarm's display name.""" return self._display_name @property def description(self) -> str: """Description of the alarm.""" return self._description
[docs] class FilePayload(EventPayload): """Represents a file event payload.""" def __init__(self, event_response: Events_pb2.SubscribeToEventsResponse) -> None: super().__init__(event_response) self._file_path = self._payload @property def file_path(self) -> str: """The TDMS file path.""" return self._file_path