1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import atexit
21 import functools
22 import os
23 import sys
24 import time
25 import weakref
26
27 try:
28 import opentracing
29 import jaeger_client
30 from opentracing.ext import tags
31 from opentracing.propagation import Format
32 except ImportError:
33 raise ImportError('proton tracing requires opentracing and jaeger_client modules')
34
35 import proton
36 from proton import Sender as ProtonSender
37 from proton.handlers import (
38 OutgoingMessageHandler as ProtonOutgoingMessageHandler,
39 IncomingMessageHandler as ProtonIncomingMessageHandler
40 )
41
# Module-wide tracer cache: None until a tracer has been created, after which
# the initialisation code in this module returns this same object.
42 _tracer = None
# Message-annotation key under which the receive path looks up the propagated
# trace context carried by an incoming message.
43 _trace_key = proton.symbol('x-opt-qpid-tracestate')
44
51
# NOTE(review): the `def` line for these statements is not visible in this
# excerpt; presumably this is the body of `_fini_tracer`, the callable that is
# registered with atexit further down — confirm.
# Pause for one second before asking the global tracer to close.
53 time.sleep(1)
# Keep the object returned by close() so its completion can be polled.
54 c = opentracing.global_tracer().close()
# Poll every half second until the close operation reports that it is done.
55 while not c.done():
56 time.sleep(0.5)
57
# NOTE(review): the enclosing `def` line is not visible in this excerpt;
# `service_name` is presumably its parameter — confirm.
# Creates the tracer once, caches it in the module-level `_tracer`, and returns
# the cached object on every later call.
59 global _tracer
# Already initialised: hand back the cached tracer without reconfiguring.
60 if _tracer is not None:
61 return _tracer
62
# Build a Jaeger configuration with an empty settings dict and validation on.
63 config = jaeger_client.Config(
64 config={},
65 service_name=service_name,
66 validate=True
67 )
68 config.initialize_tracer()
# Cache the opentracing global tracer (presumably the one just installed by
# initialize_tracer() — confirm against the jaeger_client documentation).
69 _tracer = opentracing.global_tracer()
70
# Register `_fini_tracer` to run at interpreter exit.
71 atexit.register(_fini_tracer)
72 return _tracer
73
74
# NOTE(review): the `class`/`def` header for these statements is not visible in
# this excerpt; `self` and `event` are presumably the parameters of an
# `on_message(self, event)` handler method — confirm.
#
# Wrap delivery of the incoming message to the delegate in an
# 'amqp-delivery-receive' consumer span. Nothing happens without a delegate.
if self.delegate is not None:
    tracer = get_tracer()
    message = event.message
    receiver = event.receiver
    connection = event.connection
    span_tags = {
        tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER,
        tags.MESSAGE_BUS_DESTINATION: receiver.source.address,
        tags.PEER_ADDRESS: connection.connected_address,
        tags.PEER_HOSTNAME: connection.hostname,
        'inserted_by': 'proton-message-tracing'
    }
    annotations = message.annotations
    # Continue a remote trace only when the message actually carries the
    # trace-state annotation. A message whose annotations lack that key
    # previously raised KeyError on the lookup below.
    if annotations is not None and _trace_key in annotations:
        headers = annotations[_trace_key]
        span_ctx = tracer.extract(Format.TEXT_MAP, headers)
        span_kwargs = {'child_of': span_ctx}
    else:
        # No propagated context: start a span that ignores any active span.
        span_kwargs = {'ignore_active_span': True}
    with tracer.start_active_span('amqp-delivery-receive', tags=span_tags, **span_kwargs):
        proton._events._dispatch(self.delegate, 'on_message', event)
97
108
110 - def send(self, msg):
131
132
# Replace proton's handler and sender classes with this module's
# IncomingMessageHandler, OutgoingMessageHandler and Sender (their definitions
# are not visible in this excerpt). Each class is patched both in the
# underscore-prefixed module and in the public namespace.
proton._handlers.IncomingMessageHandler = proton.handlers.IncomingMessageHandler = IncomingMessageHandler
proton._handlers.OutgoingMessageHandler = proton.handlers.OutgoingMessageHandler = OutgoingMessageHandler
proton._endpoints.Sender = proton.Sender = Sender
139