Skip to content

Commit

Permalink
chore(cli-e2e): adding test with Kafka trigger on e2e (#3088)
Browse files Browse the repository at this point in the history
* Adding test with Kafka trigger on e2e

* Add close procedures to Kafka test
  • Loading branch information
danielbdias authored Aug 21, 2023
1 parent 847caca commit 07289cd
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 16 deletions.
6 changes: 3 additions & 3 deletions testing/cli-e2etest/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ help: Makefile ## show list of commands
@echo ""
@awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort

test: # run tests for this application
test: ## run tests for this application
export TRACETEST_CLI=$(TRACETEST_CLI); \
export TEST_ENVIRONMENT=$(TEST_ENVIRONMENT); \
export TAG=$(TAG); \
export ENABLE_CLI_DEBUG=$(ENABLE_CLI_DEBUG); \
go clean -testcache; \
go test -v -timeout 300s -p 1 ./...;

test/scenario: # run tests for this application
test/scenario: ## run tests for this application
export TRACETEST_CLI=$(TRACETEST_CLI); \
export TEST_ENVIRONMENT=$(TEST_ENVIRONMENT); \
export TAG=$(TAG); \
export ENABLE_CLI_DEBUG=$(ENABLE_CLI_DEBUG); \
go clean -testcache; \
go test -v -timeout 300s -p 1 "$(PROJECT_ROOT)/testscenarios/$(TEST_SCENARIO)";

test/debug: # run tests for this application with debug mode enabled
test/debug: ## run tests for this application with debug mode enabled
export ENABLE_CLI_DEBUG="true"; \
make test;
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
services:
cache:
healthcheck:
test:
- CMD
- redis-cli
- ping
timeout: 3s
interval: 1s
retries: 60
image: redis:6
restart: unless-stopped

demo-api:
depends_on:
cache:
condition: service_healthy
postgres:
condition: service_healthy
queue:
condition: service_healthy
environment:
COLLECTOR_ENDPOINT: http://otel-collector:4317
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres?schema=public
NPM_RUN_COMMAND: api
POKE_API_BASE_URL: https://pokeapi.co/api/v2
RABBITMQ_HOST: queue
REDIS_URL: cache
healthcheck:
test:
- CMD
- wget
- --spider
- localhost:8081
timeout: 3s
interval: 1s
retries: 60
image: kubeshop/demo-pokemon-api:latest
ports:
- mode: ingress
target: 8081
published: 8081
protocol: tcp
pull_policy: always
restart: unless-stopped

demo-rpc:
depends_on:
cache:
condition: service_healthy
postgres:
condition: service_healthy
queue:
condition: service_healthy
environment:
COLLECTOR_ENDPOINT: http://otel-collector:4317
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres?schema=public
NPM_RUN_COMMAND: rpc
POKE_API_BASE_URL: https://pokeapi.co/api/v2
RABBITMQ_HOST: queue
REDIS_URL: cache
healthcheck:
test:
- CMD
- lsof
- -i
- "8082"
timeout: 3s
interval: 1s
retries: 60
image: kubeshop/demo-pokemon-api:latest
ports:
- mode: ingress
target: 8082
published: 8082
protocol: tcp
pull_policy: always
restart: unless-stopped

demo-worker:
depends_on:
cache:
condition: service_healthy
postgres:
condition: service_healthy
queue:
condition: service_healthy
environment:
COLLECTOR_ENDPOINT: http://otel-collector:4317
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres?schema=public
NPM_RUN_COMMAND: worker
POKE_API_BASE_URL: https://pokeapi.co/api/v2
RABBITMQ_HOST: queue
REDIS_URL: cache
image: kubeshop/demo-pokemon-api:latest
pull_policy: always
restart: unless-stopped

demo-streaming-worker:
image: kubeshop/demo-pokemon-api:latest
environment:
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres?schema=public
POKE_API_BASE_URL: https://pokeapi.co/api/v2
COLLECTOR_ENDPOINT: http://otel-collector:4317
ZIPKIN_URL: http://localhost:9411
NPM_RUN_COMMAND: stream-worker
KAFKA_BROKER: 'stream:9092'
KAFKA_TOPIC: 'pokemon'
KAFKA_CLIENT_ID: 'streaming-worker'
REDIS_URL: cache
depends_on:
postgres:
condition: service_healthy
stream:
condition: service_healthy
cache:
condition: service_healthy
otel-collector:
condition: service_started

queue:
healthcheck:
test:
- CMD-SHELL
- rabbitmq-diagnostics -q check_running
timeout: 5s
interval: 1s
retries: 60
image: rabbitmq:3.8-management
restart: unless-stopped

stream:
image: confluentinc/cp-kafka:latest-ubi8
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://stream:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
- [email protected]:9093
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
- KAFKA_PROCESS_ROLES=controller,broker
- KAFKA_NODE_ID=1
- KAFKA_METADATA_LOG_SEGMENT_MS=15000
- KAFKA_METADATA_MAX_RETENTION_MS=60000
- KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS=2800
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_HEAP_OPTS=-Xmx200m -Xms200m
- CLUSTER_ID=ckjPoprWQzOf0-FuNkGfFQ
healthcheck:
test: nc -z stream 9092
start_period: 10s
interval: 5s
timeout: 10s
retries: 10

networks:
default:
name: _default
52 changes: 39 additions & 13 deletions testing/cli-e2etest/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ type Manager interface {
type option func(*internalManager)

type internalManager struct {
environmentType string
dockerComposeNoApiFilePath string
dockerComposePokeshopFilePath string
dockerProjectName string
pokeshopEnabled bool
datastoreEnabled bool
environmentType string
dockerComposeNoApiFilePath string
dockerComposePokeshopFilePath string
dockerComposePokeshopWithStreamFilepath string
dockerProjectName string
pokeshopEnabled bool
pokeshopStreamEnabled bool
datastoreEnabled bool
}

func CreateAndStart(t *testing.T, options ...option) Manager {
Expand All @@ -67,6 +69,13 @@ func WithPokeshop() option {
}
}

func WithPokeshopWithStream() option {
return func(im *internalManager) {
im.pokeshopEnabled = true
im.pokeshopStreamEnabled = true
}
}

func WithDataStoreEnabled() option {
return func(im *internalManager) {
im.datastoreEnabled = true
Expand All @@ -90,14 +99,16 @@ func GetManager(environmentType string, options ...option) Manager {
currentDir := getExecutingDir()
dockerComposeNoApiFilepath := fmt.Sprintf("%s/%s/server-setup/docker-compose-no-api.yaml", currentDir, environmentType)
dockerComposePokeshopFilepath := fmt.Sprintf("%s/%s/server-setup/docker-compose-pokeshop.yaml", currentDir, environmentType)
dockerComposePokeshopWithStreamFilepath := fmt.Sprintf("%s/%s/server-setup/docker-compose-pokeshop-with-stream.yaml", currentDir, environmentType)

atomic.AddInt64(&envCounter, 1)

manager := &internalManager{
environmentType: environmentType,
dockerComposeNoApiFilePath: dockerComposeNoApiFilepath,
dockerComposePokeshopFilePath: dockerComposePokeshopFilepath,
dockerProjectName: fmt.Sprintf("tracetest-env-%d", envCounter),
environmentType: environmentType,
dockerComposeNoApiFilePath: dockerComposeNoApiFilepath,
dockerComposePokeshopFilePath: dockerComposePokeshopFilepath,
dockerComposePokeshopWithStreamFilepath: dockerComposePokeshopWithStreamFilepath,
dockerProjectName: fmt.Sprintf("tracetest-env-%d", envCounter),
}

for _, option := range options {
Expand All @@ -120,10 +131,15 @@ func (m *internalManager) Start(t *testing.T) {
}

if m.pokeshopEnabled {
pokeshopDockerComposeFile := m.dockerComposePokeshopFilePath
if m.pokeshopStreamEnabled {
pokeshopDockerComposeFile = m.dockerComposePokeshopWithStreamFilepath
}

args = []string{
"compose",
"--file", m.dockerComposeNoApiFilePath, // choose docker compose relative to the chosen environment
"--file", m.dockerComposePokeshopFilePath, // choose docker compose relative to the chosen environment
"--file", pokeshopDockerComposeFile, // choose docker compose relative to the chosen environment
"--project-name", m.dockerProjectName, // create a project name to isolate this scenario
"up", "--detach",
}
Expand All @@ -138,10 +154,15 @@ func (m *internalManager) Start(t *testing.T) {
waitForPort("11633")

if m.pokeshopEnabled {
sleepTime := 10 * time.Second
if m.pokeshopStreamEnabled {
sleepTime = 30 * time.Second
}

// wait for pokeshop services
waitForPort("8081")
waitForPort("8082")
time.Sleep(10 * time.Second)
time.Sleep(sleepTime)
}

if m.datastoreEnabled {
Expand Down Expand Up @@ -198,10 +219,15 @@ func (m *internalManager) Close(t *testing.T) {
}

if m.pokeshopEnabled {
pokeshopDockerComposeFile := m.dockerComposePokeshopFilePath
if m.pokeshopStreamEnabled {
pokeshopDockerComposeFile = m.dockerComposePokeshopWithStreamFilepath
}

args = []string{
"compose",
"--file", m.dockerComposeNoApiFilePath, // choose docker compose relative to the chosen environment
"--file", m.dockerComposePokeshopFilePath, // choose docker compose relative to the chosen environment
"--file", pokeshopDockerComposeFile, // choose docker compose relative to the chosen environment
"--project-name", m.dockerProjectName, // choose isolated project name
"rm",
"--force", // bypass removal question
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
type: Test
spec:
id: tf0tOTRSg
name: Import a Pokemon reading a Stream
description: Import a Pokemon via Stream
trigger:
type: kafka
kafka:
brokerUrls:
- stream:9092
topic: pokemon
headers: []
messageKey: snorlax-key
messageValue: "{\"id\":143}"
specs:
- selector: span[tracetest.span.type="messaging" name="pokemon process" messaging.system="kafka" messaging.destination="pokemon" messaging.destination_kind="topic" messaging.operation="process"]
name: A message was received from Kafka stream
assertions:
- attr:messaging.system = "kafka"
- selector: span[tracetest.span.type="general" name="import pokemon"]
name: Import Pokemon use case was triggered
assertions:
- attr:name = "import pokemon"
28 changes: 28 additions & 0 deletions testing/cli-e2etest/testscenarios/test/run_test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,31 @@ func TestRunTestWithGrpcTrigger(t *testing.T) {
require.Contains(result.StdOut, "✔ It calls Pokeshop correctly") // checks if the assertion was succeeded
})
}

func TestRunTestWithKafkaTrigger(t *testing.T) {
// setup isolated e2e environment
env := environment.CreateAndStart(t, environment.WithDataStoreEnabled(), environment.WithPokeshopWithStream())
defer env.Close(t)

cliConfig := env.GetCLIConfigPath(t)

t.Run("should pass", func(t *testing.T) {
// instantiate require with testing helper
require := require.New(t)

// Given I am a Tracetest CLI user
// And I have my server recently created
// And the datasource is already set

// When I try to run a test with a Kafka trigger
// Then it should pass
testFile := env.GetTestResourcePath(t, "kafka-trigger")

command := fmt.Sprintf("run test -f %s", testFile)
result := tracetestcli.Exec(t, command, tracetestcli.WithCLIConfig(cliConfig))
helpers.RequireExitCodeEqual(t, result, 0)
// checks if the assertions were succeeded
require.Contains(result.StdOut, "✔ Import Pokemon use case was triggered")
require.Contains(result.StdOut, "✔ A message was received from Kafka stream")
})
}

0 comments on commit 07289cd

Please sign in to comment.