import cv2 from cv2 import aruco import numpy as np import time import sys from gpiozero import LED import os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from default_config import * visualization = True def is_in_center(x_px, y_px, frame_w, frame_h, margin_ratio=0.25): margin_x = frame_w * margin_ratio margin_y = frame_h * margin_ratio return (margin_x < x_px < frame_w - margin_x) and (margin_y < y_px < frame_h - margin_y) def get_euler_angles_from_rvec(rvec): rot_matrix, _ = cv2.Rodrigues(rvec) sy = np.sqrt(rot_matrix[0, 0]**2 + rot_matrix[1, 0]**2) singular = sy < 1e-6 if not singular: roll = np.arctan2(rot_matrix[2, 1], rot_matrix[2, 2]) pitch = np.arctan2(-rot_matrix[2, 0], sy) yaw = np.arctan2(rot_matrix[1, 0], rot_matrix[0, 0]) else: roll = np.arctan2(-rot_matrix[1, 2], rot_matrix[1, 1]) pitch = np.arctan2(-rot_matrix[2, 0], sy) yaw = 0 return np.degrees(roll), np.degrees(pitch), np.degrees(yaw) def is_roll_valid(roll): return (abs(roll) < ALLOWED_ANGLE_DEG) or (160 <= abs(roll) <= 180) calib_data_path = f"camera_calibration/calib_data/{set_resolution}/MultiMatrix.npz" calib_data = np.load(calib_data_path) print('resolution:', set_resolution) cam_mat = calib_data["camMatrix"] dist_coef = calib_data["distCoef"] marker_dict = aruco.getPredefinedDictionary(aruco.DICT_7X7_50) param_markers = aruco.DetectorParameters() cap = cv2.VideoCapture(0) # Thay đổi độ phân giải phù hợp với yêu cầu ban đầu new_height = RESOLUTION_STANDARDS[set_resolution] new_width, new_height = change_resolution(cap,new_height) print(f'Resolution camera {new_width}x{new_height}') fps = cap.get(cv2.CAP_PROP_FPS) print(f"Frames per second: {fps}") def start_spraying(marker_id): print(f"🟡 SPRAYING marker {marker_id}...") def stop_spraying(marker_id): print(f"✅ SPRAYED marker {marker_id}") spray_output = LED(17) def update_spray_output(should_spray): if should_spray: spray_output.on() else: spray_output.off() # --- Track state --- marker_states = {} # marker_id -> {"state": "NOT_SPRAYED"/"SPRAYING"/"SPRAYED", "start_time": float} while True: ret, frame = cap.read() if not ret: break gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) detector = cv2.aruco.ArucoDetector(marker_dict, param_markers) marker_corners, marker_IDs, _ = detector.detectMarkers(gray_frame) current_time = time.time() need_to_spray = False if marker_corners: rVec, tVec, _ = aruco.estimatePoseSingleMarkers(marker_corners, MARKER_SIZE, cam_mat, dist_coef) for i, (ids, corners) in enumerate(zip(marker_IDs, marker_corners)): marker_id = int(ids[0]) corners = corners.reshape(4, 2).astype(int) cx = int(np.mean(corners[:, 0])) cy = int(np.mean(corners[:, 1])) x, y, z = tVec[i][0] roll, pitch, yaw = get_euler_angles_from_rvec(rVec[i]) # Kiểm tra đủ điều kiện in_center = is_in_center(cx, cy, new_width, new_height) within_angle = is_roll_valid(roll) within_xyz = ( ALLOWED_XYZ_RANGE["x"][0] <= x <= ALLOWED_XYZ_RANGE["x"][1] and ALLOWED_XYZ_RANGE["y"][0] <= y <= ALLOWED_XYZ_RANGE["y"][1] and ALLOWED_XYZ_RANGE["z"][0] <= z <= ALLOWED_XYZ_RANGE["z"][1] ) if marker_id not in marker_states: marker_states[marker_id] = {"state": "NOT_SPRAYED", "start_time": 0} state_info = marker_states[marker_id] if in_center and within_angle and within_xyz: if state_info["state"] == "NOT_SPRAYED": start_spraying(marker_id) marker_states[marker_id] = { "state": "SPRAYING", "start_time": current_time } elif state_info["state"] == "SPRAYING": elapsed = current_time - state_info["start_time"] if elapsed >= SPRAY_DURATION: stop_spraying(marker_id) marker_states[marker_id]["state"] = "SPRAYED" # Shared decision: still spraying? if marker_states[marker_id]["state"] in ("NOT_SPRAYED", "SPRAYING"): need_to_spray = True # --- Visualization --- status = marker_states[marker_id]["state"] color = {"NOT_SPRAYED": (0, 0, 255), "SPRAYING": (0, 255, 255), "SPRAYED": (0, 255, 0)}[status] if visualization: cv2.polylines(frame, [corners], True, color, 2) cv2.drawFrameAxes(frame, cam_mat, dist_coef, rVec[i], tVec[i], 4, 4) cv2.putText(frame, f"ID: {marker_id}", (cx + 10, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) cv2.putText(frame, f"{status}", (cx + 10, cy + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) cv2.putText(frame, f"x:{x:.1f} y:{y:.1f} z:{z:.1f}", (cx + 10, cy + 45), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1) cv2.putText(frame, f"roll:{roll:.1f} pitch:{pitch:.1f} yaw:{yaw:.1f}", (cx + 10, cy + 65), cv2.FONT_HERSHEY_PLAIN, 1, (0, 255, 255), 1) update_spray_output(need_to_spray) if visualization: cv2.imshow("frame", frame) key = cv2.waitKey(1) if key == ord("q"): break cap.release() cv2.destroyAllWindows()