Skip to content

Commit

Permalink
Systematic testing enhancements
Browse files Browse the repository at this point in the history
Update SYSTEMATIC_TESTING_PRINTF to take a level argument.
Add cli argument for controlling output level of systematic testing printf's.
Add SYSTEMATIC_TESTING_YIELD and SYSTEMATIC_TESTING_PRINTF in a lot more places.
  • Loading branch information
dipinhora committed Aug 4, 2022
1 parent 5978d14 commit c237129
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 23 deletions.
45 changes: 45 additions & 0 deletions src/libponyrt/actor/actor.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#define PONY_WANT_ATOMIC_DEFS

#include "actor.h"
#include "../sched/systematic_testing.h"
#include "../sched/scheduler.h"
#include "../sched/cpu.h"
#include "../mem/pool.h"
Expand Down Expand Up @@ -98,12 +99,14 @@ static bool has_sync_flag(pony_actor_t* actor, uint8_t flag)
static void set_sync_flag(pony_actor_t* actor, uint8_t flag)
{
uint8_t flags = atomic_load_explicit(&actor->sync_flags, memory_order_acquire);
SYSTEMATIC_TESTING_YIELD();
atomic_store_explicit(&actor->sync_flags, flags | flag, memory_order_release);
}

static void unset_sync_flag(pony_actor_t* actor, uint8_t flag)
{
uint8_t flags = atomic_load_explicit(&actor->sync_flags, memory_order_acquire);
SYSTEMATIC_TESTING_YIELD();
atomic_store_explicit(&actor->sync_flags, flags & (uint8_t)~flag,
memory_order_release);
}
Expand Down Expand Up @@ -150,12 +153,14 @@ static void unset_internal_flag(pony_actor_t* actor, uint8_t flag)

static void mute_actor(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu muted...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
set_sync_flag(actor, SYNC_FLAG_MUTED);
DTRACE1(ACTOR_MUTED, (uintptr_t)actor);
}

void ponyint_unmute_actor(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu unmuted...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
unset_sync_flag(actor, SYNC_FLAG_MUTED);
DTRACE1(ACTOR_UNMUTED, (uintptr_t)actor);
}
Expand All @@ -168,6 +173,7 @@ static bool triggers_muting(pony_actor_t* actor)

static void actor_setoverloaded(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu overloaded...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
set_sync_flag(actor, SYNC_FLAG_OVERLOADED);
DTRACE1(ACTOR_OVERLOADED, (uintptr_t)actor);
Expand All @@ -176,6 +182,7 @@ static void actor_setoverloaded(pony_actor_t* actor)
static void actor_unsetoverloaded(pony_actor_t* actor)
{
pony_ctx_t* ctx = pony_ctx();
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu unoverloaded...\n", ctx->scheduler->tid, (uintptr_t)actor);
unset_sync_flag(actor, SYNC_FLAG_OVERLOADED);
DTRACE1(ACTOR_OVERLOADED_CLEARED, (uintptr_t)actor);
if (!has_sync_flag(actor, SYNC_FLAG_UNDER_PRESSURE))
Expand Down Expand Up @@ -264,6 +271,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_ACQUIRE...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
pony_msgp_t* m = (pony_msgp_t*)msg;

Expand Down Expand Up @@ -291,6 +299,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_RELEASE...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
pony_msgp_t* m = (pony_msgp_t*)msg;

Expand Down Expand Up @@ -318,6 +327,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_ACK...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -331,6 +341,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgi_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_CONF...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
if(has_internal_flag(actor, FLAG_BLOCKED_SENT))
{
Expand All @@ -349,6 +360,8 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msg_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled message ACTORMSG_ISBLOCKED...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);

// this actor should not already be marked as pendingdestroy
// or else we could end up double freeing it
// this assertion is to ensure this invariant is not
Expand Down Expand Up @@ -378,6 +391,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
{
// runtimestats messages tracked in cycle detector

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_BLOCK...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -391,6 +405,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_UNBLOCK...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -404,6 +419,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msgp_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_DESTROYED...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -417,6 +433,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_ALLOC_SIZE(pony_msg_t);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: CD actor %lu handled message ACTORMSG_CHECKBLOCKED...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(ponyint_is_cycle(actor));
DTRACE3(ACTOR_MSG_RUN, (uintptr_t)ctx->scheduler, (uintptr_t)actor, msg->id);
actor->type->dispatch(ctx, actor, msg);
Expand All @@ -430,6 +447,7 @@ static bool handle_message(pony_ctx_t* ctx, pony_actor_t* actor,
ctx->schedulerstats.mem_allocated_inflight_messages -= POOL_SIZE(msg->index);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled APP message...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(!ponyint_is_cycle(actor));
if(has_internal_flag(actor, FLAG_BLOCKED_SENT))
{
Expand Down Expand Up @@ -459,6 +477,7 @@ static void try_gc(pony_ctx_t* ctx, pony_actor_t* actor)
ctx->schedulerstats.misc_cpu += used_cpu;
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: GC'ing actor %lu...\n", ctx->scheduler->tid, (uintptr_t)actor);
DTRACE1(GC_START, (uintptr_t)ctx->scheduler);

ponyint_gc_mark(ctx);
Expand All @@ -481,6 +500,7 @@ static void try_gc(pony_ctx_t* ctx, pony_actor_t* actor)
);
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: finished GC'ing actor %lu...\n", ctx->scheduler->tid, (uintptr_t)actor);
DTRACE1(GC_END, (uintptr_t)ctx->scheduler);

#ifdef USE_RUNTIMESTATS
Expand Down Expand Up @@ -531,6 +551,8 @@ static bool batch_limit_reached(pony_actor_t* actor, bool polling)
actor_setoverloaded(actor);
}

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu reached batch limit...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);

return true;
}

Expand All @@ -557,6 +579,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
ctx->schedulerstats.misc_cpu += used_cpu;
#endif

SYSTEMATIC_TESTING_YIELD();
bool app_msg = handle_message(ctx, actor, msg);

#ifdef USE_RUNTIMESTATS
Expand All @@ -573,6 +596,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)

// If we handle an application message, try to gc.
app++;
SYSTEMATIC_TESTING_YIELD();
try_gc(ctx, actor);

// maybe mute actor; returns true if mute occurs
Expand All @@ -596,7 +620,10 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// Stop handling a batch if we reach the head we found when we were
// scheduled.
if(msg == head)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu handled all messages...\n", ctx->scheduler->tid, (uintptr_t)actor);
break;
}
}

// We didn't hit our app message batch limit. We now believe our queue to be
Expand Down Expand Up @@ -638,6 +665,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
set_internal_flag(actor, FLAG_BLOCKED_SENT);
set_internal_flag(actor, FLAG_CD_CONTACTED);
ponyint_cycle_block(actor, &actor->gc);
SYSTEMATIC_TESTING_YIELD();
}

}
Expand All @@ -654,6 +682,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// check if the actors queue is empty or not
if(ponyint_messageq_isempty(&actor->q))
{
SYSTEMATIC_TESTING_YIELD();
// The actors queue is empty which means this actor is a zombie
// and can be reaped.
// If the cycle detector is disabled (or it doesn't know about this actor),
Expand Down Expand Up @@ -686,6 +715,8 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// make sure the queue is actually empty as expected
pony_assert(empty);

SYSTEMATIC_TESTING_YIELD();

// "Locally delete" the actor.
ponyint_actor_setpendingdestroy(actor);
ponyint_actor_final(ctx, actor);
Expand All @@ -707,6 +738,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// and the cycle detector didn't send a message in the meantime
if(!ponyint_messageq_isempty(&actor->q))
{
SYSTEMATIC_TESTING_YIELD();
// need to undo set pending destroy because the cycle detector
// sent the actor a message and it is not safe to destroy this
// actor as a result.
Expand Down Expand Up @@ -750,9 +782,11 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)
// messages (which would also create a race condition).
set_internal_flag(actor, FLAG_BLOCKED_SENT);
ponyint_cycle_block(actor, &actor->gc);
SYSTEMATIC_TESTING_YIELD();

// mark the queue as empty or else destroy will hang
bool empty = ponyint_messageq_markempty(&actor->q);
SYSTEMATIC_TESTING_YIELD();

// make sure the queue is actually empty as expected
pony_assert(empty);
Expand All @@ -772,6 +806,7 @@ bool ponyint_actor_run(pony_ctx_t* ctx, pony_actor_t* actor, bool polling)

void ponyint_actor_destroy(pony_actor_t* actor)
{
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu destroyed...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);
pony_assert(has_sync_flag(actor, SYNC_FLAG_PENDINGDESTROY));

// Make sure the actor being destroyed has finished marking its queue
Expand Down Expand Up @@ -884,6 +919,8 @@ PONY_API pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type,
ctx->schedulerstats.created_actors_counter++;
#endif

SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu created...\n", pony_ctx()->scheduler->tid, (uintptr_t)actor);

ponyint_messageq_init(&actor->q);
ponyint_heap_init(&actor->heap);
ponyint_gc_done(&actor->gc);
Expand Down Expand Up @@ -980,12 +1017,15 @@ PONY_API void pony_sendv(pony_ctx_t* ctx, pony_actor_t* to, pony_msg_t* first,
if(has_app_msg)
maybe_mark_should_mute(ctx, to);

SYSTEMATIC_TESTING_YIELD();

if(ponyint_actor_messageq_push(&to->q, first, last
#ifdef USE_DYNAMIC_TRACE
, ctx->scheduler, ctx->current, to
#endif
))
{
SYSTEMATIC_TESTING_YIELD();
if(!has_sync_flag(to, SYNC_FLAG_MUTED))
{
ponyint_sched_add(ctx, to);
Expand Down Expand Up @@ -1023,12 +1063,15 @@ PONY_API void pony_sendv_single(pony_ctx_t* ctx, pony_actor_t* to,
if(has_app_msg)
maybe_mark_should_mute(ctx, to);

SYSTEMATIC_TESTING_YIELD();

if(ponyint_actor_messageq_push_single(&to->q, first, last
#ifdef USE_DYNAMIC_TRACE
, ctx->scheduler, ctx->current, to
#endif
))
{
SYSTEMATIC_TESTING_YIELD();
if(!has_sync_flag(to, SYNC_FLAG_MUTED))
{
// if the receiving actor is currently not unscheduled AND it's not
Expand Down Expand Up @@ -1169,13 +1212,15 @@ PONY_API void pony_poll(pony_ctx_t* ctx)
PONY_API void pony_apply_backpressure()
{
pony_ctx_t* ctx = pony_ctx();
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu under pressure...\n", ctx->scheduler->tid, (uintptr_t)ctx->current);
set_sync_flag(ctx->current, SYNC_FLAG_UNDER_PRESSURE);
DTRACE1(ACTOR_UNDER_PRESSURE, (uintptr_t)ctx->current);
}

PONY_API void pony_release_backpressure()
{
pony_ctx_t* ctx = pony_ctx();
SYSTEMATIC_TESTING_PRINTF(8, "thread %lu: actor %lu not under pressure...\n", ctx->scheduler->tid, (uintptr_t)ctx->current);
unset_sync_flag(ctx->current, SYNC_FLAG_UNDER_PRESSURE);
DTRACE1(ACTOR_PRESSURE_RELEASED, (uintptr_t)ctx->current);
if (!has_sync_flag(ctx->current, SYNC_FLAG_OVERLOADED))
Expand Down
Loading

0 comments on commit c237129

Please sign in to comment.