a
    ==ic]4                     @   s  d dl mZmZ d dlmZmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d d
lm Z  d dl!Z"d dl#Z#d dl$Z$d dl%Z%dd Z&g dZ'dd Z(G dd deZ)G dd deZ*dS )    )StructConstRecord)core	workspacemodel_helper)LocalSession)Dataset)pipe)CheckpointManagerMultiNodeCheckpointManagerJob	JobRunnerepoch_limiterUploadTaskGroupBuilderdb_name)ops)NodeTask	TaskGroupWorkspaceTypeCluster)TestCase)ReaderWithLimitNc              
      s  t d|   t j t Z tdttt	df}t
t|}t|d|  d}|t}tdg W d    n1 s0    Y  W d    n1 s0    Y   fdd}t|d	d
}t||d t |  W d    n1 s0    Y   gS )N
trainer_%dval
   z
dataset:%dnamed   c                    s   t  |  g g d S N)r   Addr   )Zrectotal n/home/droni/.local/share/virtualenvs/DPS-5Je3_V2c/lib/python3.9/site-packages/caffe2/python/checkpoint_test.py	inc_total"   s    z!build_pipeline.<locals>.inc_total   )Znum_iter)	processor)r   r   current
init_groupr   r   nparraylistranger   r   r   readerZConstr   r	   Zadd_stop_conditionZdata_finished)node_idZdata_arrdataZdsZfull_readerr%   Zepoch_readerr#   r!   r$   build_pipeline   s    

H0r1   )g   s         c                    s    fdd}|S )Nc                    s   t   d S r   )shutilcopyfile)inputsoutputsdestsrcr#   r$   copy_op/   s    zlocal_copy_op.<locals>.copy_opr#   )r<   r;   r=   r#   r:   r$   local_copy_op.   s    r>   c                   @   s   e Zd Zdd Zdd ZdS )UploadToLocalFilec                 C   s
   || _ d S r   )dest_dir)selfr@   r#   r#   r$   __init__5   s    zUploadToLocalFile.__init__c                 C   s   t tj}|jD ]\}}tt|t t N t||j|j	}t
j| jt|}tt||gi fg g  W d    n1 s0    Y  W d    q1 s0    Y  qW d    n1 s0    Y  |S r   )r   r   GLOBALZ_node_managersr   strr   r   Z
_node_nameZ
_db_prefixospathjoinr@   r   Pythonr>   )rA   epochZcheckpoint_managerZupload_task_groupnodemanagerZsrc_path	dest_pathr#   r#   r$   build8   s    `zUploadToLocalFile.buildN)__name__
__module____qualname__rB   rM   r#   r#   r#   r$   r?   4   s   r?   c                   @   sD   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dS )TestCheckpointc           
   	      sF  t  ( t }tdd}W d    n1 s00    Y  ttd|d  fdd}| \}}|t t||	|}| 
|tt | 
||td  td|d D ]6}| \}}t|||d		| | 
||td  qtd|d D ],}	|||	 | 
||t|	d   qW d    n1 s80    Y  d S )
Nr   r/   emptystepr9   c                    s   |      d  S Nr   runr9   fetchsessionZoutput_fetcherr#   r$   fetch_totalJ   s    
z,TestCheckpoint.run_with.<locals>.fetch_total   Zresume_from_epoch)r   r   r1   r   r   Netcompiler   r   trainassertEqualslenEXPECTED_TOTALSr-   rX   load)
rA   builderjobr9   r]   r[   
checkpoint
num_epochsinitial_epochrI   r#   r\   r$   run_withD   s0    
(




zTestCheckpoint.run_withc              	      sx   z,t    fdd}| | W t  nt  0 z,t    fdd}| | W t  nt  0 d S )Nc                     s&   t j } t| }t dd}||fS )NZ	temp_nodeminidb)r   C	Workspacer   r
   wsr[   rj   tmpdirr#   r$   rh   f   s    
z6TestCheckpoint.test_single_checkpoint.<locals>.builderc                     s$   t j } t| }t d}||fS )Nrn   )r   ro   rp   r   r   rq   rs   r#   r$   rh   t   s    

)tempfilemkdtemprm   r6   rmtree)rA   rh   r#   rs   r$   test_single_checkpointa   s    z%TestCheckpoint.test_single_checkpointc                 C   s
  zd}t  }t|d}t  t &}t|D ]}t| q2W d    n1 sT0    Y  |t |	|
  t|D ]4}d}d| }|d | d }| |||| q~W d    n1 s0    Y  t| t  }t  t|D ]}tj }	t|	}
t|d}t ^ t }t| W d    n1 sB0    Y  |t t||}||
}W d    n1 s0    Y  | |tt | t|	jd qtj }	t|	}
| t|	jd d	d
g}t|d}t  t (}t|D ]}t| qW d    n1 s"0    Y  |t t||}|j|d|
d tddD ]Z}| |j|||
d |D ]8}| |	| | |	|tt|d  g qxqZ| |j|d|
d W d    n1 s0    Y  W t| nt| 0 d S )Nr&   rn      r   /z.5   r   z'trainer_1/task_2/GivenTensorInt64Fill:0z'trainer_2/task_2/GivenTensorInt64Fill:0r_   )Z
blob_namesrI   r[   )ru   rv   r   r   r   r-   r1   rb   r   initZnodes_to_checkpointrd   Zget_ckpt_db_namer6   rw   r   ResetWorkspacero   rp   r   rc   re   rf   ZblobsZload_blobs_from_checkpoints
assertTrueZhas_blob
fetch_blobr*   r+   assertFalse)rA   	num_nodesrt   rj   ri   r/   rI   	node_nameZexpected_db_namerr   r[   
job_runnerrk   Zmodel_blob_namesZ	blob_namer#   r#   r$   (test_ckpt_name_and_load_model_from_ckpts~   s    
(

$


(

*

,

&z7TestCheckpoint.test_ckpt_name_and_load_model_from_ckptsc              
   C   sn  zZt  }tj|d}t| d}t|D ],}d| }tj||}| tj| q0tdD ]}t	j
 }t|}t|d}	t x t }
t| W d    n1 s0    Y  |
t t|}t|
|	|d}||}| |tt W d    qf1 s0    Y  qft|D ].}d| }tj||}| tj| q W t| nt| 0 d S )Nuploadr&   r   rn   )Zupload_task_group_builder)ru   rv   rE   rF   rG   mkdirr-   r   existsr   ro   rp   r   r   r   r   r1   rb   r?   r   rc   rd   re   rf   r~   r6   rw   )rA   rt   Z
upload_dirr   r/   r   Zupload_pathrr   r[   rj   ri   Zlocal_upload_builderr   rk   r#   r#   r$   test_upload_checkpoint   s:    


&

2z%TestCheckpoint.test_upload_checkpointc           
   
   C   s   d}d}t   t|D ]}t j }t|}t|d}t \ t }t	| W d    n1 sd0    Y  |
t t||}||}	W d    n1 s0    Y  | |	tt qd S )Nr&   z/tmp/path_does_not_exist/rn   )r   r}   r-   ro   rp   r   r   r   r   r1   rb   r   rc   rd   re   rf   )
rA   r   rt   r/   rr   r[   rj   ri   r   rk   r#   r#   r$   test_ckpt_save_failure   s    

&

(z%TestCheckpoint.test_ckpt_save_failurec           	      C   s  t jdd}td}dD ]}|jjg |gdgddd q|jd	d
gdg |dgdg t	 0}t
d |j t|jd W d   n1 s0    Y  |jn t F td t|j W d   n1 s0    Y  W d   n1 s0    Y  W d   n1 s"0    Y  |j t|d W d   n1 sT0    Y  t|d W d   n1 s~0    Y  W d   n1 s0    Y  tj }t|}t|}|| tddtj}| t||d | t||d dS )z}
        A simple test that ensures we have download task group
        executed between epoch_group and exit_group.
        Z
test_modelr   download_net)input1input2outputdownload_result   g      ?r   )shapevalueZrun_oncer   r   r   r   z	trainer:0)rU   Nr_   g       @)r   ZModelHelperr   ra   Zparam_init_netZConstantFillnetr    ZCopyr   r   r)   r   Zepoch_groupr   loopZdownload_groupr   r   ro   rp   r   r   rc   r*   fullZastypefloat32r~   Zarray_equalr   )	rA   modelr   r   ri   rr   r[   r   Zexpected_resultr#   r#   r$   test_download_group_simple	  sB    

*j*J



z)TestCheckpoint.test_download_group_simplec           
   	      s   zt  }tj }t|}t|d}t }tdd}W d   n1 sL0    Y  t	t
d|d |t  fdd}t|||}td	|d	 D ],}	t|||	d
| | ||td  qW t| nt| 0 dS )zf
        A simple test that ensures we can reuse a MultiNodeCheckpointManager
        object.
        rn   r   rR   NrS   rT   c                    s   |      d  S rV   rW   rZ   r\   r#   r$   r]   D  s    
zATestCheckpoint.test_reuse_checkpoint_manager.<locals>.fetch_totalr_   r`   r^   )ru   rv   r   ro   rp   r   r   r   r1   r   r   ra   rb   r   rc   r-   rd   rf   r6   rw   )
rA   rt   rr   r[   rj   ri   r9   r]   rk   rl   r#   r\   r$   test_reuse_checkpoint_manager4  s*    

(
z,TestCheckpoint.test_reuse_checkpoint_managerN)
rN   rO   rP   rm   rx   r   r   r   r   r   r#   r#   r#   r$   rQ   C   s   M&+rQ   )+Zcaffe2.python.schemar   r   Zcaffe2.pythonr   r   r   Zcaffe2.python.sessionr   Zcaffe2.python.datasetr   Zcaffe2.python.pipeliner	   Zcaffe2.python.checkpointr
   r   r   r   r   r   r   Zcaffe2.python.net_builderr   Zcaffe2.python.taskr   r   r   r   r   Zcaffe2.python.test_utilr   Zcaffe2.python.dataior   numpyr*   rE   r6   ru   r1   rf   r>   r?   rQ   r#   r#   r#   r$   <module>   s$   $