oak-d-camera-inference.py

# !python3 -m pip install depthai
import os
import json
import numpy as np
import cv2
from pathlib import Path
import depthai as dai
import time
# Define paths to the model, configuration, and output video
YOLOV8N_MODEL = "Yolov8-2022.1-blob/yolov8n-pothole-best_openvino_2022.1_8shave.blob"  # Adjust path accordingly
YOLOV8N_CONFIG = "Yolov8-2022.1-blob/yolov8n-pothole-best.json"  # Adjust path accordingly
OUTPUT_VIDEO = "vid_result/960-oak-d-live_video.mp4"  # Adjust path accordingly

CAMERA_PREV_DIM = (960, 960)
LABELS = ["Pot-hole"]
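
# Note: the 960x960 preview size should match the input resolution the .blob
# was compiled for; the on-device network expects frames at that resolution.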

def load_config(config_path):
    with open(config_path) as f:
        return json.load(f)

def create_camera_pipeline(config_path, model_path):
    pipeline = dai.Pipeline()
    model_config = load_config(config_path)
    nnConfig = model_config.get("nn_config", {})
    metadata = nnConfig.get("NN_specific_metadata", {})
    classes = metadata.get("classes", {})
    coordinates = metadata.get("coordinates", {})
    anchors = metadata.get("anchors", {})
    anchorMasks = metadata.get("anchor_masks", {})
    iouThreshold = metadata.get("iou_threshold", {})
    confidenceThreshold = metadata.get("confidence_threshold", {})
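
    # For reference, a minimal sketch of the expected JSON layout (keys taken
    # from the lookups above; the values shown are placeholders, not the
    # actual exported settings):
    # {
    #   "nn_config": {
    #     "NN_specific_metadata": {
    #       "classes": 1,
    #       "coordinates": 4,
    #       "anchors": [],
    #       "anchor_masks": {},
    #       "iou_threshold": 0.5,
    #       "confidence_threshold": 0.5
    #     }
    #   }
    # }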

    # Create camera node
    camRgb = pipeline.create(dai.node.ColorCamera)
    camRgb.setPreviewSize(CAMERA_PREV_DIM[0], CAMERA_PREV_DIM[1])
    camRgb.setInterleaved(False)
    camRgb.setBoardSocket(dai.CameraBoardSocket.RGB)
    camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)

    # Create detection network node and host-facing outputs
    detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
    nnOut = pipeline.create(dai.node.XLinkOut)
    nnOut.setStreamName("nn")
    # The detection message carries no image data, so also stream the camera
    # preview to the host for display and annotation
    xoutRgb = pipeline.create(dai.node.XLinkOut)
    xoutRgb.setStreamName("rgb")

    detectionNetwork.setConfidenceThreshold(confidenceThreshold)
    detectionNetwork.setNumClasses(classes)
    detectionNetwork.setCoordinateSize(coordinates)
    detectionNetwork.setAnchors(anchors)
    detectionNetwork.setAnchorMasks(anchorMasks)
    detectionNetwork.setIouThreshold(iouThreshold)
    detectionNetwork.setBlobPath(model_path)
    detectionNetwork.setNumInferenceThreads(2)
    detectionNetwork.input.setBlocking(False)

    # Linking
    camRgb.preview.link(detectionNetwork.input)
    camRgb.preview.link(xoutRgb.input)
    detectionNetwork.out.link(nnOut.input)
    return pipeline
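
# Resulting pipeline topology:
#   ColorCamera.preview ─┬─> YoloDetectionNetwork ──> XLinkOut("nn")
#                        └──────────────────────────> XLinkOut("rgb")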

def annotate_frame(frame, detections, fps):
    color = (0, 0, 255)
    for detection in detections:
        bbox = frame_norm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
        cv2.putText(frame, LABELS[detection.label], (bbox[0] + 10, bbox[1] + 25), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 60), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
    # Annotate the frame with the FPS
    cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return frame

def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    resized = cv2.resize(arr, shape)
    return resized.transpose(2, 0, 1)
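
# Note: to_planar is not used by the live pipeline below; it is a helper for
# converting a host-side HWC frame to the planar CHW layout that DepthAI
# neural network inputs expect (e.g., when feeding frames through an XLinkIn).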

def frame_norm(frame, bbox):
    norm_vals = np.full(len(bbox), frame.shape[0])
    norm_vals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * norm_vals).astype(int)
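
# Example: on a 960x960 frame, the normalized bbox (0.25, 0.5, 0.75, 1.0)
# maps to pixel coordinates [240, 480, 720, 960]; even indices (x) are
# scaled by the frame width and odd indices (y) by the frame height.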

# Create pipeline
pipeline = create_camera_pipeline(YOLOV8N_CONFIG, YOLOV8N_MODEL)

# Ensure output directory exists
os.makedirs(os.path.dirname(OUTPUT_VIDEO), exist_ok=True)

# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    # Queues for the neural network output and the camera preview frames
    detectionNN = device.getOutputQueue("nn", maxSize=4, blocking=False)
    qRgb = device.getOutputQueue("rgb", maxSize=4, blocking=False)
    # Video writer to save the output video
    fps = 30  # Assuming 30 FPS for the OAK-D camera
    frame_width, frame_height = CAMERA_PREV_DIM
    out = cv2.VideoWriter(OUTPUT_VIDEO, cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame_width, frame_height))
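    # Note: the writer is created with a nominal 30 FPS; if the actual capture
    # rate differs, the saved video plays back faster or slower than real time.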

    start_time = time.time()
    frame_count = 0
    while True:
        inRgb = qRgb.get()
        inDet = detectionNN.get()
        detections = []
        if inDet is not None:
            detections = inDet.detections
            print("Detections", detections)
        # Retrieve the frame from the camera preview stream
        frame = inRgb.getCvFrame()
        frame_count += 1
        # Calculate the running FPS
        elapsed_time = time.time() - start_time
        fps = frame_count / elapsed_time if elapsed_time > 0 else 0
        # Annotate the frame with detections and FPS
        frame = annotate_frame(frame, detections, fps)
        # Display the frame
        cv2.imshow("Frame", frame)
        # Write the frame to the output video
        out.write(frame)
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    out.release()
    cv2.destroyAllWindows()
    print(f"[INFO] Processed live stream and saved to {OUTPUT_VIDEO}")