Common Vision Blox 15.0
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Events Friends Modules Pages
Image Manager/CVBpy/StreamingSimple

This example program is located in your CVB installation under %CVB%Tutorial/Image Manager/CVBpy/StreamingSimple.

streaming_simple.py:

# CVBpy Example Script
#
# 1. Open the GenICam.vin driver.
# 2. Acquire images.
#
# Requires: -
import os
import cvb
os.path.join(cvb.install_path(), "drivers", "CVMock.vin"),
cvb.AcquisitionStack.Vin) as device:
stream = device.stream()
stream.start()
for i in range(10):
image, status = stream.wait()
if status == cvb.WaitStatus.Ok:
print("Acquired image: " + str(i) + " | Timestamp: " + str(image.raw_timestamp))
stream.abort()
Union[cvb.GenICamDevice, cvb.VinDevice, cvb.EmuDevice, cvb.VideoDevice, cvb.NonStreamingDevice] open(str provider, int acquisition_stack=cvb.AcquisitionStack.PreferVin)
str install_path()

streaming_simple_3gen.py:

# @brief Example for simple Image, PointCloud and Composite streaming.
# CVBpy Example Script with 3rd generation acquisition stack
#
# 1. Open the device with 3rd generation acq stack.
# 2. Get an ImageStream or PointCloudStream or CompositeStream
# 3. Acquire images
#
# Requires: A connected and configured GenICam device
import cvb
devices = cvb.DeviceFactory.discover_from_root(cvb.DiscoverFlags.IgnoreVins)
with cvb.DeviceFactory.open(devices[0].access_token, cvb.AcquisitionStack.GenTL) as device:
stream = device.stream(cvb.ImageStream)
# stream = device.stream(cvb.PointCloudStream)
# stream = device.stream(cvb.CompositeStream)
stream.start()
for i in range(10):
image, status, node_maps = stream.wait()
#pointcloud, status, node_maps = stream.wait()
#composite, status, node_maps = stream.wait()
with image:
print("Acquired image: {0} | Timestamp: {1}".format(i, image.raw_timestamp))
stream.abort()
List[cvb.DiscoveryInformation] discover_from_root(int flags=cvb.DiscoverFlags.FindAll, int time_span=300)

streaming_simple_with_mock.py:

# CVBpy Example Script
#
# 1. Open a mock device.
# 2. Start data streaming.
# 3. Acquire a set of deliverables.
# 4. Get the Vin buffer node map to check the buffer-related properties.
# 3. Abort the data streaming.
#
# Requires: The official CVB mock GenTL producer file. Note that
# it is not necessary to know which is the CTI file or where the
# CTI file is located since the CVB installer installs the file in
# the right place for you.
import sys
import cvb
cvb.DiscoverFlags.IgnoreVins | cvb.DiscoverFlags.IncludeMockTL)
if len(deviceInfoList) < 1:
print("No devices found.")
sys.exit()
access_token = ''
found_mock = False
for info in deviceInfoList:
if 'MockTL' in info.access_token:
found_mock = True
access_token = info.access_token
break
if not found_mock:
print("Could not find CVMockTL.")
sys.exit()
# Open the mock device.
with cvb.DeviceFactory.open(access_token, cvb.AcquisitionStack.GenTL) as device:
# Open the first stream module.
stream = device.stream(cvb.ImageStream, 0)
# Start data streaming.
stream.start()
# Acquire a set of deliverables.
for image_index in range(10):
image, status, node_maps = stream.wait()
# Get the bundled Vin buffer node map to learn the
# buffer-related properties of the image.
buffer_node_map = node_maps['VinBuffer']
# Incompleteness indicates a corrupt buffer
is_incomplete_node = buffer_node_map["IsIncomplete"]
# The ID of the frame that is transported in the corresponding buffer
frame_id_node = buffer_node_map["FrameID"]
# Padding in x-direction
x_padding_node = buffer_node_map["XPadding"]
# Padding in y-direction
y_padding_node = buffer_node_map["YPadding"]
# The total number of bytes written into the buffer
size_filled_node = buffer_node_map["SizeFilled"]
# Timestamp (UINT64) the buffer was acquired
timestamp_node = buffer_node_map['Timestamp']
print(f"Image #{image_index}, "
f"IsIncomplete: {(is_incomplete_node.value)}, FrameID: {frame_id_node.value}, "
f"Width: {image.width}, Height: {image.height}, "
f"XPadding: {x_padding_node}, YPadding: {y_padding_node}, "
f"SizeFilled: {size_filled_node.value}, Timestamp: {hex(timestamp_node.value)}")
# Abort the data streaming.
stream.abort()