千万级日志回放引擎设计稿#yyds干货盘点#

努力尽今夕,少年犹可夸。这篇文章主要讲述千万级日志回放引擎设计稿#yyds干货盘点#相关的知识,希望能为你提供帮助。
现在压测系统一直用的方案是goreplay进行二次开发完成的。因为整体是java技术栈的,使用goreplay有存在两方面问题:一是兼容性,语言和开发框架上,增加了用例创建执行的复杂度;二是维护成本,goreplay二次开发方案已经无法满足现在的性能测试需求。如果维护两套压测引擎会带来更多工作量。
所以为了尽可能解决这两方面问题,接到了一个活儿,调研一下Java实现日志回放功能。主要就是读了goreplay的源码以及它设计思路,用Java重现实现一遍。
这里用到了前两天分享的Disruptor高性能队列常用API演示、高性能队列Disruptor在测试中应用,有兴趣的可以再翻一翻。另视频版还在制作中,年后会和大家相见。
思路总体设计思路如下:

千万级日志回放引擎设计稿#yyds干货盘点#

文章图片

PS:流量递增和动态增减尚未实现,还在研究goreplay的源码。
日志拉取和解析日志的拉取和初步解析依旧采取原来项目中的逻辑,通过SQL语句网关日志中拉取日志,并对日志内容进行初步解析,放入云OSS中,并将链接存入数据库(此步骤放在录制流量成功之后)。
PS:目前日志解析保留的有用信息只有URL
【千万级日志回放引擎设计稿#yyds干货盘点#】日志格式如下:
/v1/level,funtester.com,-,token,-,1622611469,- /v1/level,funtester.com,-,token,-,1622611469,- /v1/level,funtester.com,-,token,-,1622611469,- /v1/level,funtester.com,-,token,-,1622611469,- /v1/level,funtester.com,-,token,-,1622611469,- /v1/level,funtester.com,-,token,-,1622611469,- /v1/level,funtester.com,-,token,-,1622611469,-

实现步骤
  • 首先将日志中有用信息(URL)以及token放到内存中
  • 通过配置host,读取URL,以及响应header(token,压测标识,常用header,模拟盘标识)组装HTTP请求。
  • 创建Disruptor对象,使用异步创建生产者
  • 通过消费者消费(发出请求)消息(HTTP请求对象),达到HTTP接口日志流量回复功能。
性能指标
  • 本机6C16G配置测试数据
  • 实测1千万URL读取速度约为9s ~ 13s,内存无压力,如果后续更大日志量需求,可以通过stream方式异步读取日志,实测日志读取速度在80万/s以上,满足目前需求。
  • 单生产者速度25万QPS
  • 单机测试QPS8.8万,CPU跑满,触及物理极限,此数据与之前工具对比压测差异不大。
风险
  • 消费者异步对消息进行存储,超过一定数量将会丢弃消息。这个问题在消费者速度小于生产者速度时会触发。
  • 消费者数量需要在启动前设定,如果参数设置不合理,会导致消费者压力瓶颈,无法动态增加消费者。
PS:这些风险后续会逐个解决。
代码实现 生产者Demo:
def ft = output("创建线程") fun int i = 0 while (key) def url = logs.get(i % logs.size()) def get = getHttpGet(HOST + url) get.addHeader("token", tokens.get(i % tokens.size())) get.addHeader(HttpClientConstant.USER_AGENT) ringBuffer.publishEvent e, s -> e.setRequest(get)i++ft()

读取文件代码
/** * 通过闭包传入方法读取超大文件部分内容 * * @param filePath * @param function * @return */ public static List< String> readByLine(String filePath, Function< String, String> function) if (StringUtils.isEmpty(filePath) || !new File(filePath).exists() || new File(filePath).isDirectory()) ParamException.fail("文件信息错误!" + filePath); logger.debug("读取文件名:", filePath); List< String> lines = new ArrayList< > (); File file = new File(filePath); if (file.isFile() & & file.exists())// 判断文件是否存在 try (FileInputStream fileInputStream = new FileInputStream(file); InputStreamReader read = new InputStreamReader(fileInputStream, DEFAULT_CHARSET); BufferedReader bufferedReader = new BufferedReader(read, 3 * 1024 * 1024); ) String line = null; while ((line = bufferedReader.readLine()) != null) String apply = function.apply(line); if (StringUtils.isNotBlank(apply)) lines.add(apply); catch (Exception e) logger.warn("读取文件内容出错", e); else logger.warn("找不到指定的文件:", filePath); return lines;

演示Demo
package com.funtest.groovytestimport com.funtester.base.constaint.FixedThread import com.funtester.config.HttpClientConstant import com.funtester.frame.execute.Concurrent import com.funtester.frame.execute.ThreadPoolUtil import com.funtester.httpclient.ClientManage import com.funtester.httpclient.FunLibrary import com.funtester.utils.ArgsUtil import com.funtester.utils.RWUtil import com.lmax.disruptor.EventHandler import com.lmax.disruptor.RingBuffer import com.lmax.disruptor.WorkHandler import com.lmax.disruptor.YieldingWaitStrategy import com.lmax.disruptor.dsl.Disruptor import com.lmax.disruptor.dsl.ProducerType import org.apache.http.client.methods.HttpGet import org.apache.http.client.methods.HttpRequestBase import org.junit.platform.commons.util.StringUtilsimport java.util.concurrent.LinkedBlockingDeque import java.util.function.Functionclass ReplayTest extends FunLibrary static String url = "http://localhost:12345/test"; static HttpGet httpGet = getHttpGet(url); //static LinkedBlockingQueue< HttpRequestBase> requests = new LinkedBlockingQueue< > ()static def HOST = "http://localhost:12345"static def key = truestatic Disruptor< RequestEvent> disruptorpublic static void main(String[] args) def logfile = "/Users/oker/Desktop/log.csv" //def logfile = "/Users/oker/Desktop/fun.csv" //1千万日志 def tokenfile = "/Users/oker/Desktop/token.csv" //2万用户token List< String> logs = RWUtil.readByLine(logfile, new Function< String, String> () @Override String apply(String s) return StringUtils.isNotBlank(s) & & s.startsWith("/") ? s.split(COMMA)[0] : null); List< String> tokens = RWUtil.readByLine(tokenfile, new Function< String, String> () @Override String apply(String s) return StringUtils.isNotBlank(s) ? s.split(COMMA)[4] : null); output("总计 $formatLong(logs.size()) 条日志") disruptor = new Disruptor< RequestEvent> ( RequestEvent::new, 512 * 512, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); RingBuffer< RequestEvent> ringBuffer = disruptor.getRingBuffer(); def ft = output("创建线程") fun int i = 0 while (key) def url = logs.get(i % logs.size()) def get = getHttpGet(HOST + url) get.addHeader("token", tokens.get(i % tokens.size())) get.addHeader(HttpClientConstant.USER_AGENT) ringBuffer.publishEvent e, s -> e.setRequest(get)i++ft() disruptor.handleEventsWith(new FunTester(10)) //5.times ft()//下面开始测试 ClientManage.init(10, 5, 0, "", 0) def util = new ArgsUtil(args) def thread = util.getIntOrdefault(0, 20) def times = util.getIntOrdefault(1, 60000) RUNUP_TIME = util.getIntOrdefault(2, 0) def tasks = [] thread.times def tester = new FunTester(times) disruptor.handleEventsWith(tester); tasks < < testerdisruptor.start(); new Concurrent(tasks, "这是千万级日志回放演示Demo").start()private static class FunTester extends FixedThread implements EventHandler< RequestEvent> , WorkHandler< RequestEvent> LinkedBlockingDeque< HttpRequestBase> reqs = new LinkedBlockingDeque< HttpRequestBase> ()FunTester(int limit) super(null, limit, true)@Override protected void doing() throws Exception FunLibrary.executeOnly(reqs.take())@Override FixedThread clone() return new FunTester(limit)@Override protected void after() super.after() key = false disruptor.shutdown()@Override void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception if (reqs.size() < 100000) reqs.add(event.getRequest())@Override void onEvent(RequestEvent event) throws Exception if (reqs.size() < 100000) reqs.add(event.getRequest())private static class RequestEvent HttpRequestBase request; public HttpRequestBase getRequest() return request; public void setRequest(HttpRequestBase request) this.request = request;

PS:这里用到了多个group,原因在设计稿中标记了。
Have Fun ~ Tester !
  • 140道面试题目(UI、Linux、MySQL、API、安全)
  • 接口自动化面试题【思路分享】
  • 分享一份Fiddler学习包
  • Selenium自动化的JUnit参数化实践
  • 绑定手机号性能测试
  • 浏览器测试的三大挑战及解决方案【译】
  • 移动测试工程师职业
  • Groovy热更新Java实践
  • Java线程安全ReentrantLock
  • 无脚本测试
  • 如何mock固定QPS的接口
  • Selenium并行测试最佳实践
  • 连开100年会员会怎样
  • 控制台彩色输出

    推荐阅读