nlocks/lock.namedpipe.js

128 lines
3.5 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// named-pipe-server.js
const net = require('net');
const fs = require('fs');
const path = require('path');
const LockServer = require('./lock');
const { v4: uuidv4 } = require('uuid');
const DEFAULT_PIPE_PATH = process.platform === 'win32'? '\\\\.\\pipe\\rwlock' : '/tmp/rwlock.sock';
/**
* 基于命名管道的读写锁服务器
* 提供跨进程的读写锁功能,支持多个客户端同时请求对同一资源的读锁或写锁
* 实现读写锁的基本语义:
* 1. 多个读操作可以同时进行
* 2. 写操作是独占的,不允许同时进行读或其他写操作
* 3. 锁请求按先进先出(FIFO)方式处理
*/
class NamedPipeLockServer extends LockServer {
/**
* 创建一个NamedPipeLockServer实例
* @param {string} pipePath - 命名管道路径,默认值根据操作系统确定
*/
constructor(lockOptions) {
super(lockOptions);
this.pipePath = this.options.pipePath || DEFAULT_PIPE_PATH;
}
/**
* 启动服务器
* 创建并监听命名管道,处理客户端连接和消息
*/
start() {
// 确保管道文件不存在Windows
if (process.platform === 'win32') {
try {
if (fs.existsSync(this.pipePath)) {
fs.unlinkSync(this.pipePath);
}
} catch (error) {
// 忽略错误,可能管道正在使用
}
}
this.server = net.createServer(socket => {
const clientId = uuidv4();
socket.clientId = clientId;
this.clients.set(clientId, socket);
this._logger.debug(`Client connected: ${clientId}`);
socket.on('data', data => {
try {
const messageStrs = data.toString().split('\n');
messageStrs.forEach(messageStr => {
if (messageStr) {
const message = JSON.parse(messageStr);
this.handleMessage(clientId, message);
}
});
// const message = JSON.parse();
// this.handleMessage(clientId, message);
} catch (error) {
console.error('Error parsing message:', error);
this.sendError(clientId, 'Invalid message format');
}
});
socket.on('close', () => {
this.handleClientDisconnect(clientId);
this.clients.delete(clientId);
this._logger.debug(`Client disconnected: ${clientId}`);
});
socket.on('error', error => {
console.error(`Client error (${clientId}):`, error);
this.handleClientDisconnect(clientId);
this.clients.delete(clientId);
});
});
// 启动命名管道服务器
this._start()
}
_start(){
this.server.listen(this.pipePath, () => {
this._logger.debug(`Lock server listening on named pipe: ${this.pipePath}`);
});
this.server.on('error', error => {
console.error('Server error:', error);
});
}
/**
* 停止服务器
*/
stop() {
if (this.server) {
this.server.close();
this._logger.debug('Lock server stopped');
}
}
}
// // 启动服务器
// const pipePath = process.platform === 'win32'
// ? '\\\\.\\pipe\\rwlock-server'
// : '/tmp/rwlock.sock';
// const server = new NamedPipeLockServer(pipePath);
// server.start();
// // 优雅关闭
// process.on('SIGINT', () => {
// this._logger.debug('Shutting down lock server...');
// server.stop();
// process.exit(0);
// });
// process.on('SIGTERM', () => {
// this._logger.debug('Shutting down lock server...');
// server.stop();
// process.exit(0);
// });
module.exports = NamedPipeLockServer;