{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "     \n", "     \n", "     \n", "     \n", "     \n", "   \n", "[Home Page](Start_Here.ipynb)\n", " \n", " \n", "[Previous Notebook](Introduction_to_Multi-DNN_pipeline.ipynb)\n", "     \n", "     \n", "     \n", "     \n", "[1](Introduction_to_Deepstream_and_Gstreamer.ipynb)\n", "[2](Getting_started_with_Deepstream_Pipeline.ipynb)\n", "[3](Introduction_to_Multi-DNN_pipeline.ipynb)\n", "[4]\n", "[5](Multi-stream_Multi_DNN.ipynb)\n", "     \n", "     \n", "     \n", "     \n", "[Next Notebook](Multi-stream_Multi_DNN.ipynb)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Multi-stream pipeline\n", "\n", "In this notebook, we will learn to build a pipeline that takes in two streams, batches them, and runs inference on them together. \n", "\n", "**Contents of this Notebook:**\n", "\n", "- [Nvmultistreamtiler](#Nvmultistreamtiler) \n", "- [Building the pipeline](#Building-the-pipeline)\n", " \n", " \n", "![test3](images/test3.png)\n", "\n", "We will use `nvmultistreamtiler` in this application to composite a 2D tile from the batched buffers.\n", "\n", "### Nvmultistreamtiler\n", "\n", "The Gst-nvmultistreamtiler plugin composites a 2D tile from batched buffers. The plugin accepts batched NV12/RGBA data from upstream components. It composites the tile based on stream IDs obtained from NvDsBatchMeta and NvDsFrameMeta, in row-major order (starting from source 0, left to right across the top row, then across the next row). Each source frame is scaled to the corresponding location in the tiled output buffer. The plugin can reconfigure itself if a new source is added and it exceeds the space allocated for tiles. 
It also maintains a cache of old frames to avoid display flicker if one source has a lower frame rate than the other sources.\n", "\n", "![test3](images/nvmultistreamtiler.png)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Building the pipeline\n", "\n", "We will run batched inference on both streams with the 4-class DNN network. The rest of the notebook is mostly similar to the earlier single-stream notebooks. \n", "\n", "![test1](images/test1.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import required libraries \n", "import sys\n", "sys.path.append('../source_code')\n", "import gi\n", "import configparser\n", "gi.require_version('Gst', '1.0')\n", "from gi.repository import GObject, Gst\n", "from gi.repository import GLib\n", "from ctypes import *\n", "import time\n", "import math\n", "import platform\n", "from common.bus_call import bus_call\n", "from common.FPS import GETFPS\n", "import pyds\n", "\n", "\n", "# Define variables to be used later\n", "fps_streams={}\n", "\n", "PGIE_CLASS_ID_VEHICLE = 0\n", "PGIE_CLASS_ID_BICYCLE = 1\n", "PGIE_CLASS_ID_PERSON = 2\n", "PGIE_CLASS_ID_ROADSIGN = 3\n", "\n", "MUXER_OUTPUT_WIDTH=1920\n", "MUXER_OUTPUT_HEIGHT=1080\n", "\n", "TILED_OUTPUT_WIDTH=1920\n", "TILED_OUTPUT_HEIGHT=1080\n", "OSD_PROCESS_MODE= 0\n", "OSD_DISPLAY_TEXT= 0\n", "pgie_classes_str= [\"Vehicle\", \"TwoWheeler\", \"Person\",\"RoadSign\"]\n", "\n", "# Define input and output stream information \n", "num_sources = 2 \n", "INPUT_VIDEO_1 = '/opt/nvidia/deepstream/deepstream-5.0/samples/streams/sample_720p.h264'\n", "INPUT_VIDEO_2 = '/opt/nvidia/deepstream/deepstream-5.0/samples/streams/sample_720p.h264'\n", "OUTPUT_VIDEO_NAME = \"../source_code/N3/ds_out.mp4\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We define a function `make_elm_or_print_err()` to create our elements and report any errors if the creation fails.\n", "\n", "Elements are created using the `Gst.ElementFactory.make()` 
function from the GStreamer library." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Make Element or Print Error and any other detail\n", "def make_elm_or_print_err(factoryname, name, printedname, detail=\"\"):\n", "    print(\"Creating\", printedname)\n", "    elm = Gst.ElementFactory.make(factoryname, name)\n", "    if not elm:\n", "        sys.stderr.write(\"Unable to create \" + printedname + \" \\n\")\n", "        if detail:\n", "            sys.stderr.write(detail)\n", "    return elm" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Initialise GStreamer and Create an Empty Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i in range(0,num_sources):\n", "    fps_streams[\"stream{0}\".format(i)]=GETFPS(i)\n", "\n", "# Standard GStreamer initialization\n", "GObject.threads_init()\n", "Gst.init(None)\n", "\n", "# Create the GStreamer elements\n", "# Create the Pipeline element that will hold and connect the other elements\n", "print(\"Creating Pipeline \\n \")\n", "pipeline = Gst.Pipeline()\n", "\n", "if not pipeline:\n", "    sys.stderr.write(\" Unable to create Pipeline \\n\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Elements that are required for our pipeline\n", "\n", "Compared to the first notebook, this notebook uses several queues to buffer data as it moves from one plugin to the next."
 ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "########### Create Elements required for the Pipeline ########### \n", "\n", "######### Defining Stream 1 \n", "# Source element for reading from the file\n", "source1 = make_elm_or_print_err(\"filesrc\", \"file-source-1\",'file-source-1')\n", "# Since the data format in the input file is an elementary H.264 stream, we need an H.264 parser\n", "h264parser1 = make_elm_or_print_err(\"h264parse\", \"h264-parser-1\",\"h264-parser-1\")\n", "# Use nvv4l2decoder for hardware-accelerated decoding on the GPU\n", "decoder1 = make_elm_or_print_err(\"nvv4l2decoder\", \"nvv4l2-decoder-1\",\"nvv4l2-decoder-1\")\n", " \n", "##########\n", "\n", "########## Defining Stream 2 \n", "# Source element for reading from the file\n", "source2 = make_elm_or_print_err(\"filesrc\", \"file-source-2\",\"file-source-2\")\n", "# Since the data format in the input file is an elementary H.264 stream, we need an H.264 parser\n", "print(\"Creating H264Parser \\n\")\n", "h264parser2 = make_elm_or_print_err(\"h264parse\", \"h264-parser-2\", \"h264-parser-2\")\n", "# Use nvv4l2decoder for hardware-accelerated decoding on the GPU\n", "print(\"Creating Decoder \\n\")\n", "decoder2 = make_elm_or_print_err(\"nvv4l2decoder\", \"nvv4l2-decoder-2\",\"nvv4l2-decoder-2\")\n", "########### \n", " \n", "# Create an nvstreammux instance to form batches from one or more sources.\n", "streammux = make_elm_or_print_err(\"nvstreammux\", \"Stream-muxer\",\"Stream-muxer\") \n", "# Create the primary inference element\n", "pgie = make_elm_or_print_err(\"nvinfer\", \"primary-inference\",\"primary-inference\")\n", "# Create a tiler to composite multiple streams into a single 2D frame\n", "tiler=make_elm_or_print_err(\"nvmultistreamtiler\", \"nvtiler\",\"nvtiler\")\n", "# Use a converter to convert from NV12 to RGBA, as required by nvosd\n", "nvvidconv = make_elm_or_print_err(\"nvvideoconvert\", \"convertor\",\"nvvidconv\")\n", "# Create OSD to draw on the converted RGBA buffer\n", "nvosd = 
make_elm_or_print_err(\"nvdsosd\", \"onscreendisplay\",\"nvosd\")\n", "# Create a queue to buffer incoming data from streammux\n", "queue1=make_elm_or_print_err(\"queue\",\"queue1\",\"queue1\")\n", "# Create a queue to buffer incoming data from pgie\n", "queue2=make_elm_or_print_err(\"queue\",\"queue2\",\"queue2\")\n", "# Create a queue to buffer incoming data from tiler\n", "queue3=make_elm_or_print_err(\"queue\",\"queue3\",\"queue3\")\n", "# Create a queue to buffer incoming data from nvvidconv\n", "queue4=make_elm_or_print_err(\"queue\",\"queue4\",\"queue4\")\n", "# Create a queue to buffer incoming data from nvosd\n", "queue5=make_elm_or_print_err(\"queue\",\"queue5\",\"queue5\")\n", "# Use a second converter to convert the RGBA output of nvosd into a format the encoder accepts\n", "nvvidconv2 = make_elm_or_print_err(\"nvvideoconvert\", \"convertor2\",\"nvvidconv2\")\n", "# Create an MPEG-4 encoder to compress the output before saving it to a video file\n", "encoder = make_elm_or_print_err(\"avenc_mpeg4\", \"encoder\", \"Encoder\")\n", "# Parse the encoder output\n", "codeparser = make_elm_or_print_err(\"mpeg4videoparse\", \"mpeg4-parser\", 'Code Parser')\n", "# Create a container\n", "container = make_elm_or_print_err(\"qtmux\", \"qtmux\", \"Container\")\n", "# Create Sink for storing the output \n", "sink = make_elm_or_print_err(\"filesink\", \"filesink\", \"Sink\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have created the elements, we can set various properties for our pipeline. 
\n", "\n", "Configuration file: [pgie](../source_code/N3/dstest3_pgie_config.txt)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "############ Set properties for the Elements ############\n", "# Set input video files \n", "source1.set_property('location', INPUT_VIDEO_1)\n", "source2.set_property('location', INPUT_VIDEO_2)\n", "# Set input width, height and batch size \n", "streammux.set_property('width', MUXER_OUTPUT_WIDTH)\n", "streammux.set_property('height', MUXER_OUTPUT_HEIGHT)\n", "streammux.set_property('batch-size', num_sources)\n", "# Timeout in microseconds to wait after the first buffer is available \n", "# to push the batch even if a complete batch is not formed.\n", "streammux.set_property('batched-push-timeout', 4000000)\n", "# Set the configuration file for nvinfer \n", "pgie.set_property('config-file-path', \"../source_code/N3/dstest3_pgie_config.txt\")\n", "pgie_batch_size=pgie.get_property(\"batch-size\")\n", "print(\"PGIE batch size :\",end='')\n", "print(pgie_batch_size)\n", "if pgie_batch_size != num_sources:\n", "    print(\"WARNING: Overriding infer-config batch-size\",pgie_batch_size,\" with number of sources \", num_sources,\" \\n\")\n", "    pgie.set_property(\"batch-size\",num_sources)\n", " \n", "# Set display configurations for nvmultistreamtiler \n", "tiler_rows=int(math.sqrt(num_sources))\n", "tiler_columns=int(math.ceil((1.0*num_sources)/tiler_rows))\n", "tiler.set_property(\"rows\",tiler_rows)\n", "tiler.set_property(\"columns\",tiler_columns)\n", "tiler.set_property(\"width\", TILED_OUTPUT_WIDTH)\n", "tiler.set_property(\"height\", TILED_OUTPUT_HEIGHT)\n", "\n", "# Set encoder properties and sink configuration\n", "encoder.set_property(\"bitrate\", 2000000)\n", "sink.set_property(\"location\", OUTPUT_VIDEO_NAME)\n", "sink.set_property(\"sync\", 0)\n", "sink.set_property(\"async\", 0)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now add all the elements to the pipeline, link them in order, and create a GStreamer bus 
to feed all messages through it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "########## Add and Link Elements in the Pipeline ########## \n", "\n", "print(\"Adding elements to Pipeline \\n\")\n", "pipeline.add(source1)\n", "pipeline.add(h264parser1)\n", "pipeline.add(decoder1)\n", "pipeline.add(source2)\n", "pipeline.add(h264parser2)\n", "pipeline.add(decoder2)\n", "pipeline.add(streammux)\n", "pipeline.add(pgie)\n", "pipeline.add(tiler)\n", "pipeline.add(nvvidconv)\n", "pipeline.add(nvosd)\n", "pipeline.add(queue1)\n", "pipeline.add(queue2)\n", "pipeline.add(queue3)\n", "pipeline.add(queue4)\n", "pipeline.add(queue5)\n", "pipeline.add(nvvidconv2)\n", "pipeline.add(encoder)\n", "pipeline.add(codeparser)\n", "pipeline.add(container)\n", "pipeline.add(sink)\n", "\n", "print(\"Linking elements in the Pipeline \\n\")\n", "\n", "source1.link(h264parser1)\n", "h264parser1.link(decoder1)\n", "\n", "\n", "###### Stream 1: request a sink pad from streammux and link the decoder's source pad to it \n", "sinkpad1 = streammux.get_request_pad(\"sink_0\")\n", "if not sinkpad1:\n", "    sys.stderr.write(\" Unable to get the sink pad of streammux \\n\")\n", " \n", "srcpad1 = decoder1.get_static_pad(\"src\")\n", "if not srcpad1:\n", "    sys.stderr.write(\" Unable to get source pad of decoder \\n\")\n", " \n", "srcpad1.link(sinkpad1)\n", "\n", "###### Stream 2: link source, parser and decoder, then connect the decoder to a streammux sink pad \n", "source2.link(h264parser2)\n", "h264parser2.link(decoder2)\n", "\n", "sinkpad2 = streammux.get_request_pad(\"sink_1\")\n", "if not sinkpad2:\n", "    sys.stderr.write(\" Unable to get the sink pad of streammux \\n\")\n", " \n", "srcpad2 = decoder2.get_static_pad(\"src\")\n", "if not srcpad2:\n", "    sys.stderr.write(\" Unable to get source pad of decoder \\n\")\n", " \n", "srcpad2.link(sinkpad2)\n", "\n", "\n", "\n", "streammux.link(queue1)\n", "queue1.link(pgie)\n", "pgie.link(queue2)\n", "queue2.link(tiler)\n", "tiler.link(queue3)\n", "queue3.link(nvvidconv)\n", 
"nvvidconv.link(queue4)\n", "queue4.link(nvosd)\n", "nvosd.link(queue5)\n", "queue5.link(nvvidconv2)\n", "nvvidconv2.link(encoder)\n", "encoder.link(codeparser)\n", "codeparser.link(container)\n", "container.link(sink)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create an event loop and feed GStreamer bus messages to it\n", "loop = GLib.MainLoop()\n", "bus = pipeline.get_bus()\n", "bus.add_signal_watch()\n", "bus.connect(\"message\", bus_call, loop)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our pipeline now carries the metadata forward, but we have not done anything with it yet. As shown in the pipeline diagram above, we will now create a callback function that writes relevant data on each frame, and attach it as a probe on the src pad of the `pgie` element so that it is called for every batched buffer. \n", "\n", "This callback function is the same as the one used in the previous notebook, but **notice where we have attached it.** After every frame goes through `pgie`, it carries the metadata generated by the primary inference. However, as the `nvmultistreamtiler` image above shows, the tiler outputs *transformed metadata*: the metadata changes when it passes through `nvmultistreamtiler`. For this reason, we attach the callback function to the source pad of the `pgie` element rather than to the nvosd element as in the previous notebooks.\n", "\n", "To better understand how exactly `nvmultistreamtiler` changes the metadata, try attaching the callback function to the source pad of the tiler element as shown below. The same details can also be found in its [documentation](https://docs.nvidia.com/metropolis/deepstream/plugin-manual/index.html#page/DeepStream%20Plugins%20Development%20Guide/deepstream_plugin_details.html#wwpID0E0PX0HA).\n", "\n", "\n", "```python\n", "# Change line from \n", 
"tiler_src_pad=pgie.get_static_pad(\"src\")\n", "# to \n", "tiler_src_pad=tiler.get_static_pad(\"src\")\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# tiler_src_pad_buffer_probe extracts the metadata attached to each buffer on the pad it probes\n", "# (here, the pgie src pad) and updates params for drawing rectangles, object information, etc.\n", "def tiler_src_pad_buffer_probe(pad,info,u_data):\n", "    # Initialize the per-class object counters to 0\n", "    obj_counter = {\n", "        PGIE_CLASS_ID_VEHICLE:0,\n", "        PGIE_CLASS_ID_PERSON:0,\n", "        PGIE_CLASS_ID_BICYCLE:0,\n", "        PGIE_CLASS_ID_ROADSIGN:0\n", "    }\n", "    # Set frame_number & rectangles to draw as 0 \n", "    frame_number=0\n", "    num_rects=0\n", "    \n", "    gst_buffer = info.get_buffer()\n", "    if not gst_buffer:\n", "        print(\"Unable to get GstBuffer \")\n", "        return Gst.PadProbeReturn.OK\n", "\n", "    # Retrieve batch metadata from the gst_buffer\n", "    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the\n", "    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)\n", "    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))\n", "    l_frame = batch_meta.frame_meta_list\n", "    while l_frame is not None:\n", "        try:\n", "            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta\n", "            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)\n", "        except StopIteration:\n", "            break\n", "        \n", "        # Get the frame number, number of rectangles to draw and object metadata\n", "        frame_number=frame_meta.frame_num\n", "        num_rects = frame_meta.num_obj_meta\n", "        l_obj=frame_meta.obj_meta_list\n", "        \n", "        while l_obj is not None:\n", "            try:\n", "                # Casting l_obj.data to pyds.NvDsObjectMeta\n", "                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)\n", "            except StopIteration:\n", "                break\n", "            # Increment the counter for this object's class and set the box border color\n", "            # (RGBA 0.0, 0.0, 1.0, 0.0 is blue)\n", "            obj_counter[obj_meta.class_id] += 1\n", "            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.0)\n", "            try: \n", "                l_obj=l_obj.next\n", "            except StopIteration:\n", "                break\n", "        ################## Setting Metadata Display configuration ############### \n", "        # Acquiring a display meta object.\n", "        display_meta=pyds.nvds_acquire_display_meta_from_pool(batch_meta)\n", "        display_meta.num_labels = 1\n", "        py_nvosd_text_params = display_meta.text_params[0]\n", "        # Setting display text to be shown on screen\n", "        py_nvosd_text_params.display_text = \"Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}\".format(frame_number, num_rects, obj_counter[PGIE_CLASS_ID_VEHICLE], obj_counter[PGIE_CLASS_ID_PERSON])\n", "        # Now set the offsets where the string should appear\n", "        py_nvosd_text_params.x_offset = 10\n", "        py_nvosd_text_params.y_offset = 12\n", "        # Font, font color and font size\n", "        py_nvosd_text_params.font_params.font_name = \"Serif\"\n", "        py_nvosd_text_params.font_params.font_size = 10\n", "        # Set(red, green, blue, alpha); set to white\n", "        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)\n", "        # Text background color\n", "        py_nvosd_text_params.set_bg_clr = 1\n", "        # Set(red, green, blue, alpha); set to black\n", "        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)\n", "        # Use pyds.get_string() to get display_text as a string to print in the notebook\n", "        print(pyds.get_string(py_nvosd_text_params.display_text))\n", "        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)\n", "        \n", "        ############################################################################\n", "        # Get frame rate through this probe\n", "        fps_streams[\"stream{0}\".format(frame_meta.pad_index)].get_fps()\n", "        try:\n", "            l_frame=l_frame.next\n", "        except StopIteration:\n", "            break\n", "\n", "    return Gst.PadProbeReturn.OK\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tiler_src_pad=pgie.get_static_pad(\"src\")\n", "if not tiler_src_pad:\n", "    sys.stderr.write(\" Unable to get src pad \\n\")\n", "else:\n", "    tiler_src_pad.add_probe(Gst.PadProbeType.BUFFER, 
tiler_src_pad_buffer_probe, 0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, with everything defined, we can start playback and listen for events." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Start the pipeline and measure how long the run takes\n", "print(\"Now playing...\")\n", "start_time = time.time()\n", "print(\"Starting pipeline \\n\")\n", "# Start playback and listen to events\n", "pipeline.set_state(Gst.State.PLAYING)\n", "try:\n", "    loop.run()\n", "except:\n", "    pass\n", "# Clean up\n", "print(\"Exiting app\\n\")\n", "pipeline.set_state(Gst.State.NULL)\n", "print(\"--- %s seconds ---\" % (time.time() - start_time))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Re-encode the video with a profile that the Jupyter notebook player can display\n", "!ffmpeg -loglevel panic -y -an -i ../source_code/N3/ds_out.mp4 -vcodec libx264 -pix_fmt yuv420p -profile:v baseline -level 3 ../source_code/N3/output.mp4" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Display the Output\n", "from IPython.display import HTML\n", "HTML(\"\"\"\n", "