1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
| package tech.xixing.spark06.elt;
import com.alibaba.fastjson.JSONObject; import lombok.Data; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructType; import scala.Tuple2; import tech.xixing.spark06.support.date.DateStyle; import tech.xixing.spark06.support.date.DateUtil;
import java.math.BigDecimal; import java.time.LocalDate; import java.time.Month; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.stream.Collectors;
public class LineChartETL {

    /** "Today" used by {@link #lineVos(SparkSession)}; kept so the original entry point is unchanged. */
    private static final LocalDate DEFAULT_NOW = LocalDate.of(2019, Month.NOVEMBER, 30);

    /** How many days before "today" (at start of day) the daily window begins. */
    private static final int WINDOW_DAYS = 8;

    /**
     * Daily member registrations since the cutoff ({@code %s}, inclusive).
     * NOTE(review): memberCount is max(id), presumably a proxy for the total member count that
     * assumes sequential ids; confirm.
     */
    private static final String MEMBER_SQL =
            " select date_format(create_time,'yyyy-MM-dd') as day ,"
                    + " count(id) as regCount ,max(id) as memberCount "
                    + " from i_member.t_member "
                    + " where create_time>='%s' "
                    + " group by date_format(create_time,'yyyy-MM-dd')";

    /**
     * Daily order figures since the cutoff ({@code %s}, inclusive). No ORDER BY is used: the
     * joined result is sorted in Java afterwards, and a join does not preserve input order anyway.
     */
    private static final String ORDER_SQL =
            " select date_format(create_time,'yyyy-MM-dd') as day ,"
                    + " max(order_id) as orderCount , "
                    + " sum(origin_price) as gmv "
                    + " from i_order.t_order "
                    + " where create_time>='%s' "
                    + " group by date_format(create_time,'yyyy-MM-dd')";

    /**
     * Total GMV strictly before the cutoff ({@code %s}). Together with the inclusive window queries,
     * every order is counted exactly once.
     */
    private static final String GMV_BEFORE_SQL =
            "select sum(origin_price) as gmvFirst from i_order.t_order "
                    + " where create_time<'%s' ";

    /** Builds a local Spark session with Hive support for running this ETL standalone. */
    private static SparkSession init() {
        return SparkSession.builder()
                .appName("member etl")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();
    }

    /** Runs the ETL against the default date and prints the resulting series. */
    public static void main(String[] args) {
        SparkSession sparkSession = init();
        List<LineVo> lineVoList = lineVos(sparkSession);
        System.out.println(lineVoList);
    }

    /**
     * Builds the line-chart series for the fixed default date 2019-11-30.
     *
     * @param sparkSession session with access to the {@code i_member} and {@code i_order} Hive tables
     * @return one entry per day, sorted by day, with {@code gmv} holding the cumulative GMV
     */
    public static List<LineVo> lineVos(SparkSession sparkSession) {
        return lineVos(sparkSession, DEFAULT_NOW);
    }

    /**
     * Builds the line-chart series starting {@value #WINDOW_DAYS} days before {@code now}.
     *
     * <p>The window start is {@code now} at start of day in the JVM's default time zone, minus
     * {@value #WINDOW_DAYS} days. No upper bound is applied, so rows after {@code now} are included.
     * Only days that have BOTH member and order rows appear, because the two aggregates are
     * inner-joined on {@code day}.
     *
     * @param sparkSession session with access to the {@code i_member} and {@code i_order} Hive tables
     * @param now the reference "today"
     * @return one entry per day, sorted by day; {@code gmv} is replaced by the running total of all
     *     GMV before the window plus each day's GMV up to and including that day
     */
    public static List<LineVo> lineVos(SparkSession sparkSession, LocalDate now) {
        Date nowDay = Date.from(now.atStartOfDay(ZoneId.systemDefault()).toInstant());
        Date windowStart = DateUtil.addDay(nowDay, -WINDOW_DAYS);
        String cutoff = DateUtil.DateToString(windowStart, DateStyle.YYYY_MM_DD_HH_MM_SS);

        Dataset<Row> memberDataset = sparkSession.sql(String.format(MEMBER_SQL, cutoff));
        Dataset<Row> orderDataset = sparkSession.sql(String.format(ORDER_SQL, cutoff));
        List<Tuple2<Row, Row>> joined = memberDataset
                .joinWith(orderDataset,
                        memberDataset.col("day").equalTo(orderDataset.col("day")),
                        "inner")
                .collectAsList();

        List<LineVo> lineVoList = new ArrayList<>(joined.size());
        for (Tuple2<Row, Row> pair : joined) {
            lineVoList.add(toLineVo(pair._1(), pair._2()));
        }
        // Chronological order is required before accumulating GMV.
        Collections.sort(lineVoList);

        accumulateGmv(lineVoList, readGmvBefore(sparkSession, cutoff));
        return lineVoList;
    }

    /** Merges the columns of a member row and an order row (same day) into one LineVo. */
    private static LineVo toLineVo(Row memberRow, Row orderRow) {
        JSONObject jsonObject = new JSONObject();
        copyFields(memberRow, jsonObject);
        // Both rows carry "day" with the same value (join key), so the overwrite is harmless.
        copyFields(orderRow, jsonObject);
        return jsonObject.toJavaObject(LineVo.class);
    }

    /** Copies every column of {@code row} into {@code target}, keyed by column name. */
    private static void copyFields(Row row, JSONObject target) {
        for (String fieldName : row.schema().fieldNames()) {
            Object value = row.getAs(fieldName);
            target.put(fieldName, value);
        }
    }

    /**
     * Reads the total GMV before {@code cutoff}; returns zero when there is none.
     *
     * <p>{@code sum()} yields NULL when no order precedes the cutoff. Its result type also depends on
     * {@code origin_price} (double, decimal or bigint). The value is therefore converted via
     * {@code toString()} instead of a type-specific getter; for a double this is exactly
     * {@code BigDecimal.valueOf(double)}.
     */
    private static BigDecimal readGmvBefore(SparkSession sparkSession, String cutoff) {
        Row row = sparkSession.sql(String.format(GMV_BEFORE_SQL, cutoff)).collectAsList().get(0);
        Object total = row.get(0);
        return total == null ? BigDecimal.ZERO : new BigDecimal(total.toString());
    }

    /**
     * Replaces each entry's daily GMV with the running total, seeded with {@code baseline}.
     * A day with NULL GMV contributes nothing but still receives the running total.
     */
    private static void accumulateGmv(List<LineVo> sortedByDay, BigDecimal baseline) {
        BigDecimal running = baseline;
        for (LineVo lineVo : sortedByDay) {
            BigDecimal dailyGmv = lineVo.getGmv();
            if (dailyGmv != null) {
                running = running.add(dailyGmv);
            }
            lineVo.setGmv(running);
        }
    }

    /** One point of the line chart: a day plus its member and order figures. */
    @Data
    private static class LineVo implements Comparable<LineVo> {
        /** Day as {@code yyyy-MM-dd}, produced by {@code date_format} in the queries. */
        private String day;
        private Integer regCount;
        private Integer memberCount;
        private Integer orderCount;
        private BigDecimal gmv;

        /**
         * Orders by {@code day}. The value is zero-padded ISO {@code yyyy-MM-dd}, so lexicographic
         * order equals chronological order, including across month and year boundaries. (The old
         * day-of-month-only comparison broke there.) Note: this ordering is not consistent with
         * the Lombok-generated {@code equals}, which compares all fields.
         */
        @Override
        public int compareTo(LineVo o) {
            return this.day.compareTo(o.day);
        }
    }
}
|