Support Poisson distributed requests for benchmark #131

Merged · 16 commits · Sep 20, 2024
12 changes: 10 additions & 2 deletions evals/benchmark/README.md
@@ -63,10 +63,18 @@ test_suite_config:
  deployment_type: "k8s" # Default is "k8s", can also be "docker"
  service_ip: None # Leave as None for k8s, specify for Docker
  service_port: None # Leave as None for k8s, specify for Docker
  concurrent_level: 4 # The concurrency level
  load_shape: # Tenant concurrency pattern
    name: constant # poisson or constant (locust default load shape)
    params: # Load shape specific parameters
      constant: # Constant load shape specific parameters, activate only if load_shape is constant
        concurrent_level: 4 # If user_queries is specified, concurrent_level is the target number of requests per user. If not, it is the number of simulated users
      poisson: # Poisson load shape specific parameters, activate only if load_shape is poisson
        arrival-rate: 1.0 # Request arrival rate
  warm_ups: 0 # Number of test requests for warm-ups
  run_time: 60m # Total runtime for the test suite
  user_queries: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048] # Number of test requests
  query_timeout: 120 # Number of seconds to wait for a simulated user to complete any executing task before exiting. 120 sec by default.
  random_prompt: false # Use random prompts if true, fixed prompts if false
  run_time: 60m # Total runtime for the test suite
  collect_service_metric: false # Enable service metrics collection
  data_visualization: false # Enable data visualization
  test_output_dir: "/home/sdp/benchmark_output" # Directory for test outputs
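The poisson shape itself is implemented on the Locust side in stresscli/locust/aistress.py, which is not part of this diff. The sketch below is illustrative only: it assumes the common Locust pattern of exponentially distributed wait times, which makes request arrivals follow a Poisson process. ARRIVAL_RATE, the host, and the endpoint are hypothetical stand-ins rather than values taken from this PR.

import random

from locust import HttpUser, task

ARRIVAL_RATE = 1.0  # hypothetical stand-in for the arrival-rate parameter (assumed requests/sec)


class PoissonUser(HttpUser):
    host = "http://localhost:8888"  # hypothetical; the real host comes from run.yaml

    # Exponential inter-arrival times make the number of requests issued in any
    # fixed time window Poisson-distributed.
    wait_time = lambda self: random.expovariate(ARRIVAL_RATE)

    @task
    def query(self):
        # Hypothetical bench target; the real one is selected via bench-target in run.yaml.
        self.client.post("/v1/chatqna", json={"messages": "What is OPEA?"})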
165 changes: 137 additions & 28 deletions evals/benchmark/benchmark.py
@@ -40,70 +40,157 @@ def extract_test_case_data(content):

return {
"examples": test_suite_config.get("examples", []),
"concurrent_level": test_suite_config.get("concurrent_level"),
"warm_ups": test_suite_config.get("warm_ups", 0),
"user_queries": test_suite_config.get("user_queries", []),
"random_prompt": test_suite_config.get("random_prompt"),
"test_output_dir": test_suite_config.get("test_output_dir"),
"run_time": test_suite_config.get("run_time"),
"run_time": test_suite_config.get("run_time", None),
"collect_service_metric": test_suite_config.get("collect_service_metric"),
"llm_model": test_suite_config.get("llm_model"),
"deployment_type": test_suite_config.get("deployment_type"),
"service_ip": test_suite_config.get("service_ip"),
"service_port": test_suite_config.get("service_port"),
"load_shape": test_suite_config.get("load_shape"),
"query_timeout": test_suite_config.get("query_timeout", 120),
"all_case_data": {
example: content["test_cases"].get(example, {}) for example in test_suite_config.get("examples", [])
},
}


def create_run_yaml_content(service, base_url, bench_target, concurrency, user_queries, test_suite_config):
def create_run_yaml_content(service, base_url, bench_target, test_phase, num_queries, test_params):
"""Create content for the run.yaml file."""
return {

# If a load shape includes the parameter concurrent_level,
# the parameter will be passed to Locust to launch fixed
# number of simulated users.
concurrency = 1
try:
load_shape = test_params["load_shape"]["name"]
load_shape_params = test_params["load_shape"]["params"][load_shape]
if load_shape_params and load_shape_params["concurrent_level"]:
if num_queries >= 0:
concurrency = max(1, num_queries // load_shape_params["concurrent_level"])
else:
concurrency = load_shape_params["concurrent_level"]
except KeyError as e:
# If the concurrent_level is not specified, load shapes should
# manage concurrency and user spawn rate by themselves.
pass
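# Worked example (illustrative values): with the constant shape, concurrent_level=4 and
# num_queries=128 give concurrency = max(1, 128 // 4) = 32 simulated users, i.e. roughly
# concurrent_level requests per user. With the poisson shape as configured above, the
# params carry only "arrival-rate", so the KeyError branch leaves concurrency at 1 and
# the custom load shape manages users itself.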

yaml_content = {
"profile": {
"storage": {"hostpath": test_suite_config["test_output_dir"]},
"storage": {"hostpath": test_params["test_output_dir"]},
"global-settings": {
"tool": "locust",
"locustfile": os.path.join(os.getcwd(), "stresscli/locust/aistress.py"),
"host": base_url,
"stop-timeout": 120,
"stop-timeout": test_params["query_timeout"],
"processes": 2,
"namespace": "default",
"bench-target": bench_target,
"run-time": test_suite_config["run_time"],
"service-metric-collect": test_suite_config["collect_service_metric"],
"service-metric-collect": test_params["collect_service_metric"],
"service-list": service.get("service_list", []),
"llm-model": test_suite_config["llm_model"],
"deployment-type": test_suite_config["deployment_type"],
"llm-model": test_params["llm_model"],
"deployment-type": test_params["deployment_type"],
"load-shape": test_params["load_shape"],
},
"runs": [{"name": "benchmark", "users": concurrency, "max-request": user_queries}],
"runs": [{"name": test_phase, "users": concurrency, "max-request": num_queries}],
}
}

# The run-time limit is applied only when both conditions hold:
# 1) run_time is specified in benchmark.yaml
# 2) this is not a warm-up run
# TODO: According to Locust's docs, run-time should default to running forever;
# however, the default is 48 hours.
if test_params["run_time"] is not None and test_phase != "warmup":
yaml_content["profile"]["global-settings"]["run-time"] = test_params["run_time"]

return yaml_content


def generate_stresscli_run_yaml(
example, case_type, case_params, test_params, test_phase, num_queries, base_url, ts
) -> str:
"""Create a stresscli configuration file and persist it on disk.

Parameters
----------
example : str
The name of the example.
case_type : str
The type of the test case.
case_params : dict
The parameters of a single test case.
test_params : dict
The parameters of the test suite.
test_phase : str [warmup|benchmark]
Current phase of the test.
num_queries : int
The number of test requests sent to the SUT.
base_url : str
The root endpoint of the SUT.
ts : str
Timestamp used in the name of the generated file.

Returns
-------
run_yaml_path : str
The path of the generated YAML file.
"""
# Get the workload
if case_type == "e2e":
bench_target = f"{example}{'bench' if test_params['random_prompt'] else 'fixed'}"
else:
bench_target = f"{case_type}{'bench' if test_params['random_prompt'] else 'fixed'}"

# Generate the content of stresscli configuration file
stresscli_yaml = create_run_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params)

# Dump the stresscli configuration file
service_name = case_params.get("service_name")
run_yaml_path = os.path.join(
test_params["test_output_dir"], f"run_{service_name}_{ts}_{test_phase}_{num_queries}.yaml"
)
with open(run_yaml_path, "w") as yaml_file:
yaml.dump(stresscli_yaml, yaml_file)

return run_yaml_path


def create_and_save_run_yaml(example, deployment_type, service_type, service, base_url, test_suite_config, index):
"""Create and save the run.yaml file for the service being tested."""
os.makedirs(test_suite_config["test_output_dir"], exist_ok=True)

service_name = service.get("service_name")
run_yaml_paths = []
for user_queries in test_suite_config["user_queries"]:
concurrency = max(1, user_queries // test_suite_config["concurrent_level"])

if service_type == "e2e":
bench_target = f"{example}{'bench' if test_suite_config['random_prompt'] else 'fixed'}"
else:
bench_target = f"{service_type}{'bench' if test_suite_config['random_prompt'] else 'fixed'}"
run_yaml_content = create_run_yaml_content(
service, base_url, bench_target, concurrency, user_queries, test_suite_config
)

run_yaml_path = os.path.join(
test_suite_config["test_output_dir"], f"run_{service_name}_{index}_users_{user_queries}.yaml"
# Add YAML configuration of stresscli for warm-ups
warm_ups = test_suite_config["warm_ups"]
if warm_ups is not None and warm_ups > 0:
run_yaml_paths.append(
generate_stresscli_run_yaml(
example, service_type, service, test_suite_config, "warmup", warm_ups, base_url, index
)
)
with open(run_yaml_path, "w") as yaml_file:
yaml.dump(run_yaml_content, yaml_file)

run_yaml_paths.append(run_yaml_path)
# Add YAML configuration of stresscli for benchmark
user_queries_lst = test_suite_config["user_queries"]
if user_queries_lst is None or len(user_queries_lst) == 0:
# Test stop is controlled by run time
run_yaml_paths.append(
generate_stresscli_run_yaml(
example, service_type, service, test_suite_config, "benchmark", -1, base_url, index
)
)
else:
# Test stop is controlled by request count
for user_queries in user_queries_lst:
run_yaml_paths.append(
generate_stresscli_run_yaml(
example, service_type, service, test_suite_config, "benchmark", user_queries, base_url, index
)
)

return run_yaml_paths
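
As an illustration with hypothetical values (the service name and index come from the actual test cases), a suite with warm_ups: 10 and user_queries: [128, 256] produces one warm-up configuration plus one benchmark configuration per query count, following the run_{service_name}_{ts}_{test_phase}_{num_queries}.yaml pattern used above:

# run_chatqna-backend-server-svc_1_warmup_10.yaml      (test_phase="warmup",    num_queries=10)
# run_chatqna-backend-server-svc_1_benchmark_128.yaml  (test_phase="benchmark", num_queries=128)
# run_chatqna-backend-server-svc_1_benchmark_256.yaml  (test_phase="benchmark", num_queries=256)
# With user_queries empty, a single benchmark entry is generated with num_queries=-1 and
# the stop condition falls back to run_time.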

@@ -178,13 +265,31 @@ def process_service(example, service_type, case_data, test_suite_config):
run_service_test(example, service_type, service, test_suite_config)


def check_test_suite_config(test_suite_config):
"""Check the configuration of test suite.

Parameters
----------
test_suite_config : dict
The name of the example.

Raises
-------
ValueError
If incorrect configuration detects
"""

# User must specify either run_time or user_queries.
if test_suite_config["run_time"] is None and len(test_suite_config["user_queries"]) == 0:
raise ValueError("Must specify either run_time or user_queries.")
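
A minimal illustration of this check, with hypothetical values: a suite that sets neither stop condition is rejected before any run YAML is generated.

# Illustrative only: neither run_time nor user_queries is set, so this raises ValueError.
check_test_suite_config({"run_time": None, "user_queries": []})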


if __name__ == "__main__":
# Load test suite configuration
yaml_content = load_yaml("./benchmark.yaml")
# Extract data
parsed_data = extract_test_case_data(yaml_content)
test_suite_config = {
"concurrent_level": parsed_data["concurrent_level"],
"user_queries": parsed_data["user_queries"],
"random_prompt": parsed_data["random_prompt"],
"run_time": parsed_data["run_time"],
@@ -194,7 +299,11 @@ def process_service(example, service_type, case_data, test_suite_config):
"service_ip": parsed_data["service_ip"],
"service_port": parsed_data["service_port"],
"test_output_dir": parsed_data["test_output_dir"],
"load_shape": parsed_data["load_shape"],
"query_timeout": parsed_data["query_timeout"],
"warm_ups": parsed_data["warm_ups"],
}
check_test_suite_config(test_suite_config)

# Mapping of example names to service types
example_service_map = {
12 changes: 10 additions & 2 deletions evals/benchmark/benchmark.yaml
@@ -6,14 +6,22 @@ test_suite_config: # Overall configuration settings for the test suite
  deployment_type: "k8s" # Default is "k8s", can also be "docker"
  service_ip: None # Leave as None for k8s, specify for Docker
  service_port: None # Leave as None for k8s, specify for Docker
  concurrent_level: 4 # The concurrency level, adjustable based on requirements
  warm_ups: 0 # Number of test requests for warm-up
  run_time: 60m # The max total run time for the test suite
  user_queries: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048] # Number of test requests at each concurrency level
  query_timeout: 120 # Number of seconds to wait for a simulated user to complete any executing task before exiting. 120 sec by default.
  random_prompt: false # Use random prompts if true, fixed prompts if false
  run_time: 60m # The max total run time for the test suite
  collect_service_metric: false # Collect service metrics if true, do not collect service metrics if false
  data_visualization: false # Generate data visualization if true, do not generate data visualization if false
  llm_model: "Intel/neural-chat-7b-v3-3" # The LLM model used for the test
  test_output_dir: "/home/sdp/benchmark_output" # The directory to store the test output
  load_shape: # Tenant concurrency pattern
    name: constant # poisson or constant (locust default load shape)
    params: # Load shape specific parameters
      constant: # Constant load shape specific parameters, activate only if load_shape is constant
        concurrent_level: 4 # If user_queries is specified, concurrent_level is the target number of requests per user. If not, it is the number of simulated users
      poisson: # Poisson load shape specific parameters, activate only if load_shape is poisson
        arrival-rate: 1.0 # Request arrival rate

test_cases:
chatqna:
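As a closing illustration (values are hypothetical, not defaults from this PR), switching the suite to the Poisson shape only touches the load_shape block; benchmark.py forwards it unchanged to stresscli through the load-shape global setting in each generated run YAML.

# The load_shape block above, expressed as the dict benchmark.py parses from benchmark.yaml:
load_shape = {
    "name": "poisson",
    "params": {"poisson": {"arrival-rate": 2.0}},  # hypothetical rate; unit not stated in this diff
}
# create_run_yaml_content() places this dict verbatim under
# yaml_content["profile"]["global-settings"]["load-shape"], and, since the poisson
# params define no concurrent_level, the "users" count defaults to 1 and the load
# shape itself governs how requests are issued.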