"""Library of Cloud TPU helper functions for data loading."""

from typing import Callable, Optional, Text, Union

from tensorflow.python.data.experimental.ops import interleave_ops
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops import iterator_ops
from tensorflow.python.data.ops import readers
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import function
from tensorflow.python.framework import ops
from tensorflow.python.ops import functional_ops


def _TextLineDataset(filename: Text) -> dataset_ops.Dataset:
  buffer_size = 8 * 1024 * 1024  # 8 MiB per file.
  dataset = readers.TextLineDataset(filename, buffer_size=buffer_size)
  return dataset


def _TFRecordDataset(filename: Text) -> dataset_ops.Dataset:
  buffer_size = 8 * 1024 * 1024  # 8 MiB per file.
  dataset = readers.TFRecordDataset(filename, buffer_size=buffer_size)
  return dataset


_FILETYPE_MAP = {
    'tfrecord': _TFRecordDataset,
    'textline': _TextLineDataset,
    'text': _TextLineDataset,
}


def StreamingFilesDataset(
    files: Union[Text, dataset_ops.Dataset],
    filetype: Optional[Union[Text, Callable[[Text],
                                            dataset_ops.Dataset]]] = None,
    file_reader_job: Optional[Text] = None,
    worker_job: Optional[Text] = None,
    num_epochs: Optional[int] = None,
    filename_shuffle_buffer_size: Optional[Union[int, bool]] = None,
    num_parallel_reads: Optional[int] = None,
    batch_transfer_size: Optional[Union[int, bool]] = None,
    sloppy: bool = True) -> dataset_ops.Dataset:
  """StreamingFilesDataset constructs a dataset to stream from workers (GCE VM).

  Because Cloud TPUs are allocated over the network, a Cloud TPU cannot read
  files local to your GCE VM. In order to train using files stored on your local
  VM (e.g. on local SSD for extreme performance), use the StreamingFilesDataset
  helper to generate a dataset to feed your Cloud TPU with files from your GCE
  VM.

  The resulting dataset may raise an OutOfRangeError if there are no files
  found as a result of the fileglob expansion.

  Note: StreamingFilesDataset assumes that the session is using a
  TPUClusterResolver and therefore has a worker and a coordinator job. File
  loading will be done on the coordinator job.

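  Example (an illustrative sketch, not from the original module; the file glob
  and the TPUEstimator-style `input_fn` wiring below are hypothetical, and
  assume the TPUClusterResolver setup described above):

    def train_input_fn(params):
      # Stream TFRecord files from the coordinator VM's disk to the TPU
      # workers; record parsing can then run on the worker side.
      return StreamingFilesDataset(
          files='/mnt/disks/ssd0/train-*.tfrecord',
          filetype='tfrecord')
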
  Args:
    files: A string glob to match files, or a `tf.data.Dataset` generating file
      names.
    filetype: A string (one of 'tfrecord', 'textline', or 'text') or a
      single-argument TensorFlow function that when given a filename returns
      a dataset.
    file_reader_job: An optional string that corresponds to the job that should
      perform the file reads.
    worker_job: An optional string that corresponds to the job that should
      process the tensors (i.e. your GPU or TPU worker).
    num_epochs: The number of epochs through the training set that should be
      generated. By default, it will repeat infinitely.
    filename_shuffle_buffer_size: An optional integer whose value controls the
      shuffling of the file names. If you would like to read from the files in
      the same order, set to 0 or False.
    num_parallel_reads: An optional integer controlling the number of files to
      read from concurrently. (Set to 1 for no parallelism.)
    batch_transfer_size: An optional integer controlling the batching used to
      amortize the remote function invocation overhead. Set to a very large
      number to increase throughput. Set to a very small number to reduce memory
      consumption. Set to False to skip batching.
    sloppy: (Optional.) If `False`, read input data while maintaining a
      deterministic order. (This may have significant performance impacts.)
      Defaults to `True`.
  Returns:
    A `tf.data.Dataset` with an infinite stream of elements generated by a
    parallel interleaving of the set of files matched (or generated) by `files`
    with element types given by the dataset specified by `filetype`.

  Raises:
    ValueError: if any argument is not of the expected type.
  """
  if filetype is None:
    filetype = 'tfrecord'

  if isinstance(filetype, str):
    if filetype not in _FILETYPE_MAP:
      raise ValueError(f'Unexpected filetype. Received: {filetype}. Expected '
                       f'one of {list(_FILETYPE_MAP.keys())}')
    reader_fn = _FILETYPE_MAP[filetype]
  elif callable(filetype):
    reader_fn = filetype
  else:
    raise ValueError(f'Argument `filetype` should be a string or a callable. '
                     f'Received: {filetype} of type {type(filetype)}.')

  file_reader_job = file_reader_job or 'coordinator'
  worker_job = worker_job or 'worker'

  if filename_shuffle_buffer_size is None:
    filename_shuffle_buffer_size = 4096

  num_parallel_reads = num_parallel_reads or 8

  if batch_transfer_size is None:
    batch_transfer_size = 256

  if file_reader_job == 'coordinator':
    file_reader_device = '/job:coordinator/task:0'
  else:
    file_reader_device = '/job:%s' % file_reader_job

  # Build the source pipeline on the file-reader job (by default, the
  # coordinator), which is the job with local access to the input files.
  with ops.device(file_reader_device):
    if isinstance(files, str):
      source_dataset = dataset_ops.Dataset.list_files(files)
    elif isinstance(files, dataset_ops.DatasetV2):
      source_dataset = files
    else:
      raise ValueError('Argument `files` should be a string or a '
                       f'`tf.data.Dataset` instance. Received: {files}')

    if filename_shuffle_buffer_size:
      source_dataset = source_dataset.shuffle(
          buffer_size=filename_shuffle_buffer_size)

    source_dataset = source_dataset.apply(
        interleave_ops.parallel_interleave(
            reader_fn, cycle_length=num_parallel_reads, sloppy=sloppy))

    source_dataset = source_dataset.repeat(num_epochs)

    if batch_transfer_size:
      source_dataset = source_dataset.batch(batch_transfer_size)

    source_dataset = source_dataset.prefetch(1)

    source_iterator = dataset_ops.make_one_shot_iterator(source_dataset)
    source_handle = source_iterator.string_handle()

  # LoadingFunc executes on the file-reader job; each invocation pulls the
  # next element from the source iterator identified by the string handle.
  @function.Defun(dtypes.string)
  def LoadingFunc(h):
    remote_iterator = iterator_ops.Iterator.from_string_handle(
        h, dataset_ops.get_legacy_output_types(source_dataset),
        dataset_ops.get_legacy_output_shapes(source_dataset))
    return remote_iterator.get_next()

  def MapFn(unused_input):
    source_dataset_output_types = dataset_ops.get_legacy_output_types(
        source_dataset)
    if isinstance(source_dataset_output_types, dtypes.DType):
      output_types = [source_dataset_output_types]
    elif isinstance(source_dataset_output_types, (list, tuple)):
      output_types = source_dataset_output_types
    else:
      raise ValueError('Source dataset has invalid output types. Only '
                       'list/tuples or TensorFlow tensor types are accepted.')
    remote_calls = functional_ops.remote_call(
        args=[source_handle],
        Tout=output_types,
        f=LoadingFunc,
        target='/job:%s/replica:0/task:0/cpu:0' % file_reader_job)
    if len(remote_calls) == 1:
      return remote_calls[0]
    else:
      return remote_calls

  # On the worker job, repeatedly invoke MapFn, which streams elements over
  # the network from the file-reader job via remote_call.
  with ops.device('/job:%s' % worker_job):
    output_dataset = dataset_ops.Dataset.range(2).repeat().map(
        MapFn, num_parallel_calls=4 if sloppy else None)
    output_dataset = output_dataset.prefetch(1)

    if batch_transfer_size:
      # Undo the batching used during the transfer.
      output_dataset = output_dataset.unbatch().prefetch(1)

  return output_dataset
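# A hypothetical sketch (not part of the original module) of supplying a
# custom single-argument reader via `filetype`, here wrapping
# readers.FixedLengthRecordDataset; the glob and the `record_bytes` value are
# illustrative assumptions:
#
#   def _FixedLenReader(filename):
#     # Assume each on-disk record is exactly 1024 bytes.
#     return readers.FixedLengthRecordDataset(filename, record_bytes=1024)
#
#   dataset = StreamingFilesDataset(
#       files='/mnt/disks/ssd0/train-*.bin', filetype=_FixedLenReader)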