
    ]j                      "   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZ d dlmZmZ dd	lmZ dd
lmZmZmZ ddlmZ ddlmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$ dZ%dZ& G d de'e	      Z( G d d      Z)d Z*y)    N)Iterable)datetime)Enum)Process)OptionalUnion)ConnectionPoolRedis   )parse_connection)DEFAULT_LOGGING_DATE_FORMATDEFAULT_LOGGING_FORMAT!DEFAULT_SCHEDULER_FALLBACK_PERIOD)Job)setup_loghandlers)Queue)ScheduledJobRegistry)resolve_serializer)current_timestampparse_nameszrq:scheduler:%szrq:scheduler-lock:%sc                       e Zd ZdZdZdZy)SchedulerStatusstartedworkingstoppedN)__name__
__module____qualname__STARTEDWORKINGSTOPPED     6/root/env/lib/python3.12/site-packages/rq/scheduler.pyr   r      s    GGGr#   r   c                       e Zd ZeZdej                  eedfde	de
eef   fdZed        Zed        Zed        Zed	        Zdd
Zddeee      fdZedefd       Zd Zd ZddZd Zd Zd Zd Zd Z y)RQSchedulerr   N
connectionlogging_levelc                    t        t        |            | _        t               | _        g | _        d | _        t        |      \  | _        | _        | _	        t        |      | _        d | _        || _        d| _        | j                  j                   | _        d | _        t'        j(                  t*              | _        t/        |t*        ||       y )NF)levelname
log_formatdate_format)setr   _queue_names_acquired_locks_scheduled_job_registrieslock_acquisition_timer   _connection_class_pool_class_pool_kwargsr   
serializer_connectioninterval_stop_requestedStatusr!   _status_processlogging	getLoggerr   logr   )selfqueuesr'   r8   r(   r-   r,   r6   s           r$   __init__zRQScheduler.__init__(   s      F 34),EG&%)"FVWaFbC 0$2C,Z8 ${{**$$X.!#		
r#   c                     | j                   r| j                   S | j                  t        dd| j                  i| j                        | _         | j                   S )Nconnection_class)connection_poolr"   )r7   r3   r	   r4   r5   r@   s    r$   r'   zRQScheduler.connectionF   sZ    ###11*bD<L<LbPTPaPab 2 
 r#   c                     | j                   S N)r0   rF   s    r$   acquired_lockszRQScheduler.acquired_locksO   s    ###r#   c                     | j                   S rH   )r;   rF   s    r$   statuszRQScheduler.statusS   s    ||r#   c                     | j                   | j                  k(  ry| j                  syt        j                         | j                  z
  j                         t        kD  S )zCReturns True if lock_acquisition_time is longer than 10 minutes agoFT)r/   rI   r2   r   nowtotal_secondsr   rF   s    r$   should_reacquire_locksz"RQScheduler.should_reacquire_locksW   sM      3 33))!;!;;JJLOpppr#   c                    t               }t        j                         }| j                  j	                  ddj                  | j                               | j                  D ]k  }| j                  j                  | j                  |      |d| j                  dz         s?| j                  j                  d|       |j                  |       m g | _        | j                  j                  |      | _        t        j                          | _        | j                  r8|r6| j$                  r| j$                  j'                         s| j)                          |S )z7Returns names of queue it successfully acquires lock onzAcquiring scheduler lock for %s, T<   )nxexzAcquired scheduler lock for %s)r.   osgetpidr?   debugjoinr/   r'   get_locking_keyr8   infoaddr1   r0   unionr   rM   r2   r<   is_alivestart)r@   
auto_startsuccessful_lockspidr+   s        r$   acquire_lockszRQScheduler.acquire_locks`   s   5iik8$))DDUDU:VW%% 	+D""4#7#7#=stPTP]P]`bPb"c>E $$T*	+ *,&#3399:JK%-\\^" J==(>(>(@

r#   queue_namesc                     g | _         |s| j                  }|D ]=  }| j                   j                  t        || j                  | j
                               ? y)z(Prepare scheduled job registries for user'   r6   N)r1   r0   appendr   r'   r6   )r@   rc   r+   s      r$   prepare_registrieszRQScheduler.prepare_registriesw   sR    )+&..K 	D**11$TdooRVRaRab	r#   r+   c                     t         |z  S )z,Returns scheduler key for a given queue name)SCHEDULER_LOCKING_KEY_TEMPLATE)clsr+   s     r$   rY   zRQScheduler.get_locking_key   s     .44r#   c           
         | j                   j                  | _        | j                  s| j                  r| j                          | j                  D ]  }t               }|j                  |      }|s!t        |j                  | j                  | j                        }| j                  j                         5 }t        j                  || j                  | j                        }|D ],  }||j                  ||t!        |j"                               . |D ]  }|j%                  ||        |j'                          ddd        | j                   j(                  | _        y# 1 sw Y   xY w)z+Enqueue jobs whose timestamp is in the pastre   N)pipelineat_front)rl   )r:   r    r;   r1   r0   rg   r   get_jobs_to_scheduler   r+   r'   r6   rl   r   
fetch_many_enqueue_jobboolenqueue_at_frontremoveexecuter   )	r@   registry	timestampjob_idsqueuerl   jobsjobjob_ids	            r$   enqueue_scheduled_jobsz"RQScheduler.enqueue_scheduled_jobs   sB   {{**--$2F2F##%66 	#H)+I 33I>G(--DOOPTP_P_`E))+ #x~~g$//VZVeVef hC**3DQTQeQeLf*gh & ?FOOFXO>?  "# #	#( {{**# #s   74E%,AE%%E/	c                     t        j                   t         j                  | j                         t        j                   t         j                  | j                         y)zUInstalls signal handlers for handling SIGINT and SIGTERM
        gracefully.
        N)signalSIGINTrequest_stopSIGTERMrF   s    r$   _install_signal_handlersz$RQScheduler._install_signal_handlers   s4     	fmmT%6%67fnnd&7&78r#   c                     d| _         y)z8Toggle self._stop_requested that's checked on every loopTN)r9   )r@   signumframes      r$   r   zRQScheduler.request_stop   s
    #r#   c                 f   | j                   j                  ddj                  | j                               t	        | j
                        dkD  ru| j                  j                         5 }| j
                  D ]2  }| j                  |      }|j                  || j                  dz          4 |j                          ddd       y| j
                  rW| j                  t        t        | j
                                    }| j                  j                  || j                  dz          yy# 1 sw Y   yxY w)z/Updates the TTL on scheduler keys and the locksz!Scheduler sending heartbeat to %srQ   r   rR   N)r?   rW   rX   rI   lenr0   r'   rl   rY   expirer8   rt   nextiter)r@   rl   r+   keys       r$   	heartbeatzRQScheduler.heartbeat   s    :DIIdFYFY<Z[t##$q())+ #x 00 =D..t4COOC);<=   "	# #
 !!&&tD1E1E,F'GHCOO""3(:; "# #s   (AD''D0c                     | j                   j                  ddj                  | j                               | j	                          | j
                  j                  | _        y )Nz-Scheduler stopping, releasing locks for %s...rQ   )r?   rZ   rX   r0   release_locksr:   r!   r;   rF   s    r$   stopzRQScheduler.stop   sB    EtyyQUQeQeGfg{{**r#   c                     | j                   D cg c]  }| j                  |       }} | j                  j                  |  t	               | _         yc c}w )zRelease acquired locksN)r0   rY   r'   deleter.   )r@   r+   keyss      r$   r   zRQScheduler.release_locks   sJ    7;7K7KLt$$T*LL%"u Ms   Ac                     | j                   j                  | _        d | _        t	        t
        | fd      | _        | j                  j                          | j                  S )N	Scheduler)targetargsr+   )r:   r   r;   r7   r   runr<   r^   rF   s    r$   r^   zRQScheduler.start   sI    {{**  s${K}}r#   c                    | j                          	 | j                  r| j                          y | j                  r| j	                          | j                          | j                          t        j                  | j                         yrH   )
r   r9   r   rO   rb   r|   r   timesleepr8   rF   s    r$   workzRQScheduler.work   sf    %%'##		**""$'')NNJJt}}% r#   )FrH   )NN)!r   r   r   r   r:   r=   INFOr   r   r
   r   strintrB   propertyr'   rI   rK   rO   rb   r   r   rg   classmethodrY   r|   r   r   r   r   r   r^   r   r"   r#   r$   r&   r&   !   s    
 F )0/)
 

 S#X
<     $ $   q q .hx}.E  53 5 5+:9$<+
%&r#   r&   c                    | j                   j                  ddj                  | j                        t	        j
                                	 | j                          | j                   j                  dt	        j
                                y #  | j                   j                  dt	        j
                         t        j                                 xY w)Nz$Scheduler for %s started with PID %srQ   z*Scheduler [PID %s] raised an exception.
%sz!Scheduler with PID %d has stopped)
r?   rZ   rX   r/   rU   rV   r   error	traceback
format_exc)	schedulers    r$   r   r      s    MM=tyyI_I_?`bdbkbkbmn MM:BIIKHI299;XaXlXlXnos   
B	 	AC)+r=   rU   r~   r   r   collections.abcr   r   enumr   multiprocessingr   typingr   r   redisr	   r
   connectionsr   defaultsr   r   r   rz   r   logutilsr   rx   r   ru   r   serializersr   utilsr   r   SCHEDULER_KEY_TEMPLATEri   r   r   r&   r   r"   r#   r$   <module>r      sn     	    $   # " ' ) l l  '  * + 1* !7 c4 {& {&|Ir#   