




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第python结合shell自动创建kafka的连接器实战教程目录环境安装连接oracle的python包获取oracle表信息去掉多余部分shell脚本处理表信息文件增强版处理表信息脚本
环境
cat/etc/redhat-release
CentOSLinuxrelease7.5.1804(Core)
[root@localhost~]#uname-a
Linuxlocalhost.localdomain3.10.0-862.el7.x86_64#1SMPFriApr2016:44:24UTC2025x86_64x86_64x86_64GNU/Linux
python-V
Python2.7.5
安装连接oracle的python包
pipinstallcx_Oracle==7.3
获取oracle表信息
catquery_oracle.py
#!/usr/bin/envpython
importcx_Oracle
importsys
importos
importcsv
importtraceback
file=open("oracle.txt",'w').close()
user="test"
passwd="test"
listener='5:1521/orcl'
conn=cx_Oracle.connect(user,passwd,listener)
cursor=conn.cursor()
sql="selecttable_namefromuser_tables"
cursor.execute(sql)
LIST1=[]
whileTrue:
row=cursor.fetchone()
ifrow==None:
break
fortableinrow:
#printtable
LIST1.append(table)
LIST2=[]
foriinLIST1:
sql3="selectCOLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALEfromcolsWHERETABLE_name=upper('%s')"%i
cursor.execute(sql3)
cursor.execute(sql3)
row3=cursor.fetchall()
fordatainrow3:
#LIST2.append(i)
LIST2.extend(list(data))
LIST2.append(i)
f=open('oracle.txt','a+')
printf,LIST2
LIST2=[]
#f=open('test.txt','a+')
#selecttable_name,column_name,DATA_TYPEfromcolsWHERETABLE_name=upper('student');
#selectcolumn_name,DATA_TYPEfromcolsWHERETABLE_name=upper('student');
去掉多余部分
catauto.sh
#!/bin/bash
#pythonquery_oracle.py|tr","''|tr"'"''|tr"["""|tr"]"""
#oracle.txt
oracle_tables.txt
catoracle.txt|tr"[],'"""|sed"s#[][]*##g"|sed's/^[\t]*//g'oracle_tables.txt
catoracle_tables.txt
SNONUMBER190SNAMEVARCHAR2NoneNoneSSEXVARCHAR2NoneNoneSBIRTHDAYDATENoneNoneSCLASSVARCHAR2NoneNoneSTUDENTDATE_DATE
SNO2NUMBER190SNAMEVARCHAR2NoneNoneSSEXVARCHAR2NoneNoneSBIRTHDAYDATENoneNoneSCLASSVARCHAR2NoneNoneSTUDENT2INPUT_TIME
SNO3NUMBER192SNAMEVARCHAR2NoneNoneSSEXVARCHAR2NoneNoneSBIRTHDAYDATENoneNoneSCLASSVARCHAR2NoneNoneSTUDENT3DATA_DATE
shell脚本处理表信息文件
catconnect.sh
#!/bin/bash
#获取临时文件的行数
FILE_NUM=$(catoracle_tables.txt|egrep-v'#|^$'|wc-l)
#清空自动创建连接器的脚本
create-connect.sh
#循环临时文件每一行
foriin`seq$FILE_NUM`
FILE_LINE=$(sed-n${i}poracle_tables.txt)
TABLE_NAME=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk'{print$(NF-1)}')
COL_NUM=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk-F"[]"'{printNF}')
REAL_COL_NUM=`expr$COL_NUM-2`
#清空临时文件
${TABLE_NAME}.txt
${TABLE_NAME}.sql
#循环临时文件每行列名所在的列
forjin`seq14$REAL_COL_NUM`
k=`expr$j+1`
m=`expr$j+2`
n=`expr$j+3`
COL_NAME=$(echo$FILE_LINE|cut-d""-f${j})
COL_DATA_TYPE=$(echo$FILE_LINE|cut-d""-f${k})
COL_DATA_PRECISION=$(echo$FILE_LINE|cut-d""-f${m})
COL_DATA_SCALE=$(echo$FILE_LINE|cut-d""-f${n})
#判断列的数据类型是否是NUMBER
if["$COL_DATA_TYPE"="NUMBER"]
then
#循环拼接SQL查询中的CAST(*AS*)AS*部分,追加到临时文件中
echo"CAST($COL_NAMEAS$COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE))AS$COL_NAME"${TABLE_NAME}.txt
else
#循环拼接SQL查询中的列名部分,追加到临时文件中
echo"$COL_NAME"${TABLE_NAME}.txt
done
#拼接完整的SQL语句,追加到临时文件中
echo"select$(cat${TABLE_NAME}.txt|tr"\n"","|sed-e's/,$/\n/')from$TABLE_NAMEwhere$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)=trunc(sysdate-2)and$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)trunc(sysdate-1)"${TABLE_NAME}.sql
#循环追加每个表对应的连接器到自动创建连接器的脚本中
catcreate-connect.shEOF
curl-XPOSThttp://localhost:8083/connectors-H"Content-Type:application/json"-d'{
"name":"jdbc_source_$TABLE_NAME",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:oracle:thin:@{{ORACLE_IP}}:{{ORACLE_PORT}}:orcl",
"connection.user":"{{ORACLE_USER}}",
"connection.password":"{{ORACLE_PASSWD}}",
"topic.prefix":"YC_$TABLE_NAME",
"mode":"{{CONNECT_MODE}}",
"query":"$(cat${TABLE_NAME}.sql)"
}'/dev/null21
说明:脚本中{{变量名}}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。
增强版处理表信息脚本
#!/bin/bash
#获取临时文件的行数
FILE_NUM=$(catoracle_time_tables.txt|egrep-v'#|^$'|wc-l)
#清空创建连接器的脚本并追加echos函数
create-jdbc-connect.sh
catcreate-jdbc-connect.shEOF
#!/bin/bash
echos(){
case\$1in
red)echo-e"\033[31m\$2\033[0m";;
green)echo-e"\033[32m\$2\033[0m";;
yellow)echo-e"\033[33m\$2\033[0m";;
blue)echo-e"\033[34m\$2\033[0m";;
purple)echo-e"\033[35m\$2\033[0m";;
*)echo"\$2";;
create-jdbc-connect-time.sh
catcreate-jdbc-connect-time.shEOF
#!/bin/bash
echos(){
case\$1in
red)echo-e"\033[31m\$2\033[0m";;
green)echo-e"\033[32m\$2\033[0m";;
yellow)echo-e"\033[33m\$2\033[0m";;
blue)echo-e"\033[34m\$2\033[0m";;
purple)echo-e"\033[35m\$2\033[0m";;
*)echo"\$2";;
#创建表相关文件目录
mkdir-p./TABLE_TIME
#循环临时文件每一行
foriin`seq$FILE_NUM`
FILE_LINE=$(sed-n${i}poracle_time_tables.txt)
TABLE_NAME=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk'{print$(NF)}')
COL_NUM=$(echo${FILE_LINE}|sed's/[\t]*$//g'|awk-F"[]"'{printNF}')
REAL_COL_NUM=`expr$COL_NUM-2`
#清空临时文件
./TABLE_TIME/${TABLE_NAME}_time.txt
./TABLE_TIME/${TABLE_NAME}_time.sql
./TABLE_TIME/${TABLE_NAME}.sql
#循环临时文件每行列名所在的列
forjin`seq14$REAL_COL_NUM`
k=`expr$j+1`
m=`expr$j+2`
n=`expr$j+3`
COL_NAME=$(echo$FILE_LINE|cut-d""-f${j})
COL_DATA_TYPE=$(echo$FILE_LINE|cut-d""-f${k})
COL_DATA_PRECISION=$(echo$FILE_LINE|cut-d""-f${m})
COL_DATA_SCALE=$(echo$FILE_LINE|cut-d""-f${n})
#判断列的数据类型是否是NUMBER
if["$COL_DATA_TYPE"="NUMBER"]
then
#循环拼接SQL查询中的CAST(*AS*)AS*部分,追加到临时文件中
echo"CAST($COL_NAMEAS$COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE))AS$COL_NAME"./TABLE_TIME/${TABLE_NAME}_time.txt
else
#循环拼接SQL查询中的列名部分,追加到临时文件中
echo"$COL_NAME"./TABLE_TIME/${TABLE_NAME}_time.txt
#判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中
TIME_COL=({{TABLE_TIME_COL}})
forTIMEin${TIME_COL[@]}
if["$COL_NAME"="$TIME"]
then
echo"$COL_NAME"./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt
done
done
#拼接完整的SQL语句,追加到临时文件中
if[-f"./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt"]
then
#echo"select$(cat./TABLE_TIME/${TABLE_NAME}.txt|tr"\n"","|sed-e's/,$/\n/')from{{ORACLE_TABLES_USER}}.$TABLE_NAMEwhere$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)=trunc(sysdate-2)and$(sed-n${i}poracle_tables.txt|cut-d''-f$COL_NUM)trunc(sysdate-1)"./TABLE_TIME/${TABLE_NAME}_time.sql
echo"select$(cat./TABLE_TIME/${TABLE_NAME}_time.txt|tr"\n"","|sed-e's/,$/\n/')from{{ORACLE_TABLES_USER}}.$TABLE_NAMEwhere$(cat./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)=trunc(sysdate-2)and$(cat./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)trunc(sysdate-1)"./TABLE_TIME/${TABLE_NAME}_time.sql
else
echo"select$(cat./TABLE_TIME/${TABLE_NAME}_time.txt|tr"\n"","|sed-e's/,$/\n/')from{{ORACLE_TABLES_USER}}.$TABLE_NAME"./TABLE_TIME/${TABLE_NAME}.sql
#循环追加每个表对应的连接器到自动创建连接器的脚本中
if[-f"./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt"]
catcreate-jdbc-connect-time.shEOF
#创建表$TABLE_NAME连接器的命令如下
curl-XPOSThttp://localhost:8083/connectors-H"Content-Type:application/json"-d'{
"name":"jdbc_time_$TABLE_NAME",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:oracle:thin:@{{ORACLE_IP}}:{{ORACLE_PORT}}:{{ORACLE_SERVER_NAME}}",
"connection.user":"{{ORACLE_USER}}",
"connection.password":"{{ORACLE_PASSWD}}",
"topic.prefix":"YC_${TABLE_NAME}_INSERT",
"erval.ms":"86400000",
"mode":"{{CONNECT_MODE}}",
"numeric.mapping":"best_fit",
"query":"$(cat./TABLE_TIME/${TABLE_NAME}_time.sql)"
}'/dev/null21
#判断连接器是否创建成功
if[\$-eq0]
echosgreen"\$(date+"%F%H:%M:%S")创建jdbc_time_${TABLE_NAME}连
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 中外购物合同范例
- 价格保密协议合同范例
- 360劳务合同标准文本
- 产品长期合作合同范例
- 校车安全与健康教育融合计划
- 2025年制造业智能化升级计划
- 大型公共设施施工进度措施
- 物业管理材料质量监控措施
- 电力工程脚手架搭设的安全技术措施
- 小学班长职责与领导力发展
- LY/T 2676-2016半干旱地区灌木林平茬与复壮技术规范
- 装配式混凝土结构的构件安装分项工程(验收批)质量验收记录表
- 2021年中原工学院辅导员招聘笔试试题及答案解析
- 作业许可检查表
- 湘美版美术三年级下册 《渔家乐-蟳埔情》课件(共20张PPT)
- 农产品集中交易市场等级技术规范-编制说明
- 张京16分钟中英文对照翻译稿
- 武汉绿地中心项目技术管理策划书(48页)
- 油田相关业务的税制及税率
- 北师大版物理八年级下册课课练:专题训练 透镜的相关作图(含答案)
- 《国际经济法》案例思考题
评论
0/150
提交评论