Send rejoin events to PostHog in realtime
By converting PosthogSpanExporter to a SpanProcessor just like the RageshakeSpanProcessor, it can now monitor spans in realtime as they are started.
This commit is contained in:
parent
d211d27817
commit
da7760d7ab
2 changed files with 50 additions and 53 deletions
|
@ -14,12 +14,12 @@ See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { SpanExporter, ReadableSpan } from "@opentelemetry/sdk-trace-base";
|
|
||||||
import {
|
import {
|
||||||
ExportResult,
|
SpanProcessor,
|
||||||
ExportResultCode,
|
ReadableSpan,
|
||||||
hrTimeToMilliseconds,
|
Span,
|
||||||
} from "@opentelemetry/core";
|
} from "@opentelemetry/sdk-trace-base";
|
||||||
|
import { hrTimeToMilliseconds } from "@opentelemetry/core";
|
||||||
import { logger } from "matrix-js-sdk/src/logger";
|
import { logger } from "matrix-js-sdk/src/logger";
|
||||||
|
|
||||||
import { PosthogAnalytics } from "./PosthogAnalytics";
|
import { PosthogAnalytics } from "./PosthogAnalytics";
|
||||||
|
@ -36,33 +36,31 @@ interface PrevCall {
|
||||||
const maxRejoinMs = 2 * 60 * 1000; // 2 minutes
|
const maxRejoinMs = 2 * 60 * 1000; // 2 minutes
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is implementation of {@link SpanExporter} that extracts certain metrics
|
* Span processor that extracts certain metrics from spans to send to PostHog
|
||||||
* from spans to send to PostHog
|
|
||||||
*/
|
*/
|
||||||
export class PosthogSpanExporter implements SpanExporter {
|
export class PosthogSpanProcessor implements SpanProcessor {
|
||||||
/**
|
async forceFlush(): Promise<void> {}
|
||||||
* Export spans.
|
|
||||||
* @param spans
|
|
||||||
* @param resultCallback
|
|
||||||
*/
|
|
||||||
async export(
|
|
||||||
spans: ReadableSpan[],
|
|
||||||
resultCallback: (result: ExportResult) => void
|
|
||||||
): Promise<void> {
|
|
||||||
await Promise.all(
|
|
||||||
spans.map((span) => {
|
|
||||||
switch (span.name) {
|
|
||||||
case "matrix.groupCallMembership":
|
|
||||||
return this.exportGroupCallMembershipSpan(span);
|
|
||||||
case "matrix.groupCallMembership.summaryReport":
|
|
||||||
return this.exportSummaryReportSpan(span);
|
|
||||||
// TBD if there are other spans that we want to process for export to
|
|
||||||
// PostHog
|
|
||||||
}
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
resultCallback({ code: ExportResultCode.SUCCESS });
|
onStart(span: Span): void {
|
||||||
|
// Hack: Yield to allow attributes to be set before processing
|
||||||
|
Promise.resolve().then(() => {
|
||||||
|
switch (span.name) {
|
||||||
|
case "matrix.groupCallMembership":
|
||||||
|
this.onGroupCallMembershipStart(span);
|
||||||
|
return;
|
||||||
|
case "matrix.groupCallMembership.summaryReport":
|
||||||
|
this.onSummaryReportStart(span);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
onEnd(span: ReadableSpan): void {
|
||||||
|
switch (span.name) {
|
||||||
|
case "matrix.groupCallMembership":
|
||||||
|
this.onGroupCallMembershipEnd(span);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private get prevCall(): PrevCall | null {
|
private get prevCall(): PrevCall | null {
|
||||||
|
@ -83,33 +81,33 @@ export class PosthogSpanExporter implements SpanExporter {
|
||||||
localStorage.setItem("matrix-prev-call", JSON.stringify(data));
|
localStorage.setItem("matrix-prev-call", JSON.stringify(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
async exportGroupCallMembershipSpan(span: ReadableSpan): Promise<void> {
|
private onGroupCallMembershipStart(span: ReadableSpan): void {
|
||||||
const prevCall = this.prevCall;
|
const prevCall = this.prevCall;
|
||||||
const newPrevCall = (this.prevCall = {
|
const newCallId = span.attributes["matrix.confId"] as string;
|
||||||
callId: span.attributes["matrix.confId"] as string,
|
|
||||||
hangupTs: hrTimeToMilliseconds(span.endTime),
|
|
||||||
});
|
|
||||||
|
|
||||||
// If the user joined the same call within a short time frame, log this as a
|
// If the user joined the same call within a short time frame, log this as a
|
||||||
// rejoin. This is interesting as a call quality metric, since rejoins may
|
// rejoin. This is interesting as a call quality metric, since rejoins may
|
||||||
// indicate that users had to intervene to make the product work.
|
// indicate that users had to intervene to make the product work.
|
||||||
if (prevCall !== null && newPrevCall.callId === prevCall.callId) {
|
if (prevCall !== null && newCallId === prevCall.callId) {
|
||||||
const duration = hrTimeToMilliseconds(span.startTime) - prevCall.hangupTs;
|
const duration = hrTimeToMilliseconds(span.startTime) - prevCall.hangupTs;
|
||||||
if (duration <= maxRejoinMs) {
|
if (duration <= maxRejoinMs) {
|
||||||
PosthogAnalytics.instance.trackEvent(
|
PosthogAnalytics.instance.trackEvent({
|
||||||
{
|
eventName: "Rejoin",
|
||||||
eventName: "Rejoin",
|
callId: prevCall.callId,
|
||||||
callId: prevCall.callId,
|
rejoinDuration: duration,
|
||||||
rejoinDuration: duration,
|
});
|
||||||
},
|
|
||||||
// Send instantly because the window might be closing
|
|
||||||
{ send_instantly: true }
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async exportSummaryReportSpan(span: ReadableSpan): Promise<void> {
|
private onGroupCallMembershipEnd(span: ReadableSpan): void {
|
||||||
|
this.prevCall = {
|
||||||
|
callId: span.attributes["matrix.confId"] as string,
|
||||||
|
hangupTs: hrTimeToMilliseconds(span.endTime),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private onSummaryReportStart(span: ReadableSpan): void {
|
||||||
// Searching for an event like this:
|
// Searching for an event like this:
|
||||||
// matrix.stats.summary
|
// matrix.stats.summary
|
||||||
// matrix.stats.summary.percentageReceivedAudioMedia: 0.75
|
// matrix.stats.summary.percentageReceivedAudioMedia: 0.75
|
||||||
|
@ -138,7 +136,7 @@ export class PosthogSpanExporter implements SpanExporter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shutdown the exporter.
|
* Shutdown the processor.
|
||||||
*/
|
*/
|
||||||
shutdown(): Promise<void> {
|
shutdown(): Promise<void> {
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
|
@ -25,7 +25,7 @@ import { Resource } from "@opentelemetry/resources";
|
||||||
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
|
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
|
||||||
import { logger } from "matrix-js-sdk/src/logger";
|
import { logger } from "matrix-js-sdk/src/logger";
|
||||||
|
|
||||||
import { PosthogSpanExporter } from "../analytics/OtelPosthogExporter";
|
import { PosthogSpanProcessor } from "../analytics/PosthogSpanProcessor";
|
||||||
import { Anonymity } from "../analytics/PosthogAnalytics";
|
import { Anonymity } from "../analytics/PosthogAnalytics";
|
||||||
import { Config } from "../config/Config";
|
import { Config } from "../config/Config";
|
||||||
import { RageshakeSpanExporter } from "../analytics/RageshakeSpanExporter";
|
import { RageshakeSpanExporter } from "../analytics/RageshakeSpanExporter";
|
||||||
|
@ -96,11 +96,10 @@ export class ElementCallOpenTelemetry {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const consoleExporter = new ConsoleSpanExporter();
|
this._provider.addSpanProcessor(
|
||||||
const posthogExporter = new PosthogSpanExporter();
|
new SimpleSpanProcessor(new ConsoleSpanExporter())
|
||||||
|
);
|
||||||
this._provider.addSpanProcessor(new SimpleSpanProcessor(posthogExporter));
|
this._provider.addSpanProcessor(new PosthogSpanProcessor());
|
||||||
this._provider.addSpanProcessor(new SimpleSpanProcessor(consoleExporter));
|
|
||||||
opentelemetry.trace.setGlobalTracerProvider(this._provider);
|
opentelemetry.trace.setGlobalTracerProvider(this._provider);
|
||||||
|
|
||||||
this._tracer = opentelemetry.trace.getTracer(
|
this._tracer = opentelemetry.trace.getTracer(
|
||||||
|
|
Loading…
Add table
Reference in a new issue