
    \j"                         d dl Z d dlmZ d dlmZmZ d dlmZ erd dlmZ d dlm	Z	 d dl
mZmZ d dlmZ  G d	 d
e      ZddZddZddZddZddZddZ G d d      Z G d de      ZddZy)    N)Integration)capture_internal_exceptionsensure_integration_enabled)TYPE_CHECKING)Any)Optional)EventHintSparkContextc                   "    e Zd ZdZedd       Zy)SparkIntegrationsparkNc                      t                y N)_setup_sentry_tracing     T/root/env/lib/python3.12/site-packages/sentry_sdk/integrations/spark/spark_driver.py
setup_oncezSparkIntegration.setup_once   s    r   returnN)__name__
__module____qualname__
identifierstaticmethodr   r   r   r   r   r      s    J   r   r   c                      ddl m}  | j                  }|r9|j                  d|j                         |j                  d|j
                         yy)z
    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
    This allows worker integration to have access to app_name and application_id.
    r   r   sentry_app_namesentry_application_idN)pysparkr   _active_spark_contextsetLocalPropertyappNameapplicationId)r   spark_contexts     r   _set_app_propertiesr'      sP    
 % 66M&&!!	
 	&&#''	
 r   c                     ddl m} | j                  } ||       t               }| j                  j                         j                  |       y)zA
    Start java gateway server to add custom `SparkListener`
    r   )ensure_callback_server_startedN)pyspark.java_gatewayr)   _gatewaySentryListener_jscscaddSparkListener)r.   r)   gwlisteners       r   _start_sentry_listenerr2   *   s9     D	B"2&HGGJJL!!(+r   c                 X     t        j                         }|j                  d fd       }y )Nc                 R   t               5  t        j                         j                  t              | cd d d        S j
                  | cd d d        S | j                  di       j                  dj                                | j                  di       j                  dj                  j                  d             | d   j                  dj                  j                  d             | d   j                  dj                  j                  d	             | d   j                  d
j                  j                  d             | d   j                  dj                         | d   j                  dj                         | d   j                  dj                         | d   j                  dj                         | d   j                  dj                         | j                  di       j                  dj                         d d d        | S # 1 sw Y   | S xY w)Nuseridtagszexecutor.idzspark.executor.idzspark-submit.deployModezspark.submit.deployModezdriver.hostzspark.driver.hostzdriver.portzspark.driver.portspark_versionapp_nameapplication_idmaster
spark_homeextraweb_url)r   
sentry_sdk
get_clientget_integrationr   r"   
setdefault	sparkUser_confgetversionr$   r%   r;   	sparkHomeuiWebUrl)eventhintr.   s     r   process_eventz+_add_event_processor.<locals>.process_event9   s   (* 	M$$&667GHP	M 	M ''/	M 	M VR(33D",,.IVR(33rxx||,?@ &M$$)67 &M$$]BHHLLAT4UV&M$$]BHHLLAT4UV&M$$_bjjA&M$$Z<&M$$%5r7G7GH&M$$Xryy9&M$$\2<<@Wb)44YL1	M4 5	M4 s   )HHF<HH&)rI   r	   rJ   r
   r   zOptional[Event])r?   get_isolation_scopeadd_event_processor)r.   scoperK   s   `  r   _add_event_processorrO   6   s)    **,E
 r   c                 D    t        |        t                t        |        y r   )r2   r'   rO   )r.   s    r   _activate_integrationrQ   X   s    2r   c                  z    ddl m}  | j                  t        t              	 	 	 	 	 	 	 	 dfd       }|| _        y )Nr   r   c                 6     | g|i |}t        |        |S r   )rQ   )selfargskwargsrvspark_context_inits       r   "_sentry_patched_spark_context_initzE_patch_spark_context_init.<locals>._sentry_patched_spark_context_initc   s&      6t6v6d#	r   )rT   r   rU   r   rV   r   r   zOptional[Any])r!   r   _do_initr   r   )r   rY   rX   s     @r   _patch_spark_context_initr[   ^   sX    $%.. 02DE%*6;	 F ?Lr   c                  h    ddl m}  | j                  t        | j                         y t	                y )Nr   r   )r!   r   r"   rQ   r[   r   s    r   r   r   n   s(    $))5l@@Ar   c                       e Zd ZddZddZddZddZd dZd!dZd"dZ	d#d	Z
	 	 	 	 d$d
Zd%dZd&dZd'dZd(dZd)dZd*dZd+dZd,dZd-dZd.dZd/dZd0dZd1dZd2dZd3dZ G d d      Zy)4SparkListenerNc                      y r   r   )rT   applicationEnds     r   onApplicationEndzSparkListener.onApplicationEndx       r   c                      y r   r   )rT   applicationStarts     r   onApplicationStartz SparkListener.onApplicationStart{   rb   r   c                      y r   r   )rT   blockManagerAddeds     r   onBlockManagerAddedz!SparkListener.onBlockManagerAdded~   rb   r   c                      y r   r   )rT   blockManagerRemoveds     r   onBlockManagerRemovedz#SparkListener.onBlockManagerRemoved   rb   r   c                      y r   r   )rT   blockUpdateds     r   onBlockUpdatedzSparkListener.onBlockUpdated   rb   r   c                      y r   r   )rT   environmentUpdates     r   onEnvironmentUpdatez!SparkListener.onEnvironmentUpdate   rb   r   c                      y r   r   )rT   executorAddeds     r   onExecutorAddedzSparkListener.onExecutorAdded   rb   r   c                      y r   r   )rT   executorBlacklisteds     r   onExecutorBlacklistedz#SparkListener.onExecutorBlacklisted   rb   r   c                      y r   r   )rT   executorBlacklistedForStages     r   onExecutorBlacklistedForStagez+SparkListener.onExecutorBlacklistedForStage   s     	r   c                      y r   r   )rT   executorMetricsUpdates     r   onExecutorMetricsUpdatez%SparkListener.onExecutorMetricsUpdate   rb   r   c                      y r   r   )rT   executorRemoveds     r   onExecutorRemovedzSparkListener.onExecutorRemoved   rb   r   c                      y r   r   )rT   jobEnds     r   onJobEndzSparkListener.onJobEnd   rb   r   c                      y r   r   )rT   jobStarts     r   
onJobStartzSparkListener.onJobStart   rb   r   c                      y r   r   )rT   nodeBlacklisteds     r   onNodeBlacklistedzSparkListener.onNodeBlacklisted   rb   r   c                      y r   r   )rT   nodeBlacklistedForStages     r   onNodeBlacklistedForStagez'SparkListener.onNodeBlacklistedForStage   rb   r   c                      y r   r   )rT   nodeUnblacklisteds     r   onNodeUnblacklistedz!SparkListener.onNodeUnblacklisted   rb   r   c                      y r   r   )rT   rI   s     r   onOtherEventzSparkListener.onOtherEvent   rb   r   c                      y r   r   )rT   speculativeTasks     r   onSpeculativeTaskSubmittedz(SparkListener.onSpeculativeTaskSubmitted   rb   r   c                      y r   r   )rT   stageCompleteds     r   onStageCompletedzSparkListener.onStageCompleted   rb   r   c                      y r   r   )rT   stageSubmitteds     r   onStageSubmittedzSparkListener.onStageSubmitted   rb   r   c                      y r   r   )rT   taskEnds     r   	onTaskEndzSparkListener.onTaskEnd   rb   r   c                      y r   r   )rT   taskGettingResults     r   onTaskGettingResultz!SparkListener.onTaskGettingResult   rb   r   c                      y r   r   )rT   	taskStarts     r   onTaskStartzSparkListener.onTaskStart   rb   r   c                      y r   r   )rT   unpersistRDDs     r   onUnpersistRDDzSparkListener.onUnpersistRDD   rb   r   c                       e Zd ZdgZy)SparkListener.Javaz1org.apache.spark.scheduler.SparkListenerInterfaceN)r   r   r   
implementsr   r   r   Javar      s
    IJ
r   r   )r`   r   r   N)rd   r   r   N)rg   r   r   N)rj   r   r   N)rm   r   r   N)rp   r   r   N)rs   r   r   N)rv   r   r   N)ry   r   r   N)r|   r   r   N)r   r   r   Nr   r   r   Nr   r   r   N)r   r   r   N)r   r   r   N)r   r   r   N)rI   r   r   N)r   r   r   Nr   r   r   Nr   r   r   N)r   r   r   N)r   r   r   N)r   r   r   N)r   r   r   N)r   r   r   ra   re   rh   rk   rn   rq   rt   rw   rz   r}   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r^   r^   w   s    %* 
K Kr   r^   c            	       H    e Zd Z	 ddededdddfdZddZdd	Zdd
ZddZy)r,   NlevelmessagedatazOptional[dict[str, Any]]r   c                 P    t        j                         j                  |||       y )Nr   r   r   )r?   rL   add_breadcrumb)rT   r   r   r   s       r   _add_breadcrumbzSentryListener._add_breadcrumb   s'     	&&(77t 	8 	
r   c                     t        j                         j                          dj                  |j	                               }| j                  d|       t                y )NzJob {} Startedinfo)r   r   )r?   rL   clear_breadcrumbsformatjobIdr   r'   )rT   r   r   s      r   r   zSentryListener.onJobStart   sG    &&(::<"))(..*:;67;r   c                 <   d}d}d|j                         j                         i}|j                         j                         dk(  r"d}dj                  |j                               }n!d}dj                  |j                               }| j	                  |||       y )	N resultJobSucceededr   zJob {} EndedwarningzJob {} Failedr   )	jobResulttoStringr   r   r   )rT   r   r   r   r   s        r   r   zSentryListener.onJobEnd   s    &**,5578&&(N:E$++FLLN;GE%,,V\\^<G5'Er   c                     |j                         }dj                  |j                               }d|j                         i}t	        |      }|||d<   | j                  d||       t                y )NzStage {} Submittedname	attemptIdr   r   )	stageInfor   stageIdr   _get_attempt_idr   r'   )rT   r   
stage_infor   r   
attempt_ids         r   r   zSentryListener.onStageSubmitted   so    #--/
&--j.@.@.BC
)*$Z0
! *D67Fr   c                    ddl m} |j                         }d}d}d|j                         i}t	        |      }|||d<   	 |j                         j                         |d<   dj                  |j                               }d}| j                  |||       y # |$ r$ d	j                  |j                               }d
}Y =w xY w)Nr   )Py4JJavaErrorr   r   r   reasonzStage {} Failedr   zStage {} Completedr   r   )
py4j.protocolr   r   r   r   failureReasonrE   r   r   r   )rT   r   r   r   r   r   r   r   s           r   r   zSentryListener.onStageCompleted   s    /#--/

)*$Z0
! *D	'557;;=DN'..z/A/A/CDGE
 	5'E	  	*11*2D2D2FGGE	s    AB &C ?C r   r   r   r   r   )	r   r   r   strr   r   r   r   r   r   r   r   r,   r,      sH    
 ,0	

 
 )	

 

F
Fr   r,   c                     	 | j                         S # t        $ r Y nw xY w	 | j                         S # t        $ r Y y w xY wr   )r   	ExceptionattemptNumber)r   s    r   r   r     sQ    ##%% '')) s    	2 	>>r   )r.   r   r   N)r   r   r   zOptional[int])r?   sentry_sdk.integrationsr   sentry_sdk.utilsr   r   typingr   r   r   sentry_sdk._typesr	   r
   r!   r   r   r'   r2   rO   rQ   r[   r   r^   r,   r   r   r   r   <module>r      sp     / T  -$ {  
&	,D?  MK MK`AF] AFHr   