public void handleMessage() {
Properties properties = new Properties();
// 添加若干配置....
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
String topic = "foo_bar";
consumer.subscribe(Collections.singleton(topic));
while (!stopMark) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(300, MILLIS));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
}
}
[Consumer clientId=foo, groupId=bar] Setting offset for partition foo_bar-8 to the committed offset FetchPosition{offset=1590039403, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[1.2.3.4:9000 (id: 4 rack: rack-0)], epoch=0}}
Thread t = new Thread(() -> {
Integer a = 0;
while (true) {
System.out.println(a);
try {
TimeUnit.SECONDS.sleep(1);
} catch (Throwable ignore) {
}
}
});
t.start();
0
0
0
0
0
1590039403
1590039403
1590039403
1590039403
public static void hahaha(Integer s){
}
hahaha(new Integer(0));
hahaha(0);
public native int GetConfFile(String var1, int var2, StringBuffer var4);
StringBuffer sb = new StringBuffer();
int code = ConfigAccess.GetConfFile("myConfigFile", 0, sb);
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InterruptedException {
Thread t = new Thread(() -> {
Integer a = 0;
while (true) {
System.out.println(a);
try {
TimeUnit.SECONDS.sleep(1);
} catch (Throwable ignore) {
}
}
});
t.start();
TimeUnit.SECONDS.sleep(5);
Integer b = 0;
// 获取Unsafe的实例
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);
// 获取对象的字段
Field field = Integer.class.getDeclaredField("value");
// 计算字段在对象中的偏移量
long offset = unsafe.objectFieldOffset(field);
// 修改字段的值
unsafe.putInt(b, offset, 1);
System.out.println("+++++" + unsafe.getInt(b, offset));
}
0
0
0
0
0
+++++1 //此时修改了内存中的值
1
1
1
1
1
- END -