dump.go (1461B)
1 package postgres 2 3 import ( 4 "context" 5 "fmt" 6 7 "git.defalsify.org/vise.git/db" 8 ) 9 10 // Dump implements Db. 11 func (pdb *pgDb) Dump(ctx context.Context, key []byte) (*db.Dumper, error) { 12 tx, err := pdb.conn.BeginTx(ctx, defaultTxOptions) 13 if err != nil { 14 return nil, err 15 } 16 17 pdb.SetLanguage(nil) 18 lk, err := pdb.ToKey(ctx, key) 19 if err != nil { 20 return nil, err 21 } 22 k := lk.Default 23 24 query := fmt.Sprintf("SELECT key, value FROM %s.kv_vise WHERE key >= $1", pdb.schema) 25 rs, err := tx.Query(ctx, query, k) 26 if err != nil { 27 logg.Debugf("query fail", "err", err) 28 tx.Rollback(ctx) 29 return nil, err 30 } 31 defer tx.Commit(ctx) 32 33 if rs.Next() { 34 var kk []byte 35 var vv []byte 36 err = rs.Scan(&kk, &vv) 37 if err != nil { 38 return nil, err 39 } 40 pdb.it = rs 41 pdb.itBase = k 42 kk, err = pdb.DecodeKey(ctx, kk) 43 if err != nil { 44 return nil, err 45 } 46 return db.NewDumper(pdb.dumpFunc).WithClose(pdb.closeFunc).WithFirst(kk, vv), nil 47 } 48 49 return nil, db.NewErrNotFound(k) 50 } 51 52 func (pdb *pgDb) dumpFunc(ctx context.Context) ([]byte, []byte) { 53 var kk []byte 54 var vv []byte 55 if !pdb.it.Next() { 56 logg.DebugCtxf(ctx, "no more data in pg iterator") 57 pdb.it = nil 58 pdb.itBase = nil 59 return nil, nil 60 } 61 err := pdb.it.Scan(&kk, &vv) 62 if err != nil { 63 return nil, nil 64 } 65 k, err := pdb.DecodeKey(ctx, kk) 66 if err != nil { 67 return nil, nil 68 } 69 return k, vv 70 } 71 72 func (pdb *pgDb) closeFunc() error { 73 if pdb.it != nil { 74 pdb.it.Close() 75 pdb.it = nil 76 } 77 return nil 78 }