Common Vision Blox 15.0
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Events Friends Modules Pages
Image Manager/CVBpy/CompositeStreamHandler

This example program is located in your CVB installation under %CVB%Tutorial/Image Manager/CVBpy/CompositeStreamHandler.

composite_stream_handler.py:

# @brief CVBpy Example Script for 3rd generation acquisition stack
# to demonstrate a basic usage of CompositeStreamHandler.
#
# 1. Discover all devices.
# 2. Open the first device found.
# 3. Create a stream handler.
# 4. Start acquiring composites.
# 5. Wait until the handler has acquired 10 composites.
# 6. Stop acquiring composites.
#
# Requires: CVMockTL.
from typing import List, Tuple
from threading import Thread, Lock
import time
import cvb
TIMEOUT = 3 # sec
NUM_ELEMENTS_TO_ACQUIRE = 10 # composites
class Observer:
    """
    A demonstration helper that is used to check if a particular event
    happened.

    The flag starts out cleared and stays set once notify() has been called.
    The class itself does no locking.
    """

    def __init__(self):
        # becomes True on the first notify() call and is never reset.
        self._is_notified = False

    def notify(self) -> None:
        """
        Notifies the observer that the expected event has just happened.
        """
        self._is_notified = True

    @property
    def is_notified(self) -> bool:
        """
        Returns True if the observer has been notified, False otherwise.
        """
        return self._is_notified
class Monitor(Thread):
    """
    Waits until a given observer finishes its job.

    The thread polls the observer every 0.1 seconds and ends as soon as the
    observer reports that it has been notified.

    NOTE(review): run() returns only after the observer has been notified;
    if that never happens the thread keeps polling, so a join() without a
    timeout would block indefinitely.
    """

    def __init__(self, observer: Observer, lock: Lock):
        """
        Stores the observer to be polled and the lock that guards it.
        """
        super().__init__()
        self._observer = observer
        self._is_completed = False
        self._lock = lock

    def run(self) -> None:
        """
        Polls the observer under the lock until it has been notified.
        """
        while not self._is_completed:
            with self._lock:
                if self._observer.is_notified:
                    self._is_completed = True
            # sleep outside of the lock so that whoever notifies the
            # observer is not blocked while this thread is idle.
            time.sleep(0.1)
class CustomStreamHandler(cvb.CompositeStreamHandler):
    """
    Derives the CompositeStreamHandler class so that we can implement our own
    customized tasks for the demonstration.

    Every round of wait results is reported on the console; once
    NUM_ELEMENTS_TO_ACQUIRE rounds have been handled the observer is notified
    and later rounds are ignored.
    """

    def __init__(self, streams: List[cvb.CompositeStream], observer: Observer,
                 lock: Lock):
        """
        Registers the streams with the base class and stores the observer to
        be notified together with the lock that guards it.
        """
        super().__init__(streams)
        # number of rounds that have been reported so far.
        self._num_deliverables = 0
        self._observer = observer
        self._lock = lock

    def handle_async_stream(self, streams: List[cvb.CompositeStream]) -> None:
        """
        Asynchronously called for all registered streams.

        Waits on every stream for at most TIMEOUT seconds and forwards the
        collected wait results to handle_async_wait_result().
        """
        # the following code is the same as the default implementation;
        # this is just to demonstrate that you can freely override depending
        # on your demand.
        # wait_for() is given the timeout converted from sec to msec.
        wait_result_list = [
            stream.wait_for(TIMEOUT * 1000) for stream in streams
        ]

        # let it take care of the acquired deliverables.
        self.handle_async_wait_result(wait_result_list)

    def handle_async_wait_result(
            self,
            result_tuples: List[
                Tuple[cvb.Composite, int, cvb.NodeMapEnumerator]]) -> None:
        """
        Asynchronously called for all acquired deliverables.

        Prints the wait status and the item types of every delivered
        composite, then counts the round; the observer is notified when the
        count reaches NUM_ELEMENTS_TO_ACQUIRE.
        """
        # the default implementation does nothing; you may add tasks by
        # yourself.
        with self._lock:
            # nothing is reported anymore once the observer was notified.
            if self._observer.is_notified:
                return

            # iterate over streams.
            print(f"round: #{self._num_deliverables}")
            for m, wait_result in enumerate(result_tuples):
                print(f"stream: #{m}")

                # check the wait status.
                status = wait_result[1]
                print(f"wait status: {self._describe_status(status)}")

                # pick up the delivered composite.
                composite = wait_result[0]
                if composite is None:
                    # assumes a wait that did not succeed delivers no
                    # composite -- TODO confirm against the CVBpy reference;
                    # there is nothing to list in that case.
                    continue
                for n in range(composite.item_count):
                    item_type = self._describe_item(composite[n])
                    print(f"composite item #{n}: {item_type}")
            print("")  # delimiter

            self._num_deliverables += 1
            if self._num_deliverables == NUM_ELEMENTS_TO_ACQUIRE:
                self._observer.notify()

    @staticmethod
    def _describe_status(status: int) -> str:
        """
        Returns the console text for a wait status.
        """
        if status == cvb.WaitStatus.Ok:
            return "ok"
        if status == cvb.WaitStatus.Timeout:
            reason = "timeout"
        elif status == cvb.WaitStatus.Abort:
            reason = "abort"
        else:
            reason = "unknown"
        # every failure keeps the same "not ok; " prefix.
        return "not ok; " + reason

    @staticmethod
    def _describe_item(item) -> str:
        """
        Returns the console text for the type of a composite item.
        """
        # the order of the checks is significant; the first match wins.
        if isinstance(item, cvb.Image):
            return "image"
        if isinstance(item, cvb.Plane):
            return "plane"
        if isinstance(item, cvb.PlaneEnumerator):
            return "plane enumerator"
        if isinstance(item, cvb.PFNCBuffer):
            return "pfnc buffer"
        return "unknown"
if __name__ == "__main__":
    # enumerate devices.
    device_info_list = cvb.DeviceFactory.discover_from_root(
        cvb.DiscoverFlags.IgnoreVins)

    # cannot continue the demonstration if CVMockTL is not present
    access_token = None
    for device_info in device_info_list:
        if "MockTL" in device_info.access_token:
            access_token = device_info.access_token
    if not access_token:
        # raised explicitly because an assert would be stripped under -O.
        raise RuntimeError("CVMockTL was not found among the discovered "
                           "devices.")

    # instantiate CVMockTL on the list; however, it is worth knowing
    # that you can bind streams of other devices to a stream handle if needed.
    with cvb.DeviceFactory.open(
            access_token,
            cvb.AcquisitionStack.GenTL) as device:  # type: cvb.GenICamDevice
        # create a stream handler.
        # IMPORTANT: this tutorial assumes that each stream is synchronized,
        # i.e., it is running at the same acquisition frame rate; if any of
        # them is out of sync the application will lose images. if any of the
        # streams are not synchronized, please consider preparing a dedicated
        # stream handler object for every single stream.
        streams = [
            device.stream(cvb.CompositeStream, n)
            for n in range(device.stream_count)
        ]

        lock = Lock()
        observer = Observer()
        handler = CustomStreamHandler(streams, observer, lock)

        # start the data acquisition.
        handler.run()
        try:
            # wait until the job is done, but for TIMEOUT seconds at most.
            monitor = Monitor(observer, lock)
            monitor.start()
            monitor.join(TIMEOUT)
        finally:
            # stop the data acquisition, ignore errors; this is done even if
            # the monitoring above fails.
            handler.try_finish()
Union[cvb.GenICamDevice, cvb.VinDevice, cvb.EmuDevice, cvb.VideoDevice, cvb.NonStreamingDevice] open(str provider, int acquisition_stack=cvb.AcquisitionStack.PreferVin)
List[cvb.DiscoveryInformation] discover_from_root(int flags=cvb.DiscoverFlags.FindAll, int time_span=300)
void Lock()