Shortcuts

Source code for pytorchvideo.data.encoded_video_dataset

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.

from __future__ import annotations

import logging
import multiprocessing
import pathlib
from typing import Any, Callable, List, Optional, Tuple, Type

import torch.utils.data
from pytorchvideo.data.clip_sampling import ClipSampler
from pytorchvideo.data.encoded_video import EncodedVideo

from .labeled_video_paths import LabeledVideoPaths
from .utils import MultiProcessSampler


logger = logging.getLogger(__name__)


[docs]class EncodedVideoDataset(torch.utils.data.IterableDataset): """ EncodedVideoDataset handles the storage, loading, decoding and clip sampling for a video dataset. It assumes each video is stored as an encoded video (e.g. mp4, avi). """ _MAX_CONSECUTIVE_FAILURES = 10
[docs] def __init__( self, labeled_video_paths: List[Tuple[str, Optional[dict]]], clip_sampler: ClipSampler, video_sampler: Type[torch.utils.data.Sampler] = torch.utils.data.RandomSampler, transform: Optional[Callable[[dict], Any]] = None, decode_audio: bool = True, decoder: str = "pyav", ) -> None: """ Args: labeled_video_paths List[Tuple[str, Optional[dict]]]]) : List containing video file paths and associated labels clip_sampler (ClipSampler): Defines how clips should be sampled from each video. See the clip sampling documentation for more information. video_sampler (Type[torch.utils.data.Sampler]): Sampler for the internal video container. This defines the order videos are decoded and, if necessary, the distributed split. transform (Callable): This callable is evaluated on the clip output before the clip is returned. It can be used for user defined preprocessing and augmentations to the clips. The clip output is a dictionary with the following format: { 'video': <video_tensor> 'label': <index_label> 'video_index': <video_index> 'clip_index': <clip_index> 'aug_index': <aug_index>, augmentation index as augmentations might generate multiple views for one clip. } If transform is None, the raw clip output in the above format is returned unmodified. decoder (str): Defines what type of decoder used to decode a video. """ self._decode_audio = decode_audio self._transform = transform self._clip_sampler = clip_sampler self._labeled_videos = labeled_video_paths self._decoder = decoder # If a RandomSampler is used we need to pass in a custom random generator that # ensures all PyTorch multiprocess workers have the same random seed. self._video_random_generator = None if video_sampler == torch.utils.data.RandomSampler: self._video_random_generator = torch.Generator() self._video_sampler = video_sampler( self._labeled_videos, generator=self._video_random_generator ) else: self._video_sampler = video_sampler(self._labeled_videos) self._video_sampler_iter = None # Initialized on first call to self.__next__() # Depending on the clip sampler type, we may want to sample multiple clips # from one video. In that case, we keep the store video, label and previous sampled # clip time in these variables. self._loaded_video_label = None self._loaded_clip = None self._next_clip_start_time = 0.0
@property def video_sampler(self): return self._video_sampler
[docs] def __next__(self) -> dict: """ Retrieves the next clip based on the clip sampling strategy and video sampler. Returns: A video clip with the following format if transform is None: { 'video': <video_tensor>, 'label': <index_label>, 'video_index': <video_index> 'clip_index': <clip_index> 'aug_index': <aug_index>, augmentation index as augmentations might generate multiple views for one clip. } Otherwise, the transform defines the clip output. """ if not self._video_sampler_iter: # Setup MultiProcessSampler here - after PyTorch DataLoader workers are spawned. self._video_sampler_iter = iter(MultiProcessSampler(self._video_sampler)) for i_try in range(self._MAX_CONSECUTIVE_FAILURES): # Reuse previously stored video if there are still clips to be sampled from # the last loaded video. if self._loaded_video_label: video, info_dict, video_index = self._loaded_video_label else: video_index = next(self._video_sampler_iter) try: video_path, info_dict = self._labeled_videos[video_index] video = EncodedVideo.from_path( video_path, decode_audio=self._decode_audio, decoder=self._decoder, ) self._loaded_video_label = (video, info_dict, video_index) except Exception as e: logger.debug( "Failed to load video with error: {}; trial {}".format( e, i_try, ) ) continue ( clip_start, clip_end, clip_index, aug_index, is_last_clip, ) = self._clip_sampler(self._next_clip_start_time, video.duration) # Only load the clip once and reuse previously stored clip if there are multiple # views for augmentations to perform on the same clip. if aug_index == 0: self._loaded_clip = video.get_clip(clip_start, clip_end) self._next_clip_start_time = clip_end clip_is_null = ( self._loaded_clip is None or self._loaded_clip["video"] is None or (self._loaded_clip["audio"] is None and self._decode_audio) ) if is_last_clip or clip_is_null: # Close the loaded encoded video and reset the last sampled clip time ready # to sample a new video on the next iteration. self._loaded_video_label[0].close() self._loaded_video_label = None self._next_clip_start_time = 0.0 if clip_is_null: logger.debug( "Failed to load clip {}; trial {}".format(video.name, i_try) ) continue frames = self._loaded_clip["video"] audio_samples = self._loaded_clip["audio"] sample_dict = { "video": frames, "video_name": video.name, "video_index": video_index, "clip_index": clip_index, "aug_index": aug_index, **info_dict, **({"audio": audio_samples} if audio_samples is not None else {}), } if self._transform is not None: sample_dict = self._transform(sample_dict) # User can force dataset to continue by returning None in transform. if sample_dict is None: continue return sample_dict else: raise RuntimeError( f"Failed to load video after {self._MAX_CONSECUTIVE_FAILURES} retries." )
def __iter__(self): self._video_sampler_iter = None # Reset video sampler # If we're in a PyTorch DataLoader multiprocessing context, we need to use the # same seed for each worker's RandomSampler generator. The workers at each # __iter__ call are created from the unique value: worker_info.seed - worker_info.id, # which we can use for this seed. worker_info = torch.utils.data.get_worker_info() if self._video_random_generator is not None and worker_info is not None: base_seed = worker_info.seed - worker_info.id self._video_random_generator.manual_seed(base_seed) return self def num_videos(self): return len(self.video_sampler)
[docs]def labeled_encoded_video_dataset( data_path: pathlib.path, clip_sampler: ClipSampler, video_sampler: Type[torch.utils.data.Sampler] = torch.utils.data.RandomSampler, transform: Optional[Callable[[dict], Any]] = None, video_path_prefix: str = "", decode_audio: bool = True, decoder: str = "pyav", ) -> EncodedVideoDataset: """ A helper function to create EncodedVideoDataset object for Ucf101 and Kinectis datasets. Args: data_path (pathlib.Path): Path to the data. The path type defines how the data should be read: - For a file path, the file is read and each line is parsed into a video path and label. - For a directory, the directory structure defines the classes (i.e. each subdirectory is a class). See the LabeledVideoPaths class documentation for specific formatting details and examples. clip_sampler (ClipSampler): Defines how clips should be sampled from each video. See the clip sampling documentation for more information. video_sampler (Type[torch.utils.data.Sampler]): Sampler for the internal video container. This defines the order videos are decoded and, if necessary, the distributed split. transform (Callable): This callable is evaluated on the clip output before the clip is returned. It can be used for user defined preprocessing and augmentations to the clips. The clip output is a dictionary with the following format: { 'video': <video_tensor>, 'label': <index_label>, 'video_index': <video_index> 'clip_index': <clip_index> 'aug_index': <aug_index>, augmentation index as augmentations might generate multiple views for one clip. } If transform is None, the raw clip output in the above format is returned unmodified. video_path_prefix (str): Path to root directory with the videos that are loaded in EncodedVideoDataset. All the video paths before loading are prefixed with this path. decoder (str): Defines what type of decoder used to decode a video. """ # PathManager may configure the multiprocessing context in a way that conflicts # with PyTorch DataLoader workers. To avoid this, we make sure the PathManager # calls (made by LabeledVideoPaths) are wrapped in their own sandboxed process. labeled_video_paths = LabeledVideoPaths.from_path(data_path) labeled_video_paths.path_prefix = video_path_prefix dataset = EncodedVideoDataset( labeled_video_paths, clip_sampler, video_sampler, transform, decode_audio=decode_audio, decoder=decoder, ) return dataset