import collections

import torch
import torch.distributed as dist
from torch.nn.parallel._functions import _get_stream
from torch.nn.parallel.scatter_gather import is_namedtuple as _is_namedtuple
from typing import Any, Dict, List

__all__ = []


def _recursive_to(inputs, target_gpu, use_side_stream_for_tensor_copies):
    r"""
    Recursively moves input to the target_gpu.
    """

    def to_map(obj):
        if isinstance(obj, torch.Tensor):
            if obj.device == torch.device("cuda", target_gpu):
                return (obj,)
            if not use_side_stream_for_tensor_copies:
                return (obj.to(target_gpu),)
            else:
                # Perform CPU -> GPU copies in a background stream. This code is
                # motivated from similar logic in torch/nn/parallel/_functions.py
                stream = _get_stream(target_gpu)
                with torch.cuda.stream(stream):
                    output = obj.to(target_gpu)
                # Synchronize with the copy stream.
                with torch.cuda.device(target_gpu):
                    current_stream = torch.cuda.current_stream()
                    # Sync the current stream with the copy stream.
                    current_stream.wait_stream(stream)
                    # Ensure tensor memory is not reused until work on
                    # the main stream is complete.
                    output.record_stream(current_stream)
                return (output,)
        if _is_namedtuple(obj):
            return [type(obj)(*args) for args in zip(*map(to_map, obj))]
        if isinstance(obj, tuple) and len(obj) > 0:
            return list(zip(*map(to_map, obj)))
        if isinstance(obj, str):
            # Strings must be handled before the generic Sequence branch,
            # otherwise they would be recursed into character by character.
            return [obj]
        if isinstance(obj, collections.abc.Sequence) and len(obj) > 0:
            try:
                return [type(obj)(i) for i in zip(*map(to_map, obj))]
            except TypeError:
                # The sequence type may not support `__init__(iterable)` (e.g., `range`).
                return [list(i) for i in zip(*map(to_map, obj))]
        if isinstance(obj, collections.abc.Mapping) and len(obj) > 0:
            try:
                return [type(obj)(i) for i in zip(*map(to_map, obj.items()))]
            except TypeError:
                # The mapping type may not support `__init__(iterable)`.
                return [dict(i) for i in zip(*map(to_map, obj.items()))]
        return [obj]

    # Avoid reference cycle.
    try:
        res = to_map(inputs)
    finally:
        to_map = None  # type: ignore[assignment]
    return res


def _to_kwargs(inputs, kwargs, device_id, use_side_stream_for_tensor_copies):
    inputs = _recursive_to(inputs, device_id, use_side_stream_for_tensor_copies) if inputs else []
    kwargs = _recursive_to(kwargs, device_id, use_side_stream_for_tensor_copies) if kwargs else []
    if len(inputs) < len(kwargs):
        inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
    elif len(kwargs) < len(inputs):
        kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
    inputs = tuple(inputs)
    kwargs = tuple(kwargs)
    return inputs, kwargs
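

# --- Illustrative usage (not part of the original module) ---
# A minimal sketch of how `_to_kwargs` is used: positional and keyword
# arguments with arbitrarily nested containers are copied onto one GPU before
# a replica's forward pass, and each result comes back wrapped in a
# one-element outer tuple. Guarded on CUDA availability so the sketch stays
# runnable; the helper name and tensor values are made up.
def _example_move_args_to_gpu():
    if not torch.cuda.is_available():
        return
    args = (torch.zeros(4), {"mask": torch.ones(4, dtype=torch.bool)})
    kwargs = {"scale": torch.tensor(0.5)}
    moved_args, moved_kwargs = _to_kwargs(args, kwargs, 0, False)
    assert moved_args[0][0].device == torch.device("cuda", 0)
    assert moved_kwargs[0]["scale"].device == torch.device("cuda", 0)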


def _verify_param_shape_across_processes(process_group, tensors, logger=None):
    return dist._verify_params_across_processes(process_group, tensors, logger)


def _sync_module_states(
    module,
    process_group,
    broadcast_bucket_size,
    src,
    params_and_buffers_to_ignore,
):
    """
    Syncs ``module``'s parameters and buffers state so that all ranks contain
    the same module state across all ranks. Note that this API assumes that all
    parameter shapes are consistent before running the synchronization. This can
    be checked with ``_verify_param_shape_across_processes``.
    """
    module_states = []
    for name, param in module.named_parameters():
        if name not in params_and_buffers_to_ignore:
            module_states.append(param.detach())

    for name, buffer in module.named_buffers():
        if name not in params_and_buffers_to_ignore:
            module_states.append(buffer.detach())

    _sync_params_and_buffers(process_group, module_states, broadcast_bucket_size, src)


def _sync_params_and_buffers(
    process_group: dist.ProcessGroup,
    module_states: List[torch.Tensor],
    broadcast_bucket_size: int,
    src: int,
):
    """
    Synchronizes ``module_states`` (list of tensors) across all processes by
    broadcasting them from rank 0.
    """
    if len(module_states) > 0:
        dist._broadcast_coalesced(
            process_group, module_states, broadcast_bucket_size, src
        )
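

# --- Illustrative usage (not part of the original module) ---
# A hedged sketch of how a DDP-style wrapper would call _sync_module_states at
# construction time so that rank 0's parameters and buffers become
# authoritative on every rank. Assumes dist.init_process_group has already
# run; the 250 MB bucket size matches DistributedDataParallel's default, and
# the helper name is made up.
def _example_sync_from_rank0(module):
    if dist.is_available() and dist.is_initialized():
        _sync_module_states(
            module,
            process_group=dist.group.WORLD,
            broadcast_bucket_size=int(250 * 1024 * 1024),
            src=0,
            params_and_buffers_to_ignore=set(),
        )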


def _replace_by_prefix(
    state_dict: Dict[str, Any],
    old_prefix: str,
    new_prefix: str,
) -> None:
    """
    Replace all keys that match a given old_prefix with a new_prefix (in-place).

    Usage::

        state_dict = {"layer.xyz": torch.tensor(1)}
        _replace_by_prefix(state_dict, "layer.", "module.layer.")
        assert state_dict == {"module.layer.xyz": torch.tensor(1)}
    """
    if old_prefix == new_prefix:
        raise ValueError("old_prefix and new_prefix must be distinct")
    for key in list(state_dict.keys()):
        if not key.startswith(old_prefix):
            continue
        new_key = new_prefix + key[len(old_prefix):]
        state_dict[new_key] = state_dict[key]
        del state_dict[key]
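

# --- Illustrative usage (not part of the original module) ---
# A common application of _replace_by_prefix: stripping the "module." prefix
# that nn.DataParallel / DistributedDataParallel prepend to state_dict keys,
# so a checkpoint saved from a wrapped model loads into an unwrapped one.
# Helper name and keys are made up.
def _example_strip_wrapper_prefix(checkpoint: Dict[str, Any]) -> Dict[str, Any]:
    # e.g. {"module.linear.weight": w} -> {"linear.weight": w}
    _replace_by_prefix(checkpoint, "module.", "")
    return checkpoint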

