a
    ==icC                     @   s   d dl mZmZ d dlmZmZ d dlmZmZ d dl	m
Z
mZ d dlmZmZmZ G dd deZdZd	d
 Zd!ddZdd Zd"ddZd#ddZdd Zdd Zdd Zd$ddZG dd deZG dd  d eZdS )%    )core
queue_util)ReaderWriter)
NetBuilderops)	as_recordField)NodeTask	TaskGroupc                   @   s   e Zd ZdZdddZdS )Outputz
    Represents the result of a processor function. A processor can either
    return an Output, or it can return a record, in which case an Output will be
    created for it afterwards.
    Nc                 C   s|   t   }|d u s(t|dks(J d|d u r4|}t|tjrF|g}|d u rRg nt|| _|d u rhd nt	|| _
|| _d S )Nr   z7Cannot both use `ops` syntax and return a list of nets.)r   currentgetlen
isinstancer   Netlistnetsr   recordshould_stop)selfr   r   r   Zbuilder_children r   g/home/droni/.local/share/virtualenvs/DPS-5Je3_V2c/lib/python3.9/site-packages/caffe2/python/pipeline.py__init__   s    zOutput.__init__)NNN)__name__
__module____qualname____doc__r   r   r   r   r   r      s   r   d   c                 C   s   | d u r*t j|d ur|ntd}| }nTt| trN|d u sDJ dd }| }n0t| drv|d u shJ d| }|  }ntd||| ||fS )N)capacityzcapacity would not be used.writerz)output must be a reader, queue or stream.)	r   QueueDEFAULT_QUEUE_CAPACITYr!   r   r   hasattr
ValueErrorsetup_ex)outputr    global_init_netglobal_exit_net	out_queuer!   r   r   r   _init_output%   s"    



r+   Nc                    sR    d u rdd S t  tjr$t S d urJt drJ fdd}| _ S d S )Nc                 S   s   | S Nr   )recr   r   r   <lambda><       z make_processor.<locals>.<lambda>schema_funcc                      s
     S r,   )r0   r   	processorreaderr   r   processor_schemaA   s    z(make_processor.<locals>.processor_schema)r   r   r   NetProcessorr$   schema)r2   r3   r4   r   r1   r   make_processor:   s    r7   c                 C   s   t | tr| S t | tr"t| dS t | trtt| dkoTt | d toTt | d tj}|rjtdg| R  S t|  S nt| S dS )z
    Allow for processors to return results in several formats.
    TODO(azzolini): simplify once all processors use NetBuilder API.
    )r      r      N)r   r   r	   tupler   r   BlobReference)r'   Zis_record_and_blobr   r   r   normalize_processor_outputH   s    




r<   r9   c           
   	   C   s   t | |||||||\}}	|S )a]
  
    Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
    Queue or DataStream in `output`, creates a Task that, when run, will
    pipe the input into the output, using multiple parallel threads.
    Additionally, if a processor is given, it will be called between reading
    and writing steps, allowing it to transform the record.

    Args:
        input:       either a Reader, Queue or DataStream that will be read
                     until a stop is signaled either by the reader or the
                     writer.
        output:      either a Writer, a Queue or a DataStream that will be
                     written to as long as neither reader nor writer signal
                     a stop condition. If output is not provided or is None,
                     a Queue is created with given `capacity` and written to.
        num_threads: number of concurrent threads used for processing and
                     piping. If set to 0, no Task is created, and a
                     reader is returned instead -- the reader returned will
                     read from the reader passed in and process it.
                     ** DEPRECATED **. Use `num_runtime_threads` instead.
                     This option will be removed once all readers/processors
                     support `num_runtime_threads`.
        processor:   (optional) function that takes an input record and
                     optionally returns a record; this will be called
                     between read and write steps. If the processor does
                     not return a record, a writer will not be instantiated.
                     Processor can also be a core.Net with input and output
                     records properly set. In that case, a NetProcessor is
                     instantiated, cloning the net for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    when output is not passed, a queue of given `capacity`
                     is created and written to.
        group:       (optional) explicitly add the created Task to this
                     TaskGroup, instead of using the currently active one.
        num_runtime_threads: Similar to `num_threads`, but instead of expanding
                     the tasks with a `for` loop in python, does that at
                     runtime. This is preferable to `num_threads`, but some
                     processors/readers still require to be called multiple
                     times in python.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the parameters
        passed.
    )
_pipe_step)
inputr'   num_threadsr2   namer    groupnum_runtime_threadsresult_r   r   r   pipec   s
    /rE   c	              
   C   s\   |dksJ t | ||||||||	\}	}
d}|durT|
 }t|ttfvrT|d }|	|fS )aq  
    Similar to `pipe`, with the additional ability for the pipe Task to
    return output values to the `Session` once done.

    Returns:
        Tuple (out_queue, *task_outputs)
            out_queue:    same as return value of `pipe`.
            task_outputs: TaskOutput object, fetchable from the client after
                          session.run() returns.
    r   N)r=   outputstyper   r:   )r>   r'   r?   r2   r@   r    rA   rB   final_outputsrC   taskr   r   r   pipe_and_output   s    rJ   c                 C   sT   t | dr| jS t | drL| jdkr*| jS t | drFd| jj| jf S | jS | jjS )Nr@   	func_namez<lambda>im_classz%s.%s)r$   r@   rK   r   rL   r   	__class__)r2   r   r   r   processor_name   s    



rN   c              	   C   s@  t t }d|d| tr"ttnd|r0t|nd}t| |||d}	td}
td}|	||
 td}td	}|
||\}}}|jg |gg d
tjjd |d urt||||
\}}|||||\}}nd }g }t  t| W d    n1 s0    Y  t  t| W d    n1 s@0    Y  td}|jg |d}td}||g  ttjd|gt| t| |g |d t| t  t| W d    n1 s0    Y  t  t|
 W d    n1 s0    Y  W d    n1 s.0    Y  ||	fS )N{0}/{1}/{2}/{3}/{4}rE   NoInputNoOutput)r@   rA   rF   Znum_instancesz	pipe:exitz	pipe:initzpipe:instance:initzpipe:instance:exitFshapevalueZdtypetimer_startZcounter_name	timer_endbodyZshould_stop_blob)strr
   r   formatr>   rN   r   r   r   r&   read_record_exConstantFillDataTypeBOOLr+   write_record_exr   Z	task_initnetZtask_instance_init
TimerBeginTimerEndexecution_stepr   Ztask_instance_exitZ	task_exit)r@   rA   rH   r3   r?   r'   r    	node_nameprofiler_namerI   r)   r(   init_netexit_net	read_netsstatusr-   r*   r!   
write_netsrD   timer_start_nettimertimer_end_netr   r   r   _runtime_threads_task   sl    





*
*



*
Jro   c                 C   s@  t t }d|d| tr"ttnd|r0t|nd}t| ||d}	td}
td}|	||
 d }d }g }t
|D ]n}td| d	8}td}td}|||\}}}|jg |gg d
tjjd |d ur>|d u r(t|	jd" t||||
\}}W d    n1 s0    Y  |||||\}}ng }td}|jg |d}td}||g  t| ttjd|gt| t| |g |d t| t| W d    n1 s0    Y  |t| q|t| ttjd|dd t|
 W d    n1 s.0    Y  ||	fS )NrO   rE   rP   rQ   )r@   rA   rF   exitinitzt:%d)r@   FrR   )Z	_fullnamerU   rV   rW   rX   rY   T)Zconcurrent_substeps)rZ   r
   r   r[   r>   rN   r   r   r   r&   ranger   r\   r]   r^   r_   r@   r+   r`   rb   rc   r   ra   rd   r   appendZto_execution_step)r@   rA   rH   r3   r?   r'   r    re   rf   rI   r)   r(   r*   r!   Zsteps	thread_idnbrg   rh   ri   rj   r-   rk   rD   rl   rm   rn   r   r   r   _static_threads_task   st    






(




*
*rv   c	           
      C   s   |dks|dksJ dt | tr(| }	n&t| dr<|  }	ntdt| |dur`t|	|}	|dksp|dkr|du s|J |	dfS |du r|durt|}|du r|durdt| }|du rdt|  }|dkrt	||||	|||S t
||||	|||S dS )	z
    r9   z;Only one of num_threads or num_runtime_threads must be set.r3   z/Input must be a reader, queue or stream. Got {}Nr   zpipe_into:%szpipe_from:%s)r   r   r$   r3   r%   r[   rG   ProcessingReaderrN   rv   ro   )
r>   r'   r?   r2   r@   r    rA   rB   rH   r3   r   r   r   r=   :  s:    



r=   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )rw   zo
    Reader that reads from an upstream reader, calls the processor, and returns
    the processed record.
    c                 C   s    t |  || _t||| _d S r,   )r   r   r3   r7   r2   )r   r3   r2   r   r   r   r   f  s    
zProcessingReader.__init__c                 C   s
   | j  S r,   )r2   r6   r   r   r   r   r6   k  s    zProcessingReader.schemac                 C   s   | j || d S r,   )r3   r&   )r   rg   Z
finish_netr   r   r   r&   n  s    zProcessingReader.setup_exc           
      C   s   | j ||\}}}t }t| |}W d    n1 s>0    Y  ||j7 }|js^|jrt	d}|jr|
||jg|g |jr|
||jg|g || t| jdr|tj| j | |j |jr|j nd }	|||	fS )Nstop_netsetup)r3   r\   r   r<   r2   r   r   Z
_stop_blobr   r   Orrs   r$   Zadd_attributer   ZLOCAL_SETUPZ_set_schemar   Zfield_blobs)
r   rg   rh   ri   rj   r-   ru   rC   ry   fieldsr   r   r   read_exq  s     ,


zProcessingReader.read_exN)r   r   r   r   r   r6   r&   r}   r   r   r   r   rw   a  s
   rw   c                   @   s:   e Zd ZdZdddZdd Zdd Zd	d
 Zdd ZdS )r5   z
    Processor that clones a core.Net each time it's called, executing
    the cloned net as the processor. It requires the Net to have input
    and (optionally) output records set, with net.set_input_record() and
    net.set_output_record().
    Nc                 C   sb   t |tjsJ |d u s(t |tjs(J |p2t|| _|p<g | _|| _|| _g | _	d| _
g | _d S )NF)r   r   r   r;   rZ   r@   thread_init_netsra   _stop_signal
_blob_maps_frozen_cloned_init_nets)r   ra   stop_signalr~   r@   r   r   r   r     s    

zNetProcessor.__init__c                 C   s
   | j  S r,   )ra   output_recordrx   r   r   r   r6     s    zNetProcessor.schemac                 C   s   d| _ | j}g | _|S NT)r   r   )r   rg   Zcloned_init_netsr   r   r   rz     s    zNetProcessor.setupc           	      C   s   | j r
J t jd }i }| jD ],}t|t|| ||\}}| j	| q"t| j
t| j
| |||\}}| jd u rd }n.t| j|v rtj|t| j |d}n| j}| j	| t|g| |S )N/)ra   )r   r   r   r@   r~   r   Zclone_and_bind_netrZ   r   rs   ra   r   r;   r   r   r   )	r   r-   prefixZ
blob_remapra   Znew_netrD   Z
remappingsr   r   r   r   __call__  s*    


zNetProcessor.__call__c                 C   s   d| _ | jS r   )r   r   rx   r   r   r   	blob_maps  s    zNetProcessor.blob_maps)NNN)	r   r   r   r   r   r6   rz   r   r   r   r   r   r   r5     s   
r5   )N)Nr9   NNNNr9   )Nr9   NNNNr9   N)Nr9   NNNNNN)Zcaffe2.pythonr   r   Zcaffe2.python.dataior   r   Zcaffe2.python.net_builderr   r   Zcaffe2.python.schemar   r	   Zcaffe2.python.taskr
   r   r   objectr   r#   r+   r7   r<   rE   rJ   rN   ro   rv   r=   rw   r5   r   r   r   r   <module>   s.   
  
6  
<B  
',