
    fF                        d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlZddlZej"                  ej$                  fZej(                  ej$                  fZ ej,                  e      Zej2                  j5                  e      Zd Zd Zd Z G d de      Z  G d	 d
e      Z! G d de      Z" G d de      Z# G d de      Z$ G d de      Z% G d de%      Z& G d de%      Z'y)z^Utilities for implementing `nbio_interface.AbstractIOServices` for
pika connection adapters.

    N)AbstractIOReferenceAbstractStreamTransportc                 P    t        |       st        dj                  ||             y)zRaise TypeError if callback is not callable

    :param callback: callback to check
    :param name: Name to include in exception text
    :raises TypeError:

    z!{} must be callable, but got {!r}N)callable	TypeErrorformat)callbacknames     `/var/www/cs2snipe.com/venv/lib/python3.12/site-packages/pika/adapters/utils/io_services_utils.pycheck_callback_argr   ,   s1     H;BB(  	     c                 l    t        | t        j                        st        dj	                  |             y)zqRaise TypeError if file descriptor is not an integer

    :param fd: file descriptor
    :raises TypeError:

    z0Paramter must be a file descriptor, but got {!r}N)
isinstancenumbersIntegralr   r   )fds    r   check_fd_argr   9   s6     b'**+>EEbIK 	K ,r   c                 B     t        j                          fd       }|S )z0Function decorator for retrying on SIGINT.

    c                      	 	  | i |S # t         j                  j                  $ r(}|j                  t        j                  k(  rY d}~I d}~ww xY w)zWrapper for decorated functionN)pikacompatSOCKET_ERRORerrnoEINTR)argskwargserrorfuncs      r   retry_sigint_wrapz+_retry_on_sigint.<locals>.retry_sigint_wrapJ   sM     T,V,,;;++ ;;%++-	s    AAAA)	functoolswraps)r   r   s   ` r   _retry_on_sigintr"   E   s(    
 __T	 	 r   c                       e Zd ZdZd Zy)SocketConnectionMixinzImplements
    `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
    on top of
    `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
    basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.

    c                 <    t        | |||      j                         S )z[Implement
        :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.

        )nbiosockresolved_addron_done)_AsyncSocketConnectorstart)selfr'   r(   r)   s       r   connect_socketz$SocketConnectionMixin.connect_socketb   s"    
 %D"UW	%r   N)__name__
__module____qualname____doc__r-    r   r   r$   r$   Y   s    %r   r$   c                       e Zd ZdZ	 	 ddZy)StreamingConnectionMixinzImplements
    `.nbio_interface.AbstractIOServices.create_streaming_connection()` on
    top of `.nbio_interface.AbstractFileDescriptorServices` and basic
    `nbio_interface.AbstractIOServices` services.

    Nc                    	 t        | |||||      j                         S # t        $ r[}t        j	                  d||       	 |j                           # t        $ r!}t        j	                  d||       Y d}~ d}~ww xY wd}~ww xY w)zhImplement
        :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.

        )r&   protocol_factoryr'   ssl_contextserver_hostnamer)   z*create_streaming_connection(%s) failed: %rz%s.close() failed: %rN)_AsyncStreamConnectorr+   	Exception_LOGGERr   close)r,   r6   r'   r)   r7   r8   r   s          r   create_streaming_connectionz4StreamingConnectionMixin.create_streaming_connectiont   s    	(!1' /! "')  	MMF!D

   D 5tUCCD	s>   ! 	BB AB 	A=A83B 8A==B  B)NN)r.   r/   r0   r1   r=   r2   r   r   r4   r4   l   s     1548r   r4   c                       e Zd ZdZd Zd Zy)_AsyncServiceAsyncHandlezGThis module's adaptation of `.nbio_interface.AbstractIOReference`

    c                 &    |j                   | _        y)zZ
        :param subject: subject of the reference containing a `cancel()` method

        N)cancel_cancel)r,   subjects     r   __init__z!_AsyncServiceAsyncHandle.__init__   s    
 ~~r   c                 "    | j                         S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        )rB   r,   s    r   rA   z_AsyncServiceAsyncHandle.cancel   s     ||~r   N)r.   r/   r0   r1   rD   rA   r2   r   r   r?   r?      s    &r   r?   c                   r    e Zd ZdZdZdZdZdZd Ze	d        Z
d Zd	 Ze	d
        Ze	d        Ze	d        Zy)r*   zConnects the given non-blocking socket asynchronously using
    `.nbio_interface.AbstractFileDescriptorServices` and basic
    `.nbio_interface.AbstractIOServices`. Used for implementing
    `.nbio_interface.AbstractIOServices.connect_socket()`.
    r            c                    t        |d       	 t        j                  |j                  |d          || _        || _        || _        || _        | j                  | _        d| _        y# t        $ rc}t        t        d      st        j                  d       n3dj                  |||      }t        j                  |       t        |      Y d}~d}~ww xY w)a  
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param socket.socket sock: non-blocking socket that needs to be
            connected via `socket.socket.connect()`
        :param tuple resolved_addr: resolved destination address/port two-tuple
            which is compatible with the given's socket's address family
        :param callable on_done: user callback that takes None upon successful
            completion or exception upon error (check for `BaseException`) as
            its only arg. It will not be called if the operation was cancelled.
        :raises ValueError: if host portion of `resolved_addr` is not an IP
            address or is inconsistent with the socket's address family as
            validated via `socket.inet_pton()`
        r)   r   	inet_ptonz8Unable to check resolved address: no socket.inet_pton().z9Invalid or unresolved IP address {!r} for socket {}: {!r}NF)r   socketrL   familyr:   hasattrr;   debugr   r   
ValueError_nbio_sock_addr_on_done_STATE_NOT_STARTED_state_watching_socket_events)r,   r&   r'   r(   r)   r   msgs          r   rD   z_AsyncSocketConnector.__init__   s     	7I.	&T[[-*:; 

"
--',$!  		&6;/NP2396($47  c" o%P		&s   #A& &	C/ACCc                     | j                   r;d| _         | j                  j                  | j                  j	                                yy)z'Remove socket watcher, if any

        FN)rX   rR   remove_writerrS   filenorF   s    r   _cleanupz_AsyncSocketConnector._cleanup   s9    
 ''+0D(JJ$$TZZ%6%6%89 (r   c                     | j                   | j                  k(  sJ d| j                   f       | j                  | _         | j                  j	                  | j
                         t        |       S )zZStart asynchronous connection establishment.

        :rtype: AbstractIOReference
        z:_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED)rW   rV   _STATE_ACTIVErR   add_callback_threadsafe_start_asyncr?   rF   s    r   r+   z_AsyncSocketConnector.start   sf    
 {{d555 	HKK8 	5 (( 	

**4+<+<='--r   c                 &   | j                   | j                  k(  rM| j                  | _         t        j	                  d| j
                  | j                         | j                          yt        j	                  d| j                   | j
                         y)Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        z-User canceled connection request for %s to %sTzD_AsyncSocketConnector cancel requested when not ACTIVE: state=%s; %sF)rW   r_   _STATE_CANCELEDr;   rP   rS   rT   r]   rF   s    r   rA   z_AsyncSocketConnector.cancel   sn     ;;$,,,..DKMMI**djj2MMO KK	5 r   c                 J   t         j                  d|| j                         t        |t        t        d      f      s	J d|f       | j                  | j                  k(  sJ d| j                  f       | j                  | _        | j                          | j                  |       y)zAdvance to COMPLETED state, remove socket watcher, and invoke user's
        completion callback.

        :param BaseException | None result: value to pass in user's callback

        z0_AsyncSocketConnector._report_completion(%r); %sNzP_AsyncSocketConnector._report_completion() expected exception or None as result.zF_AsyncSocketConnector._report_completion() expected _STATE_NOT_STARTED)r;   rP   rS   r   BaseExceptiontyperW   r_   _STATE_COMPLETEDr]   rU   r,   results     r   _report_completionz(_AsyncSocketConnector._report_completion	  s     	Hdjj	* &=$t*"=> 	'%A' 	'> {{d000 	/!"&++3/ 	/0 ++fr   c                    | j                   | j                  k7  r7t        j                  d| j                  | j
                  | j                          y	 | j                  j                  | j
                         	 | j                   j#                  | j                  j%                         | j&                         d| _        t        j                  d| j                         y# t        t        j                  j                  f$ r}t        |t        j                  j                        r|j                  t        v rnBt        j                  d| j                  | j
                  |       | j                  |       Y d}~yY d}~	d}~ww xY w# t        $ r<}t        j+                  d| j                  |       | j                  |       Y d}~yd}~ww xY w)zCalled as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here, if needed

        zJAbandoning sock=%s connection establishment to %s due to inactive state=%sNz%s.connect(%s) failed: %rTz/Connection-establishment is in progress for %s.zasync.set_writer(%s) failed: %r)rW   r_   r;   rP   rS   rT   connectr:   r   r   r   r   r   (_CONNECTION_IN_PROGRESS_SOCK_ERROR_CODESr   rk   rR   
set_writerr\   _on_writablerX   	exceptionr,   r   s     r   ra   z"_AsyncSocketConnector._start_async   s\    ;;$,,,MM+,0JJ

DKKQ 	JJtzz*
	&JJ!!$**"3"3"5t7H7HI ,0D(MMK**&+ 4;;334 		5$++":":;KK#KK94::"jj%1''. 			  	?#%##E*		s1   %C 8>F #FA4F  F	G2GGc                    | j                   | j                  k7  r,t        j                  d| j                  | j                          y| j                  j                  t        j                  t        j                        }|s#t        j                  d| j                         d}nWt        j                  |      }t        j                  d| j                  ||       t        j                  j                  ||      }| j                  |       y)zwCalled when socket connects or fails to. Check for predicament and
        invoke user's completion callback.

        z_Socket connection-establishment event watcher called in inactive state (ignoring): %s; state=%sNzSocket connected: %sz+Socket failed to connect: %s; error=%s (%s))rW   r_   r;   r   rS   
getsockoptrM   
SOL_SOCKETSO_ERRORinfoosstrerrorr   r   r   rk   )r,   
error_coderj   	error_msgs       r   rp   z"_AsyncSocketConnector._on_writableG  s     ;;$,,, MMDEIZZ  ZZ**6+<+<fooN
LL/<FJ/IMMG**j)=[[--j)DF'r   N)r.   r/   r0   r1   rV   r_   rd   rh   rD   _log_exceptionsr]   r+   rA   rk   ra   rp   r2   r   r   r*   r*      s     MO"-H : :."(  , $& $&L ( (r   r*   c                       e Zd ZdZdZdZdZdZd Ze	d        Z
d Zd	 Ze	d
        Ze	d        Ze	d        Ze	d        Zy)r9   zPerforms asynchronous SSL session establishment, if requested, on the
    already-connected socket and links the streaming transport to protocol.
    Used for implementing
    `.nbio_interface.AbstractIOServices.create_streaming_connection()`.

    r   rH   rI   rJ   c                    t        |d       t        |d       t        |t        d      t        j                  f      st        dj                  |            ||t        d      	 |j                          || _	        || _
        || _        || _        || _        || _        | j                  | _        d| _        y# t        $ r }t        dj                  ||            d}~ww xY w)a  
        NOTE: We take ownership of the given socket upon successful completion
        of the constructor.

        See `AbstractIOServices.create_streaming_connection()` for detailed
        documentation of the corresponding args.

        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param callable protocol_factory:
        :param socket.socket sock:
        :param ssl.SSLContext | None ssl_context:
        :param str | None server_hostname:
        :param callable on_done:

        r6   r)   Nz8Expected ssl_context=None | ssl.SSLContext, but got {!r}z?Non-None server_hostname must not be passed without ssl contextzEExpected connected socket, but getpeername() failed: error={!r}; {}; F)r   r   rg   ssl
SSLContextrQ   r   getpeernamer:   rR   _protocol_factoryrS   _ssl_context_server_hostnamerU   rV   rW   _watching_socket)r,   r&   r6   r'   r7   r8   r)   r   s           r   rD   z_AsyncStreamConnector.__init__p  s    " 	+-?@7I.+T
CNN'CD ((.{(;= = &;+> 3 4 4	8 
!1
' /-- %  	8##)6%#68 8	8s   (B; ;	C$CC$c                    t         j                  d|       | j                  rt         j                  d|| j                         d| _        | j                  j                  | j                  j                                | j                  j                  | j                  j                                	 |r<t         j                  d|| j                         	 | j                  j                          d| _        d| _        d| _        d| _        d| _        d| _        y# t        $ r'}t         j                  d|| j                          d}~ww xY w# d| _        d| _        d| _        d| _        d| _        d| _        w xY w)zeCancel pending async operations, if any

        :param bool close: close the socket if true
        z"_AsyncStreamConnector._cleanup(%r)z5_AsyncStreamConnector._cleanup(%r): removing RdWr; %sFz6_AsyncStreamConnector._cleanup(%r): closing socket; %sz"_sock.close() failed: error=%r; %sN)r;   rP   r   rS   rR   remove_readerr\   r[   r<   r:   rq   r   r   r   rU   )r,   r<   r   s      r   r]   z_AsyncStreamConnector._cleanup  s?    	:EB  MMG

 %*D!JJ$$TZZ%6%6%89JJ$$TZZ%6%6%89	!L4::'JJ$$& DJDJ%)D" $D$(D! DM ! %%&J&+TZZ9
 DJDJ%)D" $D$(D! DMs*   2#E D 	E$"EEE ,E:c                    t         j                  d| j                         | j                  | j                  k(  sJ d| j                  f       | j
                  | _        | j                  j                  | j                         t        |       S )zCKick off the workflow

        :rtype: AbstractIOReference
        z!_AsyncStreamConnector.start(); %sz9_AsyncStreamConnector.start() expected _STATE_NOT_STARTED)
r;   rP   rS   rW   rV   r_   rR   r`   ra   r?   rF   s    r   r+   z_AsyncStreamConnector.start  s{    
 	94::F{{d555 	/!"&++8/ 	/5 (( 	

**4+<+<='--r   c                    | j                   | j                  k(  rD| j                  | _         t        j	                  d| j
                         | j                  d       yt        j	                  d| j                   | j
                         y)rc   z%User canceled streaming linkup for %sTr<   zD_AsyncStreamConnector cancel requested when not ACTIVE: state=%s; %sF)rW   r_   rd   r;   rP   rS   r]   rF   s    r   rA   z_AsyncStreamConnector.cancel  sj     ;;$,,,..DKMMA4::NMMM% KK	5 r   c                 "   t         j                  d|| j                         t        |t        t
        f      sJ d|| j                  f       | j                  | j                  k(  sJ d| j                  f       | j                  | _        	 | j                  |       	 | j                  t        |t                     y# t        $ r# t         j                  d| j                  |        w xY w# | j                  t        |t                     w xY w)a  Advance to COMPLETED state, cancel async operation(s), and invoke
        user's completion callback.

        :param BaseException | tuple result: value to pass in user's callback.
            `tuple(transport, protocol)` on success, exception on error

        z0_AsyncStreamConnector._report_completion(%r); %szQ_AsyncStreamConnector._report_completion() expected exception or tuple as result.zA_AsyncStreamConnector._report_completion() expected _STATE_ACTIVEz%r: _on_done(%r) failed.r   N)r;   rP   rS   r   rf   tuplerW   r_   rh   rU   r:   rq   rk   r]   ri   s     r   rk   z(_AsyncStreamConnector._report_completion  s     	Hdjj	* &=%"89 	5 &<5 	59 {{d000 	*![[3* 	*0 ++	CMM&! MM
6= AMB  	8"55v?	 MM
6= AMBs   
B= =,C))C, ,"Dc                 l   t         j                  d| j                         | j                  | j                  k7  r,t         j                  d| j                  | j                         y| j
                  | j                          yt         j                  d| j                         	 | j
                  j                  | j                  ddd| j                        | _        | j                          y# t        $ r<}t         j                  d| j                  |       | j                  |       Y d}~yd}~ww xY w)zCalled as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here if needed

        z(_AsyncStreamConnector._start_async(); %szMAbandoning streaming linkup due to inactive state transition; state=%s; %s; .NzStarting SSL handshake on %sF)server_sidedo_handshake_on_connectsuppress_ragged_eofsr8   zSSL wrap_socket(%s) failed: %r)r;   rP   rS   rW   r_   r   _linkupwrap_socketr   r:   rq   rk   _do_ssl_handshakerr   s     r   ra   z"_AsyncStreamConnector._start_async
  s     	@$**M;;$,,,MM./3{{DJJH  $LLNMM8$**E!..::JJ %,1).$($9$9 ; ;
 ""$  !!"BDJJ"')''.	s   $9C. .	D372D..D3c                 j   t         j                  d       d}	 	 | j                         }| j                  #	 t        | j
                  || j                        }n"	 t        | j
                  || j                        }t         j                  d|       	 |j                  |       t         j                  d||       ||f}| j                  |       y# t        $ r'}t         j	                  d|| j
                          d}~ww xY w# t        $ r'}t         j	                  d|| j
                          d}~ww xY w# t        $ r'}t         j	                  d|| j
                          d}~ww xY w# t        $ r(}t         j	                  d||| j
                          d}~ww xY w# t        $ r}|}Y d}~d}~ww xY w)	z}Connection is ready: instantiate and link up transport and protocol,
        and invoke user's completion callback.

        z_AsyncStreamConnector._linkup()Nz'protocol_factory() failed: error=%r; %sz%PlainTransport() failed: error=%r; %sz#SSLTransport() failed: error=%r; %sz_linkup(): created transport %rz1protocol.connection_made(%r) failed: error=%r; %sz2_linkup(): introduced transport to protocol %r; %r)r;   rP   r   r:   rq   rS   r   _AsyncPlaintextTransportrR   _AsyncSSLTransportconnection_maderk   )r,   	transportprotocolr   rj   s        r   r   z_AsyncStreamConnector._linkup0  s    	78	,	+113   ( 8

Hdjj!:I 24::x37::!?I MM;YG((3 MMN#X/
  *F'U  !!"K"'5 ! %%&M&+TZZ9 ! %%&K&+TZZ9  !!Gudjj2 	  	F	s   C F !D F !D6 ;F E) #F 	D "C;;D  F 	D3"D..D33F 6	E&?"E!!E&&F )	F2#FFF 	F2&F--F2c                 f   t         j                  d       | j                  | j                  k7  r,t         j                  d| j                  | j                         yd}	 	 | j                  j                          d}t         j                  d| j                         |rt         j                  d
| j                         | j                  j'                  | j                  j                                | j                  j!                  | j                  j                                d| _        t         j                  d| j                         | j/                          yy# t        j                  $ rx}|j                  t        j                  k(  rt         j                  d| j                         d| _        | j                  j                  | j                  j                         | j                         | j                  j!                  | j                  j                                n|j                  t        j"                  k(  rt         j                  d| j                         d| _        | j                  j%                  | j                  j                         | j                         | j                  j'                  | j                  j                                n Y d}~Hd}~ww xY w# t(        $ r<}t         j+                  d	|| j                         | j-                  |       Y d}~yd}~ww xY w)zJPerform asynchronous SSL handshake on the already wrapped socket

        z)_AsyncStreamConnector._do_ssl_handshake()z`_do_ssl_handshake: Abandoning streaming linkup due to inactive state transition; state=%s; %s; .NFTz(SSL handshake completed successfully: %szSSL handshake wants read; %s.zSSL handshake wants write. %sz%SSL do_handshake failed: error=%r; %sz8_do_ssl_handshake: removing watchers ahead of linkup: %sz=_do_ssl_handshake: pre-linkup removal of watchers is done; %s)r;   rP   rW   r_   rS   do_handshakerw   r   SSLErrorr   SSL_ERROR_WANT_READr   rR   
set_readerr\   r   r[   SSL_ERROR_WANT_WRITEro   r   r:   rq   rk   r   )r,   doner   s      r   r   z'_AsyncStreamConnector._do_ssl_handshakej  sU   
 	AB;;$,,,MM@AE

 	)

'')$ G!ZZ) MMJ

 JJ$$TZZ%6%6%89JJ$$TZZ%6%6%89 %*D!MMO

 LLN 5 << ;;#"9"99MM"A4::N,0D)JJ))$***;*;*=*.*@*@BJJ,,TZZ->->-@A[[C$<$<<MM"A4::N,0D)JJ))$***;*;*=*.*@*@BJJ,,TZZ->->-@A (  	Eu"jj*##E*		s=   E 9"K+ K(0E-K#K+ #K((K+ +	L042L++L0N)r.   r/   r0   r1   rV   r_   rd   rh   rD   r|   r]   r+   rA   rk   ra   r   r   r2   r   r   r9   r9   d  s     MO.&`  !  !D.&( C C> #% #%J 7( 7(r : :r   r9   c                       e Zd ZdZdZdZdZdZdZdZ	 G d d	e
      Zd
 Zd Zd Zd Zd Zd Zd Zeed               Zeed               Zed        Zed        Zed        Zed        Zy)_AsyncTransportBasezIBase class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.

    rH   rI   rJ      i   i  c                   "     e Zd ZdZ fdZ xZS )_AsyncTransportBase.RxEndOfFilezNWe raise this internally when EOF (empty read) is detected on input.

        c                 B    t         t        j                  |   dd       y )NzEnd of input stream (EOF))superr   RxEndOfFilerD   )r,   	__class__s    r   rD   z(_AsyncTransportBase.RxEndOfFile.__init__  s    %114A/1r   )r.   r/   r0   r1   rD   __classcell__r   s   @r   r   r     s    		1 	1r   r   c                     t         j                  d|       || _        || _        || _        | j
                  | _        t        j                         | _	        d| _
        y)a~  

        :param socket.socket | ssl.SSLSocket sock: connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        z _AsyncTransportBase.__init__: %sr   N)r;   rP   rS   	_protocolrR   r_   rW   collectionsdeque_tx_buffers_tx_buffered_byte_count)r,   r'   r   r&   s       r   rD   z_AsyncTransportBase.__init__  sN     	8$?
!
((&,,.'($r   c                 |    t         j                  d| j                  | j                         | j	                  d       y)a  Close connection abruptly without waiting for pending I/O to
        complete. Will invoke the corresponding protocol's `connection_lost()`
        method asynchronously (not in context of the abort() call).

        :raises Exception: Exception-based exception on error
        z+Aborting transport connection: state=%s; %sN)r;   rw   rW   rS   _initiate_abortrF   s    r   abortz_AsyncTransportBase.abort  s0     	BDKKZZ	! 	T"r   c                     | j                   S )zReturn the protocol linked to this transport.

        :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
        )r   rF   s    r   get_protocolz _AsyncTransportBase.get_protocol  s    
 ~~r   c                     | j                   S )ze
        :returns: Current size of output data buffered by the transport
        :rtype: int
        )r   rF   s    r   get_write_buffer_sizez)_AsyncTransportBase.get_write_buffer_size  s    
 +++r   c                    |sEt         j                  d| j                  | j                         t	        dj                  |            | j                  | j                  k7  r,t         j                  d| j                  | j                         y| j                  j                  |       | xj                  t        |      z  c_
        y)Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        z,write() called with empty data: state=%s; %sz#write() called with empty data {!r};Ignoring write() called during inactive state: state=%s; %sN)r;   r   rW   rS   rQ   r   r_   rP   r   appendr   len)r,   datas     r   _buffer_tx_dataz#_AsyncTransportBase._buffer_tx_data  s     MMH++tzz3BII$OPP;;$,,,MM $TZZ9 %$$D	1$r   c                    d}| j                   | j                  k(  r|| j                  k  r| j                  | j                  | j
                        }|t        |      z  }|s0t        j                  d| j                         | j                         	 | j                  j                  |       | j                   | j                  k(  r|| j                  k  ryyyy# t        $ r'}t        j                  d|| j                          d}~ww xY w)a  Utility method for use by subclasses to ingest data from socket and
        dispatch it to protocol's `data_received()` method socket-specific
        "try again" exception, per-event data consumption limit is reached,
        transport becomes inactive, or a fatal failure.

        Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
        until state becomes inactive (e.g., `protocol.data_received()` callback
        aborts the transport)

        :raises: Whatever the corresponding `sock.recv()` raises except the
                 socket error with errno.EINTR
        :raises: Whatever the `protocol.data_received()` callback raises
        :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream

        r   zSocket EOF; %sz-protocol.data_received() failed: error=%r; %sN)rW   r_   _MAX_CONSUME_BYTES_sigint_safe_recvrS   _MAX_RECV_BYTESr   r;   r   r   r   data_receivedr:   rq   )r,   bytes_consumedr   r   s       r   _consumez_AsyncTransportBase._consume  s      {{d000 7 77))$**d6J6JKDc$i'N .

;&&((,,T2 {{d000 7 77 17 1  !!CUJJ  	s   C 	D
#"DD
c                    | j                   r| j                  | j                  | j                   d         }| j                   j                         }|t	        |      k  r>t
        j                  d|t	        |             | j                   j                  ||d        | xj                  |z  c_        | j                  dk\  sJ d| j                  | j                  f       | j                   ryy)a  Utility method for use by subclasses to emit data from tx_buffers.
        This method sends chunks from `tx_buffers` until all chunks are
        exhausted or sending is interrupted by an exception. Maintains integrity
        of `self.tx_buffers`.

        :raises: whatever the corresponding `sock.send()` raises except the
                 socket error with errno.EINTR

        r   z/Partial send, requeing remaining data; %s of %sNz7_AsyncTransportBase._produce() tx buffer size underflow)
r   _sigint_safe_sendrS   popleftr   r;   rP   
appendleftr   rW   )r,   num_bytes_sentchunks      r   _producez_AsyncTransportBase._produce(  s     !33DJJ484D4DQ4GIN $$,,.EE
*O,c%j:  ++E./,BC((N:(//14 ;I,,dkk7; ;4 r   c                 $    | j                  |      S )am  Receive data from socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param max_bytes: maximum number of bytes to receive
        :returns: received data or empty bytes uppon end of file
        :rtype: bytes
        :raises: whatever the corresponding `sock.recv()` raises except socket
                 error with errno.EINTR

        )recv)r'   	max_bytess     r   r   z%_AsyncTransportBase._sigint_safe_recvA  s     yy##r   c                 $    | j                  |      S )a@  Send data to socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param data: data bytes to send
        :returns: number of bytes actually sent
        :rtype: int
        :raises: whatever the corresponding `sock.send()` raises except socket
                 error with errno.EINTR

        )send)r'   r   s     r   r   z%_AsyncTransportBase._sigint_safe_sendP  s     yyr   c                    | j                   | j                  k(  rt        j                  d| j                   | j                         | j
                  j                  | j                  j                                | j
                  j                  | j                  j                                | j                  j                          yy)z2Unregister the transport from I/O events

        z$Deactivating transport: state=%s; %sN)rW   r_   r;   rw   rS   rR   r   r\   r[   r   clearrF   s    r   _deactivatez_AsyncTransportBase._deactivate_  s    
 ;;$,,,LL?%JJ$$TZZ%6%6%89JJ$$TZZ%6%6%89""$ -r   c                    | j                   | j                  k7  rt        j                  d| j                   | j                         	 | j                  j                  t        j                         | j                  j                          d| _        d| _        d| _        | j                  | _         yy# t        j                  j                  $ r Y aw xY w)z{Close the transport's socket and unlink the transport it from
        references to other assets (protocol, etc.)

        z4Closing transport socket and unlinking: state=%s; %sN)rW   rh   r;   rw   rS   shutdownrM   	SHUT_RDWRr   r   r   r<   r   rR   rF   s    r   _close_and_finalizez'_AsyncTransportBase._close_and_finalizek  s     ;;$///LLOdjj2

##F$4$45 JJDJ!DNDJ//DK 0
 ;;++ s   )B1 1CCc                    t         j                  d| j                  || j                         | j                  | j                  k7  sJ d| j                  f       | j                  | j                  k(  ry| j                          |A| j                  | j                  k(  rt         j                  d       y| j                  | _        nW| j                  | j                  k7  r-| j                  | j                  k(  sJ d| j                  f       y| j                  | _        | j                  j                  t        j                  | j                  |             y)a  Initiate asynchronous abort of the transport that concludes with a
        call to the protocol's `connection_lost()` method. No flushing of
        output buffers will take place.

        :param BaseException | None error: None if being canceled by user,
            including via falsie return value from protocol.eof_received;
            otherwise the exception corresponding to the the failed connection.
        zo_AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=%s; error=%r; %szB_AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETEDNzM_AsyncTransportBase._initiate_abort(): ignoring - user-abort already pending.zD_AsyncTransportBase._initate_abort() expected _STATE_ABORTED_BY_USER)r;   rw   rW   rS   rh   r   _STATE_ABORTED_BY_USERrP   r_   _STATE_FAILEDrR   r`   r    partial_connection_lost_notify_asyncrr   s     r   r   z#_AsyncTransportBase._initiate_abort~  s<    	FKK

	,
 {{d333 	1#$(KK61 	13 ;;$/// = {{d999 G H 55DK {{d000{{d&A&AA ;-.2kkD; ;A ,,DK 	

**d@@%H	Jr   c                    t         j                  d| j                  |       | j                  | j                  k(  ry|F| j                  | j                  k7  r-| j                  | j
                  k(  sJ d| j                  f       y	 | j                  j                  |       	 | j                          y# t        $ r(}t         j                  d||| j                          d}~ww xY w# | j                          w xY w)a  Handle aborting of transport either due to socket error or user-
        initiated `abort()` call. Must be called from an I/O loop callback owned
        by us in order to avoid reentry into user code from user's API call into
        the transport.

        :param BaseException | None error: None if being canceled by user;
            otherwise the exception corresponding to the the failed connection.
        z1Concluding transport shutdown: state=%s; error=%rNzS_AsyncTransportBase._connection_lost_notify_async() expected _STATE_ABORTED_BY_USERz/protocol.connection_lost(%r) failed: exc=%r; %s)r;   rP   rW   rh   r   r   r   connection_lostr:   rq   rS   r   )r,   r   excs      r   r   z1_AsyncTransportBase._connection_lost_notify_async  s     	Ikk5	* ;;$///0B0B!B;;$"="== @237;;@@ @= 		'NN**51 $$&  	O#S$**6 	 $$&s$   B2 2	C#;#CC##C& &C8N)r.   r/   r0   r1   r_   r   r   rh   r   r   OSErrorr   rD   r   r   r   r   r   r   staticmethodr"   r   r   r|   r   r   r   r   r2   r   r   r   r     s     MMO $1g 1)&
#,2*#J;2 $  $    	% 	% 0 0$ 2J 2Jh  '  'r   r   c                   H     e Zd ZdZ fdZd Zed        Zed        Z xZ	S )r   z`Implementation of `nbio_interface.AbstractStreamTransport` for a
    plaintext connection.

    c                     t         t        |   |||       | j                  j	                  | j
                  j                         | j                         y)a{  

        :param socket.socket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N)r   r   rD   rR   r   rS   r\   _on_socket_readabler,   r'   r   r&   r   s       r   rD   z!_AsyncPlaintextTransport.__init__  sB     	&6tXtL 	

djj//143K3KLr   c                    | j                   | j                  k7  r,t        j                  d| j                   | j                         y|sJ d|| j                   f       | j                         dk(  }| j                  |       |r_| j                  j                  | j                  j                         | j                         t        j                  d| j                         yy)r   r   Nz7_AsyncPlaintextTransport.write(): empty data from user.r   !Turned on writability watcher: %s)rW   r_   r;   rP   rS   r   r   rR   ro   r\   _on_socket_writabler,   r   tx_buffer_was_emptys      r   writez_AsyncPlaintextTransport.write  s     ;;$,,,MM $TZZ9  	)ODKK) 	)t #88:a?T"JJ!!$**"3"3"5t7O7OPMM=tzzJ r   c                    | j                   | j                  k7  r,t        j                  d| j                   | j                         y	 | j                          | j                   | j                  k7  r,t        j                  d| j                   | j                         yy# | j                  $ r 	 | j                  j                         }|rUt        j                  d| j                         | j                  j                  | j                  j                                Y yt        j                  d| j                         | j                  d       Y y# t        $ r=}t        j                  d|| j                         | j                  |       Y d}~Y yd}~ww xY wt        t         j"                  j$                  f$ r}t'        |t         j"                  j$                        r3|j(                  t*        v r!t        j                  d| j                         nit        j                  d|| j                  d	j-                  t/        j0                  t3        j4                                       | j                  |       Y d}~yY d}~yd}~ww xY w)
zIngest data from socket and dispatch it to protocol until exception
        occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
        limit is reached, transport becomes inactive, or failure.

        EIgnoring readability notification due to inactive state: state=%s; %sNz>Leaving Plaintext consumer due to inactive state: state=%s; %sz0protocol.eof_received() elected to keep open: %sz,protocol.eof_received() elected to close: %sz,protocol.eof_received() failed: error=%r; %szRecv would block on %sa_AsyncBaseTransport._consume() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%s )rW   r_   r;   rP   rS   r   r   r   eof_receivedrw   rR   r   r\   r   r:   rq   r   r   r   r   r   _TRY_IO_AGAIN_SOCK_ERROR_CODESjoin	tracebackformat_exceptionsysexc_info)r,   	keep_openr   s      r   r   z,_AsyncPlaintextTransport._on_socket_readable	  s    ;;$,,,MM&'+{{DJJ@ %	DMMO> {{d000 *+/;;

D 1=  	// NN779	 LLJ

$ JJ,,TZZ->->-@ALL!O!%-((.  ,!!BEJJ  $$U++	, 4;;334 
	,5$++":":;KK#AA6

C!!J4::rww!22CLLNC(EF
 $$U++ D
	,sJ   B J-EAJ1J	F2FJF%J<B;JJc                    | j                   | j                  k7  r,t        j                  d| j                   | j                         y| j
                  sJ d| j                   f       	 | j                          | j
                  sT| j                  j                  | j                  j                                t        j                  d| j                         yy# t        t        j                  j                  f$ r}t        |t        j                  j                        r3|j                  t         v r!t        j                  d| j                         nit        j#                  d|| j                  dj%                  t'        j(                  t+        j,                                       | j/                  |       Y d}~yY d}~yd}~ww xY w)-Handle writable socket notification

        EIgnoring writability notification due to inactive state: state=%s; %sNzP_AsyncPlaintextTransport._on_socket_writable() called, but _tx_buffers is empty.z"Turned off writability watcher: %szSend would block on %sa_AsyncBaseTransport._produce() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%sr   )rW   r_   r;   rP   rS   r   r   rR   r[   r\   r:   r   r   r   r   r   r   rq   r   r   r   r   r   r   rr   s     r   r   z,_AsyncPlaintextTransport._on_socket_writable=  s`   
 ;;$,,,MM&'+{{DJJ@   	6()-"6 	6	PMMO ##

(():):)<=BDJJO $ 4;;334 
	,5$++":":;KK#AA6

C!!J4::rww!22CLLNC(EF
 $$U++ D
	,s   &C #G:B;F??G)
r.   r/   r0   r1   rD   r   r|   r   r   r   r   s   @r   r   r     sA    
M K8 1D 1Df  P  Pr   r   c                   p     e Zd ZdZ fdZd Zed        Zed        Ze fd       Z	e fd       Z
 xZS )r   z\Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
    connection.

    c                 &   t         t        |   |||       | j                  | _        d| _        | j                  j                  | j                  j                         | j                         | j                  j                  | j                         y)a{  

        :param ssl.SSLSocket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N)r   r   rD   r   _ssl_readable_action_ssl_writable_actionrR   r   rS   r\   r   r`   r   s       r   rD   z_AsyncSSLTransport.__init__g  sn     	 $0xF$(MM!$(! 	

djj//143K3KL

**4+C+CDr   c                     | j                   | j                  k7  r,t        j                  d| j                   | j                         y|sJ d|| j                   f       | j                         dk(  }| j                  |       |r}| j                  p| j                  | _        | j                  j                  | j                  j                         | j                         t        j                  d| j                         yyy)r   r   Nz1_AsyncSSLTransport.write(): empty data from user.r   r   )rW   r_   r;   rP   rS   r   r   r  r   rR   ro   r\   r   r   s      r   r   z_AsyncSSLTransport.write{  s     ;;$,,,MM $TZZ9  	)IDKK) 	)t #88:a?T"4#<#<#D(,D%JJ!!$**"3"3"5t7O7OPMM=tzzJ $Er   c                 n   | j                   | j                  k7  r,t        j                  d| j                   | j                         y| j
                  r	 | j                          yt        j                  d| j                  | j                         y# t        $ r}| j                  |       Y d}~yd}~ww xY w)z+Handle readable socket indication

        r   Nz>SSL readable action was suppressed: ssl_writable_action=%r; %s)	rW   r_   r;   rP   rS   r  r:   r   r  rr   s     r   r   z&_AsyncSSLTransport._on_socket_readable      
 ;;$,,,MM&'+{{DJJ@ $$,))+ MM-.2.G.G

  ,$$U++,   B 	B4B//B4c                 n   | j                   | j                  k7  r,t        j                  d| j                   | j                         y| j
                  r	 | j                          yt        j                  d| j                  | j                         y# t        $ r}| j                  |       Y d}~yd}~ww xY w)r   r   Nz>SSL writable action was suppressed: ssl_readable_action=%r; %s)	rW   r_   r;   rP   rS   r  r:   r   r  rr   s     r   r   z&_AsyncSSLTransport._on_socket_writable  r  r  c                 `   d}	 t         t        |           | j                  | j                  k7  r,t
        j                  d| j                  | j                         y| j                  j                  | j                         |r| j,                  s>| j                  j/                  | j                  j1                         | j                         | j                  | _        | j2                  | j                  k(  r| j                  j5                  | j                  j1                                d| _        n| j2                  s>| j                  j7                  | j                  j1                         | j8                         | j                  | _        | j,                  r:| j                  j;                  | j                  j1                                d| _        | j<                  r]| j2                  sP| j>                  | _        | j                  j7                  | j                  j1                         | j8                         yyy# t        j                  $ r}|j                  t        j                  k(  r!t
        j                  d| j                         n|j                  t        j                  k(  r#t
        j                  d| j                         d}nTt
        j!                  d|| j                  dj#                  t%        j&                  t)        j*                                        Y d}~d}~ww xY w)	a  [override] Ingest data from socket and dispatch it to protocol until
        exception occurs (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
        transport becomes inactive, or failure.

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted
        Tz8Leaving SSL consumer due to inactive state: state=%s; %sNzSSL ingester wants read: %szSSL ingester wants write: %sFr   r   ) r   r   r   rW   r_   r;   rP   rS   rR   r`   r   r   r   r   r   r   rq   r   r   r   r   r   r  r   r\   r  r[   ro   r   r   r   r   )r,   next_consume_on_readabler   r   s      r   r   z_AsyncSSLTransport._consume  s^    $( 	I$d46  {{d000 *+/;;

D  JJ..t/G/GH $,,

%%djj&7&7&9&*&>&>@(,D% ((DMM9

(():):)<=,0) ,,

%%djj&7&7&9&*&>&>@(,D%((

(():):)<=,0) D$=$=(,D%JJ!!$**"3"3"5t7O7OP %>k || 	{{c555;TZZH 8 88<djjI+0(!!J4::rww!22CLLNC(EF
 	s   H= =L-CL((L-c                    d}	 t         t        |           | j                  rJ dt	        | j                        f       | j                  rk|J d	| j&                  f       |r| j(                  s>| j*                  j-                  | j                  j/                         | j0                         | j                  | _        | j2                  | j                  k(  r| j*                  j5                  | j                  j/                                d| _        n| j2                  s>| j*                  j7                  | j                  j/                         | j8                         | j                  | _        | j(                  r9| j*                  j;                  | j                  j/                                d| _        n| j2                  | j                  k(  rf| j*                  j5                  | j                  j/                                d| _        | j(                  | j                  k7  sJ d
| j&                  f       | j(                  | j                  k(  s,J dd| j(                  d| j2                  d| j&                  f       d| _        | j*                  j;                  | j                  j/                                | j2                  su| j<                  | _        | j*                  j7                  | j                  j/                         | j8                         | j*                  j?                  | j8                         y| j                  jA                         r&| j*                  j?                  | j8                         yy# t
        j                  $ r}|j                  t
        j                  k(  r#t        j                  d| j                         d}n|j                  t
        j                  k(  r#t        j                  d| j                         d}nTt        j                  d|| j                  dj                  t        j                   t#        j$                                        Y d}~%d}~ww xY w)aI  [override] Emit data from tx_buffers all chunks are exhausted or
        sending is interrupted by an exception (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE).

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted

        Nz__AsyncSSLTransport._produce(): no exception from parent class, but data remains in _tx_buffers.zSSL emitter wants read: %sFzSSL emitter wants write: %sTr   r   zE_AsyncSSLTransport._produce(): next_produce_on_writable is still Nonezr_AsyncSSLTransport._produce(): with empty tx_buffers, writable_action cannot be _produce when readable is _producez_AsyncSSLTransport._produce(): with empty tx_buffers, expected writable_action as _produce when readable_action is not _producezwritable_action:zreadable_action:zstate:)!r   r   r   r   r   r   r   r   r   r;   rP   rS   r   rq   r   r   r   r   r   rW   r  rR   ro   r\   r   r  r   r   r   r[   r   r`   pending)r,   next_produce_on_writabler   r   s      r   r   z_AsyncSSLTransport._produce  s    $( 	'$d46$ '' ':;>$$<&*' '' +7 +"kk:+ +7 (00JJ))$***;*;*=*.*B*BD,0MM) ,,=JJ,,TZZ->->-@A04D- 00JJ))$***;*;*=*.*B*BD,0MM),,JJ,,TZZ->->-@A04D- ((DMM9

(():):)<=,0)00DMMA - $D- -A 00DMMA F&'9--/A--xDF FA -1)

(():):)<= (((,D%JJ!!$**"3"3"5t7O7OPJJ..t/G/GHZZ!JJ..t/G/GH "U || 	{{c555:DJJG+0( 8 88;TZZH+/(!!J4::rww!22CLLNC(EF
 	s   M7 7Q)
CQ$$Q))r.   r/   r0   r1   rD   r   r|   r   r   r   r   r   r   s   @r   r   r   a  sm    
E(K:  *  * FQ FQP ZI ZIr   r   )(r1   r   r   r    loggingr   rx   rM   r   r   r   "pika.adapters.utils.nbio_interfacer   r   pika.compatr   pika.diagnostic_utilsEAGAINEWOULDBLOCKr   EINPROGRESSrn   	getLoggerr.   r;   diagnostic_utilscreate_log_exception_decoratorr|   r   r   r"   objectr$   r4   r?   r*   r9   r   r   r   r2   r   r   <module>r     s  
      	  
 
 I   
LL	"  
	, (
 '

H
% ''FFwO
	K(%F %&&v &R2 ,v(F v(rAF AH
l'l'^	GP2 GPTGI, GIr   