
    Ϫf                         d Z ddlmZ ddlmZ  eej
                  ej                         G d d             Z G d de      Zy)	z
Producer-Consumer Proxy.
    )implementer)
interfacesc                   h    e Zd ZdZdZdZdZdZdZdZ	dZ
d Zd Zd Zd Zd	 Zd
 Zd Zd ZdefdZy)BasicProducerConsumerProxyaa  
    I can act as a man in the middle between any Producer and Consumer.

    @ivar producer: the Producer I subscribe to.
    @type producer: L{IProducer<interfaces.IProducer>}
    @ivar consumer: the Consumer I publish to.
    @type consumer: L{IConsumer<interfaces.IConsumer>}
    @ivar paused: As a Producer, am I paused?
    @type paused: bool
    NTFc                 ^    g | _         |$|| _        |j                  | | j                         y y N)_bufferconsumerregisterProduceriAmStreaming)selfr
   s     7/usr/lib/python3/dist-packages/twisted/protocols/pcp.py__init__z#BasicProducerConsumerProxy.__init__#   s1    $DM%%dD,=,=>      c                 `    d| _         | j                  r| j                  j                          y y NT)pausedproducerpauseProducingr   s    r   r   z)BasicProducerConsumerProxy.pauseProducing+   s%    ==MM((* r   c                 &   d| _         | j                  rD| j                  j                  dj	                  | j                               g | j                  d d  n| j
                  sd| _        | j                  | j                  j                          y y )NF T)	r   r	   r
   writejoinr   outstandingPullr   resumeProducingr   s    r   r   z*BasicProducerConsumerProxy.resumeProducing0   sl    <<MM 56 DLLO$$'+$==$MM))+ %r   c                 n    | j                   | j                   j                          | j                  | `y y r   )r   stopProducingr
   r   s    r   r   z(BasicProducerConsumerProxy.stopProducing=   s0    ==$MM'')==$ %r   c                     | j                   s| j                  s(| j                  s| j                  j	                  |       y | j
                  #| j
                  j                  |       d| _        y y NF)r   r   r   r	   appendr
   r   r   datas     r   r   z BasicProducerConsumerProxy.writeE   sT    ;;t009M9MLL%]]&MM%#(D  'r   c                 p    | j                   | j                   j                          | j                          y r   )r
   finishunregisterProducerr   s    r   r%   z!BasicProducerConsumerProxy.finishN   s(    ==$MM  "!r   c                      || _         || _        y r   )r   producerIsStreamingr   r   	streamings      r   r   z+BasicProducerConsumerProxy.registerProducerS   s     #, r   c                 r    | j                   | ` | `| j                  r| j                  j                          y y r   )r   r(   r
   r&   r   s    r   r&   z-BasicProducerConsumerProxy.unregisterProducerW   s3    ==$(==MM,,. r   returnc                 V    d| j                    dt        |       dd| j                   dS )N<@xz around >)	__class__idr
   r   s    r   __repr__z#BasicProducerConsumerProxy.__repr__^   s,    4>>"!BtHQ<xaHHr   )__name__
__module____qualname____doc__r
   r   r(   r   r   r   stoppedr   r   r   r   r   r%   r   r&   strr4    r   r   r   r      sc    	 HHLOFG?+
,)"
-/I# Ir   r   c                   @    e Zd ZdZdZdZdZd Zd Zd Z	d Z
d Zd	 Zy
)ProducerConsumerProxyzProducerConsumerProxy with a finite buffer.

    When my buffer fills up, I have my parent Producer pause until my buffer
    has room in it again.
    i   Fc                     d| _         y r   )r   r   s    r   r   z$ProducerConsumerProxy.pauseProducingo   s     r   c                    d| _         | j                  rsdj                  | j                        }| j                  |      }|t	        |      k  r)||d  }| j
                  rJ d       |g| j                  d d  ng | j                  d d  nd}| j                  r4|r2| j                  s&| j                  | j                  j                          | j
                  s| | _	        | j                  t        d | j                  D              }| j                  r1|| j                  k  r"d| _        | j                  j                          y | j                  r| j                  j                          y y y )NFr   .Streaming producer did not write all its data.r   c              3   2   K   | ]  }t        |        y wr   len.0ss     r   	<genexpr>z8ProducerConsumerProxy.resumeProducing.<locals>.<genexpr>        =1A=   )r   r	   r   _writeSomeDatarC   r   unregisteredr
   r&   r   r   sumproducerPaused
bufferSizer   )r   r#   	bytesSentunsentbytesBuffereds        r   r   z%ProducerConsumerProxy.resumeProducingt   s9   <<774<<(D++D1I3t9$ij)))DCD)#)(Q"$QI LL)MM,,.  '0=D ==$===M ""(G&+#--/%% --/ & %r   c                 >   | j                   s| j                  s(| j                  s| j                  j	                  |       nv| j
                  j| j                  rJ d       | j                  |      }d| _        |t        |      k(  s1| j                  rJ d       | j                  j	                  ||d         | j                  [| j                  rNt        d | j                  D              }|| j                  k\  r"| j                  j                          d| _        y y y y )Nz9Writing fresh data to consumer before my buffer is empty!Fr@   c              3   2   K   | ]  }t        |        y wr   rB   rD   s     r   rG   z.ProducerConsumerProxy.write.<locals>.<genexpr>   rH   rI   T)r   r   r   r	   r!   r
   rJ   rC   r   r(   rL   rN   r   rM   )r   r#   rO   rQ   s       r   r   zProducerConsumerProxy.write   s    ;;t009M9MLL%]]&LLKJK  ++D1I#(D D	)))DCD)##D$45MM%4+C+C===M/,,.&*# 0 ,D%r   c                 f    d| _         t        j                  | ||       |s|j                          y y r    )rK   r   r   r   r)   s      r   r   z&ProducerConsumerProxy.registerProducer   s0    !"33D(IN$$& r   c                     | j                   | ` | `d| _        | j                  r(| j                  s| j                  j                          y y y r   )r   r(   rK   r
   r	   r&   r   s    r   r&   z(ProducerConsumerProxy.unregisterProducer   sC    ==$( ==MM,,. ".=r   c                 h    | j                   y| j                   j                  |       t        |      S )z`Write as much of this data as possible.

        @returns: The number of bytes written.
        r   )r
   r   rC   r"   s     r   rJ   z$ProducerConsumerProxy._writeSomeData   s-    
 == D!4yr   N)r5   r6   r7   r8   rN   rM   rK   r   r   r   r   r&   rJ   r;   r   r   r=   r=   b   s7     JNL
+0Z+8'/r   r=   N)	r8   zope.interfacer   twisted.internetr   	IProducer	IConsumerr   r=   r;   r   r   <module>r[      sT   
 ' ' Z!!:#7#78PI PI 9PIfq6 qr   