
    f$                        d Z ddlZddlZddlZddlmZ ddlmZmZ  ej                  e
      Zej                  dk(  r! ej                   ej                                 G d dej                        Z G d d	ej"                  ej$                  ej&                  ej(                        Z G d
 dej,                        Z G d dej0                        Zy)z#Use pika with the Asyncio EventLoop    N)base_connection)nbio_interfaceio_services_utilswin32c                   F     e Zd ZdZ	 	 	 	 	 	 d fd	Ze	 	 dd       Z xZS )AsyncioConnectionz; The AsyncioConnection runs on the Asyncio EventLoop.

    c                     t        |t        j                        r|}nt        |      }t        |   ||||||       y)a   Create a new instance of the AsyncioConnection class, connecting
        to RabbitMQ automatically

        :param pika.connection.Parameters parameters: Connection parameters
        :param callable on_open_callback: The method to call when the connection
            is open
        :param None | method on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`: on_open_error_callback(Connection, exception).
        :param None | method on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the cause
            of connection failure.
        :param None | asyncio.AbstractEventLoop |
            nbio_interface.AbstractIOServices custom_ioloop:
                Defaults to asyncio.get_event_loop().
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory.

        )internal_connection_workflowN)
isinstancer   AbstractIOServices_AsyncioIOServicesAdaptersuper__init__)	self
parameterson_open_callbackon_open_error_callbackon_close_callbackcustom_ioloopr
   nbio	__class__s	           [/var/www/cs2snipe.com/venv/lib/python3.12/site-packages/pika/adapters/asyncio_connection.pyr   zAsyncioConnection.__init__   sI    < m^%F%FG D,];D")E 	 	G    c                 T     t        |       fd} j                  ||||      S )z`Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.

        c                 4    | t        d       | d      S )zConnection factory.zIExpected pika.connection.Parameters instance, but got None in params arg.F)r   r   r
   )
ValueError)paramsclsr   s    r   connection_factoryz?AsyncioConnection.create_connection.<locals>.connection_factoryM   s3    ~  "I J J!"-24 4r   )connection_configsr   r   workflowon_done)r   _start_connection_workflow)r   r    r"   r   r!   r   r   s   `     @r   create_connectionz#AsyncioConnection.create_connectionA   s=     )7	4 --11 .  	r   )NNNNNT)NN)__name__
__module____qualname____doc__r   classmethodr$   __classcell__)r   s   @r   r   r      s?    
 !"&(,#'#.2)GV  )-#'	 r   r   c                   j    e Zd ZdZddZd Zd Zd Zd Zd Z	d	 Z
	 	 	 	 dd
Zd Zd Zd Zd Zd Zy)r   zImplements
    :py:class:`.utils.nbio_interface.AbstractIOServices` interface
    on top of `asyncio`.

    NOTE:
    :py:class:`.utils.nbio_interface.AbstractFileDescriptorServices`
    interface is only required by the mixins.

    Nc                 >    |xs t        j                         | _        y)z{
        :param asyncio.AbstractEventLoop | None loop: If None, gets default
            event loop from asyncio.

        N)asyncioget_event_loop_loop)r   loops     r   r   z"_AsyncioIOServicesAdapter.__init__m   s     5W335
r   c                     | j                   S )zdImplement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.get_native_ioloop()`.

        )r/   r   s    r   get_native_ioloopz+_AsyncioIOServicesAdapter.get_native_ioloopu   s    
 zzr   c                 8    | j                   j                          y)zXImplement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.close()`.

        N)r/   closer2   s    r   r5   z_AsyncioIOServicesAdapter.close|   s    
 	

r   c                 8    | j                   j                          y)zNImplement :py:meth:`.utils.nbio_interface.AbstractIOServices.run()`.

        N)r/   run_foreverr2   s    r   runz_AsyncioIOServicesAdapter.run   s     	

 r   c                 8    | j                   j                          y)zOImplement :py:meth:`.utils.nbio_interface.AbstractIOServices.stop()`.

        N)r/   stopr2   s    r   r:   z_AsyncioIOServicesAdapter.stop   s     	

r   c                 :    | j                   j                  |       y)zjImplement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.

        N)r/   call_soon_threadsafe)r   callbacks     r   add_callback_threadsafez1_AsyncioIOServicesAdapter.add_callback_threadsafe   s    
 	

''1r   c                 L    t        | j                  j                  ||            S )z]Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.call_later()`.

        )_TimerHandler/   
call_later)r   delayr=   s      r   rA   z$_AsyncioIOServicesAdapter.call_later   s     
 DJJ11%BCCr   c           
      d    | j                  | j                  j                  ||||||      |      S )z^Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.getaddrinfo()`.

        )familytypeprotoflags)_schedule_and_wrap_in_io_refr/   getaddrinfo)r   hostportr"   rD   socktyperF   rG   s           r   rI   z%_AsyncioIOServicesAdapter.getaddrinfo   sE     00JJ"" #  &' 	'r   c                 h    | j                   j                  ||       t        j                  d|       y)ziImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_reader()`.

        zset_reader(%s, _)N)r/   
add_readerLOGGERdebug)r   fdon_readables      r   
set_readerz$_AsyncioIOServicesAdapter.set_reader   '    
 	

b+.("-r   c                 d    t         j                  d|       | j                  j                  |      S )zlImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.

        zremove_reader(%s))rO   rP   r/   remove_readerr   rQ   s     r   rV   z'_AsyncioIOServicesAdapter.remove_reader   (    
 	("-zz''++r   c                 h    | j                   j                  ||       t        j                  d|       y)ziImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_writer()`.

        zset_writer(%s, _)N)r/   
add_writerrO   rP   )r   rQ   on_writables      r   
set_writerz$_AsyncioIOServicesAdapter.set_writer   rT   r   c                 d    t         j                  d|       | j                  j                  |      S )zlImplement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.

        zremove_writer(%s))rO   rP   r/   remove_writerrW   s     r   r^   z'_AsyncioIOServicesAdapter.remove_writer   rX   r   c                     t        |      st        dj                  |            t        t	        j
                  || j                        |      S )a  Schedule the coroutine to run and return _AsyncioIOReference

        :param coroutine-obj coro:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.
        :rtype: _AsyncioIOReference which is derived from
            nbio_interface.AbstractIOReference

        *on_done arg must be callable, but got {!r})r0   )callable	TypeErrorformat_AsyncioIOReferencer-   ensure_futurer/   )r   coror"   s      r   rH   z6_AsyncioIOServicesAdapter._schedule_and_wrap_in_io_ref   sP      <CCGLN N #!!$TZZ8'C 	Cr   N)r   r   r   r   )r%   r&   r'   r(   r   r3   r5   r8   r:   r>   rA   rI   rS   rV   r\   r^   rH    r   r   r   r   _   sX    6!2D '*.,.,Cr   r   c                       e Zd ZdZd Zd Zy)r@   zJThis module's adaptation of `nbio_interface.AbstractTimerReference`.

    c                     || _         y)z0

        :param asyncio.Handle handle:
        N)_handle)r   handles     r   r   z_TimerHandle.__init__   s    
 r   c                 `    | j                   "| j                   j                          d | _         y y rg   )rk   cancelr2   s    r   rn   z_TimerHandle.cancel   s(    <<#LL!DL $r   Nr%   r&   r'   r(   r   rn   rh   r   r   r@   r@      s     r   r@   c                       e Zd ZdZd Zd Zy)rd   zGThis module's adaptation of `nbio_interface.AbstractIOReference`.

    c                     t              st        dj                              || _        fd}|j	                  |       y)z
        :param asyncio.Future future:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.

        r`   c                 x    | j                         s) | j                         xs | j                                yy)z3Handle completion callback from the future instanceN)	cancelled	exceptionresult)futurer"   s    r   on_done_adapterz5_AsyncioIOReference.__init__.<locals>.on_done_adapter
  s2    
 ##%((*=fmmo> &r   N)ra   rb   rc   _futureadd_done_callback)r   rv   r"   rw   s     ` r   r   z_AsyncioIOReference.__init__   sH      <CCGLN N 	? 	  1r   c                 6    | j                   j                         S )zCancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        )rx   rn   r2   s    r   rn   z_AsyncioIOReference.cancel  s     ||""$$r   Nro   rh   r   r   rd   rd      s    20%r   rd   )r(   r-   loggingsyspika.adaptersr   pika.adapters.utilsr   r   	getLoggerr%   rO   platformset_event_loop_policyWindowsSelectorEventLoopPolicyBaseConnectionr   SocketConnectionMixinStreamingConnectionMixinr   AbstractFileDescriptorServicesr   AbstractTimerReferencer@   AbstractIOReferencerd   rh   r   r   <module>r      s    )   
 ) A			8	$ <<7!G!!"H'"H"H"JKK66 K\CC 1 G G 1 J J . A A . M MCCL >88  $$%.<< $%r   