Skip to content
Snippets Groups Projects
Commit 9dfdb0bf authored by Jiandong Chen's avatar Jiandong Chen
Browse files

refactor infrared

parent 9686f757
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,8 @@ STEREO_PROCESS = {
"right_images": ("stereo_right_link", "/stereo_cam/right_img/compressed"),
}
INFRARED_PROCESS = {
"images": ("infrared_link", "/infrared/image/compressed"),
"images": ("infrared_link", "/infrared/image"),
"visible": ("infrared_link", "/infrared_visible/image/compressed"),
}
OMNI_PROCESS = {
"cam_0": ("omni0_link", "/omni_cam/cam_0/compressed"),
......@@ -83,8 +84,7 @@ def process_images(
rosbag_path: Path,
topic: str,
frame_id: str,
dst_fmt=None,
is_infrared=False,
compress_fmt=None,
):
# read the images path and sort based on file name
images = util.get_sorted_file_list(data_path)
......@@ -103,7 +103,6 @@ def process_images(
# store all images into bag
bridge = cv_bridge.CvBridge()
postprocess_func = np.vectorize(postprocess_infrared)
bagmode = check_bag_mode(rosbag_path)
with rosbag.Bag(rosbag_path, bagmode) as bag:
......@@ -120,16 +119,13 @@ def process_images(
)
break
if is_infrared:
img = cv2.imread(img_path.as_posix(), -1)
img = postprocess_func(img)
img = img.astype(np.uint8)
img = cv2.imread(img_path.as_posix(), cv2.IMREAD_UNCHANGED)
if compress_fmt is not None:
msg = bridge.cv2_to_compressed_imgmsg(img, dst_format=compress_fmt)
else:
img = cv2.imread(img_path.as_posix())
msg = bridge.cv2_to_imgmsg(img)
msg = bridge.cv2_to_compressed_imgmsg(
img, dst_format=dst_fmt if dst_fmt is not None else "jpg"
)
msg.header.seq = t_seq
msg.header.stamp = t
msg.header.frame_id = frame_id
......@@ -197,23 +193,23 @@ def process_navigation(data_folder: Path, rosbag_path: Path):
def process_stereo(data_folder: Path, rosbag_path: Path):
data_path = data_folder.joinpath("stereo")
for k, v in STEREO_PROCESS.items():
process_images(data_path / k, rosbag_path, v[1], v[0], "png")
process_images(data_path / k, rosbag_path, v[1], v[0], compress_fmt="png")
print("Stereo Done!")
def process_infrared(data_folder: Path, rosbag_path: Path):
data_path = data_folder.joinpath("infrared")
for k, v in INFRARED_PROCESS.items():
process_images(data_path / k, rosbag_path, v[1], v[0], "png", is_infrared=True)
# raw data
generate_visible_infrared(data_path.joinpath("images"))
for k, v in INFRARED_PROCESS.items():
process_images(
data_path / "images",
data_path / k,
rosbag_path,
"/infrared/raw/image/compressed",
INFRARED_PROCESS["images"][0],
"png",
v[1],
v[0],
compress_fmt="png" if k == "visible" else None,
)
print("Infrared Done!")
......@@ -227,7 +223,7 @@ def process_omni(data_folder: Path, rosbag_path: Path):
return
for k, v in OMNI_PROCESS.items():
process_images(data_path / k, rosbag_path, v[1], v[0])
process_images(data_path / k, rosbag_path, v[1], v[0], compress_fmt="jpg")
print("Omni Done!")
......@@ -249,7 +245,26 @@ def check_bag_mode(rosbag_path: Path):
return bagmode
def postprocess_infrared(data):
def generate_visible_infrared(image_path: Path):
to_temperture = np.vectorize(infrared_temperture2pixel)
target_path = image_path.parent.joinpath("visible")
print(f"Save Visible Infrared Into {target_path}")
if target_path.exists():
if next(target_path.iterdir(), None) is not None:
print(f"Target Dir is NOT empty, Visible Image Generate Skip!")
else:
target_path.mkdir()
for img_file in image_path.iterdir():
img = cv2.imread(img_file.as_posix(), cv2.IMREAD_UNCHANGED)
img = to_temperture(img)
img = img.astype(np.uint8)
cv2.imwrite(target_path.joinpath(img_file.name).as_posix(), img)
def infrared_temperture2pixel(data):
# computer temperture
temp = data * 0.04 - 273.15
if temp <= MIN_INFRARED_TEMPERTURE:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment