a
    d=icw                     @   s  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddlm
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dZdZdZ e! Z"G dd de#Z$d8ddZ%dd Z&G dd de'Z(G dd de'Z)G d d! d!e'Z*G d"d# d#e+Z,G d$d% d%e#Z-G d&d' d'e#Z.G d(d) d)e#Z/G d*d+ d+e#Z0G d,d- d-e#Z1ej2d9d.d/Z3d0d1 Z4d2d3 Z5d4d5 Z6d6d7 Z7dS ):0Uploads a TensorBoard logdir to TensorBoard.dev.    N)message)	graph_pb2)summary_pb2)	types_pb2)write_service_pb2)logdir_loader)upload_tracker)util)process_graph)directory_loader)event_file_loader)
io_wrapper)metadata)	grpc_util)
tb_logging)tensor_util   i  
   c                   @   sR   e Zd ZdZdddZedddZeeddd	Z	d
d Z
dd Zdd ZdS )TensorBoardUploaderr   Nc                 C   s(  || _ || _t|| _|| _|	| _|
| _|du r4dn|| _|du rFdn|| _d| _	d| _
|du rntt| _n|| _|du rt| jjd | _n|| _|du rt| jjd | _n|| _|du rt| jjd | _n|| _dd }tjtjtjtj|d}t| j|| _t j!| j| jd| _"dS )	a  Constructs a TensorBoardUploader.

        Args:
          writer_client: a TensorBoardWriterService stub instance
          logdir: path of the log directory to upload
          allowed_plugins: collection of string plugin names; events will only
            be uploaded if their time series's metadata specifies one of these
            plugin names
          upload_limits: instance of tensorboard.service.UploadLimits proto.
          logdir_poll_rate_limiter: a `RateLimiter` to use to limit logdir
            polling frequency, to avoid thrashing disks, especially on networked
            file systems
          rpc_rate_limiter: a `RateLimiter` to use to limit write RPC frequency.
            Note this limit applies at the level of single RPCs in the Scalar
            and Tensor case, but at the level of an entire blob upload in the
            Blob case-- which may require a few preparatory RPCs and a stream
            of chunks.  Note the chunk stream is internally rate-limited by
            backpressure from the server, so it is not a concern that we do not
            explicitly rate-limit within the stream here.
          name: String name to assign to the experiment.
          description: String description to assign to the experiment.
          verbosity: Level of verbosity, an integer. Supported value:
              0 - No upload statistics is printed.
              1 - Print upload statistics while uploading data (default).
         one_shot: Once uploading starts, upload only the existing data in
            the logdir and then return immediately, instead of the default
            behavior of continuing to listen for new data in the logdir and
            upload them when it appears.
        N   Fi  c                 S   s   | t  t kS N)_EVENT_FILE_INACTIVE_SECStime)Zsecs r   n/home/droni/.local/share/virtualenvs/DPS-5Je3_V2c/lib/python3.9/site-packages/tensorboard/uploader/uploader.py<lambda>       z.TensorBoardUploader.__init__.<locals>.<lambda>)Zloader_factoryZpath_filteractive_filter)	verbosityone_shot)#_apiZ_logdir	frozenset_allowed_plugins_upload_limits_name_descriptionZ
_verbosity	_one_shot_request_sender_experiment_idr
   RateLimiter_MIN_LOGDIR_POLL_INTERVAL_SECS_logdir_poll_rate_limiterZmin_scalar_request_interval_rpc_rate_limiterZmin_tensor_request_interval_tensor_rpc_rate_limiterZmin_blob_request_interval_blob_rpc_rate_limiter	functoolspartialr   ZDirectoryLoaderr   ZTimestampedEventFileLoaderr   ZIsTensorFlowEventsFiler   ZLogdirLoader_logdir_loaderr	   ZUploadTracker_tracker)selfwriter_clientZlogdirallowed_pluginsupload_limitsZlogdir_poll_rate_limiterrpc_rate_limitertensor_rpc_rate_limiterblob_rpc_rate_limiternamedescriptionr   r    r   Zdirectory_loader_factoryr   r   r   __init__=   sX    ,



zTensorBoardUploader.__init__)returnc                 C   s
   | j  S )z%Returns this object's upload tracker.)r3   has_datar4   r   r   r   r?      s    zTensorBoardUploader.has_datac                 C   s   | j S )zReturns the experiment_id associated with this uploader.

        May be none if no experiment is set, for instance, if
        `create_experiment` has not been called.
        )r)   r@   r   r   r   experiment_id   s    z!TensorBoardUploader.experiment_idc              
   C   sd   t d tj| j| jd}t| jj	|}t
|j| j| j| j| j| j| j| jd| _|j| _|jS )zACreates an Experiment for this upload session and returns the ID.zCreating experiment)r;   r<   )r6   r7   r8   r9   r:   tracker)loggerinfor   ZCreateExperimentRequestr%   r&   r   call_with_retriesr!   ZCreateExperiment_BatchedRequestSenderrA   r#   r$   r-   r.   r/   r3   r(   r)   )r4   requestresponser   r   r   create_experiment   s&    

z%TensorBoardUploader.create_experimentc                 C   s2   | j du rtd| j  |   | jrq.qdS )a  Uploads data from the logdir.

        This will continuously scan the logdir, uploading as data is added
        unless the uploader was built with the _one_shot option, in which
        case it will terminate after the first scan.

        Raises:
          RuntimeError: If `create_experiment` has not yet been called.
          ExperimentNotFoundError: If the experiment is deleted during the
            course of the upload.
        Nz6Must call create_experiment() before start_uploading())r(   RuntimeErrorr,   tick_upload_oncer'   r@   r   r   r   start_uploading   s    

z#TensorBoardUploader.start_uploadingc                 C   sx   t d t }| j  t | }t d| | j }| j  | j	| W d   n1 sj0    Y  dS )z1Runs one upload cycle, sending zero or more RPCs.zStarting an upload cyclezLogdir sync took %.3f secondsN)
rC   rD   r   r2   Zsynchronize_runsZget_run_eventsr3   Zsend_trackerr(   send_requests)r4   Zsync_start_timeZsync_duration_secsrun_to_eventsr   r   r   rL      s    


z TensorBoardUploader._upload_once)NNNNNNNN)__name__
__module____qualname____doc__r=   boolr?   propertystrrA   rI   rM   rL   r   r   r   r   r   :   s            
cr   c              
   C   s   t d| t }||j_|durBt d|| ||j_d|j_|durht d|| ||j_d|j_zt	
| j| W np tjy } zV| tjjkrt | tjjkrt | tjjkrt|  W Y d}~n
d}~0 0 dS )a  Modifies user data associated with an experiment.

    Args:
      writer_client: a TensorBoardWriterService stub instance
      experiment_id: string ID of the experiment to modify
      name: If provided, modifies name of experiment to this value.
      description: If provided, modifies the description of the experiment to
         this value

    Raises:
      ExperimentNotFoundError: If no such experiment exists.
      PermissionDeniedError: If the user is not authorized to modify this
        experiment.
      InvalidArgumentError: If the server rejected the name or description, if,
        for instance, the size limits have changed on the server.
    zModifying experiment %rNzSetting exp %r name to %rTz Setting exp %r description to %r)rC   rD   r   ZUpdateExperimentRequestZ
experimentrA   r;   Zexperiment_maskr<   r   rE   ZUpdateExperimentgrpcRpcErrorcode
StatusCode	NOT_FOUNDExperimentNotFoundErrorPERMISSION_DENIEDPermissionDeniedErrorZINVALID_ARGUMENTInvalidArgumentErrordetails)r5   rA   r;   r<   rG   er   r   r   update_experiment_metadata   s.    rb   c              
   C   s   t d| t }||_zt| j| W nT tj	y } z:|
 tjjkrTt |
 tjjkrjt  W Y d}~n
d}~0 0 dS )a  Permanently deletes an experiment and all of its contents.

    Args:
      writer_client: a TensorBoardWriterService stub instance
      experiment_id: string ID of the experiment to delete

    Raises:
      ExperimentNotFoundError: If no such experiment exists.
      PermissionDeniedError: If the user is not authorized to delete this
        experiment.
      RuntimeError: On unexpected failure.
    zDeleting experiment %rN)rC   rD   r   ZDeleteExperimentRequestrA   r   rE   ZDeleteExperimentrW   rX   rY   rZ   r[   r\   r]   r^   )r5   rA   rG   ra   r   r   r   delete_experiment  s    rc   c                   @   s   e Zd ZdS )r_   NrP   rQ   rR   r   r   r   r   r_   -  s   r_   c                   @   s   e Zd ZdS )r\   Nrd   r   r   r   r   r\   1  s   r\   c                   @   s   e Zd ZdS )r^   Nrd   r   r   r   r   r^   5  s   r^   c                   @   s   e Zd ZdZdS )_OutOfSpaceErrorzAction could not proceed without overflowing request budget.

    This is a signaling exception (like `StopIteration`) used internally
    by `_*RequestSender`; it does not mean that anything has gone wrong.
    N)rP   rQ   rR   rS   r   r   r   r   re   9  s   re   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	rF   a  Helper class for building requests that fit under a size limit.

    This class maintains stateful request builders for each of the possible
    request types (scalars, tensors, and blobs).  These accumulate batches
    independently, each maintaining its own byte budget and emitting a request
    when the batch becomes full.  As a consequence, events of different types
    will likely be sent to the backend out of order.  E.g., in the extreme case,
    a single tensor-flavored request may be sent only when the event stream is
    exhausted, even though many more recent scalar events were sent earlier.

    This class is not threadsafe. Use external synchronization if
    calling its methods concurrently.
    c	           	      C   sp   i | _ t|| _|| _t||||j| jd| _t||||j|j	| jd| _
t||||j|j| jd| _|| _d S )N)rB   )_tag_metadatar"   r#   r3   _ScalarBatchedRequestSenderZmax_scalar_request_size_scalar_request_sender_TensorBatchedRequestSenderZmax_tensor_request_sizemax_tensor_point_size_tensor_request_sender_BlobRequestSendermax_blob_request_sizemax_blob_size_blob_request_sender)	r4   rA   apir6   r7   r8   r9   r:   rB   r   r   r   r=   R  s6    
z_BatchedRequestSender.__init__c           	      C   s&  |  |D ]\}}}||jf}| j|}d}|du rJd}|j}|| j|< |jj}|dr||jjjkrt	d||jj|jjj q
|| j
vr|r
td|| q
|jtjkr| j|||| q
|jtjkr| j|||| q
|jtjkr
| j|||| q
| j  | j  | j  dS )a  Accepts a stream of TF events and sends batched write RPCs.

        Each sent request will be batched, the size of each batch depending on
        the type of data (Scalar vs Tensor vs Blob) being sent.

        Args:
          run_to_events: Mapping from run name to generator of `tf.Event`
            values, as returned by `LogdirLoader.get_run_events`.

        Raises:
          RuntimeError: If no progress can be made because even a single
          point is too large (say, due to a gigabyte-long tag name).
        FNTr   z8Mismatching plugin names for %s.  Expected %s, found %s.z7Skipping time series %r with unsupported plugin name %r)_run_valuestagrf   getr   plugin_dataplugin_nameZHasFieldrC   warningr#   rD   Z
data_classr   ZDATA_CLASS_SCALARrh   	add_eventZDATA_CLASS_TENSORrk   ZDATA_CLASS_BLOB_SEQUENCEro   flush)	r4   rO   run_nameeventvalueZtime_series_keyr   Zfirst_in_time_seriesru   r   r   r   rN   {  sT    





z#_BatchedRequestSender.send_requestsc                 c   sB   |  D ]4\}}|D ]&}t| |jjD ]}|||fV  q(qqdS )a  Helper generator to create a single stream of work items.

        Note that `dataclass_compat` may emit multiple variants of
        the same event, for backwards compatibility.  Thus this stream should
        be filtered to obtain the desired version of each event.  Here, we
        ignore any event that does not have a `summary` field.

        Furthermore, the events emitted here could contain values that do not
        have `metadata.data_class` set; these too should be ignored.  In
        `_send_summary_value(...)` above, we switch on `metadata.data_class`
        and drop any values with an unknown (i.e., absent or unrecognized)
        `data_class`.
        N)items_filter_graph_defssummaryr{   )r4   rO   ry   eventsrz   r{   r   r   r   rq     s
    z!_BatchedRequestSender._run_valuesN)rP   rQ   rR   rS   r=   rN   rq   r   r   r   r   rF   C  s   )CrF   c                   @   sP   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd ZdS )rg   a  Helper class for building requests that fit under a size limit.

    This class accumulates a current request.  `add_event(...)` may or may not
    send the request (and start a new one).  After all `add_event(...)` calls
    are complete, a final call to `flush()` is needed to send the final request.

    This class is not threadsafe. Use external synchronization if calling its
    methods concurrently.
    c                 C   sJ   |d u rt d|| _|| _|| _t|| _|| _i | _i | _| 	  d S Nzexperiment_id cannot be None)

ValueErrorr)   r!   r-   _ByteBudgetManager_byte_budget_managerr3   _runs_tags_new_request)r4   rA   rp   r8   max_request_sizerB   r   r   r   r=     s    
z$_ScalarBatchedRequestSender.__init__c                 C   s@   t  | _| j  | j  d| _| j| j_| j	
| j dS z1Allocates a new request and refreshes the budget.r   N)r   ZWriteScalarRequest_requestr   clearr   _num_valuesr)   rA   r   resetr@   r   r   r   r     s    



z(_ScalarBatchedRequestSender._new_requestc                 C   sd   z|  |||| W nJ ty^   |   z|  |||| W n tyX   tdY n0 Y n0 dS Attempts to add the given event to the current request.

        If the event cannot be added to the current request because the byte
        budget is exhausted, the request is flushed, and the event is added
        to the next request.
        zadd_event failed despite flushN_add_event_internalre   rx   rJ   r4   ry   rz   r{   r   r   r   r   rw     s    z%_ScalarBatchedRequestSender.add_eventc                 C   s   | j |}|d u r(| |}|| j |< | j||jf}|d u rb| ||j|}|| j||jf< | ||| |  jd7  _d S Nr   r   rs   _create_runr   rr   _create_tag_create_pointr   r4   ry   rz   r{   r   	run_proto	tag_protor   r   r   r     s    

z/_ScalarBatchedRequestSender._add_event_internalc                 C   s   | j }t| |jsdS | j  t||j | j| jn zt	
| jj| W nH tjy } z.| tjjkr|t td| W Y d}~n
d}~0 0 W d   n1 s0    Y  W d   n1 s0    Y  |   dS zrSends the active request after removing empty runs and tags.

        Starts a new, empty active request.
        N Upload call failed with error %s)r   _prune_empty_tags_and_runsrunsr-   rK   _request_loggerr3   Zscalars_trackerr   r   rE   r!   ZWriteScalarrW   rX   rY   rZ   r[   r\   rC   errorr   r4   rG   ra   r   r   r   rx     s"    
^z!_ScalarBatchedRequestSender.flushc                 C   s    | j jj|d}| j| |S )aM  Adds a run to the live request, if there's space.

        Args:
          run_name: String name of the run to add.

        Returns:
          The `WriteScalarRequest.Run` that was added to `request.runs`.

        Raises:
          _OutOfSpaceError: If adding the run would exceed the remaining
            request budget.
        r;   r   r   addr   add_runr4   ry   r   r   r   r   r   8  s    z'_ScalarBatchedRequestSender._create_runc                 C   s*   |j j|d}|j| | j| |S )a  Adds a tag for the given value, if there's space.

        Args:
          run_proto: `WriteScalarRequest.Run` proto to which to add a tag.
          tag_name: String name of the tag to add (as `value.tag`).
          metadata: TensorBoard `SummaryMetadata` proto from the first
            occurrence of this time series.

        Returns:
          The `WriteScalarRequest.Tag` that was added to `run_proto.tags`.

        Raises:
          _OutOfSpaceError: If adding the tag would exceed the remaining
            request budget.
        r   tagsr   r   CopyFromr   add_tagr4   r   Ztag_namer   r   r   r   r   r   I  s    z'_ScalarBatchedRequestSender._create_tagc                 C   sh   |j  }|j|_t|j |_t	|j
|j
 z| j| W n tyb   |j    Y n0 dS )a  Adds a scalar point to the given tag, if there's space.

        Args:
          tag_proto: `WriteScalarRequest.Tag` proto to which to add a point.
          event: Enclosing `Event` proto with the step and wall time data.
          value: Scalar `Summary.Value` proto with the actual scalar data.

        Raises:
          _OutOfSpaceError: If adding the point would exceed the remaining
            request budget.
        N)pointsr   stepr   make_ndarraytensoritemr{   r
   set_timestamp	wall_timer   	add_pointre   pop)r4   r   rz   r{   pointr   r   r   r   ^  s    

z)_ScalarBatchedRequestSender._create_pointN)rP   rQ   rR   rS   r=   r   rw   r   rx   r   r   r   r   r   r   r   rg     s   
	rg   c                   @   sX   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd ZdS )ri   a  Helper class for building WriteTensor() requests that fit under a size limit.

    This class accumulates a current request.  `add_event(...)` may or may not
    send the request (and start a new one).  After all `add_event(...)` calls
    are complete, a final call to `flush()` is needed to send the final request.

    This class is not threadsafe. Use external synchronization if calling its
    methods concurrently.
    c                 C   sP   |d u rt d|| _|| _|| _t|| _|| _|| _i | _i | _	| 
  d S r   )r   r)   r!   r-   r   r   _max_tensor_point_sizer3   r   r   r   )r4   rA   rp   r8   r   rj   rB   r   r   r   r=     s    	
z$_TensorBatchedRequestSender.__init__c                 C   sR   t  | _| j  | j  | j| j_| j	| j d| _
d| _d| _d| _dS r   )r   ZWriteTensorRequestr   r   r   r   r)   rA   r   r   r   _num_values_skipped_tensor_bytes_tensor_bytes_skippedr@   r   r   r   r     s    



z(_TensorBatchedRequestSender._new_requestc                 C   sd   z|  |||| W nJ ty^   |   z|  |||| W n tyX   tdY n0 Y n0 dS r   r   r   r   r   r   rw     s    z%_TensorBatchedRequestSender.add_eventc                 C   s   | j |}|d u r(| |}|| j |< | j||jf}|d u rb| ||j|}|| j||jf< | |||| |  jd7  _d S r   r   r   r   r   r   r     s    

z/_TensorBatchedRequestSender._add_event_internalc                 C   s   | j }t| |jsdS | j  t||j | j| j| j	| j
| jn zt| jj| W nH tjy } z.| tjjkrt td| W Y d}~n
d}~0 0 W d   n1 s0    Y  W d   n1 s0    Y  |   dS r   )r   r   r   r-   rK   r   r3   Ztensors_trackerr   r   r   r   r   rE   r!   ZWriteTensorrW   rX   rY   rZ   r[   r\   rC   r   r   r   r   r   r   rx     s&    
^z!_TensorBatchedRequestSender.flushc                 C   s    | j jj|d}| j| |S )aM  Adds a run to the live request, if there's space.

        Args:
          run_name: String name of the run to add.

        Returns:
          The `WriteTensorRequest.Run` that was added to `request.runs`.

        Raises:
          _OutOfSpaceError: If adding the run would exceed the remaining
            request budget.
        r   r   r   r   r   r   r     s    z'_TensorBatchedRequestSender._create_runc                 C   s*   |j j|d}|j| | j| |S )a  Adds a tag for the given value, if there's space.

        Args:
          run_proto: `WriteTensorRequest.Run` proto to which to add a tag.
          tag_name: String name of the tag to add (as `value.tag`).
          metadata: TensorBoard `SummaryMetadata` proto from the first
            occurrence of this time series.

        Returns:
          The `WriteTensorRequest.Tag` that was added to `run_proto.tags`.

        Raises:
          _OutOfSpaceError: If adding the tag would exceed the remaining
            request budget.
        r   r   r   r   r   r   r     s    z'_TensorBatchedRequestSender._create_tagc                 C   s   |j  }|j|_|j|j t|j|j |  j	|j
 7  _	|j
 | jkrtd||j|j|j
 | j |j   |  jd7  _|  j|j
 7  _dS | |j|j|j|j z| j| W n ty   |j    Y n0 dS )a  Adds a tensor point to the given tag, if there's space.

        Args:
          tag_proto: `WriteTensorRequest.Tag` proto to which to add a point.
          event: Enclosing `Event` proto with the step and wall time data.
          value: Tensor `Summary.Value` proto with the actual tensor data.
          run_name: Name of the wrong, only used for error reporting.

        Raises:
          _OutOfSpaceError: If adding the point would exceed the remaining
            request budget.
        zYTensor (run:%s, tag:%s, step: %d) too large; skipping. Size %d exceeds limit of %d bytes.r   N)r   r   r   r{   r   r   r
   r   r   r   ByteSizer   rC   rv   r;   r   r   r   _validate_tensor_valuerr   r   r   re   )r4   r   rz   r{   ry   r   r   r   r   r     s4    
	

z)_TensorBatchedRequestSender._create_pointc              
   C   sL   zt | W n8 tyF } z td||||f W Y d}~n
d}~0 0 dS )z1Validate a TensorProto by attempting to parse it.zThe uploader failed to upload a tensor. This seems to be due to a malformation in the tensor, which may be caused by a bug in the process that wrote the tensor.

The tensor has tag '%s' and is at step %d and wall_time %.6f.

Original error:
%sN)r   r   r   )r4   Ztensor_protorr   r   r   r   r   r   r   r   2  s    
z2_TensorBatchedRequestSender._validate_tensor_valueN)rP   rQ   rR   rS   r=   r   rw   r   rx   r   r   r   r   r   r   r   r   ri   v  s   
,ri   c                   @   s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )r   a<  Helper class for managing the request byte budget for certain RPCs.

    This should be used for RPCs that organize data by Runs, Tags, and Points,
    specifically WriteScalar and WriteTensor.

    Any call to add_run(), add_tag(), or add_point() may raise an
    _OutOfSpaceError, which is non-fatal. It signals to the caller that they
    should flush the current request and begin a new one.

    For more information on the protocol buffer encoding and how byte cost
    can be calculated, visit:

    https://developers.google.com/protocol-buffers/docs/encoding
    c                 C   s   d | _ || _d S r   )_byte_budget
_max_bytes)r4   Z	max_bytesr   r   r   r=   P  s    z_ByteBudgetManager.__init__c                 C   s0   | j | _|  j| 8  _| jdk r,tddS )zResets the byte budget and calculates the cost of the base request.

        Args:
          base_request: Base request.

        Raises:
          _OutOfSpaceError: If the size of the request exceeds the entire
            request byte budget.
        r   z&Byte budget too small for base requestN)r   r   r   rJ   )r4   Zbase_requestr   r   r   r   U  s    

z_ByteBudgetManager.resetc                 C   s2   |  t d }|| jkr t |  j|8  _dS )zIntegrates the cost of a run proto into the byte budget.

        Args:
          run_proto: The proto representing a run.

        Raises:
          _OutOfSpaceError: If adding the run would exceed the remaining request
            budget.
        r   Nr   _MAX_VARINT64_LENGTH_BYTESr   re   )r4   r   costr   r   r   r   d  s    
z_ByteBudgetManager.add_runc                 C   s2   |  t d }|| jkr t |  j|8  _dS )zIntegrates the cost of a tag proto into the byte budget.

        Args:
          tag_proto: The proto representing a tag.

        Raises:
          _OutOfSpaceError: If adding the tag would exceed the remaining request
           budget.
        r   Nr   )r4   r   r   r   r   r   r   ~  s    
z_ByteBudgetManager.add_tagc                 C   s:   |  }|t| d }|| jkr(t |  j|8  _dS )a   Integrates the cost of a point proto into the byte budget.

        Args:
          point_proto: The proto representing a point.

        Raises:
          _OutOfSpaceError: If adding the point would exceed the remaining request
           budget.
        r   N)r   _varint_costr   re   )r4   Zpoint_protoZsubmessage_costr   r   r   r   r     s    
	
z_ByteBudgetManager.add_pointN)	rP   rQ   rR   rS   r=   r   r   r   r   r   r   r   r   r   @  s   r   c                   @   sH   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dS )rl   ac  Uploader for blob-type event data.

    Unlike the other types, this class does not accumulate events in batches;
    every blob is sent individually and immediately.  Nonetheless we retain
    the `add_event()`/`flush()` structure for symmetry.

    This class is not threadsafe. Use external synchronization if calling its
    methods concurrently.
    c                 C   sP   |d u rt d|| _|| _|| _|| _|| _|| _d | _d | _d | _	d | _
d S r   )r   r)   r!   r-   _max_blob_request_size_max_blob_sizer3   	_run_name_event_value	_metadata)r4   rA   rp   r8   rm   rn   rB   r   r   r   r=     s    	z_BlobRequestSender.__init__c                 C   s   d| _ d| _d| _d| _dS )z%Declares the previous event complete.N)r   r   r   r   r@   r   r   r   r     s    z_BlobRequestSender._new_requestc                 C   s|   | j rtd|| _|| _|| _ t| j j| _| jjdkrL|| _	| 
  n,td| jj|| j j| jj|jj |   dS )r   z+Tried to send blob while another is pendingr   z~A blob sequence must be represented as a rank-1 Tensor. Provided data has rank %d, for run %s, tag %s, step %s ('%s' plugin) .N)r   rJ   r   r   r   r   r   _blobsndimr   rx   rC   rv   rr   r   rt   ru   r   r   r   r   r   rw     s$    

z_BlobRequestSender.add_eventc              	   C   s   | j r|  }tdt| j| d}t| jD ]b\}}| j  | j	
t|0}|| |||7 }|t| W d   q01 s0    Y  q0td|t| j| |   dS )zNSends the current blob sequence fully, and clears it to make way for the next.z$Sending %d blobs for sequence id: %sr   Nz'Sent %d of %d blobs for sequence id: %s)r   _get_or_create_blob_sequencerC   rD   lenr   	enumerater-   rK   r3   blob_tracker
_send_blobZmark_uploadedrT   r   )r4   blob_sequence_idZ
sent_blobs	seq_indexblobr   r   r   r   rx     s,    
.z_BlobRequestSender.flushc                 C   s   t j| j| j| jj| jjt| j	| j
d}t|j| jj t|v zt| jj|}|j}W nJ tjy } z0| tjjkrt td|  W Y d }~n
d }~0 0 W d    n1 s0    Y  |S )N)rA   runrr   r   Zfinal_sequence_lengthr   r   )r   ZGetOrCreateBlobSequenceRequestr)   r   r   rr   r   r   r   r   r   r
   r   r   r   r   rE   r!   ZGetOrCreateBlobSequencer   rW   rX   rY   rZ   r[   r\   rC   r   )r4   rG   rH   r   ra   r   r   r   r     s*    

6z/_BlobRequestSender._get_or_create_blob_sequencec           
   
   C   s   t || jkr&tdt || j dS | |||}t }d}zN| j|D ]}|d7 }qNt | }td|t ||t || d  W dS  t	j
y }	 zB|	 t	jjkrtd W Y d}	~	dS td|	  W Y d}	~	n
d}	~	0 0 dS )	zTries to send a single blob for a given index within a blob sequence.

        The blob will not be sent if it was sent already, or if it is too large.

        Returns:
          The number of blobs successfully sent (i.e., 1 or 0).
        z=Blob too large; skipping.  Size %d exceeds limit of %d bytes.r   r   zFUpload for %d chunks totaling %d bytes took %.3f seconds (%.3f MB/sec)i   z0Attempted to re-upload existing blob.  Skipping.NzWriteBlob RPC call got error %s)r   r   rC   rv   _write_blob_request_iteratorr   r!   Z	WriteBlobrD   rW   rX   rY   rZ   ZALREADY_EXISTSr   )
r4   r   r   r   Zrequest_iteratorupload_start_timecountrH   upload_duration_secsra   r   r   r   r   9  s>    

z_BlobRequestSender._send_blobc                 c   sd   t dt|| jD ]L}|||| j  }|| j t|k}tj||||d |d t|d}|V  qd S )Nr   )r   indexdataoffsetZcrc32cfinalize_objectZfinal_crc32cZ
blob_bytes)ranger   r   r   ZWriteBlobRequest)r4   r   r   r   r   chunkr   rG   r   r   r   r   h  s    
z/_BlobRequestSender._write_blob_request_iteratorN)rP   rQ   rR   rS   r=   r   rw   rx   r   r   r   r   r   r   r   rl     s   
&/rl   c                 c   sZ   t   }|  }td| d V  t   | }|rHtdt||| ntd|| d S )NzTrying request of %d bytesz/Upload for %d runs (%d bytes) took %.3f secondsz&Upload of (%d bytes) took %.3f seconds)r   r   rC   rD   r   )rG   r   r   Zrequest_bytesr   r   r   r   r   }  s"    r   c                 C   s"   d}| dkr|d7 }| dL } q|S )a'  Computes the size of `n` encoded as an unsigned base-128 varint.

    This should be consistent with the proto wire format:
    <https://developers.google.com/protocol-buffers/docs/encoding#varints>

    Args:
      n: A non-negative integer.

    Returns:
      An integer number of bytes.
    r         r   )nresultr   r   r   r     s
    
r   c                 C   sX   t tt| jD ]@\}}t tt|jD ]\}}|js,|j|= q,|js| j|= qd S r   )reversedlistr   r   r   r   )rG   Zrun_idxr   Ztag_idxrr   r   r   r   r     s    
r   c                 C   s|   | j jD ]n}|jjjtjkrq|jtjkrt	|j
j}dd |D }dd |D }||krtj|tjd}|j
| qd S )Nc                 S   s   g | ]}t |qS r   )_filtered_graph_bytes.0xr   r   r   
<listcomp>  r   z&_filter_graph_defs.<locals>.<listcomp>c                 S   s   g | ]}|d ur|qS r   r   r   r   r   r   r     r   )Zdtype)r~   r{   r   rt   ru   graphs_metadataZPLUGIN_NAMErr   ZRUN_GRAPH_NAMEr   r   Z
string_valr   Zmake_tensor_protor   Z	DT_STRINGr   )rz   vr   Zfiltered_dataZ
new_tensorr   r   r   r}     s    r}   c              	   C   sP   zt  | }W n* tjtfy<   tdt|  Y d S 0 t	
| | S )Nz.Could not parse GraphDef of size %d. Skipping.)r   ZGraphDefZ
FromStringr   DecodeErrorRuntimeWarningrC   rv   r   r   Zprepare_graph_for_uiZSerializeToString)Zgraph_bytesZ	graph_defr   r   r   r     s    
r   )NN)N)8rS   
contextlibr0   r   rW   Zgoogle.protobufr   Ztensorboard.compat.protor   r   r   Ztensorboard.uploader.protor   Ztensorboard.uploaderr   r	   r
   Ztensorboard.backendr   Z$tensorboard.backend.event_processingr   r   r   Ztensorboard.plugins.graphr   r   Ztensorboard.utilr   r   r   r+   r   r   Z
get_loggerrC   objectr   rb   rc   rJ   r_   r\   r^   	Exceptionre   rF   rg   ri   r   rl   contextmanagerr   r   r   r}   r   r   r   r   r   <module>   s`    / 
,
    Kq M	