13from typing
import List, Tuple
14from threading
import Thread, Lock
21NUM_ELEMENTS_TO_ACQUIRE = 10
26 A demonstration helper that is used to check
if a particular event happened.
29 self._is_notified =
False
31 def notify(self) -> None:
33 Notifies the observer that the expected event has just happened.
35 self._is_notified = True
38 def is_notified(self) -> bool:
40 Returns true if the observer has been notified. false otherwise.
42 return self._is_notified
47 Waits until a given observer finishes its job.
49 def __init__(self, observer: Observer, lock: Lock):
51 self._observer = observer
52 self._is_completed =
False
55 def run(self) -> None:
56 while not self._is_completed:
58 if self._observer.is_notified:
59 self._is_completed =
True
65 Derives the CompositeStreamHandler class so that we can implement our own
66 customized tasks
for the demonstration.
70 super().__init__(streams)
71 self._num_deliverables = 0
72 self._observer = observer
77 Asynchronously called for all registered streams.
84 for stream
in streams:
85 wait_result_list.append(stream.wait_for(TIMEOUT * 1000))
88 self.handle_async_wait_result(wait_result_list)
90 def handle_async_wait_result(
95 Asynchronously called for all acquired deliverables.
102 if self._observer.is_notified:
106 print(f
"round: #{self._num_deliverables}")
107 for m, wait_result
in enumerate(result_tuples):
109 print(f
"stream: #{m}")
111 status = wait_result[1]
112 if status == cvb.WaitStatus.Ok:
115 status_string =
"not ok; "
116 if status == cvb.WaitStatus.Timeout:
117 status_string +=
"timeout"
118 elif status == cvb.WaitStatus.Abort:
119 status_string +=
"abort"
121 status_string =
"unknown"
122 print(f
"wait status: {status_string}")
125 composite = wait_result[0]
126 for n
in range(composite.item_count):
128 item_type =
"unknown"
134 item_type =
"plane enumerator"
136 item_type =
"pfnc buffer"
137 print(f
"composite item #{n}: {item_type}")
140 self._num_deliverables += 1
141 if self._num_deliverables == NUM_ELEMENTS_TO_ACQUIRE:
142 self._observer.notify()
145if __name__ ==
"__main__":
148 cvb.DiscoverFlags.IgnoreVins)
152 for device_info
in device_info_list:
153 if "MockTL" in device_info.access_token:
154 access_token = device_info.access_token
161 cvb.AcquisitionStack.GenTL)
as device:
170 for n
in range(device.stream_count):
174 observer = Observer()
175 handler = CustomStreamHandler(streams, observer, lock)
181 monitor = Monitor(observer, lock)
183 monitor.join(TIMEOUT)
The Common Vision Blox composite.
Definition: __init__.py:745
Handler object for multiple synchronous streams.
Definition: __init__.py:970
The composite stream class.
Definition: __init__.py:782
Union[cvb.GenICamDevice, cvb.VinDevice, cvb.EmuDevice, cvb.VideoDevice, cvb.NonStreamingDevice] open(str provider, int acquisition_stack=cvb.AcquisitionStack.PreferVin)
Opens a device with the given provider and acquisition stack.
Definition: __init__.py:1570
List[cvb.DiscoveryInformation] discover_from_root(int flags=cvb.DiscoverFlags.FindAll, int time_span=300)
Discovers available devices / nodes depending on the given flags.
Definition: __init__.py:1550
The Common Vision Blox image.
Definition: __init__.py:2038
Lazy enumeration of node maps.
Definition: __init__.py:3723
PFNC buffer class implementing the IPFNCBuffer interface.
Definition: __init__.py:3856
A collection of planes.
Definition: __init__.py:4208
Plane container.
Definition: __init__.py:4066