admin 发表于 2023-11-1 20:41:52

Python实现Oracle数据库同步

# -*- coding: utf-8 -*-
from WindPy import *
from datetime import datetime, timedelta
import time
import cx_Oracle
import sys,traceback
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

year = int(datetime.now().strftime('%Y'))
today = int(datetime.now().strftime('%Y%m%d'))
dayOfWeek = datetime.strptime(str(today),"%Y%m%d").date().weekday() + 1
print("today:%d,dayOfWeek:%d" %(today,dayOfWeek))

def getDataFromSjzx(today,sql,param,callFunName):
conn_sjzx = cx_Oracle.connect('stage/stage@10.201.200.29:1521/orcl')
cur_sjzx = conn_sjzx.cursor()
cur_sjzx.prepare(sql)
print("传入参数:%s" % (callFunName,param))
print("传入Sql语句:%s" % (callFunName,sql))
print("ips绑定变量:%s" % (callFunName,cur_sjzx.bindnames()))
cur_sjzx.execute(None, param)
fromData = cur_sjzx.fetchall()
desc = cur_sjzx.description
cur_sjzx.close()
conn_sjzx.close()
length = len(desc)
dataListFromSjzx = []
for data in fromData:
    row = {}
    for i in range(length):
      if data is not None:
      row.lower()] = data
    dataListFromSjzx.append(row)
print("日期查询[%d]总记录数:%d" % (callFunName ,today, len(dataListFromSjzx)))
return dataListFromSjzx

def updateDataToIps(sql,param,callFunName):
conn_ips = cx_Oracle.connect('ips/ips@10.201.200.42:1521/xjsdb')
cursor_ips = conn_ips.cursor()
cursor_ips.prepare(sql)
#print("传入参数:%s" % (param))
print("传入Sql语句:%s" % (callFunName,sql))
print("ips绑定变量:%s" % (callFunName,cursor_ips.bindnames()))
cursor_ips.executemany(None, param)
conn_ips.commit()
cursor_ips.close()
conn_ips.close()

def updateDeleteDataToIps(delete_sql,delete_param,insert_sql,insert_param,callFunName):
conn_ips = cx_Oracle.connect('ips/ips@10.201.200.42:1521/xjsdb')
cursor_ips = conn_ips.cursor()
cursor_ips.prepare(delete_sql)
print("传入Sql语句(删除):%s" % (callFunName,delete_sql))
print("ips绑定变量(删除):%s" % (callFunName,cursor_ips.bindnames()))
cursor_ips.execute(None,delete_param)
cursor_ips.prepare(insert_sql)
print("传入Sql语句(插入):%s" % (callFunName,insert_sql))
print("ips绑定变量(插入):%s" % (callFunName,cursor_ips.bindnames()))
cursor_ips.executemany(None, insert_param)
conn_ips.commit()
cursor_ips.close()
conn_ips.close()

def synTradeDayData():
select_sql = "select t.trade_days, t.s_info_exchmarket " \
               "from wind2_asharecalendar t " \
               " where to_number(t.trade_days) >= :today " \
               "   and t.s_info_exchmarket = :marketType " \
               " order by to_number(t.trade_days) "
param = {}
marketType = "SSE"
param["today"] = today
param["marketType"] = marketType
dataListFromSjzx = []
dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synTradeDayData")
dataListToIps = []
dataListToIps = fillTradeDay(dataListFromSjzx)
insert_sql = "insert into t_trade_day (l_trade_date,vc_market_type) values (:tradeDay,:marketType)"
delete_sql = "delete from t_trade_day a where a.l_trade_date >= :today and a.vc_market_type = :marketType"
if (dataListToIps is not None and len(dataListToIps) > 0):
    updateDeleteDataToIps(delete_sql,param,insert_sql,dataListToIps,"synTradeDayData")

def fillTradeDay(dataListFromSjzx):
dataListToIps = []
for data in dataListFromSjzx:
    result = {}
    result["tradeDay"] = data['trade_days']
    result["marketType"] = data['s_info_exchmarket']
    dataListToIps.append(result)
return dataListToIps

def synSpecialNonTradedayData():
if (dayOfWeek==6 or dayOfWeek==7):
    print("日期[%d]为周%d,无需进行同步处理" % (today,dayOfWeek))
    return
select_sql = "select nvl(count(1),0) l_count " \
               "from wind2_asharecalendar t " \
               " where to_number(t.trade_days) = :today " \
               "   and t.s_info_exchmarket = :marketType "
param = {}
marketType = "SSE"
param["today"] = today
param["marketType"] = marketType
dataListFromSjzx = []
dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synSpecialNonTradedayData")
if (dataListFromSjzx is not None and dataListFromSjzx["l_count"] > 0):
    print("日期[%d]为周%d且为交易日,无需进行同步处理" % (today, dayOfWeek))
    return
dataListToIps = []
dataListToIps = fillSpecialNonTradeday(dataListFromSjzx)
delete_sql = "delete from t_special_non_tradeday a where a.l_trade_date >= :today and a.vc_market_type = :marketType"
insert_sql = "insert into t_special_non_tradeday (l_trade_date,vc_market_type,l_day_week) values (:tradeDay,:marketType,:dayOfWeek)"
if (dataListToIps is not None and len(dataListToIps) > 0):
    updateDeleteDataToIps(delete_sql, param, insert_sql, dataListToIps, "synSpecialNonTradedayData")

def fillSpecialNonTradeday(dataListFromSjzx):
dataListToIps = []
result = {}
result["tradeDay"] = today
result["marketType"] = "SSE"
result["dayOfWeek"] = dayOfWeek
dataListToIps.append(result)
return dataListToIps

def synStockInfoWind():
select_sql = "select a.ob_object_id      vc_object_id, " \
               "       a.f1_0001         vc_wind_code, " \
               "       a.f4_0001         vc_stock_code, " \
               "       a.f6_0001         vc_stock_name, " \
               "       a.f10_0001          vc_type_wind, " \
               "       a.f11_0001          vc_type_detail_wind, " \
               "       a.f12_0001          vc_stock_type, " \
               "       a.f14_0001          vc_market_type, " \
               "       a.f16_0001          vc_stock_id_wind, " \
               "       a.f17_0001          vc_company_id_wind, " \
               "       to_number(b.s_info_listdate)   l_listing_date, " \
               "       nvl(to_number(b.s_info_delistdate),99999999) l_delisting_date " \
               "from wind_tb_object_0001 a, wind2_asharedescription b " \
               " where a.f12_0001 = :stockType " \
               "   and a.f1_0001 = b.s_info_windcode " \
               "   and b.s_info_listdate is not null " \
               "   and (to_number(b.s_info_listdate) >= :today or " \
               "       (b.s_info_delistdate is not null and to_number(b.s_info_delistdate) >= :today)) "
param = {}
stockType = "A"
param["today"] = today
param["stockType"] = stockType
dataListFromSjzx = []
dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synStockInfoWind")
dataListToIps = []
deleteStockCodeStr = ""
(dataListToIps,deleteStockCodeStr) = fillStockInfoWind(dataListFromSjzx)
print("待删除数据:%s" % (deleteStockCodeStr))
delete_sql = "delete from t_stock_info_wind where vc_wind_code in ("+deleteStockCodeStr+")"
insert_sql = "insert into t_stock_info_wind ( "\
               "    vc_object_id ,vc_wind_code ,vc_stock_code ,vc_stock_name , "\
               "    vc_type_wind ,vc_type_detail_wind ,vc_stock_type ,vc_market_type , "\
               "    vc_stock_id_wind ,vc_company_id_wind ,l_listing_date ,l_delisting_date) "\
               "values (:objectId,:windCode,:stockCode,:stockName, "\
               "    :typeWind,:typeDetailWind,:stockType,:marketType, "\
               "    :stockIdWind,:companyIdWind,:listingDate,:delistingDate) "
if (dataListToIps is not None and len(dataListToIps) > 0):
    if (deleteStockCodeStr is not None and len(deleteStockCodeStr) > 0):
      updateDeleteDataToIps(delete_sql, {}, insert_sql, dataListToIps, "synStockInfoWind")
    else:
      updateDataToIps(insert_sql, dataListToIps, "synStockInfoWind")

def fillStockInfoWind(dataListFromSjzx):
dataListToIps = []
deleteStockCodeStr = ""
for data in dataListFromSjzx:
    result = {}
    result["objectId"] = data["vc_object_id"]
    result["windCode"] = data["vc_wind_code"]
    result["stockCode"] = data["vc_stock_code"]
    result["stockName"] = data["vc_stock_name"]
    result["typeWind"] = data["vc_type_wind"]
    result["typeDetailWind"] = data["vc_type_detail_wind"]
    result["stockType"] = data["vc_stock_type"]
    result["marketType"] = data["vc_market_type"]
    result["stockIdWind"] = data["vc_stock_id_wind"]
    result["companyIdWind"] = data["vc_company_id_wind"]
    result["listingDate"] = data["l_listing_date"]
    result["delistingDate"] = data["l_delisting_date"]
    if (result["delistingDate"] is not None and result["delistingDate"] > 0 and result["delistingDate"] < 99999999):
      if (deleteStockCodeStr is None or len(deleteStockCodeStr) <= 0):
      deleteStockCodeStr = "'" + result["windCode"] + "'"
      else:
      deleteStockCodeStr = deleteStockCodeStr + "," + "'" + result["windCode"] + "'"
    dataListToIps.append(result)
return (dataListToIps,deleteStockCodeStr)

def synWind2AshareeodpricesData():
table = "wind2_ashareeodprices_" + str(year)
select_sql = "select a.object_id,a.s_info_windcode,nvl(to_number(a.trade_dt),0) trade_dt,a.crncy_code, " \
               "       nvl(to_number(a.s_dq_preclose),0) s_dq_preclose, " \
               "       nvl(to_number(a.s_dq_open),0) s_dq_open, " \
               "       nvl(to_number(a.s_dq_close),0) s_dq_close, " \
               "       nvl(to_number(a.s_dq_volume),0) s_dq_volume, " \
               "       nvl(to_number(a.s_dq_amount),0) s_dq_amount, " \
                "      a.s_dq_tradestatus, " \
               "       a.l_createdate,a.l_createtime " \
               "from " + table + " a " \
               " where to_number(a.trade_dt) >= :today"
param = {}
param["today"] = today
dataListFromSjzx = []
dataListFromSjzx = getDataFromSjzx(today,select_sql,param,"synWind2AshareeodpricesData")
dataListToIps = []
dataListToIps = fillWind2AshareeodpricesData(dataListFromSjzx)
insert_sql = "insert into " + table + "_bak ( " \
               "         object_id,s_info_windcode,trade_dt,crncy_code, " \
               "         s_dq_preclose,s_dq_open,s_dq_close,s_dq_volume,s_dq_amount, " \
               "         s_dq_tradestatus,l_createdate,l_createtime) " \
               "values(:objectId,:windcode,:tradeDt,:crncyCode, " \
               "         :dqPreclose,:dqOpen,:dqClose,:dqVolume,:dqAmount, " \
               "         :dqTradestatus,:createDate,:createTime)"
if (dataListToIps is not None and len(dataListToIps) > 0):
    updateDataToIps(insert_sql, dataListToIps, "synWind2AshareeodpricesData")

def fillWind2AshareeodpricesData(dataListFromSjzx):
dataListToIps = []
for data in dataListFromSjzx:
    result = {}
    result["objectId"] = data["object_id"]
    result["windcode"] = data["s_info_windcode"]
    result["tradeDt"] = data["trade_dt"]
    result["crncyCode"] = data["crncy_code"]
    result["dqPreclose"] = data["s_dq_preclose"]
    result["dqOpen"] = data["s_dq_open"]
    result["dqClose"] = data["s_dq_close"]
    result["dqVolume"] = data["s_dq_volume"]
    result["dqAmount"] = data["s_dq_amount"]
    result["dqTradestatus"] = data["s_dq_tradestatus"]
    result["createDate"] = data["l_createdate"]
    result["createTime"] = data["l_createtime"]
    dataListToIps.append(result)
return dataListToIps

def synAshareDividendData():
select_sql = "select a.s_info_windcode vc_wind_code, "\
               "       a.ex_dt l_ex_date, "\
               "       sum(a.cash_dvd_per_sh_pre_tax) en_cash_dvd_per_sh_pre_tax, "\
               "       sum(a.cash_dvd_per_sh_after_tax) en_cash_dvd_per_sh_after_tax "\
               "   from wind2_asharedividend a "\
               " where a.ex_dt is not null "\
               "   and to_number(a.ex_dt) >= :today "\
               "   group by a.s_info_windcode, a.ex_dt order by a.ex_dt desc"
param = {}
param["today"] = today
dataListFromSjzx = []
dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synAshareDividendData")
dataListToIps = []
dataListToIps = fillAshareDividendData(dataListFromSjzx)
insert_sql = "insert into t_ashare_dividend ( "\
               "         vc_wind_code,l_ex_date,en_cash_dvd_per_sh_pre_tax,en_cash_dvd_per_sh_after_tax) "\
               "values(:windCode,:exDate,:cashDvdPerShPreTax,:cashDvdPerShAfterTax)"
if (dataListToIps is not None and len(dataListToIps) > 0):
    updateDataToIps(insert_sql, dataListToIps, "synAshareDividendData")

def fillAshareDividendData(dataListFromSjzx):
dataListToIps = []
for data in dataListFromSjzx:
    result = {}
    result["windCode"] = data["vc_wind_code"]
    result["exDate"] = data["l_ex_date"]
    result["cashDvdPerShPreTax"] = data["en_cash_dvd_per_sh_pre_tax"]
    result["cashDvdPerShAfterTax"] = data["en_cash_dvd_per_sh_after_tax"]
    dataListToIps.append(result)
return dataListToIps

def synCapitalStockWindData():
select_sql = "select capital.vc_wind_code, " \
               "       capital.l_change_date, " \
               "       capital.l_capital_type, " \
               "       capital.en_captial_value " \
               "from (select b.f1_0001 vc_wind_code, " \
               "               to_number(a.f50_1432) l_change_date, "\
               "               '1' l_capital_type, " \
               "               a.f27_1432 en_captial_value " \
               "          from wind_tb_object_1432 a, wind_tb_object_0001 b " \
               "      where a.f1_1432 = b.f17_0001 " \
               "          and b.f12_0001 = 'A' " \
               "          and a.ob_is_valid_1432 = '1' " \
               "          and to_number(a.f50_1432) >= :today " \
               "      union all " \
               "      select d.f1_0001 vc_wind_code, " \
               "               to_number(c.f2_1931) l_change_date, "\
               "               '2' l_capital_type, " \
               "               c.f4_1931 en_captial_value " \
               "          from wind_tb_object_1931 c, wind_tb_object_0001 d " \
               "      where c.f1_1931 = d.f16_0001 " \
               "          and d.f12_0001 = 'A' " \
               "          and to_number(c.f2_1931) >= :today) capital " \
               "order by capital.l_change_date"
param = {}
param["today"] = today
dataListFromSjzx = []
dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synCapitalStockWindData")
dataListToIps = []
dataListToIps = fillCapitalStockWindData(dataListFromSjzx)
insert_sql = "insert into t_capital_stock_wind ( "\
               "         vc_wind_code,l_change_date,l_capital_type,en_captial_value) "\
               "values(:windCode,:changeDate,:capitalType,:captialValue)"
if (dataListToIps is not None and len(dataListToIps) > 0):
    updateDataToIps(insert_sql, dataListToIps, "synCapitalStockWindData")

def fillCapitalStockWindData(dataListFromSjzx):
dataListToIps = []
for data in dataListFromSjzx:
    result = {}
    result["windCode"] = data["vc_wind_code"]
    result["changeDate"] = data["l_change_date"]
    result["capitalType"] = data["l_capital_type"]
    result["captialValue"] = data["en_captial_value"]
    dataListToIps.append(result)
return dataListToIps

def synSwIndustryClass():
select_sql = "select c.f1_0001 vc_wind_code, " \
               "       c.f6_0001 vc_stock_name, " \
               "       to_number(a.f3_1476) l_import_date, " \
               "       to_number(nvl(a.f4_1476,0)) l_remove_date, " \
               "       a.f5_1476 l_new_flag, " \
               "       b.levelnum-1 l_industry_level, " \
               "       (case when b.levelnum=2 then substr(a.f2_1476,1,4) " \
               "             when b.levelnum=3 then substr(a.f2_1476,1,6) " \
               "             when b.levelnum=4 then substr(a.f2_1476,1,8) " \
               "      else '' end) vc_industry_code, " \
               "       b.name vc_industry_name " \
               "from wind_tb_object_1476 a, wind_tb_object_1022 b, wind_tb_object_0001 c " \
               " where b.used = '1' " \
               "   and b.code like '61%' " \
               "   and c.f12_0001 = 'A' " \
               "   and a.f1_1476 = c.f16_0001 " \
               "   and (to_number(a.f3_1476) >= :today or" \
               "      to_number(nvl(a.f4_1476,0)) >= :today) " \
               "   and ((b.levelnum = 2 and substr(b.code,1,4) = substr(a.f2_1476,1,4)) or" \
               "      (b.levelnum = 3 and substr(b.code,1,6) = substr(a.f2_1476,1,6)) or" \
               "      (b.levelnum = 4 and substr(b.code,1,8) = substr(a.f2_1476,1,8))) " \
               "   order by b.levelnum "
param = {}
param["today"] = today
dataListFromSjzx = []
dataListFromSjzx = getDataFromSjzx(today, select_sql, param, "synSwIndustryClass")
dataListToIps = []
dataListToIps = fillSwIndustryClass(dataListFromSjzx)
insert_sql = "insert into t_sw_industry_class ( "\
               "         vc_wind_code,vc_stock_name,l_import_date,l_remove_date, "\
               "         l_new_flag,l_industry_level,vc_industry_code,vc_industry_name) "\
               "values(:windCode,:stockName,:importDate,:removeDate, "\
               "         :newFlag,:industryLevel,:industryCode,:industryName)"
if (dataListToIps is not None and len(dataListToIps) > 0):
    updateDataToIps(insert_sql, dataListToIps, "synSwIndustryClass")

def fillSwIndustryClass(dataListFromSjzx):
dataListToIps = []
for data in dataListFromSjzx:
    result = {}
    result["windCode"] = data["vc_wind_code"]
    result["stockName"] = data["vc_stock_name"]
    result["importDate"] = data["l_import_date"]
    result["removeDate"] = data["l_remove_date"]
    result["newFlag"] = data["l_new_flag"]
    result["industryLevel"] = data["l_industry_level"]
    result["industryCode"] = data["vc_industry_code"]
    result["industryName"] = data["vc_industry_name"]
    dataListToIps.append(result)
return dataListToIps

if __name__ == "__main__":
synTradeDayData()
synSpecialNonTradedayData()
synStockInfoWind()
synWind2AshareeodpricesData()
synAshareDividendData()
synCapitalStockWindData()
synSwIndustryClass()
页: [1]
查看完整版本: Python实现Oracle数据库同步