ElasticsearchRepository 查詢、聚合、過濾、分頁

/**
 * Returns one page of per-user viewing statistics aggregated from the view-record index.
 *
 * <p>Records are filtered by the optional company / video / user criteria of {@code queryDto} and
 * by the not-deleted flag, then grouped by {@code userId}. For every user bucket the earliest
 * {@code startTime}, latest {@code endTime}, summed {@code viewDuration}, the course ids and the
 * per-video summed {@code viewDuration} are collected. Buckets can additionally be filtered by
 * time range and are sorted by latest {@code endTime} (descending) before being paged.
 *
 * <p>The distinct-user count ({@code total_count}) is requested and logged, but it is not part of
 * the returned value.
 *
 * @param queryDto filter and paging criteria; {@code pageNo} is treated as 1-based
 * @return the statistics of the requested page, one entry per user bucket returned by the search
 */
public List<StatisticPageDTO> getStatisticList(StatisticQueryDTO queryDto) {
    log.info("getStatisticList param:{}", JSON.toJSONString(queryDto));

    // Document-level filters: every clause is a mandatory terms match.
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    if (queryDto.getCompanyId() != null) {
        boolQuery.must(QueryBuilders.termsQuery("companyId", queryDto.getCompanyId().toString()));
    }
    if (CollectionUtils.isNotEmpty(queryDto.getVodIds())) {
        boolQuery.must(QueryBuilders.termsQuery("vodId", queryDto.getVodIds()));
    }
    if (CollectionUtils.isNotEmpty(queryDto.getUserIds())) {
        boolQuery.must(QueryBuilders.termsQuery("userId", queryDto.getUserIds()));
    }
    boolQuery.must(QueryBuilders.termsQuery("isDelete", IsDeleteEnum.NOT.getCode().toString()));

    // One bucket per user, carrying the metrics that are later copied into StatisticPageDTO.
    TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("aggUserId")
            .field("userId")
            .size(EsConstant.MAX_PAGE_SIZE)
            .subAggregation(AggregationBuilders.min("minStartTime").field("startTime"))
            .subAggregation(AggregationBuilders.max("maxEndTime").field("endTime"))
            .subAggregation(AggregationBuilders.sum("sumViewDuration").field("viewDuration"))
            .subAggregation(AggregationBuilders.terms("aggCourseId").field("courseId"))
            .subAggregation(AggregationBuilders.terms("aggVodId").field("vodId")
                    .subAggregation(AggregationBuilders.sum("sum_vod_ViewDuration").field("viewDuration")));
    addTimeFilter(aggregationBuilder, queryDto);
    addBucketPaging(aggregationBuilder, queryDto);

    // Cardinality is an approximate metric; the precision threshold is the distinct-value count
    // below which the result is expected to be close to exact.
    // NOTE(review): EsConstant.MAX_PAGE_SIZE is presumably 10000 - confirm.
    // NOTE(review): this counts users matching the query, before the bucket time filter is applied.
    CardinalityAggregationBuilder cardinality = AggregationBuilders
            .cardinality("total_count").field("userId")
            .precisionThreshold(EsConstant.MAX_PAGE_SIZE);

    // Only the aggregations are consumed; the page size cannot be 0, so a single hit is requested.
    Pageable pageable = PageRequest.of(0, 1);

    SearchQuery query = new NativeSearchQueryBuilder()
            .withQuery(boolQuery)
            .addAggregation(aggregationBuilder)
            .addAggregation(cardinality)
            .withPageable(pageable)
            .build();
    log.info("getStatisticList boolQuery:{}", boolQuery.toString());
    log.info("getStatisticList aggregationBuilder:{}", aggregationBuilder.toString());

    AggregatedPage<VideoViewReocrdDocument> aggPage =
            (AggregatedPage<VideoViewReocrdDocument>) vodViewRecordESRepository.search(query);

    // Distinct-user count; only logged because the method's return type carries no total.
    Cardinality totalCountAgg = (Cardinality) aggPage.getAggregation("total_count");
    log.info("getStatisticList total:{}", totalCountAgg.getValue());

    StringTerms userAgg = (StringTerms) aggPage.getAggregation("aggUserId");
    List<StatisticPageDTO> resultList = Lists.newArrayList();
    for (StringTerms.Bucket bucket : userAgg.getBuckets()) {
        resultList.add(toStatisticPageDTO(bucket));
    }
    return resultList;
}

/**
 * Adds a bucket selector that keeps only user buckets whose earliest start time and/or latest end
 * time fall inside the ranges given in {@code queryDto}. A range is applied only when both of its
 * bounds are set; when neither range is complete no selector is added.
 */
private void addTimeFilter(TermsAggregationBuilder aggregationBuilder, StatisticQueryDTO queryDto) {
    boolean hasStartRange = queryDto.getBeginTime() != null && queryDto.getEndTime() != null;
    boolean hasLastRange = queryDto.getBeginLastTime() != null && queryDto.getEndLastTime() != null;

    // Script variable name -> name of the sibling metric aggregation it reads.
    Map<String, String> bucketMap = Maps.newHashMap();
    StringBuilder code = new StringBuilder();
    // NOTE(review): bounds are inlined as long literals, so every distinct range yields a distinct
    // script source; passing them as script params may be preferable - confirm against ES limits.
    if (hasStartRange) {
        code.append("params.min_start_time >= ").append(queryDto.getBeginTime().getTime()).append("L");
        code.append(" && params.min_start_time <= ").append(queryDto.getEndTime().getTime()).append("L");
        bucketMap.put("min_start_time", "minStartTime");
    }
    if (hasLastRange) {
        if (hasStartRange) {
            code.append(" && ");
        }
        code.append(" params.max_end_time >= ").append(queryDto.getBeginLastTime().getTime()).append("L");
        code.append(" && params.max_end_time <= ").append(queryDto.getEndLastTime().getTime()).append("L");
        bucketMap.put("max_end_time", "maxEndTime");
    }

    if (code.length() > 0) {
        Script script = new Script(code.toString());
        BucketSelectorPipelineAggregationBuilder bs =
                PipelineAggregatorBuilders.bucketSelector("time_filter", bucketMap, script);
        aggregationBuilder.subAggregation(bs);
    }
}

/**
 * Adds a bucket sort that orders user buckets by latest end time (descending) and cuts out the
 * requested page.
 */
private void addBucketPaging(TermsAggregationBuilder aggregationBuilder, StatisticQueryDTO queryDto) {
    Integer pageNo = queryDto.getPageNo();
    Integer pageSize = queryDto.getPageSize();
    // pageNo is 1-based; a missing or non-positive value falls back to the first page.
    int pageIndex = (pageNo == null || pageNo < 1) ? 0 : pageNo - 1;
    // "from" is an offset counted in buckets, not a page index, so it must be scaled by the page size.
    int from = (pageSize == null) ? 0 : pageIndex * pageSize;

    List<FieldSortBuilder> fieldSorts = new ArrayList<>();
    fieldSorts.add(new FieldSortBuilder("maxEndTime").order(SortOrder.DESC));
    aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketSort("bucket_field", fieldSorts)
            .from(from).size(pageSize));
}

/**
 * Copies the metrics of one user bucket into a {@link StatisticPageDTO}. Sub-aggregations that are
 * absent from the bucket leave the corresponding DTO field unset.
 */
private StatisticPageDTO toStatisticPageDTO(StringTerms.Bucket bucket) {
    StatisticPageDTO dto = new StatisticPageDTO();
    dto.setUserId(bucket.getKeyAsNumber().longValue());

    Aggregations aggregations = bucket.getAggregations();
    if (aggregations == null) {
        return dto;
    }

    // Time the user first started watching.
    InternalMin minStartTime = aggregations.get("minStartTime");
    if (minStartTime != null) {
        Date startTime = DateUtil.parseEsDate(minStartTime.getValueAsString());
        dto.setStartTime(DateUtil.formatDate(startTime));
    }

    // Time the user most recently watched.
    InternalMax maxEndTime = aggregations.get("maxEndTime");
    if (maxEndTime != null) {
        Date lastTime = DateUtil.parseEsDate(maxEndTime.getValueAsString());
        dto.setLastTime(DateUtil.formatDate(lastTime));
    }

    // Total watch duration, truncated to a whole number.
    InternalSum sumViewDuration = aggregations.get("sumViewDuration");
    if (sumViewDuration != null) {
        dto.setTotalDuration((long) sumViewDuration.getValue());
    }

    // Ids of the courses found in this user's records.
    StringTerms courseAgg = aggregations.get("aggCourseId");
    if (courseAgg != null) {
        List<Long> courseIds = courseAgg.getBuckets().stream()
                .map(StringTerms.Bucket::getKeyAsNumber)
                .map(Number::longValue)
                .collect(Collectors.toList());
        dto.setCourseIds(courseIds);
    }

    // Watch duration per video id, truncated to a whole number.
    StringTerms vodAgg = aggregations.get("aggVodId");
    if (vodAgg != null) {
        Map<String, Integer> vodMap = Maps.newHashMap();
        for (StringTerms.Bucket vodBucket : vodAgg.getBuckets()) {
            Aggregations vodAggregations = vodBucket.getAggregations();
            if (vodAggregations == null) {
                continue;
            }
            InternalSum sumVodViewDuration = vodAggregations.get("sum_vod_ViewDuration");
            if (sumVodViewDuration != null) {
                vodMap.put(vodBucket.getKeyAsString(), (int) sumVodViewDuration.getValue());
            }
        }
        dto.setVodMap(vodMap);
    }
    return dto;
}

參考文章:http://www.itdecent.cn/p/930c803a4ebd

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容