
    \j%                         d Z ddlmZ ddlZddlmZ ddlZddlZddlm	Z	m
Z
 ddlmZmZmZ ddlmZ ddlmZmZmZmZ dd	lmZ dd
lmZmZ dZdZdZdZdZ edddg      Z  G d dee      Z!y)z4
Default implementation of the streaming component.
    )
namedtupleN)Thread)HTTPFactory_http_factory)RetryDelayStrategyDefaultBackoffStrategyDefaultJitterStrategy)	SSEClient)logUnsuccessfulResponseExceptionhttp_error_messageis_http_error_recoverable)UpdateProcessor)FEATURESSEGMENTSi,     <   g      ?z/all
ParsedPathkindkeyc                   \    e Zd Zd Zd Zd Zd Zd Zd Ze	d        Z
e	d        Zd	 Zd
 Zy)StreamingUpdateProcessorc                 j   t        j                  |        d| _        |j                  t        z   | _        || _        || _        d| _        || _	        || _
        d | _        t        |j                  t        t        t               t#        t$                    | _        dt)        j*                  d      _        y )NTFbackoff)r   __init__daemonstream_base_uriSTREAM_ALL_PATH_uri_config_store_running_ready_diagnostic_accumulator_es_startedr   initial_reconnect_delayBACKOFF_RESET_INTERVALr   MAX_RETRY_DELAYr	   JITTER_RATIO_retry_delaylogging	getLogger	propagate)selfconfigstorereadydiagnostic_accumulators        L/root/env/lib/python3.12/site-packages/ldclient/impl/datasource/streaming.pyr   z!StreamingUpdateProcessor.__init__#   s    **_<	'=$.**""?3!,/	1 27)$.    c                 @   t        j                  d| j                  z          d| _        d}| j                  rr|dkD  rZ| j                  j                  t        j                               }t        j                  d|z         t        j                  |       |dz  }	 t        t        j                         dz        | _	        | j                         }|D ]  }| j                  s n| j                  j                  t        j                                | j                  | j                  |      }|r| j                  d       d | _	        |du sy| j                  j!                         du st        j                  d       | j                  j#                           | j                  rqy y # t$        $ r}| j                  d       d | _	        t'        |j(                  d	      }t+        |j(                        rt        j,                  |       nDt        j.                  |       | j                  j#                          | j1                          Y d }~y Y d }~d }~wt2        $ r:}t        j,                  d
|z         | j                  d       d | _	        Y d }~d }~ww xY w)Nz5Starting StreamingUpdateProcessor connecting to uri: Tr   z!Will reconnect after delay of %fs     Fz(StreamingUpdateProcessor initialized ok.zstream connectionz5Unexpected error on stream connection: %s, will retry)r   infor   r"   r*   next_retry_delaytimesleepintr%   _connectset_good_sinceprocess_messager!   _record_stream_initr#   is_setsetr   r   statusr   warningerrorstop	Exception)r.   attemptsdelaymessagesmsg
message_okehttp_error_message_results           r3   runzStreamingUpdateProcessor.run=   s   H499TUmm!|))::499;G<uDE

5!MH(#&tyy{T'9#: ==?# 
*C==%%44TYY[A!%!5!5dkk3!GJ!007+/(!T)dkk.@.@.Be.K!KL)
* mm( 1 ((.#' ,>qxxI\,]),QXX6KK 9:II78KKOO%IIK ;  (SVWWX((.#'  (s2   B/F, F, +1F, ,	J5BIJ#0JJc                     | j                   rX| j                  rKt        t        j                         dz        }| j                   j	                  ||| j                  z
  |       y y y )Nr7   )r$   r%   r<   r:   record_stream_init)r.   failedcurrent_times      r3   r@   z,StreamingUpdateProcessor._record_stream_initg   sU    ''D,<,<tyy{T12L((;;L,Y]YiYiJikqr -='r4   c                     t        | j                        }t        |j                  |j                  t
              }t        | j                  |      }|j                  S )N)override_read_timeout)http_factory)	r   r    r   base_headershttp_configstream_read_timeoutr
   r   events)r.   rV   stream_http_factoryclients       r3   r=   z!StreamingUpdateProcessor._connectl   sS    $T\\2),*C*C\E]E]  vI  JII.
 }}r4   c                 <    t        j                  d       d| _        y )Nz!Stopping StreamingUpdateProcessorF)r   r8   r"   r.   s    r3   rF   zStreamingUpdateProcessor.stopv   s    45r4   c                     | j                   xr6 | j                  j                         du xr | j                  j                  du S )NT)r"   r#   rA   r!   initializedr^   s    r3   r`   z$StreamingUpdateProcessor.initializedz   s8    }}a!3!3!5!=a$++BYBY]aBaar4   c                    |j                   dk(  rt        j                  |j                        }t        |d   d   t
        |d   d   i}t        j                  dt        |t                 t        |t
                        | j                  |       y|j                   dk(  rt        j                  |j                        }|d   }|d   }t        j                  d	||j                  d
             t        j                  |      }|| j                  |j                  |       yt        j                  d|       y|j                   dk(  rt        j                  |j                        }|d   }|d
   }t        j                  d||       t        j                  |      }|(| j!                  |j                  |j"                  |       yt        j                  d|       yt        j                  d|j                   z          y)Nputdataflagssegmentsz0Received put event with %d flags and %d segmentsTpatchpathz.Received patch event for %s, New version: [%d]versionzPatch for unknown path: %sdeletez/Received delete event for %s, New version: [%d]zDelete for unknown path: %sz%Unhandled event in stream processor: F)eventjsonloadsrc   r   r   r   debugleninitgetr   _parse_pathupsertr   rD   ri   r   )	r0   rK   all_data	init_datapayloadrg   objtargetrh   s	            r3   r?   z(StreamingUpdateProcessor.process_message~   s   99zz#((+H(6*73(6*:6I IIHIh'(#i.A*BDJJy!YY'!jj*G6?D&/CIIFcggV_N`a-99$?F!V[[#.   8$?  YY("jj*G6?Di(GIIGwW-99$?F!V[[&**g>
  94@  KK?#))KLr4   c                     t         t        fD ]B  }| j                  |j                        st	        || t        |j                        d        c S  y )N)r   r   )r   r   
startswithstream_api_pathr   rn   )rg   r   s     r3   rq   z$StreamingUpdateProcessor._parse_path   sO    x( 	WDt334!T#d>R>R:S:T5UVV	W r4   c                     | S N r^   s    r3   	__enter__z"StreamingUpdateProcessor.__enter__   s    r4   c                 $    | j                          y r|   )rF   )r.   typevalue	tracebacks       r3   __exit__z!StreamingUpdateProcessor.__exit__   s    		r4   N)__name__
__module____qualname__r   rO   r@   r=   rF   r`   staticmethodr?   rq   r~   r   r}   r4   r3   r   r   "   sV    74'(Ts
b " "H  r4   r   )"__doc__collectionsr   rk   	threadingr   r+   r:   ldclient.impl.httpr   r   ldclient.impl.retry_delayr   r   r	   ldclient.impl.sser
   ldclient.impl.utilr   r   r   r   ldclient.interfacesr   ldclient.versioned_data_kindr   r   rY   r(   r'   r)   r   r   r   r}   r4   r3   <module>r      sp   
 #     9 g g ' p p / ;   vuo6
Mv Mr4   