
    S`,-                        d dl Zd dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dlm
Z
 d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ  ej8                  e      Z e       Z  G d d      Z!d Z"d'dZ#d Z$ G d d      Z% G d d      Z&d Z'd Z(d Z) G d de*      Z+ G d  d!      Z,d" Z-d# Z.d$ Z/d% Z0d& Z1y)(    N)Empty)Queue)Lock)	Semaphore)Thread)APIError)ImageNotFound)AnsiMode)green)red)ShutdownException)PARALLEL_LIMIT)CompletedUnsuccessfully)HealthCheckFailed)NoHealthCheckConfigured)OperationFailedErrorc                   0    e Zd ZdZ ee      Zed        Zy)GlobalLimitzSimple class to hold a global semaphore limiter for a project. This class
    should be treated as a singleton that is instantiated when the project is.
    c                 4    |t         }t        |      | _        y N)r   r   global_limiter)clsvalues     2/usr/lib/python3/dist-packages/compose/parallel.pyset_global_limitzGlobalLimit.set_global_limit%   s    ="E&u-    N)	__name__
__module____qualname____doc__r   r   r   classmethodr    r   r   r   r      s%     ~.N. .r   r   c                    d}| D ]~  \  }}	}
|
Y|' ||      r|j                  | ||      dt               n|j                  | ||      dt               |j                  |	       ct	        |
t
              r6|
j                  | ||      <   |j                  | ||      dt               |
}t	        |
t              r4|
j                  | ||      <   |j                  | ||      dt               t	        |
t        t        t        t        f      r5|
j                  | ||      <   |j                  | ||      dt               Bt	        |
t              r |j                  | ||      dt               r|
| ||      <   |
} |S )z Watch events from a parallel execution, update status and fill errors and results.
        Returns exception to re-raise.
    Nfaileddoneerror)writer   r   append
isinstancer	   explanationr   r   r   r   r   msgUpstreamError)eventswritererrorsresultsr+   get_name
fail_checkerror_to_reraiseobjresult	exceptions              r   parallel_execute_watchr7   ,   sY    "( )VY%*S/S(3-3?S(3-?NN6"	=1 %.$9$9F8C=!LLhsmWc:(	8,$-$9$9F8C=!LLhsmWc:	$8:KMd$;$= >$-MMF8C=!LLhsmWc:	=1LLhsmWc:$-F8C=!(1)2 r   c           	         t        |       } t        j                  }t        j	                  t        |            }| D ]  }	|j                  | ||	              | D ]  }	|j                  | ||	              t        | |||      }
i }g }t        |
||||||      }|j                         D ]&  \  }}|j                  dj                  ||             ( |r|||fS )a  Runs func on objects in parallel while ensuring that func is
    ran on object only after it is ran on all its dependencies.

    get_deps called on object must return a collection with its dependencies.
    get_name called on object must return its name.
    fail_check is an additional failure check for cases that should display as a failure
        in the CLI logs, but don't raise an exception (such as attempting to start 0 containers)
    z
ERROR: for {}  {}
)listsysstderrParallelStreamWriterget_or_assign_instance
add_objectwrite_initialparallel_execute_iterr7   itemsr'   format)objectsfuncr1   r+   get_depslimitr2   streamr.   r4   r-   r/   r0   r3   obj_namer&   s                   r   parallel_executerI   M   s     7mGZZF!889Mf9UVF .#x}-. 1S(3-01 #7D(EBFFG-h
 "<<> F%,33HeDEF F?r   c                     g S r   r"   )xs    r   _no_depsrL   q   s    Ir   c                   "    e Zd ZdZd Zd Zd Zy)Statez
    Holds the state of a partially-complete parallel operation.

    state.started:   objects being processed
    state.finished:  objects which have been processed
    state.failed:    objects which either failed or whose dependencies failed
    c                 l    || _         t               | _        t               | _        t               | _        y r   )rC   setstartedfinishedr$   )selfrC   s     r   __init__zState.__init__}   s$    uer   c                     t        | j                        t        | j                        z   t        | j                        k\  S r   )lenrR   r$   rC   rS   s    r   is_donezState.is_done   s-    4==!C$44DLL8IIIr   c                 z    t        | j                        | j                  z
  | j                  z
  | j                  z
  S r   )rP   rC   rQ   rR   r$   rW   s    r   pendingzState.pending   s,    4<< 4<</$--?$++MMr   N)r   r   r   r    rT   rX   rZ   r"   r   r   rN   rN   u   s    JNr   rN   c                       e Zd Zd Zd Zy)NoLimitc                      y r   r"   rW   s    r   	__enter__zNoLimit.__enter__       r   c                      y r   r"   )rS   exs     r   __exit__zNoLimit.__exit__   r_   r   N)r   r   r   r^   rb   r"   r   r   r\   r\      s    r   r\   c              #   <  K   |t         }|t               }nt        |      }t               }t	        |       }	 t        | |||||       	 |j                  d      }|t        u ry|\  }}	}
|
@t        j                  dj                  |             |j                  j!                  |       n?t        j                  dj                  |             |j"                  j!                  |       | # t        $ r Y t        j                  $ r t               w xY ww)a  
    Runs func on objects in parallel while ensuring that func is
    ran on object only after it is ran on all its dependencies.

    Returns an iterator of tuples which look like:

    # if func returned normally when run on object
    (object, result, None)

    # if func raised an exception when run on object
    (object, None, exception)

    # if func raised an exception when run on one of object's dependencies
    (object, None, UpstreamError())
    Ng?)timeoutzFinished processing: {}z
Failed: {})rL   r\   r   r   rN   
feed_queuegetr   threadr&   r   STOPlogdebugrB   rR   addr$   )rC   rD   rE   rF   limiterr0   stateeventr4   _r6   s              r   r@   r@      s      })E"gG'NE
7D(GUGD	&KKK,E D=!Q	II/66s;<NNs#IIl))#./LLS!- 
  	|| 	&#%%	&s+   AD
C1 BD1	D:D<DDc                 
   |5  t         j                  5  	  ||       }|j                  | |df       ddd       ddd       y# t        $ r}|j                  | d|f       Y d}~3d}~ww xY w# 1 sw Y   <xY w# 1 sw Y   yxY w)z
    The entry point for a producer thread which runs func on a single object.
    Places a tuple on the results queue once func has either returned or raised.
    N)r   r   put	Exception)r4   rD   r0   rl   r5   es         r   producerrt      s    
 
 (+,, (	(#YFKKfd+,( ( (  	(KKdA''	(	( ( ( (sD   A9A-AA9	A*A% A-%A**A--A6	2A99Bc                     j                         }t        j                  dj                  |             |D ]  } ||      }	 t	        fd|D              r\t        j                  dj                  |             |j                  |dt               f       j                  j                  |       nt         fd|D              rkt        j                  dj                  |             t        t        ||||f      }	d|	_        |	j                          j                  j                  |        j%                         r|j                  t&               yy# t        t         f$ rC}
t        j                  d	j                  |             |j                  |d|
f       Y d}
~
rd}
~
wt"        $ rC}
t        j                  d
j                  |             |j                  |d|
f       Y d}
~
d}
~
ww xY w)a)  
    Starts producer threads for any objects which are ready to be processed
    (i.e. they have no dependencies which haven't been successfully processed).

    Shortcuts any objects whose dependencies have failed and places an
    (object, None, UpstreamError()) tuple on the results queue.
    zPending: {}c              3   @   K   | ]  }|d    j                   v   yw)r   N)r$   ).0deprm   s     r   	<genexpr>zfeed_queue.<locals>.<genexpr>   s     :c3q6U\\):s   z'{} has upstream errors - not processingNc              3   j   K   | ]*  \  }}|vxs |j                   v xr | xs  ||       , y wr   )rR   )rw   rx   ready_checkrC   rm   s      r   ry   zfeed_queue.<locals>.<genexpr>   sK       'c; 7" 5>>)S;/R+cBRs   03zStarting producer thread for {})targetargsTzAHealthcheck for service(s) upstream of {} failed - not processingzIService(s) upstream of {} did not completed successfully - not processing)rZ   ri   rj   rB   anyrq   r,   r$   rk   allr   rt   daemonstartrQ   r   r   r   rX   rh   )rC   rD   rE   r0   rm   rl   rZ   r4   depstrs   s   `   `      r   re   re      s    mmoGIIm""7+, (}	(:T::		CJJ3OPS$89  %  +/ 
 		;BB3GH(#tWg1NO	!!#&!(< }}D  "#:; 	(II!!' KKdA''& 	(II!!' KKdA''	(s%   C0EG:-8F++G:78G55G:c                       e Zd Zy)r,   N)r   r   r   r"   r   r   r,   r,     s    r   r,   c                       e Zd ZdZej
                  Z e       ZdZ	 e       Z
ed        Zed        Zed        ZddZd Zd Zd	 Zd
 Zd Zy)r<   zWrite out messages for operations happening in parallel.

    Each operation has its own line, and ANSI code characters are used
    to jump to the correct line, and write over the line.
    Nc                     | j                   S r   )instance)r   s    r   get_instancez!ParallelStreamWriter.get_instance  s    ||r   c                     | j                   j                          	 | j                  || _        | j                  | j                   j                          S # | j                   j                          w xY wr   )instance_lockacquirer   release)r   r.   s     r   r=   z+ParallelStreamWriter.get_or_assign_instance  sV    !!#	(||#%<<%%'C%%'s   A A1c                     || _         y r   )default_ansi_mode)r   	ansi_modes     r   set_default_ansi_modez*ParallelStreamWriter.set_default_ansi_mode   s
     )r   c                 v    || j                   }|| _        |j                  |      | _        g | _        d| _        y )Nr   )r   rG   use_ansi_codeslineswidth)rS   rG   r   s      r   rT   zParallelStreamWriter.__init__$  s;    ..I'66v>

r   c                     |y | j                   j                  ||z          t        | j                  t	        |dz   |z               | _        y )N )r   r(   maxr   rV   rS   r+   	obj_indexs      r   r>   zParallelStreamWriter.add_object,  s@    ;

#	/*SsY)>%?@
r   c                 .    |y | j                  ||d      S )N )_write_noansir   s      r   r?   z"ParallelStreamWriter.write_initial2  s    ;!!#y"55r   c                 B   | j                   j                          | j                  j                  ||z         }t	        | j                        |z
  }| j
                  j                  dd|fz         | j
                  j                  ddz         | j
                  j                  dj                  |dz   |z   || j                               | j
                  j                  dd|fz         | j
                  j                          | j                   j                          y )Nz%c[%dA   z%c[2Kz{:<{width}} ... {}r   r   z%c[%dB)
write_lockr   r   indexrV   rG   r'   rB   r   flushr   )rS   r+   r   statuspositiondiffs         r   _write_ansiz ParallelStreamWriter._write_ansi7  s    !::##C)O44::)(b$Z/0)b.)077c	I8M 

 8 4 	5 	(b$Z/0!r   c                     | j                   j                  dj                  |dz   |z   || j                               | j                   j	                          y )Nz{:<{width}} ... {}
r   r   )rG   r'   rB   r   r   )rS   r+   r   r   s       r   r   z"ParallelStreamWriter._write_noansiF  sM    $++c	I%vTZZ , 	

 	r   c                 |    |y | j                   r| j                  || ||             y | j                  |||       y r   )r   r   r   )rS   r+   r   r   
color_funcs        r   r'   zParallelStreamWriter.writeN  s<    ;S)Z-?@sIv6r   r   )r   r   r   r    r
   AUTOr   r   r   r   r   r!   r   r=   r   rT   r>   r?   r   r   r'   r"   r   r   r<   r<     s     !JHFM  ( ( * *A6
"7r   r<   c                 n    t        | t        j                  |fi |t        j                  d      |       y )Nname)rI   operatormethodcaller
attrgetter)
containers	operationoptionsmessages       r   parallel_operationr   W  s1    i373F#	r   c                 b    | D cg c]  }|j                   r| }}t        |d|d       y c c}w )NremoveRemoving)
is_runningr   )r   r   cstopped_containerss       r   parallel_remover   `  s1    %/Dq||!DD)8WjI Es   ,,c                      t        | d|d       y )NpausePausingr   r   r   s     r   parallel_pauser   e  s    z7GY?r   c                      t        | d|d       y )Nunpause	Unpausingr   r   s     r   parallel_unpauser   i  s    z9g{Cr   c                      t        | d|d       y )NkillKillingr   r   s     r   parallel_killr   m  s    z67I>r   )NNN)2_threadrg   loggingr   r:   queuer   r   	threadingr   r   r   docker.errorsr   r	   compose.cli.colorsr
   r   r   compose.cli.signalsr   compose.constr   compose.errorsr   r   r   r   	getLoggerr   ri   objectrh   r   r7   rI   rL   rN   r\   r@   rt   re   rr   r,   r<   r   r   r   r   r   r"   r   r   <module>r      s       
      " ' ' $ " 1 ( 2 , 2 / g!x. .B!HN N, 1h
(*Z	I 	O7 O7dJ
@D?r   