Skip to content

Commit

Permalink
#4491 encode server supports mmap or lz4
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed Feb 6, 2025
1 parent 07d043a commit 664802d
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 112 deletions.
85 changes: 0 additions & 85 deletions xpra/client/base/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,88 +752,3 @@ def __init__(self, opts):
self.displayfd = int(opts.displayfd)
except (ValueError, TypeError):
self.displayfd = 0


class EncodeClient(HelloRequestClient):
"""
Sends the file(s) to the server for encoding,
saves the result in the current working directory
this requires a server version 6.3 or later
"""

def __init__(self, options, filenames: Sequence[str]):
super().__init__(options)
if not filenames:
raise ValueError("please specify some filenames to encode")
self.filenames = list(filenames)
self.add_packets("encode-response", "encodings")
from xpra.codecs.pillow.decoder import get_encodings, decompress
self.encodings = get_encodings()
self.decompress = decompress
encodings = ("png", "jpeg")
self.encoding_options = {
"options": encodings,
"core": encodings,
"encoding": options.encoding,
}
for attr, value in {
"quality": options.quality,
"min-quality": options.min_quality,
"speed": options.speed,
"min-speed": options.min_speed,
}.items():
if value > 0:
self.encoding_options[attr] = value

def client_type(self) -> str:
return "encoder"

def _process_encodings(self, packet: PacketType) -> None:
encodings = typedict(packet[1]).dictget("encodings", {}).get("core", ())
common = tuple(set(self.encodings) & set(encodings))
log("server encodings=%s, common=%s", encodings, common)

def _process_encode_response(self, packet: PacketType) -> None:
encoding, data, options, width, height, bpp, stride, metadata = packet[1:9]
log("encode-response: %8s %6i bytes, %5ix-%5i %ibits, stride=%i, options=%s, metadata=%s",
encoding, len(data), width, height, bpp, stride, options, metadata)
filename = typedict(metadata).strget("filename")
if not filename:
log.error("Error: 'filename' is missing from the metadata")
self.quit(ExitCode.NO_DATA)
return
save_as = os.path.splitext(os.path.basename(filename))[0] + f".{encoding}"
with open(save_as, "wb") as f:
f.write(data)
log.info(f"saved %i bytes to {save_as!r}", len(data))
if not self.filenames:
self.quit(ExitCode.OK)
return
self.send_encode()

def hello_request(self) -> dict[str, Any]:
return {
"request": "encode",
"ui_client": True,
"encoding": self.encoding_options,
}

def do_command(self, caps: typedict) -> None:
log(f"{caps=}")
self._protocol.large_packets.append("encode")
self.send_encode()

def send_encode(self):
filename = self.filenames.pop(0)
ext = filename.split(".")[-1]
if ext not in self.encodings:
log.warn(f"Warning: {ext!r} format is not supported")
log.warn(" use %s", " or ".join(self.encodings))
self.quit(ExitCode.UNSUPPORTED)
return
img_data = load_binary_file(filename)
options = typedict()
rgb_format, raw_data, width, height, rowstride = self.decompress(ext, img_data, options)
encode_options = {}
metadata = {"filename": filename}
self.send("encode", rgb_format, raw_data, width, height, rowstride, encode_options, metadata)
135 changes: 135 additions & 0 deletions xpra/client/base/encode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# This file is part of Xpra.
# Copyright (C) 2010 Antoine Martin <[email protected]>
# Copyright (C) 2008 Nathaniel Smith <[email protected]>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

import os
from typing import Sequence, Any

from xpra.client.base.command import HelloRequestClient
from xpra.client.mixins.mmap import MmapClient
from xpra.exit_codes import ExitCode
from xpra.net.common import PacketType
from xpra.util.io import load_binary_file
from xpra.util.objects import typedict
from xpra.log import Logger

log = Logger("client", "encoding")


class EncodeClient(HelloRequestClient, MmapClient):
"""
Sends the file(s) to the server for encoding,
saves the result in the current working directory
this requires a server version 6.3 or later
"""

def __init__(self, options, filenames: Sequence[str]):
if not filenames:
raise ValueError("please specify some filenames to encode")
HelloRequestClient.__init__(self, options)
MmapClient.__init__(self)
self.filenames = list(filenames)
self.add_packets("encode-response", "encodings")
from xpra.codecs.pillow.decoder import get_encodings, decompress
self.encodings = get_encodings()
self.decompress = decompress
encodings = ("png", "jpeg")
self.encoding_options = {
"options": encodings,
"core": encodings,
"encoding": options.encoding,
}
for attr, value in {
"quality": options.quality,
"min-quality": options.min_quality,
"speed": options.speed,
"min-speed": options.min_speed,
}.items():
if value > 0:
self.encoding_options[attr] = value

def init(self, opts) -> None:
if opts.mmap.lower() == "auto":
opts.mmap = "yes"
HelloRequestClient.init(self, opts)
MmapClient.init(self, opts)

def setup_connection(self, conn):
# must do mmap first to ensure the mmap areas are initialized:
MmapClient.setup_connection(self, conn)
# because HelloRequestClient will call get_caps() to retrieve them
return HelloRequestClient.setup_connection(self, conn)

def client_type(self) -> str:
return "encoder"

def _process_encodings(self, packet: PacketType) -> None:
encodings = typedict(packet[1]).dictget("encodings", {}).get("core", ())
common = tuple(set(self.encodings) & set(encodings))
log("server encodings=%s, common=%s", encodings, common)

def _process_encode_response(self, packet: PacketType) -> None:
encoding, data, options, width, height, bpp, stride, metadata = packet[1:9]
log("encode-response: %8s %6i bytes, %5ix-%5i %ibits, stride=%i, options=%s, metadata=%s",
encoding, len(data), width, height, bpp, stride, options, metadata)
filename = typedict(metadata).strget("filename")
if not filename:
log.error("Error: 'filename' is missing from the metadata")
self.quit(ExitCode.NO_DATA)
return
save_as = os.path.splitext(os.path.basename(filename))[0] + f".{encoding}"
with open(save_as, "wb") as f:
f.write(data)
log.info(f"saved %i bytes to {save_as!r}", len(data))
if not self.filenames:
self.quit(ExitCode.OK)
return
self.send_encode()

def hello_request(self) -> dict[str, Any]:
hello = {
"request": "encode",
"ui_client": True,
"encoding": self.encoding_options,
}
hello.update(MmapClient.get_caps(self))
return hello

def do_command(self, caps: typedict) -> None:
log(f"{caps=}")
self._protocol.large_packets.append("encode")
self.send_encode()

def send_encode(self):
filename = self.filenames.pop(0)
log(f"send_encode() {filename=!r}")
ext = filename.split(".")[-1]
if ext not in self.encodings:
log.warn(f"Warning: {ext!r} format is not supported")
log.warn(" use %s", " or ".join(self.encodings))
self.quit(ExitCode.UNSUPPORTED)
return
img_data = load_binary_file(filename)
options = typedict()
rgb_format, raw_data, width, height, rowstride = self.decompress(ext, img_data, options)
encoding = "rgb"
encode_options = {}
if self.mmap_write_area and self.mmap_write_area.enabled:
mmap_data = self.mmap_write_area.write_data(raw_data)
log("mmap_write_area=%s, mmap_data=%s", self.mmap_write_area.get_info(), mmap_data)
encoding = "mmap"
data = b""
encode_options["chunks"] = mmap_data
elif self.compression_level > 0:
from xpra.net.lz4.lz4 import compress
data = compress(raw_data)
encode_options["lz4"] = 1
log("lz4 compressed from %i bytes down to %i", len(raw_data), len(data))
else:
log("sending uncompressed")
data = raw_data

metadata = {"filename": filename}
self.send("encode", encoding, rgb_format, data, width, height, rowstride, encode_options, metadata)
6 changes: 5 additions & 1 deletion xpra/client/mixins/mmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ def setup_connection(self, conn) -> None:
log("setup_connection(%s) mmap supported=%s", conn, self.mmap_supported)
if not self.mmap_supported:
return
for area in (self.mmap_read_area, self.mmap_write_area):
for name, area in {
"read": self.mmap_read_area,
"write": self.mmap_write_area,
}.items():
log(f"{name!r}={area}")
if area:
area.init_mmap(conn.filename or "")

Expand Down
44 changes: 32 additions & 12 deletions xpra/net/mmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,17 @@ def free_mem(*_args) -> None:
return bdata, noop


def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]], int]:
def mmap_write(mmap_area, mmap_size: int, data) -> list[tuple[int, int]]:
"""
Sends 'data' to the client via the mmap shared memory region,
returns the chunks of the mmap area used (or None if it failed)
and the mmap area's free memory.
"""
size = len(data)
if size > (mmap_size - 8):
log.warn("Warning: mmap area is too small!")
log.warn(" we need to store %s bytes but the mmap area is limited to %i", size, (mmap_size - 8))
return []
# This is best explained using diagrams:
# mmap_area=[&S&E-------------data-------------]
# The first pair of 4 bytes are occupied by:
Expand All @@ -326,7 +331,6 @@ def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]],
mmap_data_end: c_uint32 = int_from_buffer(mmap_area, 4)
start = max(8, mmap_data_start.value)
end = max(8, mmap_data_end.value)
size = len(data)
log("mmap: start=%i, end=%i, size of data to write=%i", start, end, size)
if end < start:
# we have wrapped around but the client hasn't yet:
Expand All @@ -342,16 +346,10 @@ def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]],
# [****--------S++++++++++++E*********]
chunk = mmap_size - end
available = chunk + (start - 8)
# update global mmap stats:
mmap_free_size = available - size
if size > (mmap_size - 8):
log.warn("Warning: mmap area is too small!")
log.warn(" we need to store %s bytes but the mmap area is limited to %i", size, (mmap_size - 8))
return [], mmap_free_size
if mmap_free_size <= 0:
if size > available:
log.warn("Warning: mmap area is full!")
log.warn(" we need to store %s bytes but only have %s free space left", size, available)
return [], mmap_free_size
return []
if size < chunk:
# data fits in the first chunk:
# ie: initially:
Expand Down Expand Up @@ -385,8 +383,24 @@ def mmap_write(mmap_area, mmap_size: int, data) -> tuple[list[tuple[int, int]],
l2 = size - chunk
chunks = [(end, chunk), (8, l2)]
mmap_data_end.value = 8 + l2
log("sending damage with mmap: %s bytes", len(data))
return chunks, mmap_free_size
log("mmap_write: %s bytes", len(data))
return chunks


def mmap_free_size(mmap_area, mmap_size: int) -> int:
mmap_data_start: c_uint32 = int_from_buffer(mmap_area, 0)
mmap_data_end: c_uint32 = int_from_buffer(mmap_area, 4)
start = max(8, mmap_data_start.value)
end = min(mmap_size, max(8, mmap_data_end.value))
if end < start:
# we have wrapped around but the client hasn't yet:
# [++++++++E--------------------S+++++]
# so there is one chunk available (from E to S)
return start - end
# we have not wrapped around yet, or the client has wrapped around too:
# [------------S++++++++++++E---------]
# so there are two chunks available (from E to the end, from the start to S)
return (start - 8) + (mmap_size - end)


class BaseMmapArea:
Expand Down Expand Up @@ -462,3 +476,9 @@ def gen_token(self) -> None:

def write_token(self) -> None:
write_mmap_token(self.mmap, self.token, self.token_index, self.token_bytes)

def write_data(self, data) -> list[tuple[int, int]]:
return mmap_write(self.mmap, self.size, data)

def mmap_read(self, *descr_data: tuple[int, int]) -> tuple[bytes | memoryview, PaintCallback]:
return mmap_read(self.mmap, *descr_data)
3 changes: 2 additions & 1 deletion xpra/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3534,11 +3534,12 @@ def run_top(error_cb, options, args, cmdline) -> ExitValue:


def run_encode(error_cb, options, args, cmdline) -> ExitValue:
from xpra.client.base.command import EncodeClient
from xpra.client.base.encode import EncodeClient
if not args:
raise ValueError("please specify a display and at least one filename")
display_desc = pick_display(error_cb, options, args[:1], cmdline)
app = EncodeClient(options, args[1:])
app.init(options)
connect_to_server(app, display_desc, options)
return app.run()

Expand Down
30 changes: 21 additions & 9 deletions xpra/server/encoder_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from collections.abc import Callable

from xpra.common import noop
from xpra.os_util import gi_import
from xpra.util.objects import typedict
from xpra.net.common import PacketType
Expand Down Expand Up @@ -56,30 +57,41 @@ def init_packet_handlers(self) -> None:
super().init_packet_handlers()
self.add_packets("encode")

def add_new_client(self, ss, c: typedict, send_ui: bool, share_count: int) -> None:
super().add_new_client(ss, c, send_ui, share_count)
ss.protocol.large_packets.append("encode-response")

def _process_encode(self, proto: SocketProtocol, packet: PacketType) -> None:
ss = self.get_server_source(proto)
if not ss:
return
rgb_format = packet[1]
raw_data = packet[2]
width = packet[3]
height = packet[4]
rowstride = packet[5]
options = packet[6]
metadata = packet[7]
input_coding, rgb_format, raw_data, width, height, rowstride, options, metadata = packet[1:9]
depth = 32
bpp = 4
full_range = True
encoding = "png" if ss.encoding in ("auto", "") else ss.encoding
log("encode request from %s, encoding=%s from %s", ss, encoding, ss.encoding)
log("encode request from %s, %s to %s from %s", ss, input_coding, encoding, ss.encoding)
# connection encoding options:
eo = dict(ss.default_encoding_options)
# the request can override:
eo.update(options)
log("using settings: %s", eo)
from xpra.codecs.image import ImageWrapper, PlanarFormat
image = ImageWrapper(0, 0, width, height, raw_data, rgb_format, depth, rowstride,
if input_coding == "mmap":
if not ss.mmap_supported or not ss.mmap_read_area:
raise RuntimeError("mmap packet but mmap read is not available")
chunks = options["chunks"]
rgb_data, free = ss.mmap_read_area.mmap_read(*chunks)
else:
if options.get("lz4") > 0:
from xpra.net.lz4.lz4 import decompress
rgb_data = decompress(raw_data, max_size=64*1024*1024)
else:
rgb_data = raw_data
free = noop
image = ImageWrapper(0, 0, width, height, rgb_data, rgb_format, depth, rowstride,
bpp, PlanarFormat.PACKED, True, None, full_range)
coding, compressed, client_options, width, height, stride, bpp = self.encode(encoding, image, typedict(eo))
free()
packet = ["encode-response", coding, compressed.data, client_options, width, height, stride, bpp, metadata]
ss.send_async(*packet)
Loading

0 comments on commit 664802d

Please sign in to comment.