0 votes
in Other Questions by (400 points)
edited by
Apakah ada yang pernah membuat custom class yang meng-extend RichSinkFunction untuk JDBC Hive (Cloudera) pada Flink 1.10.0?

Thanks

2 Answers

0 votes
by (4.2k points)
selected by
 
Best answer

Ini custom JDBC sink yang saya pakai di hgrid v3.1.

package com.haffana.flink.stream.writer.rdbms;

import com.haffana.structure.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Sink that writes each incoming {@link Tuple} as one row of a relational table
 * through JDBC, grouping rows into batches of {@code batchSize} before executing them.
 *
 * <p>Lifecycle: {@link #open(Configuration)} loads the driver, connects and prepares an
 * {@code insert into <table> (<fields>) values (?,...)} statement;
 * {@link #invoke(Tuple, Context)} binds one tuple per call and executes the batch once
 * {@code batchSize} rows are pending; {@link #close()} executes any remaining rows and
 * releases the JDBC resources.
 *
 * <p>Batch failures are propagated to the caller instead of being ignored, so rows are
 * never dropped silently.
 *
 * <p>{@code tablename} and {@code fieldNames} are concatenated into the SQL text as-is
 * (identifiers cannot be bound as parameters), so they must come from trusted configuration.
 */
public class RDBMSSink extends RichSinkFunction<Tuple> {
    // JDBC handles are created in open() and are not serializable, hence transient.
    transient Connection connection;
    transient PreparedStatement preparedStatement;
    // Number of rows added to the current batch and not yet executed.
    int batchCounter=0;
    String jdbcDriver;
    String jdbcUrl;
    String userName;
    String password;
    String tablename;
    // Comma-separated column names; one "?" placeholder is generated per name.
    String fieldNames;
    int batchSize=100;

    /**
     * @param jdbcDriver fully qualified JDBC driver class name, loaded in {@code open()}
     * @param jdbcUrl    JDBC connection URL
     * @param userName   database user
     * @param password   database password
     * @param tablename  target table of the insert statement
     * @param fieldNames comma-separated list of target column names
     * @param batchSize  number of rows to accumulate before the batch is executed
     */
    public RDBMSSink(String jdbcDriver, String jdbcUrl, String userName, String password, String tablename, String fieldNames, int batchSize) {
        this.jdbcDriver = jdbcDriver;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
        this.tablename = tablename;
        this.fieldNames = fieldNames;
        this.batchSize = batchSize;
    }

    /**
     * Loads the JDBC driver, opens the connection and prepares the insert statement.
     *
     * @throws RuntimeException if the driver cannot be loaded or the connection cannot be
     *                          established; the original exception is attached as the cause
     * @throws Exception        if preparing the statement fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            Class.forName(jdbcDriver);
            connection = DriverManager.getConnection(jdbcUrl,userName,password);
        } catch (Exception e) {
            // Same message as before, but keep the cause so the stack trace is not lost.
            throw new RuntimeException(e.getMessage(), e);
        }
        // No setFetchSize() here: it is a hint for reading result sets and does not
        // influence how an insert batch is sent.
        preparedStatement = connection.prepareStatement(buildInsertSql());
    }

    /**
     * Binds the tuple's values to the statement parameters (value {@code i} goes to
     * parameter {@code i+1}), adds the row to the batch and executes the batch once
     * {@code batchSize} rows are pending.
     *
     * @throws Exception if binding a value or executing the batch fails
     */
    @Override
    public void invoke(Tuple tuple, Context context) throws Exception {
        for (int i = 0; i < tuple.size(); i++) {
            preparedStatement.setObject(i + 1, tuple.getObject(i));
        }
        preparedStatement.addBatch();
        batchCounter++;
        if (batchCounter >= this.batchSize) {
            flush();
        }
    }

    /**
     * Executes any rows still pending in the batch, then closes the statement and the
     * connection. The resources are released even when the final batch fails.
     *
     * @throws Exception if the final batch or closing a resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (batchCounter > 0 && preparedStatement != null) {
                flush();
            }
        } finally {
            try {
                // Close the statement before the connection that owns it.
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /** Builds {@code insert into <tablename> (<f1>,<f2>,...) values (?,?,...)} from the configured fields. */
    private String buildInsertSql() {
        String[] names = this.fieldNames.split(",");
        StringBuilder columns = new StringBuilder("(");
        StringBuilder placeholders = new StringBuilder("(");
        for (int i = 0; i < names.length; i++) {
            if (i > 0) {
                columns.append(",");
                placeholders.append(",");
            }
            columns.append(names[i]);
            placeholders.append("?");
        }
        columns.append(")");
        placeholders.append(")");
        return "insert into " + this.tablename + " " + columns + " values " + placeholders;
    }

    /**
     * Executes the pending batch and lets any failure propagate to the caller.
     * The counter is reset even on failure so that a batch which already failed is not
     * submitted again by the next {@code invoke()} or by {@code close()}.
     */
    private void flush() throws Exception {
        try {
            preparedStatement.executeBatch();
        } finally {
            batchCounter = 0;
        }
    }
}

by (4.2k points)
Silakan dimodifikasi sesuai kebutuhan.
by (400 points)
mantab thanks pak
0 votes
by (4.2k points)
Ini RichSourceFunction atau RichSinkFunction?
by (400 points)
Maksudnya RichSinkFunction, Pak.
Welcome to Labs247 Community, where you can ask questions and receive answers from other members of the community.
...