Node.js 异步流程同步化

这是一篇旧文,其中的内容可能已经过时。

这些天,我在陆续地用 Node.js 重写以前的 Python 和 PHP 代码,是为了让技能树长得更高,而修剪掉次要的技能。

大三时发现学校某系统上有学生证件照,而且它们的地址是明文(学号)。于是写了个 Python 脚本,去抓取所有学生的照片。看看当时的代码,不得不说 Python 确实优雅,怪不得那么多人说“人生苦短,我用 Python”。

from urllib import request, error
import shutil, os, os.path

baseurl = 'http://无可奉告/'
chances = 10
for grade in range(11, 16):
    if not os.path.exists('%02.0f' % grade):
        os.mkdir('%02.0f' % grade + '/')
    for school in range(1, 30):
        path = '%02.0f' % grade + '/'  + '%02.0f' % school
        if not os.path.exists(path):
            os.mkdir(path + '/')
        for t in range(1, 10):
            for d in range(10 if t == 1 else 1):
                for i in range(1, 1000):
                    if chances:
                        id = '%02.0f' % grade + str(t) + '%02.0f' % school + str(d) + '%03.0f' % i
                        print(id)
                        filename = path + '/' + id + '.jpg'
                        if not os.path.exists(filename):
                            url = baseurl + id + '.jpg'
                            try:
                                request.urlretrieve(url, filename)
                                chances = 10
                            except error.HTTPError:
                                chances -= 1
                    else:
                        chances = 10
                        break

相对而言,JavaScript(下文简称 JS)就不优雅了,部分由基因决定:Python 作为纯后端语言,可以使用缩进来定义代码块,既保持了良好的代码格式,又省去了冗长的大括号。而 JS 作为一门前端语言,为了节省带宽往往要做代码压缩,所以代码块必须用大括号来标明。另一部分原因是,Python 内置了许多实用函数,虽然 npm 社区有许多开源的包作为替代 ,但为了练手我先不用。事实上,本文基本围绕着urllib.request.urlretrieve这个函数展开。

这段程序大致分为两块:其一为迭代器,遍历所有应抓取的学号。其二为抓取文件并写入磁盘。现在开始翻译吧!首先,引入模块,补全实用函数。

'use strict';

const http = require('http');
const fs = require('fs');

function pad(str, width) {
  return '0'.repeat(width - ('' + str).length) + str;
}
function* range(a, b) {
  let start = a,
    end = b;
  if (b === undefined) {
    start = 0;
    end = a;
  }
  for (let i = start; i < end; i++) {
    yield i;
  }
}
function exists(path) {
  try {
    fs.accessSync(path);
    return true;
  } catch (err) {
    return false;
  }
}

const baseurl = 'http://无可奉告/';
let chances = 10;
let path;

和 Python 不同的是,这个 range 函数仅仅返回一个可迭代对象而不是一个数组,以减少内存占用。什么叫可迭代?看下边的例子你就懂了:

for (let x of range(3)) {
  console.log(x);
}
// 0
// 1
// 2

[...range(2, 6)];
// [2, 3, 4, 5]

接下来,为了专注于主题,先把学号迭代器分离出来,同样使用生成器函数:

const iter = (function* () {
  for (let grade of [15]) {
    grade = pad(grade, 2);
    if (!exists(grade)) {
      fs.mkdirSync(grade); }
    for (let school of range(1, 30)) {
      school = pad(school, 2);
      path = `${grade}/${school}`;
      if (!exists(path)) {
        fs.mkdirSync(path); }
      for (let t of range(1, 10)) {
        for (let d of range(t === 1 ? 10 : 1)) {
          for (let i of range(1, 1000)) {
            const identity = grade + t + school + d + pad(i, 3);
            const filename = `${path}/${identity}.jpg`;
            if (!chances) {
              chances = 10;
              break;
            } else if (!exists(filename)) {
              yield identity;
            } else {
              chances = 10;
            }}}}}}})();

把大括号叠起来,看起来更像 Python 了。这个iter会一次吐一个学号出来。它的行为会根据外部环境(文件是否已存在? 已连续失败多少次?)进行微调。

基于协程的同步流程

Node.js 自带的文件读写函数提供了同步和异步两个版本,为了写起来舒服,看起来也舒服,我这里尽量用了同步的。但是,http库的get方法并未提供同步版本。有没有什么办法能造出一个同步版本的get,即下文的getSync呢?

for (let identity of iter) {
  console.log(identity);
  const url = `${baseurl}${identity}.jpg`;
  try {
    const imageData = getSync(url);
    const filename = `${path}/${identity}.jpg`;
    fs.writeFileSync(filename, imageData, 'binary');
    chances = 10;
  } catch (err) {
    if (err.message === 'Not 200') {
      chances--;
    } else {
      console.log(err);
      fs.appendFileSync('failed.txt', identity + '\n');
    }
  }
}

getSync函数应当是http.get函数的封装。其接受url作为参数,(简单起见)仅返回响应主体,或者抛出一个错误。先试着写一个出来:

function getSync(url) {
  let done = false,
    e = false,
    imageData = '';
  http
    .get(url, (res) => {
      if (res.statusCode !== 200) {
        e = new Error('Not 200');
        return;
      }
      res.setEncoding('binary');
      res.on('data', (chunk) => {
        imageData += chunk;
      });
      res.on('end', () => {
        done = true;
      });
    })
    .on('error', (err) => {
      e = err;
    });

  while (!done && !e) {
    // do nothing
  }
  if (e) {
    throw e;
  }
  return imageData;
}

点击,运行,啪!程序卡死在了 while 循环处。因为 JS 运行时是单线程模型,http.get的回调函数被放到了消息队列中,必须等待主线程执行完毕,才会执行。换言之,while 循环阻塞了回调函数,故donee的值永远不会发生变化。

为了修复这个问题,我们需要操控一种叫协程(coroutine),又名纤程(fiber)的东西。协程是一种微线程,表现为函数调用,可随时中断并记忆执行环境,也可随时恢复运行。其实生成器的yield语句就是协程的一种实现,只不过没有暴露直接操控协程的接口。所幸,有个用 C 语言底层实现的库node-fibers满足了我的需要。

const Fiber = require('fibers');

function getSync(url) {
  const f = Fiber.current;
  let imageData = '';
  let err;
  http
    .get(url, (res) => {
      if (res.statusCode !== 200) {
        err = new Error('Not 200');
        return f.run();
      }
      res.setEncoding('binary');
      res.on('data', (chunk) => {
        imageData += chunk;
      });
      res.on('end', () => {
        f.run();
      });
    })
    .on('error', (e) => {
      err = e;
      f.run();
    });

  Fiber.yield();
  if (err) {
    throw err;
  }
  return imageData;
}

使用起来很简单,用Fiber.yield()中断当前协程,f.run()启动指定协程。这个问题就这样解决啦!不过还要注意一点,for (let id of iter)这个语句要整个包在一个函数里,并传给Fiber()作为参数,像这样:

Fiber(function () {
  for (let identity of iter) {
    // ...
  }
}).run();

当整个流程被包装成一个协程时,我们才能中断或启动它。否则我们是不能暂停主线程的。

附:基于事件回调的异步流程

iter还是那个iter,倒也是挺简洁的。

(function run() {
  const { value: identity, done } = iter.next();
  if (done) {
    return;
  }

  console.log(identity);
  const url = `${baseurl}${identity}.jpg`;
  http
    .get(url, (res) => {
      const filename = `${path}/${identity}.jpg`;
      if (res.statusCode !== 200) {
        chances--;
        return run();
      }
      res.setEncoding('binary');
      res.on('data', (chunk) => {
        fs.appendFileSync(filename, chunk, 'binary');
      });
      res.on('end', () => {
        chances = 10;
        run();
      });
    })
    .on('error', (err) => {
      console.log(err);
      fs.appendFileSync('failed.txt', identity + '\n');
      run();
    });
})();

附:基于 Promise 的异步流程

虽然是 ES6 的新内容,但 Promise/A+规范其实很老了,互联网上也有很多关于 Promise 的优质文章,所以这里就不详细介绍了。

简而言之,Promise 要求程序定义,一个异步操作何时叫成功(resolve),何时叫失败(reject)。而且若这个操作已成功,就绝不会再失败,反之亦然。还有个好处是,状态的处理函数可以在状态改变后添加,随后立即执行。而回调函数只能在事件触发前添加。

function get(url) {
  return new Promise((resolve, reject) => {
    http
      .get(url, (res) => {
        if (res.statusCode !== 200) {
          reject(new Error('Not 200'));
        }
        let imageData = '';
        res.setEncoding('binary');
        res.on('data', (chunk) => {
          imageData += chunk;
        });
        res.on('end', () => {
          resolve(imageData);
        });
      })
      .on('error', (err) => {
        reject(err);
      });
  });
}

(function run() {
  const { value: identity, done } = iter.next();
  if (done) {
    return;
  }

  console.log(identity);
  const url = `${baseurl}${identity}.jpg`;
  get(url)
    .then((imageData) => {
      const filename = `${path}/${identity}.jpg`;
      fs.writeFileSync(filename, imageData, 'binary');
      chances = 10;
      run();
    })
    .catch((err) => {
      if (err.message === 'Not 200') {
        chances--;
      } else {
        console.log(err);
        fs.appendFileSync('failed.txt', identity + '\n');
      }
      run();
    });
})();