From 664802ddead9b6762c3ace00dbd8970e83a2a0bb Mon Sep 17 00:00:00 2001 From: Antoine Martin Date: Thu, 6 Feb 2025 22:06:35 +0700 Subject: [PATCH] #4491 encode server supports mmap or lz4 --- xpra/client/base/command.py | 85 --------------------- xpra/client/base/encode.py | 135 +++++++++++++++++++++++++++++++++ xpra/client/mixins/mmap.py | 6 +- xpra/net/mmap.py | 44 ++++++++--- xpra/scripts/main.py | 3 +- xpra/server/encoder_server.py | 30 +++++--- xpra/server/source/mmap.py | 2 +- xpra/server/window/compress.py | 7 +- 8 files changed, 200 insertions(+), 112 deletions(-) create mode 100644 xpra/client/base/encode.py diff --git a/xpra/client/base/command.py b/xpra/client/base/command.py index 603bb7f80a..5e15a03b3a 100644 --- a/xpra/client/base/command.py +++ b/xpra/client/base/command.py @@ -752,88 +752,3 @@ def __init__(self, opts): self.displayfd = int(opts.displayfd) except (ValueError, TypeError): self.displayfd = 0 - - -class EncodeClient(HelloRequestClient): - """ - Sends the file(s) to the server for encoding, - saves the result in the current working directory - this requires a server version 6.3 or later - """ - - def __init__(self, options, filenames: Sequence[str]): - super().__init__(options) - if not filenames: - raise ValueError("please specify some filenames to encode") - self.filenames = list(filenames) - self.add_packets("encode-response", "encodings") - from xpra.codecs.pillow.decoder import get_encodings, decompress - self.encodings = get_encodings() - self.decompress = decompress - encodings = ("png", "jpeg") - self.encoding_options = { - "options": encodings, - "core": encodings, - "encoding": options.encoding, - } - for attr, value in { - "quality": options.quality, - "min-quality": options.min_quality, - "speed": options.speed, - "min-speed": options.min_speed, - }.items(): - if value > 0: - self.encoding_options[attr] = value - - def client_type(self) -> str: - return "encoder" - - def _process_encodings(self, packet: PacketType) -> None: - encodings = typedict(packet[1]).dictget("encodings", {}).get("core", ()) - common = tuple(set(self.encodings) & set(encodings)) - log("server encodings=%s, common=%s", encodings, common) - - def _process_encode_response(self, packet: PacketType) -> None: - encoding, data, options, width, height, bpp, stride, metadata = packet[1:9] - log("encode-response: %8s %6i bytes, %5ix-%5i %ibits, stride=%i, options=%s, metadata=%s", - encoding, len(data), width, height, bpp, stride, options, metadata) - filename = typedict(metadata).strget("filename") - if not filename: - log.error("Error: 'filename' is missing from the metadata") - self.quit(ExitCode.NO_DATA) - return - save_as = os.path.splitext(os.path.basename(filename))[0] + f".{encoding}" - with open(save_as, "wb") as f: - f.write(data) - log.info(f"saved %i bytes to {save_as!r}", len(data)) - if not self.filenames: - self.quit(ExitCode.OK) - return - self.send_encode() - - def hello_request(self) -> dict[str, Any]: - return { - "request": "encode", - "ui_client": True, - "encoding": self.encoding_options, - } - - def do_command(self, caps: typedict) -> None: - log(f"{caps=}") - self._protocol.large_packets.append("encode") - self.send_encode() - - def send_encode(self): - filename = self.filenames.pop(0) - ext = filename.split(".")[-1] - if ext not in self.encodings: - log.warn(f"Warning: {ext!r} format is not supported") - log.warn(" use %s", " or ".join(self.encodings)) - self.quit(ExitCode.UNSUPPORTED) - return - img_data = load_binary_file(filename) - options = typedict() - rgb_format, raw_data, width, height, rowstride = self.decompress(ext, img_data, options) - encode_options = {} - metadata = {"filename": filename} - self.send("encode", rgb_format, raw_data, width, height, rowstride, encode_options, metadata) diff --git a/xpra/client/base/encode.py b/xpra/client/base/encode.py new file mode 100644 index 0000000000..dab1a402db --- /dev/null +++ b/xpra/client/base/encode.py @@ -0,0 +1,135 @@ +# This file is part of Xpra. +# Copyright (C) 2010 Antoine Martin +# Copyright (C) 2008 Nathaniel Smith +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +import os +from typing import Sequence, Any + +from xpra.client.base.command import HelloRequestClient +from xpra.client.mixins.mmap import MmapClient +from xpra.exit_codes import ExitCode +from xpra.net.common import PacketType +from xpra.util.io import load_binary_file +from xpra.util.objects import typedict +from xpra.log import Logger + +log = Logger("client", "encoding") + + +class EncodeClient(HelloRequestClient, MmapClient): + """ + Sends the file(s) to the server for encoding, + saves the result in the current working directory + this requires a server version 6.3 or later + """ + + def __init__(self, options, filenames: Sequence[str]): + if not filenames: + raise ValueError("please specify some filenames to encode") + HelloRequestClient.__init__(self, options) + MmapClient.__init__(self) + self.filenames = list(filenames) + self.add_packets("encode-response", "encodings") + from xpra.codecs.pillow.decoder import get_encodings, decompress + self.encodings = get_encodings() + self.decompress = decompress + encodings = ("png", "jpeg") + self.encoding_options = { + "options": encodings, + "core": encodings, + "encoding": options.encoding, + } + for attr, value in { + "quality": options.quality, + "min-quality": options.min_quality, + "speed": options.speed, + "min-speed": options.min_speed, + }.items(): + if value > 0: + self.encoding_options[attr] = value + + def init(self, opts) -> None: + if opts.mmap.lower() == "auto": + opts.mmap = "yes" + HelloRequestClient.init(self, opts) + MmapClient.init(self, opts) + + def setup_connection(self, conn): + # must do mmap first to ensure the mmap areas are initialized: + MmapClient.setup_connection(self, conn) + # because HelloRequestClient will call get_caps() to retrieve them + return HelloRequestClient.setup_connection(self, conn) + + def client_type(self) -> str: + return "encoder" + + def _process_encodings(self, packet: PacketType) -> None: + encodings = typedict(packet[1]).dictget("encodings", {}).get("core", ()) + common = tuple(set(self.encodings) & set(encodings)) + log("server encodings=%s, common=%s", encodings, common) + + def _process_encode_response(self, packet: PacketType) -> None: + encoding, data, options, width, height, bpp, stride, metadata = packet[1:9] + log("encode-response: %8s %6i bytes, %5ix-%5i %ibits, stride=%i, options=%s, metadata=%s", + encoding, len(data), width, height, bpp, stride, options, metadata) + filename = typedict(metadata).strget("filename") + if not filename: + log.error("Error: 'filename' is missing from the metadata") + self.quit(ExitCode.NO_DATA) + return + save_as = os.path.splitext(os.path.basename(filename))[0] + f".{encoding}" + with open(save_as, "wb") as f: + f.write(data) + log.info(f"saved %i bytes to {save_as!r}", len(data)) + if not self.filenames: + self.quit(ExitCode.OK) + return + self.send_encode() + + def hello_request(self) -> dict[str, Any]: + hello = { + "request": "encode", + "ui_client": True, + "encoding": self.encoding_options, + } + hello.update(MmapClient.get_caps(self)) + return hello + + def do_command(self, caps: typedict) -> None: + log(f"{caps=}") + self._protocol.large_packets.append("encode") + self.send_encode() + + def send_encode(self): + filename = self.filenames.pop(0) + log(f"send_encode() {filename=!r}") + ext = filename.split(".")[-1] + if ext not in self.encodings: + log.warn(f"Warning: {ext!r} format is not supported") + log.warn(" use %s", " or ".join(self.encodings)) + self.quit(ExitCode.UNSUPPORTED) + return + img_data = load_binary_file(filename) + options = typedict() + rgb_format, raw_data, width, height, rowstride = self.decompress(ext, img_data, options) + encoding = "rgb" + encode_options = {} + if self.mmap_write_area and self.mmap_write_area.enabled: + mmap_data = self.mmap_write_area.write_data(raw_data) + log("mmap_write_area=%s, mmap_data=%s", self.mmap_write_area.get_info(), mmap_data) + encoding = "mmap" + data = b"" + encode_options["chunks"] = mmap_data + elif self.compression_level > 0: + from xpra.net.lz4.lz4 import compress + data = compress(raw_data) + encode_options["lz4"] = 1 + log("lz4 compressed from %i bytes down to %i", len(raw_data), len(data)) + else: + log("sending uncompressed") + data = raw_data + + metadata = {"filename": filename} + self.send("encode", encoding, rgb_format, data, width, height, rowstride, encode_options, metadata) diff --git a/xpra/client/mixins/mmap.py b/xpra/client/mixins/mmap.py index 39fda2c5b8..59f97144a1 100644 --- a/xpra/client/mixins/mmap.py +++ b/xpra/client/mixins/mmap.py @@ -125,7 +125,11 @@ def setup_connection(self, conn) -> None: log("setup_connection(%s) mmap supported=%s", conn, self.mmap_supported) if not self.mmap_supported: return - for area in (self.mmap_read_area, self.mmap_write_area): + for name, area in { + "read": self.mmap_read_area, + "write": self.mmap_write_area, + }.items(): + log(f"{name!r}={area}") if area: area.init_mmap(conn.filename or "") diff --git a/xpra/net/mmap.py b/xpra/net/mmap.py index 0eb1e180b6..4b438128a5 100644 --- a/xpra/net/mmap.py +++ b/xpra/net/mmap.py @@ -307,12 +307,17 @@ def free_mem(*_args) -> None: return bdata, noop -def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]], int]: +def mmap_write(mmap_area, mmap_size: int, data) -> list[tuple[int, int]]: """ Sends 'data' to the client via the mmap shared memory region, returns the chunks of the mmap area used (or None if it failed) and the mmap area's free memory. """ + size = len(data) + if size > (mmap_size - 8): + log.warn("Warning: mmap area is too small!") + log.warn(" we need to store %s bytes but the mmap area is limited to %i", size, (mmap_size - 8)) + return [] # This is best explained using diagrams: # mmap_area=[&S&E-------------data-------------] # The first pair of 4 bytes are occupied by: @@ -326,7 +331,6 @@ def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]], mmap_data_end: c_uint32 = int_from_buffer(mmap_area, 4) start = max(8, mmap_data_start.value) end = max(8, mmap_data_end.value) - size = len(data) log("mmap: start=%i, end=%i, size of data to write=%i", start, end, size) if end < start: # we have wrapped around but the client hasn't yet: @@ -342,16 +346,10 @@ def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]], # [****--------S++++++++++++E*********] chunk = mmap_size - end available = chunk + (start - 8) - # update global mmap stats: - mmap_free_size = available - size - if size > (mmap_size - 8): - log.warn("Warning: mmap area is too small!") - log.warn(" we need to store %s bytes but the mmap area is limited to %i", size, (mmap_size - 8)) - return [], mmap_free_size - if mmap_free_size <= 0: + if size > available: log.warn("Warning: mmap area is full!") log.warn(" we need to store %s bytes but only have %s free space left", size, available) - return [], mmap_free_size + return [] if size < chunk: # data fits in the first chunk: # ie: initially: @@ -385,8 +383,24 @@ def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]], l2 = size - chunk chunks = [(end, chunk), (8, l2)] mmap_data_end.value = 8 + l2 - log("sending damage with mmap: %s bytes", len(data)) - return chunks, mmap_free_size + log("mmap_write: %s bytes", len(data)) + return chunks + + +def mmap_free_size(mmap_area, mmap_size: int) -> int: + mmap_data_start: c_uint32 = int_from_buffer(mmap_area, 0) + mmap_data_end: c_uint32 = int_from_buffer(mmap_area, 4) + start = max(8, mmap_data_start.value) + end = min(mmap_size, max(8, mmap_data_end.value)) + if end < start: + # we have wrapped around but the client hasn't yet: + # [++++++++E--------------------S+++++] + # so there is one chunk available (from E to S) + return start - end + # we have not wrapped around yet, or the client has wrapped around too: + # [------------S++++++++++++E---------] + # so there are two chunks available (from E to the end, from the start to S) + return (start - 8) + (mmap_size - end) class BaseMmapArea: @@ -462,3 +476,9 @@ def gen_token(self) -> None: def write_token(self) -> None: write_mmap_token(self.mmap, self.token, self.token_index, self.token_bytes) + + def write_data(self, data) -> list[tuple[int, int]]: + return mmap_write(self.mmap, self.size, data) + + def mmap_read(self, *descr_data: tuple[int, int]) -> tuple[bytes | memoryview, PaintCallback]: + return mmap_read(self.mmap, *descr_data) diff --git a/xpra/scripts/main.py b/xpra/scripts/main.py index ffce16f9a5..ec1fb9d612 100755 --- a/xpra/scripts/main.py +++ b/xpra/scripts/main.py @@ -3534,11 +3534,12 @@ def run_top(error_cb, options, args, cmdline) -> ExitValue: def run_encode(error_cb, options, args, cmdline) -> ExitValue: - from xpra.client.base.command import EncodeClient + from xpra.client.base.encode import EncodeClient if not args: raise ValueError("please specify a display and at least one filename") display_desc = pick_display(error_cb, options, args[:1], cmdline) app = EncodeClient(options, args[1:]) + app.init(options) connect_to_server(app, display_desc, options) return app.run() diff --git a/xpra/server/encoder_server.py b/xpra/server/encoder_server.py index cd6fab1252..d8818b86c3 100644 --- a/xpra/server/encoder_server.py +++ b/xpra/server/encoder_server.py @@ -5,6 +5,7 @@ from collections.abc import Callable +from xpra.common import noop from xpra.os_util import gi_import from xpra.util.objects import typedict from xpra.net.common import PacketType @@ -56,30 +57,41 @@ def init_packet_handlers(self) -> None: super().init_packet_handlers() self.add_packets("encode") + def add_new_client(self, ss, c: typedict, send_ui: bool, share_count: int) -> None: + super().add_new_client(ss, c, send_ui, share_count) + ss.protocol.large_packets.append("encode-response") + def _process_encode(self, proto: SocketProtocol, packet: PacketType) -> None: ss = self.get_server_source(proto) if not ss: return - rgb_format = packet[1] - raw_data = packet[2] - width = packet[3] - height = packet[4] - rowstride = packet[5] - options = packet[6] - metadata = packet[7] + input_coding, rgb_format, raw_data, width, height, rowstride, options, metadata = packet[1:9] depth = 32 bpp = 4 full_range = True encoding = "png" if ss.encoding in ("auto", "") else ss.encoding - log("encode request from %s, encoding=%s from %s", ss, encoding, ss.encoding) + log("encode request from %s, %s to %s from %s", ss, input_coding, encoding, ss.encoding) # connection encoding options: eo = dict(ss.default_encoding_options) # the request can override: eo.update(options) log("using settings: %s", eo) from xpra.codecs.image import ImageWrapper, PlanarFormat - image = ImageWrapper(0, 0, width, height, raw_data, rgb_format, depth, rowstride, + if input_coding == "mmap": + if not ss.mmap_supported or not ss.mmap_read_area: + raise RuntimeError("mmap packet but mmap read is not available") + chunks = options["chunks"] + rgb_data, free = ss.mmap_read_area.mmap_read(*chunks) + else: + if options.get("lz4") > 0: + from xpra.net.lz4.lz4 import decompress + rgb_data = decompress(raw_data, max_size=64*1024*1024) + else: + rgb_data = raw_data + free = noop + image = ImageWrapper(0, 0, width, height, rgb_data, rgb_format, depth, rowstride, bpp, PlanarFormat.PACKED, True, None, full_range) coding, compressed, client_options, width, height, stride, bpp = self.encode(encoding, image, typedict(eo)) + free() packet = ["encode-response", coding, compressed.data, client_options, width, height, stride, bpp, metadata] ss.send_async(*packet) diff --git a/xpra/server/source/mmap.py b/xpra/server/source/mmap.py index c309e22660..a2e700d9e3 100644 --- a/xpra/server/source/mmap.py +++ b/xpra/server/source/mmap.py @@ -70,7 +70,7 @@ def mmap_path(self, filename: str, index: int) -> str: log(f"using global server specified mmap file path: {filename!r}") return filename - def parse_area_caps(self, name: str, raw_caps: dict, index: int): + def parse_area_caps(self, name: str, raw_caps: dict, index: int) -> BaseMmapArea | None: log("parse_area_caps(%r, %r, %i)", name, raw_caps, index) if not raw_caps: return None diff --git a/xpra/server/window/compress.py b/xpra/server/window/compress.py index 51fac7033f..41a21ca116 100644 --- a/xpra/server/window/compress.py +++ b/xpra/server/window/compress.py @@ -204,6 +204,7 @@ def __init__(self, # mmap: self._mmap = mmap self._mmap_size = mmap_size + log.warn("mmap=%s, mmap-size=%s", self._mmap, self._mmap_size) self.init_vars() @@ -2865,15 +2866,15 @@ def mmap_encode(self, coding: str, image: ImageWrapper, _options) -> tuple: data = image.get_pixels() if not data: raise RuntimeError(f"failed to get pixels from {image}") - from xpra.net.mmap import mmap_write - mmap_data, mmap_free_size = mmap_write(self._mmap, self._mmap_size, data) + from xpra.net.mmap import mmap_write, mmap_free_size + mmap_data = mmap_write(self._mmap, self._mmap_size, data) # elapsed = monotonic()-start+0.000000001 # make sure never zero! # log("%s MBytes/s - %s bytes written to mmap in %.1f ms", int(len(data)/elapsed/1024/1024), # len(data), 1000*elapsed) if not mmap_data: return () self.global_statistics.mmap_bytes_sent += len(data) - self.global_statistics.mmap_free_size = mmap_free_size + self.global_statistics.mmap_free_size = mmap_free_size(self._mmap, self._mmap_size) # the data we send is the index within the mmap area: return ( "mmap", mmap_data, {"rgb_format": pf},