# DepthAI example: feature tracking on the left/right mono cameras.
from collections import deque

import cv2
import depthai as dai
class FeatureTrackerDrawer:
    """Accumulates per-feature position trails and draws them with OpenCV.

    One instance per window; the trail length is shared between all
    instances via a class attribute so a single trackbar controls both
    the left and right views.
    """

    # Drawing parameters (BGR colors).
    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    circleRadius = 2
    # Upper bound of the trail-length trackbar.
    maxTrackedFeaturesPathLength = 30
    # For how many frames a feature's path is kept (trackbar-adjustable).
    trackedFeaturesPathLength = 10

    trackedIDs = None
    trackedFeaturesPath = None

    def __init__(self, trackbarName, windowName):
        """Create the OpenCV window and a trackbar adjusting trail length.

        :param trackbarName: label of the trackbar shown in the window
        :param windowName: name of the cv2 window to create and draw into
        """
        self.trackbarName = trackbarName
        self.windowName = windowName
        cv2.namedWindow(windowName)
        cv2.createTrackbar(trackbarName, windowName,
                           FeatureTrackerDrawer.trackedFeaturesPathLength,
                           FeatureTrackerDrawer.maxTrackedFeaturesPathLength,
                           self.onTrackBar)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()

    def onTrackBar(self, val):
        # Stored on the class so every drawer instance (both windows)
        # picks up the new trail length immediately.
        FeatureTrackerDrawer.trackedFeaturesPathLength = val

    def trackFeaturePath(self, features):
        """Append new positions and drop features that are no longer tracked.

        :param features: iterable of objects exposing ``id`` and
            ``position`` (e.g. ``dai.TrackedFeature``)
        """
        newTrackedIDs = set()

        for currentFeature in features:
            currentID = currentFeature.id
            newTrackedIDs.add(currentID)

            # One deque of recent positions per feature ID.
            path = self.trackedFeaturesPath.setdefault(currentID, deque())
            path.append(currentFeature.position)
            # Trim to the configured trail length; keep at least one
            # point even if the trackbar is dragged to 0.
            maxLength = max(1, FeatureTrackerDrawer.trackedFeaturesPathLength)
            while len(path) > maxLength:
                path.popleft()

        # Forget trails of features that disappeared this frame
        # (set difference instead of a manual loop shadowing builtin `id`).
        for staleID in self.trackedIDs - newTrackedIDs:
            self.trackedFeaturesPath.pop(staleID)

        self.trackedIDs = newTrackedIDs

    def drawFeatures(self, img):
        """Draw every tracked trail onto ``img`` (modified in place)."""
        # Keep the trackbar position in sync with the shared setting.
        cv2.setTrackbarPos(self.trackbarName, self.windowName,
                           FeatureTrackerDrawer.trackedFeaturesPathLength)

        for path in self.trackedFeaturesPath.values():
            # Trail: line segments between consecutive positions.
            for j in range(len(path) - 1):
                src = (int(path[j].x), int(path[j].y))
                dst = (int(path[j + 1].x), int(path[j + 1].y))
                cv2.line(img, src, dst, self.lineColor, 1, cv2.LINE_AA, 0)
            # Newest position: filled circle.
            j = len(path) - 1
            cv2.circle(img, (int(path[j].x), int(path[j].y)),
                       self.circleRadius, self.pointColor, -1, cv2.LINE_AA, 0)
# ---------------------------------------------------------------------------
# Pipeline definition: each mono camera feeds one FeatureTracker node; the
# passthrough frames and the tracked features are streamed back to the host,
# and a config input lets the host switch the motion estimator at runtime.
# ---------------------------------------------------------------------------
pipeline = dai.Pipeline()

# Sources and outputs
monoLeft = pipeline.create(dai.node.MonoCamera)
monoRight = pipeline.create(dai.node.MonoCamera)
featureTrackerLeft = pipeline.create(dai.node.FeatureTracker)
featureTrackerRight = pipeline.create(dai.node.FeatureTracker)

xoutPassthroughFrameLeft = pipeline.create(dai.node.XLinkOut)
xoutTrackedFeaturesLeft = pipeline.create(dai.node.XLinkOut)
xoutPassthroughFrameRight = pipeline.create(dai.node.XLinkOut)
xoutTrackedFeaturesRight = pipeline.create(dai.node.XLinkOut)
xinTrackedFeaturesConfig = pipeline.create(dai.node.XLinkIn)

xoutPassthroughFrameLeft.setStreamName("passthroughFrameLeft")
xoutTrackedFeaturesLeft.setStreamName("trackedFeaturesLeft")
xoutPassthroughFrameRight.setStreamName("passthroughFrameRight")
xoutTrackedFeaturesRight.setStreamName("trackedFeaturesRight")
xinTrackedFeaturesConfig.setStreamName("trackedFeaturesConfig")

# Camera properties
monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)

# Linking: left side
monoLeft.out.link(featureTrackerLeft.inputImage)
featureTrackerLeft.passthroughInputImage.link(xoutPassthroughFrameLeft.input)
featureTrackerLeft.outputFeatures.link(xoutTrackedFeaturesLeft.input)
xinTrackedFeaturesConfig.out.link(featureTrackerLeft.inputConfig)

# Linking: right side (same config stream drives both trackers)
monoRight.out.link(featureTrackerRight.inputImage)
featureTrackerRight.passthroughInputImage.link(xoutPassthroughFrameRight.input)
featureTrackerRight.outputFeatures.link(xoutTrackedFeaturesRight.input)
xinTrackedFeaturesConfig.out.link(featureTrackerRight.inputConfig)

# By default the least amount of resources are allocated;
# increasing it improves performance.
numShaves = numMemorySlices = 2
featureTrackerLeft.setHardwareResources(numShaves, numMemorySlices)
featureTrackerRight.setHardwareResources(numShaves, numMemorySlices)

# Host-side copy of the tracker config, mutated when the user toggles
# the motion estimator.
featureTrackerConfig = featureTrackerRight.initialConfig.get()

print("Press 's' to switch between Lucas-Kanade optical flow and hardware accelerated motion estimation!")
# Connect to the device and run the pipeline.
with dai.Device(pipeline) as device:

    # Output queues used to receive the results.
    passthroughImageLeftQueue = device.getOutputQueue("passthroughFrameLeft", 8, False)
    outputFeaturesLeftQueue = device.getOutputQueue("trackedFeaturesLeft", 8, False)
    passthroughImageRightQueue = device.getOutputQueue("passthroughFrameRight", 8, False)
    outputFeaturesRightQueue = device.getOutputQueue("trackedFeaturesRight", 8, False)
    inputFeatureTrackerConfigQueue = device.getInputQueue("trackedFeaturesConfig")

    leftWindowName = "left"
    rightWindowName = "right"
    leftFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", leftWindowName)
    rightFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", rightWindowName)

    while True:
        # Grab the passthrough frames and convert to BGR for drawing.
        leftFrame = cv2.cvtColor(passthroughImageLeftQueue.get().getFrame(),
                                 cv2.COLOR_GRAY2BGR)
        rightFrame = cv2.cvtColor(passthroughImageRightQueue.get().getFrame(),
                                  cv2.COLOR_GRAY2BGR)

        # Update and render the feature trails for each side.
        leftFeatureDrawer.trackFeaturePath(outputFeaturesLeftQueue.get().trackedFeatures)
        leftFeatureDrawer.drawFeatures(leftFrame)
        rightFeatureDrawer.trackFeaturePath(outputFeaturesRightQueue.get().trackedFeatures)
        rightFeatureDrawer.drawFeatures(rightFrame)

        # Show the frames.
        cv2.imshow(leftWindowName, leftFrame)
        cv2.imshow(rightWindowName, rightFrame)

        key = cv2.waitKey(1)
        if key == 27:  # Esc quits
            break
        elif key == ord('s'):
            # Toggle the motion estimator and push the new config to
            # both trackers (they share the config input stream).
            lucasKanade = dai.FeatureTrackerConfig.MotionEstimator.Type.LUCAS_KANADE_OPTICAL_FLOW
            hwEstimation = dai.FeatureTrackerConfig.MotionEstimator.Type.HW_MOTION_ESTIMATION
            if featureTrackerConfig.motionEstimator.type == lucasKanade:
                featureTrackerConfig.motionEstimator.type = hwEstimation
                print("Switching to hardware accelerated motion estimation")
            else:
                featureTrackerConfig.motionEstimator.type = lucasKanade
                print("Switching to Lucas-Kanade optical flow")

            cfg = dai.FeatureTrackerConfig()
            cfg.set(featureTrackerConfig)
            inputFeatureTrackerConfigQueue.send(cfg)