首页
登录 | 注册

使用kafka和storm简单模拟天猫双十一实时统计订单各个维度信息

程序说明:

  • 根据双十一当天的订单mq,快速计算当天的订单量、销售金额
  • 思路:
  • 1,支付系统发送mq到kafka集群中,编写storm程序消费kafka的数据并计算实时的订单数量、销售金额
  • 2,将计算的实时结果保存在redis中
  • 3,外部程序实时展示结果

程序设计

  • 数据产生:编写kafka数据生产者,模拟订单系统发送mq
  • 数据输入:使用PaymentInfoSpout消费kafka中的数据
  • 数据计算:使用FilterMessageBlot对数据进行过滤清洗
  • 数据存储:使用Save2RedisBlot对数据进行存储
  • 数据展示:编写java app客户端,对数据进行展示,展示方式为打印在控制台。

maven依赖

 <!-- Libraries used by the demo: Redis client, JSON, Storm, ZooKeeper and Kafka. -->
 <dependencies>
        <!-- Redis client (Jedis / JedisPool). -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
        <!-- NOTE(review): no <scope> is set, so JUnit ends up on the compile classpath; "test" scope is probably intended. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <!-- JSON (de)serialization of the payment messages. -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- NOTE(review): not referenced by any of the sources shown alongside this snippet; confirm it is needed. -->
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.5</version>
            <!-- "provided" scope is left commented out, presumably so storm-core is on the classpath for local-mode runs; TODO confirm and re-enable when packaging for a real cluster. -->
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- ZooKeeper is declared explicitly here and excluded from the Kafka dependency below. -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <!-- Transitive artifacts that must not be pulled in through Kafka. -->
            <exclusions>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

模拟订单实体类:PaymentInfo.java

package storm.tmall.other;

import com.google.gson.Gson;

import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

/**
 * A deliberately simple payment record used to simulate order traffic.
 * Real payment data is more complex (parent/child orders, several products per
 * order, one payment covering several orders, and so on).
 *
 * <p>Instances are turned into JSON by {@link #random()} and parsed back by the
 * Storm bolts, so the field names are part of the message format.
 */
public class PaymentInfo implements Serializable {
    /**
     * Shared generator for {@link #random()}. It is static on purpose: Gson skips
     * static fields, so the generator never leaks into the JSON payload.
     */
    private static final Random RANDOM = new Random();
    /** Fixed order-creation timestamp assigned to every simulated order. */
    private static final String SAMPLE_ORDER_TIME = "2015-11-11 12:22:12";
    /** Pattern used to parse {@link #SAMPLE_ORDER_TIME}. */
    private static final String ORDER_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    private String orderId;          // order id
    private Date createOrderTime;    // time the order was created
    private String paymentId;        // payment id
    private Date paymentTime;        // time the payment was made
    private String productId;        // product id
    private String productName;      // product name
    private long productPrice;       // list price of the product
    private long promotionPrice;     // promotional price
    private String shopId;           // shop id
    private String shopName;         // shop name
    private String shopMobile;       // shop phone number
    private long payPrice;           // amount actually paid for the order
    private int num;                 // quantity ordered

    /** Creates an empty record; required by Gson and used before {@link #random()}. */
    public PaymentInfo() {
    }

    /** Creates a fully populated record; each argument is stored in the field of the same name. */
    public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    /**
     * Overwrites this record with simulated values and serializes it.
     *
     * <p>Order and payment ids become dash-less UUIDs, the three price fields get
     * independent random values (below 1000, 500 and 480 respectively) and the
     * creation time is fixed to 2015-11-11 12:22:12. All other fields are left
     * untouched.
     *
     * @return this record rendered as a JSON string
     * @throws IllegalStateException if the built-in sample timestamp cannot be
     *         parsed (a programming error, not a runtime condition)
     */
    public String random() {
        this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
        this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
        this.productPrice = RANDOM.nextInt(1000);
        this.promotionPrice = RANDOM.nextInt(500);
        this.payPrice = RANDOM.nextInt(480);

        // SimpleDateFormat is not thread-safe, so it stays a local instance.
        SimpleDateFormat orderTimeFormat = new SimpleDateFormat(ORDER_TIME_PATTERN);
        try {
            this.createOrderTime = orderTimeFormat.parse(SAMPLE_ORDER_TIME);
        } catch (ParseException e) {
            // Fail loudly instead of emitting a message with a null creation time.
            throw new IllegalStateException("Unparseable sample order time: " + SAMPLE_ORDER_TIME, e);
        }

        return new Gson().toJson(this);
    }
}

Kafka生产者生成随机数据:PaymentInfoProducer.java

package storm.tmall.other;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Produces an endless stream of randomly generated payment messages and
 * publishes them to Kafka. This program is deployed and run on its own.
 */
public class PaymentInfoProducer {

    /** Kafka topic the simulated payment messages are published to. */
    private final static String TOPIC = "paymentInfo";

    /**
     * Configures a Kafka producer and publishes simulated payments until the
     * process is stopped or sending fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Producer configuration.
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list",
                "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("request.required.acks", "1");
        // The configured encoder is a String encoder, so keys and values are typed as String.
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        try {
            // Publish messages forever; the loop only ends when send() throws.
            while (true) {
                producer.send(new KeyedMessage<String, String>(TOPIC, genPaymentInfo()));
            }
        } finally {
            // Release broker connections if sending fails.
            producer.close();
        }
    }

    /**
     * Builds one simulated payment message.
     *
     * @return a randomly populated {@link PaymentInfo} rendered as JSON
     */
    private static String genPaymentInfo() {
        return new PaymentInfo().random();
    }
}

kafka数据消费者代码,没有用kafkaSpout,而是自己实现消费者代码:PaymentInfoSpout.java

package storm.tmall;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;

public class PaymentInfoSpout extends BaseRichSpout {
    private static final String TOPIC = "paymentInfo";
    private Properties props;
    private ConsumerConnector consumer;
    private SpoutOutputCollector collector;
    //ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。
    // 队列的头部 是在队列中存在时间最长的元素
    private ArrayBlockingQueue<String> paymentInfoQueue = new ArrayBlockingQueue<String>(100);

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("paymentInfo"));
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

        props = new Properties();
        props.put("zookeeper.connect", "hadoop1:2181,hadoop1:2181,hadoop1:2181");
        props.put("group.id", "testGroup");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            paymentInfoQueue.add(new String(it.next().message()));
        }
    }

    public void nextTuple() {
        try {
            collector.emit(new Values(paymentInfoQueue.take()));
            try {
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

对订单数据进行简单的过滤清洗:FilterMessageBlot.java

package storm.tmall;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.gson.Gson;
import storm.tmall.other.PaymentInfo;

import java.util.Calendar;
import java.util.Date;
import java.util.Map;

public class FilterMessageBlot extends BaseBasicBolt {
    public void execute(Tuple input, BasicOutputCollector collector) {
        //读取订单数据
        String paymentInfoStr = input.getStringByField("paymentInfo");
        //将订单数据解析成JavaBean
        PaymentInfo paymentInfo = new Gson().fromJson(paymentInfoStr, PaymentInfo.class);
        // 过滤订单时间,如果订单时间在2015.11.11这天才进入下游开始计算
        Date date = paymentInfo.getCreateOrderTime();
        if (Calendar.getInstance().get(Calendar.DAY_OF_MONTH) != 31) {
            return;
        }
        collector.emit(new Values(paymentInfoStr));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("message"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }
}

生成维度统计保存到redis中:Save2RedisBlot.java

package storm.tmall;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import storm.tmall.other.PaymentInfo;

import java.util.Map;

/**
 * Terminal bolt that accumulates order statistics in Redis and prints the
 * running totals to the console.
 *
 * <p>Reads the JSON string from field {@code message} and maintains the Redis
 * counters {@code orderTotalNum}, {@code orderTotalPrice},
 * {@code orderPromotionPrice}, {@code orderTotalRealPay} and {@code userNum}.
 */
public class Save2RedisBlot extends BaseBasicBolt {
    /** Gson is thread-safe and reusable; static so it is not serialized with the bolt. */
    private static final Gson GSON = new Gson();

    private JedisPool pool;

    /**
     * Creates the Redis connection pool used by {@link #execute}.
     *
     * @param stormConf topology configuration, passed on to the base class
     * @param context   topology context, passed on to the base class
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Older examples call these settings "maxActive" and "maxWait";
        // they are named "maxTotal" and "maxWaitMillis" here.
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of idle Jedis instances kept in the pool.
        config.setMaxIdle(5);
        // Maximum number of Jedis instances the pool may hand out through getResource();
        // -1 means unlimited. Once that many are borrowed the pool is exhausted.
        config.setMaxTotal(1000 * 100);
        // Longest time to wait when borrowing an instance; a JedisConnectionException
        // is thrown once it is exceeded.
        config.setMaxWaitMillis(3000);
        // Validate instances when borrowing and returning, so borrowed instances are usable.
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        // If "java.net.SocketTimeoutException: Read timed out" shows up, pass a larger
        // timeout (in milliseconds) when constructing the JedisPool.
        // NOTE(review): the 50 ms used here is very short; confirm it is intentional.
        pool = new JedisPool(config, "127.0.0.1", 6379, 50);
        super.prepare(stormConf, context);
    }

    /**
     * Adds one payment to the Redis counters and prints the current totals.
     * A message that parses to {@code null} changes nothing, but the totals are
     * still printed.
     *
     * @param input     tuple carrying the JSON string in field {@code message}
     * @param collector unused; this bolt emits nothing
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the raw order message.
        String paymentInfoStr = input.getStringByField("message");
        // Parse it into the JavaBean.
        PaymentInfo paymentInfo = GSON.fromJson(paymentInfoStr, PaymentInfo.class);
        Jedis jedis = pool.getResource();
        try {
            if (paymentInfo != null) {
                // Total number of orders.
                jedis.incrBy("orderTotalNum", 1);
                // Total sales at list price.
                jedis.incrBy("orderTotalPrice", paymentInfo.getProductPrice());
                // Total sales at promotional price.
                jedis.incrBy("orderPromotionPrice", paymentInfo.getPromotionPrice());
                // Total amount actually paid.
                jedis.incrBy("orderTotalRealPay", paymentInfo.getPayPrice());
                // NOTE(review): incremented once per order, so this counts orders, not distinct users.
                jedis.incrBy("userNum", 1);
            }
            System.out.println("订单总数:" + jedis.get("orderTotalNum") +
                    "   销售额" + jedis.get("orderTotalPrice") +
                    "   交易额" + jedis.get("orderPromotionPrice") +
                    "   实际支付:" + jedis.get("orderTotalRealPay") +
                    "   下单用户:" + jedis.get("userNum"));
        } finally {
            // Always hand the connection back; otherwise every tuple leaks one.
            jedis.close();
        }
    }

    /** Releases all pooled Redis connections when the bolt is shut down. */
    @Override
    public void cleanup() {
        if (pool != null) {
            pool.destroy();
        }
    }

    /** This bolt is the end of the topology and declares no output fields. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

storm启动类:Double11ToplogyMain.java

package storm.tmall;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

/**
 * Entry point that wires the spout and the two bolts into a topology and
 * submits it, either to a real cluster or to an in-process local one.
 */
public class Double11ToplogyMain {
    /**
     * Submits the topology. With at least one argument, the first argument is
     * used as the topology name on the cluster; with none, the topology runs in
     * a local cluster under the name "double11".
     *
     * @param args optional; {@code args[0]} is the cluster topology name
     * @throws Exception if submission fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder topology = wireTopology();

        Config stormConf = new Config();
        stormConf.setDebug(false);

        boolean runLocally = args == null || args.length == 0;
        if (runLocally) {
            // Simulated in-process cluster; it is left running (never shut down).
            stormConf.setMaxTaskParallelism(3);
            new LocalCluster().submitTopology("double11", stormConf, topology.createTopology());
            return;
        }

        // Real cluster submission.
        stormConf.setNumWorkers(2);
        StormSubmitter.submitTopologyWithProgressBar(args[0], stormConf, topology.createTopology());
    }

    /** Chains spout -> filter bolt -> Redis bolt, each hop using shuffle grouping. */
    private static TopologyBuilder wireTopology() {
        TopologyBuilder wiring = new TopologyBuilder();
        wiring.setSpout("readPaymentInfo", new PaymentInfoSpout(), 1);
        wiring.setBolt("processIndex", new FilterMessageBlot(), 2)
                .shuffleGrouping("readPaymentInfo");
        wiring.setBolt("saveResult2Redis", new Save2RedisBlot(), 2)
                .shuffleGrouping("processIndex");
        return wiring;
    }
}


2020 jeepxie.net webmaster#jeepxie.net
10 q. 0.009 s.
京ICP备10005923号