0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Java 8 Stream流底層原理

jf_ro2CN3Fa ? 來(lái)源:CSDN ? 作者:CSDN ? 2022-11-18 10:27 ? 次閱讀

  • 函數(shù)式接口
    • 操作
    • 流程
    • Collection
    • AbstractPipeline
    • ReferencePipeline
    • Head
    • StatelessOp
    • StatefulOp
    • TerminalOp
    • ReduceOp
    • MatchOp
    • FindOp
    • ForEachOp
    • Sink
    • ChainedReference
    • TerminalSink
    • Collector
    • 并行流
    • ForkJoinTask
    • AbstractTask

函數(shù)式接口

初識(shí)lambda呢,函數(shù)式接口肯定是繞不過(guò)去的,函數(shù)式接口就是一個(gè)有且僅有一個(gè)抽象方法,但是可以有多個(gè)非抽象方法的接口。函數(shù)式接口可以被隱式轉(zhuǎn)換為lambda表達(dá)式。

@FunctionalInterface
publicinterfaceCloseable{

voidclose();
}

java.util.function它包含了很多類,用來(lái)支持Java的函數(shù)式編程,該包中的函數(shù)式接口有:

e1af6c02-66e6-11ed-8abf-dac502259ad0.png

操作

e1bb7024-66e6-11ed-8abf-dac502259ad0.png

流程

Stream相關(guān)接口繼承圖:

e1cd4740-66e6-11ed-8abf-dac502259ad0.png

Stream流水線組織結(jié)構(gòu)示意圖(圖是盜的):

e1dfbe7a-66e6-11ed-8abf-dac502259ad0.png

Collection

類路徑java.util.colltction

@Override
defaultSpliteratorspliterator(){
returnSpliterators.spliterator(this,0);
}
//常用Stream流轉(zhuǎn)換
defaultStreamstream(){
returnStreamSupport.stream(spliterator(),false);
}
//并行流
defaultStreamparallelStream(){
returnStreamSupport.stream(spliterator(),true);
}

//java.util.stream.StreamSupport#stream(java.util.Spliterator,boolean)
publicstaticStreamstream(Spliteratorspliterator,booleanparallel){
Objects.requireNonNull(spliterator);
returnnewReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
}

AbstractPipeline

類路徑java.util.stream.AbstractPipeline

//反向鏈接到管道鏈的頭部(如果是源階段,則為自身)。
privatefinalAbstractPipelinesourceStage;

//“上游”管道,如果這是源階段,則為null。
privatefinalAbstractPipelinepreviousStage;

//此管道對(duì)象表示的中間操作的操作標(biāo)志。
protectedfinalintsourceOrOpFlags;

//管道中的下一個(gè)階段;如果這是最后一個(gè)階段,則為null。在鏈接到下一個(gè)管道時(shí)有效地結(jié)束。
privateAbstractPipelinenextStage;

//如果是順序的,則此管道對(duì)象與流源之間的中間操作數(shù);如果是并行的,則為先前有狀態(tài)的中間操作數(shù)。在管道準(zhǔn)備進(jìn)行評(píng)估時(shí)有效。
privateintdepth;

//源和所有操作的組合源標(biāo)志和操作標(biāo)志,直到此流水線對(duì)象表示的操作為止(包括該流水線對(duì)象所代表的操作)。在管道準(zhǔn)備進(jìn)行評(píng)估時(shí)有效。
privateintcombinedFlags;

//源拆分器。僅對(duì)頭管道有效。如果管道使用非null值,那么在使用管道之前,sourceSupplier必須為null。在使用管道之后,如果非null,則將其設(shè)置為null。
privateSpliteratorsourceSpliterator;

//來(lái)源供應(yīng)商。僅對(duì)頭管道有效。如果非null,則在使用管道之前,sourceSpliterator必須為null。在使用管道之后,如果非null,則將其設(shè)置為null。
privateSupplier>sourceSupplier;

//如果已鏈接或使用此管道,則為True
privatebooleanlinkedOrConsumed;

//如果正在執(zhí)行任何有狀態(tài)操作,則為true;否則為true。僅對(duì)源階段有效。
privatebooleansourceAnyStateful;

privateRunnablesourceCloseAction;

//如果管道是并行的,則為true;否則,管道為順序的;否則為true。僅對(duì)源階段有效。
privatebooleanparallel;

ReferencePipeline

類路徑:java.util.stream.ReferencePipeline

filter
//java.util.stream.ReferencePipeline#filter
@Override
publicfinalStreamfilter(PredicatesuperP_OUT>predicate){
Objects.requireNonNull(predicate);
//返回一個(gè)匿名無(wú)狀態(tài)的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED){
//下游生產(chǎn)線所需要的回調(diào)接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
@Override
publicvoidbegin(longsize){
downstream.begin(-1);
}
//真正執(zhí)行操作的方法,依靠ChainedReference內(nèi)置ReferencePipeline引用下游的回調(diào)
@Override
publicvoidaccept(P_OUTu){
//只有滿足條件的元素才能被下游執(zhí)行
if(predicate.test(u))
downstream.accept(u);
}
};
}
};
}
map
//java.util.stream.ReferencePipeline#map
publicfinalStreammap(FunctionsuperP_OUT,?extendsR>mapper){
Objects.requireNonNull(mapper);
//返回一個(gè)匿名無(wú)狀態(tài)的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT){
//下游生產(chǎn)線所需要的回調(diào)接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
//真正執(zhí)行操作的方法,依靠ChainedReference內(nèi)置ReferencePipeline引用下游的回調(diào)
@Override
publicvoidaccept(P_OUTu){
//執(zhí)行轉(zhuǎn)換后提供給下游執(zhí)行
downstream.accept(mapper.apply(u));
}
};
}
};
}
flatMap
//java.util.stream.ReferencePipeline#flatMap
@Override
publicfinalStreamflatMap(FunctionsuperP_OUT,?extendsStream>mapper){
Objects.requireNonNull(mapper);
//返回一個(gè)匿名無(wú)狀態(tài)的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT|StreamOpFlag.NOT_SIZED){
//下游生產(chǎn)線所需要的回調(diào)接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
@Override
publicvoidbegin(longsize){
downstream.begin(-1);
}
//真正執(zhí)行操作的方法,依靠ChainedReference內(nèi)置ReferencePipeline引用下游的回調(diào)
@Override
publicvoidaccept(P_OUTu){
try(Streamresult=mapper.apply(u)){
//劃分為多個(gè)流執(zhí)行下游(分流)
if(result!=null)
result.sequential().forEach(downstream);
}
}
};
}
};
}
peek
//java.util.stream.ReferencePipeline#peek
@Override
publicfinalStreampeek(ConsumersuperP_OUT>action){
Objects.requireNonNull(action);
//返回一個(gè)匿名無(wú)狀態(tài)的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,0){
//下游生產(chǎn)線所需要的回調(diào)接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
//真正執(zhí)行操作的方法,依靠ChainedReference內(nèi)置ReferencePipeline引用下游的回調(diào)
@Override
publicvoidaccept(P_OUTu){
//先執(zhí)行自定義方法,在執(zhí)行下游方法
action.accept(u);
downstream.accept(u);
}
};
}
};
}
sorted
@Override
publicfinalStreamsorted(){
//不提供Comparator,會(huì)使用元素自實(shí)現(xiàn)Comparator的compareTo方法
returnSortedOps.makeRef(this);
}

@Override
publicfinalStreamsorted(ComparatorsuperP_OUT>comparator){
returnSortedOps.makeRef(this,comparator);
}
//Sorted.makeRef
staticStreammakeRef(AbstractPipelineupstream,
ComparatorsuperT>comparator){
returnnewOfRef<>(upstream,comparator);
}
//ofRef類
privatestaticfinalclassOfRef<T>extendsReferencePipeline.StatefulOp<T,T>{

privatefinalbooleanisNaturalSort;
privatefinalComparatorsuperT>comparator;

@Override
publicSinkopWrapSink(intflags,Sinksink){
Objects.requireNonNull(sink);
//根據(jù)不同的flag進(jìn)行不同排序
if(StreamOpFlag.SORTED.isKnown(flags)&&isNaturalSort)
returnsink;
elseif(StreamOpFlag.SIZED.isKnown(flags))
returnnewSizedRefSortingSink<>(sink,comparator);
else
returnnewRefSortingSink<>(sink,comparator);
}

}
distinct
@Override
publicfinalStreamdistinct(){
returnDistinctOps.makeRef(this);
}
staticReferencePipelinemakeRef(AbstractPipelineupstream){
//返回一個(gè)匿名有狀態(tài)的管道
returnnewReferencePipeline.StatefulOp(upstream,StreamShape.REFERENCE,StreamOpFlag.IS_DISTINCT|StreamOpFlag.NOT_SIZED){

@Override
SinkopWrapSink(intflags,Sinksink){
Objects.requireNonNull(sink);

if(StreamOpFlag.DISTINCT.isKnown(flags)){
//已經(jīng)是去重過(guò)了
returnsink;
}elseif(StreamOpFlag.SORTED.isKnown(flags)){
//有序流
returnnewSink.ChainedReference(sink){
booleanseenNull;
//這個(gè)為先執(zhí)行的前序元素
TlastSeen;

@Override
publicvoidbegin(longsize){
seenNull=false;
lastSeen=null;
downstream.begin(-1);
}

@Override
publicvoidend(){
seenNull=false;
lastSeen=null;
downstream.end();
}
//這里通過(guò)有序的特性,前序元素與后序元素比較,如果相等則跳過(guò)執(zhí)行后序的元素
@Override
publicvoidaccept(Tt){
if(t==null){
//這里控制元素為null只有一個(gè)
if(!seenNull){
seenNull=true;
downstream.accept(lastSeen=null);
}
}elseif(lastSeen==null||!t.equals(lastSeen)){
//這里將前序元素賦值給lastSeen
downstream.accept(lastSeen=t);
}
}
};
}else{
//底層通過(guò)Set進(jìn)行去重,所以該元素需要重寫hashCode和equals方法
returnnewSink.ChainedReference(sink){
Setseen;

@Override
publicvoidbegin(longsize){
seen=newHashSet<>();
downstream.begin(-1);
}

@Override
publicvoidend(){
seen=null;
downstream.end();
}

@Override
publicvoidaccept(Tt){
if(!seen.contains(t)){
seen.add(t);
downstream.accept(t);
}
}
};
}
}
};
}
skip、limit
publicstaticStreammakeRef(AbstractPipelineupstream,
longskip,longlimit){
if(skip0)
thrownewIllegalArgumentException("Skipmustbenon-negative:"+skip);
//返回一個(gè)匿名有狀態(tài)的管道
returnnewReferencePipeline.StatefulOp(upstream,StreamShape.REFERENCE,flags(limit)){
SpliteratorunorderedSkipLimitSpliterator(Spliterators,longskip,longlimit,longsizeIfKnown){
if(skip<=?sizeIfKnown)?{
????????????????????limit?=?limit?>=0?Math.min(limit,sizeIfKnown-skip):sizeIfKnown-skip;
skip=0;
}
returnnewStreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s,skip,limit);
}
//自己實(shí)現(xiàn)真正操作的方法
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
longn=skip;
longm=limit>=0?limit:Long.MAX_VALUE;

@Override
publicvoidbegin(longsize){
downstream.begin(calcSize(size,skip,m));
}

@Override
publicvoidaccept(Tt){
if(n==0){
//limit
if(m>0){
m--;
downstream.accept(t);
}
}
//skip
else{
n--;
}
}

@Override
publicbooleancancellationRequested(){
returnm==0||downstream.cancellationRequested();
}
};
}
};
}
reduce
//java.util.stream.ReferencePipeline#reduce(P_OUT,java.util.function.BinaryOperator)
@Override
publicfinalP_OUTreduce(finalP_OUTidentity,finalBinaryOperatoraccumulator){
returnevaluate(ReduceOps.makeRef(identity,accumulator,accumulator));
}
//java.util.stream.ReferencePipeline#reduce(java.util.function.BinaryOperator)
@Override
publicfinalOptionalreduce(BinaryOperatoraccumulator){
returnevaluate(ReduceOps.makeRef(accumulator));
}
//java.util.stream.ReferencePipeline#reduce(R,java.util.function.BiFunction,java.util.function.BinaryOperator)
@Override
publicfinalRreduce(Ridentity,BiFunctionsuperP_OUT,R>accumulator,BinaryOperatorcombiner){
returnevaluate(ReduceOps.makeRef(identity,accumulator,combiner));
}

//java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)
finalRevaluate(TerminalOpterminalOp){
assertgetOutputShape()==terminalOp.inputShape();
if(linkedOrConsumed)
thrownewIllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed=true;

returnisParallel()
?terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
collect
//java.util.stream.ReferencePipeline#collect(java.util.stream.Collector)
@Override
@SuppressWarnings("unchecked")
publicfinalRcollect(CollectorsuperP_OUT,A,R>collector){
Acontainer;
if(isParallel()
&&(collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&&(!isOrdered()||collector.characteristics().contains(Collector.Characteristics.UNORDERED))){
container=collector.supplier().get();
BiConsumersuperP_OUT>accumulator=collector.accumulator();
forEach(u->accumulator.accept(container,u));
}
else{
container=evaluate(ReduceOps.makeRef(collector));
}
//具有特定轉(zhuǎn)換的使用finisher處理
returncollector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
?(R)container
:collector.finisher().apply(container);
}
//java.util.stream.ReferencePipeline#collect(java.util.function.Supplier,java.util.function.BiConsumer,java.util.function.BiConsumer)
@Override
publicfinalRcollect(Suppliersupplier,BiConsumersuperP_OUT>accumulator,BiConsumercombiner){
returnevaluate(ReduceOps.makeRef(supplier,accumulator,combiner));
}

//java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)
finalRevaluate(TerminalOpterminalOp){
assertgetOutputShape()==terminalOp.inputShape();
if(linkedOrConsumed)
thrownewIllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed=true;

returnisParallel()
?terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
forEach
//java.util.stream.ReferencePipeline#forEach
@Override
publicvoidforEach(ConsumersuperP_OUT>action){
evaluate(ForEachOps.makeRef(action,false));
}

//java.util.stream.ForEachOps#makeRef
publicstaticTerminalOpmakeRef(ConsumersuperT>action,booleanordered){
Objects.requireNonNull(action);
returnnewForEachOp.OfRef<>(action,ordered);
}

//java.util.stream.ForEachOps.ForEachOp.OfRef
staticfinalclassOfRef<T>extendsForEachOp<T>{
finalConsumersuperT>consumer;

OfRef(ConsumersuperT>consumer,booleanordered){
super(ordered);
this.consumer=consumer;
}

//只是簡(jiǎn)單的消費(fèi)
@Override
publicvoidaccept(Tt){
consumer.accept(t);
}
}

Head

流的數(shù)據(jù)元的頭,類路徑java.util.stream.ReferencePipeline.Head

//java.util.stream.ReferencePipeline.Head
staticclassHead<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{

Head(Supplier>source,intsourceFlags,booleanparallel){
super(source,sourceFlags,parallel);
}

Head(Spliteratorsource,intsourceFlags,booleanparallel){
super(source,sourceFlags,parallel);
}

@Override
finalbooleanopIsStateful(){
thrownewUnsupportedOperationException();
}

@Override
finalSinkopWrapSink(intflags,Sinksink){
thrownewUnsupportedOperationException();
}
//Optimizedsequentialterminaloperationsfortheheadofthepipeline
@Override
publicvoidforEach(ConsumersuperE_OUT>action){
if(!isParallel()){
sourceStageSpliterator().forEachRemaining(action);
}
else{
super.forEach(action);
}
}

@Override
publicvoidforEachOrdered(ConsumersuperE_OUT>action){
if(!isParallel()){
sourceStageSpliterator().forEachRemaining(action);
}
else{
super.forEachOrdered(action);
}
}
}

StatelessOp

無(wú)狀態(tài)的中間管道,類路徑java.util.stream.ReferencePipeline.StatelessOp

//java.util.stream.ReferencePipeline.StatelessOp
abstractstaticclassStatelessOp<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{

StatelessOp(AbstractPipelineupstream,StreamShapeinputShape,intopFlags){
super(upstream,opFlags);
assertupstream.getOutputShape()==inputShape;
}

@Override
finalbooleanopIsStateful(){
returnfalse;
}
}

StatefulOp

有狀態(tài)的中間管道,類路徑java.util.stream.ReferencePipeline.StatefulOp

//java.util.stream.ReferencePipeline.StatefulOp
abstractstaticclassStatefulOp<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{

StatefulOp(AbstractPipelineupstream,StreamShapeinputShape,intopFlags){
super(upstream,opFlags);
assertupstream.getOutputShape()==inputShape;
}

@Override
finalbooleanopIsStateful(){
returntrue;
}

@Override
abstractNodeopEvaluateParallel(PipelineHelperhelper,
Spliteratorspliterator,
IntFunctiongenerator);

TerminalOp

管道流的結(jié)束操作,類路徑java.util.stream.TerminalOp

interfaceTerminalOp<E_IN,R>{

//獲取此操作的輸入類型的形狀
defaultStreamShapeinputShape(){returnStreamShape.REFERENCE;}

//獲取操作的流標(biāo)志。終端操作可以設(shè)置StreamOpFlag定義的流標(biāo)志的有限子集,并且這些標(biāo)志與管道的先前組合的流和中間操作標(biāo)志組合在一起。
defaultintgetOpFlags(){return0;}

//使用指定的PipelineHelper對(duì)操作執(zhí)行并行評(píng)估,該操作描述上游中間操作。
defaultRevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
if(Tripwire.ENABLED)
Tripwire.trip(getClass(),"{0}triggeringTerminalOp.evaluateParallelserialdefault");
returnevaluateSequential(helper,spliterator);
}

//使用指定的PipelineHelper對(duì)操作執(zhí)行順序評(píng)估,該操作描述上游中間操作。
RevaluateSequential(PipelineHelperhelper,Spliteratorspliterator);
}

ReduceOp

類路徑java.util.stream.ReduceOps.ReduceOp

privatestaticabstractclassReduceOp<T,R,SextendsAccumulatingSink<T,R,S>>implementsTerminalOp<T,R>{
privatefinalStreamShapeinputShape;

ReduceOp(StreamShapeshape){
inputShape=shape;
}

publicabstractSmakeSink();

@Override
publicStreamShapeinputShape(){
returninputShape;
}

//通過(guò)匿名子類實(shí)現(xiàn)makeSink()獲取Sink
@Override
publicRevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(makeSink(),spliterator).get();
}

@Override
publicRevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
returnnewReduceTask<>(this,helper,spliterator).invoke().get();
}
}

MatchOp

類路徑java.util.stream.MatchOps.MatchOp

privatestaticfinalclassMatchOp<T>implementsTerminalOp<T,Boolean>{
privatefinalStreamShapeinputShape;
finalMatchKindmatchKind;
finalSupplier>sinkSupplier;

MatchOp(StreamShapeshape,MatchKindmatchKind,Supplier>sinkSupplier){
this.inputShape=shape;
this.matchKind=matchKind;
this.sinkSupplier=sinkSupplier;
}

@Override
publicintgetOpFlags(){
returnStreamOpFlag.IS_SHORT_CIRCUIT|StreamOpFlag.NOT_ORDERED;
}

@Override
publicStreamShapeinputShape(){
returninputShape;
}

//使用內(nèi)置的sinkSupplier獲取Sink
@Override
publicBooleanevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(sinkSupplier.get(),spliterator).getAndClearState();
}

@Override
publicBooleanevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
returnnewMatchTask<>(this,helper,spliterator).invoke();
}
}

FindOp

類路徑java.util.stream.FindOps.FindOp

privatestaticfinalclassFindOp<T,O>implementsTerminalOp<T,O>{
privatefinalStreamShapeshape;
finalbooleanmustFindFirst;
finalOemptyValue;
finalPredicatepresentPredicate;
finalSupplier>sinkSupplier;

FindOp(booleanmustFindFirst,
StreamShapeshape,
OemptyValue,
PredicatepresentPredicate,
Supplier>sinkSupplier){
this.mustFindFirst=mustFindFirst;
this.shape=shape;
this.emptyValue=emptyValue;
this.presentPredicate=presentPredicate;
this.sinkSupplier=sinkSupplier;
}

@Override
publicintgetOpFlags(){
returnStreamOpFlag.IS_SHORT_CIRCUIT|(mustFindFirst?0:StreamOpFlag.NOT_ORDERED);
}

@Override
publicStreamShapeinputShape(){
returnshape;
}

//通過(guò)內(nèi)置sinkSupplier獲取Sink
@Override
publicOevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
Oresult=helper.wrapAndCopyInto(sinkSupplier.get(),spliterator).get();
returnresult!=null?result:emptyValue;
}

@Override
publicOevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
returnnewFindTask<>(this,helper,spliterator).invoke();
}
}

ForEachOp

類路徑java.util.stream.ForEachOps.ForEachOp

staticabstractclassForEachOp<T>implementsTerminalOp<T,Void>,TerminalSink<T,Void>{
privatefinalbooleanordered;

protectedForEachOp(booleanordered){
this.ordered=ordered;
}

@Override
publicintgetOpFlags(){
returnordered?0:StreamOpFlag.NOT_ORDERED;
}

//自己實(shí)現(xiàn)了Sink
@Override
publicVoidevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(this,spliterator).get();
}

@Override
publicVoidevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
if(ordered)
newForEachOrderedTask<>(helper,spliterator,this).invoke();
else
newForEachTask<>(helper,spliterator,helper.wrapSink(this)).invoke();
returnnull;
}

@Override
publicVoidget(){
returnnull;
}

staticfinalclassOfRef<T>extendsForEachOp<T>{
finalConsumersuperT>consumer;

OfRef(ConsumersuperT>consumer,booleanordered){
super(ordered);
this.consumer=consumer;
}

@Override
publicvoidaccept(Tt){
consumer.accept(t);
}
}
...
}

Sink

類路徑java.util.stream.Sink

interfaceSink<T>extendsConsumer<T>{
//開始遍歷元素之前調(diào)用該方法,通知Sink做好準(zhǔn)備。
defaultvoidbegin(longsize){}
//所有元素遍歷完成之后調(diào)用,通知Sink沒(méi)有更多的元素了。
defaultvoidend(){}
//是否可以結(jié)束操作,可以讓短路操作盡早結(jié)束。
defaultbooleancancellationRequested(){
returnfalse;
}
//遍歷元素時(shí)調(diào)用,接受一個(gè)待處理元素,并對(duì)元素進(jìn)行處理。Stage把自己包含的操作和回調(diào)方法封裝到該方法里,前一個(gè)Stage只需要調(diào)用當(dāng)前Stage.accept(Tt)方法就行了。
voidaccept(Tt);
}

這里Sink的子類實(shí)現(xiàn)中分為兩種:中間操作匿名實(shí)現(xiàn)ChainedReferenceTerminalOp子類所提供的Sink。

ChainedReference

類路徑java.util.stream.Sink.ChainedReference,這里是中間操作的默認(rèn)模板父類

staticabstractclassChainedReference<T,E_OUT>implementsSink<T>{
protectedfinalSinksuperE_OUT>downstream;

publicChainedReference(SinksuperE_OUT>downstream){
this.downstream=Objects.requireNonNull(downstream);
}

@Override
publicvoidbegin(longsize){
downstream.begin(size);
}

@Override
publicvoidend(){
downstream.end();
}

@Override
publicbooleancancellationRequested(){
returndownstream.cancellationRequested();
}
}

在上述的中間操作管道流中都是通過(guò)匿名類繼承ChainedReference實(shí)現(xiàn)onWrapSink(int, Sink)返回一個(gè)指定操作的Sink。

TerminalSink

這里為什么講提供呢?這是因?yàn)椴煌膶?shí)現(xiàn)TerminalOp的子類中在實(shí)現(xiàn)java.util.stream.TerminalOp#evaluateSequential中都是通過(guò)helper.wrapAndCopyInto(TerminalOp子類實(shí)現(xiàn)提供的Sink, spliterator)中通過(guò)參數(shù)傳遞的方式提供的,不同的子類傳遞的方式不一樣所以此處用了一個(gè)提供Sink

由ReduceOps中實(shí)現(xiàn)TerminalOp所提供的ReducingSink,它是由匿名類實(shí)現(xiàn)java.util.stream.ReduceOps.ReduceOp#makeSink來(lái)交付給helper.wrapAndCopyInto(makeSink(), spliterator)的。

publicstaticTerminalOpmakeRef(Useed,BiFunctionsuperT,U>reducer,BinaryOperatorcombiner){
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
classReducingSinkextendsBox<U>implementsAccumulatingSink<T,U,ReducingSink>{
@Override
publicvoidbegin(longsize){
state=seed;
}

@Override
publicvoidaccept(Tt){
state=reducer.apply(state,t);
}

@Override
publicvoidcombine(ReducingSinkother){
state=combiner.apply(state,other.state);
}
}
returnnewReduceOp(StreamShape.REFERENCE){
@Override
publicReducingSinkmakeSink(){
returnnewReducingSink();
}
};
}

ForEachOps中實(shí)現(xiàn)TerminalOp所提供的是this,它的提供方式就是通過(guò)this交付給helper.wrapAndCopyInto(this, spliterator)。

//這里ForEachOp自己通過(guò)TerminalSink間接的實(shí)現(xiàn)了Sink
staticabstractclassForEachOp<T>implementsTerminalOp<T,Void>,TerminalSink<T,Void>{
@Override
publicVoidevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(this,spliterator).get();
}
}

由MatchOps中實(shí)現(xiàn)TerminalOp所提供的sinkSupplier通過(guò)構(gòu)造函數(shù)由外部賦值,通過(guò)Supplier接口的get()來(lái)交付給helper.wrapAndCopyInto(sinkSupplier.get(), spliterator)。

privatestaticfinalclassMatchOp<T>implementsTerminalOp<T,Boolean>{
finalSupplier>sinkSupplier;

@Override
publicBooleanevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(sinkSupplier.get(),spliterator).getAndClearState();
}
}

由FindOps中實(shí)現(xiàn)TerminalOp所提供的與上述MatchOps是一致的

privatestaticfinalclassFindOp<T,O>implementsTerminalOp<T,O>{
finalSupplier>sinkSupplier;

@Override
publicOevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
Oresult=helper.wrapAndCopyInto(sinkSupplier.get(),spliterator).get();
returnresult!=null?result:emptyValue;
}
}

Collector

在Collector中有以下幾個(gè)實(shí)現(xiàn)接口:

  • Supplier:結(jié)果類型的提供器。
  • BiConsumer:將元素放入結(jié)果的累加器。
  • BinaryOperator:合并部分結(jié)果的組合器。
  • Function:對(duì)結(jié)果類型轉(zhuǎn)換為最終結(jié)果類型的轉(zhuǎn)換器。
  • Set:保存Collector特征的集合

并行流

前述都是基于串行流的講解,其實(shí)并行流也是基于上述的helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator)這個(gè)方法上面做的一層基于ForkJoinTask多線程框架的封裝。

ForkJoinTask

ForkJoin框架的思想就是分而治之,它將一個(gè)大任務(wù)切割為多個(gè)小任務(wù)這個(gè)過(guò)程稱為fork,將每個(gè)任務(wù)的執(zhí)行的結(jié)果進(jìn)行匯總的過(guò)程稱為join。ForkJoin框架相關(guān)的接口關(guān)系圖如下(圖是盜的):

e1fb7a7a-66e6-11ed-8abf-dac502259ad0.png

AbstractTask

類路徑java.util.stream.AbstractTask,AbstractTask繼承了在JUC中已經(jīng)封裝好的ForkJoinTask抽象子類java.util.concurrent.CountedCompleter。

此類基于CountedCompleter ,它是fork-join任務(wù)的一種形式,其中每個(gè)任務(wù)都有未完成子代的信號(hào)量計(jì)數(shù),并且該任務(wù)隱式完成并在其最后一個(gè)子代完成時(shí)得到通知。 內(nèi)部節(jié)點(diǎn)任務(wù)可能會(huì)覆蓋CountedCompleteronCompletion方法,以將子任務(wù)的結(jié)果合并到當(dāng)前任務(wù)的結(jié)果中。

拆分和設(shè)置子任務(wù)鏈接是由內(nèi)部節(jié)點(diǎn)的compute()完成的。 在葉節(jié)點(diǎn)的compute()時(shí)間,可以確保將為所有子代設(shè)置父代的子代相關(guān)字段(包括父代子代的同級(jí)鏈接)。

例如,執(zhí)行減少任務(wù)的任務(wù)將覆蓋doLeaf()以使用Spliterator對(duì)該葉節(jié)點(diǎn)的塊執(zhí)行減少Spliterator ,并覆蓋onCompletion()以合并內(nèi)部節(jié)點(diǎn)的子任務(wù)的結(jié)果:

@Override
protectedReduceTaskmakeChild(Spliteratorspliterator){
//返回一個(gè)ForkJoinTask任務(wù)
returnnewReduceTask<>(this,spliterator);
}

@Override
protectedSdoLeaf(){
//其他實(shí)現(xiàn)大同小異
returnhelper.wrapAndCopyInto(op.makeSink(),spliterator);
}

@Override
publicvoidonCompletion(CountedCompletercaller){
//非葉子節(jié)點(diǎn)進(jìn)行結(jié)果組合
if(!isLeaf()){
SleftResult=leftChild.getLocalResult();
leftResult.combine(rightChild.getLocalResult());
setLocalResult(leftResult);
}
//GCspliterator,leftandrightchild
super.onCompletion(caller);
}

AbstractTask封裝了分片任務(wù)的算法模板,通過(guò)是SpliteratortrySplit()方法來(lái)實(shí)現(xiàn)分片的細(xì)節(jié),詳細(xì)算法源碼如下(類路徑:java.util.stream.AbstractTask#compute):

@Override
publicvoidcompute(){
//將當(dāng)前這個(gè)spliterator作為右節(jié)點(diǎn)(此時(shí)為root節(jié)點(diǎn))
Spliteratorrs=spliterator,ls;
//評(píng)估任務(wù)的大小
longsizeEstimate=rs.estimateSize();
//獲取任務(wù)閾值
longsizeThreshold=getTargetSize(sizeEstimate);
booleanforkRight=false;
@SuppressWarnings("unchecked")Ktask=(K)this;
//細(xì)節(jié)不多贅述,下面我用圖來(lái)講解算法
/**
*根節(jié)點(diǎn)指定為:右邊節(jié)點(diǎn)
*root
*split()
*leftright
*left.fork()
*split()
*lr
*rs=ls
*right.fork()
*split()
*lr
*l.fork()
*/
while(sizeEstimate>sizeThreshold&&(ls=rs.trySplit())!=null){
KleftChild,rightChild,taskToFork;
task.leftChild=leftChild=task.makeChild(ls);
task.rightChild=rightChild=task.makeChild(rs);
task.setPendingCount(1);
if(forkRight){
forkRight=false;
//左右節(jié)點(diǎn)切換進(jìn)行fork和split
rs=ls;
task=leftChild;
taskToFork=rightChild;
}
else{
forkRight=true;
task=rightChild;
taskToFork=leftChild;
}
//fork任務(wù)加入隊(duì)列中去
taskToFork.fork();
sizeEstimate=rs.estimateSize();
}
//將執(zhí)行doLeaf底層就是單個(gè)串行流的操作
task.setLocalResult(task.doLeaf());
//將結(jié)果組合成一個(gè)最終結(jié)果
task.tryComplete();
}

AbstractTask執(zhí)行與分片流程圖如下:

e20b4e46-66e6-11ed-8abf-dac502259ad0.png

到這里Stream流的相關(guān)知識(shí)介紹到這,這里附上一副總體圖來(lái)加深下印象

e217cc8e-66e6-11ed-8abf-dac502259ad0.png

審核編輯 :李倩


聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 接口
    +關(guān)注

    關(guān)注

    33

    文章

    8257

    瀏覽量

    149956
  • JAVA
    +關(guān)注

    關(guān)注

    19

    文章

    2943

    瀏覽量

    104101
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4237

    瀏覽量

    61969

原文標(biāo)題:還有人不知道 Java 8 Stream流底層原理?

文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    在SpinalHDL里在頂層一鍵優(yōu)化Stream/Flow代碼生成

    ? ? 在SpinalHDL里在頂層一鍵優(yōu)化代碼中Stream/Flow代碼生成的payload,fragment。 難看的代碼 ? ????來(lái)看一段代碼: ? import
    的頭像 發(fā)表于 12-14 09:05 ?557次閱讀

    哪一個(gè)stream的內(nèi)置方法不屬于中間操作

    Java中的(Stream)是用于處理集合數(shù)據(jù)的一種非常強(qiáng)大和靈活的機(jī)制。可以被視為從源數(shù)據(jù)中生成的元素序列。可以被用于對(duì)集合中的元素
    的頭像 發(fā)表于 12-05 14:18 ?321次閱讀

    簡(jiǎn)單了解Java的新特性

    Java 8Java 20,Java 已經(jīng)走過(guò)了漫長(zhǎng)的道路,自 Java 8 以來(lái),
    的頭像 發(fā)表于 11-23 16:38 ?867次閱讀
    簡(jiǎn)單了解<b class='flag-5'>Java</b>的新特性

    java內(nèi)存溢出排查方法

    模型。Java內(nèi)存模型分為線程棧、堆、方法區(qū)(Java 8之前稱為永久代,Java 8后稱為元空間)和本地方法棧
    的頭像 發(fā)表于 11-23 14:46 ?2387次閱讀

    Java11和Java17使用率達(dá)48%和45%

    Java 8,使用率為 40%。85% 的受訪者使用的是 LTS 版本的 Java,64% 的受訪者使用了多個(gè) Java 版本。
    的頭像 發(fā)表于 11-01 12:30 ?486次閱讀

    JavaStream的常用知識(shí)

    ?我們都知道,傳統(tǒng)的處理中,每一步我們都需要通過(guò)循環(huán)控制,邏輯控制,解包,重新裝箱這些工作。 非生產(chǎn)線示意處理圖 這些步驟讓我們的程序的業(yè)務(wù)邏輯支離破碎,經(jīng)常處理數(shù)據(jù)類的小伙伴尤為痛苦。幸運(yùn)的是,Java8為我們引入了Stream,使用
    的頭像 發(fā)表于 10-11 15:45 ?371次閱讀
    <b class='flag-5'>Java</b>的<b class='flag-5'>Stream</b>的常用知識(shí)

    Java8的新特性

    雖然目前Java最新版本都已經(jīng)到16了,但是絕大部分公司目前用的Java版本都是8,想當(dāng)初Java8問(wèn)世后,其Lambda表達(dá)式與方法引用可是最亮眼的新特性,目前,這兩個(gè)特性也被大家廣
    的頭像 發(fā)表于 10-10 17:12 ?431次閱讀

    Java中基礎(chǔ)的 IO

    java 中,程序通常會(huì)和其他外部設(shè)備進(jìn)行數(shù)據(jù)交互,比如寫入磁盤,網(wǎng)絡(luò)發(fā)送數(shù)據(jù)等等,今天我們來(lái)學(xué)學(xué) java 中 基礎(chǔ)的 IO 。 IO 與其他外部設(shè)備進(jìn)行數(shù)據(jù)交互,比如將數(shù)
    的頭像 發(fā)表于 10-10 16:16 ?1682次閱讀
    <b class='flag-5'>Java</b>中基礎(chǔ)的 IO <b class='flag-5'>流</b>

    java的字符分析

    上次聊到了 java 的 字符 Reader ,今天來(lái)看看它的好朋友 Writer 字符。 Writer java 中的 IO 輸出
    的頭像 發(fā)表于 10-10 15:38 ?487次閱讀

    java 8的日期用法

    java 已經(jīng)出到 17 了,而小編還在用 8 的版本,在 8 中已經(jīng)推出了新的日期 API,不在使用 。java.time 包下提供了用于日期、時(shí)間、實(shí)例和周期的主要 API。 獲取
    的頭像 發(fā)表于 10-09 15:50 ?349次閱讀

    Java時(shí)間類轉(zhuǎn)換方案

    眾所周知,Java 8 之前的 Date 相關(guān)的時(shí)間類非常的不好用。從 Java 8 之后開始加入了 LocalDate 等一系列更加現(xiàn)代化的時(shí)間類。 這就衍生出
    的頭像 發(fā)表于 10-09 15:48 ?403次閱讀

    Stream API原理介紹

    Stream API 是 Java 8 中最重要的新特性之一,它是處理集合和數(shù)組的一種新方式。它提供了一種簡(jiǎn)單、靈活和可讀的方式來(lái)處理集合和數(shù)組中的元素,從而使代碼更加簡(jiǎn)潔、高效和易于維護(hù)。 1.
    的頭像 發(fā)表于 09-30 15:31 ?610次閱讀

    java中的IO與Guava工具

    數(shù)據(jù)(從文件、網(wǎng)絡(luò)、數(shù)據(jù)等)寫入到程序,這里的IO指的是基于作為載體進(jìn)行數(shù)據(jù)傳輸。如果把數(shù)據(jù)比作合理的水,河就是IO,也是數(shù)據(jù)的載體。 Java為我們提供了非常多的操作IO的接口與類,幫助開發(fā)者實(shí)現(xiàn)不同源間的數(shù)據(jù)傳輸,比如硬
    的頭像 發(fā)表于 09-25 16:24 ?629次閱讀

    Java8Stream map() 方法

    前言 在日常的開發(fā)工作中經(jīng)常碰到要處理 List 中數(shù)據(jù)的問(wèn)題,比如從一個(gè)對(duì)象集合中獲得對(duì)象中的一個(gè)屬性的集合。之前我們想到的是遍歷每個(gè)元素,然后取出來(lái)放到另外一個(gè)集合中,比較繁瑣;在 Java8
    的頭像 發(fā)表于 09-25 11:06 ?1539次閱讀
    <b class='flag-5'>Java8</b>的<b class='flag-5'>Stream</b><b class='flag-5'>流</b> map() 方法

    LogiCORE IP AXI4-Stream FIFO內(nèi)核解決方案

    LogiCORE IP AXI4-Stream FIFO內(nèi)核允許以內(nèi)存映射方式訪問(wèn)一個(gè)AXI4-Stream接口。該內(nèi)核可用于與AXI4-Stream IP接口,類似于LogiCORE IP AXI以太網(wǎng)內(nèi)核,而無(wú)需使用完整的D
    的頭像 發(fā)表于 09-25 10:55 ?1111次閱讀
    LogiCORE IP AXI4-<b class='flag-5'>Stream</b> FIFO內(nèi)核解決方案