0 votes
in Other Questions by (400 points)
Dear Rekans,

Apakah ada yang pernah mengalamai hal ini, saya membuat flink dengan coGroup operation menggunakan TumblingEvent dengan watermark. Namun proses left join hanya berjalan sekali saja saat job pertama kali dijalankan, apakah ada yang tahu bagaimana cara tracing yg salahnya ?

BR,

SRO

2 Answers

0 votes
by (140 points)
ada sample codenya ga om?

buat ngetes dl, kalau pakai source dr kafka. coba msg di topic kafkanya di produce manual sesuai interval yang di set di windowing tumblingnya.
by (400 points)
Sudah ane jefri ya gan,,
0 votes
by (4.2k points)

Biasanya kalu sumber/source nya berhenti, proses streamingnya juga berheti/stop.

Untuk menjamin sumber/source nya jalan terus, saya buat custom-source extends

RichSourceFunction<T>
by (400 points)
Untuk streaming sudah dibuat watermark nya selalu jalan pak, namum masih belum ada yang keluar juga hasil output nya.

berikut contoh kode leftjoin

    private static class LeftJoinFunction implements CoGroupFunction<Tuple3<CTIData, UtmTraffic2, Integer>, FgiIPS, Tuple4<CTIData, UtmTraffic2, FgiIPS, Integer>> {
        @Override
        public void coGroup(Iterable<Tuple3<CTIData, UtmTraffic2, Integer>> left, Iterable<FgiIPS> right, Collector<Tuple4<CTIData, UtmTraffic2, FgiIPS, Integer>> collector) throws Exception {
            for (Tuple3<CTIData, UtmTraffic2, Integer> ld: left) {
                Boolean noElements = true;
                for (FgiIPS rd: right) {
                    noElements = false;
                    collector.collect(Tuple4.of(ld.f0, ld.f1, rd, 1));
                }

                if (noElements) {
                    collector.collect(Tuple4.of(ld.f0, ld.f1, new FgiIPS(), 0));
                }
              }
        }
    }
by (4.2k points)
Logic left-joinnya sudah benar.
berarti ada penyabab yang lain.

bisa di share code-nya??
by (400 points)
edited by
Setelah di-debug di-local dengan menggunakan CustomDataSource/Generator akhirnya solve jg, makasih semunya atas support nya.
kalau  mau ditarik kesimpulan permasalahnya somehow untuk coGroup operation lebih baik menggunakan TimeCharacteristic.ProcessingTime dan jika menggunakan lebih dari 1 key untuk join lebih baik menggunakan Objects.hash() dan disarankan untuk debugging bisa menggunakan custom source / generator agar lebih mudah di-debug melalui intelejia
by (4.2k points)
Yang pengen tahun cara buat Custom-Input di Filink Streaming, silahkan kontak saya atau Syah Reza
Welcome to Labs247 Community, where you can ask questions and receive answers from other members of the community.
...