From f23ba6fa661135f8c5bc7b3c98063f24acffa082 Mon Sep 17 00:00:00 2001 From: Nico Reinartz Date: Sun, 15 Sep 2024 02:04:07 +0200 Subject: [PATCH 1/6] feat: implement basic functionality for 8.8 inch version --- library/lcd/lcd_comm_rev_e.py | 431 ++++++++++++++++++++++++++++++++++ 1 file changed, 431 insertions(+) create mode 100644 library/lcd/lcd_comm_rev_e.py diff --git a/library/lcd/lcd_comm_rev_e.py b/library/lcd/lcd_comm_rev_e.py new file mode 100644 index 00000000..d875d8f5 --- /dev/null +++ b/library/lcd/lcd_comm_rev_e.py @@ -0,0 +1,431 @@ +# turing-smart-screen-python - a Python system monitor and library for USB-C displays like Turing Smart Screen or XuanFang +# https://github.com/mathoudebine/turing-smart-screen-python/ + +# Copyright (C) 2021-2023 Matthieu Houdebine (mathoudebine) +# Copyright (C) 2023-2023 Alex W. Baulé (alexwbaule) +# Copyright (C) 2023-2023 Arthur Ferrai (arthurferrai) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import queue +import time +from enum import Enum +from math import ceil, floor + +import serial +from PIL import Image +from serial.tools.list_ports import comports + +from library.lcd.lcd_comm import Orientation, LcdComm +from library.log import logger + + +class Count: + Start = 0 + + +# READ HELLO ALWAYS IS 23. +# ALL READS IS 1024 + +# ORDER: +# SEND HELLO +# READ HELLO (23) +# SEND STOP_VIDEO +# SEND STOP_MEDIA +# READ STATUS (1024) +# SEND SET_BRIGHTNESS +# SEND SET_OPTIONS WITH ORIENTATION ? +# SEND PRE_UPDATE_BITMAP +# SEND START_DISPLAY_BITMAP +# SEND DISPLAY_BITMAP +# READ STATUS (1024) +# SEND QUERY_STATUS +# READ STATUS (1024) +# WHILE: +# SEND UPDATE_BITMAP +# SEND QUERY_STATUS +# READ STATUS(1024) + +class Command(Enum): + # COMMANDS + HELLO = bytearray((0x01, 0xef, 0x69, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0xc5, 0xd3)) + + RESTART = bytearray((0x84, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + TURNOFF = bytearray((0x83, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + TURNON = bytearray((0x83, 0xef, 0x69, 0x00, 0x00, 0x00, 0x00)) + + SET_BRIGHTNESS = bytearray( + (0x7b, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00)) + + # STOP COMMANDS + STOP_VIDEO = bytearray((0x79, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + STOP_MEDIA = bytearray((0x96, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + + # IMAGE QUERY STATUS + QUERY_STATUS = bytearray((0xcf, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + + # STATIC IMAGE + START_DISPLAY_BITMAP = bytearray((0x2c,)) + PRE_UPDATE_BITMAP = bytearray((0x86, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + UPDATE_BITMAP = bytearray((0xcc, 0xef, 0x69,)) + UPDATE_BITMAP_NO_CHANGES = bytearray((0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0xEF, 0x69,)) + STOP_UPDATE_BITMAP = bytearray((0x87, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + + RESTARTSCREEN = bytearray((0x84, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + DISPLAY_BITMAP = bytearray((0xc8, 0xef, 0x69)) + + OPTIONS = bytearray((0x7d, 0xef, 0x69, 0x00, 0x00, + 0x00, 0x05, 0x00, 0x00, 0x00, 0xff)) + STARTMODE_DEFAULT = bytearray((0x00,)) + STARTMODE_IMAGE = bytearray((0x01,)) + STARTMODE_VIDEO = bytearray((0x02,)) + FLIP_180 = bytearray((0x01,)) + NO_FLIP = bytearray((0x00,)) + + SEND_PAYLOAD = bytearray((0xFF,)) + + def __init__(self, command): + self.command = command + + +class Padding(Enum): + NULL = bytearray([0x00]) + START_DISPLAY_BITMAP = bytearray([0x2c]) + + def __init__(self, command): + self.command = command + + +class SleepInterval(Enum): + OFF = bytearray((0x00,)) + ONE = bytearray((0x01,)) + TWO = bytearray((0x02,)) + THREE = bytearray((0x03,)) + FOUR = bytearray((0x04,)) + FIVE = bytearray((0x05,)) + SIX = bytearray((0x06,)) + SEVEN = bytearray((0x07,)) + EIGHT = bytearray((0x08,)) + NINE = bytearray((0x09,)) + TEN = bytearray((0x0a,)) + + def __init__(self, command): + self.command = command + + +class SubRevision(Enum): + UNKNOWN = None + EIGHTINCH = "chs_88inch" + + def __init__(self, command): + self.command = command + + +# This class is for Turing Smart Screen 5" screens +class LcdCommRevE(LcdComm): + def __init__(self, com_port: str = "AUTO", display_width: int = 1920, display_height: int = 480, + update_queue: queue.Queue = None): + logger.debug("HW revision: E") + LcdComm.__init__(self, com_port, display_width, display_height, update_queue) + self.openSerial() + + def __del__(self): + self.closeSerial() + + @staticmethod + def auto_detect_com_port(): + com_ports = comports() + + # Try to find awake device through serial number or vid/pid + for com_port in com_ports: + if com_port.serial_number == 'CT88INCH': + return com_port.device + if com_port.vid == 0x0525 and com_port.pid == 0xa4a7: + return com_port.device + + # Try to find sleeping device and wake it up + for com_port in com_ports: + if com_port.serial_number == 'USB7INCH' or com_port.serial_number == 'CT88INCH': + LcdCommRevE._connect_to_reset_device_name(com_port) + return LcdCommRevE.auto_detect_com_port() + + return None + + @staticmethod + def _connect_to_reset_device_name(com_port): + # this device enumerates differently when off, we need to connect once to reset it to correct COM device + try: + logger.debug(f"Waiting for device {com_port} to be turned ON...") + serial.Serial(com_port.device, 115200, timeout=1, rtscts=1) + except serial.serialutil.SerialException: + pass + time.sleep(10) + + def _send_command(self, cmd: Command, payload: bytearray = None, padding: Padding = None, + bypass_queue: bool = False, readsize: int = None) -> bytes | None: + message = bytearray() + + if cmd != Command.SEND_PAYLOAD: + message = bytearray(cmd.value) + + # logger.debug("Command: {}".format(cmd.name)) + + if not padding: + padding = Padding.NULL + + if payload: + message.extend(payload) + + msg_size = len(message) + + if not (msg_size / 250).is_integer(): + pad_size = (250 * ceil(msg_size / 250) - msg_size) + message += bytearray(padding.value * pad_size) + + # If no queue for async requests, or if asked explicitly to do the request sequentially: do request now + if not self.update_queue or bypass_queue: + self.WriteData(message) + if readsize: + return self.ReadData(readsize) + else: + # Lock queue mutex then queue the request + self.update_queue.put((self.WriteData, [message])) + if readsize: + self.update_queue.put((self.ReadData, [readsize])) + + def _hello(self): + # This command reads LCD answer on serial link, so it bypasses the queue + self.sub_revision = SubRevision.UNKNOWN + self._send_command(Command.HELLO, bypass_queue=True) + response = str(self.lcd_serial.read(23).decode()) + self.lcd_serial.flushInput() + if response.startswith(SubRevision.EIGHTINCH.value): + self.sub_revision = SubRevision.EIGHTINCH + else: + logger.warning( + "Display returned unknown sub-revision on Hello answer (%s)" % str(response)) + + logger.debug("HW sub-revision: %s" % (str(self.sub_revision))) + + def InitializeComm(self): + pass + + def Reset(self): + logger.info("Display reset (COM port may change)...") + # Reset command bypasses queue because it is run when queue threads are not yet started + self._send_command(Command.HELLO, readsize=1024) + self._send_command(Command.RESTART) + self._send_command(Command.RESTART) + self.closeSerial() + # Wait for display reset then reconnect + time.sleep(15) + self.openSerial() + + def Clear(self): + # This hardware does not implement a Clear command: display a blank image on the whole screen + # Force an orientation in case the screen is currently configured with one different from the theme + backup_orientation = self.orientation + self.SetOrientation(orientation=Orientation.PORTRAIT) + + blank = Image.new( + "RGB", (self.get_width(), self.get_height()), (255, 255, 255)) + self.DisplayPILImage(blank) + + # Restore orientation + self.SetOrientation(orientation=backup_orientation) + + def ScreenOff(self): + logger.info("Calling ScreenOff") + self._send_command(Command.HELLO, bypass_queue=True) + self._send_command(Command.STOP_VIDEO) + response = self._send_command(Command.STOP_MEDIA, readsize=1024) + + assert response == b'media_stop', 'Failed to stop media' + + self._send_command(Command.TURNOFF) + + def ScreenOn(self, is_isolated_call: bool = True): + logger.info("Calling ScreenOn") + + if is_isolated_call: + self._init_packet_interaction() + + self._send_command(Command.STOP_VIDEO) + self._send_command(Command.STOP_MEDIA, readsize=1024) + # self._send_command(Command.SET_BRIGHTNESS, payload=bytearray([255])) + + def SetBrightness(self, level: int = 25, is_isolated_call: bool = True): + # logger.info("Call SetBrightness") + assert 0 <= level <= 100, 'Brightness level must be [0-100]' + + # Brightness scales from 0 to 255, with 255 being the brightest and 0 being the darkest. + # Convert our brightness % to an absolute value. + converted_level = int((level / 100) * 255) + + if is_isolated_call: + self._init_packet_interaction() + + self._send_command(Command.SET_BRIGHTNESS, payload=bytearray( + (converted_level,)), bypass_queue=True) + + def SetOrientation(self, orientation: Orientation = Orientation.PORTRAIT, is_isolated_call: bool = True): + self.orientation = orientation + # logger.info(f"Call SetOrientation to: {self.orientation.name}") + + if is_isolated_call: + self._init_packet_interaction() + + if self.orientation == Orientation.REVERSE_LANDSCAPE or self.orientation == Orientation.REVERSE_PORTRAIT: + b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + \ + Command.FLIP_180.value + SleepInterval.OFF.value + self._send_command(Command.OPTIONS, payload=b) + else: + b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + \ + Command.NO_FLIP.value + SleepInterval.OFF.value + self._send_command(Command.OPTIONS, payload=b) + + def DisplayPILImage( + self, + image: Image, + x: int = 0, y: int = 0, + image_width: int = 0, + image_height: int = 0 + ): + # If the image height/width isn't provided, use the native image size + if not image_height: + image_height = image.size[1] + if not image_width: + image_width = image.size[0] + + # If our image is bigger than our display, resize it to fit our screen + if image.size[1] > self.get_height(): + image_height = self.get_height() + if image.size[0] > self.get_width(): + image_width = self.get_width() + + assert x <= self.get_width(), 'Image X coordinate must be <= display width' + assert y <= self.get_height(), 'Image Y coordinate must be <= display height' + assert image_height > 0, 'Image height must be > 0' + assert image_width > 0, 'Image width must be > 0' + + if x == 0 and y == 0 and (image_width == self.get_width()) and (image_height == self.get_height()): + with self.update_queue_mutex: + self._stop_media() + self.SetOrientation(self.orientation, is_isolated_call=False) + + self._send_command(Command.PRE_UPDATE_BITMAP) + self._send_command(Command.START_DISPLAY_BITMAP, + padding=Padding.START_DISPLAY_BITMAP) + + self.SetBrightness(60, is_isolated_call=False) + + self._send_command(Command.DISPLAY_BITMAP, payload=bytearray( + int(self.display_width * self.display_width).to_bytes(4))) + response = self._send_command(Command.SEND_PAYLOAD, + payload=bytearray(self._generate_full_image( + image, self.orientation)), + readsize=1024) + + assert response.startswith(b'full_png_sucess'), 'Failed to display bitmap' + + self._send_command(Command.QUERY_STATUS, readsize=1024) + else: + with self.update_queue_mutex: + update_image, img_len = self._generate_update_image( + image, x, y, self.orientation) + + payload_len = img_len.to_bytes(4, byteorder='big') + command_payload = payload_len + Count.Start.to_bytes(7, byteorder='big') + + self._send_command(Command.UPDATE_BITMAP, payload=command_payload) + self._send_command(Command.SEND_PAYLOAD, payload=update_image) + self._send_command(Command.QUERY_STATUS, readsize=1024) + Count.Start += 1 + + @staticmethod + def _generate_full_image(image: Image, orientation: Orientation = Orientation.PORTRAIT): + if orientation == Orientation.PORTRAIT: + image = image.rotate(90, expand=True) + elif orientation == Orientation.REVERSE_PORTRAIT: + image = image.rotate(270, expand=True) + elif orientation == Orientation.REVERSE_LANDSCAPE: + image = image.rotate(180) + + image_data = image.convert("RGBA").load() + pixel_data = [] + for y in range(image.height): + for x in range(image.width): + pixel = image_data[x, y] + pixel_data += [pixel[2], pixel[1], pixel[0], pixel[3]] + + hex_data = bytes(pixel_data) + return b'\x00'.join(hex_data[i:i + 249] for i in range(0, len(hex_data), 249)) + + def _generate_update_image(self, image, x, y, orientation: Orientation = Orientation.PORTRAIT): + # x0, y0 = x, y + + # if orientation == Orientation.PORTRAIT: + # image = image.rotate(90, expand=True) + # x0 = self.get_width() - x - image.height + # elif orientation == Orientation.REVERSE_PORTRAIT: + # image = image.rotate(270, expand=True) + # y0 = self.get_height() - y - image.width + # elif orientation == Orientation.REVERSE_LANDSCAPE: + # image = image.rotate(180, expand=True) + # y0 = self.get_width() - x - image.width + # x0 = self.get_height() - y - image.height + # elif orientation == Orientation.LANDSCAPE: + # x0, y0 = y, x + + img_raw_data = bytes([]) + image_data = image.convert("RGBA").load() + + for w in range(image.width): + # Target start + img_raw_data += (((x + w) * self.display_height) + (y + image.height)).to_bytes(3, byteorder='big') + + # Number of pixels to be written + img_raw_data += image.height.to_bytes(2, byteorder='big') + + for h in range(image.height): + current_pixel = image_data[w, image.height - h - 1] + img_raw_data += bytes([current_pixel[2], current_pixel[1], current_pixel[0], current_pixel[3]]) + + img_raw_data += bytes([0xef, 0x69]) + + return b'\x00'.join(img_raw_data[i:i + 249] for i in range(0, len(img_raw_data), 249)), len(img_raw_data) + + def _stop_media(self, is_isolated_call: bool = True): + if is_isolated_call: + self._init_packet_interaction() + + self._send_command(Command.STOP_MEDIA) + response = self._send_command(Command.STOP_VIDEO, readsize=1024, bypass_queue=True) + + assert response.startswith(b'media_stop'), 'Failed to stop media' + + def _init_packet_interaction(self): + response = self._send_command(Command.HELLO, readsize=1024, bypass_queue=True) + assert response.startswith(b'chs_88inch'), 'Failed to initialize packet interaction' + + def _no_update(self): + payload_len = (8).to_bytes(4, byteorder='big') + command_payload = payload_len + Count.Start.to_bytes(7, byteorder='big') + + self._send_command(Command.UPDATE_BITMAP, payload=command_payload) + self._send_command(Command.UPDATE_BITMAP_NO_CHANGES) + self._send_command(Command.QUERY_STATUS, readsize=1024) + + Count.Start += 1 From 0670c7668d1f18455d4d39a23dc2904444f6f890 Mon Sep 17 00:00:00 2001 From: Nico Reinartz Date: Mon, 16 Sep 2024 22:10:45 +0200 Subject: [PATCH 2/6] feat: continue with features --- library/lcd/lcd_comm_rev_e.py | 208 +++++++++++++++++++++++++++------- simple-program.py | 30 ++++- 2 files changed, 195 insertions(+), 43 deletions(-) diff --git a/library/lcd/lcd_comm_rev_e.py b/library/lcd/lcd_comm_rev_e.py index d875d8f5..d229f7a8 100644 --- a/library/lcd/lcd_comm_rev_e.py +++ b/library/lcd/lcd_comm_rev_e.py @@ -78,9 +78,10 @@ class Command(Enum): # STATIC IMAGE START_DISPLAY_BITMAP = bytearray((0x2c,)) - PRE_UPDATE_BITMAP = bytearray((0x86, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) + PRE_UPDATE_BITMAP = bytearray((0x86, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) UPDATE_BITMAP = bytearray((0xcc, 0xef, 0x69,)) - UPDATE_BITMAP_NO_CHANGES = bytearray((0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0xEF, 0x69,)) + UPDATE_BITMAP_NO_CHANGES = bytearray( + (0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0xEF, 0x69,)) STOP_UPDATE_BITMAP = bytearray((0x87, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) RESTARTSCREEN = bytearray((0x84, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01)) @@ -94,6 +95,19 @@ class Command(Enum): FLIP_180 = bytearray((0x01,)) NO_FLIP = bytearray((0x00,)) + UPLOAD_FILE = bytearray((0x6F, 0xef, 0x69,)) # 6FEF6900000017000000 + DELETE_FILE = bytearray((0x66, 0xef, 0x69,)) + LIST_FILES = bytearray((0x65, 0xEF, 0x69,)) + QUERY_FILE_SIZE = bytearray((0x6e, 0xef, 0x69,)) + QUERY_STORAGE_INFORMATION = bytearray((0x64, 0xef, 0x69, + 0x00, 0x00, 0x00, 0x01,)) # 64EF6900000001 + + PLAY_IMAGE = bytearray((0x8C, 0xEF, 0x69, 0x00, 0x00, + 0x00, 0x21, 0x00, 0x00, 0x00,)) # 8CEF6900000017000000 + + PLAY_VIDEO = bytearray((0x78, 0xEF, 0x69, 0x00, 0x00, + 0x00, 0x1A, 0x00, 0x00, 0x00,)) # 78EF690000001A000000 + SEND_PAYLOAD = bytearray((0xFF,)) def __init__(self, command): @@ -138,7 +152,8 @@ class LcdCommRevE(LcdComm): def __init__(self, com_port: str = "AUTO", display_width: int = 1920, display_height: int = 480, update_queue: queue.Queue = None): logger.debug("HW revision: E") - LcdComm.__init__(self, com_port, display_width, display_height, update_queue) + LcdComm.__init__(self, com_port, display_width, + display_height, update_queue) self.openSerial() def __del__(self): @@ -258,10 +273,10 @@ def ScreenOff(self): def ScreenOn(self, is_isolated_call: bool = True): logger.info("Calling ScreenOn") - + if is_isolated_call: self._init_packet_interaction() - + self._send_command(Command.STOP_VIDEO) self._send_command(Command.STOP_MEDIA, readsize=1024) # self._send_command(Command.SET_BRIGHTNESS, payload=bytearray([255])) @@ -273,10 +288,10 @@ def SetBrightness(self, level: int = 25, is_isolated_call: bool = True): # Brightness scales from 0 to 255, with 255 being the brightest and 0 being the darkest. # Convert our brightness % to an absolute value. converted_level = int((level / 100) * 255) - + if is_isolated_call: self._init_packet_interaction() - + self._send_command(Command.SET_BRIGHTNESS, payload=bytearray( (converted_level,)), bypass_queue=True) @@ -296,6 +311,102 @@ def SetOrientation(self, orientation: Orientation = Orientation.PORTRAIT, is_iso Command.NO_FLIP.value + SleepInterval.OFF.value self._send_command(Command.OPTIONS, payload=b) + def ListDirectory(self, path: str) -> tuple[list[str], list[str]]: + self._init_packet_interaction() + + payload = len(path).to_bytes(4, byteorder='big') + \ + Padding.NULL.value * 3 + bytearray(path, 'utf-8') + + response = self._send_command( + Command.LIST_FILES, + payload=payload, + readsize=10240, + bypass_queue=True) + + responseList = response.decode().rstrip('\x00') + print(responseList) + + assert responseList.startswith("result"), 'Failed to list files' + + if responseList.startswith('result:'): + parts = responseList.split(':') + return parts[2].split('/')[:-1], parts[3].split('/')[:-1] + + return [], [] + + def UploadFile(self, src_path: str, target_path: str): + payload = len(target_path).to_bytes(4, byteorder='big') + \ + Padding.NULL * 3 + bytearray(target_path, 'utf-8') + + response = self._send_command( + Command.UPLOAD_FILE, + payload=payload, + readsize=1024, + bypass_queue=True) + + assert response.startswith(b'create_success'), 'Failed to create file' + + with open(src_path, 'rb') as file: + byte = file.read(1024) + sent = 0 + while byte != b"": + if len(byte) == 1024: + self._send_command(Command.SEND_PAYLOAD, + payload=byte, bypass_queue=True) + sent += 1024 + else: + response == self._send_command( + Command.SEND_PAYLOAD, payload=byte, readsize=1024, bypass_queue=True) + assert response.startswith( + b'file_rev_done'), 'Failed to upload file' + print("Sent %d bytes" % sent) + byte = file.read(1024) + + def DeleteFile(self, target_path: str): + self._init_packet_interaction() + + payload = len(target_path).to_bytes(4, byteorder='big') + \ + Padding.NULL.value * 3 + bytearray(target_path, 'utf-8') + + self._send_command( + Command.DELETE_FILE, + payload=payload, + bypass_queue=True) + + def GetFileSize(self, target_path: str, is_isolated_call: bool = True): + if is_isolated_call: + self._init_packet_interaction() + + response = self._send_command(Command.QUERY_FILE_SIZE, payload=bytearray( + target_path, 'utf-8'), readsize=1024, bypass_queue=True) + size = int(response.decode().rstrip('\x00')) + + assert size > 0, 'File does not exist' + return size + + def PlayImageFromStorage(self, target_path: str, is_isolated_call: bool = True): + if is_isolated_call: + self._init_packet_interaction() + + response = self._send_command(Command.PLAY_IMAGE, payload=bytearray(target_path, 'utf-8'), + readsize=1024, bypass_queue=True) + + assert response.startswith(b'play_img_ok'), 'Failed to play image' + + def PlayVideoFromStorage(self, target_path: str, is_isolated_call: bool = True): + if is_isolated_call: + self._init_packet_interaction() + + self._stop_media(is_isolated_call=False) + self.SetBrightness(61, is_isolated_call=False) + size = self.GetFileSize(target_path, is_isolated_call=False) + print(bytearray(target_path, 'utf-8').hex()) + response = self._send_command(Command.PLAY_VIDEO, payload=bytearray(target_path, 'utf-8'), + readsize=1024, bypass_queue=True) + + assert response.startswith( + b'play_video_success'), 'Failed to play video' + def DisplayPILImage( self, image: Image, @@ -324,32 +435,43 @@ def DisplayPILImage( with self.update_queue_mutex: self._stop_media() self.SetOrientation(self.orientation, is_isolated_call=False) - + self._send_command(Command.PRE_UPDATE_BITMAP) self._send_command(Command.START_DISPLAY_BITMAP, padding=Padding.START_DISPLAY_BITMAP) - + self.SetBrightness(60, is_isolated_call=False) - - self._send_command(Command.DISPLAY_BITMAP, payload=bytearray( - int(self.display_width * self.display_width).to_bytes(4))) - response = self._send_command(Command.SEND_PAYLOAD, - payload=bytearray(self._generate_full_image( - image, self.orientation)), - readsize=1024) - assert response.startswith(b'full_png_sucess'), 'Failed to display bitmap' + self._send_command( + Command.DISPLAY_BITMAP, + payload=bytearray(int(self.display_width * self.display_width) + .to_bytes(4)), + bypass_queue=True + ) + + response = self._send_command( + Command.SEND_PAYLOAD, + payload=bytearray( + self._generate_full_image(image, self.orientation)), + readsize=1024, + bypass_queue=True + ) + + assert response.startswith( + b'full_png_sucess'), 'Failed to display bitmap' self._send_command(Command.QUERY_STATUS, readsize=1024) else: with self.update_queue_mutex: update_image, img_len = self._generate_update_image( - image, x, y, self.orientation) - - payload_len = img_len.to_bytes(4, byteorder='big') - command_payload = payload_len + Count.Start.to_bytes(7, byteorder='big') + image, x, y, self.orientation) - self._send_command(Command.UPDATE_BITMAP, payload=command_payload) + payload_len = img_len.to_bytes(4, byteorder='big') + command_payload = payload_len + \ + Count.Start.to_bytes(7, byteorder='big') + + self._send_command(Command.UPDATE_BITMAP, + payload=command_payload) self._send_command(Command.SEND_PAYLOAD, payload=update_image) self._send_command(Command.QUERY_STATUS, readsize=1024) Count.Start += 1 @@ -367,7 +489,7 @@ def _generate_full_image(image: Image, orientation: Orientation = Orientation.PO pixel_data = [] for y in range(image.height): for x in range(image.width): - pixel = image_data[x, y] + pixel = image_data[x, y] pixel_data += [pixel[2], pixel[1], pixel[0], pixel[3]] hex_data = bytes(pixel_data) @@ -391,20 +513,22 @@ def _generate_update_image(self, image, x, y, orientation: Orientation = Orienta img_raw_data = bytes([]) image_data = image.convert("RGBA").load() - + for w in range(image.width): # Target start - img_raw_data += (((x + w) * self.display_height) + (y + image.height)).to_bytes(3, byteorder='big') - + img_raw_data += (((x + w) * self.display_height) + + (y + image.height)).to_bytes(3, byteorder='big') + # Number of pixels to be written img_raw_data += image.height.to_bytes(2, byteorder='big') - + for h in range(image.height): current_pixel = image_data[w, image.height - h - 1] - img_raw_data += bytes([current_pixel[2], current_pixel[1], current_pixel[0], current_pixel[3]]) - - img_raw_data += bytes([0xef, 0x69]) - + img_raw_data += bytes([current_pixel[2], current_pixel[1], + current_pixel[0], current_pixel[3]]) + + img_raw_data += bytes([0xef, 0x69]) + return b'\x00'.join(img_raw_data[i:i + 249] for i in range(0, len(img_raw_data), 249)), len(img_raw_data) def _stop_media(self, is_isolated_call: bool = True): @@ -412,20 +536,24 @@ def _stop_media(self, is_isolated_call: bool = True): self._init_packet_interaction() self._send_command(Command.STOP_MEDIA) - response = self._send_command(Command.STOP_VIDEO, readsize=1024, bypass_queue=True) + response = self._send_command( + Command.STOP_VIDEO, readsize=1024, bypass_queue=True) assert response.startswith(b'media_stop'), 'Failed to stop media' - + def _init_packet_interaction(self): - response = self._send_command(Command.HELLO, readsize=1024, bypass_queue=True) - assert response.startswith(b'chs_88inch'), 'Failed to initialize packet interaction' - + response = self._send_command( + Command.HELLO, readsize=1024, bypass_queue=True) + assert response.startswith( + b'chs_88inch'), 'Failed to initialize packet interaction' + def _no_update(self): - payload_len = (8).to_bytes(4, byteorder='big') - command_payload = payload_len + Count.Start.to_bytes(7, byteorder='big') - + payload_len = (8).to_bytes(4, byteorder='big') + command_payload = payload_len + \ + Count.Start.to_bytes(7, byteorder='big') + self._send_command(Command.UPDATE_BITMAP, payload=command_payload) self._send_command(Command.UPDATE_BITMAP_NO_CHANGES) self._send_command(Command.QUERY_STATUS, readsize=1024) - + Count.Start += 1 diff --git a/simple-program.py b/simple-program.py index af41c0bd..c8a2896c 100755 --- a/simple-program.py +++ b/simple-program.py @@ -30,6 +30,7 @@ from library.lcd.lcd_comm_rev_b import LcdCommRevB from library.lcd.lcd_comm_rev_c import LcdCommRevC from library.lcd.lcd_comm_rev_d import LcdCommRevD +from library.lcd.lcd_comm_rev_e import LcdCommRevE from library.lcd.lcd_simulated import LcdSimulated from library.log import logger @@ -46,7 +47,7 @@ # - SIMU for 3.5" simulated LCD (image written in screencap.png) # - SIMU5 for 5" simulated LCD # To identify your smart screen: https://github.com/mathoudebine/turing-smart-screen-python/wiki/Hardware-revisions -REVISION = "A" +REVISION = "E" stop = False @@ -79,6 +80,9 @@ def sighandler(signum, frame): elif REVISION == "D": logger.info("Selected Hardware Revision D (Kipye Qiye Smart Display 3.5\")") lcd_comm = LcdCommRevD(com_port=COM_PORT) + elif REVISION == "E": + logger.info("Selected Hardware Revision E (Turing Smart Screen 8.8\")") + lcd_comm = LcdCommRevE() elif REVISION == "SIMU2.1": logger.info("Selected 2.1\" Simulated LCD") lcd_comm = LcdSimulated(display_width=480, display_height=480) @@ -99,7 +103,7 @@ def sighandler(signum, frame): os._exit(1) # Reset screen in case it was in an unstable state (screen is also cleared) - lcd_comm.Reset() + #lcd_comm.Clear() # Send initialization commands lcd_comm.InitializeComm() @@ -113,6 +117,26 @@ def sighandler(signum, frame): # Set orientation (screen starts in Portrait) lcd_comm.SetOrientation(orientation=Orientation.LANDSCAPE) + #lcd_comm.GetFileSize("/mnt/UDISK/video/earthd.mp4") + + dirs, files = lcd_comm.ListDirectory("/mnt/UDISK/img/") + + print(dirs) + print(files) + + lcd_comm.DeleteFile("/mnt/UDISK/img/test.png") + + dirs, files = lcd_comm.ListDirectory("/mnt/UDISK/img/") + + print(dirs) + print(files) + + lcd_comm.UploadFile("./res/themes/--Theme examples/8.8inch/Cyberpunk 2077/theme_res_1888.png", "/mnt/UDISK/img/cyberpunk.png") + + lcd_comm.PlayVideoFromStorage("/mnt/UDISK/video/earth.mp4") + lcd_comm.PlayImageFromStorage("/mnt/UDISK/img/test.png") + + #lcd_comm._stop_media(); # Define background picture background = f"res/backgrounds/example_{lcd_comm.get_width()}x{lcd_comm.get_height()}.png" @@ -141,7 +165,7 @@ def sighandler(signum, frame): font_color=(255, 255, 255), background_image=background) - # Display the current time and some progress bars as fast as possible + #Display the current time and some progress bars as fast as possible bar_value = 0 while not stop: start = time.perf_counter() From e8b781b2f49900eba0ad148457e3b8669c7d5406 Mon Sep 17 00:00:00 2001 From: Nico Reinartz Date: Tue, 17 Sep 2024 23:50:43 +0200 Subject: [PATCH 3/6] feat: continue with all functions, include in display.py --- library/display.py | 4 ++++ library/lcd/lcd_comm_rev_e.py | 43 +++++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/library/display.py b/library/display.py index 4913b202..d5deabaf 100644 --- a/library/display.py +++ b/library/display.py @@ -24,6 +24,7 @@ from library.lcd.lcd_comm_rev_b import LcdCommRevB from library.lcd.lcd_comm_rev_c import LcdCommRevC from library.lcd.lcd_comm_rev_d import LcdCommRevD +from library.lcd.lcd_comm_rev_e import LcdCommRevE from library.lcd.lcd_simulated import LcdSimulated from library.log import logger @@ -67,6 +68,9 @@ def __init__(self): elif config.CONFIG_DATA["display"]["REVISION"] == "D": self.lcd = LcdCommRevD(com_port=config.CONFIG_DATA['config']['COM_PORT'], update_queue=config.update_queue) + elif config.CONFIG_DATA["display"]["REVISION"] == "E": + self.lcd = LcdCommRevE(com_port=config.CONFIG_DATA['config']['COM_PORT'], + update_queue=config.update_queue) elif (config.CONFIG_DATA["display"]["REVISION"] == "SIMU" or config.CONFIG_DATA["display"]["REVISION"] == "SIMU3.5"): self.lcd = LcdSimulated(display_width=320, diff --git a/library/lcd/lcd_comm_rev_e.py b/library/lcd/lcd_comm_rev_e.py index d229f7a8..2142e125 100644 --- a/library/lcd/lcd_comm_rev_e.py +++ b/library/lcd/lcd_comm_rev_e.py @@ -102,11 +102,8 @@ class Command(Enum): QUERY_STORAGE_INFORMATION = bytearray((0x64, 0xef, 0x69, 0x00, 0x00, 0x00, 0x01,)) # 64EF6900000001 - PLAY_IMAGE = bytearray((0x8C, 0xEF, 0x69, 0x00, 0x00, - 0x00, 0x21, 0x00, 0x00, 0x00,)) # 8CEF6900000017000000 - - PLAY_VIDEO = bytearray((0x78, 0xEF, 0x69, 0x00, 0x00, - 0x00, 0x1A, 0x00, 0x00, 0x00,)) # 78EF690000001A000000 + PLAY_IMAGE = bytearray((0x8C, 0xEF, 0x69,)) + PLAY_VIDEO = bytearray((0x78, 0xEF, 0x69,)) SEND_PAYLOAD = bytearray((0xFF,)) @@ -336,7 +333,7 @@ def ListDirectory(self, path: str) -> tuple[list[str], list[str]]: def UploadFile(self, src_path: str, target_path: str): payload = len(target_path).to_bytes(4, byteorder='big') + \ - Padding.NULL * 3 + bytearray(target_path, 'utf-8') + Padding.NULL.value * 3 + bytearray(target_path, 'utf-8') response = self._send_command( Command.UPLOAD_FILE, @@ -355,7 +352,7 @@ def UploadFile(self, src_path: str, target_path: str): payload=byte, bypass_queue=True) sent += 1024 else: - response == self._send_command( + response = self._send_command( Command.SEND_PAYLOAD, payload=byte, readsize=1024, bypass_queue=True) assert response.startswith( b'file_rev_done'), 'Failed to upload file' @@ -376,9 +373,16 @@ def DeleteFile(self, target_path: str): def GetFileSize(self, target_path: str, is_isolated_call: bool = True): if is_isolated_call: self._init_packet_interaction() + + payload = len(target_path).to_bytes(4, byteorder='big') + \ + Padding.NULL.value * 3 + bytearray(target_path, 'utf-8') - response = self._send_command(Command.QUERY_FILE_SIZE, payload=bytearray( - target_path, 'utf-8'), readsize=1024, bypass_queue=True) + response = self._send_command( + Command.QUERY_FILE_SIZE, + payload=payload, + readsize=1024, + bypass_queue=True) + size = int(response.decode().rstrip('\x00')) assert size > 0, 'File does not exist' @@ -387,8 +391,11 @@ def GetFileSize(self, target_path: str, is_isolated_call: bool = True): def PlayImageFromStorage(self, target_path: str, is_isolated_call: bool = True): if is_isolated_call: self._init_packet_interaction() + + payload = len(target_path).to_bytes(4, byteorder='big') + \ + Padding.NULL.value * 3 + bytearray(target_path, 'utf-8') - response = self._send_command(Command.PLAY_IMAGE, payload=bytearray(target_path, 'utf-8'), + response = self._send_command(Command.PLAY_IMAGE, payload=payload, readsize=1024, bypass_queue=True) assert response.startswith(b'play_img_ok'), 'Failed to play image' @@ -396,13 +403,15 @@ def PlayImageFromStorage(self, target_path: str, is_isolated_call: bool = True): def PlayVideoFromStorage(self, target_path: str, is_isolated_call: bool = True): if is_isolated_call: self._init_packet_interaction() - - self._stop_media(is_isolated_call=False) - self.SetBrightness(61, is_isolated_call=False) - size = self.GetFileSize(target_path, is_isolated_call=False) - print(bytearray(target_path, 'utf-8').hex()) - response = self._send_command(Command.PLAY_VIDEO, payload=bytearray(target_path, 'utf-8'), - readsize=1024, bypass_queue=True) + + payload = len(target_path).to_bytes(4, byteorder='big') + \ + Padding.NULL.value * 3 + bytearray(target_path, 'utf-8') + + response = self._send_command( + Command.PLAY_VIDEO, + payload=payload, + readsize=1024, + bypass_queue=True) assert response.startswith( b'play_video_success'), 'Failed to play video' From 04c51de49517ec07c5beef150c5efb67b431477a Mon Sep 17 00:00:00 2001 From: Nico Reinartz Date: Sun, 6 Oct 2024 21:56:07 +0200 Subject: [PATCH 4/6] fix: support all display orientations --- library/lcd/lcd_comm_rev_e.py | 78 ++++++++++++----------------------- 1 file changed, 26 insertions(+), 52 deletions(-) diff --git a/library/lcd/lcd_comm_rev_e.py b/library/lcd/lcd_comm_rev_e.py index 2142e125..1ec9465a 100644 --- a/library/lcd/lcd_comm_rev_e.py +++ b/library/lcd/lcd_comm_rev_e.py @@ -21,7 +21,7 @@ import queue import time from enum import Enum -from math import ceil, floor +from math import ceil import serial from PIL import Image @@ -34,29 +34,6 @@ class Count: Start = 0 - -# READ HELLO ALWAYS IS 23. -# ALL READS IS 1024 - -# ORDER: -# SEND HELLO -# READ HELLO (23) -# SEND STOP_VIDEO -# SEND STOP_MEDIA -# READ STATUS (1024) -# SEND SET_BRIGHTNESS -# SEND SET_OPTIONS WITH ORIENTATION ? -# SEND PRE_UPDATE_BITMAP -# SEND START_DISPLAY_BITMAP -# SEND DISPLAY_BITMAP -# READ STATUS (1024) -# SEND QUERY_STATUS -# READ STATUS (1024) -# WHILE: -# SEND UPDATE_BITMAP -# SEND QUERY_STATUS -# READ STATUS(1024) - class Command(Enum): # COMMANDS HELLO = bytearray((0x01, 0xef, 0x69, 0x00, 0x00, 0x00, @@ -169,7 +146,7 @@ def auto_detect_com_port(): # Try to find sleeping device and wake it up for com_port in com_ports: - if com_port.serial_number == 'USB7INCH' or com_port.serial_number == 'CT88INCH': + if com_port.serial_number == 'CT88INCH': LcdCommRevE._connect_to_reset_device_name(com_port) return LcdCommRevE.auto_detect_com_port() @@ -229,17 +206,15 @@ def _hello(self): logger.warning( "Display returned unknown sub-revision on Hello answer (%s)" % str(response)) - logger.debug("HW sub-revision: %s" % (str(self.sub_revision))) - def InitializeComm(self): pass def Reset(self): logger.info("Display reset (COM port may change)...") - # Reset command bypasses queue because it is run when queue threads are not yet started - self._send_command(Command.HELLO, readsize=1024) - self._send_command(Command.RESTART) - self._send_command(Command.RESTART) + + self._init_packet_interaction() + self._send_command(Command.RESTART, bypass_queue=True) + self._send_command(Command.RESTART, bypass_queue=True) self.closeSerial() # Wait for display reset then reconnect time.sleep(15) @@ -487,12 +462,12 @@ def DisplayPILImage( @staticmethod def _generate_full_image(image: Image, orientation: Orientation = Orientation.PORTRAIT): - if orientation == Orientation.PORTRAIT: + if orientation == Orientation.REVERSE_PORTRAIT: image = image.rotate(90, expand=True) - elif orientation == Orientation.REVERSE_PORTRAIT: - image = image.rotate(270, expand=True) elif orientation == Orientation.REVERSE_LANDSCAPE: - image = image.rotate(180) + image = image.rotate(180, expand=True) + elif orientation == Orientation.PORTRAIT: + image = image.rotate(270, expand=True) image_data = image.convert("RGBA").load() pixel_data = [] @@ -505,28 +480,27 @@ def _generate_full_image(image: Image, orientation: Orientation = Orientation.PO return b'\x00'.join(hex_data[i:i + 249] for i in range(0, len(hex_data), 249)) def _generate_update_image(self, image, x, y, orientation: Orientation = Orientation.PORTRAIT): - # x0, y0 = x, y - - # if orientation == Orientation.PORTRAIT: - # image = image.rotate(90, expand=True) - # x0 = self.get_width() - x - image.height - # elif orientation == Orientation.REVERSE_PORTRAIT: - # image = image.rotate(270, expand=True) - # y0 = self.get_height() - y - image.width - # elif orientation == Orientation.REVERSE_LANDSCAPE: - # image = image.rotate(180, expand=True) - # y0 = self.get_width() - x - image.width - # x0 = self.get_height() - y - image.height - # elif orientation == Orientation.LANDSCAPE: - # x0, y0 = y, x + x0, y0 = x, y + + if orientation == Orientation.PORTRAIT: + y0 = self.get_height() - y - image.height + elif orientation == Orientation.REVERSE_PORTRAIT: + image = image.rotate(180, expand=True) + x0 = self.get_width() - x - image.width + elif orientation == Orientation.LANDSCAPE: + image = image.rotate(90, expand=True) + x0, y0 = y, x + elif orientation == Orientation.REVERSE_LANDSCAPE: + image = image.rotate(270, expand=True) + x0 = self.get_height() - y - image.width + y0 = self.get_width() - x - image.height img_raw_data = bytes([]) image_data = image.convert("RGBA").load() for w in range(image.width): # Target start - img_raw_data += (((x + w) * self.display_height) + - (y + image.height)).to_bytes(3, byteorder='big') + img_raw_data += (((x0 + w) * self.display_height) + y0).to_bytes(3, byteorder='big') # Number of pixels to be written img_raw_data += image.height.to_bytes(2, byteorder='big') @@ -544,7 +518,7 @@ def _stop_media(self, is_isolated_call: bool = True): if is_isolated_call: self._init_packet_interaction() - self._send_command(Command.STOP_MEDIA) + self._send_command(Command.STOP_MEDIA, bypass_queue=True) response = self._send_command( Command.STOP_VIDEO, readsize=1024, bypass_queue=True) From 57e660d95c867a788d4c6de499c356a31b960df4 Mon Sep 17 00:00:00 2001 From: Nico Reinartz Date: Thu, 10 Oct 2024 19:24:02 +0200 Subject: [PATCH 5/6] fix: remove test code --- simple-program.py | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/simple-program.py b/simple-program.py index c8a2896c..ea4450d3 100755 --- a/simple-program.py +++ b/simple-program.py @@ -82,7 +82,7 @@ def sighandler(signum, frame): lcd_comm = LcdCommRevD(com_port=COM_PORT) elif REVISION == "E": logger.info("Selected Hardware Revision E (Turing Smart Screen 8.8\")") - lcd_comm = LcdCommRevE() + lcd_comm = LcdCommRevE(com_port=COM_PORT) elif REVISION == "SIMU2.1": logger.info("Selected 2.1\" Simulated LCD") lcd_comm = LcdSimulated(display_width=480, display_height=480) @@ -103,7 +103,7 @@ def sighandler(signum, frame): os._exit(1) # Reset screen in case it was in an unstable state (screen is also cleared) - #lcd_comm.Clear() + lcd_comm.Reset() # Send initialization commands lcd_comm.InitializeComm() @@ -117,26 +117,6 @@ def sighandler(signum, frame): # Set orientation (screen starts in Portrait) lcd_comm.SetOrientation(orientation=Orientation.LANDSCAPE) - #lcd_comm.GetFileSize("/mnt/UDISK/video/earthd.mp4") - - dirs, files = lcd_comm.ListDirectory("/mnt/UDISK/img/") - - print(dirs) - print(files) - - lcd_comm.DeleteFile("/mnt/UDISK/img/test.png") - - dirs, files = lcd_comm.ListDirectory("/mnt/UDISK/img/") - - print(dirs) - print(files) - - lcd_comm.UploadFile("./res/themes/--Theme examples/8.8inch/Cyberpunk 2077/theme_res_1888.png", "/mnt/UDISK/img/cyberpunk.png") - - lcd_comm.PlayVideoFromStorage("/mnt/UDISK/video/earth.mp4") - lcd_comm.PlayImageFromStorage("/mnt/UDISK/img/test.png") - - #lcd_comm._stop_media(); # Define background picture background = f"res/backgrounds/example_{lcd_comm.get_width()}x{lcd_comm.get_height()}.png" From a30ff43d9e27c0b7a9ec4e5a54cd742b57a64e13 Mon Sep 17 00:00:00 2001 From: Nico Reinartz Date: Sat, 12 Oct 2024 00:37:55 +0200 Subject: [PATCH 6/6] feat: remove subdivisions, try to make sleep work, --- configure.py | 6 +- library/lcd/lcd_comm_rev_e.py | 123 +++++++++++++++++----------------- 2 files changed, 63 insertions(+), 66 deletions(-) diff --git a/configure.py b/configure.py index c8d90631..815880b9 100755 --- a/configure.py +++ b/configure.py @@ -82,8 +82,8 @@ ('B', SIZE_3_5_INCH): XUANFANG_MODEL, ('C', SIZE_2_1_INCH): TURING_MODEL, ('C', SIZE_5_INCH): TURING_MODEL, - ('C', SIZE_8_8_INCH): TURING_MODEL, ('D', SIZE_3_5_INCH): KIPYE_MODEL, + ('E', SIZE_8_8_INCH): TURING_MODEL, ('SIMU2.1', SIZE_2_1_INCH): SIMULATED_MODEL, ('SIMU', SIZE_3_5_INCH): SIMULATED_MODEL, ('SIMU3.5', SIZE_3_5_INCH): SIMULATED_MODEL, @@ -96,9 +96,9 @@ (USBPCMONITOR_MODEL, SIZE_5_INCH): 'A', (XUANFANG_MODEL, SIZE_3_5_INCH): 'B', (TURING_MODEL, SIZE_2_1_INCH): 'C', - (TURING_MODEL, SIZE_5_INCH): 'C', - (TURING_MODEL, SIZE_8_8_INCH): 'C', + (TURING_MODEL, SIZE_5_INCH): 'C', (KIPYE_MODEL, SIZE_3_5_INCH): 'D', + (TURING_MODEL, SIZE_8_8_INCH): 'E', (SIMULATED_MODEL, SIZE_2_1_INCH): 'SIMU2.1', (SIMULATED_MODEL, SIZE_3_5_INCH): 'SIMU3.5', (SIMULATED_MODEL, SIZE_5_INCH): 'SIMU5', diff --git a/library/lcd/lcd_comm_rev_e.py b/library/lcd/lcd_comm_rev_e.py index 1ec9465a..44f41656 100644 --- a/library/lcd/lcd_comm_rev_e.py +++ b/library/lcd/lcd_comm_rev_e.py @@ -66,18 +66,20 @@ class Command(Enum): OPTIONS = bytearray((0x7d, 0xef, 0x69, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0xff)) + OPTIONS_TURNOFF = bytearray((0x7d, 0xef, 0x69, 0x00, 0x00, + 0x00, 0x05, 0x00, 0x00, 0x00, 0x61)) STARTMODE_DEFAULT = bytearray((0x00,)) STARTMODE_IMAGE = bytearray((0x01,)) STARTMODE_VIDEO = bytearray((0x02,)) FLIP_180 = bytearray((0x01,)) NO_FLIP = bytearray((0x00,)) - UPLOAD_FILE = bytearray((0x6F, 0xef, 0x69,)) # 6FEF6900000017000000 + UPLOAD_FILE = bytearray((0x6F, 0xef, 0x69,)) DELETE_FILE = bytearray((0x66, 0xef, 0x69,)) LIST_FILES = bytearray((0x65, 0xEF, 0x69,)) QUERY_FILE_SIZE = bytearray((0x6e, 0xef, 0x69,)) QUERY_STORAGE_INFORMATION = bytearray((0x64, 0xef, 0x69, - 0x00, 0x00, 0x00, 0x01,)) # 64EF6900000001 + 0x00, 0x00, 0x00, 0x01,)) PLAY_IMAGE = bytearray((0x8C, 0xEF, 0x69,)) PLAY_VIDEO = bytearray((0x78, 0xEF, 0x69,)) @@ -112,15 +114,6 @@ class SleepInterval(Enum): def __init__(self, command): self.command = command - -class SubRevision(Enum): - UNKNOWN = None - EIGHTINCH = "chs_88inch" - - def __init__(self, command): - self.command = command - - # This class is for Turing Smart Screen 5" screens class LcdCommRevE(LcdComm): def __init__(self, com_port: str = "AUTO", display_width: int = 1920, display_height: int = 480, @@ -139,16 +132,16 @@ def auto_detect_com_port(): # Try to find awake device through serial number or vid/pid for com_port in com_ports: - if com_port.serial_number == 'CT88INCH': - return com_port.device if com_port.vid == 0x0525 and com_port.pid == 0xa4a7: return com_port.device - - # Try to find sleeping device and wake it up + + # If device is asleep, it is listening on a different COM port and identifies as CT88INCH or VID_1A86 & PID_CA88 for com_port in com_ports: - if com_port.serial_number == 'CT88INCH': - LcdCommRevE._connect_to_reset_device_name(com_port) - return LcdCommRevE.auto_detect_com_port() + if com_port.serial_number == "CT88INCH": + logger.debug("Oh look, I found a sleeping 88 inch. Wakey wakey!") + return com_port.device + elif com_port.vid == 0x1a86 and com_port.pid == 0xCA88: + return com_port.device return None @@ -195,16 +188,7 @@ def _send_command(self, cmd: Command, payload: bytearray = None, padding: Paddin self.update_queue.put((self.ReadData, [readsize])) def _hello(self): - # This command reads LCD answer on serial link, so it bypasses the queue - self.sub_revision = SubRevision.UNKNOWN - self._send_command(Command.HELLO, bypass_queue=True) - response = str(self.lcd_serial.read(23).decode()) - self.lcd_serial.flushInput() - if response.startswith(SubRevision.EIGHTINCH.value): - self.sub_revision = SubRevision.EIGHTINCH - else: - logger.warning( - "Display returned unknown sub-revision on Hello answer (%s)" % str(response)) + self._init_packet_interaction(); def InitializeComm(self): pass @@ -212,12 +196,21 @@ def InitializeComm(self): def Reset(self): logger.info("Display reset (COM port may change)...") - self._init_packet_interaction() + # When the device is asleep, there is an empty response to the hello + self._send_command(Command.HELLO, readsize=1024, bypass_queue=True) + + # Device wants to receive the restart command twice self._send_command(Command.RESTART, bypass_queue=True) self._send_command(Command.RESTART, bypass_queue=True) self.closeSerial() + # Wait for display reset then reconnect time.sleep(15) + + # When waking up, the device switches from COM3 to COM5 + # -> Reset com port + self.com_port = 'AUTO' + self.openSerial() def Clear(self): @@ -235,25 +228,30 @@ def Clear(self): def ScreenOff(self): logger.info("Calling ScreenOff") - self._send_command(Command.HELLO, bypass_queue=True) - self._send_command(Command.STOP_VIDEO) - response = self._send_command(Command.STOP_MEDIA, readsize=1024) - - assert response == b'media_stop', 'Failed to stop media' - - self._send_command(Command.TURNOFF) + + # I really don't know why I cannot stop the bitmap updates from here + + self.SetBrightness(0, is_isolated_call=False, bypass_queue=True) + + self._send_command(Command.OPTIONS_TURNOFF, bypass_queue=True) + self._send_command(Command.TURNOFF, bypass_queue=True) + self._send_command(Command.STOP_VIDEO, bypass_queue=True) + + # UsbMonitorS sends a completely black bitmap at this point + # Experiments showed that this is not needed - def ScreenOn(self, is_isolated_call: bool = True): + def ScreenOn(self): logger.info("Calling ScreenOn") + # Display needs to be reinizialized at this point, but as far as I can tell, this isn't possible at this point + + self.closeSerial() + # # Give the display some time to wake up and init COM5 + time.sleep(10) + + self.com_port = "AUTO" + self.openSerial() - if is_isolated_call: - self._init_packet_interaction() - - self._send_command(Command.STOP_VIDEO) - self._send_command(Command.STOP_MEDIA, readsize=1024) - # self._send_command(Command.SET_BRIGHTNESS, payload=bytearray([255])) - - def SetBrightness(self, level: int = 25, is_isolated_call: bool = True): + def SetBrightness(self, level: int = 25, is_isolated_call: bool = True, bypass_queue: bool = True): # logger.info("Call SetBrightness") assert 0 <= level <= 100, 'Brightness level must be [0-100]' @@ -265,7 +263,7 @@ def SetBrightness(self, level: int = 25, is_isolated_call: bool = True): self._init_packet_interaction() self._send_command(Command.SET_BRIGHTNESS, payload=bytearray( - (converted_level,)), bypass_queue=True) + (converted_level,)), bypass_queue=bypass_queue) def SetOrientation(self, orientation: Orientation = Orientation.PORTRAIT, is_isolated_call: bool = True): self.orientation = orientation @@ -296,15 +294,13 @@ def ListDirectory(self, path: str) -> tuple[list[str], list[str]]: bypass_queue=True) responseList = response.decode().rstrip('\x00') - print(responseList) - - assert responseList.startswith("result"), 'Failed to list files' - if responseList.startswith('result:'): - parts = responseList.split(':') - return parts[2].split('/')[:-1], parts[3].split('/')[:-1] + if not responseList.startswith("result"): + logger.warning("Failed to list files") + return [], [] - return [], [] + parts = responseList.split(':') + return parts[2].split('/')[:-1], parts[3].split('/')[:-1] def UploadFile(self, src_path: str, target_path: str): payload = len(target_path).to_bytes(4, byteorder='big') + \ @@ -397,7 +393,7 @@ def DisplayPILImage( x: int = 0, y: int = 0, image_width: int = 0, image_height: int = 0 - ): + ): # If the image height/width isn't provided, use the native image size if not image_height: image_height = image.size[1] @@ -428,8 +424,7 @@ def DisplayPILImage( self._send_command( Command.DISPLAY_BITMAP, - payload=bytearray(int(self.display_width * self.display_width) - .to_bytes(4)), + payload=bytearray(int(self.display_width * self.display_height * 4).to_bytes(4)), bypass_queue=True ) @@ -440,13 +435,13 @@ def DisplayPILImage( readsize=1024, bypass_queue=True ) - - assert response.startswith( - b'full_png_sucess'), 'Failed to display bitmap' + + if not response.startswith(b'full_png_sucess'): + logger.warning("Failed to display bitmap") self._send_command(Command.QUERY_STATUS, readsize=1024) else: - with self.update_queue_mutex: + with self.update_queue_mutex: update_image, img_len = self._generate_update_image( image, x, y, self.orientation) @@ -522,13 +517,15 @@ def _stop_media(self, is_isolated_call: bool = True): response = self._send_command( Command.STOP_VIDEO, readsize=1024, bypass_queue=True) - assert response.startswith(b'media_stop'), 'Failed to stop media' + if not response.startswith(b'media_stop'): + logger.warning("Failed to stop media, got response: %s", response) def _init_packet_interaction(self): response = self._send_command( Command.HELLO, readsize=1024, bypass_queue=True) - assert response.startswith( - b'chs_88inch'), 'Failed to initialize packet interaction' + + if not response.startswith(b'chs_88inch'): + logger.warning("Failed to initialize packet interaction, got response: %s", response) def _no_update(self): payload_len = (8).to_bytes(4, byteorder='big')