mirror of
https://github.com/kubernetes-sigs/descheduler.git
synced 2026-01-28 06:29:29 +01:00
@@ -42,6 +42,8 @@ type config struct {
|
||||
TracerProvider trace.TracerProvider
|
||||
MeterProvider metric.MeterProvider
|
||||
SpanStartOptions []trace.SpanStartOption
|
||||
SpanAttributes []attribute.KeyValue
|
||||
MetricAttributes []attribute.KeyValue
|
||||
|
||||
ReceivedEvent bool
|
||||
SentEvent bool
|
||||
@@ -49,11 +51,11 @@ type config struct {
|
||||
tracer trace.Tracer
|
||||
meter metric.Meter
|
||||
|
||||
rpcDuration metric.Float64Histogram
|
||||
rpcRequestSize metric.Int64Histogram
|
||||
rpcResponseSize metric.Int64Histogram
|
||||
rpcRequestsPerRPC metric.Int64Histogram
|
||||
rpcResponsesPerRPC metric.Int64Histogram
|
||||
rpcDuration metric.Float64Histogram
|
||||
rpcInBytes metric.Int64Histogram
|
||||
rpcOutBytes metric.Int64Histogram
|
||||
rpcInMessages metric.Int64Histogram
|
||||
rpcOutMessages metric.Int64Histogram
|
||||
}
|
||||
|
||||
// Option applies an option value for a config.
|
||||
@@ -94,46 +96,64 @@ func newConfig(opts []Option, role string) *config {
|
||||
}
|
||||
}
|
||||
|
||||
c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
|
||||
rpcRequestSize, err := c.meter.Int64Histogram("rpc."+role+".request.size",
|
||||
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
|
||||
metric.WithUnit("By"))
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
if c.rpcRequestSize == nil {
|
||||
c.rpcRequestSize = noop.Int64Histogram{}
|
||||
if rpcRequestSize == nil {
|
||||
rpcRequestSize = noop.Int64Histogram{}
|
||||
}
|
||||
}
|
||||
|
||||
c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
|
||||
rpcResponseSize, err := c.meter.Int64Histogram("rpc."+role+".response.size",
|
||||
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
|
||||
metric.WithUnit("By"))
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
if c.rpcResponseSize == nil {
|
||||
c.rpcResponseSize = noop.Int64Histogram{}
|
||||
if rpcResponseSize == nil {
|
||||
rpcResponseSize = noop.Int64Histogram{}
|
||||
}
|
||||
}
|
||||
|
||||
c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
|
||||
rpcRequestsPerRPC, err := c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
|
||||
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
|
||||
metric.WithUnit("{count}"))
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
if c.rpcRequestsPerRPC == nil {
|
||||
c.rpcRequestsPerRPC = noop.Int64Histogram{}
|
||||
if rpcRequestsPerRPC == nil {
|
||||
rpcRequestsPerRPC = noop.Int64Histogram{}
|
||||
}
|
||||
}
|
||||
|
||||
c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
|
||||
rpcResponsesPerRPC, err := c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
|
||||
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
|
||||
metric.WithUnit("{count}"))
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
if c.rpcResponsesPerRPC == nil {
|
||||
c.rpcResponsesPerRPC = noop.Int64Histogram{}
|
||||
if rpcResponsesPerRPC == nil {
|
||||
rpcResponsesPerRPC = noop.Int64Histogram{}
|
||||
}
|
||||
}
|
||||
|
||||
switch role {
|
||||
case "client":
|
||||
c.rpcInBytes = rpcResponseSize
|
||||
c.rpcInMessages = rpcResponsesPerRPC
|
||||
c.rpcOutBytes = rpcRequestSize
|
||||
c.rpcOutMessages = rpcRequestsPerRPC
|
||||
case "server":
|
||||
c.rpcInBytes = rpcRequestSize
|
||||
c.rpcInMessages = rpcRequestsPerRPC
|
||||
c.rpcOutBytes = rpcResponseSize
|
||||
c.rpcOutMessages = rpcResponsesPerRPC
|
||||
default:
|
||||
c.rpcInBytes = noop.Int64Histogram{}
|
||||
c.rpcInMessages = noop.Int64Histogram{}
|
||||
c.rpcOutBytes = noop.Int64Histogram{}
|
||||
c.rpcOutMessages = noop.Int64Histogram{}
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -257,3 +277,29 @@ func (o spanStartOption) apply(c *config) {
|
||||
func WithSpanOptions(opts ...trace.SpanStartOption) Option {
|
||||
return spanStartOption{opts}
|
||||
}
|
||||
|
||||
type spanAttributesOption struct{ a []attribute.KeyValue }
|
||||
|
||||
func (o spanAttributesOption) apply(c *config) {
|
||||
if o.a != nil {
|
||||
c.SpanAttributes = o.a
|
||||
}
|
||||
}
|
||||
|
||||
// WithSpanAttributes returns an Option to add custom attributes to the spans.
|
||||
func WithSpanAttributes(a ...attribute.KeyValue) Option {
|
||||
return spanAttributesOption{a: a}
|
||||
}
|
||||
|
||||
type metricAttributesOption struct{ a []attribute.KeyValue }
|
||||
|
||||
func (o metricAttributesOption) apply(c *config) {
|
||||
if o.a != nil {
|
||||
c.MetricAttributes = o.a
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetricAttributes returns an Option to add custom attributes to the metrics.
|
||||
func WithMetricAttributes(a ...attribute.KeyValue) Option {
|
||||
return metricAttributesOption{a: a}
|
||||
}
|
||||
|
||||
@@ -13,21 +13,22 @@ import (
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
|
||||
)
|
||||
|
||||
type gRPCContextKey struct{}
|
||||
|
||||
type gRPCContext struct {
|
||||
messagesReceived int64
|
||||
messagesSent int64
|
||||
metricAttrs []attribute.KeyValue
|
||||
record bool
|
||||
inMessages int64
|
||||
outMessages int64
|
||||
metricAttrs []attribute.KeyValue
|
||||
record bool
|
||||
}
|
||||
|
||||
type serverHandler struct {
|
||||
@@ -62,11 +63,11 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
|
||||
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
|
||||
name,
|
||||
trace.WithSpanKind(trace.SpanKindServer),
|
||||
trace.WithAttributes(attrs...),
|
||||
trace.WithAttributes(append(attrs, h.config.SpanAttributes...)...),
|
||||
)
|
||||
|
||||
gctx := gRPCContext{
|
||||
metricAttrs: attrs,
|
||||
metricAttrs: append(attrs, h.config.MetricAttributes...),
|
||||
record: true,
|
||||
}
|
||||
if h.config.Filter != nil {
|
||||
@@ -102,11 +103,11 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
|
||||
ctx,
|
||||
name,
|
||||
trace.WithSpanKind(trace.SpanKindClient),
|
||||
trace.WithAttributes(attrs...),
|
||||
trace.WithAttributes(append(attrs, h.config.SpanAttributes...)...),
|
||||
)
|
||||
|
||||
gctx := gRPCContext{
|
||||
metricAttrs: attrs,
|
||||
metricAttrs: append(attrs, h.config.MetricAttributes...),
|
||||
record: true,
|
||||
}
|
||||
if h.config.Filter != nil {
|
||||
@@ -150,8 +151,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
|
||||
case *stats.Begin:
|
||||
case *stats.InPayload:
|
||||
if gctx != nil {
|
||||
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
|
||||
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
|
||||
messageId = atomic.AddInt64(&gctx.inMessages, 1)
|
||||
c.rpcInBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
|
||||
}
|
||||
|
||||
if c.ReceivedEvent {
|
||||
@@ -166,8 +167,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
|
||||
}
|
||||
case *stats.OutPayload:
|
||||
if gctx != nil {
|
||||
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
|
||||
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
|
||||
messageId = atomic.AddInt64(&gctx.outMessages, 1)
|
||||
c.rpcOutBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
|
||||
}
|
||||
|
||||
if c.SentEvent {
|
||||
@@ -213,8 +214,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
|
||||
|
||||
c.rpcDuration.Record(ctx, elapsedTime, recordOpts...)
|
||||
if gctx != nil {
|
||||
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...)
|
||||
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...)
|
||||
c.rpcInMessages.Record(ctx, atomic.LoadInt64(&gctx.inMessages), recordOpts...)
|
||||
c.rpcOutMessages.Record(ctx, atomic.LoadInt64(&gctx.outMessages), recordOpts...)
|
||||
}
|
||||
default:
|
||||
return
|
||||
|
||||
@@ -5,7 +5,7 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.g
|
||||
|
||||
// Version is the current release version of the gRPC instrumentation.
|
||||
func Version() string {
|
||||
return "0.53.0"
|
||||
return "0.58.0"
|
||||
// This string is updated by the pre_release.sh script during release
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user