facenet_face_authentication_anti_spoof.py

# Anti-spoofing face authentication
# Import required modules
import cv2
import numpy as np
import depthai as dai
import os
import time
from face_auth import enroll_face, delist_face, authenticate_emb
import blobconverter

# Define Depth Classification model input size
DEPTH_NN_INPUT_SIZE = (64, 64)

# Define Face Detection model name and input size
# If you provide the blob path directly, set DET_MODEL_NAME and DET_ZOO_TYPE to None
DET_INPUT_SIZE = (300, 300)
DET_MODEL_NAME = "face-detection-retail-0004"
DET_ZOO_TYPE = "depthai"
det_blob_path = None

# Define Face Recognition model name and input size
# If you provide the blob path directly, set REC_MODEL_NAME and REC_ZOO_TYPE to None
REC_MODEL_NAME = "Sphereface"
REC_ZOO_TYPE = "intel"
rec_blob_path = None
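
# ------------------------------------------------------------------------------
# For reference only: a minimal sketch of what the helpers imported from
# face_auth might look like - an in-memory list of enrolled embeddings matched
# with cosine similarity. This is an illustrative assumption (the real
# implementations live in face_auth.py); the names and the 0.5 threshold below
# are hypothetical and are not used anywhere in this script.
_sketch_enrolled_embeddings = []


def _sketch_cosine_similarity(a, b):
    # Cosine similarity between two embedding vectors
    a = np.asarray(a, dtype=np.float32)
    b = np.asarray(b, dtype=np.float32)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))


def _sketch_enroll_face(embeddings):
    # Add new embeddings to the enrolled set
    _sketch_enrolled_embeddings.extend(embeddings)


def _sketch_authenticate_emb(embedding, threshold=0.5):
    # Accept the face if it is close enough to any enrolled embedding
    return any(_sketch_cosine_similarity(embedding, e) > threshold
               for e in _sketch_enrolled_embeddings)
# ------------------------------------------------------------------------------
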
# Create DepthAI pipeline
def create_depthai_pipeline():
    # Start defining a pipeline
    pipeline = dai.Pipeline()

    # Define a source - two mono (grayscale) cameras
    left = pipeline.createMonoCamera()
    left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
    left.setBoardSocket(dai.CameraBoardSocket.LEFT)
    right = pipeline.createMonoCamera()
    right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
    right.setBoardSocket(dai.CameraBoardSocket.RIGHT)

    # Create a node that will produce the depth map
    depth = pipeline.createStereoDepth()
    depth.setConfidenceThreshold(200)
    depth.setOutputRectified(True)  # The rectified streams are horizontally mirrored by default
    depth.setRectifyEdgeFillColor(0)  # Black, to better see the cutout
    depth.setExtendedDisparity(True)  # For better close-range depth perception
    median = dai.StereoDepthProperties.MedianFilter.KERNEL_7x7  # For depth filtering
    depth.setMedianFilter(median)

    # Link mono cameras to the depth node
    left.out.link(depth.left)
    right.out.link(depth.right)

    # Create right camera output
    xOutRight = pipeline.createXLinkOut()
    xOutRight.setStreamName("right")
    depth.rectifiedRight.link(xOutRight.input)

    # Create depth output
    xOutDisp = pipeline.createXLinkOut()
    xOutDisp.setStreamName("disparity")
    depth.disparity.link(xOutDisp.input)

    # Create input and output nodes for Depth Classification
    xDepthIn = pipeline.createXLinkIn()
    xDepthIn.setStreamName("depth_in")
    xOutDepthNn = pipeline.createXLinkOut()
    xOutDepthNn.setStreamName("depth_nn")

    # Define Depth Classification NN node
    depthNn = pipeline.createNeuralNetwork()
    depthNn.setBlobPath("data/depth-classification-models/depth_classification_ipscaled_model.blob")
    depthNn.input.setBlocking(False)

    # Linking
    xDepthIn.out.link(depthNn.input)
    depthNn.out.link(xOutDepthNn.input)

    # Convert detection model from OMZ to blob
    if DET_MODEL_NAME is not None:
        facedet_blob_path = blobconverter.from_zoo(
            name=DET_MODEL_NAME,
            shaves=6,
            zoo_type=DET_ZOO_TYPE
        )
    else:
        # Otherwise use the user-provided blob path
        facedet_blob_path = det_blob_path

    # Create Face Detection NN node
    faceDetNn = pipeline.createMobileNetDetectionNetwork()
    faceDetNn.setConfidenceThreshold(0.75)
    faceDetNn.setBlobPath(facedet_blob_path)

    # Create ImageManip to convert grayscale mono camera frame to RGB
    copyManip = pipeline.createImageManip()
    depth.rectifiedRight.link(copyManip.inputImage)
    # copyManip.initialConfig.setHorizontalFlip(True)
    copyManip.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)

    # Create ImageManip to preprocess input frame for detection NN
    detManip = pipeline.createImageManip()
    # detManip.initialConfig.setHorizontalFlip(True)
    detManip.initialConfig.setResize(DET_INPUT_SIZE[0], DET_INPUT_SIZE[1])
    detManip.initialConfig.setKeepAspectRatio(False)

    # Link detection ImageManip to detection NN
    copyManip.out.link(detManip.inputImage)
    detManip.out.link(faceDetNn.input)

    # Create output stream for detection output
    xOutDet = pipeline.createXLinkOut()
    xOutDet.setStreamName('det_out')
    faceDetNn.out.link(xOutDet.input)

    # Script node takes the output from the face detection NN as input and sets ImageManipConfig
    # to crop the initial frame for the recognition NN
    script = pipeline.create(dai.node.Script)
    script.setProcessor(dai.ProcessorType.LEON_CSS)
    script.setScriptPath("script.py")

    # Set inputs for script node
    copyManip.out.link(script.inputs['frame'])
    faceDetNn.out.link(script.inputs['face_det_in'])

    # Convert recognition model from OMZ to blob
    if REC_MODEL_NAME is not None:
        facerec_blob_path = blobconverter.from_zoo(
            name=REC_MODEL_NAME,
            shaves=6,
            zoo_type=REC_ZOO_TYPE
        )
    else:
        # Otherwise use the user-provided blob path
        facerec_blob_path = rec_blob_path

    # Create Face Recognition NN node
    faceRecNn = pipeline.createNeuralNetwork()
    faceRecNn.setBlobPath(facerec_blob_path)

    # Create ImageManip to preprocess frame for recognition NN
    recManip = pipeline.createImageManip()

    # Set recognition ImageManipConfig from script node
    script.outputs['manip_cfg'].link(recManip.inputConfig)
    script.outputs['manip_img'].link(recManip.inputImage)

    # Create output stream for recognition output
    xOutRec = pipeline.createXLinkOut()
    xOutRec.setStreamName('rec_out')
    faceRecNn.out.link(xOutRec.input)
    recManip.out.link(faceRecNn.input)

    return pipeline
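
# For reference only: a minimal sketch of what the on-device "script.py" loaded
# above might contain. It reads the face detections, builds an ImageManipConfig
# that crops the detected face, and forwards the config together with the frame
# to the recognition ImageManip. This is an assumption about script.py (its
# contents are not shown in this file); the 96x112 resize is likewise an assumed
# value matching the Sphereface input size. The stream names match the links
# made in create_depthai_pipeline().
_EXAMPLE_SCRIPT_PY = '''
while True:
    frame = node.io['frame'].get()
    face_dets = node.io['face_det_in'].get().detections
    for det in face_dets:
        cfg = ImageManipConfig()
        # Crop the detected face (normalized coordinates)
        cfg.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
        cfg.setResize(96, 112)
        cfg.setKeepAspectRatio(False)
        node.io['manip_cfg'].send(cfg)
        node.io['manip_img'].send(frame)
'''
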
# Load image of a lock in the locked position
locked_img = cv2.imread(os.path.join('data', 'images', 'lock_grey.png'), -1)
# Load image of a lock in the unlocked position
unlocked_img = cv2.imread(os.path.join('data', 'images', 'lock_open_grey.png'), -1)


# Overlay lock/unlock symbol on the frame
def overlay_symbol(frame, img, pos=(65, 100)):
    """
    Overlay the lock/unlock image on the frame to indicate
    whether authentication of the input frame succeeded or failed.
    """
    # Offset values for the lock/unlock image
    symbol_x_offset = pos[0]
    symbol_y_offset = pos[1]

    # Find top-left and bottom-right coordinates
    # where the lock/unlock image will be placed
    y1, y2 = symbol_y_offset, symbol_y_offset + img.shape[0]
    x1, x2 = symbol_x_offset, symbol_x_offset + img.shape[1]

    # Scale the alpha channel down to the range 0-1
    mask = img[:, :, 3] / 255.0
    # Inverse of the alpha mask
    inv_mask = 1 - mask

    # Iterate over the 3 color channels - B, G and R
    for c in range(0, 3):
        # Alpha-blend the lock/unlock image onto the frame
        frame[y1:y2, x1:x2, c] = (mask * img[:, :, c] +
                                  inv_mask * frame[y1:y2, x1:x2, c])

# Display info on the frame
def display_info(frame, bbox, status, status_color, fps):
    # Display bounding box (only if a face was detected)
    if bbox is not None:
        cv2.rectangle(frame, bbox, status_color[status], 2)

        # If a spoof was detected
        if status == 'Spoof Detected':
            # Display "Spoofed" label above the bbox
            cv2.putText(frame, "Spoofed", (bbox[0], bbox[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, status_color[status])

    # Create background for showing details
    cv2.rectangle(frame, (5, 5, 175, 150), (50, 0, 0), -1)

    # Display authentication status on the frame
    cv2.putText(frame, status, (20, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, status_color[status])

    # Display lock symbol
    if status == 'Authenticated':
        overlay_symbol(frame, unlocked_img)
    else:
        overlay_symbol(frame, locked_img)

    # Display instructions on the frame
    cv2.putText(frame, 'Press E to Enroll Face.', (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255))
    cv2.putText(frame, 'Press D to Delist Face.', (10, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255))
    cv2.putText(frame, 'Press Q to Quit.', (10, 85), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255))
    cv2.putText(frame, f'FPS: {fps:.2f}', (10, 175), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255))

frame_count = 0  # Frame count
fps = 0  # Placeholder fps value
prev_frame_time = 0  # Time at which the previous batch of frames finished processing
new_frame_time = 0  # Time at which the current batch of frames finished processing

# Set status colors
status_color = {
    'Authenticated': (0, 255, 0),
    'Unauthenticated': (0, 0, 255),
    'Spoof Detected': (0, 0, 255),
    'No Face Detected': (0, 0, 255)
}

# Create pipeline
pipeline = create_depthai_pipeline()

# Initialize device and start pipeline
with dai.Device(pipeline) as device:
    # Start pipeline
    device.startPipeline()

    # Output queue to get the right camera frames
    qRight = device.getOutputQueue(name="right", maxSize=4, blocking=False)
    # Output queue to get the disparity map
    qDepth = device.getOutputQueue(name="disparity", maxSize=4, blocking=False)
    # Input queue to send face depth map to the device
    qDepthIn = device.getInputQueue(name="depth_in")
    # Output queue to get Depth Classification NN data
    qDepthNn = device.getOutputQueue(name="depth_nn", maxSize=4, blocking=False)
    # Output queue to get Face Recognition NN data
    qRec = device.getOutputQueue(name="rec_out", maxSize=4, blocking=False)
    # Output queue to get Face Detection NN data
    qDet = device.getOutputQueue(name="det_out", maxSize=4, blocking=False)

    while True:
        # Get right camera frame
        inRight = qRight.get()
        r_frame = inRight.getFrame()
        # r_frame = cv2.flip(r_frame, flipCode=1)

        # Get depth frame
        inDepth = qDepth.get()  # Blocking call, will wait until new data has arrived
        depth_frame = inDepth.getFrame()
        depth_frame = cv2.flip(depth_frame, flipCode=1)
        depth_frame = np.ascontiguousarray(depth_frame)
        depth_frame = cv2.bitwise_not(depth_frame)

        # Apply color map to highlight the disparity info
        depth_frame_cmap = cv2.applyColorMap(depth_frame, cv2.COLORMAP_JET)
        # Show disparity frame
        cv2.imshow("disparity", depth_frame_cmap)

        # Convert grayscale image frame to 'bgr' (opencv format)
        frame = cv2.cvtColor(r_frame, cv2.COLOR_GRAY2BGR)
        # Get image frame dimensions
        img_h, img_w = frame.shape[0:2]

        bbox = None
        # Get detection NN output
        inDet = qDet.tryGet()
        if inDet is not None:
            # Get face bbox detections
            detections = inDet.detections
            if len(detections) != 0:
                # Use first detected face bbox
                detection = detections[0]
                # print(detection.confidence)
                x = int(detection.xmin * img_w)
                y = int(detection.ymin * img_h)
                w = int(detection.xmax * img_w - detection.xmin * img_w)
                h = int(detection.ymax * img_h - detection.ymin * img_h)
                bbox = (x, y, w, h)

        # Initialize per-frame state (is_real is also used by the key handlers below)
        face_embedding = None
        authenticated = False
        is_real = False

        # Check if a face was detected in the frame
        if bbox:
            # Get face ROI from the depth frame
            face_d = depth_frame[max(0, bbox[1]):bbox[1] + bbox[3], max(0, bbox[0]):bbox[0] + bbox[2]]
            cv2.imshow("face_roi", face_d)

            # Preprocess face depth map for classification
            resized_face_d = cv2.resize(face_d, DEPTH_NN_INPUT_SIZE)
            resized_face_d = resized_face_d.astype('float16')

            # Create DepthAI ImgFrame
            img = dai.ImgFrame()
            img.setFrame(resized_face_d)
            img.setWidth(DEPTH_NN_INPUT_SIZE[0])
            img.setHeight(DEPTH_NN_INPUT_SIZE[1])
            img.setType(dai.ImgFrame.Type.GRAYF16)
            # Send face depth map to the depthai pipeline for classification
            qDepthIn.send(img)

            # Get Depth Classification NN output
            inDepthNn = qDepthNn.tryGet()
            if inDepthNn is not None:
                # Get prediction (sigmoid output: > 0.5 means spoofed, otherwise real)
                cnn_output = inDepthNn.getLayerFp16("dense_2/Sigmoid")
                # print(cnn_output)
                if cnn_output[0] > .5:
                    prediction = 'spoofed'
                    is_real = False
                else:
                    prediction = 'real'
                    is_real = True
                print(prediction)

            if is_real:
                # Check if the face in the frame was authenticated
                # Get recognition NN output
                inRec = qRec.tryGet()
                if inRec is not None:
                    # Get embedding of the face
                    face_embedding = inRec.getFirstLayerFp16()
                    # print(len(face_embedding))
                    authenticated = authenticate_emb(face_embedding)
                if authenticated:
                    # Authenticated
                    status = 'Authenticated'
                else:
                    # Unauthenticated
                    status = 'Unauthenticated'
            else:
                # Spoof detected
                status = 'Spoof Detected'
        else:
            # No face detected
            status = 'No Face Detected'

        # Display info on frame
        display_info(frame, bbox, status, status_color, fps)

        # Calculate average fps
        if frame_count % 10 == 0:
            # Time when we finished processing the last 10 frames
            new_frame_time = time.time()
            # FPS is the number of frames processed in one second
            fps = 1 / ((new_frame_time - prev_frame_time) / 10)
            prev_frame_time = new_frame_time

        # Capture the key pressed
        key_pressed = cv2.waitKey(1) & 0xff
        # Enroll the face if 'e' was pressed
        if key_pressed == ord('e'):
            if is_real and face_embedding is not None:
                enroll_face([face_embedding])
        # Delist the face if 'd' was pressed
        elif key_pressed == ord('d'):
            if is_real and face_embedding is not None:
                delist_face([face_embedding])
        # Stop the program if 'q' was pressed
        elif key_pressed == ord('q'):
            break

        # Display the final frame
        cv2.imshow("Authentication Cam", frame)
        # Increment frame count
        frame_count += 1

# Close all output windows
cv2.destroyAllWindows()