-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathfloris_standin_test.py
388 lines (328 loc) · 14.9 KB
/
floris_standin_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
import logging
from pathlib import Path
import numpy as np
import pandas as pd
from floris import FlorisModel
from hercules.amr_wind_standin import AMRWindStandin
from hercules.floris_standin import (
construct_floris_from_amr_input,
default_floris_dict,
FlorisStandin,
)
from SEAS.federate_agent import FederateAgent
AMR_INPUT = Path(__file__).resolve().parent / "test_inputs" / "amr_input_florisstandin.inp"
AMR_EXTERNAL_DATA = Path(__file__).resolve().parent / "test_inputs" / "amr_standin_data.csv"
AMR_EXTERNAL_DATA_HET = Path(__file__).resolve().parent / "test_inputs" / "amr_standin_data_het.csv"
CONFIG = {
"name": "floris_standin",
"gridpack": {},
"helics": {
"deltat": 1.0,
"subscription_topics": ["control"],
"publication_topics": ["status"],
"endpoints": [],
"helicsport": None,
},
"publication_interval": 1,
"endpoint_interval": 1,
"starttime": 0,
"stoptime": 10.0,
"Agent": "floris_standin",
}
def test_construct_floris_from_amr_input():
fmodel_test = construct_floris_from_amr_input(AMR_INPUT)
assert isinstance(fmodel_test, FlorisModel)
def test_FlorisStandin_instantiation():
# Check instantiates correctly
floris_standin = FlorisStandin(CONFIG, AMR_INPUT)
# Check inheritance
assert isinstance(floris_standin, FederateAgent)
assert isinstance(floris_standin, AMRWindStandin)
# Get FLORIS equivalent, match layout and turbines
fmodel_true = FlorisModel(default_floris_dict)
fmodel_true.set(
layout_x=floris_standin.fmodel.layout_x,
layout_y=floris_standin.fmodel.layout_y,
turbine_type=floris_standin.fmodel.core.farm.turbine_definitions,
)
assert fmodel_true.core.as_dict() == floris_standin.fmodel.core.as_dict()
def test_FlorisStandin_get_step_yaw_angles():
floris_standin = FlorisStandin(CONFIG, AMR_INPUT, smoothing_coefficient=0.0)
# Get FLORIS equivalent, match layout and turbines
fmodel_true = FlorisModel(default_floris_dict)
fmodel_true.set(
layout_x=floris_standin.fmodel.layout_x,
layout_y=floris_standin.fmodel.layout_y,
turbine_type=floris_standin.fmodel.core.farm.turbine_definitions,
)
default_wind_direction = 240.0 # Matches default in FlorisStandin
default_wind_speed = 8.0 # Matches default in FlorisStandin
# Test with None yaw angles
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(5.0)
fmodel_true.set(wind_speeds=[default_wind_speed], wind_directions=[default_wind_direction])
fmodel_true.run()
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000 # kW expected
assert fs_ws == default_wind_speed
assert fs_wd == default_wind_direction
assert fs_twd == [default_wind_direction] * 2
assert np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Test with any "no value" yaw angles (should apply no yaw angle)
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(5.0, yaw_angles=[-1000, 20])
assert fs_ws == default_wind_speed
assert fs_wd == default_wind_direction
assert fs_twd == [default_wind_direction] * 2
assert np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Test with aligned turbines
yaw_angles = [240.0, 240.0]
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(5.0, yaw_angles)
fmodel_true.set(wind_speeds=[default_wind_speed], wind_directions=[default_wind_direction])
fmodel_true.run()
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000 # kW expected
assert np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Test with misaligned turbines
yaw_angles = [260.0, 230.0]
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(5.0, yaw_angles)
fmodel_true.set(wind_speeds=[default_wind_speed], wind_directions=[default_wind_direction])
fmodel_true.run() # Don't expect to work
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000
assert not np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Correct yaw angles
fmodel_true.set(yaw_angles=default_wind_direction - np.array([yaw_angles]))
fmodel_true.run()
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000 # kW expected
assert np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Test that yaw angles are maintained from the previous step if large misalignments are provided
yaw_angles = [0.0, 10.0]
_, _, fs_tp2, _ = floris_standin.get_step(5.0, yaw_angles)
assert np.allclose(fs_tp, fs_tp2)
assert np.allclose(
default_wind_direction-floris_standin.fmodel.core.farm.yaw_angles,
[260.0, 230.0]
)
def test_FlorisStandin_get_step_power_setpoints(caplog):
floris_standin = FlorisStandin(CONFIG, AMR_INPUT, smoothing_coefficient=0.0)
# Get FLORIS equivalent, match layout and turbines
fmodel_true = FlorisModel(default_floris_dict)
fmodel_true.set(
layout_x=floris_standin.fmodel.layout_x,
layout_y=floris_standin.fmodel.layout_y,
turbine_type=floris_standin.fmodel.core.farm.turbine_definitions,
)
default_wind_direction = 240.0 # Matches default in FlorisStandin
default_wind_speed = 8.0 # Matches default in FlorisStandin
# Test with power setpoints
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(5.0, power_setpoints=[1e3, 1e3])
fmodel_true.set(wind_speeds=[default_wind_speed], wind_directions=[default_wind_direction])
fmodel_true.run() # don't expect to work
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000
assert not np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Correct power setpoints
fmodel_true.set(power_setpoints=np.array([[1e6, 1e6]]))
fmodel_true.run()
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000 # kW expected
assert np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Mixed power setpoints
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(5.0, power_setpoints=[None, 1e3])
fmodel_true.set(power_setpoints=np.array([[None, 1e6]]))
fmodel_true.run()
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000
assert np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
# Test warning raise with invalid combination of yaw angles and power setpoints
with caplog.at_level(logging.WARNING):
floris_standin.get_step(5.0, yaw_angles=[230.0, 240.0], power_setpoints=[1e3, 1e3])
assert caplog.text != "" # Checking not empty
caplog.clear()
# Test with valid combination of yaw angles and power setpoints
yaw_angles = [260.0, 240.0]
power_setpoints = [None, 1e3]
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(
5.0,
yaw_angles=yaw_angles,
power_setpoints=power_setpoints
)
floris_power_setpoints = np.array([power_setpoints])
floris_power_setpoints[0,1] *= 1e3
fmodel_true.set(
yaw_angles=default_wind_direction - np.array([yaw_angles]),
power_setpoints=floris_power_setpoints
)
fmodel_true.run()
fmodel_true_tp = fmodel_true.get_turbine_powers() / 1000
assert np.allclose(fs_tp, fmodel_true_tp.flatten().tolist())
def test_FlorisStandin_with_standin_data_yaw_angles():
floris_standin = FlorisStandin(CONFIG, AMR_INPUT, AMR_EXTERNAL_DATA, smoothing_coefficient=0.0)
yaw_angles_all = [
[240.0, 240.0],
[240.0, 240.0], # Step up from 8 to 10 m/s
[240.0, 240.0],
[240.0, 240.0], # Step back down to 8 m/s
[240.0, 240.0], # wd changes to 270.0 here
[270.0, 270.0], # change to match wd
[270.0, 270.0],
[250.0, 270.0], # Apply simple wake steering
[250.0, 270.0],
[250.0, 270.0],
]
# Initialize storage
fs_ws_all = []
fs_wd_all = []
fs_tp_all = []
fs_twd_all = []
for i, s in enumerate(np.arange(0, 10.0, 1.0)):
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(s, yaw_angles=yaw_angles_all[i])
fs_ws_all.append(fs_ws)
fs_wd_all.append(fs_wd)
fs_tp_all.append(fs_tp)
fs_twd_all.append(fs_twd)
# Check standin data mapped over correctly
assert fs_ws_all == floris_standin.standin_data.amr_wind_speed.to_list()
assert fs_wd_all == floris_standin.standin_data.amr_wind_direction.to_list()
assert np.allclose(
np.array(fs_twd_all)[:, 0], floris_standin.standin_data.amr_wind_direction.values
)
# Check power behaves as expected
# Same condition for each
fs_tp_all = np.array(fs_tp_all)
assert np.allclose(fs_tp_all[0, :], fs_tp_all[3, :])
# Higher power at 10 than 8 m/s
assert (np.array(fs_tp_all[0, :]) < np.array(fs_tp_all[1, :])).all()
# Same power at upstream turbine, lower power at downstream turbine when aligned
assert fs_tp_all[5, 0] == fs_tp_all[0, 0]
assert fs_tp_all[5, 1] < fs_tp_all[0, 1]
# Lower power at upstream turbine, higher power at downstream turbine when steering,
# total power uplift
assert fs_tp_all[7, 0] < fs_tp_all[6, 0]
assert fs_tp_all[7, 1] > fs_tp_all[6, 1]
assert fs_tp_all[7, :].sum() > fs_tp_all[6, :].sum()
# More power steering at 10m/s than 8m/s
assert fs_tp_all[9, :].sum() > fs_tp_all[7, :].sum()
def test_FlorisStandin_with_standin_data_power_setpoints():
floris_standin = FlorisStandin(CONFIG, AMR_INPUT, AMR_EXTERNAL_DATA, smoothing_coefficient=0.0)
power_setpoints_all = [
[None, None],
[None, None], # Step up from 8 to 10 m/s
[None, None],
[None, None], # Step back down to 8 m/s
[None, None], # wd changes to 270.0 here
[1e3, None], # Apply a power setpoint at upstream turbine
[1e3, 1e3], # Apply a power setpoint at both turbines
[None, 1e3], # Apply a power setpoint at downstream turbine
[None, None],
[None, None],
]
# Initialize storage
fs_ws_all = []
fs_wd_all = []
fs_tp_all = []
fs_twd_all = []
for i, s in enumerate(np.arange(0, 10.0, 1.0)):
fs_ws, fs_wd, fs_tp, fs_twd = floris_standin.get_step(
s,
power_setpoints=power_setpoints_all[i]
)
fs_ws_all.append(fs_ws)
fs_wd_all.append(fs_wd)
fs_tp_all.append(fs_tp)
fs_twd_all.append(fs_twd)
# Check power behaves as expected
# Same condition for each
fs_tp_all = np.array(fs_tp_all)
assert np.allclose(fs_tp_all[0, :], fs_tp_all[3, :])
# Higher power at 10 than 8 m/s
assert (np.array(fs_tp_all[0, :]) < np.array(fs_tp_all[1, :])).all()
# Same power at upstream turbine, lower power at downstream turbine when aligned
assert fs_tp_all[4, 0] == fs_tp_all[0, 0]
assert fs_tp_all[4, 1] < fs_tp_all[0, 1]
# Turbines match setpoint when set
assert fs_tp_all[5, 0] == 1e3
assert fs_tp_all[5, 1] > 1e3
assert (fs_tp_all[6, :] == 1e3).all()
assert fs_tp_all[7, 0] > 1e3
assert fs_tp_all[7, 1] <= 1e3
def test_FlorisStandin_smoothing_coefficient():
floris_standin_no_smoothing = FlorisStandin(CONFIG, AMR_INPUT, smoothing_coefficient=0.0)
floris_standin_default_smoothing = FlorisStandin(CONFIG, AMR_INPUT)
floris_standin_heavy_smoothing = FlorisStandin(CONFIG, AMR_INPUT, smoothing_coefficient=0.9)
# Start at zero power
floris_standin_no_smoothing.turbine_powers_prev = np.zeros(2)
floris_standin_default_smoothing.turbine_powers_prev = np.zeros(2)
floris_standin_heavy_smoothing.turbine_powers_prev = np.zeros(2)
# Step forward
fs_tp_no_smoothing = floris_standin_no_smoothing.get_step(1.0)[2]
fs_tp_default_smoothing = floris_standin_default_smoothing.get_step(1.0)[2]
fs_tp_heavy_smoothing = floris_standin_heavy_smoothing.get_step(1.0)[2]
# Check smoothing ordering correct
assert (np.array(fs_tp_no_smoothing) > np.array(fs_tp_default_smoothing)).all()
assert (np.array(fs_tp_default_smoothing) > np.array(fs_tp_heavy_smoothing)).all()
# Check magnitude is correct
assert np.allclose(0.1*np.array(fs_tp_no_smoothing), np.array(fs_tp_heavy_smoothing))
def test_FlorisStandin_with_heterogeneous_inflow():
floris_standin = FlorisStandin(
CONFIG,
AMR_INPUT,
AMR_EXTERNAL_DATA_HET,
smoothing_coefficient=0.0
)
# Set the wind shear to zero to simplify comparisons
floris_standin.fmodel.set(wind_shear=0.0)
# Check heterogeneous map interpolation at specified time
floris_standin.get_step(0)
turbine_vels_norm_0 = floris_standin.fmodel.turbine_average_velocities / 8.0
assert np.allclose(turbine_vels_norm_0[0,0], 0.9)
assert 0.9 < turbine_vels_norm_0[0,1] < 1.0 # Unwaked at 0 degrees
# Check matches data at time 0
# Check changed correctly at time 4
floris_standin.get_step(4)
turbine_vels_norm_4 = floris_standin.fmodel.turbine_average_velocities / 8.0
assert np.allclose(turbine_vels_norm_4[0,:], 1.0)
# Check interpolates correctly for time step 2.
floris_standin.get_step(2)
turbine_vels_norm_2 = floris_standin.fmodel.turbine_average_velocities / 8.0
assert (turbine_vels_norm_0 < turbine_vels_norm_2).all()
assert (turbine_vels_norm_2 < turbine_vels_norm_4).all()
def test_FlorisStandin_with_bad_heterogeneous_inflow():
# Read in valid heterogeneous inflow data
df_het_orig = pd.read_csv(AMR_EXTERNAL_DATA_HET)
# Add extra key to heterogeneous_inflow_config; reprint
for i in range(len(df_het_orig)):
dict_temp = eval(df_het_orig.heterogeneous_inflow_config.iloc[i])
dict_temp["bad_key"] = 0.0
df_het_orig.heterogeneous_inflow_config.iloc[i] = str(dict_temp)
# Write to new file
AMR_EXTERNAL_DATA_HET_BAD = (
Path(__file__).resolve().parent / "test_inputs" / "amr_standin_data_het_bad.csv"
)
df_het_orig.to_csv(AMR_EXTERNAL_DATA_HET_BAD, index=False)
# Check that the extra key is just ignored
floris_standin = FlorisStandin(
CONFIG,
AMR_INPUT,
AMR_EXTERNAL_DATA_HET_BAD,
smoothing_coefficient=0.0
)
# Set the wind shear to zero to simplify comparisons
floris_standin.fmodel.set(wind_shear=0.0)
# Step, check ok
floris_standin.get_step(0)
turbine_vels_norm_0 = floris_standin.fmodel.turbine_average_velocities / 8.0
assert np.allclose(turbine_vels_norm_0[0,0], 0.9)
# Remove needed key for heterogeneous_inflow_config
df_het_orig = pd.read_csv(AMR_EXTERNAL_DATA_HET)
for i in range(len(df_het_orig)):
dict_temp = eval(df_het_orig.heterogeneous_inflow_config.iloc[i])
del dict_temp["speed_multipliers"]
df_het_orig.heterogeneous_inflow_config.iloc[i] = str(dict_temp)
df_het_orig.to_csv(AMR_EXTERNAL_DATA_HET_BAD, index=False)
# Check proceeds without heterogeneity
floris_standin = FlorisStandin(
CONFIG,
AMR_INPUT,
AMR_EXTERNAL_DATA_HET_BAD,
smoothing_coefficient=0.0
)
floris_standin.fmodel.set(wind_shear=0.0)
floris_standin.get_step(0)
turbine_vels_norm_0 = floris_standin.fmodel.turbine_average_velocities / 8.0
assert np.allclose(turbine_vels_norm_0, 1.0) # No heterogeneity applied
# Delete the bad file
AMR_EXTERNAL_DATA_HET_BAD.unlink()