Skip to content

RTSP distribution server for USB camera video using v4l2 with Docker container on Ubuntu 20.04.

License

Notifications You must be signed in to change notification settings

PINTO0309/rtspserver-v4l2

Repository files navigation

rtspserver-v4l2

RTSP distribution server for USB camera video using v4l2 with Docker container on Ubuntu 20.04.

ffmpeg version: https://github.com/PINTO0309/rtspserver-ffmpeg

0. 前準備

  • git clone
    git clone https://github.com/PINTO0309/rtspserver-v4l2.git && cd rtspserver-v4l2
  • USBカメラをホストPCに接続しておく

1. docker compose パターン

  • docker compose 経由でコンテナを起動してUSBカメラの映像をRTSP配信
    docker compose up -d
  • docker compose 経由でコンテナを起動したあとに配信用コンテナを終了
    docker compose down

2. docker run パターン

  • docker build で Docker Image をローカルに生成 (Docker HubからPullするだけで問題ない場合は実施不要)
    docker build -t pinto0309/rtspserver-v4l2:latest -f Dockerfile.v4l2rtsp .
    docker push pinto0309/rtspserver-v4l2:latest
  • docker run 経由でデーモンとしてコンテナをバックエンド起動してUSBカメラの映像をRTSP配信
    docker run --rm -d \
    --device /dev/video0:/dev/video0:mwr \
    --net=host \
    -p 8554:8554 \
    --name rtspserver-v4l2 \
    pinto0309/rtspserver-v4l2:latest
  • docker run 経由でコンテナを起動したあとに配信用コンテナを終了
    docker stop rtspserver-v4l2

3. 配信映像の受信テスト

  • vlc を使用したRTSP配信内容の確認

    vlc rtsp://0.0.0.0:8554/unicast
  • opencv を使用したRTSP配信内容の確認

    video.py
    from pathlib import Path
    from enum import Enum
    from collections import deque
    from urllib.parse import urlparse
    import subprocess
    import threading
    import logging
    import cv2
    from typing import Tuple
    
    
    LOGGER = logging.getLogger(__name__)
    WITH_GSTREAMER = False # True
    
    
    class Protocol(Enum):
        IMAGE = 0
        VIDEO = 1
        CSI   = 2
        V4L2  = 3
        RTSP  = 4
        HTTP  = 5
    
    
    class VideoIO:
        def __init__(
            self,
            output_size: Tuple,
            input_uri: str,
            output_uri: str=None,
            output_fps: int=30,
            input_resolution: Tuple=(640, 480),
            frame_rate: int=30,
            buffer_size: int=10,
            proc_fps: int=30
        ):
            """Class for video capturing and output saving.
            Encoding, decoding, and scaling can be accelerated using the GStreamer backend.
    
            Parameters
            ----------
            output_size : tuple
                Width and height of each frame to output.
            input_uri : str
                URI to input stream. It could be image sequence (e.g. '%06d.jpg'), video file (e.g. 'file.mp4'),
                MIPI CSI camera (e.g. 'csi://0'), USB/V4L2 camera (e.g. '/dev/video0'),
                RTSP stream (e.g. 'rtsp://<user>:<password>@<ip>:<port>/<path>'),
                or HTTP live stream (e.g. 'http://<user>:<password>@<ip>:<port>/<path>')
            output_uri : str, optionals
                URI to an output video file.
            output_fps : int, optionals
                Output video recording frame rate. Specify a value less than 30.
            input_resolution : tuple, optional
                Original resolution of the input source.
                Useful to set a certain capture mode of a USB/CSI camera.
            frame_rate : int, optional
                Frame rate of the input source.
                Required if frame rate cannot be deduced, e.g. image sequence and/or RTSP.
                Useful to set a certain capture mode of a USB/CSI camera.
            buffer_size : int, optional
                Number of frames to buffer.
                For live sources, a larger buffer drops less frames but increases latency.
            proc_fps : int, optional
                Estimated processing speed that may limit the capture interval `cap_dt`.
                This depends on hardware and processing complexity.
            """
            self.size = output_size
            self.input_uri = input_uri
            self.output_uri = output_uri
            self.output_fps = output_fps
            self.resolution = input_resolution
            assert frame_rate > 0
            self.frame_rate = frame_rate
            assert buffer_size >= 1
            self.buffer_size = buffer_size
            assert proc_fps > 0
            self.proc_fps = proc_fps
    
            self.protocol = self._parse_uri(self.input_uri)
            if self.protocol == Protocol.V4L2:
                result = subprocess.check_output(
                    [
                        'sudo', 'chmod', '777', fr'{self.input_uri[:-1]}0'
                    ],
                    stderr=subprocess.PIPE
                ).decode('utf-8')
                result = subprocess.check_output(
                    [
                        'sudo', 'chmod', '777', fr'{self.input_uri[:-1]}1'
                    ],
                    stderr=subprocess.PIPE
                ).decode('utf-8')
            self.is_live = self.protocol != Protocol.IMAGE and self.protocol != Protocol.VIDEO
            if WITH_GSTREAMER:
                self.source = cv2.VideoCapture(self._gst_cap_pipeline(), cv2.CAP_GSTREAMER)
            else:
                self.source = cv2.VideoCapture(self.input_uri)
    
            self.frame_queue: deque = deque([], maxlen=self.buffer_size)
            self.cond = threading.Condition()
            self.exit_event = threading.Event()
            self.cap_thread = threading.Thread(target=self._capture_frames)
    
            ret, frame = self.source.read()
            if not ret:
                raise RuntimeError(f'Unable to read video stream: {self.input_uri}')
            self.frame_queue.append(frame)
    
            width = self.source.get(cv2.CAP_PROP_FRAME_WIDTH)
            height = self.source.get(cv2.CAP_PROP_FRAME_HEIGHT)
            self.cap_fps = self.source.get(cv2.CAP_PROP_FPS)
            self.do_resize = (width, height) != self.size
            if self.cap_fps == 0:
                self.cap_fps = self.frame_rate # fallback to config if unknown
            LOGGER.info('%dx%d stream @ %d FPS', width, height, self.cap_fps)
    
            if self.output_uri is not None:
                Path(self.output_uri).parent.mkdir(parents=True, exist_ok=True)
                output_fps = 1 / self.cap_dt
                if WITH_GSTREAMER:
                    self.writer = cv2.VideoWriter(
                        self._gst_write_pipeline(),
                        cv2.CAP_GSTREAMER,
                        0,
                        fps=output_fps,
                        frameSize=self.size,
                        isColor=True
                    )
                else:
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    self.writer = cv2.VideoWriter(
                        filename=self.output_uri,
                        fourcc=fourcc,
                        fps=output_fps if self.output_fps >= output_fps else self.output_fps,
                        frameSize=self.size,
                        isColor=True,
                    )
    
        @property
        def cap_dt(self):
            # limit capture interval at processing latency for live sources
            return 1 / min(self.cap_fps, self.proc_fps) if self.is_live else 1 / self.cap_fps
    
        def start_capture(self):
            """Start capturing from file or device."""
            if not self.source.isOpened():
                self.source.open(self._gst_cap_pipeline(), cv2.CAP_GSTREAMER)
            if not self.cap_thread.is_alive():
                self.cap_thread.start()
    
        def stop_capture(self):
            """Stop capturing from file or device."""
            with self.cond:
                self.exit_event.set()
                self.cond.notify()
            self.frame_queue.clear()
            self.cap_thread.join()
    
        def read(self):
            """Reads the next video frame.
    
            Returns
            -------
            ndarray
                Returns None if there are no more frames.
            """
            with self.cond:
                while len(self.frame_queue) == 0 and not self.exit_event.is_set():
                    self.cond.wait()
                if len(self.frame_queue) == 0 and self.exit_event.is_set():
                    return None
                frame = self.frame_queue.popleft()
                self.cond.notify()
            if self.do_resize:
                frame = cv2.resize(frame, self.size)
            return frame
    
        def write(self, frame):
            """Writes the next video frame."""
            assert hasattr(self, 'writer')
            self.writer.write(frame)
    
        def release(self):
            """Cleans up input and output sources."""
            self.stop_capture()
            if hasattr(self, 'writer'):
                self.writer.release()
            self.source.release()
    
        def _gst_cap_pipeline(self):
            gst_elements = str(subprocess.check_output('gst-inspect-1.0'))
            if 'nvvidconv' in gst_elements and self.protocol != Protocol.V4L2:
                # format conversion for hardware decoder
                cvt_pipeline = (
                    'nvvidconv interpolation-method=5 ! '
                    'video/x-raw, width=%d, height=%d, format=BGRx !'
                    'videoconvert ! appsink sync=false'
                    % self.size
                )
            else:
                cvt_pipeline = (
                    'videoscale ! '
                    'video/x-raw, width=%d, height=%d !'
                    'videoconvert ! appsink sync=false'
                    % self.size
                )
    
            if self.protocol == Protocol.IMAGE:
                pipeline = (
                    'multifilesrc location=%s index=1 caps="image/%s,framerate=%d/1" ! decodebin ! '
                    % (
                        self.input_uri,
                        self._img_format(self.input_uri),
                        self.frame_rate
                    )
                )
            elif self.protocol == Protocol.VIDEO:
                pipeline = 'filesrc location=%s ! decodebin ! ' % self.input_uri
            elif self.protocol == Protocol.CSI:
                if 'nvarguscamerasrc' in gst_elements:
                    pipeline = (
                        'nvarguscamerasrc sensor_id=%s ! '
                        'video/x-raw(memory:NVMM), width=%d, height=%d, '
                        'format=NV12, framerate=%d/1 ! '
                        % (
                            self.input_uri[6:],
                            *self.resolution,
                            self.frame_rate
                        )
                    )
                else:
                    raise RuntimeError('GStreamer CSI plugin not found')
            elif self.protocol == Protocol.V4L2:
                if 'v4l2src' in gst_elements:
                    pipeline = (
                        'v4l2src device=%s ! '
                        'video/x-raw, width=%d, height=%d, '
                        'format=YUY2, framerate=%d/1 ! '
                        % (
                            self.input_uri,
                            *self.resolution,
                            self.frame_rate
                        )
                    )
                else:
                    raise RuntimeError('GStreamer V4L2 plugin not found')
            elif self.protocol == Protocol.RTSP:
                pipeline = (
                    'rtspsrc location=%s latency=0 ! '
                    'capsfilter caps=application/x-rtp,media=video ! decodebin ! ' % self.input_uri
                )
            elif self.protocol == Protocol.HTTP:
                pipeline = 'souphttpsrc location=%s is-live=true ! decodebin ! ' % self.input_uri
    
            """
            'v4l2src device=/dev/video0 ! video/x-raw, width=640, height=480, format=YUY2, framerate=30/1 ! videoscale ! video/x-raw, width=640, height=480 !videoconvert ! appsink sync=false'
            """
            return pipeline + cvt_pipeline
    
        def _gst_write_pipeline(self):
            gst_elements = str(subprocess.check_output('gst-inspect-1.0'))
            # use hardware encoder if found
            if 'omxh264enc' in gst_elements:
                h264_encoder = 'omxh264enc preset-level=2'
            elif 'x264enc' in gst_elements:
                h264_encoder = 'x264enc pass=4'
            else:
                raise RuntimeError('GStreamer H.264 encoder not found')
            pipeline = (
                'appsrc ! autovideoconvert ! %s ! qtmux ! filesink location=%s '
                % (
                    h264_encoder,
                    self.output_uri
                )
            )
            return pipeline
    
        def _capture_frames(self):
            while not self.exit_event.is_set():
                ret, frame = self.source.read()
                with self.cond:
                    if not ret:
                        self.exit_event.set()
                        self.cond.notify()
                        break
                    # keep unprocessed frames in the buffer for file
                    if not self.is_live:
                        while (len(self.frame_queue) == self.buffer_size and
                               not self.exit_event.is_set()):
                            self.cond.wait()
                    self.frame_queue.append(frame)
                    self.cond.notify()
    
        @staticmethod
        def _parse_uri(uri):
            result = urlparse(uri)
            if result.scheme == 'csi':
                protocol = Protocol.CSI
            elif result.scheme == 'rtsp':
                protocol = Protocol.RTSP
            elif result.scheme == 'http':
                protocol = Protocol.HTTP
            else:
                if '/dev/video' in result.path:
                    protocol = Protocol.V4L2
                elif '%' in result.path:
                    protocol = Protocol.IMAGE
                else:
                    protocol = Protocol.VIDEO
            return protocol
    
        @staticmethod
        def _img_format(uri):
            img_format = Path(uri).suffix[1:]
            return 'jpeg' if img_format == 'jpg' else img_format
    test_rtsp_recv.py
    import cv2
    from videoio import VideoIO
    
    stream = VideoIO(
        output_size=(
            640,
            480
        ),
        input_uri=f'rtsp://0.0.0.0:8554/unicast',
        output_uri=None,
        output_fps=30,
        input_resolution=(
            640,
            480
        ),
        frame_rate=30,
        buffer_size=10,
    )
    stream.start_capture()
    
    try:
        while True:
            frame = stream.read()
            key = cv2.waitKey(1)
            if key == 27:  # ESC
                break
            if frame is None:
                continue
            cv2.imshow('test', frame)
    except Exception as ex:
        pass
    finally:
        if stream:
            stream.release()
    python test_rtsp_recv.py

    image

About

RTSP distribution server for USB camera video using v4l2 with Docker container on Ubuntu 20.04.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages