import refinitiv.data as rd
import pandas as pd
from datetime import datetime, timedelta
import os
import sys
from dotenv import load_dotenv
import psycopg2  # used indirectly via master.dbupdate's connection helpers

# --- 1. IMPORT BRIDGE ---
# Make the parent directory importable so `master.dbupdate` resolves.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

try:
    from master import dbupdate
    print("✅ Successfully imported dbupdate from master.")
except ImportError as e:
    print(f"❌ Import Error: {e}")
    sys.exit(1)

# --- 2. AUTHENTICATION ---
load_dotenv()
APP_KEY = os.getenv("REFINITIV_APP_KEY")

# --- CONFIG ---
SCHEMA_NAME = "newShiny"
TABLE_NAME = "asian_fcpo_cashmarket"

from refinitiv.data.discovery import Chain

# FIX for "Error code -1 | Unable to collect data for the field 'TR.CLOSEPRICE'":
# TR.* fields are fundamentals/Datastream content and are often not available
# (or not permissioned) for futures RICs such as FCPOc1 / FCPOK26.  For
# exchange-traded contracts use real-time pricing fields instead.  CF_CLOSE is
# already what the cash RIC fetch uses successfully; TRDPRC_1 / SETTLE are
# fallbacks in case CF_CLOSE is unavailable on a given RIC.
CLOSE_FIELD_CANDIDATES = ["CF_CLOSE", "TRDPRC_1", "SETTLE"]


# ============================================================
# HELPER: Add Missing Columns to DB
# ============================================================
def add_missing_columns(schema, table, df):
    """Checks the DB table for missing columns and adds them dynamically.

    Uses the same psycopg2 connection pattern as dbupdate.  Commits after
    all ALTERs; rolls back and re-raises on any failure.
    """
    db = dbupdate.DBConnection(dbupdate.DB_PARAMS)
    try:
        db.connect()
        cur = db.cursor
        conn = db.conn

        # 1. Fetch existing columns from DB (parameterized — no SQL injection).
        cur.execute(
            """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            """,
            (schema, table),
        )
        existing_cols = {row[0] for row in cur.fetchall()}

        # 2. Find columns in df that are missing in DB.
        missing_cols = [col for col in df.columns if col not in existing_cols]
        if not missing_cols:
            print(">> [DB] No new columns to add. Schema is up to date.")
            return

        # 3. ALTER TABLE to add each missing column.  Identifiers are quoted;
        #    they originate from our own column-name construction, not user input.
        for col in missing_cols:
            pg_type = dbupdate.map_dtype_to_pg(df[col].dtype)
            print(f">> [DB] Adding new column: '{col}' ({pg_type}) to {schema}.{table}")
            cur.execute(
                f"""
                ALTER TABLE "{schema}"."{table}"
                ADD COLUMN IF NOT EXISTS "{col}" {pg_type}
                """
            )
        conn.commit()
        print(f"✅ [DB] Schema updated. Added {len(missing_cols)} new column(s): {missing_cols}")
    except Exception as e:
        if db.conn:
            db.conn.rollback()
        print(f"❌ [DB] Failed to add columns: {e}")
        raise
    finally:
        db.disconnect()


# ============================================================
# HELPER: Get Active FCPO Contracts
# ============================================================
def get_active_fcpo_contracts():
    """Return the RICs of the three nearest unexpired FCPO outright contracts.

    Resolves the FCPO chain, filters out expired contracts, sorts by expiry,
    and returns up to three RICs (leading '/' stripped).  Returns [] on error.
    """
    print("--> Detecting active contracts via Chain...")
    try:
        chain = Chain(name="0#/FCPO:")
        df_chain = rd.get_data(chain, ["EXPIR_DATE"])
    except Exception as e:
        print(f"❌ Chain error: {e}")
        return []

    if df_chain is None or df_chain.empty:
        return []

    df_chain["EXPIR_DATE"] = pd.to_datetime(df_chain["EXPIR_DATE"])
    df_chain = df_chain[df_chain["EXPIR_DATE"] > datetime.now()]
    df_chain = df_chain.sort_values("EXPIR_DATE")

    outrights = (
        df_chain["Instrument"].astype(str).str.strip().str.lstrip("/").tolist()
    )
    print(f"✅ Active contracts detected: {outrights[:3]}")
    return outrights[:3]


# ============================================================
# HELPER: Fetch daily close history with field fallback
# ============================================================
def _fetch_close_history(universe, start, end):
    """Fetch daily close history for *universe*, trying pricing fields in order.

    Tries each field in CLOSE_FIELD_CANDIDATES until one returns data.
    Flattens any MultiIndex column header to its first level.  Returns an
    empty DataFrame if every field returns no rows without erroring;
    re-raises the last error if all fields failed with exceptions.
    """
    last_err = None
    for field in CLOSE_FIELD_CANDIDATES:
        try:
            df = rd.get_history(
                universe=universe,
                fields=[field],
                interval="1d",
                start=start,
                end=end,
            )
        except Exception as e:
            # Not every field is permissioned for every RIC — try the next one.
            print(f"   ⚠️ Field '{field}' failed for {universe}: {e}")
            last_err = e
            continue
        if df is not None and not df.empty:
            if isinstance(df.columns, pd.MultiIndex):
                df.columns = df.columns.get_level_values(0)
            return df
    if last_err is not None:
        raise last_err
    return pd.DataFrame()


# ============================================================
# MAIN FUNCTION
# ============================================================
def get_fcpo_cash_market():
    """Fetch FCPO futures + cash prices, derive spreads/basis, upload to DB."""
    # --- Step 1: Check if table exists and get latest date ---
    latest_db_date = dbupdate.check_table_exists(SCHEMA_NAME, TABLE_NAME)
    if latest_db_date:
        start_date = latest_db_date + timedelta(days=1)
        start_req = datetime.combine(start_date, datetime.min.time())
        print(f"ℹ️ Table found. Fetching updates from: {start_date}")
    else:
        start_req = datetime.now() - timedelta(days=90)
        print(f"⚠️ Table missing. Fetching full history from: {start_req.date()}")

    print(f"\n--- 🚀 PROCESSING FCPO CASH MARKET ---")

    # --- Step 2: Get active contracts from Chain ---
    outs = get_active_fcpo_contracts()
    if len(outs) < 3:
        print(f"❌ Not enough contracts found: {outs}")
        return
    c1, c2, c3 = outs[0], outs[1], outs[2]

    # Dynamic spread column names based on active contracts.
    spread1_name = f"{c1}-{c2.replace('FCPO', '')}"
    spread2_name = f"{c2}-{c3.replace('FCPO', '')}"
    print(f"✅ Active Contracts: {c1}, {c2}, {c3}")
    print(f"✅ Spread Columns : {spread1_name}, {spread2_name}")

    cash_ric = "MYPOD-CPOPMS"
    benchmarks = ["FCPOc1", "FCPOc2", "FCPOc3"]
    futures_rics = benchmarks + [c1, c2, c3]

    print("--> Fetching History...")
    try:
        end_req = datetime.now()

        # NOTE: previously this used fields=["TR.CLOSEPRICE"], which raised
        # "Error code -1 | Unable to collect data for the field
        # 'TR.CLOSEPRICE'" for these futures RICs.  Real-time pricing fields
        # (CF_CLOSE etc.) are used instead — see CLOSE_FIELD_CANDIDATES.
        df_futures = _fetch_close_history(futures_rics, start_req, end_req)
        df_cash_raw = _fetch_close_history([cash_ric], start_req, end_req)

        if df_futures.empty and df_cash_raw.empty:
            print("⚠️ No new data.")
            return

        # Normalize the cash column name to "Cash".  Single-RIC responses may
        # label the column with either the field name or the RIC itself.
        if df_cash_raw.empty:
            df_cash = pd.DataFrame(columns=["Cash"])
        else:
            rename_map = {
                col: "Cash"
                for col in df_cash_raw.columns
                if col == cash_ric or col in CLOSE_FIELD_CANDIDATES
            }
            df_cash = df_cash_raw.rename(columns=rename_map)
            df_cash = df_cash[["Cash"]] if "Cash" in df_cash.columns else pd.DataFrame(columns=["Cash"])
    except Exception as e:
        print(f"❌ Data Fetch Error: {e}")
        return

    # --- Step 4: Build final DataFrame ---
    df_new = df_futures.join(df_cash, how='outer').ffill()
    for col in df_new.columns:
        df_new[col] = pd.to_numeric(df_new[col], errors='coerce')

    # Calendar Spreads (dynamic column names).
    if c1 in df_new.columns and c2 in df_new.columns:
        df_new[spread1_name] = df_new[c1] - df_new[c2]
    if c2 in df_new.columns and c3 in df_new.columns:
        df_new[spread2_name] = df_new[c2] - df_new[c3]

    # Basis (Cash vs Benchmark Futures).
    if "Cash" in df_new.columns:
        for i in range(1, 4):
            col = f"FCPOc{i}"
            if col in df_new.columns:
                df_new[f"Diff{i}"] = df_new["Cash"] - df_new[col]

    # --- Step 5: Format for DB ---
    # reset_index() names the new column after the index's own name (often
    # "Date", sometimes "index" or "Timestamp") — rename positionally so the
    # first column is always "Date".
    df_new = df_new.reset_index()
    df_new = df_new.rename(columns={df_new.columns[0]: "Date"})
    df_new["Date"] = pd.to_datetime(df_new["Date"]).dt.date

    allowed_cols = [
        "Date", "Cash",
        "FCPOc1", "FCPOc2", "FCPOc3",
        spread1_name, spread2_name,
        "Diff1", "Diff2", "Diff3",
    ]
    final_cols = [c for c in allowed_cols if c in df_new.columns]
    df_upload = df_new[final_cols].dropna(subset=["Date"])
    print(f"--> Uploading {len(df_upload)} rows")

    # --- Step 6: Add any missing columns to DB before inserting ---
    # Only runs if table already exists (skip on first-time creation).
    if dbupdate.check_table_exists(SCHEMA_NAME, TABLE_NAME) is not None:
        add_missing_columns(SCHEMA_NAME, TABLE_NAME, df_upload)

    # --- Step 7: Upload to DB ---
    dbupdate.update_sliding_window(
        df=df_upload,
        schema=SCHEMA_NAME,
        table=TABLE_NAME,
        date_col="Date",
        retention_days=90,
    )
    print("✅ SUCCESS: FCPO Cash Market updated.")


# ============================================================
# EXECUTION CONTROL
# ============================================================
if __name__ == "__main__":
    try:
        if not APP_KEY:
            # Fail early with a clear message instead of an opaque auth error.
            print("⚠️ REFINITIV_APP_KEY is not set (check your .env file).")
        rd.open_session(app_key=APP_KEY)
        print("✅ Refinitiv Session Opened.")
        get_fcpo_cash_market()
    except Exception as e:
        print(f"❌ Execution Error: {e}")
    finally:
        rd.close_session()
        print("Session closed.")