Integrating Storm with Kafka by Hand

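Preface
The Kafka version in our production environment changed recently, which meant the Storm and Redis versions that go with it had to be updated as well. I first thought of rolling production Kafka back to the previous version so that nothing else would have to change, but my lead made it clear that the production Kafka version had to stay, so the only option was to find matching Storm and Redis versions. (Note: production Kafka is 2.11-0.11.0, that is, the Scala 2.11 build of Kafka 0.11.0.) While updating Storm I found that there was no storm-kafka release matching this Kafka version yet. That ruled out the Storm-Kafka integration package shipped with Storm, so the integration had to be written by hand.
Task
The task is the same as before: run a complete Storm analysis pipeline (kafka-storm-kafka-redis), with a Kafka producer simulating the data source that feeds Storm. Note that Storm is used in its most traditional development style (topology-spout-bolt):
topology: the topology (the execution logic that wires the bolts together)
spout: Storm's data source; here it receives data from Kafka
bolt: a computation component; there can be one or more
Environment
kafka 2.11-0.11.0
zookeeper 3.4.5.0
storm 1.0.1
redis 3.2.11.0
Data flow
(1): The kafkaProducer sends data to the test topic (htb_position_test in the code below)
(2): Storm reads the data from the test topic (through the spout component)
(3): The spout sends the data to AllDataBolt (HdtasDataAllBolt in the code below) for processing
(4): AllDataBolt performs the business processing in its execute() method and sends the processed data to the individual Kafka data topics
(5): The consumer for each Kafka data topic consumes the data and stores it in Redis
Implementation
1: Fake data format. Each fake record is a string of 25 fields separated by spaces, and each field represents a specific attribute. Some sample records follow, one per line: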

16 608 0 10010018 17 10 18 19 13 37 839 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 138196
16 608 0 10010018 17 10 18 19 13 38 89 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 388327
16 608 0 10010018 17 10 18 19 13 38 339 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 638286
16 608 0 10010018 17 10 18 19 13 38 589 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 888342
16 608 0 10010018 17 10 18 19 13 38 839 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 138420
16 608 0 10010018 17 10 18 19 13 39 89 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 388487
16 608 0 10010018 17 10 18 19 13 39 339 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 638539

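In production this data is emitted by the shin guards worn by the players. Since we are testing locally, we need to prepare some fake data in a txt file in advance; it only has to match the format above.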
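To make the record layout concrete, here is a minimal sketch that splits one sample record into named fields. The field names and their order follow the constants of the HdtasScheme class shown in this article; the class name SampleRecordParser and the trim() call are assumptions added for illustration and are not part of the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the original project.
class SampleRecordParser {
    // Field names in wire order, mirroring the constants in HdtasScheme.
    static final String[] FIELD_NAMES = {
        "protocol_type", "field_id", "uwb_id", "sign_id",
        "year", "month", "day", "hour", "minute", "second", "millisecond",
        "x", "y", "z",
        "a_speed_x", "a_speed_y", "a_speed_z",
        "gyroscope_x", "gyroscope_y", "gyroscope_z",
        "heart_rate", "electric", "charging_status",
        "server_accept_time_s", "server_accept_time_n"
    };

    // Returns the named fields, or null when the record does not have exactly 25 fields.
    static Map<String, String> parse(String line) {
        String[] parts = line.trim().split(" ");
        if (parts.length != FIELD_NAMES.length) {
            return null;
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            record.put(FIELD_NAMES[i], parts[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        String line = "16 608 0 10010018 17 10 18 19 13 37 839 0.000000 0.000000 0.000000 "
                + "0 0 0 0 0 0 0 0 0 1508325219 138196";
        Map<String, String> record = parse(line);
        System.out.println(record.get("sign_id") + " @ " + record.get("server_accept_time_s"));
    }
}
```

Rejecting records whose field count is not 25 matches the length check that both the scheme and the spout in this article apply before emitting a tuple.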
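2: Maven configuration and utility classes
Dependencies (groupId : artifactId : version). The indented entries carry no version in the original pom and are presumably exclusions of the dependency above them:
org.apache.storm : storm-core : 1.0.1
org.apache.curator : curator-framework : 2.10.0
    log4j : log4j
    org.jboss.netty : netty
    commons-collections : commons-collections
org.apache.kafka : kafka_2.11 : 0.11.0.0
    org.apache.zookeeper : zookeeper
    org.slf4j : slf4j-log4j12
    log4j : log4j
redis.clients : jedis : 2.8.1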

HdtasScheme.java
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by Jarno on 2017/5/31.
 */
public class HdtasScheme implements Scheme {

    private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");

    // Protocol type
    public static final String PROTOCOL_TYPE = "protocol_type";
    // Field (pitch) ID
    public static final String FIELD_ID = "field_id";
    // Main device ID (terminal ID)
    public static final String UWB_ID = "uwb_id";
    // Shin guard ID
    public static final String SIGN_ID = "sign_id";
    // Year, month, day, hour, minute, second, millisecond
    public static final String YEAR = "year";
    public static final String MONTH = "month";
    public static final String DAY = "day";
    public static final String HOUR = "hour";
    public static final String MINUTE = "minute";
    public static final String SECOND = "second";
    public static final String MILLISECOND = "millisecond";
    // Positioning X Y Z
    public static final String X = "x";
    public static final String Y = "y";
    public static final String Z = "z";
    // Acceleration X Y Z
    public static final String A_SPEED_X = "a_speed_x";
    public static final String A_SPEED_Y = "a_speed_y";
    public static final String A_SPEED_Z = "a_speed_z";
    // Gyroscope X Y Z
    public static final String GYROSCOPE_X = "gyroscope_x";
    public static final String GYROSCOPE_Y = "gyroscope_y";
    public static final String GYROSCOPE_Z = "gyroscope_z";
    // Heart rate
    public static final String HEART_RATE = "heart_rate";
    // Battery level
    public static final String ELECTRIC = "electric";
    // Battery charging status, 1: charging, 0: discharging
    public static final String CHARGING_STATUS = "charging_status";
    // Unix timestamp, seconds
    public static final String SERVER_ACCEPT_TIME_S = "server_accept_time_s";
    // Unix timestamp, nanoseconds
    public static final String SERVER_ACCEPT_TIME_N = "server_accept_time_n";

    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        String input = deserializeString(ser);
        String[] strs = input.split(" ");
        // A valid record has exactly 25 fields; anything else is dropped.
        if (strs.length != 25) {
            return null;
        }
        return new Values((Object[]) strs);
    }

    private static String deserializeString(ByteBuffer string) {
        if (string.hasArray()) {
            int base = string.arrayOffset();
            // Decode as UTF-8 on both paths instead of relying on the platform charset.
            return new String(string.array(), base + string.position(), string.remaining(), UTF8_CHARSET);
        } else {
            return new String(Utils.toByteArray(string), UTF8_CHARSET);
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(PROTOCOL_TYPE, FIELD_ID, UWB_ID, SIGN_ID,
                YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, MILLISECOND,
                X, Y, Z,
                A_SPEED_X, A_SPEED_Y, A_SPEED_Z,
                GYROSCOPE_X, GYROSCOPE_Y, GYROSCOPE_Z,
                HEART_RATE, ELECTRIC, CHARGING_STATUS,
                SERVER_ACCEPT_TIME_S, SERVER_ACCEPT_TIME_N);
    }
}
2: A kafkaProducer that simulates the data source by reading the txt file and sending its lines
KafkaProduce.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

/**
 * @author lvfang
 * @create 2017-10-15 11:17
 * @desc
 **/
public class KafkaProduce extends Thread {

    private String topic; // topic
    private String src;   // data source (path of the txt file)

    public KafkaProduce(String topic, String src) {
        super();
        this.topic = topic;
        this.src = src;
    }

    // Create the producer
    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        // Single-node Kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.90.240:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        BufferedReader br = null;
        Producer<String, String> producer = null;
        try {
            br = new BufferedReader(new FileReader(src));
            // Create the producer
            producer = createProducer();
            String line = null;
            // Send each line of the file to Kafka
            while ((line = br.readLine()) != null) {
                System.out.println("Produced record: " + line);
                producer.send(new ProducerRecord<String, String>(topic, line + "\n"));
                // Interval between messages
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            // Report the failure instead of swallowing it silently
            e.printStackTrace();
        } finally {
            try {
                if (br != null) br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // Flush pending records and release the producer
            if (producer != null) producer.close();
        }
    }

    public static void main(String[] args) {
        // Use a topic that already exists in the Kafka cluster
        new KafkaProduce("htb_position_test", "D:/testdata/htb_position_test_data.txt").start();
    }
}

With the producer written, let's first check that it can send data successfully.
[Figure: 2017-10-19_103248.png]
[Figure: 2017-10-19_103312.png]
The data is sent successfully, so the first step, our data source, works.
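3: Writing the Storm logic. Storm's role here is mainly computation: it processes the data and sends the results back to Kafka. Three components take part: the spout, the bolt, and the topology. The spout is the data source and receives the data published to the corresponding Kafka topic. A bolt is a computation component, and each bolt normally performs one computation; to keep things simple, everything here is handled in a single bolt. The topology is the wiring logic for the Storm components: it determines the order in which the bolts run, much like the diagram of a workflow.
3.1: The spout data source. The spout is factored into an abstract base class here. A spout can be implemented in two ways; this one implements IRichSpout.
KafkaSpout.java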
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class KafkaSpout implements IRichSpout {

    private static final long serialVersionUID = 3679050534053612196L;
    public static final Logger logger = LoggerFactory.getLogger(KafkaSpout.class);

    private final Properties props;
    private SpoutOutputCollector collector;
    private List<String> topicList;
    // Number of tuples the polling thread is currently allowed to emit
    private final AtomicInteger spoutPending;
    //private int maxSpoutPending;
    private KafkaConsumer<String, byte[]> consumer;

    public KafkaSpout(Properties props, List<String> topics) {
        this.props = props;
        initProps();
        this.topicList = topics;
        this.spoutPending = new AtomicInteger();
    }

    // Forces String keys and byte[] values, overriding whatever the caller configured
    private void initProps() {
        this.props.put("key.deserializer", StringDeserializer.class);
        this.props.put("value.deserializer", ByteArrayDeserializer.class);
    }

    public abstract Fields generateFields();

    public abstract List<Object> generateTuple(byte[] message);

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        //this.maxSpoutPending = Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
        consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(topicList);
        final ExecutorService executor = Executors.newFixedThreadPool(topicList.size());
        // Poll Kafka on a background thread and emit tuples as permits become available
        executor.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(100);
                    Iterator<ConsumerRecord<String, byte[]>> iterator = records.iterator();
                    while (iterator.hasNext()) {
                        // Wait until nextTuple() has granted at least one permit
                        if (spoutPending.get() <= 0) {
                            sleep(1000);
                            continue;
                        }
                        ConsumerRecord<String, byte[]> next = iterator.next();
                        byte[] message = next.value();
                        List<Object> tuple = null;
                        try {
                            tuple = generateTuple(message);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        if (tuple == null) {
                            continue;
                        }
                        //logger.info("kafka spout emit tuple:{}", tuple.toString());
                        collector.emit(tuple);
                        spoutPending.decrementAndGet();
                    }
                }
            }
        });
    }

    @Override
    public void deactivate() {
        consumer.close();
    }

    @Override
    public void nextTuple() {
        //if (spoutPending.get() < maxSpoutPending) {
        //    spoutPending.incrementAndGet();
        //}
        // Each call from Storm grants the polling thread one more emit
        spoutPending.incrementAndGet();
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(generateFields());
    }

    private void sleep(long millisecond) {
        try {
            Thread.sleep(millisecond);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
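KafkaSpout above is an abstract class. It implements the IRichSpout interface and overrides the methods that receive and emit messages, leaving two abstract methods unimplemented. One is Fields generateFields(): developers implement it per data type to return the column names of the split record, so that bolts can read the values by field name. The other is List generateTuple(byte[] message): it does the actual message handling, taking a received message and splitting it into values. To build their own spout data source, developers extend KafkaSpout and override these two methods.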
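The contract between the two abstract methods can be sketched without Storm on the classpath. In the sketch below, MessageSourceSketch stands in for the abstract spout and PairSourceSketch for a concrete subclass; both names, the two-field message format, and the toNamedTuple helper are hypothetical and only illustrate why the field list and the tuple must line up position by position.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Storm-free stand-in for the abstract spout; all names here are hypothetical.
abstract class MessageSourceSketch {
    // Column names, in the order the values are produced.
    abstract List<String> generateFields();

    // Splits one raw message into values; returns null to drop the message.
    abstract List<Object> generateTuple(byte[] message);

    // Pairs each value with its field name, the way a bolt later reads values by field.
    final Map<String, Object> toNamedTuple(byte[] message) {
        List<Object> tuple = generateTuple(message);
        List<String> fields = generateFields();
        if (tuple == null || tuple.size() != fields.size()) {
            return null;
        }
        Map<String, Object> named = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            named.put(fields.get(i), tuple.get(i));
        }
        return named;
    }
}

// A concrete source for a made-up two-field message such as "608 10010018".
class PairSourceSketch extends MessageSourceSketch {
    @Override
    List<String> generateFields() {
        return Arrays.asList("field_id", "sign_id");
    }

    @Override
    List<Object> generateTuple(byte[] message) {
        String[] parts = new String(message, StandardCharsets.UTF_8).trim().split(" ");
        if (parts.length != 2) {
            return null;
        }
        return Arrays.<Object>asList(parts[0], parts[1]);
    }
}
```

A subclass that returns 25 field names must therefore also return 25 values per message, which is why the concrete spout in this article rejects any record whose length is not 25.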
HdtasSpout.java
import com.misbio.seer.storm.KafkaSpout;
import com.misbio.seer.storm.hdtas.HdtasScheme;
import org.apache.storm.tuple.Fields;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * @author lvfang
 * @create 2017-10-16 16:40
 * @desc
 **/
public class HdtasSpout extends KafkaSpout {

    public HdtasSpout(Properties props, List<String> topics) {
        super(props, topics);
    }

    @Override
    public List<Object> generateTuple(byte[] message) {
        String[] messages = new String(message).split(" ");
        // A valid record has exactly 25 fields; anything else is dropped.
        if (messages.length != 25) return null;
        System.out.println(messages[1]);
        // Emit the 25 fields in wire order, matching generateFields()
        return new ArrayList<Object>(Arrays.asList(messages));
    }

    @Override
    public Fields generateFields() {
        return new Fields(
                HdtasScheme.PROTOCOL_TYPE, HdtasScheme.FIELD_ID, HdtasScheme.UWB_ID, HdtasScheme.SIGN_ID,
                HdtasScheme.YEAR, HdtasScheme.MONTH, HdtasScheme.DAY, HdtasScheme.HOUR,
                HdtasScheme.MINUTE, HdtasScheme.SECOND, HdtasScheme.MILLISECOND,
                HdtasScheme.X, HdtasScheme.Y, HdtasScheme.Z,
                HdtasScheme.A_SPEED_X, HdtasScheme.A_SPEED_Y, HdtasScheme.A_SPEED_Z,
                HdtasScheme.GYROSCOPE_X, HdtasScheme.GYROSCOPE_Y, HdtasScheme.GYROSCOPE_Z,
                HdtasScheme.HEART_RATE, HdtasScheme.ELECTRIC, HdtasScheme.CHARGING_STATUS,
                HdtasScheme.SERVER_ACCEPT_TIME_S, HdtasScheme.SERVER_ACCEPT_TIME_N
        );
    }
}
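3.2: Writing the bolt. This bolt receives the data emitted by the spout, performs the computations, and then sends the results to the corresponding topics so that Kafka consumers can pick them up again. A bolt can also be implemented in two ways; here we extend BaseBasicBolt.
HdtasDataAllBolt.java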
import com.misbio.seer.kafka.DefaultProducer;
import com.misbio.seer.storm.hdtas.HdtasScheme;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @author lvfang
 * @create 2017-10-16 17:33
 * @desc
 *
 * DefaultProducer and Position are classes from the author's project that are not shown in this article.
 **/
public class HdtasDataAllBolt extends BaseBasicBolt {

    private final static Logger LOG = LoggerFactory.getLogger(HdtasDataAllBolt.class);

    private String kafkaUri;
    private Producer<String, String> createProducer;

    public HdtasDataAllBolt() {
    }

    public HdtasDataAllBolt(String kafkaUri) {
        this.kafkaUri = kafkaUri;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.createProducer = new DefaultProducer().createProducer(kafkaUri, null);
    }

    long start = System.currentTimeMillis();
    int count = 0;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String fieldId = input.getStringByField(HdtasScheme.FIELD_ID); // pitch id
        String signId = input.getStringByField(HdtasScheme.SIGN_ID);   // tag id
        // Acceleration X Y Z
        String aSpeedX = input.getStringByField(HdtasScheme.A_SPEED_X);
        String aSpeedY = input.getStringByField(HdtasScheme.A_SPEED_Y);
        String aSpeedZ = input.getStringByField(HdtasScheme.A_SPEED_Z);
        // Positioning X Y Z
        String x = input.getStringByField(HdtasScheme.X);
        String y = input.getStringByField(HdtasScheme.Y);
        String z = input.getStringByField(HdtasScheme.Z);
        // Gyroscope X Y Z
        String gyroscopeX = input.getStringByField(HdtasScheme.GYROSCOPE_X);
        String gyroscopeY = input.getStringByField(HdtasScheme.GYROSCOPE_Y);
        String gyroscopeZ = input.getStringByField(HdtasScheme.GYROSCOPE_Z);
        String heartRate = input.getStringByField(HdtasScheme.HEART_RATE);           // heart rate
        String electric = input.getStringByField(HdtasScheme.ELECTRIC);              // battery level
        String charingStatus = input.getStringByField(HdtasScheme.CHARGING_STATUS);  // charging status
        String timeS = input.getStringByField(HdtasScheme.SERVER_ACCEPT_TIME_S);     // timestamp, seconds
        long time = convertMilTime(input);

        if ("1523".equals(signId)) {
            return;
        }
        // Skip records whose x or y is below 0.01
        double doubleX = Double.valueOf(x).doubleValue();
        double doubleY = Double.valueOf(y).doubleValue();
        if (doubleX < 0.01 || doubleY < 0.01) {
            return;
        }
        //if(x.equals("0.000000") || y.equals("0.000000")){
        //    return;
        //}
        System.out.println("X&Y : " + x + " : " + y);

        // Acceleration on the three axes, in g: 16384 / 8 = 2048 counts per g, 16 g range
        double ax = BigDecimal.valueOf(Integer.parseInt(aSpeedX)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        double ay = BigDecimal.valueOf(Integer.parseInt(aSpeedY)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        double az = BigDecimal.valueOf(Integer.parseInt(aSpeedZ)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        // Gyroscope on the three axes, in dps (degrees per second): 131 / 8 = 16.375 counts per dps, 2000 dps range
        double gx = BigDecimal.valueOf(Integer.parseInt(gyroscopeX)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        double gy = BigDecimal.valueOf(Integer.parseInt(gyroscopeY)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        double gz = BigDecimal.valueOf(Integer.parseInt(gyroscopeZ)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        // Magnitude of the acceleration vector
        double r = BigDecimal.valueOf(Math.sqrt(ax * ax + ay * ay + az * az)).setScale(3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        if (0d == r) {
            return;
        }
        // Send the result to Kafka (hdtas_all_ori_a_speed)
        final String result = signId + "," + ax + "," + ay + "," + az + "," + r + "," + time;
        createProducer.send(new ProducerRecord<String, String>("hdtas_all_ori_a_speed", fieldId, result));

        long end = System.currentTimeMillis();
        if (end - start <= 1000) count++;
        else {
            start = end;
            count = 0;
        }
        // Treated as abnormal data; not processed
        if (ax == 0 && ay == 0 && az == 0) return;

        // Step count
        Integer step = computeStep(fieldId, signId, r, time);
        // Angle change on the three axes
        computeAngle(ax, ay, az);
        // Instantaneous speed
        computeInstantSpeed(time, ax, ay, az);
        // Position
        computePosition(signId, fieldId, x, y, z, time);
        // Speed (known to be problematic)
        computeSpeed(signId, fieldId, x, y, z, time);
        // Other data (gyroscope, acceleration, heart rate, battery and so on)
        computeOther(signId, fieldId, electric, heartRate, time, ax, ay, az, gx, gy, gz, r, step);
        // Agility
        computeAgile(signId, fieldId, gx, gy, gz, time);
        // Battery
        computeBattery(signId, fieldId, electric, time);
        // Distance
        computeDistance(signId, fieldId, x, y, z, time);
        // Heart rate
        computeHeartrate(signId, fieldId, heartRate, time);
    }

    Map<String, Double> lastRMap = new HashMap<>(); // last acceleration magnitude per tag
    Map<String, Integer> stepMap = new HashMap<>();
    //Map currentPeakTimeMap = new HashMap();
    Map<String, Long> lastPeakTimeMap = new HashMap<>();

    /**
     * Computes the step count.
     * @param fieldId
     * @param signId
     * @param r
     * @param time
     */
    private Integer computeStep(String fieldId, String signId, double r, long time) {
        Double lastR = lastRMap.get(signId);
        if (lastR == null) {
            lastRMap.put(signId, r);
        } else {
            boolean isPeak = detectPeak(signId, lastR, r);
            if (isPeak) {
                // Note: every return inside this branch leaves lastRMap unchanged for this sample.
                Long lastPeakTime = lastPeakTimeMap.get(signId);
                if (lastPeakTime == null) {
                    lastPeakTime = 0L;
                }
                Double peak = lastPeakMap.get(signId);
                Double valley = lastValleyMap.get(signId);
                if (peak == null || valley == null) {
                    return 0;
                }
                // Peak-to-valley swing, compared against the thresholds below
                double threshold = peak - valley;
                //System.out.println("threshold:" + threshold);
                Integer step = stepMap.get(signId);
                if (step == null) {
                    step = 0;
                    stepMap.put(signId, 0);
                }
                Double bt = baseThresholdMap.get(signId);
                if (bt == null) {
                    baseThresholdMap.put(signId, baseThreshold);
                    bt = baseThreshold;
                }
                System.out.println("baseThreshold:" + bt);
                if (time - lastPeakTime >= 250 && (threshold >= bt)) {
                    lastPeakTimeMap.put(signId, time);
                    stepMap.put(signId, step + 1);
                }
                if (time - lastPeakTime >= 250 && (threshold >= 1.3d)) {
                    lastPeakTimeMap.put(signId, time);
                    double aveThreshold = computeThreshold(signId, threshold);
                    System.out.println("aveThreshold:" + aveThreshold);
                    baseThresholdMap.put(signId, aveThreshold);
                }
                // Result that could be sent to Kafka
                final String result = signId + "," + threshold;
                final StringBuilder sb = new StringBuilder(result);
                sb.append(",").append(stepMap.get(signId));
                return stepMap.get(signId);
                //createProducer.send(new ProducerRecord("hdtas_all_step",fieldId,sb.toString()));
            }
            lastRMap.put(signId, r);
        }
        return 0;
    }

    private double stepAvgNum = 4;

    private double computeThreshold(String signId, double threshold) {
        double temp = baseThreshold;
        List<Double> thresholdList = aveThresholdMap.get(signId);
        if (thresholdList == null) {
            thresholdList = new ArrayList<>();
        }
        //System.out.println("size:" + thresholdList.size());
        if (thresholdList.size() < stepAvgNum) {
            thresholdList.add(threshold);
            aveThresholdMap.put(signId, thresholdList);
        } else {
            temp = computeAvgThreshold(thresholdList);
            thresholdList.add(threshold);
            thresholdList.remove(0);
            aveThresholdMap.put(signId, thresholdList);
        }
        return temp;
    }

    private double computeAvgThreshold(List<Double> thresholdList) {
        int size = thresholdList.size();
        double all = 0;
        for (double d : thresholdList) {
            all += d;
        }
        double ave = new BigDecimal(all).divide(new BigDecimal(size), 3).setScale(3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
        if (ave >= 8) {
            ave = 4.3d;
        } else if (ave >= 7 && ave < 8) {
            ave = 3.3d;
        } else if (ave >= 4 && ave < 7) {
            ave = 2.3d;
        } else if (ave >= 3 && ave < 4) {
            ave = 2.0d;
        } else {
            ave = 1.3d;
        }
        return ave;
    }

    // Current rising state
    private Map<String, Boolean> upMap = new HashMap<>();
    // Previous rising state
    private Map<String, Boolean> lastUpMap = new HashMap<>();
    // Number of consecutive rising samples
    private Map<String, Integer> upCountMap = new HashMap<>();
    // Number of consecutive rising samples in the previous rise
    private Map<String, Integer> lastUpCountMap = new HashMap<>();
    // Value of the last peak
    private Map<String, Double> lastPeakMap = new HashMap<>();
    // Value of the last valley
    private Map<String, Double> lastValleyMap = new HashMap<>();
    private Map<String, List<Double>> aveThresholdMap = new HashMap<>();
    private Map<String, Double> thresholdMap = new HashMap<>();
    private double baseThreshold = 2.0d;
    private Map<String, Double> baseThresholdMap = new HashMap<>();

    /**
     * Detects a peak. Conditions:
     * 1. the signal is now falling
     * 2. the previous point was rising
     * 3. it rose at least 2 consecutive times
     * 4. peak value check (>=4 walking, >=6 jogging, >=8 running, >=9 jumping)
     *
     * @param signId
     * @param lastR
     * @param newR
     * @return
     */
    private boolean detectPeak(String signId, Double lastR, double newR) {
        Boolean isUp = upMap.get(signId);
        if (isUp == null) {
            lastUpMap.put(signId, false);
        } else {
            lastUpMap.put(signId, isUp);
        }
        Integer count = upCountMap.get(signId);
        // Rising trend
        // && (newR - lastR) >= baseThreshold
        if (newR >= lastR) {
            upMap.put(signId, true);
            upCountMap.put(signId, count == null ? 1 : count + 1);
        } else {
            lastUpCountMap.put(signId, count == null ? 0 : count);
            upCountMap.put(signId, 0);
            upMap.put(signId, false);
        }
        boolean lastUp = lastUpMap.get(signId);
        boolean currentUp = upMap.get(signId);
        Integer lastUpCount = lastUpCountMap.get(signId);
        if (lastUpCount == null) {
            lastUpCount = 0;
        }
        double baseR = 20 / 9.8;
        if (!currentUp && lastUp && (lastUpCount >= 2 || lastR >= baseR)) {
            lastPeakMap.put(signId, lastR);
            return true;
        } else if (currentUp && !lastUp) {
            lastValleyMap.put(signId, lastR);
            return false;
        } else {
            return false;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /**
     * Computes the angle change on the three axes.
     * @param ax
     * @param ay
     * @param az
     */
    private void computeAngle(double ax, double ay, double az) {
        double r = Math.sqrt(ax * ax + ay * ay + az * az);
        double arx = (180 / Math.PI) * Math.acos(ax / r);
        double ary = (180 / Math.PI) * Math.acos(ay / r);
        double arz = (180 / Math.PI) * Math.acos(az / r);
        //double myarx = -1;
        //double myary = -1;
        ////System.out.println("---------------- converted angles");
        //if (arz > 0 && arz <= 90) {
        //    myarx = arx - 90;
        //    myary = ary - 90;
        //}
        //if (arz > 90) {
        //    if (arx >= 0 && arx <= 90) {
        //        myarx = -90 - arx;
        //    }
        //    if (arx > 90) {
        //        myarx = 180 - arx + 90;
        //    }
        //    if (ary >= 0 && ary <= 90) {
        //        myary = - 90 - ary;
        //    }
        //    if (ary > 90) {
        //        ////System.out.println("Y axis:" + (180 - ary + 90));
        //        myary = 180 - ary + 90;
        //    }
        //}
    }

    private long lastMillisecond = -1;
    private long lastTime = 0;
    private double lastAx = -1;
    private double lastAy = -1;
    private double lastAz = -1;
    private double lastV = 0;

    /**
     * Computes the instantaneous speed.
     * @param time
     * @param ax
     * @param ay
     * @param az
     */
    private void computeInstantSpeed(long time, double ax, double ay, double az) {
        if (lastMillisecond != -1 && lastAx != -1 && lastAy != -1 && lastAz != -1) {
            double ds = (time - lastMillisecond) / 1000d;
            if (ds <= 0) {
                lastMillisecond = time;
                return;
            }
            double vx = (ax - lastAx) / 2 * ds;
            double vy = (ay - lastAy) / 2 * ds;
            double vz = (az - lastAz) / 2 * ds;
            //double vx_pow = Math.pow(vx , 2);
            //double vy_pow = Math.pow(vy, 2);
            //double vz_pow = Math.pow(vz, 2);
            //double v = Math.sqrt(vx_pow + vy_pow + vz_pow);
            double v = vx + vy + vz;
            lastV = v;
            //if(v > 8 ){
            //if (v < 0){
            //    v = 0.000;
            //}
            v = new BigDecimal(v).setScale(3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
            //System.out.println("instantaneous speed:" + v);
        }
        lastMillisecond = time;
        lastAx = ax;
        lastAy = ay;
        lastAz = az;
    }

    private long convertMilTime(Tuple input) {
        String year = input.getStringByField(HdtasScheme.YEAR);
        String month = input.getStringByField(HdtasScheme.MONTH);
        String day = input.getStringByField(HdtasScheme.DAY);
        String hour = input.getStringByField(HdtasScheme.HOUR);
        String minute = input.getStringByField(HdtasScheme.MINUTE);
        String second = input.getStringByField(HdtasScheme.SECOND);
        String millisecond = input.getStringByField(HdtasScheme.MILLISECOND);
        SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss:SSS");
        String time = year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + second + ":" + millisecond;
        //System.out.println(time);
        try {
            Date date = sdf.parse(time);
            //System.out.println(date.getTime());
            // Production: use the device time
            return date.getTime();
            // Testing: use the current time
            //return System.currentTimeMillis();
        } catch (ParseException e) {
            e.printStackTrace();
            LOG.error("convertMilTime date format error, error info is {}", e.getMessage());
            return System.currentTimeMillis();
        }
    }

    /**
     * Agility.
     * @param userId
     * @param fieldId
     * @param x
     * @param y
     * @param z
     * @param time
     */
    public void computeAgile(String userId, String fieldId, double x, double y, double z, Long time) {
        final String result = userId + "," + fieldId + "," + x + "," + y + "," + z + "," + time;
        createProducer.send(new ProducerRecord<String, String>("hdtas_agile_bolt", fieldId, result));
    }

    /**
     * Battery level.
     * @param userId
     * @param electric
     * @param time
     */
    public void computeBattery(String userId, String fieldId, String electric, Long time) {
        final String result = userId + "," + electric + "," + time;
        createProducer.send(new ProducerRecord<String, String>("hdtas_battery_bolt", fieldId, result));
    }

    private final Map<String, Double> oldXdis = new HashMap<>();
    private final Map<String, Double> oldYdis = new HashMap<>();
    private final Map<String, Long> oldTimedis = new HashMap<>();
    private final Map<String, Long> oldTimeDistance = new HashMap<>();
    private Map<String, Position> distancePositionMap = new HashMap<>();

    /**
     * Distance.
     * @param userId
     * @param fieldId
     * @param x
     * @param y
     * @param z
     * @param time
     */
    public void computeDistance(String userId, String fieldId, String x, String y, String z, Long time) {
        double dx = Double.parseDouble(x);
        double dy = Double.parseDouble(y);
        double dz = Double.parseDouble(z);
        Long oldLongTime = oldTimeDistance.get(userId);
        Position p = distancePositionMap.get(userId);
        if (p == null) {
            oldTimeDistance.put(userId, time);
            distancePositionMap.put(userId, new Position(dx, dy, dz));
            return;
        }
        if (oldLongTime == null) {
            oldTimeDistance.put(userId, time);
            return;
        }
        // The original subtracted p.getX() from dz; the x delta must use dx.
        double absX = Math.abs((dx - p.getX()));
        double absY = Math.abs((dy - p.getY()));
        double absZ = Math.abs((dz - p.getZ()));
        // Planar distance; z is not used
        double dis = new BigDecimal(Math.sqrt(absX * absX + absY * absY)).setScale(5, BigDecimal.ROUND_HALF_UP).doubleValue();
        long diff = Math.abs(time - oldLongTime);
        if (diff >= 1000) {
            double speed = new BigDecimal(dis).divide(new BigDecimal(diff).divide(new BigDecimal(1000), 3, BigDecimal.ROUND_HALF_EVEN), 5, BigDecimal.ROUND_HALF_EVEN).doubleValue();
            // Remember the current position and time
            Position p1 = new Position(dx, dy, dz);
            distancePositionMap.put(userId, p1);
            oldTimeDistance.put(userId, time);
            String result = userId + "," + fieldId + "," + time + "," + diff + "," + dis + "," + speed;
            createProducer.send(new ProducerRecord<String, String>("hdtas_distance_bolt", fieldId, result));
        }
    }

    /**
     * Heart rate.
     * @param userId
     * @param fieldId
     * @param heartRate
     * @param time
     */
    public void computeHeartrate(String userId, String fieldId, String heartRate, Long time) {
        final String result = userId + "," + fieldId + "," + time + "," + heartRate;
        createProducer.send(new ProducerRecord<String, String>("hdtas_heartrate_bolt", fieldId, result));
    }

    /**
     * Coordinates.
     * @param userId
     * @param fieldId
     * @param x
     * @param y
     * @param z
     * @param time
     */
    public void computePosition(String userId, String fieldId, String x, String y, String z, Long time) {
        final String result = userId + "," + fieldId + "," + time + "," + x + "," + y;
        System.out.println("Position data: " + result);
        createProducer.send(new ProducerRecord<String, String>("hdtas_position_bolt", fieldId, result));
    }

    private final Map<String, Long> oldTimespeed = new HashMap<>();
    private Map<String, Position> speedPositionMap = new HashMap<>();

    /**
     * Speed.
     * @param userId
     * @param fieldId
     * @param x
     * @param y
     * @param z
     * @param time
     */
    public void computeSpeed(String userId, String fieldId, String x, String y, String z, long time) {
        double sx = Double.parseDouble(x);
        double sy = Double.parseDouble(y);
        double sz = Double.parseDouble(z);
        Long oldLongTime = oldTimespeed.get(userId);
        Position p = speedPositionMap.get(userId);
        if (p == null) {
            oldTimespeed.put(userId, time);
            speedPositionMap.put(userId, new Position(sx, sy, sz));
            return;
        }
        if (oldLongTime == null) {
            oldTimespeed.put(userId, time);
            return;
        }
        // The original subtracted p.getX() from sz; the x delta must use sx.
        double absX = Math.abs((sx - p.getX()));
        double absY = Math.abs((sy - p.getY()));
        double absZ = Math.abs((sz - p.getZ()));
        double dis = new BigDecimal(Math.sqrt(absX * absX + absY * absY)).setScale(5, BigDecimal.ROUND_HALF_UP).doubleValue();
        long diff = Math.abs(time - oldLongTime);
        System.out.println("diff : ++++++++++++" + absX + "," + absY);
        System.out.println("dis : ++++++++++++" + dis);
        //double speed = dis/(diff/1000);
        if (diff >= 1000) {
            double speed = new BigDecimal(dis).divide(new BigDecimal(diff).divide(new BigDecimal(1000), 3, BigDecimal.ROUND_HALF_EVEN), 5, BigDecimal.ROUND_HALF_EVEN).doubleValue();
            System.out.println("speed : ++++++++++++" + speed);
            // Remember the current position and time
            Position p1 = new Position(sx, sy, sz);
            speedPositionMap.put(userId, p1);
            oldTimespeed.put(userId, time);
            String result = userId + "," + fieldId + "," + time + "," + diff + "," + dis + "," + speed;
            System.out.println("Speed data: " + result);
            createProducer.send(new ProducerRecord<String, String>("hdtas_speed_bolt", fieldId, result));
        }
    }

    /**
     * Other parameters.
     * @param userId    user id
     * @param fieldId   pitch id
     * @param battery   battery level
     * @param heartRate heart rate
     * @param time      current time in milliseconds
     * @param ax
     * @param ay
     * @param az
     * @param gx
     * @param gy
     * @param gz
     * @param r
     */
    public void computeOther(String userId, String fieldId, String battery, String heartRate, long time,
                             double ax, double ay, double az, double gx, double gy, double gz, double r, Integer step) {
        String result = userId + "," + fieldId + "," + battery + "," + heartRate + "," + ax + "," + ay + "," + az
                + "," + gx + "," + gy + "," + gz + "," + r + "," + step + "," + time;
        System.out.println("Other data: " + result);
        createProducer.send(new ProducerRecord<String, String>("hdtas_other_bolt", fieldId, result));
    }
}
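The core arithmetic of the bolt can be tried in isolation. The sketch below is a simplified, single-player version and not the bolt's exact algorithm: it converts raw accelerometer counts to g using the 2048 counts-per-g factor from the bolt, counts steps as peaks whose rise above the preceding valley reaches a threshold with a minimum gap between steps, and derives speed from the planar distance between two positions. The class name BoltMathSketch, the fixed threshold, and the sample values are assumptions chosen for illustration; the bolt itself adapts its threshold per tag.

```java
// Simplified, single-player sketch of the bolt's arithmetic; hypothetical names throughout.
class BoltMathSketch {
    // From the bolt: 16384 / 8 = 2048 raw counts per g.
    static final double COUNTS_PER_G = 2048.0;

    // Magnitude of the acceleration vector in g, from raw sensor counts.
    static double magnitude(int rawX, int rawY, int rawZ) {
        double ax = rawX / COUNTS_PER_G;
        double ay = rawY / COUNTS_PER_G;
        double az = rawZ / COUNTS_PER_G;
        return Math.sqrt(ax * ax + ay * ay + az * az);
    }

    // Counts a step at each local peak whose rise above the preceding valley is at least
    // `threshold` and that occurs at least `minGapMs` after the previously counted step.
    static int countSteps(double[] r, long[] timeMs, double threshold, long minGapMs) {
        int steps = 0;
        boolean rising = false;
        boolean anyStep = false;
        long lastStepTime = 0L;
        double lastValley = r.length > 0 ? r[0] : 0.0;
        for (int i = 1; i < r.length; i++) {
            boolean nowRising = r[i] >= r[i - 1];
            if (!nowRising && rising) {
                // r[i - 1] was a local peak
                double swing = r[i - 1] - lastValley;
                boolean farEnough = !anyStep || timeMs[i - 1] - lastStepTime >= minGapMs;
                if (swing >= threshold && farEnough) {
                    steps++;
                    anyStep = true;
                    lastStepTime = timeMs[i - 1];
                }
            } else if (nowRising && !rising) {
                // r[i - 1] was a local valley
                lastValley = r[i - 1];
            }
            rising = nowRising;
        }
        return steps;
    }

    // Planar distance between two positions; like the bolt, z is ignored.
    static double planarDistance(double x1, double y1, double x2, double y2) {
        return Math.hypot(x2 - x1, y2 - y1);
    }

    // Average speed in units per second over an interval given in milliseconds.
    static double speed(double distance, long elapsedMs) {
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("elapsedMs must be positive");
        }
        return distance / (elapsedMs / 1000.0);
    }

    public static void main(String[] args) {
        double[] r = {1.0, 1.0, 3.5, 1.0, 1.0, 3.6, 1.0};
        long[] t = {0, 250, 500, 750, 1000, 1250, 1500};
        System.out.println("steps = " + countSteps(r, t, 2.0, 250));
        System.out.println("speed = " + speed(planarDistance(0, 0, 3, 4), 2000));
    }
}
```

Keeping the valley value between peaks is what lets the swing be measured relative to the most recent low point rather than against an absolute level, which is the same idea as the peak and valley maps in the bolt.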

3.3: Writing the topology. The topology is the logical view of the components and determines how data flows from the spout through the bolts.
import com.misbio.seer.storm.hdtas.bolt.HdtasDataAllBolt;
import com.misbio.seer.storm.hdtas.spout.HdtasSpout;
import com.misbio.seer.utils.ResourceUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author lvfang
 * @create 2017-10-16 16:31
 * @desc
 **/
public class TopologyHdtas {

    // Topic and ZooKeeper settings (local)
    public static final Integer ZK_PORT = Integer.parseInt(ResourceUtils.getProperty("zookeeper.port"));
    public static final String ZK_HOST = ResourceUtils.getProperty("zookeeper.host");
    public static final String ZKINFO = ResourceUtils.getProperty("zookeeper.connect.kafka");
    public static final String KAFKA_URL = ResourceUtils.getProperty("kafka.broker");
    public static final String ZKROOT = "/hdtas";
    public static final String SPOUTID = "hdtas";
    private static final String HDTAS_SPOUT = "hdtasSpout";
    private static final String HDTAS_ALL_BOLT = "hdtas_all_bolt";

    public static void main(String[] args) throws Exception {
        Config cfg = new Config();
        cfg.setNumWorkers(2);
        cfg.setDebug(true);

        // Kafka consumer settings
        Properties properties = new Properties();
        properties.put("group.id", "group1");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // KafkaSpout.initProps() replaces this with ByteArrayDeserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);

        List<String> topics = new ArrayList<String>();
        topics.add("htb_position_test");

        // Wiring of the components
        TopologyBuilder builder = new TopologyBuilder();
        // First create the spout data source
        builder.setSpout("spout", new HdtasSpout(properties, topics));
        // Then create the bolt and subscribe it to the component with id "spout"
        builder.setBolt("print-bolt", new HdtasDataAllBolt(KAFKA_URL)).shuffleGrouping("spout");

        // 1: local mode
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("top1", cfg, builder.createTopology()); // submit the topology
        Thread.sleep(10000000); // keep running for 10,000,000 ms (about 2.8 hours), then stop
        cluster.killTopology("top1");
        cluster.shutdown();

        // 2: cluster mode
        //StormSubmitter.submitTopology("top1", cfg, builder.createTopology());
    }
}

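With the topology written, we can start the program and see whether it runs end to end.
[Figure: 2017-10-19_112349.png]
[Figure: 2017-10-19_112448.png]
As we can see, the processed data has been sent to the corresponding topics. What remains is to write the consumers that read these topics and persist the data to Redis.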
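3.4: KafkaCusumer consumes the data computed by Storm. After computing, Storm sends the data by type to separate topics: position, distance, speed, heart rate, battery, gyroscope, and so on. Here we write the consumer for one topic only; the others are the same.
KafkaCusumer.java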
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author lvfang
 * @create 2017-10-15 11:17
 * @desc
 **/
public class KafkaCusumer extends Thread {

    private String topic; // topic
    public final String SRC = "D:/testdata/testData.txt";

    public KafkaCusumer(String topic) {
        super();
        this.topic = topic;
    }

    // Create the consumer
    private Consumer<String, Bytes> createConsumer() {
        Properties properties = new Properties();
        // Consumers that share a group id split a topic's partitions between them, so use a
        // group id that no other consumer of the same topic uses. Note that the topology
        // above also uses "group1".
        properties.put("group.id", "group1");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.90.240:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
        return new KafkaConsumer<String, Bytes>(properties);
    }

    @Override
    public void run() {
        // Create the consumer
        Consumer<String, Bytes> consumer = createConsumer();
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, Bytes> records = consumer.poll(100);
            for (ConsumerRecord<String, Bytes> record : records) {
                System.out.println("Consumed record: " + record.value());
                //System.out.printf("received: ", record.offset(), record.key(), record.value());
            }
        }
    }

    public static void main(String[] args) {
        // This reads the input topic; pass one of the bolt's output topics
        // (for example hdtas_position_bolt) to see the data computed by Storm.
        new KafkaCusumer("htb_position_test").start();
    }
}

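Once that is written, we can start the consumer and check that it receives data.
[Figure: 2017-10-19_113515.png]
Here the consumer simply prints the data. To persist it, store each record right after printing it; that part is not written out here. Done!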
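As a rough idea of what the persistence step could look like, the sketch below parses a position message in the "userId,fieldId,time,x,y" format that the bolt sends to the hdtas_position_bolt topic and writes it into a hash keyed by pitch and player. Everything here is an assumption for illustration: the HashStore interface, the in-memory implementation, and the "position:fieldId:userId" key layout are hypothetical. With Jedis on the classpath, its hset(key, field, value) method could take the place of the in-memory store.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical persistence sketch; the key layout and the store interface are assumptions.
class PositionSinkSketch {
    // Minimal view of a store that supports hash fields, shaped like Redis HSET.
    interface HashStore {
        void hset(String key, String field, String value);
    }

    // In-memory stand-in so the sketch runs without a Redis server.
    static class InMemoryStore implements HashStore {
        final Map<String, Map<String, String>> data = new HashMap<>();

        @Override
        public void hset(String key, String field, String value) {
            data.computeIfAbsent(key, k -> new LinkedHashMap<>()).put(field, value);
        }
    }

    // Parses "userId,fieldId,time,x,y" and stores the latest position of that player.
    // Returns false when the message does not have exactly five parts.
    static boolean persist(String message, HashStore store) {
        String[] parts = message.trim().split(",");
        if (parts.length != 5) {
            return false;
        }
        String key = "position:" + parts[1] + ":" + parts[0];
        store.hset(key, "time", parts[2]);
        store.hset(key, "x", parts[3]);
        store.hset(key, "y", parts[4]);
        return true;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        persist("10010018,608,1508325217839,12.5,30.25", store);
        System.out.println(store.data);
    }
}
```

In the consumer loop, a call such as persist(...) would go right after the println that prints each record, once the record value has been converted to a String.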
