diff --git a/__tests__/namedpipe-lock.test.js b/__tests__/namedpipe-lock.test.js index b30bbe1..bfc48bf 100644 --- a/__tests__/namedpipe-lock.test.js +++ b/__tests__/namedpipe-lock.test.js @@ -4,7 +4,7 @@ const fs = require('fs'); const os = require('os'); const path = require('path'); -jest.setTimeout(30000); +jest.setTimeout(3000000); // Helper function to create a unique pipe path for testing const createTestPipePath = () => { if (process.platform === 'win32') { @@ -17,12 +17,14 @@ const createTestPipePath = () => { describe('NamedPipeLock', () => { let server; let pipePath; + let connect = {} // 在所有测试之前启动服务器 beforeAll(async () => { pipePath = createTestPipePath(); - server = new NamedPipeLockServer(pipePath); - server.start(); + connect = {pipePath} + server = new NamedPipeLockServer({pipePath}); + await server.start(); }); // 在所有测试完成后停止服务器 @@ -36,7 +38,7 @@ describe('NamedPipeLock', () => { describe('Basic Lock Operations', () => { test('should acquire and release read lock without waiting when not locked', async () => { - const lock = new NamedPipeRWLock('resource1', pipePath); + const lock = new NamedPipeRWLock('resource1',{connect}); const startTime = Date.now(); await lock.readLock(); @@ -50,7 +52,7 @@ describe('NamedPipeLock', () => { }); test('should acquire and release write lock without waiting when not locked', async () => { - const lock = new NamedPipeRWLock('resource2', pipePath); + const lock = new NamedPipeRWLock('resource2',{connect}); const startTime = Date.now(); await lock.writeLock(); @@ -64,7 +66,7 @@ describe('NamedPipeLock', () => { }); test('should allow consecutive acquisitions and releases without queueing', async () => { - const lock = new NamedPipeRWLock('resource3', pipePath); + const lock = new NamedPipeRWLock('resource3',{connect}); // 第一次获取 await lock.readLock(); @@ -87,9 +89,9 @@ describe('NamedPipeLock', () => { describe('Multiple Clients', () => { test('should handle multiple concurrent read locks', async () => { - const lock1 = new NamedPipeRWLock('sharedResource', pipePath); - const lock2 = new NamedPipeRWLock('sharedResource', pipePath); - const lock3 = new NamedPipeRWLock('sharedResource', pipePath); + const lock1 = new NamedPipeRWLock('sharedResource',{connect}); + const lock2 = new NamedPipeRWLock('sharedResource',{connect}); + const lock3 = new NamedPipeRWLock('sharedResource',{connect}); // 所有读锁应该能够同时获取 await Promise.all([ @@ -115,9 +117,9 @@ describe('NamedPipeLock', () => { }); test('should queue write lock when read locks exist', async () => { - const readLock1 = new NamedPipeRWLock('queuedResource', pipePath); - const readLock2 = new NamedPipeRWLock('queuedResource', pipePath); - const writeLock = new NamedPipeRWLock('queuedResource', pipePath); + const readLock1 = new NamedPipeRWLock('queuedResource',{connect}); + const readLock2 = new NamedPipeRWLock('queuedResource',{connect}); + const writeLock = new NamedPipeRWLock('queuedResource',{connect}); // 先获取两个读锁 await readLock1.readLock(); @@ -156,9 +158,9 @@ describe('NamedPipeLock', () => { }); test('should queue read locks when write lock exists', async () => { - const writeLock = new NamedPipeRWLock('queuedResource2', pipePath); - const readLock1 = new NamedPipeRWLock('queuedResource2', pipePath); - const readLock2 = new NamedPipeRWLock('queuedResource2', pipePath); + const writeLock = new NamedPipeRWLock('queuedResource2',{connect}); + const readLock1 = new NamedPipeRWLock('queuedResource2',{connect}); + const readLock2 = new NamedPipeRWLock('queuedResource2',{connect}); // 先获取写锁 await writeLock.writeLock(); @@ -205,7 +207,7 @@ describe('NamedPipeLock', () => { describe('Error Handling', () => { test('should reject when trying to acquire lock while already holding one', async () => { - const lock = new NamedPipeRWLock('errorResource', pipePath); + const lock = new NamedPipeRWLock('errorResource',{connect}); await lock.readLock(); @@ -217,12 +219,12 @@ describe('NamedPipeLock', () => { }); test('should handle lock acquisition timeout', async () => { - const lock = new NamedPipeRWLock('timeoutResource', pipePath, { + const lock = new NamedPipeRWLock('timeoutResource',{connect}, { timeout: 100 // 设置很短的超时时间 }); // 模拟一个永远不会释放的锁场景 - const blockingLock = new NamedPipeRWLock('timeoutResource', pipePath); + const blockingLock = new NamedPipeRWLock('timeoutResource',{connect}); await blockingLock.writeLock(); // 尝试获取已经被占用的锁,应该会超时 diff --git a/__tests__/tcp-lock.test.js b/__tests__/tcp-lock.test.js new file mode 100644 index 0000000..93be6b0 --- /dev/null +++ b/__tests__/tcp-lock.test.js @@ -0,0 +1,142 @@ +const TCPLockServer = require('../lock.tcp'); +const TcpRwLock = require('../lock-client.tcp'); +const net = require('net'); + +jest.setTimeout(30000); + +// Helper function to check if a port is available +const isPortAvailable = (port) => { + return new Promise((resolve) => { + const tester = net.createServer(); + tester.listen(port, () => { + tester.once('close', () => { + resolve(true); + }); + tester.close(); + }); + tester.on('error', () => { + resolve(false); + }); + }); +}; + +// Find an available port for testing +const findAvailablePort = async (startPort = 7301) => { + let port = startPort; + while (!(await isPortAvailable(port))) { + port++; + } + return port; +}; + +describe('TCPLock', () => { + let server; + let port; + let connect = {}; + + // Start server before all tests + beforeAll(async () => { + port = await findAvailablePort(); + connect = { host: 'localhost', port }; + server = new TCPLockServer({ host: 'localhost', port }); + await server.start(); + }); + + // Stop server after all tests + afterAll(() => { + if (server) { + server.stop(); + server = null; + } + }); + + test('should be able to create TCP lock client', () => { + const lock = new TcpRwLock('test-resource', { connect }); + expect(lock).toBeInstanceOf(TcpRwLock); + expect(lock.host).toBe('localhost'); + expect(lock.port).toBe(port); + }); + + test('should be able to connect to TCP lock server', async () => { + const lock = new TcpRwLock('test-resource', { connect }); + + try { + await lock.connect(); + // Since there's no isConnected method, we'll check if we can send a request + expect(lock.socket).toBeDefined(); + expect(lock.socket.destroyed).toBe(false); + } finally { + if (lock.socket) { + lock.close(); + } + } + }); + + test('should be able to acquire and release read lock', async () => { + const lock = new TcpRwLock('test-resource', { connect }); + + try { + await lock.connect(); + await lock.readLock(); + expect(lock.isLocked).toBe(true); + await lock.unlock(); + expect(lock.isLocked).toBe(false); + } finally { + if (lock.socket) { + lock.close(); + } + } + }); + + test('should be able to acquire and release write lock', async () => { + const lock = new TcpRwLock('test-resource', { connect }); + + try { + await lock.connect(); + await lock.writeLock(); + expect(lock.isLocked).toBe(true); + await lock.unlock(); + expect(lock.isLocked).toBe(false); + } finally { + if (lock.socket) { + lock.close(); + } + } + }); + + test('should handle multiple clients', async () => { + const lock1 = new TcpRwLock('shared-resource', { connect }); + const lock2 = new TcpRwLock('shared-resource', { connect }); + + try { + await Promise.all([ + lock1.connect(), + lock2.connect() + ]); + + // Both clients should be able to acquire read locks + await Promise.all([ + lock1.readLock(), + lock2.readLock() + ]); + + expect(lock1.isLocked).toBe(true); + expect(lock2.isLocked).toBe(true); + + await Promise.all([ + lock1.unlock(), + lock2.unlock() + ]); + + expect(lock1.isLocked).toBe(false); + expect(lock2.isLocked).toBe(false); + } finally { + if (lock1.socket) { + lock1.close(); + } + if (lock2.socket) { + lock2.close(); + } + } + }); +}); \ No newline at end of file diff --git a/index.js b/index.js index 31c6af2..e17ded4 100644 --- a/index.js +++ b/index.js @@ -2,8 +2,19 @@ const AsyncLock = require('./async-lock') const FileLock = require('./file-lock') - +const LockServer = require('./lock') +const NamedPipeLockServer = require('./lock.namedpipe') +const NamedPipeRWLock = require('./lock-client.namedpipe') +const TCPLockServer = require('./lock.tcp') +const TcpRwLock = require('./lock-client.tcp') +const LockClient = require('./lock-client') module.exports = { AsyncLock, - FileLock + FileLock, + LockServer, + NamedPipeLockServer, + NamedPipeRWLock, + TCPLockServer, + TcpRwLock, + LockClient } \ No newline at end of file diff --git a/lock-client.js b/lock-client.js new file mode 100644 index 0000000..fe4833c --- /dev/null +++ b/lock-client.js @@ -0,0 +1,201 @@ +// named-pipe-client.js +const net = require('net'); +const { EventEmitter } = require('events'); +const { v4: uuidv4 } = require('uuid'); + +/** + * 基于命名管道的读写锁客户端 + * 提供对NamedPipeLockServer的客户端接口,允许应用程序请求和释放读写锁 + * 支持自动重连和超时机制 + */ +class LockClient extends EventEmitter { + /** + * 创建NamedPipeRWLock实例 + * @param {string} resource - 要锁定的资源名称 + * @param {string} pipePath - 服务器命名管道路径 + * @param {Object} options - 配置选项 + * @param {number} options.timeout - 锁请求超时时间(毫秒),默认30000 + * @param {number} options.retryInterval - 重连间隔(毫秒),默认1000 + * @param {number} options.maxRetries - 最大重连次数,默认5 + * @param {object} options.connect + * @param {string} options.connect.pipePath - 命名管道路径 + * @param {string} options.connect.host - 服务器主机名或IP地址 + * @param {number} options.connect.port - 命名管道端口号 + * @param {object} options.logger + */ + constructor(resource, options = {}) { + super(); + this.resource = resource; + + this._connectInfo = options.connect || {}; + this.timeout = options.timeout || 30000; + this.retryInterval = options.retryInterval || 1000; + this.maxRetries = options.maxRetries || 5; + + this.socket = null; + this.requestId = null; + this.isLocked = false; + this.lockType = null; + this.timeoutHandle = null; + this.retryCount = 0; + this._logger = options.logger || console; + } + + /** + * 连接到锁服务器 + * @returns {Promise} 连接成功时resolve + */ + connect() { + + } + + /** + * 请求读锁 + * @returns {Promise} 锁获取成功时resolve + */ + async readLock() { + return this.acquireLock('readLock'); + } + + /** + * 请求写锁 + * @returns {Promise} 锁获取成功时resolve + */ + async writeLock() { + return this.acquireLock('writeLock'); + } + + /** + * 获取指定类型的锁 + * @param {string} type - 锁类型 ('readLock' 或 'writeLock') + * @returns {Promise} 锁获取成功时resolve + */ + async acquireLock(type) { + await this.ensureConnected(); + + return new Promise((resolve, reject) => { + if (this.isLocked) { + reject(new Error('Lock already held')); + return; + } + + this.requestId = uuidv4(); + this.lockType = type; + + // 发送锁请求 + this.sendMessage({ + type, + resource: this.resource, + requestId: this.requestId + }); + + // 设置超时 + this.timeoutHandle = setTimeout(() => { + this.cleanup(); + reject(new Error(`Lock acquisition timeout after ${this.timeout}ms`)); + }, this.timeout); + + // 监听锁授予事件 + const onLockGranted = (data) => { + if (data.requestId === this.requestId) { + this.isLocked = true; + clearTimeout(this.timeoutHandle); + this.removeListener('lockGranted', onLockGranted); + resolve(); + } + }; + + this.on('lockGranted', onLockGranted); + + // 监听错误事件 + const onError = (errorData) => { + if (errorData.requestId === this.requestId) { + clearTimeout(this.timeoutHandle); + this.removeListener('error', onError); + reject(new Error(errorData.message)); + } + }; + + this.on('error', onError); + }); + } + + /** + * 释放当前持有的锁 + * @returns {Promise} + */ + async unlock() { + if (!this.isLocked || !this.socket) return; + + this.sendMessage({ + type: 'unlock', + resource: this.resource + }); + + this.cleanup(); + this._logger.debug(`Lock released for resource: ${this.resource}`); + } + + + + /** + * 处理与服务器的连接断开 + */ + handleDisconnect() { + this._logger.debug('Disconnected from lock server'); + this.cleanup(); + this.socket = null; + + // 自动重连逻辑 + if (this.retryCount < this.maxRetries) { + this.retryCount++; + this._logger.debug(`Attempting to reconnect... (${this.retryCount}/${this.maxRetries})`); + + setTimeout(() => { + this.ensureConnected().catch(error => { + this._logger.error('Reconnection failed:', error); + }); + }, this.retryInterval); + } + } + + /** + * 向服务器发送消息 + * @param {Object} message - 要发送的消息对象 + */ + sendMessage(message) { + + } + + /** + * 确保客户端已连接到服务器 + * @returns {Promise} + */ + async ensureConnected() { + if (!this.socket || this.socket.destroyed) { + await this.connect(); + } + } + + /** + * 清理内部状态 + */ + cleanup() { + this.isLocked = false; + this.lockType = null; + this.requestId = null; + if (this.timeoutHandle) { + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + } + } + + /** + * 关闭连接 + */ + close() { + + } +} + +module.exports = LockClient; \ No newline at end of file diff --git a/lock-client.namedpipe.js b/lock-client.namedpipe.js index cb49f98..78d098a 100644 --- a/lock-client.namedpipe.js +++ b/lock-client.namedpipe.js @@ -1,6 +1,6 @@ // named-pipe-client.js const net = require('net'); -const { EventEmitter } = require('events'); +const LockClient = require('./lock-client'); const { v4: uuidv4 } = require('uuid'); /** @@ -8,7 +8,7 @@ const { v4: uuidv4 } = require('uuid'); * 提供对NamedPipeLockServer的客户端接口,允许应用程序请求和释放读写锁 * 支持自动重连和超时机制 */ -class NamedPipeRWLock extends EventEmitter { +class NamedPipeRWLock extends LockClient { /** * 创建NamedPipeRWLock实例 * @param {string} resource - 要锁定的资源名称 @@ -18,41 +18,40 @@ class NamedPipeRWLock extends EventEmitter { * @param {number} options.retryInterval - 重连间隔(毫秒),默认1000 * @param {number} options.maxRetries - 最大重连次数,默认5 */ - constructor(resource, pipePath = '\\\\.\\pipe\\rwlock-server', options = {}) { - super(); + constructor(resource, options = {}) { + super(options); this.resource = resource; - this.pipePath = pipePath; - this.timeout = options.timeout || 30000; - this.retryInterval = options.retryInterval || 1000; - this.maxRetries = options.maxRetries || 5; - - this.socket = null; - this.requestId = null; - this.isLocked = false; - this.lockType = null; - this.timeoutHandle = null; - this.retryCount = 0; + this.pipePath = options.connect.pipePath; } + _createConnection(){ + this.socket = net.createConnection(this.pipePath) + } /** * 连接到锁服务器 * @returns {Promise} 连接成功时resolve */ - async connect() { + connect() { return new Promise((resolve, reject) => { if (this.socket && !this.socket.destroyed) { resolve(); return; } - this.socket = net.createConnection(this.pipePath, () => { - console.log(`Connected to lock server at ${this.pipePath}`); + // this.socket = net.createConnection(this.pipePath, () => { + // console.log(`Connected to lock server at ${this.pipePath}`); + // this.retryCount = 0; + // resolve(); + // }); + this._createConnection() + this.socket.on('connect', () => { + this._logger.debug(`Connected to lock server at ${this.pipePath}`); this.retryCount = 0; resolve(); }); - + this.socket.on('error', (error) => { - console.error('Connection error:', error); + this._logger.error('Connection error:', error); reject(error); }); @@ -62,93 +61,6 @@ class NamedPipeRWLock extends EventEmitter { }); } - /** - * 请求读锁 - * @returns {Promise} 锁获取成功时resolve - */ - async readLock() { - return this.acquireLock('readLock'); - } - - /** - * 请求写锁 - * @returns {Promise} 锁获取成功时resolve - */ - async writeLock() { - return this.acquireLock('writeLock'); - } - - /** - * 获取指定类型的锁 - * @param {string} type - 锁类型 ('readLock' 或 'writeLock') - * @returns {Promise} 锁获取成功时resolve - */ - async acquireLock(type) { - await this.ensureConnected(); - - return new Promise((resolve, reject) => { - if (this.isLocked) { - reject(new Error('Lock already held')); - return; - } - - this.requestId = uuidv4(); - this.lockType = type; - - // 发送锁请求 - this.sendMessage({ - type, - resource: this.resource, - requestId: this.requestId - }); - - // 设置超时 - this.timeoutHandle = setTimeout(() => { - this.cleanup(); - reject(new Error(`Lock acquisition timeout after ${this.timeout}ms`)); - }, this.timeout); - - // 监听锁授予事件 - const onLockGranted = (data) => { - if (data.requestId === this.requestId) { - this.isLocked = true; - clearTimeout(this.timeoutHandle); - this.removeListener('lockGranted', onLockGranted); - resolve(); - } - }; - - this.on('lockGranted', onLockGranted); - - // 监听错误事件 - const onError = (errorData) => { - if (errorData.requestId === this.requestId) { - clearTimeout(this.timeoutHandle); - this.removeListener('error', onError); - reject(new Error(errorData.message)); - } - }; - - this.on('error', onError); - }); - } - - /** - * 释放当前持有的锁 - * @returns {Promise} - */ - async unlock() { - if (!this.isLocked || !this.socket) return; - - this.sendMessage({ - type: 'unlock', - resource: this.resource - }); - - this.cleanup(); - console.log(`Lock released for resource: ${this.resource}`); - } - /** * 处理从服务器收到的消息 * @param {Buffer} data - 接收的数据 @@ -176,26 +88,6 @@ class NamedPipeRWLock extends EventEmitter { } } - /** - * 处理与服务器的连接断开 - */ - handleDisconnect() { - console.log('Disconnected from lock server'); - this.cleanup(); - this.socket = null; - - // 自动重连逻辑 - if (this.retryCount < this.maxRetries) { - this.retryCount++; - console.log(`Attempting to reconnect... (${this.retryCount}/${this.maxRetries})`); - - setTimeout(() => { - this.ensureConnected().catch(error => { - console.error('Reconnection failed:', error); - }); - }, this.retryInterval); - } - } /** * 向服务器发送消息 @@ -215,28 +107,9 @@ class NamedPipeRWLock extends EventEmitter { } } - /** - * 确保客户端已连接到服务器 - * @returns {Promise} - */ - async ensureConnected() { - if (!this.socket || this.socket.destroyed) { - await this.connect(); - } - } - /** - * 清理内部状态 - */ - cleanup() { - this.isLocked = false; - this.lockType = null; - this.requestId = null; - if (this.timeoutHandle) { - clearTimeout(this.timeoutHandle); - this.timeoutHandle = null; - } - } + + /** * 关闭连接 diff --git a/lock-client.tcp.js b/lock-client.tcp.js new file mode 100644 index 0000000..b347f86 --- /dev/null +++ b/lock-client.tcp.js @@ -0,0 +1,15 @@ +const NamedPipeRWLock = require('./lock-client.namedpipe'); +const net = require('net'); + +class TcpRwLock extends NamedPipeRWLock { + constructor(resource, options = {}) { + super(resource, options); + this.host = options.connect.host||'localhost'; + this.port = options.connect.port||7301; + } + _createConnection() { + this.socket = net.createConnection({host:this.host, port:this.port}) + } +} + +module.exports = TcpRwLock; \ No newline at end of file diff --git a/lock.js b/lock.js new file mode 100644 index 0000000..abf3958 --- /dev/null +++ b/lock.js @@ -0,0 +1,264 @@ +'use strict' + +/** + * @typedef {Object} LockRequest + * @property {string} type - 锁类型,可以是 "read" 或 "write" + * @property {string} resource - 资源名称 + * @property {string} requestId - 请求ID + */ + +class LockServer { + /** + * + * @param {Object} lockOptions + * @param {string} lockOptions.pipePath - 命名管道路径 + * @param {number} lockOptions.port - port // 端口 for tcp + * @param {object} lockOptions.logger // logger + */ + constructor(lockOptions) { + this.options = lockOptions; + this._logger = lockOptions.logger||console; + this.server = null; + this.locks = new Map(); // resource -> { readers: Set, writer: null, queue: [] } + this.clients = new Map(); // clientId -> socket + } + + /** + * + * @param {string} clientId + * @param {*} clientObject socket or other stands for client + */ + _addClient(clientId, clientObject){ + this.clients.set(clientId, clientObject); + } + _removeClient(clientId){ + this.clients.delete(clientId); + } + start() { + } + stop() { + } + /** + * 处理来自客户端的消息 + * @param {string} clientId - 客户端标识符 + * @param {LockRequest} message - 消息对象 + */ + handleMessage(clientId, message) { + const { type, resource, requestId } = message; + + if (!this.locks.has(resource)) { + this.locks.set(resource, { + readers: new Set(), + writer: null, + queue: [] + }); + } + + const lock = this.locks.get(resource); + + switch (type) { + case 'readLock': + this.handleReadLock(clientId, resource, requestId, lock); + break; + case 'writeLock': + this.handleWriteLock(clientId, resource, requestId, lock); + break; + case 'unlock': + this.handleUnlock(clientId, resource, lock); + break; + default: + this.sendError(clientId, `Unknown message type: ${type}`); + } + } + + + /** + * 处理读锁请求 + * @param {string} clientId - 客户端标识符 + * @param {string} resource - 资源名称 + * @param {string} requestId - 请求ID + * @param {Object} lock - 锁对象 + */ + handleReadLock(clientId, resource, requestId, lock) { + if (!lock.writer) { + // 可以立即获取读锁 + lock.readers.add(clientId); + this.sendToClient(clientId, { + type: 'lockGranted', + requestId, + resource, + lockType: 'read' + }); + this._logger.debug(`Read lock granted to ${clientId} for ${resource}`); + } else { + // 加入等待队列 + lock.queue.push({ clientId, type: 'read', requestId }); + this._logger.debug(`Read lock queued for ${clientId} for ${resource}`); + } + } + + /** + * 处理写锁请求 + * @param {string} clientId - 客户端标识符 + * @param {string} resource - 资源名称 + * @param {string} requestId - 请求ID + * @param {Object} lock - 锁对象 + */ + handleWriteLock(clientId, resource, requestId, lock) { + if (lock.readers.size === 0 && !lock.writer) { + // 可以立即获取写锁 + lock.writer = clientId; + this.sendToClient(clientId, { + type: 'lockGranted', + requestId, + resource, + lockType: 'write' + }); + this._logger.debug(`Write lock granted to ${clientId} for ${resource}`); + } else { + // 加入等待队列 + lock.queue.push({ clientId, type: 'write', requestId }); + this._logger.debug(`Write lock queued for ${clientId} for ${resource}`); + } + } + + /** + * 处理解锁请求 + * @param {string} clientId - 客户端标识符 + * @param {string} resource - 资源名称 + * @param {Object} lock - 锁对象 + */ + handleUnlock(clientId, resource, lock) { + let released = false; + + // 移除读锁 + if (lock.readers.has(clientId)) { + lock.readers.delete(clientId); + released = true; + this._logger.debug(`Read lock released by ${clientId} for ${resource}`); + } + + // 移除写锁 + if (lock.writer === clientId) { + lock.writer = null; + released = true; + this._logger.debug(`Write lock released by ${clientId} for ${resource}`); + } + + if (released) { + // 处理等待队列 + this.processQueue(resource, lock); + } + } + + /** + * 处理等待队列中的锁请求 + * 根据读写锁规则,尽可能多地授权锁请求 + * @param {string} resource - 资源名称 + * @param {Object} lock - 锁对象 + */ + processQueue(resource, lock) { + const granted = []; + + for (let i = 0; i < lock.queue.length; i++) { + const request = lock.queue[i]; + + if (request.type === 'read') { + if (!lock.writer) { + // 可以授予读锁 + lock.readers.add(request.clientId); + this.sendToClient(request.clientId, { + type: 'lockGranted', + requestId: request.requestId, + resource, + lockType: 'read' + }); + granted.push(i); + this._logger.debug(`Queued read lock granted to ${request.clientId} for ${resource}`); + } else { + // 有写锁等待,读锁必须等待 + break; + } + } else { // write + if (lock.readers.size === 0 && !lock.writer) { + // 可以授予写锁 + lock.writer = request.clientId; + this.sendToClient(request.clientId, { + type: 'lockGranted', + requestId: request.requestId, + resource, + lockType: 'write' + }); + granted.push(i); + this._logger.debug(`Queued write lock granted to ${request.clientId} for ${resource}`); + } else { + // 写锁必须等待前面的锁释放 + break; + } + } + } + + // 移除已处理的请求 + if (granted.length > 0) { + lock.queue = lock.queue.filter((_, index) => !granted.includes(index)); + } + } + /** + * 处理客户端断开连接 + * 释放该客户端持有的所有锁,并处理相关队列 + * @param {string} clientId - 客户端标识符 + */ + handleClientDisconnect(clientId) { + this._logger.debug(`Processing disconnect for client: ${clientId}`); + + // 释放该客户端持有的所有锁 + for (const [resource, lock] of this.locks) { + let released = false; + + if (lock.readers.has(clientId)) { + lock.readers.delete(clientId); + released = true; + } + + if (lock.writer === clientId) { + lock.writer = null; + released = true; + } + + // 如果释放了锁,重新处理队列 + if (released) { + this.processQueue(resource, lock); + } + } + } + + /** + * 向指定客户端发送消息 + * @param {string} clientId - 客户端标识符 + * @param {Object} message - 要发送的消息对象 + */ + sendToClient(clientId, message) { + const socket = this.clients.get(clientId); + if (socket && !socket.destroyed) { + try { + socket.write(JSON.stringify(message)); + } catch (error) { + this._logger.error(`Error sending to client ${clientId}:`, error); + } + } + } + + /** + * 向客户端发送错误消息 + * @param {string} clientId - 客户端标识符 + * @param {string} errorMessage - 错误信息 + */ + sendError(clientId, errorMessage) { + this.sendToClient(clientId, { + type: 'error', + message: errorMessage + }); + } +} + +module.exports = LockServer; \ No newline at end of file diff --git a/lock.namedpipe.js b/lock.namedpipe.js index dc43f8e..bee3781 100644 --- a/lock.namedpipe.js +++ b/lock.namedpipe.js @@ -2,8 +2,8 @@ const net = require('net'); const fs = require('fs'); const path = require('path'); -const { EventEmitter } = require('events'); -const async_hooks = require('async_hooks'); +const LockServer = require('./lock'); + const { v4: uuidv4 } = require('uuid'); const DEFAULT_PIPE_PATH = process.platform === 'win32'? '\\\\.\\pipe\\rwlock' : '/tmp/rwlock.sock'; @@ -16,17 +16,15 @@ const DEFAULT_PIPE_PATH = process.platform === 'win32'? '\\\\.\\pipe\\rwlock' : * 2. 写操作是独占的,不允许同时进行读或其他写操作 * 3. 锁请求按先进先出(FIFO)方式处理 */ -class NamedPipeLockServer extends EventEmitter { +class NamedPipeLockServer extends LockServer { /** * 创建一个NamedPipeLockServer实例 * @param {string} pipePath - 命名管道路径,默认值根据操作系统确定 */ - constructor(pipePath = DEFAULT_PIPE_PATH) { - super(); - this.pipePath = pipePath; - this.server = null; - this.locks = new Map(); // resource -> { readers: Set, writer: null, queue: [] } - this.clients = new Map(); // clientId -> socket + constructor(lockOptions) { + super(lockOptions); + this.pipePath = this.options.pipePath || DEFAULT_PIPE_PATH; + } /** @@ -50,7 +48,7 @@ class NamedPipeLockServer extends EventEmitter { socket.clientId = clientId; this.clients.set(clientId, socket); - console.log(`Client connected: ${clientId}`); + this._logger.debug(`Client connected: ${clientId}`); socket.on('data', data => { try { @@ -72,7 +70,7 @@ class NamedPipeLockServer extends EventEmitter { socket.on('close', () => { this.handleClientDisconnect(clientId); this.clients.delete(clientId); - console.log(`Client disconnected: ${clientId}`); + this._logger.debug(`Client disconnected: ${clientId}`); }); socket.on('error', error => { @@ -83,8 +81,11 @@ class NamedPipeLockServer extends EventEmitter { }); // 启动命名管道服务器 + this._start() + } + _start(){ this.server.listen(this.pipePath, () => { - console.log(`Lock server listening on named pipe: ${this.pipePath}`); + this._logger.debug(`Lock server listening on named pipe: ${this.pipePath}`); }); this.server.on('error', error => { @@ -92,235 +93,13 @@ class NamedPipeLockServer extends EventEmitter { }); } - /** - * 处理来自客户端的消息 - * @param {string} clientId - 客户端标识符 - * @param {Object} message - 消息对象 - */ - handleMessage(clientId, message) { - const { type, resource, requestId } = message; - - if (!this.locks.has(resource)) { - this.locks.set(resource, { - readers: new Set(), - writer: null, - queue: [] - }); - } - - const lock = this.locks.get(resource); - - switch (type) { - case 'readLock': - this.handleReadLock(clientId, resource, requestId, lock); - break; - case 'writeLock': - this.handleWriteLock(clientId, resource, requestId, lock); - break; - case 'unlock': - this.handleUnlock(clientId, resource, lock); - break; - default: - this.sendError(clientId, `Unknown message type: ${type}`); - } - } - - /** - * 处理读锁请求 - * @param {string} clientId - 客户端标识符 - * @param {string} resource - 资源名称 - * @param {string} requestId - 请求ID - * @param {Object} lock - 锁对象 - */ - handleReadLock(clientId, resource, requestId, lock) { - if (!lock.writer) { - // 可以立即获取读锁 - lock.readers.add(clientId); - this.sendToClient(clientId, { - type: 'lockGranted', - requestId, - resource, - lockType: 'read' - }); - console.log(`Read lock granted to ${clientId} for ${resource}`); - } else { - // 加入等待队列 - lock.queue.push({ clientId, type: 'read', requestId }); - console.log(`Read lock queued for ${clientId} for ${resource}`); - } - } - - /** - * 处理写锁请求 - * @param {string} clientId - 客户端标识符 - * @param {string} resource - 资源名称 - * @param {string} requestId - 请求ID - * @param {Object} lock - 锁对象 - */ - handleWriteLock(clientId, resource, requestId, lock) { - if (lock.readers.size === 0 && !lock.writer) { - // 可以立即获取写锁 - lock.writer = clientId; - this.sendToClient(clientId, { - type: 'lockGranted', - requestId, - resource, - lockType: 'write' - }); - console.log(`Write lock granted to ${clientId} for ${resource}`); - } else { - // 加入等待队列 - lock.queue.push({ clientId, type: 'write', requestId }); - console.log(`Write lock queued for ${clientId} for ${resource}`); - } - } - - /** - * 处理解锁请求 - * @param {string} clientId - 客户端标识符 - * @param {string} resource - 资源名称 - * @param {Object} lock - 锁对象 - */ - handleUnlock(clientId, resource, lock) { - let released = false; - - // 移除读锁 - if (lock.readers.has(clientId)) { - lock.readers.delete(clientId); - released = true; - console.log(`Read lock released by ${clientId} for ${resource}`); - } - - // 移除写锁 - if (lock.writer === clientId) { - lock.writer = null; - released = true; - console.log(`Write lock released by ${clientId} for ${resource}`); - } - - if (released) { - // 处理等待队列 - this.processQueue(resource, lock); - } - } - - /** - * 处理等待队列中的锁请求 - * 根据读写锁规则,尽可能多地授权锁请求 - * @param {string} resource - 资源名称 - * @param {Object} lock - 锁对象 - */ - processQueue(resource, lock) { - const granted = []; - - for (let i = 0; i < lock.queue.length; i++) { - const request = lock.queue[i]; - - if (request.type === 'read') { - if (!lock.writer) { - // 可以授予读锁 - lock.readers.add(request.clientId); - this.sendToClient(request.clientId, { - type: 'lockGranted', - requestId: request.requestId, - resource, - lockType: 'read' - }); - granted.push(i); - console.log(`Queued read lock granted to ${request.clientId} for ${resource}`); - } else { - // 有写锁等待,读锁必须等待 - break; - } - } else { // write - if (lock.readers.size === 0 && !lock.writer) { - // 可以授予写锁 - lock.writer = request.clientId; - this.sendToClient(request.clientId, { - type: 'lockGranted', - requestId: request.requestId, - resource, - lockType: 'write' - }); - granted.push(i); - console.log(`Queued write lock granted to ${request.clientId} for ${resource}`); - } else { - // 写锁必须等待前面的锁释放 - break; - } - } - } - - // 移除已处理的请求 - if (granted.length > 0) { - lock.queue = lock.queue.filter((_, index) => !granted.includes(index)); - } - } - - /** - * 处理客户端断开连接 - * 释放该客户端持有的所有锁,并处理相关队列 - * @param {string} clientId - 客户端标识符 - */ - handleClientDisconnect(clientId) { - console.log(`Processing disconnect for client: ${clientId}`); - - // 释放该客户端持有的所有锁 - for (const [resource, lock] of this.locks) { - let released = false; - - if (lock.readers.has(clientId)) { - lock.readers.delete(clientId); - released = true; - } - - if (lock.writer === clientId) { - lock.writer = null; - released = true; - } - - // 如果释放了锁,重新处理队列 - if (released) { - this.processQueue(resource, lock); - } - } - } - - /** - * 向指定客户端发送消息 - * @param {string} clientId - 客户端标识符 - * @param {Object} message - 要发送的消息对象 - */ - sendToClient(clientId, message) { - const socket = this.clients.get(clientId); - if (socket && !socket.destroyed) { - try { - socket.write(JSON.stringify(message)); - } catch (error) { - console.error(`Error sending to client ${clientId}:`, error); - } - } - } - - /** - * 向客户端发送错误消息 - * @param {string} clientId - 客户端标识符 - * @param {string} errorMessage - 错误信息 - */ - sendError(clientId, errorMessage) { - this.sendToClient(clientId, { - type: 'error', - message: errorMessage - }); - } - /** * 停止服务器 */ stop() { if (this.server) { this.server.close(); - console.log('Lock server stopped'); + this._logger.debug('Lock server stopped'); } } } @@ -335,13 +114,13 @@ class NamedPipeLockServer extends EventEmitter { // // 优雅关闭 // process.on('SIGINT', () => { -// console.log('Shutting down lock server...'); +// this._logger.debug('Shutting down lock server...'); // server.stop(); // process.exit(0); // }); // process.on('SIGTERM', () => { -// console.log('Shutting down lock server...'); +// this._logger.debug('Shutting down lock server...'); // server.stop(); // process.exit(0); // }); diff --git a/lock.tcp.js b/lock.tcp.js new file mode 100644 index 0000000..99ad847 --- /dev/null +++ b/lock.tcp.js @@ -0,0 +1,19 @@ +'use strict' +const NamedPipeLockServer = require('./lock.namedpipe'); + +class TCPLockServer extends NamedPipeLockServer { + constructor(lockOptions) { + super(lockOptions); + this.port = lockOptions.port || 7301; + this.host = lockOptions.host || '0.0.0.0'; + } + _start(){ + this.server.listen(this.port, this.host,()=>{ + this._logger.debug(`Lock server listening on ${this.host}:${this.port}`); + }); + this.server.on('error', (err) => { + this._logger.error(`Lock server error: ${err}`); + }); + } +} +module.exports = TCPLockServer; \ No newline at end of file