"""Grouping dataset transformations."""
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops import structured_function
from tensorflow.python.data.util import nest
from tensorflow.python.data.util import structure
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_spec
from tensorflow.python.ops import gen_experimental_dataset_ops as ged_ops
from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import tf_export


@tf_export("data.experimental.group_by_reducer")
def group_by_reducer(key_func, reducer):
  """A transformation that groups elements and performs a reduction.

  This transformation maps each element of a dataset to a key using `key_func`
  and groups the elements by key. The `reducer` is used to process each group;
  its `init_func` is used to initialize state for each group when it is
  created, the `reduce_func` is used to update the state every time an element
  is mapped to the matching group, and the `finalize_func` is used to map the
  final state to an output value.
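
  For example, the following is a minimal sketch that sums elements by parity;
  the data values are illustrative and `sorted` is used only to make the
  per-key output order deterministic:

  >>> elements = tf.data.Dataset.from_tensor_slices([1.0, 2.0, 3.0, 4.0])
  >>> reducer = tf.data.experimental.Reducer(
  ...     init_func=lambda _: 0.0,
  ...     reduce_func=lambda state, value: state + value,
  ...     finalize_func=lambda state: state)
  >>> result = elements.apply(tf.data.experimental.group_by_reducer(
  ...     key_func=lambda x: tf.cast(x, tf.int64) % 2,
  ...     reducer=reducer))
  >>> sorted(float(x) for x in result.as_numpy_iterator())
  [4.0, 6.0]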

  Args:
    key_func: A function mapping a nested structure of tensors
      (having shapes and types defined by `self.output_shapes` and
      `self.output_types`) to a scalar `tf.int64` tensor.
    reducer: An instance of `Reducer`, which captures the reduction logic using
      the `init_func`, `reduce_func`, and `finalize_func` functions.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.
  """

  def _apply_fn(dataset):
    """Function from `Dataset` to `Dataset` that applies the transformation."""
    return _GroupByReducerDataset(dataset, key_func, reducer)

  return _apply_fn


@deprecation.deprecated(None, "Use `tf.data.Dataset.group_by_window(...)`.")
@tf_export("data.experimental.group_by_window")
def group_by_window(key_func,
                    reduce_func,
                    window_size=None,
                    window_size_func=None):
  """A transformation that groups windows of elements by key and reduces them.

  This transformation maps each consecutive element in a dataset to a key
  using `key_func` and groups the elements by key. It then applies
  `reduce_func` to at most `window_size_func(key)` elements matching the same
  key. All except the final window for each key will contain
  `window_size_func(key)` elements; the final window may be smaller.

  You may provide either a constant `window_size` or a window size determined by
  the key through `window_size_func`.
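
  For example, a minimal sketch that groups a range by parity and batches each
  group into windows of 5 (the values here are illustrative):

  >>> dataset = tf.data.Dataset.range(10)
  >>> dataset = dataset.apply(tf.data.experimental.group_by_window(
  ...     key_func=lambda x: x % 2,
  ...     reduce_func=lambda key, ds: ds.batch(5),
  ...     window_size=5))
  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [0 2 4 6 8]
  [1 3 5 7 9]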

  Args:
    key_func: A function mapping a nested structure of tensors
      (having shapes and types defined by `self.output_shapes` and
      `self.output_types`) to a scalar `tf.int64` tensor.
    reduce_func: A function mapping a key and a dataset of up to `window_size`
      consecutive elements matching that key to another dataset.
    window_size: A `tf.int64` scalar `tf.Tensor`, representing the number of
      consecutive elements matching the same key to combine in a single
      batch, which will be passed to `reduce_func`. Mutually exclusive with
      `window_size_func`.
    window_size_func: A function mapping a key to a `tf.int64` scalar
      `tf.Tensor`, representing the number of consecutive elements matching
      the same key to combine in a single batch, which will be passed to
      `reduce_func`. Mutually exclusive with `window_size`.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.

  Raises:
    ValueError: if neither or both of {`window_size`, `window_size_func`} are
      passed.
  """

  def _apply_fn(dataset):
    """Function from `Dataset` to `Dataset` that applies the transformation."""
    return dataset.group_by_window(
        key_func=key_func,
        reduce_func=reduce_func,
        window_size=window_size,
        window_size_func=window_size_func)

  return _apply_fn


@deprecation.deprecated(
    None, "Use `tf.data.Dataset.bucket_by_sequence_length(...)`.")
@tf_export("data.experimental.bucket_by_sequence_length")
def bucket_by_sequence_length(element_length_func,
                              bucket_boundaries,
                              bucket_batch_sizes,
                              padded_shapes=None,
                              padding_values=None,
                              pad_to_bucket_boundary=False,
                              no_padding=False,
                              drop_remainder=False):
  """A transformation that buckets elements in a `Dataset` by length.

  Elements of the `Dataset` are grouped together by length and then are padded
  and batched.

  This is useful for sequence tasks in which the elements have variable length.
  Grouping together elements that have similar lengths reduces the total
  fraction of padding in a batch which increases training step efficiency.

  Below is an example that bucketizes the input data into the three buckets
  "[0, 3), [3, 5), [5, inf)" based on sequence length, with batch size 2.

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...     lambda: elements, tf.int64, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[3, 5],
  ...         bucket_batch_sizes=[2, 2, 2]))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[1 2 3 4]
   [5 6 7 0]]
  [[ 7  8  9 10 11  0]
   [13 14 15 16 19 20]]
  [[ 0  0]
   [21 22]]

  You can also pad each batch out to its bucket boundary and choose the value
  used for padding. The example below uses `-1` as the padding value and shows
  the input data being bucketized into the two buckets "[0, 3], [4, 6]".

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...   lambda: elements, tf.int32, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[4, 7],
  ...         bucket_batch_sizes=[2, 2, 2],
  ...         pad_to_bucket_boundary=True,
  ...         padding_values=-1))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[ 0 -1 -1]
   [ 5  6  7]]
  [[ 1  2  3  4 -1 -1]
   [ 7  8  9 10 11 -1]]
  [[21 22 -1]]
  [[13 14 15 16 19 20]]

  When using the `pad_to_bucket_boundary` option, it is not always possible
  to maintain the bucket batch size. You can drop the batches that do not
  maintain the bucket batch size by using the `drop_remainder` option. Using
  the same input data as in the example above yields the following result.

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...   lambda: elements, tf.int32, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[4, 7],
  ...         bucket_batch_sizes=[2, 2, 2],
  ...         pad_to_bucket_boundary=True,
  ...         padding_values=-1,
  ...         drop_remainder=True))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[ 0 -1 -1]
   [ 5  6  7]]
  [[ 1  2  3  4 -1 -1]
   [ 7  8  9 10 11 -1]]

  Args:
    element_length_func: function from element in `Dataset` to `tf.int32`,
      determines the length of the element, which will determine the bucket it
      goes into.
    bucket_boundaries: `list<int>`, upper length boundaries of the buckets.
    bucket_batch_sizes: `list<int>`, batch size per bucket. Length should be
      `len(bucket_boundaries) + 1`.
    padded_shapes: Nested structure of `tf.TensorShape` to pass to
      `tf.data.Dataset.padded_batch`. If not provided, will use
      `dataset.output_shapes`, which will result in variable length dimensions
      being padded out to the maximum length in each batch.
    padding_values: Values to pad with, passed to
      `tf.data.Dataset.padded_batch`. Defaults to padding with 0.
    pad_to_bucket_boundary: bool, if `False`, will pad dimensions with unknown
      size to maximum length in batch. If `True`, will pad dimensions with
      unknown size to bucket boundary minus 1 (i.e., the maximum length in each
      bucket), and caller must ensure that the source `Dataset` does not contain
      any elements with length longer than `max(bucket_boundaries)`.
    no_padding: `bool`, indicates whether to skip padding the batch features
      (if `True`, features need to be either of type `tf.sparse.SparseTensor`
      or of the same shape).
    drop_remainder: (Optional.) A `tf.bool` scalar `tf.Tensor`, representing
      whether the last batch should be dropped in the case it has fewer than
      `batch_size` elements; the default behavior is not to drop the smaller
      batch.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.

  Raises:
    ValueError: if `len(bucket_batch_sizes) != len(bucket_boundaries) + 1`.
  """

  def _apply_fn(dataset):
    return dataset.bucket_by_sequence_length(
        element_length_func=element_length_func,
        bucket_boundaries=bucket_boundaries,
        bucket_batch_sizes=bucket_batch_sizes,
        padded_shapes=padded_shapes,
        padding_values=padding_values,
        pad_to_bucket_boundary=pad_to_bucket_boundary,
        no_padding=no_padding,
        drop_remainder=drop_remainder)

  return _apply_fn


class _GroupByReducerDataset(dataset_ops.UnaryDataset):
  """A `Dataset` that groups its input and performs a reduction."""

  def __init__(self, input_dataset, key_func, reducer):
    """See `group_by_reducer()` for details."""
    self._input_dataset = input_dataset
    self._make_key_func(key_func, input_dataset)
    self._make_init_func(reducer.init_func)
    self._make_reduce_func(reducer.reduce_func, input_dataset)
    self._make_finalize_func(reducer.finalize_func)
    variant_tensor = ged_ops.experimental_group_by_reducer_dataset(
        self._input_dataset._variant_tensor,  # pylint: disable=protected-access
        self._key_func.function.captured_inputs,
        self._init_func.function.captured_inputs,
        self._reduce_func.function.captured_inputs,
        self._finalize_func.function.captured_inputs,
        key_func=self._key_func.function,
        init_func=self._init_func.function,
        reduce_func=self._reduce_func.function,
        finalize_func=self._finalize_func.function,
        **self._flat_structure)
    super(_GroupByReducerDataset, self).__init__(input_dataset, variant_tensor)

  def _make_key_func(self, key_func, input_dataset):
    """Make wrapping defun for key_func."""
    self._key_func = structured_function.StructuredFunctionWrapper(
        key_func, self._transformation_name(), dataset=input_dataset)
    if not self._key_func.output_structure.is_compatible_with(
        tensor_spec.TensorSpec([], dtypes.int64)):
      raise ValueError(
          f"Invalid `key_func`. Expected `key_func` to return a scalar "
          f"tf.int64 tensor, but instead `key_func` has output "
          f"types={self._key_func.output_types} and "
          f"shapes={self._key_func.output_shapes}.")

  def _make_init_func(self, init_func):
    """Make wrapping defun for init_func."""
    self._init_func = structured_function.StructuredFunctionWrapper(
        init_func,
        self._transformation_name(),
        input_structure=tensor_spec.TensorSpec([], dtypes.int64))

  def _make_reduce_func(self, reduce_func, input_dataset):
    """Make wrapping defun for reduce_func."""

    # Iteratively rerun the reduce function until reaching a fixed point on
    # `self._state_structure`.
    self._state_structure = self._init_func.output_structure
    state_types = self._init_func.output_types
    state_shapes = self._init_func.output_shapes
    state_classes = self._init_func.output_classes
    need_to_rerun = True
    while need_to_rerun:

      wrapped_func = structured_function.StructuredFunctionWrapper(
          reduce_func,
          self._transformation_name(),
          input_structure=(self._state_structure,
                           input_dataset.element_spec),
          add_to_graph=False)

      # Extract and validate class information from the returned values.
      for new_state_class, state_class in zip(
          nest.flatten(wrapped_func.output_classes),
          nest.flatten(state_classes)):
        if not issubclass(new_state_class, state_class):
          raise TypeError(
              f"Invalid `reducer`. The output class of the "
              f"`reducer.reduce_func` {wrapped_func.output_classes}, "
              f"does not match the class of the reduce state "
              f"{self._state_classes}.")

      # Extract and validate type information from the returned values.
      for new_state_type, state_type in zip(
          nest.flatten(wrapped_func.output_types),
          nest.flatten(state_types)):
        if new_state_type != state_type:
          raise TypeError(
              f"Invalid `reducer`. The element types for the new state "
              f"{wrapped_func.output_types} do not match the element types "
              f"of the old state {self._init_func.output_types}.")

      # Extract shape information from the returned values, weakening each
      # state shape to the most specific shape compatible with both the old
      # and the new state.
      flat_state_shapes = nest.flatten(state_shapes)
      flat_new_state_shapes = nest.flatten(wrapped_func.output_shapes)
      weakened_state_shapes = [
          original.most_specific_compatible_shape(new)
          for original, new in zip(flat_state_shapes, flat_new_state_shapes)
      ]

      need_to_rerun = False
      for original_shape, weakened_shape in zip(flat_state_shapes,
                                                weakened_state_shapes):
        if original_shape.ndims is not None and (
            weakened_shape.ndims is None or
            original_shape.as_list() != weakened_shape.as_list()):
          need_to_rerun = True
          break

      if need_to_rerun:
        state_shapes = nest.pack_sequence_as(self._init_func.output_shapes,
                                             weakened_state_shapes)
        self._state_structure = structure.convert_legacy_structure(
            state_types, state_shapes, state_classes)

    self._reduce_func = wrapped_func
    self._reduce_func.function.add_to_graph(ops.get_default_graph())

  def _make_finalize_func(self, finalize_func):
    """Make wrapping defun for finalize_func."""
    self._finalize_func = structured_function.StructuredFunctionWrapper(
        finalize_func,
        self._transformation_name(),
        input_structure=self._state_structure)

  @property
  def element_spec(self):
    return self._finalize_func.output_structure

  def _functions(self):
    return [
        self._key_func, self._init_func, self._reduce_func,
        self._finalize_func
    ]

  def _transformation_name(self):
    return "tf.data.experimental.group_by_reducer()"


@tf_export("data.experimental.Reducer")
class Reducer:
  """A reducer is used for reducing a set of elements.

  A reducer is represented as a tuple of three functions:
  - init_func: defines the initial value for each key: key => initial state
  - reduce_func: the operation to perform on values with the same key:
    (old state, input) => new state
  - finalize_func: the value to return in the end: state => result
  
  For example,
  
  ```
  def init_func(_):
    return (0.0, 0.0)

  def reduce_func(state, value):
    return (state[0] + value['features'], state[1] + 1)

  def finalize_func(s, n):
    return s / n

  reducer = tf.data.experimental.Reducer(init_func, reduce_func, finalize_func)
  ```
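
  When passed to `tf.data.experimental.group_by_reducer`, the reducer above
  computes the per-key mean of the `'features'` values.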
  """

  def __init__(self, init_func, reduce_func, finalize_func):
    self._init_func = init_func
    self._reduce_func = reduce_func
    self._finalize_func = finalize_func

  @property
  def init_func(self):
    return self._init_func

  @property
  def reduce_func(self):
    return self._reduce_func

  @property
  def finalize_func(self):
    return self._finalize_func