Source code for flexlogger.automation._events

import time

from ._event_payloads import AlarmPayload
from ._event_payloads import EventPayload
from ._event_payloads import FilePayload
from ._event_type import EventType
from ._flexlogger_error import FlexLoggerError
from .proto import (
    Events_pb2,
    Events_pb2_grpc,
)
from .proto.EventType_pb2 import EventType as EventType_pb2
from concurrent.futures import ThreadPoolExecutor
from google.protobuf.timestamp_pb2 import Timestamp
from grpc import Channel, RpcError
from typing import Callable, Iterator, List


[docs] class FlexLoggerEventHandler: """Represents a FlexLogger event handler. You must create a FlexLoggerEventHandler object to be able to register for events. """ def __init__(self, channel: Channel, client_id: str, application, raise_if_application_closed: Callable[[], None]) -> None: self._application = application self._callbacks = [] self._channel = channel self._client_id = client_id self._event_types_per_callback = [] self._is_subscribed = False self._raise_if_application_closed = raise_if_application_closed self._stub = Events_pb2_grpc.FlexLoggerEventsStub(self._channel) self._thread_executor = ThreadPoolExecutor() def __enter__(self): return self def __exit__(self, *args): self.unregister_from_events()
[docs] def get_registered_events(self) -> List[EventType]: """Gets the list of registered event types. Returns The list of registered event types Raises: FlexLoggerError: if the request failed. """ try: response = self._stub.GetRegisteredEvents(Events_pb2.GetRegisteredEventsRequest(client_id=self._client_id)) event_types_pb2 = response.event_types event_types = self._marshal_event_types_pb2(event_types_pb2) return event_types except (RpcError, ValueError, AttributeError) as rpc_error: self._raise_if_application_closed() raise FlexLoggerError("Failed to get the registered events") from rpc_error
[docs] def unregister_from_events(self) -> None: """Unregister from events.""" try: self._callbacks = [] self._event_types_per_callback = [] self._is_subscribed = False self._stub.UnsubscribeFromEvents(Events_pb2.UnsubscribeFromEventsRequest(client_id=self._client_id)) except (RpcError, ValueError, AttributeError) as rpc_error: self._raise_if_application_closed() raise FlexLoggerError("Failed to unregister from events") from rpc_error
[docs] def register_event_callback(self, callback, event_types=None) -> None: """Register for events and specify a callback method Args: callback: callback method. The callback method must have the following parameters: application: Application Reference to the FlexLogger application event_type: EventType The event type event_payload: EventPayload The event payload event_types: List of EventType to subscribe to. Raises: FlexLoggerError: if the callback registration failed. """ event_types_parameter = self._marshal_event_types(event_types) if self._is_subscribed: try: self._stub.RegisterEvents(Events_pb2.SubscribeToEventsRequest(client_id=self._client_id, event_types=event_types_parameter)) self._add_callback_and_events_to_dictionary(callback, event_types) except (RpcError, ValueError, AttributeError) as rpc_error: self._raise_if_application_closed() raise FlexLoggerError("Failed to register events") from rpc_error return # Add function and event types to respective lists self._is_subscribed = True self._add_callback_and_events_to_dictionary(callback, event_types) event_iterator = self._stub.SubscribeToEvents( Events_pb2.SubscribeToEventsRequest(client_id=self._client_id, event_types=event_types_parameter)) self._thread_executor.submit(self._event_handler, event_iterator) # Wait for the server to register the client ID. time.sleep(0.15)
@staticmethod def _marshal_event_types(event_types): if event_types is None: return None event_types_pb2 = [] for event_type in event_types: event_types_pb2.append(event_type.to_event_type_pb2()) return event_types_pb2 @staticmethod def _marshal_event_types_pb2(event_types_pb2): if event_types_pb2 is None: return None event_types = [] for event_type_pb2 in event_types_pb2: event_types.append(EventType.from_event_type_pb2(event_type_pb2)) return event_types def _add_callback_and_events_to_dictionary(self, callback, event_types: [EventType]): # If the event type is not provided, respond to all. if event_types is None: event_types = [EventType.ALARM, EventType.LOG_FILE, EventType.TEST_SESSION, EventType.CUSTOM] if callback in self._callbacks: index = self._callbacks.index(callback) self._event_types_per_callback[index] = event_types else: self._callbacks.append(callback) self._event_types_per_callback.append(event_types) def _event_handler(self, event_iterator: Iterator[Events_pb2.SubscribeToEventsResponse]) -> None: try: while self._is_subscribed: event_response = next(event_iterator) if event_response is not None: event_type = EventType.from_event_type_pb2(event_response.event_type) for i in range(len(self._callbacks)): if event_type in self._event_types_per_callback[i]: payload = self._create_payload(event_response) self._callbacks[i](self._application, event_type, payload) except Exception as error: self._raise_if_application_closed() raise FlexLoggerError("Failed to receive event.") from error @staticmethod def _create_payload(event_response: Events_pb2.SubscribeToEventsResponse): if event_response.event_type == EventType_pb2.EVENT_TYPE_ALARM: payload = AlarmPayload(event_response) elif event_response.event_type == EventType_pb2.EVENT_TYPE_LOG_FILE: payload = FilePayload(event_response) else: payload = EventPayload(event_response) return payload