This commit is contained in:
Louis Lam
2023-12-26 03:18:56 +08:00
parent ca3bb30ee0
commit fa0a4f8ccf
9 changed files with 99 additions and 24 deletions

View File

@@ -2,22 +2,30 @@ import { DockgeSocket } from "./util-server";
import { io, Socket as SocketClient } from "socket.io-client";
import { log } from "./log";
import { Agent } from "./models/agent";
import { isDev, LooseObject } from "../common/util-common";
import { isDev, LooseObject, sleep } from "../common/util-common";
import semver from "semver";
import { R } from "redbean-node";
import dayjs, { Dayjs } from "dayjs";
/**
* Dockge Instance Manager
* One AgentManager per Socket connection
*/
export class AgentManager {
protected socket : DockgeSocket;
protected agentSocketList : Record<string, SocketClient> = {};
protected agentLoggedInList : Record<string, boolean> = {};
protected _firstConnectTime : Dayjs = dayjs();
constructor(socket: DockgeSocket) {
this.socket = socket;
}
get firstConnectTime() : Dayjs {
return this._firstConnectTime;
}
test(url : string, username : string, password : string) : Promise<void> {
return new Promise((resolve, reject) => {
let obj = new URL(url);
@@ -131,12 +139,14 @@ export class AgentManager {
}, (res : LooseObject) => {
if (res.ok) {
log.info("agent-manager", "Logged in to the socket server: " + endpoint);
this.agentLoggedInList[endpoint] = true;
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "online",
});
} else {
log.error("agent-manager", "Failed to login to the socket server: " + endpoint);
this.agentLoggedInList[endpoint] = false;
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "offline",
@@ -188,6 +198,8 @@ export class AgentManager {
}
async connectAll() {
this._firstConnectTime = dayjs();
if (this.socket.endpoint) {
log.info("agent-manager", "This connection is connected as an agent, skip connectAll()");
return;
@@ -211,7 +223,7 @@ export class AgentManager {
}
}
emitToEndpoint(endpoint: string, eventName: string, ...args : unknown[]) {
async emitToEndpoint(endpoint: string, eventName: string, ...args : unknown[]) {
log.debug("agent-manager", "Emitting event to endpoint: " + endpoint);
let client = this.agentSocketList[endpoint];
@@ -220,9 +232,27 @@ export class AgentManager {
throw new Error("Socket client not found for endpoint: " + endpoint);
}
if (!client.connected) {
log.error("agent-manager", "Socket client not connected for endpoint: " + endpoint);
throw new Error("Socket client not connected for endpoint: " + endpoint);
if (!client.connected || !this.agentLoggedInList[endpoint]) {
// Maybe the request is too quick, the socket is not connected yet, check firstConnectTime
// If it is within 10 seconds, we should apply retry logic here
let diff = dayjs().diff(this.firstConnectTime, "second");
log.debug("agent-manager", endpoint + ": diff: " + diff);
let ok = false;
while (diff < 10) {
if (client.connected && this.agentLoggedInList[endpoint]) {
log.debug("agent-manager", `${endpoint}: Connected & Logged in`);
ok = true;
break;
}
log.debug("agent-manager", endpoint + ": not ready yet, retrying in 1 second...");
await sleep(1000);
diff = dayjs().diff(this.firstConnectTime, "second");
}
if (!ok) {
log.error("agent-manager", `${endpoint}: Socket client not connected`);
throw new Error("Socket client not connected for endpoint: " + endpoint);
}
}
client.emit("agent", endpoint, eventName, ...args);
@@ -231,7 +261,9 @@ export class AgentManager {
emitToAllEndpoints(eventName: string, ...args : unknown[]) {
log.debug("agent-manager", "Emitting event to all endpoints");
for (let endpoint in this.agentSocketList) {
this.emitToEndpoint(endpoint, eventName, ...args);
this.emitToEndpoint(endpoint, eventName, ...args).catch((e) => {
log.warn("agent-manager", e.message);
});
}
}