PyFlink job: Could not find any factory for identifier 'mysql-cdc' when creating a Flink CDC table - Stack Overflow

admin, 2025-04-19

I'm running a Flink cluster in Docker in my local environment, and I've copied these jar files to /opt/flink/lib/ in the image:

flink-cdc-dist-3.3.0.jar
flink-cdc-pipeline-connector-mysql-3.3.0.jar
flink-sql-connector-mysql-cdc-3.3.0.jar
mysql-connector-java-8.0.27.jar

Then I wrote a Python file like this:

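from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# StreamTableEnvironment.create() needs an execution environment (or settings) to build on
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define the MySQL CDC source table
t_env.execute_sql(f"""
    CREATE TABLE my_table (
        xxx xxx
        ...
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'xxx',
        'port' = 'xxx',
        'username' = 'xxx',
        'password' = 'xxx',
        'database-name' = 'xxx',
        'table-name' = 'xxx'
    )
""")

# Define the S3 sink table
t_env.execute_sql(f"""
    CREATE TABLE s3_table (
        xxx xxx,
        xxx xxx
        ...
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://xxx/xxx',
        'format' = 'json'
    )
""")

# Insert data from MySQL CDC source to S3 sink.
# execute_sql() submits the INSERT job itself, so no separate env.execute() call follows.
t_env.execute_sql(f"""
    INSERT INTO s3_table
    SELECT * FROM my_table
""")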

Finally, I submit the job to Flink by running this command on my local machine:

flink run -py flink_job.py -pyFiles requirements.txt 

But Flink returns an error:

org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.my_table'.
Table options are:

'connector'='mysql-cdc'
'database-name'='xxx'
'hostname'='xxx'
'password'='******'
'port'='xxx'
'table-name'='xxx'
'username'='xxx'
...
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
python-input-format
...
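
The factory lookup in the message above goes through Java's service loader: a connector jar announces its table factories in a `META-INF/services/org.apache.flink.table.factories.Factory` entry. As a rough diagnostic sketch (the helper names `list_table_factories` and `scan_lib_dir` are made up for illustration and are not part of Flink), the jars in a lib directory can be inspected with the standard library alone to see which of them register any factory class:

```python
import zipfile
from pathlib import Path

# Service file that Flink's table factory discovery reads via Java's ServiceLoader.
FACTORY_SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"


def list_table_factories(jar_path):
    """Return the factory class names a jar registers, or [] if it registers none."""
    with zipfile.ZipFile(jar_path) as jar:
        if FACTORY_SERVICE_FILE not in jar.namelist():
            return []
        text = jar.read(FACTORY_SERVICE_FILE).decode("utf-8")
    # Skip blank lines and '#' comments (service files often carry a license header).
    return [
        line.strip()
        for line in text.splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]


def scan_lib_dir(lib_dir):
    """Map each jar file name in lib_dir (hypothetical helper) to its registered factories."""
    return {
        jar.name: list_table_factories(jar)
        for jar in sorted(Path(lib_dir).glob("*.jar"))
    }
```

Running `scan_lib_dir("/opt/flink/lib")` inside the container, and again against the lib directory of whichever Flink installation executes `flink run`, would show whether both sides see a jar that registers a MySQL CDC factory class. The service file lists class names, not the `mysql-cdc` identifier itself.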

Why do I get this error even though I've copied those .jar files to Flink's lib directory? How should I submit the job to Flink?
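
One possible explanation, which the post itself does not confirm, is that the SQL is planned by the local client that runs `flink run`, and that client does not see the jars copied into the Docker image. A minimal sketch of declaring the connector jar explicitly through the `pipeline.jars` option is shown below. The helper names `build_pipeline_jars` and `register_connector_jars` are hypothetical, the path is the one from the question and has to exist on the machine that runs the client, and `Path.as_uri()` is assumed to run on a POSIX system with absolute paths:

```python
from pathlib import Path


def build_pipeline_jars(lib_dir, jar_names):
    """Join absolute jar paths into the ';'-separated file:// URL list used by 'pipeline.jars'."""
    return ";".join((Path(lib_dir) / name).as_uri() for name in jar_names)


def register_connector_jars(t_env, lib_dir, jar_names):
    """Set 'pipeline.jars' on a PyFlink table environment before any CREATE TABLE runs."""
    t_env.get_config().set("pipeline.jars", build_pipeline_jars(lib_dir, jar_names))


# Directory and jar name taken from the question; adjust them to wherever
# the jar lives on the machine that runs `flink run`.
jars = build_pipeline_jars("/opt/flink/lib", ["flink-sql-connector-mysql-cdc-3.3.0.jar"])
print(jars)
```

In the job script this would be called as `register_connector_jars(t_env, ...)` right after the table environment is created. Whether it resolves the error depends on where the client actually runs, so it is worth treating as something to try.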
