import os
import sys
import time
import numpy as np
import cvb
from PIL import Image, ImageDraw, ImageFont
if sys.version_info >= (3, 11):
from PySide6.QtCore import Qt, QThread, Signal, Slot
from PySide6.QtGui import QAction, QImage, QPixmap, QIcon, QStandardItemModel, QStandardItem
from PySide6.QtWidgets import (QApplication, QMainWindow, QPushButton, QSizePolicy,
QVBoxLayout, QWidget, QLabel, QTableView, QHeaderView, QHBoxLayout)
else:
from PySide2.QtCore import Qt, QThread, Signal, Slot
from PySide2.QtGui import QAction, QImage, QPixmap, QIcon, QStandardItemModel, QStandardItem
from PySide2.QtWidgets import (QApplication, QMainWindow, QPushButton, QSizePolicy,
QVBoxLayout, QWidget, QLabel, QTableView, QHeaderView, QHBoxLayout)
# When True the demo runs against the CVB mock transport layer instead of a
# real device (see the MOCK checks in the device setup / acquisition code).
MOCK = True

# Size of the composed output image; used as (rows, columns) when the
# output canvas is allocated.
OUTPUT_SIZE = (600, 600)

# Human-readable label for each composite purpose this demo knows about,
# keyed by the corresponding cvb.CompositePurpose member.
purpose_list = {
    getattr(cvb.CompositePurpose, label): label
    for label in ("Custom", "Image", "PointCloud", "ImageList")
}
def purpose_from_index(index):
    """Return the display name for a composite purpose.

    Args:
        index: A composite purpose value, i.e. one of the keys of
            ``purpose_list`` (``cvb.CompositePurpose`` members).

    Returns:
        The label stored in ``purpose_list`` for *index*, or
        ``"Incompatible purpose"`` when the value is not listed there.
    """
    # purpose_list is keyed by purpose value, not by position, so a range
    # check against its length says nothing about which keys exist. Look the
    # value up directly and fall back to the generic label.
    return purpose_list.get(index, "Incompatible purpose")
def resize_image(image, target_size):
    """Return *image* resampled to *target_size* with the Lanczos filter.

    Args:
        image: Array accepted by ``PIL.Image.fromarray``.
        target_size: Size tuple handed to PIL's ``resize``; callers in this
            file pass ``(width, height)``.

    Returns:
        A new numpy array holding the resized image.
    """
    resized = Image.fromarray(image).resize(target_size, Image.LANCZOS)
    return np.array(resized)
def format_image(image, mode="RGB"):
    """Return *image* as a numpy array in the requested PIL *mode*.

    Args:
        image: Array accepted by ``PIL.Image.fromarray``.
        mode: Target PIL mode string; the image is converted only when its
            current mode differs.

    Returns:
        A new numpy array with the image data in *mode*.
    """
    picture = Image.fromarray(image)
    if picture.mode == mode:
        # Already in the requested mode: just hand back an array copy.
        return np.array(picture)
    return np.array(picture.convert(mode))
def prepare_device(device):
    """Configure *device* for GenDC streaming through its "Device" node map.

    The transport-layer parameter lock is released, the GenDC streaming mode
    is switched on (plus two test-generator settings when MOCK is set), and
    the lock is set again.

    Args:
        device: Opened device exposing ``node_maps["Device"]``.
    """
    device_map = device.node_maps["Device"]

    # Release the TL parameter lock before writing any settings.
    device_map.nodes["Std::TLParamsLocked"].value = 0

    # NOTE(review): the source had lost its indentation; the MOCK branch is
    # assumed to cover exactly the two "Cust::" nodes - confirm.
    if MOCK:
        device_map.nodes["Cust::TestGenDCImageCount"].value = 2
        device_map.nodes["Cust::GenDCFlowMappingConfiguration"].value = "MultipleFlow"

    device_map.nodes["Std::GenDCStreamingMode"].value = "On"

    # Lock the TL parameters again now that configuration is done.
    device_map.nodes["Std::TLParamsLocked"].value = 1
def create_image_with_text(
        text, image_size=(400, 200), bg_color=(255, 255, 255),
        text_color=(0, 0, 0)):
    """Render *text* centred on a solid-colour RGB image.

    Args:
        text: String to draw.
        image_size: ``(width, height)`` of the generated image.
        bg_color: RGB background colour.
        text_color: RGB colour used for the text.

    Returns:
        The image as a numpy array, or ``None`` when PIL's default font
        cannot be loaded.
    """
    try:
        font = ImageFont.load_default()
    except OSError:
        font = None
    if font is None:
        return None

    image = Image.new("RGB", image_size, bg_color)
    draw = ImageDraw.Draw(image)

    # textbbox() is a method returning (left, top, right, bottom) for text
    # anchored at the given position; the previous code tried to read a
    # `textsize` attribute off that method and failed with AttributeError.
    left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    text_width = right - left
    text_height = bottom - top

    # Subtract the box origin so that the visible glyphs, not the anchor
    # point, end up centred.
    x = (image_size[0] - text_width) // 2 - left
    y = (image_size[1] - text_height) // 2 - top
    draw.text((x, y), text, fill=text_color, font=font)
    return np.array(image)
# Build the display images and the info-table rows for one composite.
#
# Parameters:
#   composite - object exposing `purpose`, `item_count` and integer indexing.
#   id        - value reported in the "Image ID" row (note: the parameter
#               name shadows the builtin `id`).
# Returns:
#   (images, table) - the list of images passed through format_image() and
#   the list of info strings collected for the table.
#
# NOTE(review): in this view the block has lost its indentation and the
# `if` / `elif` lines that select a branch per item type appear to be
# missing (the `else:` below has no visible matching `if`, and the "image"
# branch never assigns `img`). Confirm against the original source before
# changing any logic here.
def evaluate_composite(composite, id):
images = []
table = []
# Header rows: frame id, purpose label and number of composite items.
table.append(f"Image ID: {id}")
table.append(f"Composite purpose: {purpose_from_index(composite.purpose)}")
table.append(f"Composite element count: {composite.item_count}")
for n in range(composite.item_count):
item = composite[n]
# Defaults used when no branch recognises the item.
item_type = "unknown"
img = None
item_type = "image"
# Item kinds without displayable pixel data get a placeholder text image.
item_type = "plane"
img = create_image_with_text("part is a plane")
item_type = "plane enumerator"
img = create_image_with_text("part is a plane enumerator")
item_type = "pfnc buffer"
img = create_image_with_text("part is a PFNC buffer")
else:
img = create_image_with_text("unknown part type")
table.append(f"Composite item #{n}: {item_type}")
# format_image() is called with its default mode ("RGB").
img = format_image(img)
images.append(img)
return images, table
def subdivide_images(images):
    """Tile *images* into one OUTPUT_SIZE RGB mosaic.

    The grid starts as a square of side ``int(sqrt(len(images)))`` and gains
    one column, then one row, while it is still too small to hold every
    image. Each image is resized to the cell size and placed in row-major
    order; unused cells stay black.

    Args:
        images: Non-empty sequence of 3-channel image arrays.

    Returns:
        A uint8 array of shape ``(OUTPUT_SIZE[0], OUTPUT_SIZE[1], 3)``.
    """
    count = len(images)
    rows = cols = int(count ** 0.5)
    if rows * cols < count:
        cols += 1
    if rows * cols < count:
        rows += 1

    canvas_height, canvas_width = OUTPUT_SIZE[0], OUTPUT_SIZE[1]
    tile_width = canvas_width // cols
    tile_height = canvas_height // rows

    canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)
    for position, picture in enumerate(images):
        grid_row, grid_col = divmod(position, cols)
        top = grid_row * tile_height
        left = grid_col * tile_width
        # resize_image takes the size as (width, height).
        tile = resize_image(picture, (tile_width, tile_height))
        canvas[top:top + tile_height, left:left + tile_width] = tile
    return canvas
# Worker thread that acquires composites from a stream and publishes the
# rendered frame and its info table through Qt signals.
#
# NOTE(review): in this view the class has lost its indentation and several
# statements of run() appear to be missing (the calls that bind `devices`,
# `device`, `stream` and `composite` are not visible, and L121/L123 are only
# the argument tails of a call). Confirm against the original source before
# changing any logic here.
class AcquisitionThread(QThread):
# Emitted once per loop iteration with a copy of the output image.
updateFrame = Signal(np.ndarray)
# Emitted once per loop iteration with the list of info strings.
updateTable = Signal(list)
def __init__(self, parent=None):
QThread.__init__(self, parent)
# Loop flag checked by run(); the acquisition loop runs while it is True.
self.status = True
def run(self):
# Discovery flags differ between mock and real operation: the mock case
# additionally passes IncludeMockTL.
if MOCK:
cvb.DiscoverFlags.IgnoreVins | cvb.DiscoverFlags.IncludeMockTL, time_span=300)
else:
cvb.DiscoverFlags.IgnoreVins, time_span=300)
if devices:
# Pick the first access token containing "MockTL" (mock mode) or "SD"
# (real mode); None when no discovered device matches.
device_token = next(iter([dev.access_token for dev in devices if
(MOCK and "MockTL" in dev.access_token) or
(not MOCK and "SD" in dev.access_token)]), None)
if not device_token:
raise RuntimeError("No suitable device found.")
prepare_device(device)
stream.start()
# Running counter passed to evaluate_composite() as the image id.
image_count = 0
while self.status:
wait_result = stream.wait()
images, table = evaluate_composite(composite, image_count)
# Several images are tiled into one mosaic; a single image is shown as is.
if len(images) > 1:
output_image = subdivide_images(images)
else:
output_image = images[0]
# Emit a copy rather than the array built in this iteration.
output_image_copy = output_image.copy()
self.updateFrame.emit(output_image_copy)
self.updateTable.emit(table)
image_count += 1
stream.stop()
sys.exit(-1)
class Window(QMainWindow):
    """Main window of the CVB GenDC demo.

    Shows the frame most recently emitted by the acquisition thread in a
    label, a Start and a Stop/Close button below it, and a one-column table
    of information strings at the bottom. The window closes itself when the
    acquisition thread finishes.
    """

    def __init__(self):
        """Build the widgets, create the acquisition thread and wire signals."""
        super().__init__()
        self.setWindowTitle("CVB GenDC Demo")
        self.setGeometry(0, 0, 800, 700)

        # The window icon is expected next to this script.
        script_dir = os.path.dirname(os.path.realpath(__file__))
        icon_path = os.path.join(script_dir, 'Tutorial-Python_32x32.png')
        self.setWindowIcon(QIcon(icon_path))

        # File menu with a single Exit entry. The application object is
        # fetched through the imported QApplication class instead of the
        # un-imported global `qApp`.
        menu = self.menuBar()
        menu_file = menu.addMenu("File")
        exit_action = QAction(
            "Exit", self, triggered=QApplication.instance().quit)
        menu_file.addAction(exit_action)

        # Label that displays the current frame.
        self.label = QLabel(self)
        self.label.setAlignment(Qt.AlignCenter)
        self.label.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Minimum)

        # Start / Stop buttons; Stop is disabled until acquisition runs.
        buttons_layout = QHBoxLayout()
        self.button1 = QPushButton("Start")
        self.button2 = QPushButton("Stop/Close")
        self.button1.setMaximumWidth(100)
        self.button2.setMaximumWidth(100)
        self.button1.clicked.connect(self.start)
        self.button2.clicked.connect(self.kill_thread)
        self.button2.setEnabled(False)
        buttons_layout.addWidget(self.button1)
        buttons_layout.addWidget(self.button2)

        # One-column info table.
        self.table_model = QStandardItemModel(0, 1, self)
        self.table_view = QTableView(self)
        self.table_view.setModel(self.table_model)
        self.table_view.verticalHeader().setVisible(False)
        self.table_view.verticalHeader().setSectionResizeMode(
            QHeaderView.ResizeToContents)

        table_widget = QWidget()
        table_layout = QVBoxLayout()
        table_layout.addWidget(self.table_view)
        table_widget.setLayout(table_layout)
        table_widget.setSizePolicy(
            QSizePolicy.Expanding, QSizePolicy.Preferred)

        image_widget = QWidget()
        image_layout = QVBoxLayout()
        image_layout.addWidget(self.label)
        image_widget.setLayout(image_layout)

        # Vertical stack: image, buttons, table.
        central_widget = QWidget(self)
        central_layout = QVBoxLayout()
        central_layout.addWidget(image_widget)
        central_layout.addLayout(buttons_layout)
        central_layout.addWidget(table_widget)
        central_widget.setLayout(central_layout)
        self.setCentralWidget(central_widget)

        # Acquisition thread; the window closes once the thread has finished.
        self.th = AcquisitionThread(self)
        self.th.finished.connect(self.close)
        self.th.updateFrame.connect(self.set_image)
        self.th.updateTable.connect(self.update_table)

        # Placeholder content shown until the first frame arrives.
        dummy_array = np.zeros(
            (OUTPUT_SIZE[0], OUTPUT_SIZE[1], 3), dtype=np.uint8)
        self.set_image(dummy_array)
        dummy_table_data = ["Press start to begin GenDC streaming"]
        self.update_table(dummy_table_data)

    @Slot(np.ndarray)
    def set_image(self, np_image):
        """Display *np_image* in the image label.

        Args:
            np_image: Array of shape ``(height, width, channels)``; it is
                interpreted as 8-bit RGB with ``channels * width`` bytes per
                line.
        """
        h, w, ch = np_image.shape
        q_img = QImage(np_image.data, w, h, ch * w, QImage.Format_RGB888)
        # Convert to a pixmap right away, while np_image is still alive.
        pixmap = QPixmap.fromImage(q_img)
        self.label.setPixmap(pixmap)

    @Slot()
    def kill_thread(self):
        """Stop the acquisition thread and re-enable the Start button."""
        print("Finishing...")
        self.button2.setEnabled(False)
        self.button1.setEnabled(True)
        # The loop flag lives on the thread object. Setting an attribute on
        # the window (as the previous code did) never reached the loop, so
        # the thread was always killed forcibly.
        self.th.status = False
        # Give the loop up to a second to leave on its own; fall back to
        # terminate() only if the thread is still running afterwards.
        if not self.th.wait(1000):
            self.th.terminate()
            self.th.wait(1000)

    @Slot()
    def start(self):
        """Start the acquisition thread and switch the button states."""
        print("Starting...")
        self.update_table(["Starting..."])
        self.button2.setEnabled(True)
        self.button1.setEnabled(False)
        # Re-arm the loop flag in case a previous stop request cleared it.
        self.th.status = True
        self.th.start()

    @Slot(list)
    def update_table(self, data):
        """Replace the table contents with one row per entry of *data*.

        Args:
            data: Iterable of strings to show in the "Infos" column.
        """
        self.table_model.removeRows(0, self.table_model.rowCount())
        self.table_model.setHorizontalHeaderLabels(["Infos"])
        for entry in data:
            self.table_model.appendRow(QStandardItem(entry))
        self.table_view.horizontalHeader().setSectionResizeMode(
            QHeaderView.ResizeToContents)
if __name__ == "__main__":
    application = QApplication(sys.argv)

    if sys.platform == 'win32':
        # Register an explicit AppUserModelID for this process through the
        # Windows shell API before the main window is created.
        import ctypes
        ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(
            u'stemmerimaging.commonvisionblox.pystreamdisplay.0')

    main_window = Window()
    main_window.show()

    # Run the Qt event loop and use its result as the process exit code.
    exit_code = application.exec()
    sys.exit(exit_code)
# API signature notes (reference only, not executable code):
#   Union[cvb.GenICamDevice, cvb.VinDevice, cvb.EmuDevice, cvb.VideoDevice, cvb.NonStreamingDevice] open(str provider, int acquisition_stack=cvb.AcquisitionStack.PreferVin)
#   List[cvb.DiscoveryInformation] discover_from_root(int flags=cvb.DiscoverFlags.FindAll, int time_span=300)
#   numpy.array to_array(Any buffer)