"""
DROP TABLE "TEST"."YH";
CREATE TABLE "TEST"."YH" (
"VI_ID" NUMBER(11,0) NOT NULL,
"YHZH" VARCHAR2(200 BYTE),
"YHM" VARCHAR2(200 BYTE),
"ZIDUAN3" VARCHAR2(200 BYTE)
)
"""
import cx_Oracle
class OracleOperation:
def __init__(self, username, password, host, port, sid):
self.conn = cx_Oracle.connect(username + "/" + password + "@" + host + ":" + port + "/" + sid)
self.cursor = self.conn.cursor()
def insert(self, table, columns, values):
sql = "INSERT INTO " + table + " (" + ",".join(columns) + ") VALUES (" + ",".join(["'" + str(value) + "'" for value in values]) + ")"
print(sql)
self.cursor.execute(sql)
self.conn.commit()
def inserts(self, table, columns, values):
b=', '.join([f':{i + 1}' for i in range(len(columns))])
sql = "INSERT INTO " + table + " (" + ",".join(columns) + ") VALUES (" + b + ")"
print(sql)
self.cursor.executemany(sql,values)
self.conn.commit()
def update(self, table, set, where):
sql = "UPDATE " + table + " SET " + ",".join([key + "='" + str(set[key]) + "'" for key in set]) + " WHERE " + where
print(sql)
self.cursor.execute(sql)
self.conn.commit()
def delete(self, table, where):
sql = "DELETE FROM " + table + " WHERE " + where
self.cursor.execute(sql)
self.conn.commit()
def select(self, table, columns, where):
sql = "SELECT " + ",".join(columns) + " FROM " + table
if where != "":
sql += " WHERE " + where
self.cursor.execute(sql)
rows = self.cursor.fetchall()
return rows
def qselect(self, sql):
self.cursor.execute(sql)
rows = self.cursor.fetchall()
return rows
def nselect(self, sql):
self.cursor.execute(sql)
self.conn.commit()
def close(self):
self.cursor.close()
self.conn.close()
# Example usage
oracle = OracleOperation("test", "test", "127.0.0.1", "1521", "ORCL")
# Insert data
columns = ["yhm", "yhzh", "ziduan3","sz"]
values = ["value1", "value2", "value3",55]
oracle.insert("yh", columns, values)
# Inserts data
columns = ["yhm", "yhzh", "ziduan3","sz"]
values2 = [["value11", "value2", "value3",123],["value1", "value22", "value3",222.34],["value1", "value2", "value33",15034534.12]]
oracle.inserts("yh", columns, values2)
# Update data
set = {"yhm": "111111111","yhzh":"1111111111","sz":5.2}
where = "sz=520"
oracle.update("yh", set, where)
# Delete data
where = "yhm='John Doe'"
oracle.delete("yh", where)
# Select data
columns = ["*"]
where = ""
rows = oracle.select("yh", columns, where)
for row in rows:
print(row)
#qselect data
sql="select * from yh"
rows = oracle.qselect(sql)
for row in rows:
print(row)
#nselect data
sql = "update yh set yhm='123' where yhm='111'"
oracle.nselect(sql)
oracle.close()
DROP TABLE "TEST"."YH";
CREATE TABLE "TEST"."YH" (
"VI_ID" NUMBER(11,0) NOT NULL,
"YHZH" VARCHAR2(200 BYTE),
"YHM" VARCHAR2(200 BYTE),
"ZIDUAN3" VARCHAR2(200 BYTE)
)
"""
import cx_Oracle
class OracleOperation:
def __init__(self, username, password, host, port, sid):
self.conn = cx_Oracle.connect(username + "/" + password + "@" + host + ":" + port + "/" + sid)
self.cursor = self.conn.cursor()
def insert(self, table, columns, values):
sql = "INSERT INTO " + table + " (" + ",".join(columns) + ") VALUES (" + ",".join(["'" + str(value) + "'" for value in values]) + ")"
print(sql)
self.cursor.execute(sql)
self.conn.commit()
def inserts(self, table, columns, values):
b=', '.join([f':{i + 1}' for i in range(len(columns))])
sql = "INSERT INTO " + table + " (" + ",".join(columns) + ") VALUES (" + b + ")"
print(sql)
self.cursor.executemany(sql,values)
self.conn.commit()
def update(self, table, set, where):
sql = "UPDATE " + table + " SET " + ",".join([key + "='" + str(set[key]) + "'" for key in set]) + " WHERE " + where
print(sql)
self.cursor.execute(sql)
self.conn.commit()
def delete(self, table, where):
sql = "DELETE FROM " + table + " WHERE " + where
self.cursor.execute(sql)
self.conn.commit()
def select(self, table, columns, where):
sql = "SELECT " + ",".join(columns) + " FROM " + table
if where != "":
sql += " WHERE " + where
self.cursor.execute(sql)
rows = self.cursor.fetchall()
return rows
def qselect(self, sql):
self.cursor.execute(sql)
rows = self.cursor.fetchall()
return rows
def nselect(self, sql):
self.cursor.execute(sql)
self.conn.commit()
def close(self):
self.cursor.close()
self.conn.close()
# Example usage
oracle = OracleOperation("test", "test", "127.0.0.1", "1521", "ORCL")
# Insert data
columns = ["yhm", "yhzh", "ziduan3","sz"]
values = ["value1", "value2", "value3",55]
oracle.insert("yh", columns, values)
# Inserts data
columns = ["yhm", "yhzh", "ziduan3","sz"]
values2 = [["value11", "value2", "value3",123],["value1", "value22", "value3",222.34],["value1", "value2", "value33",15034534.12]]
oracle.inserts("yh", columns, values2)
# Update data
set = {"yhm": "111111111","yhzh":"1111111111","sz":5.2}
where = "sz=520"
oracle.update("yh", set, where)
# Delete data
where = "yhm='John Doe'"
oracle.delete("yh", where)
# Select data
columns = ["*"]
where = ""
rows = oracle.select("yh", columns, where)
for row in rows:
print(row)
#qselect data
sql="select * from yh"
rows = oracle.qselect(sql)
for row in rows:
print(row)
#nselect data
sql = "update yh set yhm='123' where yhm='111'"
oracle.nselect(sql)
oracle.close()