Node 异步编程

解释下此段node代码

1
2
3
4
5
6
7
8
9
10
11
12
var proxy = new events.EventEmitter(); 
var status = "ready";
var select = function (callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function (results) {
proxy.emit("selected", results);
status = "ready";
});
}
};

这段代码主要演示了如何使用 Node.js 的 EventEmitter 来处理异步操作,同时确保某个操作在特定状态下只会执行一次。

代码解析:

  1. 创建一个 EventEmitter 实例:

    1
    var proxy = new events.EventEmitter();

    proxy 是一个 EventEmitter 对象,可以用于在程序中发出和监听事件。

  2. 初始化状态变量:

    1
    var status = "ready";

    status 是一个字符串,表示操作的当前状态。初始状态为 "ready",表示可以执行操作。

  3. 定义 select 函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    var select = function (callback) { 
    proxy.once("selected", callback);
    if (status === "ready") {
    status = "pending";
    db.select("SQL", function (results) {
    proxy.emit("selected", results);
    status = "ready";
    });
    }
    };

    这个 select 函数接受一个回调函数 callback,并包含以下几个关键部分:

    • 注册事件监听器:

      1
      proxy.once("selected", callback);

      proxy.once("selected", callback)proxy 注册了一个一次性的事件监听器。当 proxy 触发 selected 事件时,callback 会被执行。

    • 检查并更新状态:

      1
      2
      if (status === "ready") { 
      status = "pending";

      如果当前 status"ready",即操作尚未执行,则将 status 更新为 "pending",表示操作正在进行。

    • 异步操作并发出事件:

      1
      2
      3
      4
      db.select("SQL", function (results) { 
      proxy.emit("selected", results);
      status = "ready";
      });

      模拟一个异步的数据库查询操作(db.select("SQL", ...))。查询完成后,通过 proxy.emit("selected", results) 触发 selected 事件,并传递查询结果 results。然后将 status 重置为 "ready",表示操作已完成,准备好接受下一次请求。

总结:

  • select 函数被调用时,如果 status"ready",则启动一个异步操作(比如数据库查询),并将状态设置为 "pending" 以防止重复操作。
  • 当查询完成后,select 函数会通过 proxy.emit 触发 selected 事件,所有监听该事件的回调都会执行。
  • 如果在同一时刻有多个请求调用 select,只有第一个请求会触发异步操作,其余请求会等待该操作完成并接收到相同的结果。

列举实际场景的例子

这段代码的模式在许多实际场景中都能派上用场,尤其是在处理需要进行异步操作并确保操作不会重复触发的情况下。以下是几个实际场景的例子:

1. 数据库连接池初始化:

假设你有一个应用程序需要连接到一个数据库。你想确保数据库连接池只初始化一次,无论有多少个请求同时到达。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var proxy = new events.EventEmitter();
var status = "ready";

var initializeDatabase = function(callback) {
proxy.once("initialized", callback);
if (status === "ready") {
status = "pending";
db.connectPool(config, function(err, pool) {
if (err) return proxy.emit("initialized", err);
proxy.emit("initialized", pool);
status = "ready";
});
}
};

// 在多个地方使用初始化的数据库连接池
initializeDatabase(function(pool) {
// 使用连接池执行查询
pool.query('SELECT * FROM users', function(err, results) {
// 处理查询结果
});
});

解释:

  • initializeDatabase 确保数据库连接池只初始化一次,并在后续所有请求中重用该连接池。

2. API 密钥获取:

当你需要从外部服务获取 API 密钥,并确保多个请求不会导致重复的密钥获取操作时,可以使用这种模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var proxy = new events.EventEmitter();
var status = "ready";

var fetchApiKey = function(callback) {
proxy.once("fetched", callback);
if (status === "ready") {
status = "pending";
externalService.getApiKey(function(err, apiKey) {
if (err) return proxy.emit("fetched", err);
proxy.emit("fetched", apiKey);
status = "ready";
});
}
};

// 在多个地方使用获取的 API 密钥
fetchApiKey(function(apiKey) {
// 使用 API 密钥访问服务
externalService.useApiKey(apiKey);
});

解释:

  • fetchApiKey 函数确保从外部服务获取 API 密钥的操作只会执行一次。后续请求将使用已经获取到的 API 密钥。

3. 配置文件加载:

当你需要在程序中加载配置文件并确保配置文件只加载一次时,可以使用这种模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var proxy = new events.EventEmitter();
var status = "ready";

var loadConfig = function(callback) {
proxy.once("loaded", callback);
if (status === "ready") {
status = "pending";
fs.readFile('/path/to/config.json', 'utf8', function(err, data) {
if (err) return proxy.emit("loaded", err);
var config = JSON.parse(data);
proxy.emit("loaded", config);
status = "ready";
});
}
};

// 在多个地方使用加载的配置
loadConfig(function(config) {
// 使用配置文件中的设置
console.log(config.database.host);
});

解释:

  • loadConfig 函数确保配置文件只会加载一次,然后将结果传递给所有需要它的地方。

4. 缓存加载:

假设你需要从远程服务器加载一些数据并缓存它,以便后续请求可以直接使用缓存,而不是重复加载数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var proxy = new events.EventEmitter();
var status = "ready";
var cache = null;

var loadData = function(callback) {
proxy.once("dataLoaded", callback);
if (status === "ready" && !cache) {
status = "pending";
remoteService.getData(function(err, data) {
if (err) return proxy.emit("dataLoaded", err);
cache = data;
proxy.emit("dataLoaded", cache);
status = "ready";
});
} else if (cache) {
// 如果已经有缓存,直接返回
process.nextTick(() => callback(cache));
}
};

// 在多个地方使用加载的缓存数据
loadData(function(data) {
// 使用数据
console.log(data);
});

解释:

  • loadData 函数在第一次调用时从远程服务器加载数据并缓存它。后续请求直接使用缓存的数据,而不需要再次加载。

如果在同一时刻有多个请求调用 select,只有第一个请求会触发异步操作,其余请求会等待该操作完成并接收到相同的结果。针对这句话,同一时刻有多个请求调用 select,能列举具体的例子吗

当在同一时刻有多个请求调用 select 时,这意味着在应用程序中有多个操作或函数几乎同时触发,且这些操作都依赖于 select 函数执行的异步操作。以下是几个具体的例子:

1. Web Server 多个请求触发数据库查询

假设你有一个简单的 Node.js Web 服务器,其中某个 API 端点会触发 select 函数来查询数据库。多个客户端同时向该 API 发送请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const http = require('http');
const events = require('events');

var proxy = new events.EventEmitter();
var status = "ready";

var select = function(callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function(results) {
proxy.emit("selected", results);
status = "ready";
});
}
};

http.createServer((req, res) => {
if (req.url === '/data') {
select((results) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(results));
});
}
}).listen(8080);

解释:

  • 当多个客户端几乎同时请求 /data 端点时,所有这些请求都会调用 select 函数。
  • 第一个请求会触发 db.select 来查询数据库,而其他请求会等待该查询完成并接收到相同的查询结果。
  • 数据库查询完成后,结果将返回给所有等待中的请求。

2. RESTful API 缓存

假设你有一个 RESTful API 服务器,每次请求都会从一个外部服务获取数据。你希望避免多次请求同时触发对外部服务的调用,并希望利用缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const express = require('express');
const events = require('events');
const app = express();

var proxy = new events.EventEmitter();
var status = "ready";
var cache = null;

var fetchData = function(callback) {
proxy.once("dataFetched", callback);
if (status === "ready") {
status = "pending";
externalService.getData((data) => {
cache = data;
proxy.emit("dataFetched", data);
status = "ready";
});
} else if (cache) {
process.nextTick(() => callback(cache));
}
};

app.get('/info', (req, res) => {
fetchData((data) => {
res.json(data);
});
});

app.listen(3000, () => console.log('Server running on port 3000'));

解释:

  • 当多个用户几乎同时请求 /info 端点时,他们都会触发 fetchData
  • 第一个请求会触发 externalService.getData 来获取数据,而其他请求则会等待该数据获取操作完成。
  • 一旦外部服务的数据获取成功,所有等待中的请求都将接收到相同的数据。

3. 后台任务触发

假设你有一个后台任务调度器,它会定期触发 select 函数来从数据库中获取一些数据用于处理。多个任务可能会同时触发该操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const cron = require('node-cron');
const events = require('events');

var proxy = new events.EventEmitter();
var status = "ready";

var select = function(callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function(results) {
proxy.emit("selected", results);
status = "ready";
});
}
};

// 两个不同的任务几乎同时运行
cron.schedule('*/5 * * * * *', () => {
select((results) => {
console.log('Task 1 received results:', results);
});
});

cron.schedule('*/5 * * * * *', () => {
select((results) => {
console.log('Task 2 received results:', results);
});
});

解释:

  • 两个定时任务几乎同时运行,并且都调用 select 函数。
  • 第一个任务会触发数据库查询,其他任务会等待查询完成并使用相同的查询结果。
  • 查询结果将传递给所有等待任务。

4. 大型应用中的模块化数据加载

在一个大型应用中,不同的模块可能会同时触发对共享数据的加载。例如,一个电商网站的多个部分可能同时需要商品数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
const events = require('events');

var proxy = new events.EventEmitter();
var status = "ready";

var loadProductData = function(callback) {
proxy.once("dataLoaded", callback);
if (status === "ready") {
status = "pending";
productService.getProducts((data) => {
proxy.emit("dataLoaded", data);
status = "ready";
});
}
};

// 首页加载商品数据
function loadHomePage() {
loadProductData((data) => {
console.log('Home Page received product data:', data);
});
}

// 商品详情页加载商品数据
function loadProductPage() {
loadProductData((data) => {
console.log('Product Page received product data:', data);
});
}

// 同时调用两个模块
loadHomePage();
loadProductPage();

解释:

  • loadHomePageloadProductPage 几乎同时调用,并且都需要商品数据。
  • 第一个调用会触发数据加载,其他模块会等待数据加载完成并使用相同的数据。
  • 数据加载完成后,结果会传递给所有需要它的模块。

这些例子展示了在各种应用场景下,多个请求几乎同时调用 select 或类似的函数,只有第一个请求会实际执行异步操作,其余请求则会等待并共享相同的结果。

EventProxy

EventProxy 是一个在 Node.js 环境中非常实用的库,用于处理复杂的事件组合和异步逻辑。它能够帮助你管理多个异步操作,尤其是在需要等待多个事件完成后再执行某些操作时非常有用。

基本概念

EventProxy 通过代理多个事件来协调异步操作。你可以监听多个事件,等到所有事件都触发后,再执行一个回调函数。

安装

要使用 EventProxy,首先需要安装该库:

1
npm install eventproxy

使用场景与示例

1. 等待多个异步操作完成后执行

假设你有两个异步操作:一个是从数据库获取用户信息,另一个是从 API 获取用户的社交媒体信息。你希望在这两个操作都完成后,再处理结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const EventProxy = require('eventproxy');
const ep = new EventProxy();

function getUserInfo(callback) {
// 模拟异步操作
setTimeout(() => {
callback(null, { id: 1, name: 'Alice' });
}, 100);
}

function getUserSocial(callback) {
// 模拟异步操作
setTimeout(() => {
callback(null, { twitter: '@alice', facebook: 'alice.fb' });
}, 200);
}

ep.all('user', 'social', function (userInfo, socialInfo) {
// 当 'user' 和 'social' 两个事件都触发后,执行这个回调
console.log('User Info:', userInfo);
console.log('Social Info:', socialInfo);
});

getUserInfo(function (err, userInfo) {
if (err) {
ep.emit('error', err);
} else {
ep.emit('user', userInfo);
}
});

getUserSocial(function (err, socialInfo) {
if (err) {
ep.emit('error', err);
} else {
ep.emit('social', socialInfo);
}
});

解释:

  • ep.all('user', 'social', callback) 方法监听 usersocial 两个事件。当这两个事件都触发后,才会执行 callback 回调。
  • getUserInfogetUserSocial 函数分别执行异步操作,完成后通过 ep.emit 触发相应事件。

2. 处理任意数量的异步操作

EventProxy 也可以用来处理多个相同类型的异步操作,比如批量获取多个用户的详情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const EventProxy = require('eventproxy');
const ep = new EventProxy();

function getUserDetails(userId, callback) {
// 模拟异步操作
setTimeout(() => {
callback(null, { id: userId, name: `User ${userId}` });
}, Math.random() * 200);
}

const userIds = [1, 2, 3, 4, 5];
userIds.forEach((userId) => {
getUserDetails(userId, function (err, userDetails) {
if (err) {
ep.emit('error', err);
} else {
ep.emit('userDetails', userDetails);
}
});
});

// 当所有用户详情都获取完成后,执行回调
ep.after('userDetails', userIds.length, function (userDetailsList) {
console.log('All user details:', userDetailsList);
});

解释:

  • ep.after('userDetails', userIds.length, callback) 方法监听 userDetails 事件指定次数(即 userIds.length),当所有事件都触发后,执行 callback 回调。
  • getUserDetails 函数执行获取用户详情的异步操作,每次获取成功后通过 ep.emit('userDetails', userDetails) 触发事件。

3. 处理错误情况

EventProxy 还提供了简单的错误处理方式。你可以监听 error 事件,当任意异步操作失败时,统一处理错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const EventProxy = require('eventproxy');
const ep = new EventProxy();

function getUserInfo(callback) {
// 模拟异步操作
setTimeout(() => {
callback(null, { id: 1, name: 'Alice' });
}, 100);
}

function getUserSocial(callback) {
// 模拟一个错误
setTimeout(() => {
callback(new Error('Failed to fetch social info'));
}, 200);
}

ep.all('user', 'social', function (userInfo, socialInfo) {
console.log('User Info:', userInfo);
console.log('Social Info:', socialInfo);
});

ep.fail((err) => {
console.error('An error occurred:', err);
});

getUserInfo(function (err, userInfo) {
if (err) {
ep.emit('error', err);
} else {
ep.emit('user', userInfo);
}
});

getUserSocial(function (err, socialInfo) {
if (err) {
ep.emit('error', err);
} else {
ep.emit('social', socialInfo);
}
});

解释:

  • ep.fail(callback) 方法用于处理所有 ep.emit('error', err) 触发的错误。当任意异步操作中发生错误,都会通过 ep.emit('error', err) 传递给 ep.fail 处理。

总结

EventProxy 是处理复杂异步操作、事件组合、以及多异步操作同步控制的强大工具。通过 allafterfail 等方法,你可以轻松地协调多个异步事件,简化代码逻辑,避免回调地狱或手动管理复杂的异步状态。

作者

John Doe

发布于

2024-10-16

更新于

2024-10-17

许可协议

You need to set install_url to use ShareThis. Please set it in _config.yml.
You forgot to set the business or currency_code for Paypal. Please set it in _config.yml.

评论

You forgot to set the shortname for Disqus. Please set it in _config.yml.
You need to set client_id and slot_id to show this AD unit. Please set it in _config.yml.