0%

ArrayList

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author xixing
* @version 1.0
* @date 2020/7/2 14:53
*/
public class ContainerNotSafeDemo {

public static void main(String[] args) {
List<String> list= new CopyOnWriteArrayList<>();
for(int i=0;i<100;i++){
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
},String.valueOf(i)).start();
}
//java.util.ConcurrentModificationException

/**
*1.故障现象
* java.util.ConcurrentModificationException
*
*2.导致原因
* 并发争抢修改导致,参考花名册签名,一个正在写,另一个抢导致
*3.解决方案
* 1.new Vector<>() 不建议,这个类已经被不建议使用。
* 2.Collections.synchronizedList(new ArrayList())
* 3.new CopyOnWriteArrayList<>();
*
*
*/
}
}

image-20200702152923248

HashSet

HashSet底层是HashMap,以key为set,value是一个object的常量。

1
2
3
4
5
6
7
/**
* Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
* default initial capacity (16) and load factor (0.75).
*/
public HashSet() {
map = new HashMap<>();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Adds the specified element to this set if it is not already present.
* More formally, adds the specified element <tt>e</tt> to this set if
* this set contains no element <tt>e2</tt> such that
* <tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
* If this set already contains the element, the call leaves the set
* unchanged and returns <tt>false</tt>.
*
* @param e element to be added to this set
* @return <tt>true</tt> if this set did not already contain the specified
* element
*/
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
1
2
3
4
5
6
7
Set<String> set= new CopyOnWriteArraySet<>();
for(int i=0;i<100;i++){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
},String.valueOf(i)).start();
}

HashMap

1
2
3
4
5
6
7
Map<String,String> map= new ConcurrentHashMap<>();
for(int i=0;i<100;i++){
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,8));
System.out.println(map);
},String.valueOf(i)).start();
}

什么是CAS

CAS是比较并交换即CompareAndSwap,是一条CPU并发原语,不能被打断

它会比较主物理内存和当前线程的变量是否相等

如果相等就会改变值,不等就会return false;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author xixing
* @version 1.0
* @date 2020/7/1 10:23
*
* 1. CAS是什么 ==》CompareAndSwap
* 比较并交换
*/
public class CASDemo {

public static void main(String[] args) {
AtomicInteger atomicInteger=new AtomicInteger(5);

//main thread do something..
System.out.println(atomicInteger.compareAndSet(5,2019)+"\t"+atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5,1024)+"\t"+atomicInteger.get());
}

}

CAS底层原理

1
2
3
4
5
6
7
8
9
10
11
12
13
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

可以看出,AtomicInteger调用了UnSafe类

1
2
3
4
5
6
7
8
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

它的getAndIncrement调用了unsafe的getAndAddInt();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
*var1 是AtomicInteger本身
*var2 是该对象的内存偏移地址
*var4是要加的值
*var5是找主内存的真实值
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}
1
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

是一个native类

volatile的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class VolatileDemo {

public static void main(String[] args) {
MyData myData=new MyData();

new Thread(()->{
System.out.println(Thread.currentThread().getName()+"come in");
//暂停一会线程
try{
TimeUnit.SECONDS.sleep(3);
}catch (Exception e){

}

myData.addTo60();
System.out.println(Thread.currentThread().getName()+":"+myData.number);
},"AAA").start();
//main线程,如果可见,则跳出循环
while (myData.number==0){

}
System.out.println(Thread.currentThread().getName()+"\t"+"mission is over main thread number="+myData.number);
}
}

class MyData{
int number=0;

public void addTo60(){
this.number=60;
}
}

可见性

由于JVM运行程序的实体是线程,每个线程创建时JVM会为其创建一个工作内存,工作内存是每个线程的私有数据区域,而Java内存模型中规定所有的变量都存储在主内存中,主内存是共享内存区域,所有的线程都可以访问,但线程对变量的操作必须在工作内存中进行,首先要把变量从主内存中拷贝一份到工作内存,在工作内存操作完之后再写回主内存,但是不同线程之间工作内存是不能相互访问的,写回主内存之后要保证其他线程能够及时知道主内存中的变量值变了。这就是可见性。

JMM规范(Java Memory Model)

1.是一个抽象概念,描述一组规则或规范

JMM关于同步的规定

1.线程解锁前,必须把共享变量的值刷新回主内存

2.线程加锁前,必须读取主内存的最新值到自己的工作空间

3.加锁解锁是同一把锁

volatile不保证原子性

image-20200630153537384

javap -c在IDEA 2019.2的配置

-c $FileDir$$FileNameWithoutAllExtensions$.class

$FileDir​$

-v -c -s -l -p $OutputPath​$$FileDirRelativeToSourcepath$$FileNameWithoutExtension​$.class

禁止指令重排

image-20200906160358118

image-20200906160813392

image-20200906160837962

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author xixing
* @version 1.0
* @date 2020/6/29 16:05
* <p>
* 1.验证volatile的可见性
* 1.1 假如int number=0 number变量之前根本没有添加valatile关键字修饰
* 1.2添加volatile可以保证可见性
* 2.验证volatile不保证原子性
* 2.1原子性即不可分隔,完整性
* 2.2解决volatile不保证原子性问题
* 1.synchronized
* 2.AtomicInteger
*
*
*/
public class VolatileDemo {

public static void main(String[] args) {

MyData myData = new MyData();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
myData.addPlusPlus();
myData.addAtomic();
}

}, String.valueOf(i)).start();
}

//需要等待20个线程都执行完,再用main线程看看结果是多少
//大于2是因为后台默认有两个线程,一个是main线程,还有gc线程
while (Thread.activeCount() > 2) {
Thread.yield();//礼让
}
System.out.println(Thread.currentThread().getName() + "\t" + "finally number is " + myData.number);
System.out.println(Thread.currentThread().getName() + "\t" + "finally atomicInteger is " + myData.atomicInteger.get());

}

/**
* valotile可以保证可见性
*/
public void seeOkByVolatile() {
MyData myData = new MyData();

new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "come in");
//暂停一会线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {

}

myData.addTo60();
System.out.println(Thread.currentThread().getName() + ":" + myData.number);
}, "AAA").start();
//main线程,如果可见,则跳出循环
while (myData.number == 0) {

}
System.out.println(Thread.currentThread().getName() + "\t" + "mission is over main thread number=" + myData.number);

}
}

class MyData {
volatile int number = 0;

AtomicInteger atomicInteger=new AtomicInteger();
public void addAtomic(){
atomicInteger.getAndIncrement();
}

public void addTo60() {
this.number = 60;
}
public void addPlusPlus() {
/**
* 线程中number++有三步
* 1.读到工作内存
* 2.+1
* 3.写回
* 在写回的时候会出现丢失数据情况,可能一个刚写完,
* 还来不及通知其他线程另一个又开始写了
*/
number++;
}

}

未加volatile关键字的DCL(Double Check Lock)存在的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* @author xixing
* @version 1.0
* @date 2020/6/1 8:45
*/
public class LazyDoubleCheckSingleton {
private volatile static LazyDoubleCheckSingleton lazySingleton=null;

private LazyDoubleCheckSingleton(){

}

/**
* 如果锁的是静态方法,相当于把这个class给锁了
* 锁的不是静态方法,相当于把堆内存中的实例锁了
* @return
*/
public static LazyDoubleCheckSingleton getInstance(){
if(lazySingleton==null){
synchronized (LazyDoubleCheckSingleton.class){
if(lazySingleton==null){
lazySingleton=new LazyDoubleCheckSingleton();
//1.分配内存给这个对象
//2.初始化对象
//3.设置lazyDoubleCheckSingleton 指向刚分配的内存地址
//2和3可能顺序会颠倒,加入volatile会使得2和3只能按顺序,使得线程安全,
// 可以看到共享内存的状态


}
}

}
return lazySingleton;
// synchronized (LazySingleton.class){
// if(lazySingleton==null){
// lazySingleton=new LazySingleton();
// }
// }
// return lazySingleton;
}


}

volatile底层

image-20200906161135491

image-20200906161146868

kibana操作es

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
POST /user/_search
{
"query": {
"bool": {
"must": [//可以是should,must就是&&,should就是||
{
"match": {
"name": "Jack"
}
},
{
"range": {
"age": {
"gte": 20,
"lte": 40
}
}
}
]
}
}
}

Spark操作ES

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* @author xixing
* @version 1.0
* @date 2020/6/28 8:53
*/
public class ESDemo {

public static void main(String[] args) {

SparkConf sparkConf=new SparkConf().setAppName("es demo").setMaster("local[*]");
sparkConf.set("es.nodes","namenode");
sparkConf.set("es.port","9200");
sparkConf.set("es.index.auto.create","true");
JavaSparkContext jsc=new JavaSparkContext(sparkConf);
/* 往user里面插数据,如果没有user就新建并且插入数据/user/_doc
List<User> list=new ArrayList<>();
list.add(new User("Jack",20));
list.add(new User("Rose",19));
JavaRDD<User> javaRDD = jsc.parallelize(list);
JavaEsSpark.saveToEs(javaRDD,"/user/_doc");*/


// /**
// * 从/user/_doc里面查数据
// */
// JavaPairRDD<String, Map<String, Object>> javaPairRDD = JavaEsSpark.esRDD(jsc, "/user/_doc");
// Map<String, Map<String, Object>> stringMapMap = javaPairRDD.collectAsMap();
// System.out.println(stringMapMap);
// JavaRDD<User> javaRDD = javaPairRDD.map(new Function<Tuple2<String, Map<String, Object>>, User>() {
// @Override
// public User call(Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {
// User user = new User();
// BeanUtils.populate(user, stringMapTuple2._2());
// return user;
// }
// });
// List<User> userList = javaRDD.collect();
// System.out.println(userList);

String query="{\"query\":{\"bool\":{\"must\":[{\"match\":{\"name\":\"Jack\"}},{\"range\":{\"age\":{\"gte\":20,\"lte\":40}}}]}}}";
JavaPairRDD<String, Map<String, Object>> pairRDD = JavaEsSpark.esRDD(jsc, "/user/_doc", query);
Map<String, Map<String, Object>> stringMapMap = pairRDD.collectAsMap();
System.out.println(stringMapMap);
}




@Data
@AllArgsConstructor
@NoArgsConstructor
public static class User implements Serializable {
private String name;
private Integer age;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package tech.xixing.spark06.elt;

import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import tech.xixing.spark06.support.date.DateStyle;
import tech.xixing.spark06.support.date.DateUtil;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.Month;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

/**
* @author xixing
* @version 1.0
* @date 2020/6/27 19:21
*/
public class LineChartETL {


private static SparkSession init(){
SparkSession sparkSession=SparkSession.builder().appName("member etl")
.master("local[*]").enableHiveSupport().getOrCreate();
return sparkSession;

}
public static void main(String[] args) {
SparkSession sparkSession = init();
List<LineVo> lineVoList = lineVos(sparkSession);
System.out.println(lineVoList);

}

public static List<LineVo> lineVos(SparkSession sparkSession) {
ZoneId defaultZoneId = ZoneId.systemDefault();
LocalDate now = LocalDate.of(2019, Month.NOVEMBER, 30);
Date nowDay = Date.from(now.atStartOfDay(defaultZoneId).toInstant());
Date sevenDayBefore = DateUtil.addDay(nowDay, -8);
String membersql=" select date_format(create_time,'yyyy-MM-dd') as day ," +
" count(id) as regCount ,max(id) as memberCount " +
" from i_member.t_member " +
" where create_time>'%s' " +
" group by date_format(create_time,'yyyy-MM-dd')";
membersql = String.format(membersql, DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> memberDataset = sparkSession.sql(membersql);
String ordersql=" select date_format(create_time,'yyyy-MM-dd') as day ," +
" max(order_id) as orderCount , " +
" sum(origin_price) as gmv " +
" from i_order.t_order " +
" where create_time>'%s' " +
" group by date_format(create_time,'yyyy-MM-dd') order by day ";
ordersql = String.format(ordersql, DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> orderDataset = sparkSession.sql(ordersql);
Dataset<Tuple2<Row, Row>> tuple2Dataset = memberDataset.joinWith(orderDataset, memberDataset.col("day").equalTo(orderDataset.col("day")), "inner");
//join后再collect as list
List<Tuple2<Row, Row>> collectAsList = tuple2Dataset.collectAsList();

/**
* 查七天前的总额加上每天的量就是从当天到一开始的总额
*/
List<LineVo> lineVoList=new ArrayList<>();
for (Tuple2<Row, Row> rowRowTuple2 : collectAsList) {
JSONObject jsonObject=new JSONObject();
Row row1 = rowRowTuple2._1();
Row row2 = rowRowTuple2._2();
StructType schema = row1.schema();
String[] strings = schema.fieldNames();
for (String string : strings) {
Object as = row1.getAs(string);
jsonObject.put(string,as);
}
schema = row2.schema();
strings = schema.fieldNames();
for (String string : strings) {
Object as = row2.getAs(string);
jsonObject.put(string,as);
}

LineVo lineVo = jsonObject.toJavaObject(LineVo.class);
lineVoList.add(lineVo);
}
Collections.sort(lineVoList);
String gmvsql="select sum(origin_price) as gmvFirst from i_order.t_order " +
" where create_time<'%s' ";
gmvsql= String.format(gmvsql, DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> dataset = sparkSession.sql(gmvsql);
double aDouble = dataset.collectAsList().get(0).getDouble(0);
BigDecimal sum=BigDecimal.valueOf(aDouble);
List<BigDecimal> destList = new ArrayList<>();
for (LineVo lineVo : lineVoList) {
BigDecimal gmv = lineVo.getGmv();
BigDecimal temp = new BigDecimal(sum.toString());
sum=temp.add(gmv);
temp=temp.add(gmv);
lineVo.setGmv(temp);
}



return lineVoList;
}

@Data
private static class LineVo implements Comparable<LineVo>{
private String day;
private Integer regCount;
private Integer memberCount;
private Integer orderCount;
private BigDecimal gmv;

@Override
public int compareTo(LineVo o) {
String num1Str = this.day.substring(8);
String num2Str = o.day.substring(8);
if(Integer.valueOf(num1Str)>Integer.valueOf(num2Str)){
return 1;
}
if(Integer.valueOf(num1Str)<Integer.valueOf(num2Str)){
return -1;
}
return 0;
}
}
}