Skip to content

Commit

Permalink
Merge pull request #127 from icanbwell/cd-DFP-1627-2
Browse files Browse the repository at this point in the history
ID update for Bwell FHIR v3 id support Try 2
  • Loading branch information
cdukebwell authored Jun 20, 2023
2 parents ff1bab6 + 5a03e63 commit 6c5516e
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 7 deletions.
4 changes: 3 additions & 1 deletion spark_auto_mapper_fhir/fhir_types/fhir_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ def __init__(
self,
resource: str,
column: Union[AutoMapperDataTypeColumn, AutoMapperTextLikeBase],
use_long_id: Optional[bool] = False,
):
super().__init__()

assert resource
assert "/" not in resource
self.resource: str = resource
self.use_long_id = use_long_id
self.column: Union[AutoMapperDataTypeColumn, AutoMapperTextLikeBase] = FhirId(
column, is_reference=True
column, is_reference=True, use_long_id=self.use_long_id
)

def get_column_spec(
Expand Down
4 changes: 3 additions & 1 deletion spark_auto_mapper_fhir/fhir_types/id.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ def __init__(
self,
column: Union[AutoMapperDataTypeColumn, AutoMapperTextLikeBase],
is_reference: Optional[bool] = False,
use_long_id: Optional[bool] = False,
):
super().__init__()

self.column: Union[AutoMapperDataTypeColumn, AutoMapperTextLikeBase] = column
self.is_reference = is_reference
self.use_long_id = use_long_id

def get_column_spec(
self,
Expand All @@ -41,6 +43,6 @@ def get_column_spec(
replacement="-",
),
0,
63,
1024 * 1024 if (self.use_long_id) else 63,
)
return column_spec
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from spark_auto_mapper_fhir.resources.patient import Patient


def test_auto_mapper_fhir_reference(spark_session: SparkSession) -> None:
def test_auto_mapper_fhir_reference_short(spark_session: SparkSession) -> None:
# Arrange
spark_session.createDataFrame(
[
Expand Down Expand Up @@ -78,3 +78,75 @@ def test_auto_mapper_fhir_reference(spark_session: SparkSession) -> None:
.collect()[0][0]
== "Organization/Duke|test"
)


def test_auto_mapper_fhir_reference_long(spark_session: SparkSession) -> None:
    """FhirReference built with use_long_id=True keeps reference ids that are
    longer than the 63-character short-id limit (row 3 below), while still
    passing short ids (rows 1-2) through unchanged."""
    # 77-char id that a short (default) FhirId mapping would truncate
    long_member_id = (
        "1F238EABDF31F1FFEDE1D316B5887.922A7B6A6EB4D9C82C9E7FFC748E8-7C28346B3999CB6F"
    )

    # Arrange
    spark_session.createDataFrame(
        [(1, "Qureshi"), (2, "Vidal"), (3, long_member_id)],
        ["member_id", "last_name"],
    ).createOrReplaceTempView("patients")

    patients_df: DataFrame = spark_session.table("patients")

    members_df = patients_df.select("member_id")
    members_df.createOrReplaceTempView("members")

    # Act
    auto_mapper = AutoMapper(
        view="members", source_view="patients", keys=["member_id"]
    ).columns(
        patient=Patient(
            id_=FhirId(A.column("last_name")),
            managingOrganization=Reference(
                reference=FhirReference(
                    "Organization",
                    # rows 1-2 map to the bare last_name; row 3 gets "|test" appended
                    A.if_regex(
                        A.column("member_id"),
                        "[1-2]",
                        A.column("last_name"),
                        A.concat(A.column("last_name"), "|test"),
                    ),
                    use_long_id=True,
                )
            ),
        )
    )

    assert isinstance(auto_mapper, AutoMapper)
    specs: Dict[str, Column] = auto_mapper.get_column_specs(source_df=patients_df)
    for spec_name, spec_expression in specs.items():
        print(f"{spec_name}: {spec_expression}")

    transformed_df: DataFrame = auto_mapper.transform(df=members_df)

    # Assert
    transformed_df.printSchema()
    transformed_df.show(truncate=False)

    def reference_for(member_id: int) -> str:
        # Fetch the mapped managingOrganization.reference for one member row.
        return (
            transformed_df.where(f"member_id == {member_id}")
            .selectExpr("patient.managingOrganization.reference")
            .collect()[0][0]
        )

    assert reference_for(1) == "Organization/Qureshi"
    assert reference_for(2) == "Organization/Vidal"
    # The long id survives intact instead of being cut to 63 characters.
    assert reference_for(3) == f"Organization/{long_member_id}|test"
67 changes: 63 additions & 4 deletions tests/fhir_types/id/test_automapper_fhir_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@
from spark_auto_mapper_fhir.resources.patient import Patient


def test_auto_mapper_fhir_id(spark_session: SparkSession) -> None:
def test_auto_mapper_fhir_id_short(spark_session: SparkSession) -> None:
# Arrange
spark_session.createDataFrame(
[
(1, "Qureshi- / &&"),
(2, " Vidal. "),
(
3,
"1F238EABDF31F1FFEDE1D316B5887.922A7B6A6EB4D9C82C9E7FFC748E8-7C28346B3999CB6F",
),
],
["member_id", "last_name"],
).createOrReplaceTempView("patients")
Expand All @@ -25,10 +29,10 @@ def test_auto_mapper_fhir_id(spark_session: SparkSession) -> None:
df = source_df.select("member_id")
df.createOrReplaceTempView("members")

# Act
# Sample 1
mapper = AutoMapper(
view="members", source_view="patients", keys=["member_id"]
).columns(patient=Patient(id_=FhirId(A.column("last_name"))))
view="members_short", source_view="patients", keys=["member_id"]
).columns(patient=Patient(id_=FhirId(A.column("last_name"), False, False)))

assert isinstance(mapper, AutoMapper)
sql_expressions: Dict[str, Column] = mapper.get_column_specs(source_df=source_df)
Expand All @@ -49,3 +53,58 @@ def test_auto_mapper_fhir_id(spark_session: SparkSession) -> None:
result_df.where("member_id == 2").selectExpr("patient.id").collect()[0][0]
== "-Vidal.-"
)

assert (
result_df.where("member_id == 3").selectExpr("patient.id").collect()[0][0]
== "1F238EABDF31F1FFEDE1D316B5887.922A7B6A6EB4D9C82C9E7FFC748E8-7C2"
)


def test_auto_mapper_fhir_id_long(spark_session: SparkSession) -> None:
    """FhirId built with use_long_id=True must not truncate ids at 63 chars.

    Rows 1-2 exercise the unchanged sanitization (invalid characters become
    "-"); row 3 carries a 77-character id that the short (default) mapping
    would cut to 63 characters but that must survive intact here.
    """
    # Arrange
    spark_session.createDataFrame(
        [
            (1, "Qureshi- / &&"),
            (2, " Vidal. "),
            (
                3,
                "1F238EABDF31F1FFEDE1D316B5887.922A7B6A6EB4D9C82C9E7FFC748E8-7C28346B3999CB6F",
            ),
        ],
        ["member_id", "last_name"],
    ).createOrReplaceTempView("patients")

    source_df: DataFrame = spark_session.table("patients")

    df = source_df.select("member_id")
    df.createOrReplaceTempView("members")

    # Act
    # view renamed from "members_short" (copy-paste from the short test) so the
    # two tests do not register colliding temp-view names in one Spark session.
    mapper = AutoMapper(
        view="members_long", source_view="patients", keys=["member_id"]
    ).columns(patient=Patient(id_=FhirId(A.column("last_name"), use_long_id=True)))

    assert isinstance(mapper, AutoMapper)
    sql_expressions: Dict[str, Column] = mapper.get_column_specs(source_df=source_df)
    for column_name, sql_expression in sql_expressions.items():
        print(f"{column_name}: {sql_expression}")

    result_df: DataFrame = mapper.transform(df=df)

    # Assert
    result_df.printSchema()
    result_df.show(truncate=False)

    # Sanitization is unchanged by use_long_id: invalid characters become "-".
    assert (
        result_df.where("member_id == 1").selectExpr("patient.id").collect()[0][0]
        == "Qureshi------"
    )
    assert (
        result_df.where("member_id == 2").selectExpr("patient.id").collect()[0][0]
        == "-Vidal.-"
    )

    # The 77-character id is preserved in full (short mode truncates to 63).
    assert (
        result_df.where("member_id == 3").selectExpr("patient.id").collect()[0][0]
        == "1F238EABDF31F1FFEDE1D316B5887.922A7B6A6EB4D9C82C9E7FFC748E8-7C28346B3999CB6F"
    )

0 comments on commit 6c5516e

Please sign in to comment.