Select Git revision
Main_IDD_Pad_Print_V_2.0.vi
-
Post, Fabian authoredPost, Fabian authored
cli.py 17.27 KiB
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# Copyright jk-ethz
# Released under GNU AGPL-3.0
# Contact us for other licensing options.
# Developed and tested on system version
# 4.2.1
# Inspired by
# https://github.com/frankaemika/libfranka/issues/63
# https://github.com/ib101/DVK/blob/master/Code/DVK.py
# TODO: Read the username and IP address from environment variables.
import requests
from urllib.parse import urljoin
import hashlib
import base64
import argparse
from time import sleep
from itertools import count
import atexit
from threading import Event
from rich.progress import Progress
import time
import sys
import signal
import logging
from typing import Optional
# Configure logging: records at INFO level and above are emitted to the
# console and to a log file, both using the same line format.
logger = logging.getLogger('frankalockunlock_logger')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler()
# NOTE: FileHandler opens the file immediately; the directory must already exist.
file_handler = logging.FileHandler('/app/.logs/frankalockunlock.log')
for _handler in (console_handler, file_handler):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(formatter)
    logger.addHandler(_handler)
del _handler
# Set by the SIGTERM handler to tell the main loop to stop waiting.
shutdown_event = Event()
class FrankaLockUnlock:
    """Handles locking and unlocking of Franka Emika Panda joint brakes programmatically.

    The instance keeps one HTTP session to the Franka Desk web API and tracks
    three pieces of state: whether it is logged in, the control token it holds
    (if any) and that token's id. ``_cleanup`` is registered with ``atexit`` so
    held resources are released when the interpreter exits.

    All failed checks raise ``AssertionError`` (see ``_require``).
    """

    def __init__(self, hostname: str, username: str, password: str, protocol: str = "https", lock: bool = False):
        """Initialize FrankaLockUnlock instance.

        Args:
            hostname (str): The Franka Desk IP address or hostname.
            username (str): The Franka Desk username.
            password (str): The Franka Desk password.
            protocol (str, optional): The protocol to use for the hostname. Defaults to "https".
            lock (bool, optional): Whether to lock the brakes during cleanup. Defaults to False.
        """
        # TLS certificate verification is disabled for this session and the
        # resulting urllib3 warnings are silenced (presumably because the Desk
        # serves a certificate that cannot be verified - confirm).
        requests.packages.urllib3.disable_warnings()
        self._session = requests.Session()
        self._session.verify = False
        self._hostname = f"{protocol}://{hostname}"
        self._username = username
        self._password = password
        self._logged_in = False
        self._token = None
        self._token_id = None
        self._lock = lock
        atexit.register(self._cleanup)

    @staticmethod
    def _require(condition, message: str = "") -> None:
        """Raise ``AssertionError(message)`` unless ``condition`` is truthy.

        Used instead of the ``assert`` statement so that the checks are still
        executed when Python runs with ``-O`` (which strips ``assert``), while
        keeping the exception type callers may already catch.
        """
        if not condition:
            raise AssertionError(message)

    def _cleanup(self):
        """Locks the brakes (if configured), releases the token and logs out."""
        logger.info("Cleaning up...")
        if self._lock:
            # Non-persistent run: it releases the token and logs out itself.
            self.run(unlock=False)
        if self._token is not None or self._token_id is not None:
            self._release_token()
        if self._logged_in:
            self._logout()
        logger.info("Successfully cleaned up.")

    @staticmethod
    def _encode_password(username, password):
        """Encodes the password in the form sent to the login endpoint.

        The SHA-256 digest of ``"{password}#{username}@franka"`` is rendered as
        comma-separated decimal byte values and that text is base64-encoded.

        Returns:
            str: The base64 text (including the trailing newline added by
            ``base64.encodebytes``).
        """
        digest = hashlib.sha256(f"{password}#{username}@franka".encode("utf-8")).digest()
        byte_list = ",".join(str(b) for b in digest)
        return base64.encodebytes(byte_list.encode("utf-8")).decode("utf-8")

    def _login(self):
        """Logs in to the Franka Desk; does nothing if already logged in.

        Raises:
            AssertionError: If the login request does not return HTTP 200.
        """
        logger.info("Logging in...")
        if self._logged_in:
            logger.info("Already logged in.")
            return
        login = self._session.post(
            urljoin(self._hostname, "/admin/api/login"),
            json={
                "login": self._username,
                "password": self._encode_password(self._username, self._password),
            },
        )
        self._require(login.status_code == 200, "Error logging in.")
        # The response body is stored as the "authorization" cookie for later requests.
        self._session.cookies.set("authorization", login.text)
        self._logged_in = True
        logger.info("Successfully logged in.")

    def _logout(self):
        """Logs out from the Franka Desk and clears the session cookies.

        Raises:
            AssertionError: If not logged in or the request does not return HTTP 200.
        """
        logger.info("Logging out...")
        self._require(self._logged_in, "Cannot log out: not logged in.")
        logout = self._session.post(urljoin(self._hostname, "/admin/api/logout"))
        self._require(logout.status_code == 200, "Error logging out")
        self._session.cookies.clear()
        self._logged_in = False
        logger.info("Successfully logged out.")

    def _get_active_token_id(self):
        """Gets the id of the currently active control token, or None if there is none.

        Raises:
            AssertionError: If the status request does not return HTTP 200.
        """
        token_query = self._session.get(urljoin(self._hostname, "/admin/api/control-token"))
        self._require(token_query.status_code == 200, "Error getting control token status.")
        active_token = token_query.json()["activeToken"]
        return None if active_token is None else active_token["id"]

    def _is_active_token(self):
        """Checks whether no token is active or the active token is the one held here."""
        active_token_id = self._get_active_token_id()
        return active_token_id is None or active_token_id == self._token_id

    def _request_token(self, physically=False):
        """Requests a control token; does nothing if one is already held.

        Args:
            physically (bool, optional): Append ``?force`` to the request, which
                the caller uses when physical confirmation is requested.

        Raises:
            AssertionError: If the request does not return HTTP 200.
        """
        logger.info("Requesting a control token...")
        if self._token is not None:
            self._require(self._token_id is not None, "Control token held without a token id.")
            logger.info("Already having a control token.")
            return
        token_request = self._session.post(
            urljoin(self._hostname, f'/admin/api/control-token/request{"?force" if physically else ""}'),
            json={"requestedBy": self._username},
        )
        self._require(token_request.status_code == 200, "Error requesting control token.")
        payload = token_request.json()
        self._token = payload["token"]
        self._token_id = payload["id"]
        logger.info(f"Received control token is {self._token} with id {self._token_id}.")

    def _release_token(self):
        """Releases the control token; does nothing if no token is held.

        Raises:
            AssertionError: If the release request does not return HTTP 200.
        """
        if self._token is None:
            # Nothing to release. ``run`` may get here twice in a row (after a
            # timed-out confirmation and again in its ``finally``); sending a
            # DELETE with a null token would only produce a spurious error.
            self._token_id = None
            return
        logger.info("Releasing control token...")
        token_delete = self._session.delete(
            urljoin(self._hostname, "/admin/api/control-token"),
            json={"token": self._token},
        )
        self._require(token_delete.status_code == 200, "Error releasing control token.")
        self._token = None
        self._token_id = None
        logger.info("Successfully released control token.")

    def _activate_fci(self):
        """Activates the Franka Control Interface (FCI).

        Raises:
            AssertionError: If the request does not return HTTP 200.
        """
        logger.info("Activating FCI...")
        fci_request = self._session.post(
            urljoin(self._hostname, "/admin/api/control-token/fci"),
            json={"token": self._token},
        )
        self._require(fci_request.status_code == 200, "Error activating FCI.")
        logger.info("Successfully activated FCI.")

    def _home_gripper(self):
        """Homes the gripper.

        Raises:
            AssertionError: If the request does not return HTTP 200.
        """
        logger.info("Homing the gripper...")
        action = self._session.post(
            urljoin(self._hostname, "/desk/api/gripper/homing"),
            headers={"X-Control-Token": self._token},
        )
        self._require(action.status_code == 200, "Error homing gripper.")
        logger.info("Successfully homed the gripper.")

    def _lock_unlock(self, unlock: bool, force: bool = False):
        """Locks or unlocks the robot brakes.

        Args:
            unlock (bool): True opens the brakes, False closes them.
            force (bool, optional): Value sent as the ``force`` form field.

        Raises:
            AssertionError: If the request does not return HTTP 200.
        """
        logger.info(f'{"Unlocking" if unlock else "Locking"} the robot...')
        action = self._session.post(
            urljoin(self._hostname, f'/desk/api/robot/{"open" if unlock else "close"}-brakes'),
            # Passed via ``files`` so the request is sent as multipart form data.
            files={"force": force},
            headers={"X-Control-Token": self._token},
        )
        self._require(action.status_code == 200, "Error requesting brake open/close action.")
        logger.info(f'Successfully {"unlocked" if unlock else "locked"} the robot.')

    def _reboot(self):
        """Reboots the robot.

        Raises:
            AssertionError: If the request does not return HTTP 200.
        """
        logger.info("Rebooting the robot. This takes a minute.")
        action = self._session.post(
            urljoin(self._hostname, "/admin/api/reboot"),
            headers={"X-Control-Token": self._token},
        )
        logger.info(action.status_code)
        self._require(action.status_code == 200, f"Error rebooting the robot. {action.status_code}")

    def _shutdown(self):
        """Shuts down the robot.

        Raises:
            AssertionError: If the request does not return HTTP 200.
        """
        logger.info("Shutting down the robot. This takes a minute.")
        action = self._session.post(
            urljoin(self._hostname, "/admin/api/shutdown"),
            headers={"X-Control-Token": self._token},
        )
        self._require(action.status_code == 200, f"Error shutting down the robot. {action.status_code}")

    def _home_robot(self):
        """Homes the robot by starting the ``0_home_position_fast`` execution.

        Raises:
            AssertionError: If the request does not return HTTP 200.
        """
        logger.info("Homeing the robot. This takes some time.")
        action = self._session.post(
            urljoin(self._hostname, "/desk/api/execution"),
            data={"id": "0_home_position_fast"},
            headers={
                "X-Control-Token": self._token,
                "Content-Type": "application/x-www-form-urlencoded",
            },
        )
        self._require(action.status_code == 200, f"Error homing the robot. {action.status_code}")

    def run(self, unlock: bool = False, force: bool = False, wait: bool = False, request: bool = False, persistent: bool = False, fci: bool = False, homeGripper: bool = False, reboot: bool = False, shutdown: bool = False, homeRobot: bool = False) -> None:
        """Runs the Franka Lock Unlock routine.

        Logs in, acquires a control token, locks or unlocks the brakes and then
        performs the optional actions in this order: home gripper, activate
        FCI, home robot, reboot, shutdown. Unless ``persistent`` is set, the
        token is released and the session logged out before returning, also
        when an error occurs.

        Args:
            unlock (bool, optional): Whether to unlock the brakes. Defaults to False.
            force (bool, optional): Whether to force the brake action. Defaults to False.
            wait (bool, optional): Whether to wait for the robot web UI to be available. Defaults to False.
            request (bool, optional): Whether to request control. Defaults to False.
            persistent (bool, optional): Whether to keep the connection open persistently. Defaults to False.
            fci (bool, optional): Whether to activate the FCI. Defaults to False.
            homeGripper (bool, optional): Whether to home the gripper. Defaults to False.
            reboot (bool, optional): Whether to reboot the robot. Defaults to False.
            shutdown (bool, optional): Whether to shut down the robot. Defaults to False.
            homeRobot (bool, optional): Whether to home the robot. Defaults to False.

        Raises:
            AssertionError: If the argument combination is unsupported, the
                robot is in use and ``wait`` is not set, or any Desk request fails.
        """
        # Reject unsupported argument combinations before touching the robot.
        self._require(not request or wait, "Requesting control without waiting for obtaining control is not supported.")
        self._require(not fci or unlock, "Activating FCI without unlocking is not possible.")
        self._require(not fci or persistent, "Activating FCI without persistence is not possible.")
        self._require(not homeGripper or unlock, "Homing the gripper without unlocking is not possible.")
        self._login()
        try:
            # Either a token is already held, nobody else holds one, or we are allowed to wait.
            self._require(
                self._token is not None or self._get_active_token_id() is None or wait,
                "Error requesting control, the robot is currently in use.",
            )
            # Retry until control is acquired (only the `request` path can loop more than once).
            while True:
                self._request_token(physically=request)
                try:
                    # With physical confirmation the polling is bounded to 20
                    # one-second attempts; otherwise it polls indefinitely.
                    for _ in range(20) if request else count():
                        if (not wait and not request) or self._is_active_token():
                            logger.info("Successfully acquired control over the robot.")
                            self._lock_unlock(unlock=unlock, force=force)
                            if homeGripper:
                                self._home_gripper()
                            if fci:
                                self._activate_fci()
                            if homeRobot:
                                self._home_robot()
                            if reboot:
                                self._reboot()
                            if shutdown:
                                self._shutdown()
                            return
                        if request:
                            logger.info("Please press the button with the (blue) circle on the robot to confirm physical access.")
                        elif wait:
                            logger.info("Please confirm the request message in the web interface on the logged in user.")
                        sleep(1)
                    # Not confirmed within the allowed attempts: drop the token and request a new one.
                    self._release_token()
                finally:
                    # No-op if the token was already released above.
                    if not persistent:
                        self._release_token()
        finally:
            if not persistent:
                self._logout()
def handle_sigterm(signum: int, frame) -> None:
    """Signal handler for SIGTERM: request a graceful shutdown.

    Logs the event and sets the module-level ``shutdown_event``; the actual
    shutdown work is left to whoever is waiting on that event.

    Args:
        signum (int): The received signal number (unused).
        frame: The interrupted stack frame (unused).
    """
    logger.info("SIGTERM received, signaling shutdown...")
    shutdown_event.set()
def cleanup(franka_lock_unlock: Optional[FrankaLockUnlock], exit_code: int = 0) -> None:
    """Homes the robot, releases all Desk resources and terminates the process.

    Despite the log message, this is called by ``main`` on every exit path,
    not only after SIGTERM.

    Args:
        franka_lock_unlock: The instance to clean up, or None to only exit.
        exit_code (int, optional): Process exit status passed to ``sys.exit``.
            Defaults to 0 so existing callers keep their behavior.

    Raises:
        SystemExit: Always, once the cleanup steps completed.
        AssertionError: If the homing run or the instance cleanup fails.
    """
    logger.info("Handling SIGTERM signal...")
    if franka_lock_unlock:
        # Unlock (requesting control with physical confirmation if necessary)
        # and move the robot to its home position.
        franka_lock_unlock.run(
            unlock=True,
            wait=True,
            request=True,
            persistent=True,
            fci=False,
            homeGripper=False,
            reboot=False,
            shutdown=False,
            homeRobot=True,
        )
        # Fixed 10 s pause before releasing control (presumably to let the
        # homing motion finish - confirm).
        time.sleep(10)
        franka_lock_unlock._cleanup()
        # The instance registered _cleanup with atexit; after a successful
        # explicit call, drop that hook so sys.exit() below does not repeat the
        # whole lock/release/logout sequence.
        atexit.unregister(franka_lock_unlock._cleanup)
    logger.info("Shutdown completed")
    sys.exit(exit_code)


def main():
    """Command-line entry point.

    Parses the arguments, runs the lock/unlock routine, optionally keeps the
    connection open until SIGTERM is received, and finally runs ``cleanup``.
    The process exits with status 1 if the routine raised an exception and
    with status 2 (argparse usage error) for invalid argument combinations.
    """
    # SIGTERM only sets shutdown_event; the waiting loop below reacts to it.
    signal.signal(signal.SIGTERM, handle_sigterm)
    parser = argparse.ArgumentParser(
        prog="FrankaLockUnlock",
        description="Lock or unlock the Franka Emika Panda joint brakes programmatically.",
        epilog="(c) jk-ethz, https://git-ce.rwth-aachen.de/llt_dpp/all/franka_wwl_demonstrator, forked from https://github.com/jk-ethz",
    )
    parser.add_argument("password", help="The Franka Desk password.")
    parser.add_argument("--hostname", default="192.168.1.101", help='The Franka Desk IP address or hostname, for example "1.2.3.4".')
    parser.add_argument("--username", default="Admin", help='The Franka Desk username, usually "admin".')
    parser.add_argument("--unlock", action="store_true", help="Unlock the brakes. Otherwise, lock them.")
    parser.add_argument("--lock", action="store_true", help="Lock the brakes on exit.")
    parser.add_argument("--wait", action="store_true", help="Wait in case the robot web UI is currently in use.")
    parser.add_argument("--request", action="store_true", help="Request control by confirming physical access to the robot in case the robot web UI is currently in use.")
    parser.add_argument("--persistent", action="store_true", help="Keep the connection to the robot open persistently.")
    parser.add_argument("--fci", action="store_true", help="Activate the FCI.")
    parser.add_argument("--homeGripper", action="store_true", help="Home the gripper.")
    parser.add_argument("--reboot", action="store_true", help="Reboot the robot.")
    parser.add_argument("--shutdown", action="store_true", help="Shutdown the robot.")
    parser.add_argument("--homeRobot", action="store_true", help="Home the robot.")
    # Unknown arguments are tolerated and ignored.
    args, _ = parser.parse_known_args()
    # Validate with parser.error rather than assert: assert is stripped under
    # `python -O`, and a usage error is the conventional CLI response.
    if args.lock and not args.unlock:
        parser.error("Locking without prior unlocking is not possible.")
    if args.lock and not args.persistent:
        parser.error("Locking without persistence would cause an immediate unlock-lock cycle.")
    franka_lock_unlock = None
    exit_code = 0
    try:
        franka_lock_unlock = FrankaLockUnlock(
            hostname=args.hostname,
            username=args.username,
            password=args.password,
            lock=args.lock,
        )
        franka_lock_unlock.run(
            unlock=args.unlock,
            wait=args.wait,
            request=args.request,
            persistent=args.persistent,
            fci=args.fci,
            homeGripper=args.homeGripper,
            reboot=args.reboot,
            shutdown=args.shutdown,
            homeRobot=args.homeRobot,
        )
        if args.persistent:
            logger.info("Keeping persistent connection...")
            # Wake up at least every 0.5 s, or immediately once the event is set.
            while not shutdown_event.wait(timeout=0.5):
                pass
    except Exception:
        # Record the failure here: cleanup() ends in sys.exit(), which would
        # otherwise replace the in-flight exception and report success.
        logger.exception("FrankaLockUnlock routine failed.")
        exit_code = 1
    finally:
        if franka_lock_unlock is not None:
            time.sleep(25)  # Wait for ROS to finish
            cleanup(franka_lock_unlock, exit_code)
    # Only reached when no instance was created, since cleanup() exits the process.
    if exit_code:
        sys.exit(exit_code)
    logger.info("Done")


if __name__ == "__main__":
    main()