配置中心是怎么推送的?动手实现一个 Long Polling 长轮询 - 今日头条

本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com

客户端 Codepackagecom.andy.example.longpolling.client;import java.io.Buffere

众所周知,数据交互有两种模式:Push(推模式)、Pull(拉模式)。

推模式指的是客户端与服务端建立好网络长连接,服务方有相关数据,直接通过长连接通道推送到客户端。其优点是及时,一旦有数据变更,客户端立马能感知到;另外对客户端来说逻辑简单,不需要关心有无数据这些逻辑处理。缺点是不知道客户端的数据消费能力,可能导致数据积压在客户端,来不及处理。

拉模式指的是客户端主动向服务端发出请求,拉取相关数据。其优点是此过程由客户端发起请求,故不存在推模式中数据积压的问题。缺点是可能不够及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么控制等等。

说到 Long Polling(长轮询),必然少不了提起 Polling(轮询),这都是拉模式的两种方式。

Polling 是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。

Long Polling 原理也很简单,相比 Polling,客户端发起 Long Polling,此时如果服务端没有相关数据,会 hold 住请求,直到服务端有相关数据,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次 Long Polling。这种方式也是对拉模式的一个优化,解决了拉模式数据通知不及时,以及减少了大量的无效轮询次数。

所谓的 hold 住请求指的服务端暂时不回复结果,保存相关请求,不关闭请求链接,等相关数据准备好,写会客户端。

前面提到 Long Polling 如果当时服务端没有需要的相关数据,此时请求会 hold 住,直到服务端把相关数据准备好,或者等待一定时间直到此次请求超时,这里大家是否有疑问,为什么不是一直等待到服务端数据准备好再返回,这样也不需要再次发起下一次的 Long Polling,节省资源?

主要原因是网络传输层主要走的是 tcp 协议,tcp 协议是可靠面向连接的协议,通过三次握手建立连接。但是所建立的连接是虚拟的,可能存在某段时间网络不通,或者服务端程序非正常关闭,亦或服务端机器非正常关机,面对这些情况客户端根本不知道服务端此时已经不能互通,还在傻傻的等服务端发数据过来,而这一等一般都是很长时间。

当然 tcp 协议栈在实现上有保活计时器来保证的,但是等到保活计时器发现连接已经断开需要很长时间,如果没有专门配置过相关的 tcp 参数,一般需要 2 个小时,而且这些参数是机器操作系统层面,所以,以此方式来保活不太靠谱,故 Long Polling 的实现上一般是需要设置超时时间的。

Long Polling 的实现很简单,可分为四个过程:

  • 发起 Polling,发起 Polling 很简单,只需向服务器发起请求,此时服务端还未应答,所以客户端与服务端之间一直处于连接状态。
  • 数据推送,如果服务器端有相关数据,此时服务端会将数据通过此前建立的通道发回客户端。
  • Polling 终止,Polling 终止情况有三种:若服务端返回相关数据,此时客户端收到数据后,关闭请求连接,结束此次 Polling 过程。若客户端等待设定的超时时间后,服务端依然没有返回数据,此时客户端需要主动终止此次 Polling 请求。若客户端收到网络故障或异常,此时客户端自然也是需要主动终止此次 Polling 请求。
  • 重新 Polling,终止上次 Polling 后,客户端需要立即再次发起 Polling 请求。这样才能保证拉取数据的及时性。

代码实现起来也很简单,Http Call 按照上述过程就很方便实现 LongPolling。下面 Code 只是简单展示过程,在具体场景下,根据具体的业务逻辑进行调整。

package com.andy.example.longpolling.client;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.HttpURLConnection;

import java.net.URL;

/**

  • Created by andy on 17/7/6.

*/

public class ClientBootstrap {

public static final String URL = “http://localhost:8080/long-polling”;

public static void main(String[] args) {

int i = 0;

while (true) {

System.out.println(“第” + (++i) + “次 longpolling”);

HttpURLConnection connection = null;

try {

URL getUrl = new URL(URL);

connection = (HttpURLConnection) getUrl.openConnection();

connection.setReadTimeout(50000);// 这就是等待时间,设置为 50s

connection.setConnectTimeout(3000);

connection.setRequestMethod(“GET”);

connection.setRequestProperty(“Accept-Charset”, “utf-8”);

connection.setRequestProperty(“Content-Type”, “application/json”);

connection.setRequestProperty(“Charset”, “UTF-8”);

if (200 == connection.getResponseCode()) {

BufferedReader reader = null;

try {

reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), “UTF-8”));

StringBuilder result = new StringBuilder(256);

String line = null;

while ((line = reader.readLine()) != null) {

result.append(line);

}

System.out.println(“结果” + result);

} finally {

if (reader != null) {

reader.close();

}

}

}

} catch (IOException e) {

} finally {

if (connection != null) {

connection.disconnect();

}

}

}

}

}

package com.andy.example.longpolling.server;

import javax.servlet.ServletException;

import javax.servlet.http.HttpServlet;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import java.io.IOException;

import java.io.PrintWriter;

import java.util.Random;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicLong;

/**

  • Created by andy on 17/7/6.

*/

public class LongPollingServlet extends HttpServlet {

private Random random = new Random();

private AtomicLong sequenceId = new AtomicLong();

private AtomicLong count = new AtomicLong();

@Override

protected void doGet(HttpServletRequest request, HttpServletResponse response)

throws ServletException, IOException {

System.out.println(“第” + (count.incrementAndGet()) + “次 longpolling”);

int sleepSecends = random.nextInt(100);

// 随机获取等待时间,来通过 sleep 模拟服务端是否准备好数据

System.out.println(“wait” + sleepSecends + “second”);

try {

TimeUnit.SECONDS.sleep(sleepSecends);//sleep

} catch (InterruptedException e) {

}

PrintWriter out = response.getWriter();

long value = sequenceId.getAndIncrement();

out.write(Long.toString(value));

}

}

https://p3.toutiaoimg.com/origin/pgc-image/6dad97ddf1d04b0f997e45d037b9002b?from=pc

https://p3.toutiaoimg.com/origin/pgc-image/d37b528f258d4d6cb07858026fa5ea44?from=pc

WebQQ、Comet 都用到长轮询技术,另外一些使用 Pull 模式消费的消息系统,都会使用 Long Polling 技术进行优化。