'use strict';

/**
 * @typedef {Object} LockRequest
 * @property {string} type - Message type: "readLock", "writeLock" or "unlock"
 * @property {string} resource - Name of the resource to lock or unlock
 * @property {string} requestId - Request ID, echoed back in the "lockGranted" reply
 */

/**
 * In-memory read/write lock manager keyed by resource name.
 *
 * Per resource it tracks the set of reader client ids, at most one writer
 * client id, and a FIFO queue of requests that could not be granted yet.
 *
 * Policy, as implemented below:
 * - A new read request is granted immediately whenever no writer currently
 *   holds the resource, even if write requests are already queued.
 * - A new write request is granted immediately only when there is neither a
 *   writer nor any reader; otherwise it is queued.
 * - Queued requests are granted strictly from the head of the queue; the
 *   first request that cannot be granted blocks everything behind it.
 * - Holders are tracked by client id, so a single "unlock" releases both the
 *   read and the write lock that client holds on the resource.
 */
class LockServer {
  /**
   * @param {Object} [lockOptions] - Stored as-is on `this.options`.
   * @param {string} [lockOptions.pipePath] - Named pipe path (not read by this class itself)
   * @param {number} [lockOptions.port] - TCP port (not read by this class itself)
   * @param {object} [lockOptions.logger] - Logger exposing `debug` and `error`; defaults to `console`
   */
  constructor(lockOptions = {}) {
    this.options = lockOptions;
    this._logger = lockOptions.logger || console;
    this.server = null;
    // resource -> { readers: Set<clientId>, writer: clientId|null, queue: Array<{clientId, type, requestId}> }
    this.locks = new Map();
    // clientId -> socket (or any object exposing `write` and `destroyed`)
    this.clients = new Map();
  }

  /**
   * Register the object used to reach a client.
   * @param {string} clientId
   * @param {*} clientObject - Socket or other object that stands for the client
   */
  _addClient(clientId, clientObject) {
    this.clients.set(clientId, clientObject);
  }

  /**
   * Forget a client's socket. Does not release any locks; see
   * `handleClientDisconnect` for that.
   * @param {string} clientId
   */
  _removeClient(clientId) {
    this.clients.delete(clientId);
  }

  /**
   * No-op here. NOTE(review): presumably implemented by a transport-specific
   * subclass — confirm against callers.
   */
  start() {
  }

  /**
   * No-op here. NOTE(review): presumably implemented by a transport-specific
   * subclass — confirm against callers.
   */
  stop() {
  }

  /**
   * Dispatch one message received from a client.
   *
   * Lock bookkeeping is created only for lock requests, so unknown message
   * types and unlocks of never-locked resources do not grow `this.locks`.
   *
   * @param {string} clientId - Client identifier
   * @param {LockRequest} message - Decoded message object
   */
  handleMessage(clientId, message) {
    if (message === null || typeof message !== 'object') {
      // Reply with an error instead of throwing while destructuring.
      this.sendError(clientId, 'Invalid message: expected an object');
      return;
    }

    const { type, resource, requestId } = message;

    switch (type) {
      case 'readLock':
        this.handleReadLock(clientId, resource, requestId, this._getOrCreateLock(resource));
        break;
      case 'writeLock':
        this.handleWriteLock(clientId, resource, requestId, this._getOrCreateLock(resource));
        break;
      case 'unlock': {
        const lock = this.locks.get(resource);
        // Nothing is held on a resource that has no entry, so there is nothing to do.
        if (lock) {
          this.handleUnlock(clientId, resource, lock);
        }
        break;
      }
      default:
        this.sendError(clientId, `Unknown message type: ${type}`);
    }
  }

  /**
   * Return the lock entry for a resource, creating an empty one if needed.
   * @param {string} resource - Resource name
   * @returns {{readers: Set, writer: (string|null), queue: Array}} Lock entry
   */
  _getOrCreateLock(resource) {
    let lock = this.locks.get(resource);
    if (!lock) {
      lock = { readers: new Set(), writer: null, queue: [] };
      this.locks.set(resource, lock);
    }
    return lock;
  }

  /**
   * Remove a resource's entry once nobody holds or waits for it, so that
   * `this.locks` does not grow with every resource name ever seen.
   * @param {string} resource - Resource name
   * @param {Object} lock - Lock entry for that resource
   */
  _dropLockIfIdle(resource, lock) {
    const idle = lock.readers.size === 0 && !lock.writer && lock.queue.length === 0;
    // Only delete the entry if it is still the one registered for this resource.
    if (idle && this.locks.get(resource) === lock) {
      this.locks.delete(resource);
    }
  }

  /**
   * Handle a read lock request: grant it right away when no writer holds the
   * resource, otherwise append it to the wait queue.
   * @param {string} clientId - Client identifier
   * @param {string} resource - Resource name
   * @param {string} requestId - Request ID
   * @param {Object} lock - Lock entry for the resource
   */
  handleReadLock(clientId, resource, requestId, lock) {
    if (!lock.writer) {
      lock.readers.add(clientId);
      this.sendToClient(clientId, {
        type: 'lockGranted',
        requestId,
        resource,
        lockType: 'read'
      });
      this._logger.debug(`Read lock granted to ${clientId} for ${resource}`);
    } else {
      lock.queue.push({ clientId, type: 'read', requestId });
      this._logger.debug(`Read lock queued for ${clientId} for ${resource}`);
    }
  }

  /**
   * Handle a write lock request: grant it right away when the resource has no
   * readers and no writer, otherwise append it to the wait queue.
   * @param {string} clientId - Client identifier
   * @param {string} resource - Resource name
   * @param {string} requestId - Request ID
   * @param {Object} lock - Lock entry for the resource
   */
  handleWriteLock(clientId, resource, requestId, lock) {
    if (lock.readers.size === 0 && !lock.writer) {
      lock.writer = clientId;
      this.sendToClient(clientId, {
        type: 'lockGranted',
        requestId,
        resource,
        lockType: 'write'
      });
      this._logger.debug(`Write lock granted to ${clientId} for ${resource}`);
    } else {
      lock.queue.push({ clientId, type: 'write', requestId });
      this._logger.debug(`Write lock queued for ${clientId} for ${resource}`);
    }
  }

  /**
   * Handle an unlock request: release whatever read and/or write lock the
   * client holds on the resource, then serve the wait queue.
   * A client that holds nothing on the resource is ignored silently.
   * @param {string} clientId - Client identifier
   * @param {string} resource - Resource name
   * @param {Object} lock - Lock entry for the resource
   */
  handleUnlock(clientId, resource, lock) {
    let released = false;

    if (lock.readers.has(clientId)) {
      lock.readers.delete(clientId);
      released = true;
      this._logger.debug(`Read lock released by ${clientId} for ${resource}`);
    }

    if (lock.writer === clientId) {
      lock.writer = null;
      released = true;
      this._logger.debug(`Write lock released by ${clientId} for ${resource}`);
    }

    if (released) {
      this.processQueue(resource, lock);
      this._dropLockIfIdle(resource, lock);
    }
  }

  /**
   * Grant as many queued requests as the read/write rules allow.
   *
   * Requests are examined from the head of the queue and processing stops at
   * the first one that cannot be granted, so granted requests always form a
   * prefix of the queue.
   *
   * @param {string} resource - Resource name
   * @param {Object} lock - Lock entry for the resource
   */
  processQueue(resource, lock) {
    let grantedCount = 0;

    while (grantedCount < lock.queue.length) {
      const request = lock.queue[grantedCount];
      const isRead = request.type === 'read';

      if (isRead) {
        // A reader only has to wait for an active writer.
        if (lock.writer) {
          break;
        }
        lock.readers.add(request.clientId);
      } else {
        // A writer needs exclusive access: no readers and no other writer.
        if (lock.readers.size !== 0 || lock.writer) {
          break;
        }
        lock.writer = request.clientId;
      }

      const lockType = isRead ? 'read' : 'write';
      this.sendToClient(request.clientId, {
        type: 'lockGranted',
        requestId: request.requestId,
        resource,
        lockType
      });
      this._logger.debug(`Queued ${lockType} lock granted to ${request.clientId} for ${resource}`);
      grantedCount += 1;
    }

    // Drop the granted prefix in one step.
    if (grantedCount > 0) {
      lock.queue.splice(0, grantedCount);
    }
  }

  /**
   * Handle a client disconnect.
   *
   * Releases every lock the client holds and also discards its still-queued
   * requests; otherwise a later queue run could grant a lock to a client that
   * is gone and will never release it. The queue is re-processed whenever
   * anything changed, because removing a blocked request at the head can
   * unblock the requests behind it.
   *
   * @param {string} clientId - Client identifier
   */
  handleClientDisconnect(clientId) {
    this._logger.debug(`Processing disconnect for client: ${clientId}`);

    for (const [resource, lock] of this.locks) {
      let changed = lock.readers.delete(clientId);

      if (lock.writer === clientId) {
        lock.writer = null;
        changed = true;
      }

      const remaining = lock.queue.filter((request) => request.clientId !== clientId);
      if (remaining.length !== lock.queue.length) {
        lock.queue = remaining;
        changed = true;
      }

      if (changed) {
        this.processQueue(resource, lock);
        this._dropLockIfIdle(resource, lock);
      }
    }
  }

  /**
   * Send a message to a client as a JSON string.
   *
   * The message is dropped silently when the client is unknown or its socket
   * is marked destroyed; write/serialization errors are logged, not thrown.
   * NOTE(review): no delimiter is appended after the JSON text, so message
   * framing is left to the transport/peer — confirm against the client code.
   *
   * @param {string} clientId - Client identifier
   * @param {Object} message - Message object to send
   */
  sendToClient(clientId, message) {
    const socket = this.clients.get(clientId);
    if (socket && !socket.destroyed) {
      try {
        socket.write(JSON.stringify(message));
      } catch (error) {
        this._logger.error(`Error sending to client ${clientId}:`, error);
      }
    }
  }

  /**
   * Send an error message to a client.
   * @param {string} clientId - Client identifier
   * @param {string} errorMessage - Error text
   */
  sendError(clientId, errorMessage) {
    this.sendToClient(clientId, {
      type: 'error',
      message: errorMessage
    });
  }
}

// Guarded so the file can also be evaluated where the CommonJS `module` object is absent.
if (typeof module !== 'undefined' && module.exports) {
  module.exports = LockServer;
}