Skip to content

Commit

Permalink
Merge pull request cc-api#37 from Ruoyu-y/configfs-tsm
Browse files Browse the repository at this point in the history
  • Loading branch information
wenhuizhang authored Apr 12, 2024
2 parents a763c92 + 087af14 commit b788f28
Showing 1 changed file with 84 additions and 4 deletions.
88 changes: 84 additions & 4 deletions src/python/cctrusted_vm/cvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import struct
import fcntl
import socket
import tempfile
from abc import abstractmethod
from cctrusted_base.api import CCTrustedApi
from cctrusted_base.imr import TcgIMR
Expand All @@ -28,6 +29,8 @@
class ConfidentialVM:

_inst = None
# Configfs-tsm directory prefix
tsm_prefix = "/sys/kernel/config/tsm/report"

def __init__(self, cctype):
self._cc_type:int = cctype
Expand Down Expand Up @@ -132,11 +135,77 @@ def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcRepor
nonce (bytearray): against replay attacks.
data (bytearray): user data
extraArgs: for TPM, it will be given list of IMR/PCRs
for configfs-tsm, it will be privilege level
Returns:
The ``CcReport`` object.
"""
raise NotImplementedError("Should be implemented by inherited class")
if not os.path.exists(self.tsm_prefix):
return None

# Process input data
input_data = None
privilege = None
if nonce is None and data is None:
LOG.info("No report data, generating default quote")
else:
LOG.info("Calculate report data by nonce and user data")
hash_algo = hashlib.sha512()
if nonce is not None:
hash_algo.update(bytes(nonce))
if data is not None:
hash_algo.update(bytes(data))
input_data = hash_algo.digest()
if extraArgs is not None and "privilege" in extraArgs.keys():
privilege = extraArgs["privilege"]

td_report = None
provider = None
generation = None
aux_blob = None
# Create a temporary directory to request TEE attestation report
with tempfile.TemporaryDirectory(prefix="report_", dir=self.tsm_prefix) as tempdir:
LOG.info("Creating tempdir %s to request cc report", tempdir)
# Check if configfs-tsm interface has been generated
if not os.path.exists(os.path.join(tempdir, "inblob")):
LOG.debug("Inblob file not found under directory %s.", tempdir)
os.rmdir(tempdir)
return None

if privilege is not None and isinstance(privilege, int):
with open(os.path.join(tempdir, "privlevel"), 'w', encoding='utf-8') \
as privilege_file:
privilege_file.write(privilege)

# Insert input data
with open(os.path.join(tempdir, "inblob"), 'wb') as inblob_file:
inblob_file.write(input_data)

# Read the output of report
with open(os.path.join(tempdir, "outblob"), 'rb') as outblob_file:
td_report = outblob_file.read()

# Read provider info
with open(os.path.join(tempdir, "provider"), 'r', encoding='utf-8') as provider_file:
provider = provider_file.read()

# Read generation info
with open(os.path.join(tempdir, "generation"), 'r', encoding='utf-8') \
as generation_file:
generation = generation_file.read()

if os.path.exists(os.path.join(tempdir, "auxblob")):
with open(os.path.join(tempdir, "auxblob"), 'rb') as auxblob_file:
aux_blob = auxblob_file.read()

os.rmdir(tempdir)

if td_report is not None:
LOG.info("Fetched cc report using Configfs TSM with type %s", provider)
# pylint: disable-next=E1121
return CcReport(td_report, self._cc_type, aux_blob, generation, provider)

return None

def dump(self):
"""Dump confidential VM information."""
Expand Down Expand Up @@ -402,11 +471,22 @@ def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcRepor
binascii.Error when the parameter "nonce" or "data" is not base64 encoded.
"""

td_report = None

# Prepare user defined data which could include nonce
if nonce is not None:
nonce = base64.b64decode(nonce, validate=True)
if data is not None:
data = base64.b64decode(data, validate=True)

# Check if configfs-tsm has been enabled in kernel
# if yes, call the super function
if os.path.exists(ConfidentialVM.tsm_prefix):
td_report = super().get_cc_report(nonce, data, extraArgs)

if td_report is not None:
return TdxQuote(td_report.data)

report_bytes = None
input_data = None
if nonce is None and data is None:
Expand All @@ -419,6 +499,9 @@ def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcRepor
if data is not None:
hash_algo.update(bytes(data))
input_data = hash_algo.digest()

# Check if appropriate qgs vsock port specified in TDX attest config
# If specified, use vsock to get quote and return TdxQuote object
self.process_cc_report(input_data)
report_bytes = self.tdreport.data

Expand All @@ -427,9 +510,6 @@ def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcRepor
elif self.version is TDX_VERSION_1_5:
quote_req = TdxQuoteReq15()

# Check if appropriate qgs vsock port specified in TDX attest config
# If specified, use vsock to get quote and return TdxQuote object
td_report = None
if self._config and "port" in self._config:
LOG.info("Use vsock for TDX quote fetching.")
td_report = self._invoke_quote_fetching_on_vsock(
Expand Down

0 comments on commit b788f28

Please sign in to comment.