oak-video-inference.py

# Install dependencies
# !python3 -m pip install depthai

import os
import json
import numpy as np
import cv2
from pathlib import Path
import depthai as dai
import time

# Define paths to the model blob, model config, input video, and output video
YOLOV8N_MODEL = "/home/jaykumaran/Blogs/Poth-hole-Detection/Final Media/Yolov8-2022.1-blob/yolov8n-pothole-best_openvino_2022.1_8shave.blob"  # Adjust path accordingly
YOLOV8N_CONFIG = "/home/jaykumaran/Blogs/Poth-hole-Detection/Final Media/Yolov8-2022.1-blob/yolov8n-pothole-best.json"  # Adjust path accordingly

INPUT_VIDEO = "videoplayback.mp4"
OUTPUT_VIDEO = "vid_result/960-oak-d-videoplayback_video.mp4"

CAMERA_PREV_DIM = (960, 960)
LABELS = ["Pot-hole"]

def load_config(config_path):
    with open(config_path) as f:
        return json.load(f)

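# Illustrative sketch (not the actual file shipped with the model) of the JSON
# structure load_config() is expected to return, based on the keys read in
# create_image_pipeline() below; the values here are placeholders:
#
# {
#   "nn_config": {
#     "NN_specific_metadata": {
#       "classes": 1,
#       "coordinates": 4,
#       "anchors": [],
#       "anchor_masks": {},
#       "iou_threshold": 0.5,
#       "confidence_threshold": 0.5
#     }
#   }
# }
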
def create_image_pipeline(config_path, model_path):
    pipeline = dai.Pipeline()
    model_config = load_config(config_path)
    nnConfig = model_config.get("nn_config", {})
    metadata = nnConfig.get("NN_specific_metadata", {})
    classes = metadata.get("classes", {})
    coordinates = metadata.get("coordinates", {})
    anchors = metadata.get("anchors", {})
    anchorMasks = metadata.get("anchor_masks", {})
    iouThreshold = metadata.get("iou_threshold", {})
    confidenceThreshold = metadata.get("confidence_threshold", {})

    detectionIN = pipeline.create(dai.node.XLinkIn)
    detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
    nnOut = pipeline.create(dai.node.XLinkOut)

    nnOut.setStreamName("nn")
    detectionIN.setStreamName("detection_in")

    detectionNetwork.setConfidenceThreshold(confidenceThreshold)
    detectionNetwork.setNumClasses(classes)
    detectionNetwork.setCoordinateSize(coordinates)
    detectionNetwork.setAnchors(anchors)
    detectionNetwork.setAnchorMasks(anchorMasks)
    detectionNetwork.setIouThreshold(iouThreshold)
    detectionNetwork.setBlobPath(model_path)
    detectionNetwork.setNumInferenceThreads(2)
    detectionNetwork.input.setBlocking(False)

    # Linking
    detectionIN.out.link(detectionNetwork.input)
    detectionNetwork.out.link(nnOut.input)

    return pipeline

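# The resulting pipeline streams frames from the host into the device and
# detections back out:
#   host --XLinkIn("detection_in")--> YoloDetectionNetwork --XLinkOut("nn")--> host
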
def annotate_frame(frame, detections, fps):
    color = (0, 0, 255)
    for detection in detections:
        bbox = frame_norm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
        cv2.putText(frame, LABELS[detection.label], (bbox[0] + 10, bbox[1] + 25), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 60), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
    # Annotate the frame with the FPS
    cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return frame

def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    resized = cv2.resize(arr, shape)
    return resized.transpose(2, 0, 1)

def frame_norm(frame, bbox):
    norm_vals = np.full(len(bbox), frame.shape[0])
    norm_vals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * norm_vals).astype(int)

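# Worked example (illustrative): for a 1280x720 frame,
# frame_norm(frame, (0.25, 0.5, 0.75, 0.9)) scales x values by the width and
# y values by the height, giving (320, 360, 960, 648) in pixel coordinates.
# to_planar() resizes the BGR frame and converts it from HWC to the CHW layout
# expected by the blob's "input" layer.
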
# Create pipeline
pipeline = create_image_pipeline(YOLOV8N_CONFIG, YOLOV8N_MODEL)

# Ensure output directory exists
os.makedirs(os.path.dirname(OUTPUT_VIDEO), exist_ok=True)

cap = cv2.VideoCapture(INPUT_VIDEO)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
out = cv2.VideoWriter(OUTPUT_VIDEO, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

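# Note: some video files report 0 for CAP_PROP_FPS, which would produce an
# unplayable output; a simple guard (an assumption, not in the original) is:
# fps = cap.get(cv2.CAP_PROP_FPS) or 30
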
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    # Define the queues used to communicate with the device
    detectionIN = device.getInputQueue("detection_in")
    detectionNN = device.getOutputQueue("nn")

    start_time = time.time()
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1

        image_res = cv2.resize(frame, CAMERA_PREV_DIM)

        # Build a depthai NNData message from the frame, resized and transposed to the model input shape
        nn_data = dai.NNData()
        nn_data.setLayer("input", to_planar(frame, CAMERA_PREV_DIM))

        # Send the image to the detection_in queue; the pipeline forwards it to the detection network for inference
        detectionIN.send(nn_data)

        # Fetch the neural network output
        inDet = detectionNN.get()
        detections = []
        if inDet is not None:
            detections = inDet.detections
            print("Detections", detections)

        # Calculate the FPS
        elapsed_time = time.time() - start_time
        fps = frame_count / elapsed_time if elapsed_time > 0 else 0

        # Annotate the frame with detections and FPS
        frame = annotate_frame(frame, detections, fps)
        out.write(frame)

cap.release()
out.release()
print(f"[INFO] Processed video {INPUT_VIDEO} and saved to {OUTPUT_VIDEO}")