python结合shell自动创建kafka的连接器实战教程_第1页
python结合shell自动创建kafka的连接器实战教程_第2页
python结合shell自动创建kafka的连接器实战教程_第3页
python结合shell自动创建kafka的连接器实战教程_第4页
python结合shell自动创建kafka的连接器实战教程_第5页
已阅读5页,还剩5页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第python结合shell自动创建kafka的连接器实战教程目录环境安装连接oracle的python包获取oracle表信息去掉多余部分shell脚本处理表信息文件增强版处理表信息脚本

环境

cat/etc/redhat-release

CentOSLinuxrelease7.5.1804(Core)

[root@localhost~]#uname-a

Linuxlocalhost.localdomain3.10.0-862.el7.x86_64#1SMPFriApr2016:44:24UTC2025x86_64x86_64x86_64GNU/Linux

python-V

Python2.7.5

安装连接oracle的python包

pipinstallcx_Oracle==7.3

获取oracle表信息

catquery_oracle.py

#!/usr/bin/envpython

importcx_Oracle

importsys

importos

importcsv

importtraceback

file=open("oracle.txt",'w').close()

user="test"

passwd="test"

listener='5:1521/orcl'

conn=cx_Oracle.connect(user,passwd,listener)

cursor=conn.cursor()

sql="selecttable_namefromuser_tables"

cursor.execute(sql)

LIST1=[]

whileTrue:

row=cursor.fetchone()

ifrow==None:

break

fortableinrow:

#printtable

LIST1.append(table)

LIST2=[]

foriinLIST1:

sql3="selectCOLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALEfromcolsWHERETABLE_name=upper('%s')"%i

cursor.execute(sql3)

cursor.execute(sql3)

row3=cursor.fetchall()

fordatainrow3:

#LIST2.append(i)

LIST2.extend(list(data))

LIST2.append(i)

f=open('oracle.txt','a+')

printf,LIST2

LIST2=[]

#f=open('test.txt','a+')

#selecttable_name,column_name,DATA_TYPEfromcolsWHERETABLE_name=upper('student');

#selectcolumn_name,DATA_TYPEfromcolsWHERETABLE_name=upper('student');

去掉多余部分

catauto.sh

#!/bin/bash

#pythonquery_oracle.py|tr","''|tr"'"''|tr"["""|tr"]"""

#oracle.txt

oracle_tables.txt

catoracle.txt|tr"[],'"""|sed"s#[][]*##g"|sed's/^[\t]*//g'oracle_tables.txt

catoracle_tables.txt

SNONUMBER190SNAMEVARCHAR2NoneNoneSSEXVARCHAR2NoneNoneSBIRTHDAYDATENoneNoneSCLASSVARCHAR2NoneNoneSTUDENTDATE_DATE

SNO2NUMBER190SNAMEVARCHAR2NoneNoneSSEXVARCHAR2NoneNoneSBIRTHDAYDATENoneNoneSCLASSVARCHAR2NoneNoneSTUDENT2INPUT_TIME

SNO3NUMBER192SNAMEVARCHAR2NoneNoneSSEXVARCHAR2NoneNoneSBIRTHDAYDATENoneNoneSCLASSVARCHAR2NoneNoneSTUDENT3DATA_DATE

shell脚本处理表信息文件

catconnect.sh

#!/bin/bash

#获取临时文件的行数

FILE_NUM=$(catoracle_tables.txt|egrep-v'#|^$'|wc-l)

#清空自动创建连接器的脚本

create-connect.sh

#循环临时文件每一行

foriin`seq$FILE_NUM`

FILE_LINE=$(sed-n${i}poracle_tables.txt)

TABLE_NAME=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk'{print$(NF-1)}')

COL_NUM=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk-F"[]"'{printNF}')

REAL_COL_NUM=`expr$COL_NUM-2`

#清空临时文件

${TABLE_NAME}.txt

${TABLE_NAME}.sql

#循环临时文件每行列名所在的列

forjin`seq14$REAL_COL_NUM`

k=`expr$j+1`

m=`expr$j+2`

n=`expr$j+3`

COL_NAME=$(echo$FILE_LINE|cut-d""-f${j})

COL_DATA_TYPE=$(echo$FILE_LINE|cut-d""-f${k})

COL_DATA_PRECISION=$(echo$FILE_LINE|cut-d""-f${m})

COL_DATA_SCALE=$(echo$FILE_LINE|cut-d""-f${n})

#判断列的数据类型是否是NUMBER

if["$COL_DATA_TYPE"="NUMBER"]

then

#循环拼接SQL查询中的CAST(*AS*)AS*部分,追加到临时文件中

echo"CAST($COL_NAMEAS$COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE))AS$COL_NAME"${TABLE_NAME}.txt

else

#循环拼接SQL查询中的列名部分,追加到临时文件中

echo"$COL_NAME"${TABLE_NAME}.txt

done

#拼接完整的SQL语句,追加到临时文件中

echo"select$(cat${TABLE_NAME}.txt|tr"\n"","|sed-e's/,$/\n/')from$TABLE_NAMEwhere$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)=trunc(sysdate-2)and$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)trunc(sysdate-1)"${TABLE_NAME}.sql

#循环追加每个表对应的连接器到自动创建连接器的脚本中

catcreate-connect.shEOF

curl-XPOSThttp://localhost:8083/connectors-H"Content-Type:application/json"-d'{

"name":"jdbc_source_$TABLE_NAME",

"config":{

"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",

"connection.url":"jdbc:oracle:thin:@{{ORACLE_IP}}:{{ORACLE_PORT}}:orcl",

"connection.user":"{{ORACLE_USER}}",

"connection.password":"{{ORACLE_PASSWD}}",

"topic.prefix":"YC_$TABLE_NAME",

"mode":"{{CONNECT_MODE}}",

"query":"$(cat${TABLE_NAME}.sql)"

}'/dev/null21

说明:脚本中{{变量名}}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。

增强版处理表信息脚本

#!/bin/bash

#获取临时文件的行数

FILE_NUM=$(catoracle_time_tables.txt|egrep-v'#|^$'|wc-l)

#清空创建连接器的脚本并追加echos函数

create-jdbc-connect.sh

catcreate-jdbc-connect.shEOF

#!/bin/bash

echos(){

case\$1in

red)echo-e"\033[31m\$2\033[0m";;

green)echo-e"\033[32m\$2\033[0m";;

yellow)echo-e"\033[33m\$2\033[0m";;

blue)echo-e"\033[34m\$2\033[0m";;

purple)echo-e"\033[35m\$2\033[0m";;

*)echo"\$2";;

create-jdbc-connect-time.sh

catcreate-jdbc-connect-time.shEOF

#!/bin/bash

echos(){

case\$1in

red)echo-e"\033[31m\$2\033[0m";;

green)echo-e"\033[32m\$2\033[0m";;

yellow)echo-e"\033[33m\$2\033[0m";;

blue)echo-e"\033[34m\$2\033[0m";;

purple)echo-e"\033[35m\$2\033[0m";;

*)echo"\$2";;

#创建表相关文件目录

mkdir-p./TABLE_TIME

#循环临时文件每一行

foriin`seq$FILE_NUM`

FILE_LINE=$(sed-n${i}poracle_time_tables.txt)

TABLE_NAME=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk'{print$(NF)}')

COL_NUM=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk-F"[]"'{printNF}')

REAL_COL_NUM=`expr$COL_NUM-2`

#清空临时文件

./TABLE_TIME/${TABLE_NAME}_time.txt

./TABLE_TIME/${TABLE_NAME}_time.sql

./TABLE_TIME/${TABLE_NAME}.sql

#循环临时文件每行列名所在的列

forjin`seq14$REAL_COL_NUM`

k=`expr$j+1`

m=`expr$j+2`

n=`expr$j+3`

COL_NAME=$(echo$FILE_LINE|cut-d""-f${j})

COL_DATA_TYPE=$(echo$FILE_LINE|cut-d""-f${k})

COL_DATA_PRECISION=$(echo$FILE_LINE|cut-d""-f${m})

COL_DATA_SCALE=$(echo$FILE_LINE|cut-d""-f${n})

#判断列的数据类型是否是NUMBER

if["$COL_DATA_TYPE"="NUMBER"]

then

#循环拼接SQL查询中的CAST(*AS*)AS*部分,追加到临时文件中

echo"CAST($COL_NAMEAS$COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE))AS$COL_NAME"./TABLE_TIME/${TABLE_NAME}_time.txt

else

#循环拼接SQL查询中的列名部分,追加到临时文件中

echo"$COL_NAME"./TABLE_TIME/${TABLE_NAME}_time.txt

#判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中

TIME_COL=({{TABLE_TIME_COL}})

forTIMEin${TIME_COL[@]}

if["$COL_NAME"="$TIME"]

then

echo"$COL_NAME"./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt

done

done

#拼接完整的SQL语句,追加到临时文件中

if[-f"./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt"]

then

#echo"select$(cat./TABLE_TIME/${TABLE_NAME}.txt|tr"\n"","|sed-e's/,$/\n/')from{{ORACLE_TABLES_USER}}.$TABLE_NAMEwhere$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)=trunc(sysdate-2)and$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)trunc(sysdate-1)"./TABLE_TIME/${TABLE_NAME}_time.sql

echo"select$(cat./TABLE_TIME/${TABLE_NAME}_time.txt|tr"\n"","|sed-e's/,$/\n/')from{{ORACLE_TABLES_USER}}.$TABLE_NAMEwhere$(cat./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)=trunc(sysdate-2)and$(cat./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)trunc(sysdate-1)"./TABLE_TIME/${TABLE_NAME}_time.sql

else

echo"select$(cat./TABLE_TIME/${TABLE_NAME}_time.txt|tr"\n"","|sed-e's/,$/\n/')from{{ORACLE_TABLES_USER}}.$TABLE_NAME"./TABLE_TIME/${TABLE_NAME}.sql

#循环追加每个表对应的连接器到自动创建连接器的脚本中

if[-f"./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt"]

catcreate-jdbc-connect-time.shEOF

#创建表$TABLE_NAME连接器的命令如下

curl-XPOSThttp://localhost:8083/connectors-H"Content-Type:application/json"-d'{

"name":"jdbc_time_$TABLE_NAME",

"config":{

"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",

"connection.url":"jdbc:oracle:thin:@{{ORACLE_IP}}:{{ORACLE_PORT}}:{{ORACLE_SERVER_NAME}}",

"connection.user":"{{ORACLE_USER}}",

"connection.password":"{{ORACLE_PASSWD}}",

"topic.prefix":"YC_${TABLE_NAME}_INSERT",

"erval.ms":"86400000",

"mode":"{{CONNECT_MODE}}",

"numeric.mapping":"best_fit",

"query":"$(cat./TABLE_TIME/${TABLE_NAME}_time.sql)"

}'/dev/null21

#判断连接器是否创建成功

if[\$-eq0]

echosgreen"\$(date+"%F%H:%M:%S")创建jdbc_time_${TABLE_NAME}连

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论