using HZQR.Common; using KafkaConsumer.Common; using SocketTransfer.SDK; using System; using System.Collections.Generic; using System.Configuration; using System.Data; namespace KafkaConsumer { class Program { protected static string _OracleConnStr = ConfigurationManager.AppSettings["OracleConnStr"].ToString(); protected static string RevenueRedisConfig = ConfigurationManager.AppSettings["RevenueRedisConfig"]; static void Main(string[] args) { string[] BootstrapServers = new string[] { "localhost:9092", "localhost:9093", "localhost:9094" };//kafka服务器 string groupId = ConfigurationManager.AppSettings["Kafka_groupId"];//分组 string topic = ConfigurationManager.AppSettings["Kafka_topic"];//主题 KafkaConsumerHelper consumer = new KafkaConsumerHelper(groupId, BootstrapServers); OracleHelper _OracleHelper = new OracleHelper(_OracleConnStr.Split(',')[0], _OracleConnStr.Split(',')[1], _OracleConnStr.Split(',')[2], _OracleConnStr.Split(',')[3]); int num = 0; string EST_STATEFEEDBACK_CODE = ""; try { consumer.ListenAsync(topic, result => { if (result != null) { //更新数据库 List ESTStateFeedbackModel = Common.CommonHelper.JSONStringToListUTC(result.Message); EST_STATEFEEDBACK_CODE = ESTStateFeedbackModel[0].SERVERPART_CODE + ESTStateFeedbackModel[0].SERVERPARTSHOP_CODE + ESTStateFeedbackModel[0].MACHINECODE; DataTable dtExist = _OracleHelper.ExcuteSqlGetDataSet(@"SELECT COUNT(1) FROM HIGHWAY_EXCHANGE.T_EST_STATEFEEDBACK WHERE EST_STATEFEEDBACK_CODE='" + EST_STATEFEEDBACK_CODE + "'").Tables[0]; string sql = string.Empty; if (dtExist.Rows[0][0].TryParseToInt() == 0) { //新增 //EST_STATEFEEDBACK_CODE:唯一标识(SERVERPART_CODE+SERVERPARTSHOP_CODE+MACHINECODE) sql = string.Format(@" INSERT INTO HIGHWAY_EXCHANGE.T_EST_STATEFEEDBACK (EST_STATEFEEDBACK_CODE,CONNECT_DATE,SERVERPART_CODE,SERVERPART_NAME, SERVERPARTSHOP_CODE,SERVERPARTSHOP_NAME,MACHINECODE,MACHINE_MACADDRESS, CONNECT_IP,CURRENT_SELLAMOUNT,CURRENT_SELLCOUNT,CURRENT_TICKETCOUNT, CURRENT_GOODSCOUNT,CURRENT_CASH,CURRENT_MOBILEPAYMENT,CURRENT_PERSON, SELL_CONTENT,VERSION_NUM) VALUES('{0}',TO_DATE('{1}','YYYY-MM-DD HH24:MI:SS'),'{2}','{3}', '{4}','{5}','{6}','{7}', '{8}',{9},{10},{11}, {12},{13},{14},'{15}', '{16}','{17}')", EST_STATEFEEDBACK_CODE, ESTStateFeedbackModel[0].CONNECT_DATE, ESTStateFeedbackModel[0].SERVERPART_CODE, ESTStateFeedbackModel[0].SERVERPART_NAME, ESTStateFeedbackModel[0].SERVERPARTSHOP_CODE, ESTStateFeedbackModel[0].SERVERPARTSHOP_NAME, ESTStateFeedbackModel[0].MACHINECODE, ESTStateFeedbackModel[0].MACHINE_MACADDRESS, ESTStateFeedbackModel[0].CONNECT_IP, ESTStateFeedbackModel[0].CURRENT_SELLAMOUNT, ESTStateFeedbackModel[0].CURRENT_SELLCOUNT, ESTStateFeedbackModel[0].CURRENT_TICKETCOUNT, ESTStateFeedbackModel[0].CURRENT_GOODSCOUNT, ESTStateFeedbackModel[0].CURRENT_CASH, ESTStateFeedbackModel[0].CURRENT_MOBILEPAYMENT, ESTStateFeedbackModel[0].CURRENT_PERSON, ESTStateFeedbackModel[0].SELL_CONTENT, ESTStateFeedbackModel[0].VERSION_NUM); } else { //修改 sql = string.Format(@" UPDATE HIGHWAY_EXCHANGE.T_EST_STATEFEEDBACK SET CONNECT_DATE=TO_DATE('{0}','YYYY-MM-DD HH24:MI:SS'),SERVERPART_NAME='{1}', SERVERPARTSHOP_NAME='{2}',MACHINE_MACADDRESS='{3}',CONNECT_IP='{4}',CURRENT_SELLAMOUNT={5}, CURRENT_SELLCOUNT={6},CURRENT_TICKETCOUNT={7},CURRENT_GOODSCOUNT={8},CURRENT_CASH={9}, CURRENT_MOBILEPAYMENT={10},CURRENT_PERSON='{11}',SELL_CONTENT='{12}',VERSION_NUM='{13}' WHERE EST_STATEFEEDBACK_CODE='{14}'", ESTStateFeedbackModel[0].CONNECT_DATE, ESTStateFeedbackModel[0].SERVERPART_NAME, ESTStateFeedbackModel[0].SERVERPARTSHOP_NAME, ESTStateFeedbackModel[0].MACHINE_MACADDRESS, ESTStateFeedbackModel[0].CONNECT_IP, ESTStateFeedbackModel[0].CURRENT_SELLAMOUNT, ESTStateFeedbackModel[0].CURRENT_SELLCOUNT, ESTStateFeedbackModel[0].CURRENT_TICKETCOUNT, ESTStateFeedbackModel[0].CURRENT_GOODSCOUNT, ESTStateFeedbackModel[0].CURRENT_CASH, ESTStateFeedbackModel[0].CURRENT_MOBILEPAYMENT, ESTStateFeedbackModel[0].CURRENT_PERSON, ESTStateFeedbackModel[0].SELL_CONTENT, ESTStateFeedbackModel[0].VERSION_NUM, EST_STATEFEEDBACK_CODE); LogUtil.WriteLog(null, sql, "KafkaUpdateData:" + DateTime.Now.ToString("yyyy-MM-dd")); } num += _OracleHelper.ExcuteSql(sql); Console.WriteLine("recieve message" + DateTime.Now.ToString("HH-mm-ss ffff") + ":" + result.Message + ";数据操作结果:" + num + "\n"); LogUtil.WriteLog(null, result.Message, "ConsumerData:" + DateTime.Now.ToString("yyyy-MM-dd")); result.Commit(); } }).Wait(); } catch (Exception ex) { LogUtil.WriteLog(ex, "", "KafkaConsumeDataError" + DateTime.Now.ToString("yyyy-MM-dd")); } Console.ReadLine(); } } }