Skip to content

Commit

Permalink
send_statustext_message (#71)
Browse files Browse the repository at this point in the history
* send_statustext_message

* updated function name

* Added test script and restriction for after encoding

* changed file name so pytest can't detect it

* Added testing

* Fixed formatting

* Refactored and made error messages print

* Unchanged connection address

* Main guarded test_send_messages

* Minor refactoring

* Minor fixes

---------

Co-authored-by: lilyjge <[email protected]>
  • Loading branch information
Aayush1104 and lilyjge authored Dec 25, 2024
1 parent ce40732 commit c7ab98a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 16 deletions.
20 changes: 19 additions & 1 deletion modules/mavlink/flight_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ def create(cls, address: str, baud: int = 57600) -> "tuple[bool, FlightControlle
"""
try:
# Wait ready is false as the drone may be on the ground
drone = dronekit.connect(address, wait_ready=False, baud=baud)
drone = dronekit.connect(
address, wait_ready=False, baud=baud, source_component=0, source_system=1
)
except dronekit.TimeoutError:
print("No messages are being received. Make sure address/port is a host address/port.")
return False, None
Expand Down Expand Up @@ -351,3 +353,19 @@ def insert_waypoint(
commands.insert(index, new_waypoint)

return self.upload_commands(commands)

def send_statustext_msg(
self,
message: str,
severity: int = mavutil.mavlink.MAV_SEVERITY_INFO,
) -> bool:
"""
Sends a STATUSTEXT message to the vehicle.
"""
message_bytes = message.encode("utf-8")
if len(message_bytes) > 50:
print("Message too long, cannot send STATUSTEXT message")
return False
msg = self.drone.message_factory.statustext_encode(severity, message_bytes)
self.drone.send_mavlink(msg)
return True
31 changes: 16 additions & 15 deletions tests/integration/test_flight_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


DELAY_TIME = 0.5 # seconds
MISSION_PLANNER_ADDRESS = "tcp:127.0.0.1:14550"
MISSION_PLANNER_ADDRESS = "tcp:localhost:5762"
TIMEOUT = 1.0 # seconds


Expand All @@ -27,21 +27,22 @@ def main() -> int:
for _ in range(5):
result, odometry = controller.get_odometry()
if result:
print("lat: " + str(odometry.position.latitude))
print("lon: " + str(odometry.position.longitude))
print("alt: " + str(odometry.position.altitude))
print("yaw: " + str(odometry.orientation.yaw))
print("roll: " + str(odometry.orientation.roll))
print("pitch: " + str(odometry.orientation.pitch))
print("")
controller.send_statustext_msg("lat: " + str(odometry.position.latitude))
controller.send_statustext_msg("lat: " + str(odometry.position.latitude))
controller.send_statustext_msg("lon: " + str(odometry.position.longitude))
controller.send_statustext_msg("alt: " + str(odometry.position.altitude))
controller.send_statustext_msg("yaw: " + str(odometry.orientation.yaw))
controller.send_statustext_msg("roll: " + str(odometry.orientation.roll))
controller.send_statustext_msg("pitch: " + str(odometry.orientation.pitch))
controller.send_statustext_msg("")
else:
print("Failed to get odometry")

result, home = controller.get_home_position(TIMEOUT)
if result:
print("lat: " + str(home.latitude))
print("lon: " + str(home.longitude))
print("alt: " + str(home.altitude))
controller.send_statustext_msg("lat: " + str(home.latitude))
controller.send_statustext_msg("lon: " + str(home.longitude))
controller.send_statustext_msg("alt: " + str(home.altitude))
else:
print("Failed to get home position")

Expand All @@ -52,15 +53,15 @@ def main() -> int:
if success:
print("Downloaded commands:")
for command in commands:
print(command)
print(str(command))
else:
print("Failed to download commands.")

result, next_waypoint = controller.get_next_waypoint()
if result:
print("next waypoint lat: " + str(next_waypoint.latitude))
print("next waypoint lon: " + str(next_waypoint.longitude))
print("next waypoint alt: " + str(next_waypoint.altitude))
controller.send_statustext_msg("next waypoint lat: " + str(next_waypoint.latitude))
controller.send_statustext_msg("next waypoint lon: " + str(next_waypoint.longitude))
controller.send_statustext_msg("next waypoint alt: " + str(next_waypoint.altitude))
else:
print("Failed to get next waypoint.")

Expand Down
51 changes: 51 additions & 0 deletions tests/integration/test_send_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Test script to send messages to the drone using the FlightController class.
"""

import time

from pymavlink import mavutil

from modules.mavlink.flight_controller import FlightController


DELAY_TIME = 0.5 # seconds
MISSION_PLANNER_ADDRESS = "tcp:localhost:5672"


def main() -> int:
"""
Main function.
"""
# Connect to the vehicle
success, controller = FlightController.create(MISSION_PLANNER_ADDRESS)
if not success:
print("Failed to connect")
return -1

messages = [ # 10 random messages
"System startup",
"Initializing sensors",
"GPS lock acquired",
"Motors armed",
"Taking off",
"Reaching altitude",
"Mission started",
"Waypoint reached",
"Returning home",
"Landing sequence initiated",
]

for msg in messages:
controller.send_statustext_msg(msg, mavutil.mavlink.MAV_SEVERITY_INFO)
time.sleep(DELAY_TIME)

return 0


if __name__ == "__main__":
result_main = main()
if result_main != 0:
print(f"ERROR: Status code: {result_main}")

print("Done!")

0 comments on commit c7ab98a

Please sign in to comment.