B
    ٻdw<                 @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddlm	Z	 ddlm
Z
 ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeded G dd dejZdddZG dd dejZG dd dejZdd Zdd Zdd Zej e_ dS ) z6Distribution Strategy-related dataset transformations.    )dataset_ops)ExternalStatePolicy)nest)constant_op)dtypes)ops)tensor_shape)tensor_util)	array_ops)gen_experimental_dataset_ops)	tf_exportzdata.experimental.SHARD_HINT
SHARD_HINTc                   s.   e Zd ZdZd fdd	Zedd Z  ZS )_AutoShardDataseta  A `Dataset` that shards the `Dataset` automatically.

  This dataset takes in an existing dataset and tries to automatically figure
  out how to shard the dataset in a multi-worker scenario using graph rewrites.

  If the AutoShardPolicy is set to FILE, it walks up the dataset graph until
  it finds a reader dataset, then inserts a ShardDataset op before that node
  so that each worker only sees some files.

  If the AutoShardPolicy is set to DATA, it inserts a ShardDataset op at the
  end of the input pipeline, before any terminal PrefetchDataset if there is
  one. Additionally, if there is a RebatchDatasetV2 in the input pipeline, it
  is written to legacy RebatchDataset for correctness reasons, since
  RebatchDatasetV2 is incompatible with data sharding.

  If the AutoShardPolicy is set to AUTO, it tries to do file-based sharding.
  If it cannot find a reader dataset, it falls back to doing data-based
  sharding.

  If the AutoShardPolicy is set to OFF, it does nothing.

  Attributes:
    num_workers: Total number of workers to shard this dataset across.
    index: The current worker index (out of the total number of workers) this
      dataset is for.
    num_replicas: The total number of replicas across all workers. This is used
      only when sharding by data (either DATA or AUTO) in order to rewrite
      RebatchDatasetV2 to RebatchDataset.

  Raises:
    NotFoundError: If we cannot find a suitable reader dataset to begin
      automatically sharding the dataset.
  Nc                sR   || _ |j| _tj| j jf||t| jj	|d| j
}tt| || d S )N)num_workersindexauto_shard_policynum_replicas)Z_input_datasetelement_spec_element_specged_opsZauto_shard_dataset_variant_tensorintoptionsZexperimental_distributer   _flat_structuresuperr   __init__)selfinput_datasetr   r   r   variant_tensor)	__class__ d/var/www/html/venv/lib/python3.7/site-packages/tensorflow/python/data/experimental/ops/distribute.pyr   E   s    
z_AutoShardDataset.__init__c             C   s   | j S )N)r   )r   r!   r!   r"   r   S   s    z_AutoShardDataset.element_spec)N)__name__
__module____qualname____doc__r   propertyr   __classcell__r!   r!   )r    r"   r   "   s   !r   Nc             C   s   t t| |||S )N)r   ZDatasetV1Adapterr   )r   r   r   r   r!   r!   r"   _AutoShardDatasetV1X   s    r)   c                   s,   e Zd ZdZ fddZedd Z  ZS )_LegacyRebatchDataseta  A `Dataset` that divides its input batches into `num_replicas` sub-batches.

  For each batch in the input dataset, _LegacyRebatchDataset will produce
  `num_replicas` smaller batches whose sizes add up to the original batch size.

  For example:

  ```python
  ds = tf.data.Dataset.range(8)
  ds = ds.batch(4)
  ds = _LegacyRebatchDataset(ds, num_replicas=3)
  for elem in ds:
    print(elem)
  >> [0, 1], [2, 3], [], [4, 5], [6, 7], []
  ```
  c                sf    fddfdd}t |t|| _t|}tj|jfd i| j	}t
t| || dS )zCreates a _LegacyRebatchDataset.

    Args:
      input_dataset: `Dataset` to rebatch.
      num_replicas: A `tf.int64` scalar, representing the number of sub-batches
        to split each batch from `input_dataset` into.
    c                sv   |   }t|tjsdS |jdkr&dS t|dk r:tddd |jD }|d dk	rr|d   dkrr|d   S dS )z@Recalculates the output_shape after dividing it by num_replicas.N   zInvalid `input_dataset`. Expected a dataset whose elements have rank >= 1 but found a dataset whose elements are scalars. Fix the issue by adding the `batch` transformation to the dataset.c             S   s   g | ]
}|j qS r!   )value).0dr!   r!   r"   
<listcomp>   s    zR_LegacyRebatchDataset.__init__.<locals>.recalculate_batch_size.<locals>.<listcomp>r   )_to_legacy_output_shapes
isinstancer   TensorShaperanklen
ValueErrordims)	type_specoutput_shapeZoutput_dims)r   r!   r"   recalculate_batch_sizex   s    
z>_LegacyRebatchDataset.__init__.<locals>.recalculate_batch_sizec                s    | }|   |S )N)Z_unbatchZ_batch)r7   Z
batch_size)r9   r!   r"   rebatch   s    z/_LegacyRebatchDataset.__init__.<locals>.rebatchr   N)r   Zmap_structurer   get_structurer   Znormalize_to_denser   Zrebatch_datasetr   r   r   r*   r   )r   r   r   r:   r   )r    )r   r9   r"   r   o   s    	

z_LegacyRebatchDataset.__init__c             C   s   | j S )N)r   )r   r!   r!   r"   r      s    z"_LegacyRebatchDataset.element_spec)r#   r$   r%   r&   r   r'   r   r(   r!   r!   )r    r"   r*   ]   s   6r*   c                   s,   e Zd ZdZ fddZedd Z  ZS )_RemoteDatasetz8Creates a dataset on a given `device` given a graph def.c          	      s:   || _ t| t|}W d Q R X tt| | d S )N)
_elem_specr   devicer   Zdataset_from_graphr   r<   r   )r   	graph_defr>   r   r   )r    r!   r"   r      s    z_RemoteDataset.__init__c             C   s   | j S )N)r=   )r   r!   r!   r"   r      s    z_RemoteDataset.element_spec)r#   r$   r%   r&   r   r'   r   r(   r!   r!   )r    r"   r<      s   r<   c          	   C   s   t | tjs tdt|  d| jj}i }t|dkrT|d |krT| ||d < |S t	| j | 
 } | jdtjd}W dQ R X x"|D ]}t||| j}|||< qW |S )a  A transformation that replicates `dataset` onto a list of devices.

  Args:
    dataset: A `tf.data.Dataset` object.
    devices: A list of devices to replicate the dataset on.

  Returns:
    A dictionary mapping device name to a dataset on that device.
  z?Invalid `dataset`. Expected a `tf.data.Dataset` object but got .r+   r   T)Zstrip_device_assignmentZexternal_state_policyN)r1   r   Z	DatasetV2	TypeErrortyper   r>   r4   r   Zcolocate_withZ_apply_debug_optionsZ_as_serialized_graphr   WARNr<   r   )datasetZdevicesZdataset_deviceZdatasetsr?   r>   Zdsr!   r!   r"   	replicate   s"    

rE   c       
      C   s   || }|| }t | }|dk	r&|} | | }| ||  }|dk	r|d g| |g||   }	tj|	|d |	d|  tjddS tj|tjd}	||	 tjtj|tjdtj	|| tjdgdd }	tj|	|d |	d| gddS )a  Determines how to rebatch a dataset for the given worker.

  Given the global batch size, number of workers, number of replicas per worker,
  and worker index, returns the correct batch sizes for rebatching a dataset
  on worker `worker_index` of `num_workers`, such that each global step (across
  all workers and replicas) will consume global_batch_size elements. The
  returned value should be passed as the `batch_sizes` input parameter to
  `tf.data.experimental.rebatch()`. The returned batch sizes meet the following
  constraints:

  Let G = global_batch_size, W = num_workers, R = num_replicas_per_worker
  (A) for any worker, len(batch_sizes) = W * R
  (B) for any worker, sum(batch_sizes) == G
  (C) for any global step (i.e. R iterations on each worker), the sum of batches
      consumed by replicas across all workers is G.
  (D) any two batch sizes of any two replicas differs by at most one.

  For example, suppose we have G = 7, W = 2, R = 2, and suppose we have two
  files which each contain 7 elements:

  ```python
  # WORKER 0
  batch_sizes_0 = batch_sizes_for_worker(global_batch_size=global_batch_size,
                                         num_workers=2,
                                         num_replicas_per_worker=2,
                                         worker_index=0)
  print(batch_sizes_0)
  >> [2, 2, 2, 1]

  dataset_0 = tf.data.Dataset.from_tensor_slices(["file_a", "file_b"])
  dataset_0 = dataset_0.shard(num_shards, index=0)
  dataset_0 = dataset_0.batch(7)
  dataset_0 = dataset_0.apply(tf.data.experimental.rebatch(batch_sizes_0))
  for elem in dataset_0:
    print(elem)
  >> [[A0, A1], [A2, A3], [A4, A5], [A6]]

  # WORKER 1
  batch_sizes_1 = batch_sizes_for_worker(global_batch_size=global_batch_size,
                                         num_workers=2,
                                         num_replicas_per_worker=2,
                                         worker_index=1)
  print(batch_sizes_1)
  >> [2, 1, 2, 2]

  dataset_1 = tf.data.Dataset.from_tensor_slices(["file_a", "file_b"])
  dataset_1 = dataset_1.shard(num_shards, index=1)
  dataset_1 = dataset_1.batch(7)
  dataset_1 = dataset_1.apply(tf.data.experimental.rebatch(batch_sizes_1))
  for elem in dataset_1:
    print(elem)
  >> [[B0, B1], [B2], [B3, B4], [B5, B6]]
  ```

  The above example will produce the following elements:

  Step 1:
    Worker 0 Replica 0: [A0, A1]
    Worker 0 Replica 1: [A2, A3]
    Worker 1 Replica 0: [B0, B1]
    Worker 1 Replica 1: [B2]
  Total batch size = 7

  Step 2:
    Worker 0 Replica 0: [A4, A5]
    Worker 0 Replica 1: [A6]
    Worker 1 Replica 0: [B3, B4]
    Worker 1 Replica 1: [B5, B6]
  Total batch size = 7

  Args:
    global_batch_size: A `tf.int64` scalar, representing the global batch size.
    num_workers: An integer representing the number of workers the dataset will
      be distributed across.
    num_replicas_per_worker: An integer representing the number of replicas per
      worker. All workers are assumed to have the same number of replicas.
    worker_index: An integer index of the worker to be rebatched.

  Returns:
    A `tf.int64` vector, representing the batch sizes to rebatch the dataset
    into.
  Nr+   Zbatch_sizes)dtypename)rF   r   )Zaxis)
r	   Zconstant_valuer   Zconvert_to_tensorr   int64r
   ZonesconcatZzeros)
Zglobal_batch_sizer   Znum_replicas_per_workerZworker_indexZnum_subbatchesoffsetZconst_valuefloorZnum_ceilZworker_0r!   r!   r"   batch_sizes_for_worker   s&    U


rL   c                sz   dd fddt t| D  tdd  D rnt fdd D rX d }nd	}tj|tjd
dS t	
| jS )a  An operation that returns the batch size of the dataset.

  This op tries to infer the batch size statically by walking up the dataset
  tree from the final dataset node and returning the batch size of the first
  batching dataset (such as from .batch() and .padded_batch()) that it
  encounters. This differs from using the `element_spec` of a dataset in that it
  does not account for partial batches.

  This operation may fail if it encounters contradictory batch sizes (for
  example, if the dataset is created by zipping together two datasets with
  different batch sizes), if there are no explicit batching transformations, or
  if there are operations downstream from the batching transformation that may
  modify its batch size. In these cases, it returns a -1.

  Args:
    dataset: A `tf.data.Dataset` object.

  Returns:
    A `tf.int64` Tensor representing the batch size of the dataset sans partial
    batches. If this cannot be inferred statically, the value of this tensor
    will be -1.
  c             S   sL   y|   }W n tk
r    d S X t|tjs2d S |jd kr@d S |jd jS )Nr   )r0   NotImplementedErrorr1   r   r2   r3   r6   r,   )r7   r8   r!   r!   r"   get_static_batch_dimn  s    
z0compute_batch_size.<locals>.get_static_batch_dimc                s   g | ]} |qS r!   r!   )r-   r7   )rN   r!   r"   r/   z  s   z&compute_batch_size.<locals>.<listcomp>c             s   s   | ]}|d k	V  qd S )Nr!   )r-   r.   r!   r!   r"   	<genexpr>~  s    z%compute_batch_size.<locals>.<genexpr>c             3   s   | ]}| d  kV  qdS )r   Nr!   )r-   r.   )
batch_dimsr!   r"   rO     s    r   r   Zstatic_batch_size)rF   rG   )r   flattenr   r;   allr   Zconstantr   rH   r   compute_batch_sizer   )rD   Z	batch_dimr!   )rP   rN   r"   rS   V  s    

rS   )N)r&   Ztensorflow.python.data.opsr   Z"tensorflow.python.data.ops.optionsr   Ztensorflow.python.data.utilr   Ztensorflow.python.frameworkr   r   r   r   r	   Ztensorflow.python.opsr
   r   r   Z tensorflow.python.util.tf_exportr   r   Zexport_constantr#   ZUnaryDatasetr   r)   r*   ZDatasetSourcer<   rE   rL   rS   r!   r!   r!   r"   <module>   s,   6
M"|8