From 57dce4f099fe88e0c0b79032819257189a6cc919 Mon Sep 17 00:00:00 2001 From: Anton Roslund Date: Tue, 6 Jan 2026 16:39:39 +0100 Subject: [PATCH] Capture edges from traceroutes --- package-lock.json | 3 +- .../20260106151912_add_edges/migration.sql | 23 +++++ prisma/schema.prisma | 24 +++++ src/mqtt.js | 92 +++++++++++++++++++ 4 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 prisma/migrations/20260106151912_add_edges/migration.sql diff --git a/package-lock.json b/package-lock.json index 5ff0661..1fb2318 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,8 @@ "cors": "^2.8.5", "express": "^5.2.1", "mqtt": "^5.14.1", - "protobufjs": "^7.5.4" + "protobufjs": "^7.5.4", + "ws": "^8.18.3" }, "devDependencies": { "jest": "^30.1.3", diff --git a/prisma/migrations/20260106151912_add_edges/migration.sql b/prisma/migrations/20260106151912_add_edges/migration.sql new file mode 100644 index 0000000..113243f --- /dev/null +++ b/prisma/migrations/20260106151912_add_edges/migration.sql @@ -0,0 +1,23 @@ +-- CreateTable +CREATE TABLE `edges` ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `from_node_id` BIGINT NOT NULL, + `to_node_id` BIGINT NOT NULL, + `snr` INTEGER NOT NULL, + `from_latitude` INTEGER NULL, + `from_longitude` INTEGER NULL, + `to_latitude` INTEGER NULL, + `to_longitude` INTEGER NULL, + `packet_id` BIGINT NOT NULL, + `channel_id` VARCHAR(191) NULL, + `gateway_id` BIGINT NULL, + `source` VARCHAR(191) NOT NULL, + `created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + + INDEX `edges_from_node_id_idx`(`from_node_id`), + INDEX `edges_to_node_id_idx`(`to_node_id`), + INDEX `edges_created_at_idx`(`created_at`), + INDEX `edges_from_node_id_to_node_id_idx`(`from_node_id`, `to_node_id`), + PRIMARY KEY (`id`) +) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index ef96a9c..2a908dc 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -347,4 +347,28 @@ model ChannelUtilizationStats { @@index([channel_id]) @@index([recorded_at]) @@map("channel_utilization_stats") +} + +model Edge { + id BigInt @id @default(autoincrement()) + from_node_id BigInt + to_node_id BigInt + snr Int + from_latitude Int? + from_longitude Int? + to_latitude Int? + to_longitude Int? + packet_id BigInt + channel_id String? + gateway_id BigInt? + source String + + created_at DateTime @default(now()) + updated_at DateTime @default(now()) @updatedAt + + @@index(from_node_id) + @@index(to_node_id) + @@index(created_at) + @@index([from_node_id, to_node_id]) + @@map("edges") } \ No newline at end of file diff --git a/src/mqtt.js b/src/mqtt.js index f0f65bd..a446fd9 100644 --- a/src/mqtt.js +++ b/src/mqtt.js @@ -1336,6 +1336,98 @@ client.on("message", async (topic, message) => { console.error(e); } + // Extract edges from traceroute (only for response packets) + if(!envelope.packet.decoded.wantResponse) { + try { + const route = routeDiscovery.route || []; + const snrTowards = routeDiscovery.snrTowards || []; + const originNodeId = envelope.packet.to; + const destinationNodeId = envelope.packet.from; + const packetId = envelope.packet.id; + const channelId = envelope.channelId; + const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null; + + // Determine number of edges: route.length + 1 + const numEdges = route.length + 1; + const edgesToCreate = []; + + // Extract edges from the route path + for(let i = 0; i < numEdges; i++) { + // Get SNR for this edge + if(i >= snrTowards.length) { + // Array length mismatch - skip this edge + continue; + } + + const snr = snrTowards[i]; + + // Skip if SNR is -128 (no SNR recorded) + if(snr === -128) { + continue; + } + + // Determine from_node and to_node + let fromNodeId, toNodeId; + + if(route.length === 0) { + // Empty route: direct connection (to -> from) + fromNodeId = originNodeId; + toNodeId = destinationNodeId; + } else if(i === 0) { + // First edge: origin -> route[0] + fromNodeId = originNodeId; + toNodeId = route[0]; + } else if(i === route.length) { + // Last edge: route[route.length-1] -> destination + fromNodeId = route[route.length - 1]; + toNodeId = destinationNodeId; + } else { + // Middle edge: route[i-1] -> route[i] + fromNodeId = route[i - 1]; + toNodeId = route[i]; + } + + // Fetch node positions from Node table + const [fromNode, toNode] = await Promise.all([ + prisma.node.findUnique({ + where: { node_id: fromNodeId }, + select: { latitude: true, longitude: true }, + }), + prisma.node.findUnique({ + where: { node_id: toNodeId }, + select: { latitude: true, longitude: true }, + }), + ]); + + // Create edge record (skip if nodes don't exist, but still create edge with null positions) + edgesToCreate.push({ + from_node_id: fromNodeId, + to_node_id: toNodeId, + snr: snr, + from_latitude: fromNode?.latitude ?? null, + from_longitude: fromNode?.longitude ?? null, + to_latitude: toNode?.latitude ?? null, + to_longitude: toNode?.longitude ?? null, + packet_id: packetId, + channel_id: channelId, + gateway_id: gatewayId, + source: "TRACEROUTE_APP", + }); + } + + // Bulk insert edges + if(edgesToCreate.length > 0) { + await prisma.edge.createMany({ + data: edgesToCreate, + skipDuplicates: true, // Skip if exact duplicate exists + }); + } + } catch (e) { + // Log error but don't crash - edge extraction is non-critical + console.error("Error extracting edges from traceroute:", e); + } + } + } else if(portnum === 73) {