I'm running a Flink cluster in Docker in my local environment, and I've copied these jar files into /opt/flink/lib/ in the image:
flink-cdc-dist-3.3.0.jar
flink-cdc-pipeline-connector-mysql-3.3.0.jar
flink-sql-connector-mysql-cdc-3.3.0.jar
mysql-connector-java-8.0.27.jar
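(For context, I add the jars while building the image with something roughly like the Dockerfile below; the base image tag is just a placeholder here, my real image matches the Flink version of the cluster.)

# Dockerfile (sketch; 'flink:1.20' stands in for my actual base image)
FROM flink:1.20
COPY flink-cdc-dist-3.3.0.jar /opt/flink/lib/
COPY flink-cdc-pipeline-connector-mysql-3.3.0.jar /opt/flink/lib/
COPY flink-sql-connector-mysql-cdc-3.3.0.jar /opt/flink/lib/
COPY mysql-connector-java-8.0.27.jar /opt/flink/lib/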
Then I wrote a Python file, flink_job.py, like this:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define the MySQL CDC source table
t_env.execute_sql(f"""
CREATE TABLE my_table (
    xxx xxx
    ...
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'xxx',
    'port' = 'xxx',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'xxx',
    'table-name' = 'xxx'
)
""")

# Define the S3 sink table
t_env.execute_sql(f"""
CREATE TABLE s3_table (
    xxx xxx,
    xxx xxx
    ...
) WITH (
    'connector' = 'filesystem',
    'path' = 's3://xxx/xxx',
    'format' = 'json'
)
""")

# Insert data from the MySQL CDC source into the S3 sink
t_env.execute_sql(f"""
INSERT INTO s3_table
SELECT * FROM my_table
""")

# Execute the Flink job
env.execute('MySQL CDC to S3 Flink SQL Job')
Finally, I try to submit the job to Flink by running this command on my local machine:
flink run -py flink_job.py -pyFiles requirements.txt
But Flink returns an error:
org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.my_table'.
Table options are:
'connector'='mysql-cdc'
'database-name'='xxx'
'hostname'='xxx'
'password'='******'
'port'='xxx'
'table-name'='xxx'
'username'='xxx'
...
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
python-input-format
...
Why do I get this error even though I've copied those .jar files to Flink's lib directory? And how should I submit the job to Flink?