CVBpy 14.1
cvb/CompositeStreamHandler
1# CVBpy Example Script for 3rd generation acquisition stack
2# to demonstrate a basic usage of CompositeStreamHandler.
3#
4# 1. Discover all devices.
5# 2. Open the first device found.
6# 3. Create a stream handler.
7# 4. Start acquiring composites.
8# 5. Wait for the handler acquires 10 composites.
9# 6. Stop acquiring composites.
10#
11# Requires: CVMockTL.
12
13from typing import List, Tuple
14from threading import Thread, Lock
15import time
16
17import cvb
18
19
20TIMEOUT = 3 # sec
21NUM_ELEMENTS_TO_ACQUIRE = 10 # composites
22
23
24class Observer:
25 """
26 A demonstration helper that is used to check if a particular event happened.
27 """
28 def __init__(self):
29 self._is_notified = False
30
31 def notify(self) -> None:
32 """
33 Notifies the observer that the expected event has just happened.
34 """
35 self._is_notified = True
36
37 @property
38 def is_notified(self) -> bool:
39 """
40 Returns true if the observer has been notified. false otherwise.
41 """
42 return self._is_notified
43
44
45class Monitor(Thread):
46 """
47 Waits until a given observer finishes its job.
48 """
49 def __init__(self, observer: Observer, lock: Lock):
50 super().__init__()
51 self._observer = observer
52 self._is_completed = False
53 self._lock = lock
54
55 def run(self) -> None:
56 while not self._is_completed:
57 with self._lock:
58 if self._observer.is_notified:
59 self._is_completed = True
60 time.sleep(0.1)
61
62
63class CustomStreamHandler(cvb.CompositeStreamHandler):
64 """
65 Derives the CompositeStreamHandler class so that we can implement our own
66 customized tasks for the demonstration.
67 """
68 def __init__(self, streams: List[cvb.CompositeStream], observer: Observer,
69 lock: Lock):
70 super().__init__(streams)
71 self._num_deliverables = 0
72 self._observer = observer
73 self._lock = lock
74
75 def handle_async_stream(self, streams: List[cvb.CompositeStream]) -> None:
76 """
77 Asynchronously called for all registered streams.
78 """
79
80 # the following code is the same as the default implementation;
81 # this is just to demonstrate that you can freely override depending
82 # on your demand.
83 wait_result_list = []
84 for stream in streams:
85 wait_result_list.append(stream.wait_for(TIMEOUT * 1000))
86
87 # let it take care of the acquired deliverables.
88 self.handle_async_wait_result(wait_result_list)
89
90 def handle_async_wait_result(
91 self,
92 result_tuples: List[
93 Tuple[cvb.Composite, int, cvb.NodeMapEnumerator]]) -> None:
94 """
95 Asynchronously called for all acquired deliverables.
96 """
97
98 # the default implementation does nothing; you may add tasks by
99 # yourself.
100
101 with self._lock:
102 if self._observer.is_notified:
103 return
104
105 # iterate over streams.
106 print(f"round: #{self._num_deliverables}")
107 for m, wait_result in enumerate(result_tuples):
108 # check the wait status.
109 print(f"stream: #{m}")
110
111 status = wait_result[1]
112 if status == cvb.WaitStatus.Ok:
113 status_string = "ok"
114 else:
115 status_string = "not ok; "
116 if status == cvb.WaitStatus.Timeout:
117 status_string += "timeout"
118 elif status == cvb.WaitStatus.Abort:
119 status_string += "abort"
120 else:
121 status_string = "unknown"
122 print(f"wait status: {status_string}")
123
124 # pick up the delivered composite.
125 composite = wait_result[0]
126 for n in range(composite.item_count):
127 item = composite[n]
128 item_type = "unknown"
129 if isinstance(item, cvb.Image):
130 item_type = "image"
131 elif isinstance(item, cvb.Plane):
132 item_type = "plane"
133 elif isinstance(item, cvb.PlaneEnumerator):
134 item_type = "plane enumerator"
135 elif isinstance(item, cvb.PFNCBuffer):
136 item_type = "pfnc buffer"
137 print(f"composite item #{n}: {item_type}")
138
139 print("") # delimiter
140 self._num_deliverables += 1
141 if self._num_deliverables == NUM_ELEMENTS_TO_ACQUIRE:
142 self._observer.notify()
143
144
145if __name__ == "__main__":
146 # enumerate devices.
147 device_info_list = cvb.DeviceFactory.discover_from_root(
148 cvb.DiscoverFlags.IgnoreVins)
149
150 # cannot continue the demonstration if CVMockTL is not present
151 access_token = None
152 for device_info in device_info_list:
153 if "MockTL" in device_info.access_token:
154 access_token = device_info.access_token
155 assert access_token
156
157 # instantiate CVMockTL on the list; however, it is worth knowing
158 # that you can bind streams of other devices to a stream handle if needed.
160 access_token,
161 cvb.AcquisitionStack.GenTL) as device: # type: cvb.GenICamDevice
162
163 # create a stream handler.
164 # IMPORTANT: this tutorial assumes that each stream is synchronized,
165 # i.e., it is running at the same acquisition frame rate; if any of
166 # them is out of sync the application will lose images. if any of the
167 # streams are not synchronized, please consider preparing a dedicated
168 # stream handler object for every single stream.
169 streams = []
170 for n in range(device.stream_count):
171 streams.append(device.stream(cvb.CompositeStream, n))
172
173 lock = Lock()
174 observer = Observer()
175 handler = CustomStreamHandler(streams, observer, lock)
176
177 # start the data acquisition.
178 handler.run()
179
180 # wait for the job is done.
181 monitor = Monitor(observer, lock)
182 monitor.start()
183 monitor.join(TIMEOUT)
184
185 # stop the data acquisition, ignore errors.
186 handler.try_finish()
The Common Vision Blox composite.
Definition: __init__.py:745
Handler object for multiple synchronous streams.
Definition: __init__.py:970
The composite stream class.
Definition: __init__.py:782
Union[cvb.GenICamDevice, cvb.VinDevice, cvb.EmuDevice, cvb.VideoDevice, cvb.NonStreamingDevice] open(str provider, int acquisition_stack=cvb.AcquisitionStack.PreferVin)
Opens a device with the given provider and acquisition stack.
Definition: __init__.py:1570
List[cvb.DiscoveryInformation] discover_from_root(int flags=cvb.DiscoverFlags.FindAll, int time_span=300)
Discovers available devices / nodes depending on the given flags.
Definition: __init__.py:1550
The Common Vision Blox image.
Definition: __init__.py:2038
Lazy enumeration of node maps.
Definition: __init__.py:3723
PFNC buffer class implementing the IPFNCBuffer interface.
Definition: __init__.py:3856
A collection of planes.
Definition: __init__.py:4208
Plane container.
Definition: __init__.py:4066