#!/usr/bin/python3 # -*- coding:utf-8 -*- # Copyright jk-ethz # Released under GNU AGPL-3.0 # Contact us for other licensing options. # Developed and tested on system version # 4.2.1 # Inspired by # https://github.com/frankaemika/libfranka/issues/63 # https://github.com/ib101/DVK/blob/master/Code/DVK.py # TODO username and IP from env var. import requests from urllib.parse import urljoin import hashlib import base64 import argparse from time import sleep from itertools import count import atexit from threading import Event from rich.progress import Progress import time import sys import signal import logging from typing import Optional # Configure logging logger = logging.getLogger('frankalockunlock_logger') logger.setLevel(logging.INFO) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) file_handler = logging.FileHandler('/app/.logs/frankalockunlock.log') file_handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) logger.addHandler(console_handler) logger.addHandler(file_handler) shutdown_event = Event() class FrankaLockUnlock: """Handles locking and unlocking of Franka Emika Panda joint brakes programmatically.""" def __init__(self, hostname: str, username: str, password: str, protocol: str = "https", lock: bool = False): """Initialize FrankaLockUnlock instance. Args: hostname (str): The Franka Desk IP address or hostname. username (str): The Franka Desk username. password (str): The Franka Desk password. protocol (str, optional): The protocol to use for the hostname. Defaults to "https". lock (bool, optional): Whether to lock the brakes. Defaults to False. """ requests.packages.urllib3.disable_warnings() self._session = requests.Session() self._session.verify = False self._hostname = f"{protocol}://{hostname}" self._username = username self._password = password self._logged_in = False self._token = None self._token_id = None self._lock = lock atexit.register(self._cleanup) def _cleanup(self): """Performs cleanup operations.""" logger.info("Cleaning up...") if self._lock: self.run(unlock=False) if self._token is not None or self._token_id is not None: self._release_token() if self._logged_in: self._logout() logger.info("Successfully cleaned up.") @staticmethod def _encode_password(username, password): """Encodes the password.""" bs = ",".join( str(b) for b in hashlib.sha256( f"{password}#{username}@franka".encode("utf-8") ).digest() ) return base64.encodebytes(bs.encode("utf-8")).decode("utf-8") def _login(self): """Logs in to the Franka Desk.""" logger.info("Logging in...") if self._logged_in: logger.info("Already logged in.") return login = self._session.post( urljoin(self._hostname, "/admin/api/login"), json={ "login": self._username, "password": self._encode_password(self._username, self._password), }, ) assert login.status_code == 200, "Error logging in." self._session.cookies.set("authorization", login.text) self._logged_in = True logger.info("Successfully logged in.") def _logout(self): """Logs out from the Franka Desk.""" logger.info("Logging out...") assert self._logged_in logout = self._session.post(urljoin(self._hostname, "/admin/api/logout")) assert logout.status_code == 200, "Error logging out" self._session.cookies.clear() self._logged_in = False logger.info("Successfully logged out.") def _get_active_token_id(self): """Gets the active token ID.""" token_query = self._session.get(urljoin(self._hostname, "/admin/api/control-token")) assert token_query.status_code == 200, "Error getting control token status." json = token_query.json() return None if json["activeToken"] is None else json["activeToken"]["id"] def _is_active_token(self): """Checks if the token is active.""" active_token_id = self._get_active_token_id() return active_token_id is None or active_token_id == self._token_id def _request_token(self, physically=False): """Requests a control token.""" logger.info("Requesting a control token...") if self._token is not None: assert self._token_id is not None logger.info("Already having a control token.") return token_request = self._session.post( urljoin(self._hostname, f'/admin/api/control-token/request{"?force" if physically else ""}'), json={"requestedBy": self._username}, ) assert token_request.status_code == 200, "Error requesting control token." json = token_request.json() self._token = json["token"] self._token_id = json["id"] logger.info(f"Received control token is {self._token} with id {self._token_id}.") def _release_token(self): """Releases the control token.""" logger.info("Releasing control token...") token_delete = self._session.delete( urljoin(self._hostname, "/admin/api/control-token"), json={"token": self._token}, ) assert token_delete.status_code == 200, "Error releasing control token." self._token = None self._token_id = None logger.info("Successfully released control token.") def _activate_fci(self): """Activates the Franka Control Interface (FCI).""" logger.info("Activating FCI...") fci_request = self._session.post( urljoin(self._hostname, f"/admin/api/control-token/fci"), json={"token": self._token}, ) assert fci_request.status_code == 200, "Error activating FCI." logger.info("Successfully activated FCI.") def _home_gripper(self): """Homes the gripper.""" logger.info("Homing the gripper...") action = self._session.post( urljoin(self._hostname, f"/desk/api/gripper/homing"), headers={"X-Control-Token": self._token}, ) assert action.status_code == 200, "Error homing gripper." logger.info("Successfully homed the gripper.") def _lock_unlock(self, unlock: bool, force: bool = False): """Locks or unlocks the robot.""" logger.info(f'{"Unlocking" if unlock else "Locking"} the robot...') action = self._session.post( urljoin(self._hostname, f'/desk/api/robot/{"open" if unlock else "close"}-brakes'), files={"force": force}, headers={"X-Control-Token": self._token}, ) assert action.status_code == 200, "Error requesting brake open/close action." logger.info(f'Successfully {"unlocked" if unlock else "locked"} the robot.') def _reboot(self): """Reboots the robot.""" logger.info("Rebooting the robot. This takes a minute.") action = self._session.post( urljoin(self._hostname, f"/admin/api/reboot"), headers={"X-Control-Token": self._token}, ) logger.info(action.status_code) assert action.status_code == 200, f"Error rebooting the robot. {action.status_code}" def _shutdown(self): """Shuts down the robot.""" logger.info("Shutting down the robot. This takes a minute.") action = self._session.post( urljoin(self._hostname, f"/admin/api/shutdown"), headers={"X-Control-Token": self._token}, ) assert action.status_code == 200, f"Error shutting down the robot. {action.status_code}" def _home_robot(self): """Homes the robot.""" logger.info("Homeing the robot. This takes some time.") action = self._session.post( urljoin(self._hostname, "/desk/api/execution"), data={"id": "0_home_position_fast"}, headers={ "X-Control-Token": self._token, "Content-Type": "application/x-www-form-urlencoded", }, ) assert action.status_code == 200, f"Error homing down the robot. {action.status_code}" def run(self, unlock: bool = False, force: bool = False, wait: bool = False, request: bool = False, persistent: bool = False, fci: bool = False, homeGripper: bool = False, reboot: bool = False, shutdown: bool = False, homeRobot: bool = False) -> None: """Runs the Franka Lock Unlock routine. Args: unlock (bool, optional): Whether to unlock the brakes. Defaults to False. force (bool, optional): Whether to force the action. Defaults to False. wait (bool, optional): Whether to wait for the robot web UI to be available. Defaults to False. request (bool, optional): Whether to request control. Defaults to False. persistent (bool, optional): Whether to keep the connection open persistently. Defaults to False. fci (bool, optional): Whether to activate the FCI. Defaults to False. homeGripper (bool, optional): Whether to home the gripper. Defaults to False. reboot (bool, optional): Whether to reboot the robot. Defaults to False. shutdown (bool, optional): Whether to shut down the robot. Defaults to False. homeRobot (bool, optional): Whether to home the robot. Defaults to False. """ # Check conditions and raise assertions if they are not met, providing an error message if they fail. assert not request or wait, "Requesting control without waiting for obtaining control is not supported." assert not fci or unlock, "Activating FCI without unlocking is not possible." assert not fci or persistent, "Activating FCI without persistence is not possible." assert not homeGripper or unlock, "Homing the gripper without unlocking is not possible." # Attempt to log in to the Franka Desk. self._login() try: # Ensure that either a token is already present, no token is active, or waiting is enabled. assert self._token is not None or self._get_active_token_id() is None or wait, "Error requesting control, the robot is currently in use." # Loop indefinitely until successful acquisition of control. while True: # Request a control token, optionally requiring physical confirmation. self._request_token(physically=request) try: # Iterate through a fixed range of attempts or an infinite loop, depending on whether physical confirmation is requested. for _ in range(20) if request else count(): # Check if control has been successfully acquired or wait for it to become active. if (not wait and not request) or self._is_active_token(): logger.info("Successfully acquired control over the robot.") # Perform actions based on the provided parameters. self._lock_unlock(unlock=unlock) if homeGripper: self._home_gripper() if fci: self._activate_fci() if homeRobot: self._home_robot() if reboot: self._reboot() if shutdown: self._shutdown() return # Exit the function once actions are completed. # Provide instructions based on the request type. if request: logger.info("Please press the button with the (blue) circle on the robot to confirm physical access.") elif wait: logger.info("Please confirm the request message in the web interface on the logged in user.") # Pause execution for 1 second. sleep(1) # Release the control token if it was not acquired successfully within the allowed attempts. self._release_token() finally: # Release the control token if persistence is not enabled. if not persistent: self._release_token() finally: # Log out from the Franka Desk if persistence is not enabled. if not persistent: self._logout() def handle_sigterm(signum: int, frame) -> None: """Handles SIGTERM signal.""" # When SIGTERM signal is received, set the shutdown event to indicate that shutdown is requested. logger.info("SIGTERM received, signaling shutdown...") shutdown_event.set() def cleanup(franka_lock_unlock: Optional[FrankaLockUnlock]) -> None: """Performs cleanup on SIGTERM.""" logger.info("Handling SIGTERM signal...") if franka_lock_unlock: # If the FrankaLockUnlock instance exists, run the cleanup process: # 1. Request unlock with waiting, request control with persistence, and home the robot. franka_lock_unlock.run( unlock=True, wait=True, request=True, persistent=True, fci=False, homeGripper=False, reboot=False, shutdown=False, homeRobot=True, ) # Wait for 10 seconds to ensure the cleanup process completes. time.sleep(10) # Execute the internal cleanup method of FrankaLockUnlock instance. franka_lock_unlock._cleanup() # Log that the shutdown process is completed. logger.info("Shutdown completed") # Exit the program. sys.exit(0) def main(): """Main function.""" # Set up a signal handler for SIGTERM signal.signal(signal.SIGTERM, handle_sigterm) # Parse command-line arguments parser = argparse.ArgumentParser( prog="FrankaLockUnlock", description="Lock or unlock the Franka Emika Panda joint brakes programmatically.", epilog="(c) jk-ethz, https://git-ce.rwth-aachen.de/llt_dpp/all/franka_wwl_demonstrator, forked from https://github.com/jk-ethz", ) parser.add_argument("password", help="The Franka Desk password.") parser.add_argument("--hostname", default="192.168.1.101", help='The Franka Desk IP address or hostname, for example "1.2.3.4".') parser.add_argument("--username", default="Admin", help='The Franka Desk username, usually "admin".') parser.add_argument("--unlock", action="store_true", help="Unlock the brakes. Otherwise, lock them.") parser.add_argument("--lock", action="store_true", help="Lock the brakes on exit.") parser.add_argument("--wait", action="store_true", help="Wait in case the robot web UI is currently in use.") parser.add_argument("--request", action="store_true", help="Request control by confirming physical access to the robot in case the robot web UI is currently in use.") parser.add_argument("--persistent", action="store_true", help="Keep the connection to the robot open persistently.") parser.add_argument("--fci", action="store_true", help="Activate the FCI.") parser.add_argument("--homeGripper", action="store_true", help="Home the gripper.") parser.add_argument("--reboot", action="store_true", help="Reboot the robot.") parser.add_argument("--shutdown", action="store_true", help="Shutdown the robot.") parser.add_argument("--homeRobot", action="store_true", help="Home the robot.") args, _ = parser.parse_known_args() # Assert conditions on command-line arguments assert not args.lock or args.unlock, "Locking without prior unlocking is not possible." assert not args.lock or args.persistent, "Locking without persistence would cause an immediate unlock-lock cycle." franka_lock_unlock = None try: # Initialize FrankaLockUnlock object franka_lock_unlock = FrankaLockUnlock( hostname=args.hostname, username=args.username, password=args.password, lock=args.lock, ) # Run the FrankaLockUnlock routine with specified arguments franka_lock_unlock.run( unlock=args.unlock, wait=args.wait, request=args.request, persistent=args.persistent, fci=args.fci, homeGripper=args.homeGripper, reboot=args.reboot, shutdown=args.shutdown, homeRobot=args.homeRobot, ) # If persistent connection is requested, keep the connection open until SIGTERM is received if args.persistent: logger.info("Keeping persistent connection...") while not shutdown_event.is_set(): time.sleep(0.5) finally: # Clean up resources on exit if franka_lock_unlock: time.sleep(25) # Wait for ROS to finish cleanup(franka_lock_unlock) logger.info("Done") if __name__ == "__main__": main()