oak_d-image-inference.py

import os
import json
import numpy as np
import cv2
import depthai as dai

# Paths to the model blob, its JSON config, the test image, and the output directory
YOLOV8N_MODEL = "yolov8-960-blob-result/best_openvino_2022.1_8shave.blob"  # Adjust path accordingly
YOLOV8N_CONFIG = "yolov8-960-blob-result/best.json"  # Adjust path accordingly
TEST_DATA = "img-40_jpg.rf.80b05d1760d1169f29d5fd2cf3120ae9.jpg"  # Adjust path accordingly
OUTPUT_IMAGES_YOLOv8n = "result/gesture_pred_images_v8n"  # Adjust path accordingly

CAMERA_PREV_DIM = (960, 960)  # Model input size (width, height)
LABELS = ["Pot-hole"]  # Class names, in the order used during training

def load_config(config_path):
    with open(config_path) as f:
        return json.load(f)
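
# For reference, the config JSON read above typically looks like the following
# (illustrative values, assuming a standard YOLOv8 blob export; only the keys
# used by create_image_pipeline are shown):
#
#   {
#     "nn_config": {
#       "NN_specific_metadata": {
#         "classes": 1,
#         "coordinates": 4,
#         "anchors": [],
#         "anchor_masks": {},
#         "iou_threshold": 0.5,
#         "confidence_threshold": 0.5
#       }
#     }
#   }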

def create_image_pipeline(config_path, model_path):
    pipeline = dai.Pipeline()
    model_config = load_config(config_path)

    # Read model-specific metadata from the JSON config exported with the blob
    nnConfig = model_config.get("nn_config", {})
    metadata = nnConfig.get("NN_specific_metadata", {})
    classes = metadata.get("classes", {})
    coordinates = metadata.get("coordinates", {})
    anchors = metadata.get("anchors", {})
    anchorMasks = metadata.get("anchor_masks", {})
    iouThreshold = metadata.get("iou_threshold", {})
    confidenceThreshold = metadata.get("confidence_threshold", {})

    # Host-to-device input, on-device YOLO decoder, and device-to-host output
    detectionIN = pipeline.create(dai.node.XLinkIn)
    detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
    nnOut = pipeline.create(dai.node.XLinkOut)

    nnOut.setStreamName("nn")
    detectionIN.setStreamName("detection_in")

    # Configure the detection network from the metadata
    detectionNetwork.setConfidenceThreshold(confidenceThreshold)
    detectionNetwork.setNumClasses(classes)
    detectionNetwork.setCoordinateSize(coordinates)
    detectionNetwork.setAnchors(anchors)
    detectionNetwork.setAnchorMasks(anchorMasks)
    detectionNetwork.setIouThreshold(iouThreshold)
    detectionNetwork.setBlobPath(model_path)
    detectionNetwork.setNumInferenceThreads(2)
    detectionNetwork.input.setBlocking(False)

    # Link: XLinkIn -> YoloDetectionNetwork -> XLinkOut
    detectionIN.out.link(detectionNetwork.input)
    detectionNetwork.out.link(nnOut.input)
    return pipeline
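
# At runtime the host pushes an NNData frame into the "detection_in" queue; the
# XLinkIn node forwards it to the YoloDetectionNetwork, which decodes the YOLO
# output on-device and publishes ImgDetections on the "nn" stream.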

def annotate_frame(frame, detections):
    color = (0, 0, 255)  # BGR red
    for detection in detections:
        # Convert normalized bbox coordinates to pixel coordinates
        bbox = frame_norm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
        cv2.putText(frame, LABELS[detection.label], (bbox[0] + 10, bbox[1] + 25), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 60), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
    return frame

def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    resized = cv2.resize(arr, shape)
    return resized.transpose(2, 0, 1)
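
# Note: cv2.imread returns an interleaved HWC BGR array; transpose(2, 0, 1)
# rearranges it to planar CHW, the layout depthai expects for NNData input.
# For example, a (960, 960, 3) image becomes (3, 960, 960).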

def frame_norm(frame, bbox):
    # Detections are normalized to [0, 1]; scale x values (even indices) by the
    # frame width and y values (odd indices) by the frame height
    norm_vals = np.full(len(bbox), frame.shape[0])
    norm_vals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * norm_vals).astype(int)
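
# Example: on a 960x960 frame, a normalized bbox of (0.25, 0.25, 0.75, 0.75)
# maps to pixel coordinates (240, 240, 720, 720).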

# Create the pipeline
pipeline = create_image_pipeline(YOLOV8N_CONFIG, YOLOV8N_MODEL)

# Ensure the output directory exists
os.makedirs(OUTPUT_IMAGES_YOLOv8n, exist_ok=True)

# Connect to the device and start the pipeline
with dai.Device(pipeline) as device:
    # Queues used to communicate with the device
    detectionIN = device.getInputQueue("detection_in")
    detectionNN = device.getOutputQueue("nn")

    # Load the input image and resize it to the model input dimensions
    image = cv2.imread(TEST_DATA)
    if image is None:
        raise FileNotFoundError(f"[ERROR] Could not load image {TEST_DATA}")
    image_res = cv2.resize(image, CAMERA_PREV_DIM)

    # Wrap the resized image, transposed to planar layout, in a depthai NNData message
    nn_data = dai.NNData()
    nn_data.setLayer("input", to_planar(image_res, CAMERA_PREV_DIM))

    # Send the image to the detection_in queue, which feeds the detection network
    detectionIN.send(nn_data)

    # Fetch the neural network output (blocks until a result arrives)
    inDet = detectionNN.get()
    if inDet is not None:
        detections = inDet.detections
        # Annotate the image with any detected objects
        image_res = annotate_frame(image_res, detections)
        print("Detections:", detections)

    # Write the annotated image to the output path
    output_image_path = os.path.join(OUTPUT_IMAGES_YOLOv8n, os.path.basename(TEST_DATA))
    cv2.imwrite(output_image_path, image_res)
    print(f"[INFO] Processed {TEST_DATA} and saved to {output_image_path}")

    # Verify that the image was saved
    if os.path.exists(output_image_path):
        print(f"[INFO] Successfully saved the image at {output_image_path}")
    else:
        print(f"[ERROR] Failed to save the image at {output_image_path}")
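
# A possible extension (a sketch, not part of the original script): with the loop
# below placed inside the `with dai.Device(pipeline)` block above, the same queues
# can be reused to process every image in a directory ("test_images" is a
# hypothetical path):
#
#   from pathlib import Path
#   for img_path in Path("test_images").glob("*.jpg"):
#       frame = cv2.imread(str(img_path))
#       if frame is None:
#           continue
#       frame = cv2.resize(frame, CAMERA_PREV_DIM)
#       nn_data = dai.NNData()
#       nn_data.setLayer("input", to_planar(frame, CAMERA_PREV_DIM))
#       detectionIN.send(nn_data)
#       result = detectionNN.get()
#       if result is not None:
#           frame = annotate_frame(frame, result.detections)
#       cv2.imwrite(os.path.join(OUTPUT_IMAGES_YOLOv8n, img_path.name), frame)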