Robos baixando arquivos em paralelo

Há algumas semanas fiz uma entrevista, e um dos desafios era fazer o download de alguns arquivos em paralelo, e eu achei muito interessante, então resolvi fazer um artigo sobre isso. Falando um pouco do desafio, me foi fornecida a função pooledDownload que recebe uma lista de URLs, uma função connect que retorna uma conexão, o número máximo de conexões permitidas, e uma lista de URLs que devem ser baixadas e salvas; a ação deve ser feita em paralelo.

Critérios de aceitação

  1. Todos os arquivos devem ser baixados e salvos.
  2. O conteúdo do arquivo deve ser salvo assim que o arquivo for baixado.
  3. Downloads devem ser distribuídos igualmente entre as conexões.
  4. Uma conexão só pode baixar um arquivo por vez. Você deve criar mais conexões para baixar mais arquivos em paralelo.
  5. Qualquer conexão aberta deve ser fechada.
  6. Se não for possível abrir conexão com o servidor, você deve rejeitar com um Error contendo a mensagem Connection Failed.
  7. Se um erro ocorrer durante o download, você deve rejeitar com o mesmo erro.
  8. Às vezes o servidor pode não ter slots para lidar com o número de conexões simultâneas. Nesse caso, você deve parar a abertura de novas conexões quando o servidor alcançar o limite.
index.js
const pooledDownload = async (connect, save, downloadList, maxConcurrency) => {
  // Implemente a função aqui
};
module.exports = pooledDownload;

Solução

O primeiro passo que eu fiz foi criar a função getConnections, ela recebe a função connect e o número máximo de conexões permitidas, e retorna um array de conexões abertas.

index.js
const getConnections = async (connect, maxConcurrency) => {
  const connections = [];
  for (let i = 0; i < maxConcurrency; i++) {
    try {
      connections.push(await connect());
    } catch (e) {
      break;
    }
  }
 
  return connections;
};

Eu sei que o break no catch, não é uma boa prática, mas é uma maneira simples de assegurar que estejamos em conformidade com o critério 8.

Agora que temos as conexões abertas, podemos implementar uma parte da função pooledDownload.

index.js
const pooledDownload = async (connect, save, downloadList, maxConcurrency) => {
  const filesToDownload = downloadList.slice(0);
  const maxConnectionNeeded = Math.min(maxConcurrency, downloadList.length);
  const connections = await getConnections(connect, maxConnectionNeeded);
 
  if (!connections || !connections.length) {
    throw new Error("connection failed");
  }
};

A primeira ação que fiz foi criar uma cópia da lista de arquivos a serem baixados, e depois pegar o mínimo entre o número de conexões permitidas e o número de arquivos a serem baixados, e então abrir as conexões. Se não for possível abrir nenhuma conexão, eu rejeito a Promise com um Error contendo a mensagem Connection Failed.

Você pode se perguntar o porquê de usar o mínimo entre o número de conexões permitidas e o número de arquivos a serem baixados, isso é para garantir que não abriremos mais conexões do que o necessário.

O próximo passo é criar um loop que irá baixar os arquivos.

index.js
const promise = [];
for (let i = 0; i < maxConnectionNeeded; i++) {
  promise.push(execute(connections, filesToDownload, save));
}
 
try {
  await Promise.all(promise);
} finally {
  // fechar conexões
}

A função execute é responsável por baixar os arquivos, e é onde a magia acontece.

index.js
const execute = (connections, downloadList, save) => {
  if (downloadList.length === 0 || connections.length === 0) return Promise.resolve();
 
  const currentDownload = downloadList.shift();
  const connection = connections.pop();
 
  return connection
    .download(currentDownload)
    .then((result) => {
      save(result);
      connections.unshift(connection);
      return execute(connections, downloadList, save);
    })
    .catch((e) => {
      downloadList.unshift(currentDownload);
      return Promise.reject(e);
    });
};

O que acontece na função execute é que eu verifico se ainda tem arquivos para serem baixados e se ainda tem conexões abertas, se não tiver eu retorno uma promessa resolvida. Então eu pego o primeiro arquivo da lista de arquivos a serem baixados, e a primeira conexão aberta, e então eu baixo o arquivo, salvo o conteúdo do arquivo, e então eu coloco a conexão de volta no início do array de conexões, e chamo a função execute novamente. Se um erro ocorrer durante o download, eu coloco o arquivo de volta na lista de arquivos a serem baixados, e rejeito a promessa com o mesmo erro.

Por fim, eu fecho as conexões.

index.js
try {
  await Promise.all(promise);
} finally {
  connections.forEach((connection) => connection.close());
}

Conclusão

Esse foi um desafio muito interessante, e eu gostei muito de fazer, eu aprendi muito, e espero que você também tenha aprendido. Se você tiver alguma dúvida, sugestão ou crítica, por favor mande uma mensagem, eu vou adorar ouvir o que você tem a dizer.

O resultado final fica assim:

index.js
const getConnections = async (connect, maxConcurrency) => {
  const connections = [];
  for (let i = 0; i < maxConcurrency; i++) {
    try {
      connections.push(await connect());
    } catch (e) {
      break;
    }
  }
 
  return connections;
};
 
const execute = (connections, downloadList, save) => {
  if (downloadList.length === 0 || connections.length === 0) return Promise.resolve();
 
  const currentDownload = downloadList.shift();
  const connection = connections.pop();
 
  return connection
    .download(currentDownload)
    .then((result) => {
      save(result);
      connections.unshift(connection);
      return execute(connections, downloadList, save);
    })
    .catch((e) => {
      downloadList.unshift(currentDownload);
      return Promise.reject(e);
    });
};
 
const pooledDownload = async (connect, save, downloadList, maxConcurrency) => {
  const filesToDownload = downloadList.slice(0);
  const maxConnectionNeeded = Math.min(maxConcurrency, downloadList.length);
  const connections = await getConnections(connect, maxConnectionNeeded);
 
  if (!connections || !connections.length) {
    throw new Error("connection failed");
  }
 
  const promise = [];
  for (let i = 0; i < maxConnectionNeeded; i++) {
    promise.push(execute(connections, filesToDownload, save));
  }
 
  try {
    await Promise.all(promise);
  } finally {
    connections.forEach((connection) => connection.close());
  }
};
 
module.exports = pooledDownload;