a
    d=ic,-                     @   s   d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
 e
 ZdZdZdZdZd	ZeejjejjejjejjgZd
ZG dd dZdddZdd ZdddZdd Zdd Zej G dd dej!Z"dS )z-Utilities for working with python gRPC stubs.    N)version)
tb_logging         g?g      ?ztensorboard-versionc                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	AsyncCallFuturea  Encapsulates the future value of a retriable async gRPC request.

    Abstracts over the set of futures returned by a set of gRPC calls
    comprising a single logical gRPC request with retries.  Communicates
    to the caller the result or exception resulting from the request.

    Args:
      completion_event: The constructor should provide a `threding.Event` which
        will be used to communicate when the set of gRPC requests is complete.
    c                 C   s   d | _ t | _|| _d S N)_active_grpc_future	threadingLock_active_grpc_future_lock_completion_event)selfcompletion_event r   k/home/droni/.local/share/virtualenvs/DPS-5Je3_V2c/lib/python3.9/site-packages/tensorboard/util/grpc_util.py__init__D   s    
zAsyncCallFuture.__init__c                 C   s@   |d u rt d| j || _W d    n1 s20    Y  d S )Nz1_set_active_future invoked with grpc_future=None.)RuntimeErrorr   r	   )r   Zgrpc_futurer   r   r   _set_active_futureI   s    z"AsyncCallFuture._set_active_futurec                 C   sd   | j |std| d| j, | jdu r8td| j W  d   S 1 sV0    Y  dS )aU  Analogous to `grpc.Future.result`. Returns the value or exception.

        This method will wait until the full set of gRPC requests is complete
        and then act as `grpc.Future.result` for the single gRPC invocation
        corresponding to the first successful call or final failure, as
        appropriate.

        Args:
          timeout: How long to wait in seconds before giving up and raising.

        Returns:
          The result of the future corresponding to the single gRPC
          corresponding to the successful call.

        Raises:
          * `grpc.FutureTimeoutError` if timeout seconds elapse before the gRPC
          calls could complete, including waits and retries.
          * The exception corresponding to the last non-retryable gRPC request
          in the case that a successful gRPC request was not made.
        z AsyncCallFuture timed out after z secondsNz*AsyncFuture never had an active future set)r   waitgrpcZFutureTimeoutErrorr   r	   r   result)r   timeoutr   r   r   r   Q   s    

zAsyncCallFuture.resultN)__name__
__module____qualname____doc__r   r   r   r   r   r   r   r   8   s   r   c                    sd   du rt td  t t fdd fddtjdd S )	a  Initiate an asynchronous call to a gRPC stub, with retry logic.

    This is similar to the `async_call` API, except that the call is handled
    asynchronously, and the completion may be handled by another thread. The
    caller must provide a `done_callback` argument which will handle the
    result or exception rising from the gRPC completion.

    Retries are handled with jittered exponential backoff to spread out failures
    due to request spikes.

    This only supports unary-unary RPCs: i.e., no streaming on either end.

    Args:
      api_method: Callable for the API method to invoke.
      request: Request protocol buffer to pass to the API method.
      clock: an interface object supporting `time()` and `sleep()` methods
        like the standard `time` module; if not passed, uses the normal module.

    Returns:
      An `AsyncCallFuture` which will encapsulate the `grpc.Future`
      corresponding to the gRPC call which either completes successfully or
      represents the final try.
    Nz"Async RPC call %s with request: %rc                    s*    j tt d}| ||  dS )zDInvokes the gRPC future and orchestrates it via the AsyncCallFuture.r   metadataN)future_GRPC_DEFAULT_TIMEOUT_SECSversion_metadatar   add_done_callback)handlerr   )
api_methodasync_futurerequestr   r   
async_call   s    	
z+async_call_with_retries.<locals>.async_callc                    s   |   }|d u r  d S td | | tvrB  d S |tkrV  d S t|}| t	j
|d d d S )NRPC call %s got error %s   num_attempts)	exceptionsetloggerinfocode_GRPC_RETRYABLE_STATUS_CODES_GRPC_RETRY_MAX_ATTEMPTS_compute_backoff_secondssleep	functoolspartial)r   r+   ebackoff_secs)r$   r'   clockr   retry_handlerr   r   r:      s     
z.async_call_with_retries.<locals>.retry_handlerr)   r*   )timer.   debugr
   Eventr   r5   r6   )r$   r&   r9   r   )r$   r'   r%   r9   r   r&   r:   r   async_call_with_retriesp   s    r>   c                 C   s   t tt}t|  | }|S )z3Compute appropriate wait time between RPC attempts.)randomuniform_GRPC_RETRY_JITTER_FACTOR_MIN_GRPC_RETRY_JITTER_FACTOR_MAX_GRPC_RETRY_EXPONENTIAL_BASE)r+   Zjitter_factorr8   r   r   r   r3      s    r3   c              
   C   s   |du rt }|jjdd}td|| d}|d7 }z| |tt dW S  tj	y } z2t
d|| | tvrv |tkr W Y d}~n
d}~0 0 t|}t
d	||| || q.dS )
a  Call a gRPC stub API method, with automatic retry logic.

    This only supports unary-unary RPCs: i.e., no streaming on either end.
    Streamed RPCs will generally need application-level pagination support,
    because after a gRPC error one must retry the entire request; there is no
    "retry-resume" functionality.

    Retries are handled with jittered exponential backoff to spread out failures
    due to request spikes.

    Args:
      api_method: Callable for the API method to invoke.
      request: Request protocol buffer to pass to the API method.
      clock: an interface object supporting `time()` and `sleep()` methods
        like the standard `time` module; if not passed, uses the normal module.

    Returns:
      Response protocol buffer returned by the API method.

    Raises:
      grpc.RpcError: if a non-retryable error is returned, or if all retry
        attempts have been exhausted.
    NRequest zRPC call %s with request: %rr   r)   r   r(   z8RPC call %s attempted %d times, retrying in %.1f seconds)r;   	__class__r   replacer.   r<   r    r!   r   ZRpcErrorr/   r0   r1   r2   r3   r4   )r$   r&   r9   Zrpc_namer+   r7   r8   r   r   r   call_with_retries   s4    rH   c                   C   s   t tjffS )a  Creates gRPC invocation metadata encoding the TensorBoard version.

    Usage: `stub.MyRpc(request, metadata=version_metadata())`.

    Returns:
      A tuple of key-value pairs (themselves 2-tuples) to be passed as the
      `metadata` kwarg to gRPC stub API methods.
    )_VERSION_METADATA_KEYr   VERSIONr   r   r   r   r!      s    	r!   c                 C   s   t | tS )a  Extracts version from invocation metadata.

    The argument should be the result of a prior call to `metadata` or the
    result of combining such a result with other metadata.

    Returns:
      The TensorBoard version listed in this metadata, or `None` if none
      is listed.
    )dictgetrI   )r   r   r   r   extract_version	  s    
rM   c                   @   s4   e Zd ZdZdZdZdd Zedd Zdd	 Z	d
S )ChannelCredsTypelocalsslZssl_devc                 C   s^   g }| t jkrt }n>| t jkr,t }n*| t jkrJt }|d ntd|  ||fS )aN  Create channel credentials and options.

        Returns:
          A tuple `(channel_creds, channel_options)`, where `channel_creds`
          is a `grpc.ChannelCredentials` and `channel_options` is a
          (potentially empty) list of `(key, value)` tuples. Both results
          may be passed to `grpc.secure_channel`.
        )zgrpc.ssl_target_name_override	localhostzunhandled ChannelCredsType: %r)	rN   LOCALr   Zlocal_channel_credentialsSSLZssl_channel_credentialsSSL_DEVappendAssertionError)r   optionscredsr   r   r   channel_config  s    





zChannelCredsType.channel_configc                 C   s
   | j  S r   )__members__values)clsr   r   r   choices4  s    zChannelCredsType.choicesc                 C   s   | j S r   )value)r   r   r   r   __str__8  s    zChannelCredsType.__str__N)
r   r   r   rR   rS   rT   rY   classmethodr]   r_   r   r   r   r   rN     s   
rN   )N)N)#r   enumr5   r?   r
   r;   r   Ztensorboardr   Ztensorboard.utilr   Z
get_loggerr.   r    r2   rC   rA   rB   	frozensetZ
StatusCodeZABORTEDZDEADLINE_EXCEEDEDZRESOURCE_EXHAUSTEDUNAVAILABLEr1   rI   r   r>   r3   rH   r!   rM   uniqueEnumrN   r   r   r   r   <module>   s<   
8
K
7