
    ]j7(                        d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZ d dlmZ d dlmZmZ d dlmZ d	d
lmZ d	dlmZmZ d	dlmZ d	dlm Z  d	dl!m"Z" d	dl#m$Z$ d	dl%m&Z&m'Z' erd dlm(Z(  G d de      Z) G d d      Z*e'eee"ddd fde+dee+   de,de-e&   ddde-e   de-e"   d e.d!e+d"e/fd#Z0y)$    N)Iterable)Enum)Process)TYPE_CHECKING
NamedTupleOptionalUnion)uuid4)ConnectionPoolRedis)DefaultSerializer   )parse_connection)DEFAULT_LOGGING_DATE_FORMATDEFAULT_LOGGING_FORMAT)Job)setup_loghandlers)Queue)parse_names)
BaseWorkerWorker)
Serializerc                   ,    e Zd ZU eed<   eed<   eed<   y)
WorkerDatanamepidprocessN)__name__
__module____qualname__str__annotations__intr        8/root/env/lib/python3.12/site-packages/rq/worker_pool.pyr   r      s    
I	Hr%   r   c                      e Zd Z G d de      Zdeeeefde	e
eef      dededee   dd	d
ee   dee   fdZedee   fd       Zedefd       Zd Zd$dZdefdZd ZdefdZd%deddfdZ	 	 d&dededededef
dZ	 	 	 	 d'de e   dededefdZ!d(dededefd Z"e#jH                  fdefd!Z%d" Z&d)dedefd#Z'y)*
WorkerPoolc                       e Zd ZdZdZdZy)WorkerPool.Statusr         N)r   r   r    IDLESTARTEDSTOPPEDr$   r%   r&   Statusr*   &   s    r%   r0   r   queues
connectionnum_workersworker_class
serializerr   	job_classqueue_classc                    || _         g | _        t        dt        t        t
               t        j                  t
              | _        t        |      | _
        || _        t               j                  | _        d| _        d| _        | j"                  j$                  | _        || _        || _        || _        || _        i | _        t3        |      \  | _        | _        | _        y )NINFOr   Tr   )r3   _workersr   r   r   r   logging	getLoggerlogr   _queue_namesr2   r
   hexr   _burst_sleepr0   r-   statusr4   r5   r6   r7   worker_dictr   _connection_class_pool_class_pool_kwargs)
selfr1   r2   r3   r4   r5   r6   r7   argskwargss
             r&   __init__zWorkerPool.__init__+   s     !,&(&"=?U\de#*#4#4X#>'26':$	 #';;#3#3.:&0$-(3 35FVWaFbC 0$2Cr%   returnc                 v    | j                   D cg c]  }| j                  || j                        ! c}S c c}w )Returns a list of Queue objectsr2   )r?   r7   r2   )rH   r   s     r&   r1   zWorkerPool.queuesK   s4     PTO`O`at  $// Baaas   $6c                 ,    t        | j                        S )rN   )lenrD   rH   s    r&   number_of_active_workersz#WorkerPool.number_of_active_workersP   s     4##$$r%   c                     t        j                   t         j                  | j                         t        j                   t         j                  | j                         y)zUInstalls signal handlers for handling SIGINT and SIGTERM
        gracefully.
        N)signalSIGINTrequest_stopSIGTERMrR   s    r&   _install_signal_handlersz#WorkerPool._install_signal_handlersU   s4     	fmmT%6%67fnnd&7&78r%   Nc                     | j                   j                  d       | j                  j                  | _        | j                          y)z8Toggle self._stop_requested that's checked on every loopz)Received SIGINT/SIGTERM, shutting down...N)r>   infor0   r/   rC   stop_workers)rH   signumframes      r&   rW   zWorkerPool.request_stop\   s0    ABkk))r%   c                 @    | j                          | j                  dk(  S )z)Returns True if all workers have stopped.r   )reap_workersrS   rR   s    r&   all_workers_have_stoppedz#WorkerPool.all_workers_have_stoppedb   s    ,,11r%   c                 ~   | j                   j                  d       t        | j                  j	                               }|D ]z  }|j
                  j                  d       |j
                  j                         r2| j                   j                  d|j                  |j                         j| j                  |       | y)z%Removes dead workers from worker_dictzReaping dead workersg?zWorker %s with pid %d is aliveN)r>   debuglistrD   valuesr   joinis_aliver   r   handle_dead_worker)rH   worker_datasdatas      r&   r`   zWorkerPool.reap_workersh   s    -.D,,3356  	DLLc"||$$&?DHHU''-	r%   worker_datac                    | j                   j                  d|j                  |j                         t	        j
                  t              5  | j                  j                  |j                         ddd       y# 1 sw Y   yxY w)z&
        Handle a dead worker
        zWorker %s with pid %d is deadN)	r>   r[   r   r   
contextlibsuppressKeyErrorrD   pop)rH   rk   s     r&   rh   zWorkerPool.handle_dead_worker   s`     	5{7G7GY  * 	3  !1!12	3 	3 	3s   &A::Brespawnc                 `   | j                   j                  d       | j                          |r| j                  | j                  j
                  k7  r]| j                  t        | j                        z
  }|r8t        |      D ])  }| j                  | j                  | j                         + yyyy)z7
        Check whether workers are still alive
        zChecking worker processes)burstrB   N)r>   rc   r`   rC   r0   r/   r3   rQ   rD   rangestart_workerrA   rB   )rH   rq   deltais       r&   check_workerszWorkerPool.check_workers   s     	23 t{{dkk&9&99$$s4+;+;'<<Eu MA%%DKK%LM  :7r%   r   rs   rB   logging_levelc                     t        t        || j                  | j                  | j                  | j
                  f|||| j                  | j                  | j                  dd| d| j                   d      S )zReturns the worker process)rB   rs   ry   r4   r6   r5   zWorker z (WorkerPool ))targetrI   rJ   r   )
r   
run_workerr?   rE   rF   rG   r4   r6   r5   r   )rH   r   rs   rB   ry   s        r&   get_worker_processzWorkerPool.get_worker_process   sx     ))4+A+A4CSCSUYUfUfg !. $ 1 1!^^"oo 4&dii[:
 	
r%   countc                    t               j                  }| j                  ||||      }|j                          t	        ||j
                  |      }|| j                  |<   | j                  j                  d||j
                         y)z
        Starts a worker and adds the data to worker_datas.
        * sleep: waits for X seconds before creating worker, for testing purposes
        rs   rB   ry   )r   r   r   zSpawned worker: %s with PID %dN)	r
   r@   r~   startr   r   rD   r>   rc   )rH   r   rs   rB   ry   r   r   rk   s           r&   ru   zWorkerPool.start_worker   sm     w{{))$eFZg)h dWM!,7w{{Kr%   c                     | j                   j                  d| j                   d       t        | j                        D ]  }| j	                  |dz   |||        y)zx
        Run the workers
        * sleep: waits for X seconds before creating worker, only for testing purposes
        z	Spawning z workersr   r   N)r>   rc   r3   rt   ru   )rH   rs   rB   ry   rw   s        r&   start_workerszWorkerPool.start_workers   s[    
 	4#3#3"4H=>t''( 	^Aa!e5}]	^r%   c                 2   	 t        j                  |j                  |       | j                  j	                  d|j                         y# t
        $ rD}|j                  t        j                  k(  r| j                  j                  d       n Y d}~yd}~ww xY w)zm
        Send stop signal to worker and catch "No such process" error if the worker is already dead.
        z'Sent shutdown command to worker with %szHorse already deadN)	oskillr   r>   r[   OSErrorerrnoESRCHrc   )rH   rk   siges       r&   stop_workerzWorkerPool.stop_worker   si    	GGKOOS)HHMMC[__U 	ww%++%34 5	s   AA	 		B:BBc                     | j                   j                  dt        | j                               t	        | j                  j                               }|D ]  }| j                  |        y)zSend SIGINT to all workersz!Sending stop signal to %s workersN)r>   r[   rQ   rD   rd   re   r   )rH   ri   rk   s      r&   r\   zWorkerPool.stop_workers   sV    93t?O?O;PQD,,3356' 	*K[)	*r%   c                     || _         | }t        |t        t        t               | j
                  j                  d| j                   dt        j                                | j                  j                  | _        | j                  | j                   |       | j                          	 | j                  | j                  j                  k(  r]| j!                         r| j
                  j                  d       y | j
                  j                  d       t#        j$                  d       | j'                  |       |r+| j(                  d	k(  r| j
                  j                  d       y t#        j$                  d       )
Nr:   zStarting worker pool z with pid %d...)rs   ry   zAll workers stopped, exiting...z"Waiting for workers to shutdown...r   )rq   r   )rA   r   r   r   r   r>   r[   r   r   getpidr0   r.   rC   r   rY   r/   ra   timesleeprx   rS   )rH   rs   ry   rq   s       r&   r   zWorkerPool.start   s   )-)DF\ckl-dii[H"))+Vkk))MJ%%'{{dkk111002HHMM"CDHHMM"FGJJqM""7"3T::a?HHMM"CD

1 r%   )NN)T)r   r9   )NTr   r9   )Tr   r9   )Fr9   )(r   r   r    r   r0   r   r   r   r   r   r	   r!   r   r#   typer   rK   propertyrd   r1   rS   rY   rW   boolra   r`   r   rh   rx   floatr   r~   r   ru   r   rU   rV   r   r\   r   r$   r%   r&   r(   r(   %   s     )/#4"#(csEz*+c c 	c
 :&c !c 9c %[c@ bU b b %# % %92$ 243j 3MT MT M$ #

 
 	

 
 

0  $#L}L L 	L
 L$^4 ^ ^RU ^ 8>}} z *4  r%   r(   Tr9   worker_namequeue_namesconnection_pool_kwargsr4   r5   r   r6   r7   rs   ry   rB   c                 .    |t        dd|i|      }|D cg c]  } |||       }} ||| ||||      }|j                  j                  dt        j                                t        j                  |       |j                  |	d|
       y c c}w )	Nconnection_class)connection_poolrO   )r   r2   r5   r6   r7   z#Starting worker started with PID %sT)rs   with_schedulerry   r$   )r   r>   r[   r   r   r   r   work)r   r   r   connection_pool_classr   r4   r5   r6   r7   rs   ry   rB   r2   r   r1   workers                   r&   r}   r}      s     "&h8MhQghJ DOO4k$:6OFOF JJOO9299;GJJv
KKeDKN Ps   B)1rm   r   r<   r   rU   r   collections.abcr   enumr   multiprocessingr   typingr   r   r   r	   uuidr
   redisr   r   rq.serializersr   connectionsr   defaultsr   r   jobr   logutilsr   queuer   utilsr   r   r   r   r   r   r(   r!   dictr   r   r#   r}   r$   r%   r&   <module>r      s       	   $  # > =  ' , ) I  '   &) P Pr &,0$OO#O
 !O z"O O CyO eO O O Or%   