from typing import List, Tuple
from threading import Thread, Lock
import time
import cvb
TIMEOUT = 3
NUM_ELEMENTS_TO_ACQUIRE = 10
class Observer:
    """
    Small one-shot flag the demonstration uses to record that an awaited
    event has occurred.
    """

    def __init__(self):
        # Nothing has happened yet; notify() is the only thing that flips this.
        self._is_notified = False

    @property
    def is_notified(self) -> bool:
        """
        Tells whether notify() has been called on this observer: True once
        it has, False before that.
        """
        return self._is_notified

    def notify(self) -> None:
        """
        Records that the awaited event has taken place.
        """
        self._is_notified = True
class Monitor(Thread):
    """
    Background thread that polls an observer until it has been notified.

    The thread is created as a daemon: callers join it with a timeout, and
    if the observer is never notified the polling loop must not keep the
    interpreter alive after the main thread has finished.
    """

    def __init__(self, observer: "Observer", lock: Lock):
        """
        Args:
            observer: Object exposing a boolean ``is_notified`` attribute.
            lock: Lock guarding access to the observer; it is held only
                while the observer's state is read.
        """
        super().__init__(daemon=True)
        self._observer = observer
        self._is_completed = False
        self._lock = lock

    def run(self) -> None:
        """
        Polls the observer every 100 ms and returns as soon as it reports
        that it has been notified.
        """
        while not self._is_completed:
            with self._lock:
                if self._observer.is_notified:
                    self._is_completed = True
                    # Done: leave immediately instead of sleeping once more.
                    return
            # Sleep outside the lock so other users of the lock are not
            # blocked while this thread is idle.
            time.sleep(0.1)
"""
Derives the CompositeStreamHandler class so that we can implement our own
customized tasks for the demonstration.
"""
lock: Lock):
super().__init__(streams)
# Number of result rounds handled so far; compared against
# NUM_ELEMENTS_TO_ACQUIRE when deciding to notify the observer.
self._num_deliverables = 0
self._observer = observer
self._lock = lock
"""
Asynchronously called for all registered streams.

Waits on every given stream with a timeout of TIMEOUT * 1000 and hands the
collected wait results to handle_async_wait_result.
"""
wait_result_list = []
for stream in streams:
# NOTE(review): TIMEOUT is scaled by 1000, presumably seconds ->
# milliseconds; confirm against the wait_for documentation.
wait_result_list.append(stream.wait_for(TIMEOUT * 1000))
self.handle_async_wait_result(wait_result_list)
def handle_async_wait_result(
self,
result_tuples: List[
"""
Asynchronously called for all acquired deliverables.

For every entry of result_tuples this prints the wait status (element 1 of
the entry) and the type of each item of the composite (element 0 of the
entry). Nothing is printed once the observer has been notified, and the
observer is notified when the round counter reaches
NUM_ELEMENTS_TO_ACQUIRE.
"""
# The observer is inspected while holding the lock that is also handed to
# the monitor thread.
with self._lock:
if self._observer.is_notified:
return
print(f"round: #{self._num_deliverables}")
for m, wait_result in enumerate(result_tuples):
print(f"stream: #{m}")
# Element 1 of each result tuple is the wait status.
status = wait_result[1]
if status == cvb.WaitStatus.Ok:
status_string = "ok"
else:
status_string = "not ok; "
if status == cvb.WaitStatus.Timeout:
status_string += "timeout"
elif status == cvb.WaitStatus.Abort:
status_string += "abort"
else:
# NOTE(review): plain assignment here drops the "not ok; " prefix, unlike
# the += used by the timeout/abort branches — confirm this is intended.
status_string = "unknown"
print(f"wait status: {status_string}")
# Element 0 of each result tuple is the composite that was delivered.
composite = wait_result[0]
for n in range(composite.item_count):
item = composite[n]
# NOTE(review): no conditions are visible between the following
# assignments in this view; as shown, the last value is the one that gets
# printed — confirm against the full file.
item_type = "unknown"
item_type = "image"
item_type = "plane"
item_type = "plane enumerator"
item_type = "pfnc buffer"
print(f"composite item #{n}: {item_type}")
print("")
# One more round handled; tell the observer once the target is reached.
self._num_deliverables += 1
if self._num_deliverables == NUM_ELEMENTS_TO_ACQUIRE:
self._observer.notify()
if __name__ == "__main__":
cvb.DiscoverFlags.IgnoreVins)
access_token = None
# Keep the access token of a discovered device whose token mentions
# "MockTL"; when several match, the last one is kept.
for device_info in device_info_list:
if "MockTL" in device_info.access_token:
access_token = device_info.access_token
# Stop the demo here if no matching device was found.
assert access_token
access_token,
cvb.AcquisitionStack.GenTL) as device:
streams = []
for n in range(device.stream_count):
observer = Observer()
# The handler and the monitor are given the same observer and lock.
handler = CustomStreamHandler(streams, observer, lock)
handler.run()
monitor = Monitor(observer, lock)
monitor.start()
# Wait at most TIMEOUT seconds for the monitor thread before finishing the
# handler.
monitor.join(TIMEOUT)
handler.try_finish()
Union[cvb.GenICamDevice, cvb.VinDevice, cvb.EmuDevice, cvb.VideoDevice, cvb.NonStreamingDevice] open(str provider, int acquisition_stack=cvb.AcquisitionStack.PreferVin)
List[cvb.DiscoveryInformation] discover_from_root(int flags=cvb.DiscoverFlags.FindAll, int time_span=300)