import os, sys
import cvb
if sys.version_info >= (3, 11):
from PySide6.QtWidgets import QApplication
from PySide6.QtQuick import QQuickView
from PySide6.QtCore import QObject, Signal, Slot, Property, QUrl
from PySide6.QtGui import QIcon
else:
from PySide2.QtWidgets import QApplication
from PySide2.QtQuick import QQuickView
from PySide2.QtCore import QObject, Signal, Slot, Property, QUrl
from PySide2.QtGui import QIcon
def __init__(self, stream):
super().__init__(stream)
def handle_async_wait_result(self, image, status):
if status is cvb.WaitStatus.Ok:
print("New image arrived")
if BackEnd.grab_image or BackEnd.single_shot:
BackEnd.single_shot = False
BackEnd.image_controller.refresh(image)
if BackEnd.has_ring_buffer:
image.unlock()
elif BackEnd.server_started and (not BackEnd.server_single_shot_send_enabled or BackEnd.server_single_shot_send) and BackEnd.server.stream.is_running:
try:
print('Sending image')
BackEnd.server_single_shot_send = False
if BackEnd.has_ring_buffer:
BackEnd.buffer_index = image.buffer_index
BackEnd.img_list[image.buffer_index] = image
def image_release(img):
img.unlock()
BackEnd.img_list[BackEnd.buffer_index].unlock()
print('Unlocked buffer at index ' + str(img.buffer_index))
BackEnd.server.stream.send(image, image_release)
BackEnd.image_controller.refresh(BackEnd.img_list[BackEnd.buffer_index])
else:
BackEnd.server.stream.send(image)
BackEnd.image_controller.refresh(image)
except:
print('Failed to send image')
elif BackEnd.has_ring_buffer:
print("Unlock unused image")
image.unlock()
class BackEnd(QObject):
connection_list_changed = Signal()
server_started = False
server_single_shot_send_enabled = False
server_single_shot_send = False
single_shot = False
server = None
grab_image = False
has_ring_buffer = False
img_list = list()
buffer_index = 0
def __init__(self):
QObject.__init__(self)
self.start_in_progress = False
self.window_state_reg_node = None
self.event_cookie = None
cvb.AcquisitionStack.Vin)
self.stream = self.device.stream()
BackEnd.has_ring_buffer = bool(self.stream.ring_buffer)
if BackEnd.has_ring_buffer:
self.stream.ring_buffer.lock_mode = cvb.RingBufferLockMode.On
BackEnd.img_list = [None] * self.stream.ring_buffer.count
self.handler = StreamProcessingHandler(self.stream)
self.view = view
self.view.rootContext().setContextProperty('mainImage', BackEnd.image_controller)
self.stream.get_snapshot()
BackEnd.image_controller.refresh(self.device.device_image)
self.handler.run()
self.connections = gs.LogicalNetworkInterface.get_all_available()
self.m_connection_list = list()
for connection in self.connections:
self.m_connection_list.append(connection.ip_address() + ' (' + connection.ipv4_mask() + ')')
self.connection_list_changed.emit()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.handler.is_active:
self.handler.finish()
if BackEnd.server:
BackEnd.server = None
@Property('QVariantList', notify=connection_list_changed)
def connection_list(self):
return self.m_connection_list
@Property('QUrl')
def driver_path(self):
@Property('QUrl')
def desktop_path(self):
return QUrl.fromLocalFile(os.path.expanduser("~/Desktop"))
@Slot(bool)
def switch_grab(self, switched):
if switched and not self.handler.is_active:
self.handler.run()
BackEnd.grab_image = switched
def reset_ui(self):
print("Resetting ui")
if BackEnd.grab_image:
print('Resetting grab switch')
BackEnd.grab_image = False
switch = self.view.findChild(QObject, 'switchGrab')
switch.setProperty('checked', False)
def stop_grab(self):
print('Stopping grab')
self.reset_ui()
if self.stream.is_running:
self.handler.try_finish()
print('Grab stopped')
@Slot(str)
def load_device(self, path):
print('Loading device from ' + path)
self.stop_grab()
try:
self.device = tmp_device
self.stream = self.device.stream()
self.handler = StreamProcessingHandler(self.stream)
self.stream.get_snapshot()
BackEnd.has_ring_buffer = bool(self.stream.ring_buffer)
if BackEnd.has_ring_buffer:
self.stream.ring_buffer.lock_mode = cvb.RingBufferLockMode.On
BackEnd.img_list = [None] * self.stream.ring_buffer.count
BackEnd.image_controller.refresh(self.device.device_image)
self.handler.run()
except:
print('Failed to load new device')
@Slot(str)
def save_image(self, path):
print('Saving image to ' + path)
self.device.device_image.save(path)
@Slot()
def btn_single_shot(self):
print('Single Shot')
self.reset_ui()
if not self.handler.is_active:
self.handler.run()
BackEnd.single_shot = True
@Slot(bool)
def switch_enable_single_shot_send(self, enabled):
BackEnd.server_single_shot_send_enabled = enabled
@Slot()
def btn_single_shot_send(self):
if BackEnd.server.stream.is_running:
print('Send single shot')
BackEnd.server_single_shot_send = True
else:
print('Acquisition not enabled on server')
@Slot(int, int)
def start_server(self, connection_index, driver_type_index):
print('connection_index', connection_index, 'driver_type_index', driver_type_index)
if self.start_in_progress:
return
self.start_in_progress = True
try:
if BackEnd.server_started is False:
self.reset_ui()
BackEnd.server = gs.Server.create_with_const_size(self.device.device_image.size,
self.device.device_image.color_model,
self.device.device_image.planes[0].data_type,
driver_type_index)
self.add_genicam_features()
BackEnd.server.user_version = 'Python GevServer'
if(BackEnd.has_ring_buffer):
BackEnd.server.stream.resend_buffers_count = 2
else:
BackEnd.server.stream.resend_buffers_count = 0
BackEnd.server.start(self.connections[connection_index].ip_address())
BackEnd.server_started = True
print('Started server')
else:
BackEnd.server.stop()
BackEnd.server_started = False
print('Server stopped')
except:
print('Failed to start/stop the server')
self.start_in_progress = False
def on_window_size_changed(self, valueNode):
print('Window size change callback called')
val = self.window_state_reg_node.value
if val == 0:
self.view.showMinimized()
elif val == 1:
self.view.showNormal()
elif val == 2:
view.showMaximized()
def add_genicam_features(self):
cat_node = gs.CategoryNode.create('Cust::CustomFeatures')
BackEnd.server.node_map.add_node(cat_node)
cat_node.display_name = 'Custom Features'
cat_node.tool_tip = 'Contains all application defined features'
root_node = BackEnd.server.node_map['Root']
root_node.add(cat_node, gs.NodeList.Child)
self.window_state_reg_node = gs.Int32RegNode.create('Cust::WindowStateReg')
BackEnd.server.node_map.add_node(self.window_state_reg_node)
self.window_state_reg_node.visibility = cvb.Visibility.Invisible
self.window_state_reg_node.cache_mode = cvb.CacheMode.NoCache
self.window_state_reg_node.polling_time = 333
self.window_state_reg_node.value = 1
self.event_cookie = self.window_state_reg_node.register_event_written_updated(self.on_window_size_changed)
enumeration_node = gs.EnumerationNode.create('Cust::WindowState')
BackEnd.server.node_map.add_node(enumeration_node)
enumeration_node.display_name = 'Window State'
enumeration_node.tool_tip = 'Current window state of server application'
enumeration_node.value_config_as_node = self.window_state_reg_node
minimized_node = gs.EnumEntryNode.create('Cust::Minimized')
minimized_node.numeric_value = 0
enumeration_node.add(minimized_node, gs.NodeList.Child)
normal_node = gs.EnumEntryNode.create('Cust::Normal')
normal_node.numeric_value = 1
enumeration_node.add(normal_node, gs.NodeList.Child)
max_node = gs.EnumEntryNode.create('Cust::Maximized')
max_node.numeric_value = 2
enumeration_node.add(max_node, gs.NodeList.Child)
cat_node.add(enumeration_node, gs.NodeList.Child)
if __name__ == '__main__':
sys.argv += ['--style', 'Windows']
app = QApplication(sys.argv)
app.setOrganizationName('STEMMER IMAGING')
app.setOrganizationDomain('https://www.stemmer-imaging.com/')
app.setApplicationName('GevServer Python tutorial')
if sys.platform == 'win32':
import ctypes
myappid = u'stemmerimaging.commonvisionblox.pygevserver.0'
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(myappid)
app.setWindowIcon(QIcon('Tutorial-Python_32x32.png'))
view = QQuickView()
view.setResizeMode(QQuickView.SizeRootObjectToView)
with BackEnd() as backEnd:
view.rootContext().setContextProperty('backEnd', backEnd)
view.setSource(QUrl('main.qml'))
backEnd.btn_single_shot()
view.resize(640, 390)
view.show()
app.exec_()
Union[cvb.GenICamDevice, cvb.VinDevice, cvb.EmuDevice, cvb.VideoDevice, cvb.NonStreamingDevice] open(str provider, int acquisition_stack=cvb.AcquisitionStack.PreferVin)
None register(cls, str uri="CvbQuick", int version_major=1, int version_minor=0, str qml_name="ImageView")
str expand_path(str path)