SQLAlchemy-将子查询的架构和数据复制到另一个数据库
技术问答
121 人阅读
|
0 人回复
|
2023-09-14
|
我正在尝试将子查询中的数据postgres(from_engine)复制到sqlite数据库。我可以使用以下命令复制表来实现这一目的:* x0 W1 T+ f) E" y
smeta = MetaData(bind=from_engine)table = Table(table_name,smeta,autoload=True)table.metadata.create_all(to_engine)但是,我不确定如何为子查询句子实现相同的功能。
3 e5 [2 k" ]2 X5 Q/ |2 ^-山迪普
2 |" `" {: N; P6 K" e9 u g编辑:跟进答案。创建表后,我要创建一个子查询stmt,如下所示:! z* T6 \( c7 K# P8 w
table = Table("newtable",dest_metadata,*columns)stmt = dest_session.query(table).subquery();但是,最后一个stmt最终出现了错误cursor.execute(statement,parameters)sqlalchemy.exc.ProgrammingError:(ProgrammingError)关系“
) Q4 Q: u1 C+ fnewtable第三行不存在:FROM newtable)AS anon_11 x5 H& e, ` p2 K. R/ C8 |
$ g$ v' E$ s2 w6 p9 K
解决方案:
* f1 f- L/ _ f) |. W) S/ ] 至少在某些情况下:
. c: G9 B2 c# |7 X1 B6 U[ol]使用column_descriptions查询对象获取各列相关结果的信息。
) r3 ?5 W& N$ R7 y& A! s6 w9 |+ J利用这些信息,您可以在另一个数据库中构建创建新表的模式。; p$ n* V2 }$ G! N, @
运行查询源数据库,并将结果插入新表。4 R8 D. Q- ]5 i% G, b* }9 D
[/ol]先设置示例:
: _. H9 }" t$ F/ V2 q1 p! kfrom sqlalchemy import create_engine,MetaData,from sqlalchemy import Column,Integer,String,Tablefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker# Engine to the database to query the data from# (postgresql)source_engine = create_engine('sqlite:///:memory:',echo=True)SourceSession = sessionmaker(source_engine)# Engine to the database to store the results in# (sqlite)dest_engine = create_engine('sqlite:///:memory:',echo=True)DestSession = sessionmaker(dest_engine)# Create some toy table and fills it with some dataBase = declarative_base()class Pet(Base): __tablename__ = 'pets' id = Column(Integer,primary_key=True) name = Column(String) race = Column(String)Base.metadata.create_all(source_engine)sourceSession = SourceSession()sourceSession.add(Pet(name="Fido",race="cat"))sourceSession.add(Pet(name="Ceasar",race="cat"))sourceSession.add(Pet(name="Rex",race="dog"))sourceSession.commit()现在去有趣的地方:
( x! c* P% C- B6 }9 S3 L; y: S i# This is the query we want to persist in a new table:query= sourceSession.query(Pet.name,Pet.race).filter_by(race='cat')# Build the schema for the new table# based on the columns that will be returned # by the query:metadata = MetaData(bind=dest_engine)columns = [Column(desc['name'],desc['type']) for desc in query.column_descriptions]column_names = [desc['name'] for desc in query.column_descriptions]table = Table("newtable",metadata,*columns)# Create the new table in the destination databasetable.create(dest_engine)# Finally execute the querydestSession = DestSession()for row in query: destSession.execute(table.insert(row))destSession.commit()最后一个一个循环应该有更有效的方法。但批量插入是另一个主题。 |
|
|
|
|
|