nlocks/lock.js

264 lines
7.3 KiB
JavaScript

'use strict'
/**
* @typedef {Object} LockRequest
* @property {string} type - 锁类型,可以是 "read" 或 "write"
* @property {string} resource - 资源名称
* @property {string} requestId - 请求ID
*/
class LockServer {
/**
*
* @param {Object} lockOptions
* @param {string} lockOptions.pipePath - 命名管道路径
* @param {number} lockOptions.port - port // 端口 for tcp
* @param {object} lockOptions.logger // logger
*/
constructor(lockOptions) {
this.options = lockOptions;
this._logger = lockOptions.logger||console;
this.server = null;
this.locks = new Map(); // resource -> { readers: Set, writer: null, queue: [] }
this.clients = new Map(); // clientId -> socket
}
/**
*
* @param {string} clientId
* @param {*} clientObject socket or other stands for client
*/
_addClient(clientId, clientObject){
this.clients.set(clientId, clientObject);
}
_removeClient(clientId){
this.clients.delete(clientId);
}
start() {
}
stop() {
}
/**
* 处理来自客户端的消息
* @param {string} clientId - 客户端标识符
* @param {LockRequest} message - 消息对象
*/
handleMessage(clientId, message) {
const { type, resource, requestId } = message;
if (!this.locks.has(resource)) {
this.locks.set(resource, {
readers: new Set(),
writer: null,
queue: []
});
}
const lock = this.locks.get(resource);
switch (type) {
case 'readLock':
this.handleReadLock(clientId, resource, requestId, lock);
break;
case 'writeLock':
this.handleWriteLock(clientId, resource, requestId, lock);
break;
case 'unlock':
this.handleUnlock(clientId, resource, lock);
break;
default:
this.sendError(clientId, `Unknown message type: ${type}`);
}
}
/**
* 处理读锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleReadLock(clientId, resource, requestId, lock) {
if (!lock.writer) {
// 可以立即获取读锁
lock.readers.add(`${clientId}_${requestId}`);
this.sendToClient(clientId, {
type: 'lockGranted',
requestId,
resource,
lockType: 'read'
});
this._logger.debug(`Read lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'read', requestId });
this._logger.debug(`Read lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理写锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleWriteLock(clientId, resource, requestId, lock) {
if (lock.readers.size === 0 && !lock.writer) {
// 可以立即获取写锁
lock.writer = clientId;
this.sendToClient(`${clientId}_${requestId}`, {
type: 'lockGranted',
requestId,
resource,
lockType: 'write'
});
this._logger.debug(`Write lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'write', requestId });
this._logger.debug(`Write lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理解锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
handleUnlock(clientId, resource, lock) {
let released = false;
// 移除读锁
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
this._logger.debug(`Read lock released by ${clientId} for ${resource}`);
}
// 移除写锁
if (lock.writer === clientId) {
lock.writer = null;
released = true;
this._logger.debug(`Write lock released by ${clientId} for ${resource}`);
}
if (released) {
// 处理等待队列
this.processQueue(resource, lock);
}
}
/**
* 处理等待队列中的锁请求
* 根据读写锁规则,尽可能多地授权锁请求
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
processQueue(resource, lock) {
const granted = [];
for (let i = 0; i < lock.queue.length; i++) {
const request = lock.queue[i];
if (request.type === 'read') {
if (!lock.writer) {
// 可以授予读锁
lock.readers.add(request.clientId);
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'read'
});
granted.push(i);
this._logger.debug(`Queued read lock granted to ${request.clientId} for ${resource}`);
} else {
// 有写锁等待,读锁必须等待
break;
}
} else { // write
if (lock.readers.size === 0 && !lock.writer) {
// 可以授予写锁
lock.writer = request.clientId;
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'write'
});
granted.push(i);
this._logger.debug(`Queued write lock granted to ${request.clientId} for ${resource}`);
} else {
// 写锁必须等待前面的锁释放
break;
}
}
}
// 移除已处理的请求
if (granted.length > 0) {
lock.queue = lock.queue.filter((_, index) => !granted.includes(index));
}
}
/**
* 处理客户端断开连接
* 释放该客户端持有的所有锁,并处理相关队列
* @param {string} clientId - 客户端标识符
*/
handleClientDisconnect(clientId) {
this._logger.debug(`Processing disconnect for client: ${clientId}`);
// 释放该客户端持有的所有锁
for (const [resource, lock] of this.locks) {
let released = false;
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
}
if (lock.writer === clientId) {
lock.writer = null;
released = true;
}
// 如果释放了锁,重新处理队列
if (released) {
this.processQueue(resource, lock);
}
}
}
/**
* 向指定客户端发送消息
* @param {string} clientId - 客户端标识符
* @param {Object} message - 要发送的消息对象
*/
sendToClient(clientId, message) {
const socket = this.clients.get(clientId);
if (socket && !socket.destroyed) {
try {
socket.write(JSON.stringify(message));
} catch (error) {
this._logger.error(`Error sending to client ${clientId}:`, error);
}
}
}
/**
* 向客户端发送错误消息
* @param {string} clientId - 客户端标识符
* @param {string} errorMessage - 错误信息
*/
sendError(clientId, errorMessage) {
this.sendToClient(clientId, {
type: 'error',
message: errorMessage
});
}
}
module.exports = LockServer;