I have a CsvTableSource as follows:
value1,
value2,
value3,
...
And I have a DataStream as follows:
Name,col_a,col_b,....
value3,....
value4,....
I need to filter the DataStream by the values in the CsvTableSource, using regular expressions.
This is my code:
// Kinesis source that deserializes records into ID
val kinesis_deserialization_schema = new KinesisDeserialization[ID]
val KinesisConsumer = new FlinkKinesisConsumer[ID](
  "MyKinesis",
  kinesis_deserialization_schema,
  consumerProps
)
val KinesisSource = env.addSource(KinesisConsumer).name("Kinesis Data")

// CSV table source holding the values to filter by
val tablecsv = CsvTableSource
  .builder
  .path("path/to/my/file.csv")
  .ignoreFirstLine()
  .field("Name", STRING)
  .quoteCharacter('"')
  .fieldDelimiter(",")
  .lineDelimiter("\n")
  .build()
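For reference, the filtering I am after can be sketched in plain Scala, without any Flink dependency. This is only a minimal illustration under assumptions: `Record`, `RegexFilterSketch`, `compilePatterns`, and `matchesAny` are hypothetical names that are not part of my job, `Record` stands in for my `ID` type, and I am assuming each CSV value is itself a regular expression that must match the whole `Name` field.

```scala
import scala.util.matching.Regex

// Hypothetical record type standing in for the stream's ID type.
final case class Record(name: String, colA: String, colB: String)

object RegexFilterSketch {
  // Compile each CSV value into a regex once, up front.
  // Trailing commas (as in the sample file) are stripped and blank lines skipped.
  def compilePatterns(csvLines: Seq[String]): Seq[Regex] =
    csvLines.map(_.trim.stripSuffix(",")).filter(_.nonEmpty).map(_.r)

  // Keep a record when its name fully matches at least one pattern.
  def matchesAny(patterns: Seq[Regex])(record: Record): Boolean =
    patterns.exists(_.pattern.matcher(record.name).matches())
}
```

With the patterns `value1`, `value2`, `value3` and records named `value3` and `value4`, `records.filter(RegexFilterSketch.matchesAny(patterns))` keeps only the `value3` record. In the Flink job, the same predicate could presumably be passed to `KinesisSource.filter(...)`, provided the compiled patterns are loaded before the stream starts; how best to get them out of the CsvTableSource is the part I am unsure about.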